IMPALA-5383: Fix PARQUET_FILE_SIZE option for ADLS

The PARQUET_FILE_SIZE query option doesn't work with ADLS because
AdlFileSystem has no notion of block sizes. Impala depends on the
filesystem remembering the block size, which is then used as the target
Parquet file size (this is done for HDFS so that the Parquet file size and
block size match even if PARQUET_FILE_SIZE isn't a valid block size).

We special-case ADLS just as we do S3: bypass the FileSystem block size and
instead use the requested PARQUET_FILE_SIZE as the output partition's
block_size (and consequently as the target Parquet file size).
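
For illustration only, the block-size selection described above reduces to
the following sketch (Python with made-up names; not Impala's actual code):

def effective_block_size(requested_parquet_file_size, fs_reported_block_size,
                         on_s3a_or_adls):
  # Hedged sketch: on S3A and ADLS the filesystem reports only a default
  # block size, so the requested PARQUET_FILE_SIZE is used directly.
  if on_s3a_or_adls:
    return requested_parquet_file_size
  # On HDFS the filesystem may round the request to a valid block size,
  # so the block size it reports back is authoritative.
  return fs_reported_block_size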

Testing: Re-enabled test_insert_parquet_verify_size() for ADLS.

Also fixed an unrelated bug in the ADLS client's listing helper function.

Change-Id: I474a913b0ff9b2709f397702b58cb1c74251c25b
Reviewed-on: http://gerrit.cloudera.org:8080/7018
Reviewed-by: Sailesh Mukil <sailesh@cloudera.com>
Tested-by: Impala Public Jenkins
smukil authored and Impala Public Jenkins committed May 30, 2017
1 parent 9caf214 commit 1f34a9e7034cb1b068dbcaba94d3f01295995fee
@@ -390,10 +390,12 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
         output_partition->current_file_name));
   }
-  if (IsS3APath(output_partition->current_file_name.c_str())) {
+  if (IsS3APath(output_partition->current_file_name.c_str()) ||
+      IsADLSPath(output_partition->current_file_name.c_str())) {
     // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block
-    // size reported will be just the filesystem default. So, remember the requested
-    // block size.
+    // size reported will be just the filesystem default. Similarly, the block size
+    // reported for ADLS will be the filesystem default. So, remember the requested block
+    // size.
     output_partition->block_size = block_size;
   } else {
     // HDFS may choose to override the block size that we've recommended, so for non-S3
@@ -85,6 +85,13 @@ bool IsS3APath(const char* path) {
   return strncmp(path, "s3a://", 6) == 0;
 }
 
+bool IsADLSPath(const char* path) {
+  if (strstr(path, ":/") == NULL) {
+    return ExecEnv::GetInstance()->default_fs().compare(0, 6, "adl://") == 0;
+  }
+  return strncmp(path, "adl://", 6) == 0;
+}
+
 // Returns the length of the filesystem name in 'path' which is the length of the
 // 'scheme://authority'. Returns 0 if the path is unqualified.
 static int GetFilesystemNameLength(const char* path) {
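
For illustration, the same scheme check mirrored in Python (a hedged sketch
of the C++ above; not part of the commit):

def is_adls_path(path, default_fs):
  # Unqualified paths (no 'scheme:/' prefix) resolve against the cluster's
  # default filesystem, mirroring IsADLSPath() above.
  if ":/" not in path:
    return default_fs.startswith("adl://")
  return path.startswith("adl://")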
@@ -50,6 +50,9 @@ bool IsHdfsPath(const char* path);
 /// Returns true iff the path refers to a location on an S3A filesystem.
 bool IsS3APath(const char* path);
 
+/// Returns true iff the path refers to a location on an ADL filesystem.
+bool IsADLSPath(const char* path);
+
 /// Returns true iff 'pathA' and 'pathB' are on the same filesystem.
 bool FilesystemsMatch(const char* pathA, const char* pathB);
 }
@@ -161,10 +161,6 @@ def add_test_dimensions(cls):
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("compression_codec", *PARQUET_CODECS))
 
-  # ADLS does not have a configurable block size, so the 'PARQUET_FILE_SIZE' option
-  # that's passed as a hint to Hadoop is ignored for AdlFileSystem. So, we skip this
-  # test for ADLS.
-  @SkipIfADLS.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size
   @SkipIfLocal.hdfs_client
   def test_insert_parquet_verify_size(self, vector, unique_database):
@@ -73,4 +73,5 @@ def delete_file_dir(self, path, recursive=False):
   def get_all_file_sizes(self, path):
     """Returns a list of integers which are all the file sizes of files found under
     'path'."""
-    return [self.adlsclient.info(f)['length'] for f in self.ls(path)]
+    return [self.adlsclient.info(f)['length'] for f in self.adlsclient.ls(path) \
+        if self.adlsclient.info(f)['type'] == 'FILE']
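
A minimal usage sketch for the corrected helper, assuming an authenticated
azure-datalake-store AzureDLFileSystem in 'adlsclient' (path and names
hypothetical):

# ls() returns directories as well as files, so without the 'FILE' filter a
# directory entry would add a spurious length to the result.
entries = adlsclient.ls("tmp/test_warehouse/t1.db")
file_sizes = [adlsclient.info(f)['length'] for f in entries
              if adlsclient.info(f)['type'] == 'FILE']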
