
IMPALA-3989: Display skew warning for poorly formatted Parquet files

Parquet files are scanned at the granularity of row groups. Each row
group belongs to one or more splits, and each split is scanned by a
separate Parquet scanner.

If some row groups span multiple splits, then we will most likely end
up with some scanners performing remote reads and other scanners
performing no reads at all. This contributes to skew across the
cluster, where the distribution of scans is uneven.
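
To make the failure mode concrete, here is a rough Python sketch of the
midpoint rule (made-up offsets and helper names, not the actual scanner
code):

    # Each row group is processed by the scanner whose split contains the
    # row group's midpoint; a scanner whose split contains no midpoint
    # performs no reads.
    def scans_per_split(splits, row_groups):
        """splits: list of (offset, length); row_groups: list of (start, end)."""
        reads = [0] * len(splits)
        for start, end in row_groups:
            mid = start + (end - start) // 2
            for i, (offset, length) in enumerate(splits):
                if offset <= mid < offset + length:
                    reads[i] += 1
                    break
        return reads

    # Two equal splits over a 200-byte file, but one row group spanning the
    # whole file: its midpoint (100) lands in the second split, so the
    # scanner for the first split reads nothing.
    print(scans_per_split([(0, 100), (100, 100)], [(0, 200)]))  # [0, 1]

The scanner that does process the row group must also read bytes outside
its own split, which is where the remote reads come from.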

This change adds a counter (NumScannersWithNoReads) to the scan node's
runtime profile to track the number of Parquet scanners that end up
doing no reads because of poor formatting.

Change-Id: Ibf48d978383d73efdade733a892e795ebd53c76a
Reviewed-on: http://gerrit.cloudera.org:8080/5400
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Impala Public Jenkins
Attila Jeges authored and Impala Public Jenkins committed Dec 2, 2016
1 parent 446d95e commit 8f59ce9dfc636cc9f6f03ca9f5ee289ca7cca602
Showing with 107 additions and 9 deletions.
  1. +47 −9 be/src/exec/hdfs-parquet-scanner.cc
  2. +4 −0 be/src/exec/hdfs-parquet-scanner.h
  3. +56 −0 tests/query_test/test_scanners.py
@@ -158,6 +158,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
process_footer_timer_stats_(NULL),
num_cols_counter_(NULL),
num_row_groups_counter_(NULL),
num_scanners_with_no_reads_counter_(NULL),
codegend_process_scratch_batch_fn_(NULL) {
assemble_rows_timer_.Stop();
}
@@ -170,6 +171,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
num_row_groups_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
num_scanners_with_no_reads_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
process_footer_timer_stats_ =
ADD_SUMMARY_STATS_TIMER(
scan_node_->runtime_profile(), "FooterProcessingTime");
@@ -311,6 +314,24 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
return start_offset + (end_offset - start_offset) / 2;
}
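
The return statement above computes the offset halfway between
'start_offset' and 'end_offset'; writing it as start + (end - start) / 2
is the usual overflow-safe midpoint idiom. With illustrative numbers:

    # Made-up column data offsets, not taken from a real file.
    start_offset, end_offset = 1000, 5000
    mid = start_offset + (end_offset - start_offset) // 2
    assert mid == 3000
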
// Returns true if 'row_group' overlaps with 'split_range'.
static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
const DiskIoMgr::ScanRange* split_range) {
int64_t row_group_start = GetColumnStartOffset(row_group.columns[0].meta_data);
const parquet::ColumnMetaData& last_column =
row_group.columns[row_group.columns.size() - 1].meta_data;
int64_t row_group_end =
GetColumnStartOffset(last_column) + last_column.total_compressed_size;
int64_t split_start = split_range->offset();
int64_t split_end = split_start + split_range->len();
return (split_start >= row_group_start && split_start < row_group_end) ||
(split_end > row_group_start && split_end <= row_group_end) ||
(split_start <= row_group_start && split_end >= row_group_end);
}
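
The three disjuncts cover the split starting inside the row group, the
split ending inside the row group, and the split containing the row group
entirely; for half-open byte ranges this amounts to the standard interval
intersection test. A quick Python check with illustrative offsets:

    def overlaps(split_start, split_end, rg_start, rg_end):
        # Equivalent to the three-way check above for half-open ranges.
        return split_start < rg_end and split_end > rg_start

    assert overlaps(100, 200, 50, 150)       # split start inside row group
    assert overlaps(100, 200, 150, 250)      # split end inside row group
    assert overlaps(100, 200, 120, 180)      # split contains row group
    assert not overlaps(100, 200, 200, 300)  # adjacent ranges, no overlap
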
int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& column_readers) {
DCHECK(!column_readers.empty());
int num_columns = 0;
@@ -431,6 +452,16 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
}
Status HdfsParquetScanner::NextRowGroup() {
const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
metadata_range_->meta_data())->original_split;
int64_t split_offset = split_range->offset();
int64_t split_length = split_range->len();
HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
bool start_with_first_row_group = row_group_idx_ == -1;
bool misaligned_row_group_skipped = false;
advance_row_group_ = false;
row_group_rows_read_ = 0;
@@ -442,26 +473,33 @@ Status HdfsParquetScanner::NextRowGroup() {
parse_status_ = Status::OK();
++row_group_idx_;
-   if (row_group_idx_ >= file_metadata_.row_groups.size()) break;
+   if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+     if (start_with_first_row_group && misaligned_row_group_skipped) {
+       // We started with the first row group and skipped all the row groups because
+       // they were misaligned. The execution flow won't reach this point if there is at
+       // least one non-empty row group which this scanner can process.
+       COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
+     }
+     break;
+   }
const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
// Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *'
// behave consistently for corrupt files that have 'file_metadata_.num_rows == 0'
// but some data in row groups.
-   if (row_group.num_rows == 0|| file_metadata_.num_rows == 0) continue;
+   if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;
-   const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
-       metadata_range_->meta_data())->original_split;
-   HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
file_desc->filename, file_desc->file_length, row_group));
+   // A row group is processed by the scanner whose split overlaps with the row
+   // group's mid point.
int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
-   int64_t split_offset = split_range->offset();
-   int64_t split_length = split_range->len();
if (!(row_group_mid_pos >= split_offset &&
row_group_mid_pos < split_offset + split_length)) {
-     // A row group is processed by the scanner whose split overlaps with the row
-     // group's mid point. This row group will be handled by a different scanner.
+     // The mid-point does not fall within the split, this row group will be handled by a
+     // different scanner.
+     // If the row group overlaps with the split, we found a misaligned row group.
+     misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group, split_range);
continue;
}
COUNTER_ADD(num_row_groups_counter_, 1);
@@ -443,6 +443,10 @@ class HdfsParquetScanner : public HdfsScanner {
/// Number of row groups that need to be read.
RuntimeProfile::Counter* num_row_groups_counter_;
/// Number of scanners that end up doing no reads because their splits don't overlap
/// with the midpoint of any row-group in the file.
RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
/// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
@@ -41,6 +41,7 @@
parse_result_rows)
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
from tests.util.hdfs_util import NAMENODE
from tests.util.get_parquet_metadata import get_parquet_metadata
from tests.util.test_file_parser import QueryTestSectionReader
@@ -311,6 +312,61 @@ def test_corrupt_rle_counts(self, vector, unique_database):
self.run_test_case('QueryTest/parquet-corrupt-rle-counts-abort',
vector, unique_database)
@SkipIfS3.hdfs_block_size
@SkipIfIsilon.hdfs_block_size
@SkipIfLocal.multiple_impalad
def test_misaligned_parquet_row_groups(self, vector):
"""IMPALA-3989: Test that no warnings are issued when misaligned row groups are
encountered. Make sure that 'NumScannersWithNoReads' counters are set to the number of
scanners that end up doing no reads because of misaligned row groups.
"""
# functional.parquet.alltypes is well-formatted. 'NumScannersWithNoReads' counters are
# set to 0.
table_name = 'functional_parquet.alltypes'
self._misaligned_parquet_row_groups_helper(table_name, 7300)
# lineitem_multiblock_parquet/000000_0 is ill-formatted but every scanner reads some
# row groups. 'NumScannersWithNoReads' counters are set to 0.
table_name = 'functional_parquet.lineitem_multiblock'
self._misaligned_parquet_row_groups_helper(table_name, 20000)
# lineitem_sixblocks.parquet is ill-formatted but every scanner reads some row groups.
# 'NumScannersWithNoReads' counters are set to 0.
table_name = 'functional_parquet.lineitem_sixblocks'
self._misaligned_parquet_row_groups_helper(table_name, 40000)
# Scanning lineitem_multiblock_one_row_group.parquet finds two scan ranges that end
# up doing no reads because the file is poorly formatted.
table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
self._misaligned_parquet_row_groups_helper(
table_name, 40000, num_scanners_with_no_reads=2)
def _misaligned_parquet_row_groups_helper(
self, table_name, rows_in_table, num_scanners_with_no_reads=0, log_prefix=None):
"""Checks if executing a query logs any warnings and if there are any scanners that
end up doing no reads. 'log_prefix' specifies the prefix of the expected warning.
'num_scanners_with_no_reads' indicates the expected number of scanners that don't read
anything because the underlying file is poorly formatted
"""
query = 'select * from %s' % table_name
result = self.client.execute(query)
assert len(result.data) == rows_in_table
assert (not result.log and not log_prefix) or \
(log_prefix and result.log.startswith(log_prefix))
runtime_profile = str(result.runtime_profile)
num_scanners_with_no_reads_list = re.findall(
'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
# This will fail if the number of impalads != 3
# The fourth fragment is the "Averaged Fragment"
assert len(num_scanners_with_no_reads_list) == 4
# Calculate the total number of scan ranges that ended up not reading anything because
# an underlying file was poorly formatted.
# Skip the Averaged Fragment; it comes first in the runtime profile.
total = 0
for n in num_scanners_with_no_reads_list[1:]:
total += int(n)
assert total == num_scanners_with_no_reads
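
For reference, a sketch of what the helper's regex extracts; the profile
text below is a simplified placeholder, not Impala's exact profile layout:

    import re

    profile = '''
    Averaged Fragment F00:  NumScannersWithNoReads: 0
    Fragment F00 Instance 1:  NumScannersWithNoReads: 1
    Fragment F00 Instance 2:  NumScannersWithNoReads: 1
    Fragment F00 Instance 3:  NumScannersWithNoReads: 0
    '''
    counts = re.findall('NumScannersWithNoReads: ([0-9]*)', profile)
    assert counts == ['0', '1', '1', '0']
    # Skip the Averaged Fragment entry; two real scanners read nothing.
    assert sum(int(n) for n in counts[1:]) == 2
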
@SkipIfS3.hdfs_block_size
@SkipIfIsilon.hdfs_block_size
@SkipIfLocal.multiple_impalad
