From 8049f811379c6f316520934fa7c495a4fc54d45d Mon Sep 17 00:00:00 2001 From: Taras Bobrovytsky Date: Wed, 31 May 2017 17:53:01 -0700 Subject: [PATCH 1/9] Update VERSION to 2.9.0 to begin release candidate testing Change-Id: I88b03479ae1d73afc9e3f5883ee09ae2f9bcfe09 --- bin/save-version.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/save-version.sh b/bin/save-version.sh index 6de5a7dbf4..53b8eee123 100755 --- a/bin/save-version.sh +++ b/bin/save-version.sh @@ -21,7 +21,7 @@ # Note: for internal (aka pre-release) versions, the version should have # "-INTERNAL" appended. Parts of the code will look for this to distinguish # between released and internal versions. -VERSION=2.9.0-SNAPSHOT +VERSION=2.9.0-RELEASE GIT_HASH=$(git rev-parse HEAD 2> /dev/null) if [ -z $GIT_HASH ] then From 4086f2c84de754d0a4a0ea87c0ee49b7e6eb469f Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Mon, 10 Apr 2017 17:08:01 -0700 Subject: [PATCH 2/9] IMPALA-5333: Add support for Impala to work with ADLS This patch leverages the AdlFileSystem in Hadoop to allow Impala to talk to the Azure Data Lake Store. This patch has functional changes and also adds test infrastructure for testing Impala over ADLS. We do not support ACLs on ADLS since the Hadoop ADLS connector does not integrate ADLS ACLs with Hadoop users/groups. For testing, we use the azure-data-lake-store-python client from Microsoft. This client seems to have some consistency issues. For example, a drop table through Impala will delete the files in ADLS; however, listing that directory through the python client immediately after the drop will still show the files. This behavior is unexpected since ADLS claims to be strongly consistent. Some tests have been skipped due to this limitation with the tag SkipIfADLS.eventually_consistent. Tracked by IMPALA-5335. The azure-data-lake-store-python client also only works on CentOS 6.6 and above, so the python dependencies for Azure will not be downloaded when the TARGET_FILESYSTEM is not "adls". When running ADLS tests, the expectation is that they run on a machine running at least CentOS 6.6. Note: This is only a test limitation, not a functional one. Clusters with older OSes like CentOS 6.4 will still work with ADLS. Added another dependency to bootstrap_build.sh for the ADLS Python client. Testing: Ran core tests with and without TARGET_FILESYSTEM as 'adls' to make sure that all tests pass and that nothing breaks.
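For illustration only: a test that wanted to tolerate this delayed visibility, rather than being skipped, could poll the listing with a bounded retry along the lines of the hypothetical helper below (not part of this patch; 'client' is anything exposing an exists() method, such as the ADLSClient test utility added here):

  import time

  def wait_for_deletion(client, path, timeout_sec=30):
    # Poll the ADLS metadata until 'path' stops being visible or we time out.
    # Works around the python client still listing files that were deleted.
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
      if not client.exists(path):
        return True
      time.sleep(1)
    return False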
Change-Id: Ic56b9988b32a330443f24c44f9cb2c80842f7542 Reviewed-on: http://gerrit.cloudera.org:8080/6910 Tested-by: Impala Public Jenkins Reviewed-by: Sailesh Mukil --- bin/bootstrap_build.sh | 2 +- bin/impala-config.sh | 20 +++++ fe/pom.xml | 6 ++ .../apache/impala/analysis/LoadDataStmt.java | 17 +++-- .../org/apache/impala/catalog/HdfsTable.java | 8 +- .../apache/impala/common/FileSystemUtil.java | 22 +++++- .../apache/impala/service/JniFrontend.java | 5 +- .../impala/analysis/AnalyzeStmtsTest.java | 4 +- infra/python/bootstrap_virtualenv.py | 17 +++++ infra/python/deps/adls-requirements.txt | 21 +++++ infra/python/deps/compiled-requirements.txt | 6 +- infra/python/deps/pip_download.py | 2 +- .../common/etc/hadoop/conf/core-site.xml.tmpl | 20 +++++ tests/common/impala_test_suite.py | 27 ++++++- tests/common/skip.py | 21 +++++ tests/custom_cluster/test_hdfs_fd_caching.py | 3 +- tests/custom_cluster/test_insert_behaviour.py | 3 +- .../test_parquet_max_page_header.py | 3 +- tests/custom_cluster/test_permanent_udfs.py | 4 +- tests/data_errors/test_data_errors.py | 5 +- tests/failure/test_failpoints.py | 3 +- tests/metadata/test_compute_stats.py | 3 +- tests/metadata/test_ddl.py | 18 +++-- tests/metadata/test_hdfs_encryption.py | 3 +- tests/metadata/test_hdfs_permissions.py | 3 +- tests/metadata/test_hms_integration.py | 4 +- .../test_metadata_query_statements.py | 4 +- tests/metadata/test_partition_metadata.py | 3 +- tests/metadata/test_refresh_partition.py | 3 +- tests/metadata/test_views_compatibility.py | 3 +- tests/query_test/test_compressed_formats.py | 4 +- tests/query_test/test_hdfs_caching.py | 6 +- tests/query_test/test_hdfs_fd_caching.py | 3 +- tests/query_test/test_insert_behaviour.py | 9 ++- tests/query_test/test_insert_parquet.py | 7 +- tests/query_test/test_join_queries.py | 4 +- tests/query_test/test_nested_types.py | 9 ++- tests/query_test/test_observability.py | 3 +- tests/query_test/test_partitioning.py | 3 +- tests/query_test/test_scanners.py | 11 ++- tests/stress/test_ddl_stress.py | 3 +- tests/util/adls_util.py | 76 +++++++++++++++++++ tests/util/filesystem_utils.py | 7 ++ 43 files changed, 359 insertions(+), 49 deletions(-) create mode 100644 infra/python/deps/adls-requirements.txt create mode 100644 tests/util/adls_util.py diff --git a/bin/bootstrap_build.sh b/bin/bootstrap_build.sh index c3bd22bf30..fa3dbfde2a 100755 --- a/bin/bootstrap_build.sh +++ b/bin/bootstrap_build.sh @@ -32,7 +32,7 @@ set -euxo pipefail # Install dependencies: sudo apt-get update sudo apt-get --yes install g++ gcc git libsasl2-dev libssl-dev make maven openjdk-7-jdk \ - python-dev python-setuptools + python-dev python-setuptools libffi-dev export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/ ./buildall.sh -notests -so diff --git a/bin/impala-config.sh b/bin/impala-config.sh index 86a115eb23..11c34f7994 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -240,6 +240,10 @@ export FILESYSTEM_PREFIX="${FILESYSTEM_PREFIX-}" export AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY-DummySecretAccessKey}" export AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID-DummyAccessKeyId}" export S3_BUCKET="${S3_BUCKET-}" +export azure_tenant_id="${azure_tenant_id-DummyAdlsTenantId}" +export azure_client_id="${azure_client_id-DummyAdlsClientId}" +export azure_client_secret="${azure_client_secret-DummyAdlsClientSecret}" +export azure_data_lake_store_name="${azure_data_lake_store_name-}" export HDFS_REPLICATION="${HDFS_REPLICATION-3}" export ISILON_NAMENODE="${ISILON_NAMENODE-}" export 
DEFAULT_FS="${DEFAULT_FS-hdfs://localhost:20500}" @@ -268,6 +272,22 @@ if [ "${TARGET_FILESYSTEM}" = "s3" ]; then fi DEFAULT_FS="s3a://${S3_BUCKET}" export DEFAULT_FS +elif [ "${TARGET_FILESYSTEM}" = "adls" ]; then + # Basic error checking + if [[ "${azure_client_id}" = "DummyAdlsClientId" ||\ + "${azure_tenant_id}" = "DummyAdlsTenantId" ||\ + "${azure_client_secret}" = "DummyAdlsClientSecret" ]]; then + echo "All 3 of the following need to be assigned valid values and belong + to the owner of the ADLS store in order to access the filesystem: + azure_client_id, azure_tenant_id, azure_client_secret." + return 1 + fi + if [[ "${azure_data_lake_store_name}" = "" ]]; then + echo "azure_data_lake_store_name cannot be an empty string for ADLS" + return 1 + fi + DEFAULT_FS="adl://${azure_data_lake_store_name}.azuredatalakestore.net" + export DEFAULT_FS elif [ "${TARGET_FILESYSTEM}" = "isilon" ]; then if [ "${ISILON_NAMENODE}" = "" ]; then echo "In order to access the Isilon filesystem, ISILON_NAMENODE" diff --git a/fe/pom.xml b/fe/pom.xml index c6aeaa3d57..238f1187f7 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -84,6 +84,12 @@ under the License. ${hadoop.version} + + org.apache.hadoop + hadoop-azure-datalake + ${hadoop.version} + + org.apache.hadoop hadoop-mapreduce-client-core diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java index 3357df9ceb..ec0244df05 100644 --- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.adl.AdlFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.HdfsFileFormat; @@ -126,6 +127,9 @@ public void analyze(Analyzer analyzer) throws AnalysisException { * paths for this LOAD statement (which maps onto a sequence of file move operations, * with the requisite permission requirements), and check to see if all files to be * moved are in format that Impala understands. Errors are raised as AnalysisExceptions. + * + * We don't check permissions for the S3AFileSystem and the AdlFileSystem due to + * limitations with thier getAclStatus() API. (see HADOOP-13892 and HADOOP-14437) */ private void analyzePaths(Analyzer analyzer, HdfsTable hdfsTable) throws AnalysisException { @@ -137,9 +141,10 @@ private void analyzePaths(Analyzer analyzer, HdfsTable hdfsTable) try { Path source = sourceDataPath_.getPath(); FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration()); - if (!(fs instanceof DistributedFileSystem) && !(fs instanceof S3AFileSystem)) { + if (!(fs instanceof DistributedFileSystem) && !(fs instanceof S3AFileSystem) && + !(fs instanceof AdlFileSystem)) { throw new AnalysisException(String.format("INPATH location '%s' " + - "must point to an HDFS or S3A filesystem.", sourceDataPath_)); + "must point to an HDFS, S3A or ADL filesystem.", sourceDataPath_)); } if (!fs.exists(source)) { throw new AnalysisException(String.format( @@ -150,6 +155,8 @@ private void analyzePaths(Analyzer analyzer, HdfsTable hdfsTable) // it. If the source file is a file, we must be able to read from it, and write to // its parent directory (in order to delete the file as part of the move operation). 
FsPermissionChecker checker = FsPermissionChecker.getInstance(); + // TODO: Disable permission checking for S3A as well (HADOOP-13892) + boolean shouldCheckPerms = !(fs instanceof AdlFileSystem); if (fs.isDirectory(source)) { if (FileSystemUtil.getTotalNumVisibleFiles(source) == 0) { @@ -162,7 +169,7 @@ private void analyzePaths(Analyzer analyzer, HdfsTable hdfsTable) sourceDataPath_)); } if (!checker.getPermissions(fs, source).checkPermissions( - FsAction.READ_WRITE)) { + FsAction.READ_WRITE) && shouldCheckPerms) { throw new AnalysisException(String.format("Unable to LOAD DATA from %s " + "because Impala does not have READ and WRITE permissions on this directory", source)); @@ -175,14 +182,14 @@ private void analyzePaths(Analyzer analyzer, HdfsTable hdfsTable) } if (!checker.getPermissions(fs, source.getParent()).checkPermissions( - FsAction.WRITE)) { + FsAction.WRITE) && shouldCheckPerms) { throw new AnalysisException(String.format("Unable to LOAD DATA from %s " + "because Impala does not have WRITE permissions on its parent " + "directory %s", source, source.getParent())); } if (!checker.getPermissions(fs, source).checkPermissions( - FsAction.READ)) { + FsAction.READ) && shouldCheckPerms) { throw new AnalysisException(String.format("Unable to LOAD DATA from %s " + "because Impala does not have READ permissions on this file", source)); } diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 55e4197011..23a8c96653 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -768,7 +768,7 @@ private void loadMetadataAndDiskIds(Set locations, * permissions Impala has on the given path. If the path does not exist, recurses up * the path until an existing parent directory is found, and inherits access permissions * from that. - * Always returns READ_WRITE for S3 files. + * Always returns READ_WRITE for S3 and ADLS files. */ private TAccessLevel getAvailableAccessLevel(FileSystem fs, Path location) throws IOException { @@ -781,6 +781,12 @@ private TAccessLevel getAvailableAccessLevel(FileSystem fs, Path location) // permissions. (see HADOOP-13892) if (FileSystemUtil.isS3AFileSystem(fs)) return TAccessLevel.READ_WRITE; + // The ADLS connector currently returns ACLs for files in ADLS, but can only map + // them to the ADLS client SPI and not the Hadoop users/groups, causing unexpected + // behavior. So ADLS ACLs are unsupported until the connector is able to map + // permissions to hadoop users/groups (HADOOP-14437).
+ if (FileSystemUtil.isADLFileSystem(fs)) return TAccessLevel.READ_WRITE; + FsPermissionChecker permissionChecker = FsPermissionChecker.getInstance(); while (location != null) { if (fs.exists(location)) { diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index f8c50b4fc7..9ae4269f1e 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.adl.AdlFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.protocol.EncryptionZone; @@ -301,7 +302,8 @@ public static boolean supportsStorageIds(FileSystem fs) { // Common case. if (isDistributedFileSystem(fs)) return true; // Blacklist FileSystems that are known not to include storage UUIDs. - return !(fs instanceof S3AFileSystem || fs instanceof LocalFileSystem); + return !(fs instanceof S3AFileSystem || fs instanceof LocalFileSystem || + fs instanceof AdlFileSystem); } /** @@ -318,6 +320,20 @@ public static boolean isS3AFileSystem(Path path) throws IOException { return isS3AFileSystem(path.getFileSystem(CONF)); } + /** + * Returns true iff the filesystem is AdlFileSystem. + */ + public static boolean isADLFileSystem(FileSystem fs) { + return fs instanceof AdlFileSystem; + } + + /** + * Returns true iff the path is on AdlFileSystem. + */ + public static boolean isADLFileSystem(Path path) throws IOException { + return isADLFileSystem(path.getFileSystem(CONF)); + } + /** * Returns true iff the filesystem is an instance of LocalFileSystem. */ @@ -463,6 +479,8 @@ public static boolean isImpalaWritableFilesystem(String location) throws IOException { Path path = new Path(location); return (FileSystemUtil.isDistributedFileSystem(path) || - FileSystemUtil.isLocalFileSystem(path) || FileSystemUtil.isS3AFileSystem(path)); + FileSystemUtil.isLocalFileSystem(path) || + FileSystemUtil.isS3AFileSystem(path) || + FileSystemUtil.isADLFileSystem(path)); } } diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 2b6748dd71..cfd83a52df 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.adl.AdlFileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.impala.analysis.DescriptorTable; @@ -716,7 +717,9 @@ private String checkShortCircuitRead(Configuration conf) { private String checkFileSystem(Configuration conf) { try { FileSystem fs = FileSystem.get(CONF); - if (!(fs instanceof DistributedFileSystem || fs instanceof S3AFileSystem)) { + if (!(fs instanceof DistributedFileSystem || + fs instanceof S3AFileSystem || + fs instanceof AdlFileSystem)) { return "Currently configured default filesystem: " + fs.getClass().getSimpleName() + ".
" + CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY + diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index 448c62fdf4..06ad842cb5 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -2990,11 +2990,11 @@ public void TestLoadData() throws AnalysisException { AnalysisError(String.format("load data inpath '%s' %s into table " + "tpch.lineitem", "file:///test-warehouse/test.out", overwrite), "INPATH location 'file:/test-warehouse/test.out' must point to an " + - "HDFS or S3A filesystem"); + "HDFS, S3A or ADL filesystem."); AnalysisError(String.format("load data inpath '%s' %s into table " + "tpch.lineitem", "s3n://bucket/test-warehouse/test.out", overwrite), "INPATH location 's3n://bucket/test-warehouse/test.out' must point to an " + - "HDFS or S3A filesystem"); + "HDFS, S3A or ADL filesystem."); // File type / table type mismatch. AnalyzesOk(String.format("load data inpath '%s' %s into table " + diff --git a/infra/python/bootstrap_virtualenv.py b/infra/python/bootstrap_virtualenv.py index 0d6b3c82b2..4fb8689dfe 100644 --- a/infra/python/bootstrap_virtualenv.py +++ b/infra/python/bootstrap_virtualenv.py @@ -60,6 +60,10 @@ # by the compiled requirements step. KUDU_REQS_PATH = os.path.join(DEPS_DIR, "kudu-requirements.txt") +# Requirements for the ADLS test client step, which depends on Cffi (C Foreign Function +# Interface) being installed by the compiled requirements step. +ADLS_REQS_PATH = os.path.join(DEPS_DIR, "adls-requirements.txt") + def delete_virtualenv_if_exist(): if os.path.exists(ENV_DIR): shutil.rmtree(ENV_DIR) @@ -213,6 +217,18 @@ def install_compiled_deps_if_possible(): mark_reqs_installed(COMPILED_REQS_PATH) return True +def install_adls_deps(): + # The ADLS dependencies require that the OS is at least CentOS 6.6 or above, + # which is why we break this into a seperate step. If the target filesystem is + # ADLS, the expectation is that the dev environment is running at least CentOS 6.6. + if reqs_are_installed(ADLS_REQS_PATH): + LOG.debug("Skipping ADLS deps: matching adls-installed-requirements.txt found") + return True + if os.environ.get('TARGET_FILESYSTEM') == "adls": + LOG.info("Installing ADLS packages into the virtualenv") + exec_pip_install(["-r", ADLS_REQS_PATH]) + mark_reqs_installed(ADLS_REQS_PATH) + def install_kudu_client_if_possible(): '''Installs the Kudu python module if possible, which depends on the toolchain and the compiled requirements in compiled-requirements.txt. If the toolchain isn't @@ -348,3 +364,4 @@ def setup_virtualenv_if_not_exists(): setup_virtualenv_if_not_exists() if install_compiled_deps_if_possible(): install_kudu_client_if_possible() + install_adls_deps() diff --git a/infra/python/deps/adls-requirements.txt b/infra/python/deps/adls-requirements.txt new file mode 100644 index 0000000000..f027a6ef4f --- /dev/null +++ b/infra/python/deps/adls-requirements.txt @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# The following dependencies depend on cffi, so it must be installed after the toolchain +# is bootstrapped and all requirements in requirements.txt and compiled-requirements.txt +# are installed into the virtualenv. +azure-datalake-store == 0.0.9 diff --git a/infra/python/deps/compiled-requirements.txt b/infra/python/deps/compiled-requirements.txt index 945e3f62f1..b9273d1f92 100644 --- a/infra/python/deps/compiled-requirements.txt +++ b/infra/python/deps/compiled-requirements.txt @@ -32,7 +32,11 @@ impyla == 0.14.0 thrift == 0.9.0 thrift_sasl == 0.1.0 psutil == 0.7.1 - +# Required for ADLS Python client + pycparser == 2.17 + cffi==1.10.0 + cryptography==1.8.1 + scandir == 1.5 # Required for Kudu: Cython == 0.23.4 numpy == 1.10.4 diff --git a/infra/python/deps/pip_download.py b/infra/python/deps/pip_download.py index bd54d30199..cc9b412bf8 100755 --- a/infra/python/deps/pip_download.py +++ b/infra/python/deps/pip_download.py @@ -37,7 +37,7 @@ # The requirement files that list all of the required packages and versions. REQUIREMENTS_FILES = ['requirements.txt', 'compiled-requirements.txt', - 'kudu-requirements.txt'] + 'kudu-requirements.txt', 'adls-requirements.txt'] def check_md5sum(filename, expected_md5): actual_md5 = md5(open(filename).read()).hexdigest() diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl index aaea5799c8..9ff4ee74f5 100644 --- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl +++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.tmpl @@ -112,4 +112,24 @@ DEFAULT + + dfs.adls.oauth2.access.token.provider.type + ClientCredential + + + + dfs.adls.oauth2.client.id + ${azure_client_id} + + + + dfs.adls.oauth2.credential + ${azure_client_secret} + + + + dfs.adls.oauth2.refresh.url + https://login.windows.net/${azure_tenant_id}/oauth2/token + + diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 7cbacd3725..3af1ed0985 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -51,7 +51,13 @@ from tests.performance.query import Query from tests.performance.query_exec_functions import execute_using_jdbc from tests.performance.query_executor import JdbcQueryExecConfig -from tests.util.filesystem_utils import IS_S3, S3_BUCKET_NAME, FILESYSTEM_PREFIX +from tests.util.filesystem_utils import ( + IS_S3, + IS_ADLS, + S3_BUCKET_NAME, + ADLS_STORE_NAME, + FILESYSTEM_PREFIX) + from tests.util.hdfs_util import ( HdfsConfig, get_hdfs_client, @@ -68,9 +74,19 @@ from hive_metastore import ThriftHiveMetastore from thrift.protocol import TBinaryProtocol +# Initializing the logger before conditional imports, since we will need it +# for them. logging.basicConfig(level=logging.INFO, format='-- %(message)s') LOG = logging.getLogger('impala_test_suite') +# The ADLS python client isn't downloaded when ADLS isn't the target FS, so do a +# conditional import. 
+if IS_ADLS: + try: + from tests.util.adls_util import ADLSClient + except ImportError: + LOG.error("Need the ADLSClient for testing with ADLS") + IMPALAD_HOST_PORT_LIST = pytest.config.option.impalad.split(',') assert len(IMPALAD_HOST_PORT_LIST) > 0, 'Must specify at least 1 impalad to target' IMPALAD = IMPALAD_HOST_PORT_LIST[0] @@ -126,8 +142,11 @@ def setup_class(cls): cls.impalad_test_service = cls.create_impala_service() cls.hdfs_client = cls.create_hdfs_client() - cls.s3_client = S3Client(S3_BUCKET_NAME) - cls.filesystem_client = cls.s3_client if IS_S3 else cls.hdfs_client + cls.filesystem_client = cls.hdfs_client + if IS_S3: + cls.filesystem_client = S3Client(S3_BUCKET_NAME) + elif IS_ADLS: + cls.filesystem_client = ADLSClient(ADLS_STORE_NAME) @classmethod def teardown_class(cls): @@ -645,7 +664,7 @@ def create_table_info_dimension(cls, exploration_strategy): # If 'skip_hbase' is specified or the filesystem is isilon, s3 or local, we don't # need the hbase dimension. if pytest.config.option.skip_hbase or TARGET_FILESYSTEM.lower() \ - in ['s3', 'isilon', 'local']: + in ['s3', 'isilon', 'local', 'adls']: for tf_dimension in tf_dimensions: if tf_dimension.value.file_format == "hbase": tf_dimensions.remove(tf_dimension) diff --git a/tests/common/skip.py b/tests/common/skip.py index e33071262e..642dea4df5 100644 --- a/tests/common/skip.py +++ b/tests/common/skip.py @@ -30,6 +30,7 @@ IS_LOCAL, IS_HDFS, IS_S3, + IS_ADLS, SECONDARY_FILESYSTEM) @@ -51,6 +52,26 @@ class SkipIfS3: qualified_path = pytest.mark.skipif(IS_S3, reason="Tests rely on HDFS qualified paths, IMPALA-1872") +class SkipIfADLS: + + # These ones are skipped due to product limitations. + caching = pytest.mark.skipif(IS_ADLS, reason="SET CACHED not implemented for ADLS") + hive = pytest.mark.skipif(IS_ADLS, reason="Hive doesn't work with ADLS") + hdfs_block_size = pytest.mark.skipif(IS_ADLS, reason="ADLS uses its own block size") + hdfs_acls = pytest.mark.skipif(IS_ADLS, reason="HDFS acls are not supported on ADLS") + jira = partial(pytest.mark.skipif, IS_ADLS) + hdfs_encryption = pytest.mark.skipif(IS_ADLS, + reason="HDFS encryption is not supported with ADLS") + + # These ones need test infra work to re-enable. + udfs = pytest.mark.skipif(IS_ADLS, reason="udas/udfs not copied to ADLS") + datasrc = pytest.mark.skipif(IS_ADLS, reason="data sources not copied to ADLS") + hbase = pytest.mark.skipif(IS_ADLS, reason="HBase not started with ADLS") + qualified_path = pytest.mark.skipif(IS_ADLS, + reason="Tests rely on HDFS qualified paths, IMPALA-1872") + eventually_consistent = pytest.mark.skipif(IS_ADLS, + reason="The client is slow to realize changes to file metadata") + class SkipIfKudu: unsupported_env = pytest.mark.skipif(os.environ["KUDU_IS_SUPPORTED"] == "false", reason="Kudu is not supported in this environment") diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py index acc13dfd39..c030c8c392 100644 --- a/tests/custom_cluster/test_hdfs_fd_caching.py +++ b/tests/custom_cluster/test_hdfs_fd_caching.py @@ -18,9 +18,10 @@ import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.skip import SkipIfS3 +from tests.common.skip import SkipIfS3, SkipIfADLS @SkipIfS3.caching +@SkipIfADLS.caching class TestHdfsFdCaching(CustomClusterTestSuite): """Tests that if HDFS file handle caching is enabled, file handles are actually cached and the associated metrics return valid results.
In addition, tests that the upper bound diff --git a/tests/custom_cluster/test_insert_behaviour.py b/tests/custom_cluster/test_insert_behaviour.py index 6df8ed7a8d..af6a2701b8 100644 --- a/tests/custom_cluster/test_insert_behaviour.py +++ b/tests/custom_cluster/test_insert_behaviour.py @@ -18,13 +18,14 @@ import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.skip import SkipIfS3, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfLocal from tests.util.filesystem_utils import IS_ISILON, WAREHOUSE from tests.util.hdfs_util import HdfsConfig, get_hdfs_client, get_hdfs_client_from_conf TEST_TBL = "insert_inherit_permission" @SkipIfS3.hdfs_acls +@SkipIfADLS.hdfs_acls class TestInsertBehaviourCustomCluster(CustomClusterTestSuite): @classmethod diff --git a/tests/custom_cluster/test_parquet_max_page_header.py b/tests/custom_cluster/test_parquet_max_page_header.py index b87a10ef8e..913d883a59 100644 --- a/tests/custom_cluster/test_parquet_max_page_header.py +++ b/tests/custom_cluster/test_parquet_max_page_header.py @@ -24,7 +24,7 @@ import subprocess from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon class TestParquetMaxPageHeader(CustomClusterTestSuite): '''This tests large page headers in parquet files. Parquet page header size can @@ -92,6 +92,7 @@ def __generate_test_data(self, dir, file): put.wait() @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfIsilon.hive @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("-max_page_header_size=31457280") diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py index 3823c55db0..100b5eab1b 100644 --- a/tests/custom_cluster/test_permanent_udfs.py +++ b/tests/custom_cluster/test_permanent_udfs.py @@ -24,7 +24,7 @@ from tempfile import mkdtemp from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_uncompressed_text_dimension from tests.util.filesystem_utils import get_fs_path @@ -162,6 +162,7 @@ def __verify_udf_in_impala(self, udf): @SkipIfIsilon.hive @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfLocal.hive @pytest.mark.execute_serially def test_corrupt_java_udf(self): @@ -182,6 +183,7 @@ def test_corrupt_java_udf(self): @SkipIfIsilon.hive @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfLocal.hive @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( diff --git a/tests/data_errors/test_data_errors.py b/tests/data_errors/test_data_errors.py index fa3f18901a..21791d46a1 100644 --- a/tests/data_errors/test_data_errors.py +++ b/tests/data_errors/test_data_errors.py @@ -25,7 +25,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIf, SkipIfS3, SkipIfLocal +from tests.common.skip import SkipIf, SkipIfS3, SkipIfADLS, SkipIfLocal from tests.common.test_dimensions import create_exec_option_dimension class TestDataErrors(ImpalaTestSuite): @@ -105,6 +105,7 @@ def test_hdfs_safe_mode_error_255(self, unique_database): assert "Safe mode is OFF" in output @SkipIfS3.qualified_path +@SkipIfADLS.qualified_path class TestHdfsScanNodeErrors(TestDataErrors): @classmethod def 
add_test_dimensions(cls): @@ -122,6 +123,7 @@ def test_hdfs_scan_node_errors(self, vector): self.run_test_case('DataErrorsTest/hdfs-scan-node-errors', vector) @SkipIfS3.qualified_path +@SkipIfADLS.qualified_path @SkipIfLocal.qualified_path class TestHdfsSeqScanNodeErrors(TestHdfsScanNodeErrors): @classmethod @@ -136,6 +138,7 @@ def test_hdfs_seq_scan_node_errors(self, vector): @SkipIfS3.qualified_path +@SkipIfADLS.qualified_path class TestHdfsRcFileScanNodeErrors(TestHdfsScanNodeErrors): @classmethod def add_test_dimensions(cls): diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py index f67afe4bfc..1f0c1c5df0 100644 --- a/tests/failure/test_failpoints.py +++ b/tests/failure/test_failpoints.py @@ -26,7 +26,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite, LOG -from tests.common.skip import SkipIf, SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIf, SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_exec_option_dimension from tests.common.test_vector import ImpalaTestDimension @@ -53,6 +53,7 @@ @SkipIf.skip_hbase # -skip_hbase argument specified @SkipIfS3.hbase # S3: missing coverage: failures +@SkipIfADLS.hbase @SkipIfIsilon.hbase # ISILON: missing coverage: failures. @SkipIfLocal.hbase class TestFailpoints(ImpalaTestSuite): diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py index f39fd02e38..3a13859b54 100644 --- a/tests/metadata/test_compute_stats.py +++ b/tests/metadata/test_compute_stats.py @@ -19,7 +19,7 @@ from subprocess import check_call from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import ( create_exec_option_dimension, create_single_exec_option_dimension, @@ -70,6 +70,7 @@ def test_compute_stats_keywords(self, vector): self.cleanup_db("parquet") @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive def test_compute_stats_impala_2201(self, vector, unique_database): diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index 37ca7cd8f3..d69f603a5c 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -22,9 +22,9 @@ from test_ddl_base import TestDdlBase from tests.common.impala_test_suite import LOG from tests.common.parametrize import UniqueDatabase -from tests.common.skip import SkipIf, SkipIfLocal, SkipIfOldAggsJoins +from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal, SkipIfOldAggsJoins from tests.common.test_dimensions import create_single_exec_option_dimension -from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_LOCAL, IS_S3 +from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_LOCAL, IS_S3, IS_ADLS # Validates DDL statements (create, drop) class TestDdlStatements(TestDdlBase): @@ -53,11 +53,13 @@ def test_drop_table_with_purge(self, unique_database): format(getpass.getuser(), unique_database)) # Drop the table (with purge) and make sure it doesn't exist in trash self.client.execute("drop table {0}.t2 purge".format(unique_database)) - if not IS_S3: + if not IS_S3 and not IS_ADLS: # In S3, deletes are eventual. So even though we dropped the table, the files - # belonging to this table will still be visible for some unbounded time. 
This + # belonging to this table may still be visible for some unbounded time. This # happens only with PURGE. A regular DROP TABLE is just a copy of files which is # consistent. + # The ADLS Python client is not strongly consistent, so these files may still be + # visible after a DROP. (Remove after IMPALA-5335 is resolved) assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/".\ format(unique_database)) assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/t2.txt".\ @@ -82,6 +84,7 @@ def test_drop_table_with_purge(self, unique_database): self.filesystem_client.delete_file_dir( "test-warehouse/{0}.db/data_t3".format(unique_database), recursive=True) + @SkipIfADLS.eventually_consistent @SkipIfLocal.hdfs_client def test_drop_cleans_hdfs_dirs(self, unique_database): self.client.execute('use default') @@ -129,6 +132,7 @@ def test_drop_cleans_hdfs_dirs(self, unique_database): # Re-create database to make unique_database teardown succeed. self._create_db(unique_database) + @SkipIfADLS.eventually_consistent @SkipIfLocal.hdfs_client def test_truncate_cleans_hdfs_files(self, unique_database): # Verify the db directory exists @@ -291,11 +295,13 @@ def test_drop_partition_with_purge(self, vector, unique_database): # Drop the partition (with purge) and make sure it doesn't exist in trash self.client.execute("alter table {0}.t1 drop partition(j=2) purge".\ format(unique_database)); - if not IS_S3: + if not IS_S3 and not IS_ADLS: # In S3, deletes are eventual. So even though we dropped the partition, the files - # belonging to this partition will still be visible for some unbounded time. This + # belonging to this partition may still be visible for some unbounded time. This # happens only with PURGE. A regular DROP TABLE is just a copy of files which is # consistent. + # The ADLS Python client is not strongly consistent, so these files may still be + # visible after a DROP. (Remove after IMPALA-5335 is resolved) assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2/j2.txt".\ format(unique_database)) assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2".\ diff --git a/tests/metadata/test_hdfs_encryption.py b/tests/metadata/test_hdfs_encryption.py index 9ace3c4eb4..ff135b8a4c 100644 --- a/tests/metadata/test_hdfs_encryption.py +++ b/tests/metadata/test_hdfs_encryption.py @@ -19,7 +19,7 @@ import pytest from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import ( create_single_exec_option_dimension, create_uncompressed_text_dimension) @@ -34,6 +34,7 @@ @SkipIfS3.hdfs_encryption +@SkipIfADLS.hdfs_encryption @SkipIfIsilon.hdfs_encryption @SkipIfLocal.hdfs_encryption @pytest.mark.execute_serially diff --git a/tests/metadata/test_hdfs_permissions.py b/tests/metadata/test_hdfs_permissions.py index 3c4957e6ad..af5a41ce3d 100644 --- a/tests/metadata/test_hdfs_permissions.py +++ b/tests/metadata/test_hdfs_permissions.py @@ -16,7 +16,7 @@ # under the License. 
from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfLocal from tests.common.test_dimensions import ( create_single_exec_option_dimension, create_uncompressed_text_dimension) @@ -27,6 +27,7 @@ @SkipIfS3.hdfs_acls +@SkipIfADLS.hdfs_acls @SkipIfLocal.hdfs_client class TestHdfsPermissions(ImpalaTestSuite): @classmethod diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py index 05bacfc2ca..bcdc1e63ba 100644 --- a/tests/metadata/test_hms_integration.py +++ b/tests/metadata/test_hms_integration.py @@ -30,7 +30,7 @@ from subprocess import call from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import ( create_single_exec_option_dimension, create_uncompressed_text_dimension) @@ -41,6 +41,7 @@ LOG = logging.getLogger('test_configuration') @SkipIfS3.hive +@SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive class TestHmsIntegrationSanity(ImpalaTestSuite): @@ -87,6 +88,7 @@ def test_sanity(self, vector): assert 'test_tbl' in self.client.execute("show tables in hms_sanity_db").data @SkipIfS3.hive +@SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive class TestHmsIntegration(ImpalaTestSuite): diff --git a/tests/metadata/test_metadata_query_statements.py b/tests/metadata/test_metadata_query_statements.py index cadc209d06..bf21ecb0ff 100644 --- a/tests/metadata/test_metadata_query_statements.py +++ b/tests/metadata/test_metadata_query_statements.py @@ -22,7 +22,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfIsilon, SkipIfS3, SkipIfLocal +from tests.common.skip import SkipIfIsilon, SkipIfS3, SkipIfADLS, SkipIfLocal from tests.common.test_dimensions import ALL_NODES_ONLY from tests.common.test_dimensions import create_exec_option_dimension from tests.common.test_dimensions import create_uncompressed_text_dimension @@ -75,6 +75,7 @@ def test_describe_path(self, vector, unique_database): # data doesn't reside in hdfs. @SkipIfIsilon.hive @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfLocal.hive def test_describe_formatted(self, vector, unique_database): # For describe formatted, we try to match Hive's output as closely as possible. @@ -148,6 +149,7 @@ def __create_data_sources(self): self.client.execute(self.CREATE_DATA_SRC_STMT % (name,)) @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive @pytest.mark.execute_serially # because of invalidate metadata diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py index c9f90e10dd..0758fdec1a 100644 --- a/tests/metadata/test_partition_metadata.py +++ b/tests/metadata/test_partition_metadata.py @@ -16,7 +16,7 @@ # under the License.
from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_single_exec_option_dimension from tests.util.filesystem_utils import WAREHOUSE @@ -78,6 +78,7 @@ def test_multiple_partitions_same_location(self, vector, unique_database): assert data.split('\t') == ['6', '9'] @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive def test_partition_metadata_compatibility(self, vector, unique_database): diff --git a/tests/metadata/test_refresh_partition.py b/tests/metadata/test_refresh_partition.py index 4602ebc5b2..a8b5042c26 100644 --- a/tests/metadata/test_refresh_partition.py +++ b/tests/metadata/test_refresh_partition.py @@ -17,11 +17,12 @@ from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_dimensions import create_single_exec_option_dimension from tests.common.test_dimensions import create_uncompressed_text_dimension -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.util.filesystem_utils import get_fs_path @SkipIfS3.hive +@SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive class TestRefreshPartition(ImpalaTestSuite): diff --git a/tests/metadata/test_views_compatibility.py b/tests/metadata/test_views_compatibility.py index 3fd38f2189..116eb10992 100644 --- a/tests/metadata/test_views_compatibility.py +++ b/tests/metadata/test_views_compatibility.py @@ -22,7 +22,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_uncompressed_text_dimension from tests.util.test_file_parser import QueryTestSectionReader @@ -46,6 +46,7 @@ # Missing Coverage: Views created by Hive and Impala being visible and queryable by each # other on non-hdfs storage. @SkipIfS3.hive +@SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive class TestViewCompatibility(ImpalaTestSuite): diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py index efb2c2b432..6ea89f5361 100644 --- a/tests/query_test/test_compressed_formats.py +++ b/tests/query_test/test_compressed_formats.py @@ -23,7 +23,7 @@ from subprocess import call from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_single_exec_option_dimension from tests.common.test_vector import ImpalaTestDimension from tests.util.filesystem_utils import get_fs_path @@ -40,6 +40,7 @@ # Missing Coverage: Compressed data written by Hive is queryable by Impala on a non-hdfs # filesystem.
@SkipIfS3.hive +@SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive class TestCompressedFormats(ImpalaTestSuite): @@ -148,6 +149,7 @@ def test_seq_writer(self, vector, unique_database): self.run_test_case('QueryTest/seq-writer', vector, unique_database) @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive def test_seq_writer_hive_compatibility(self, vector, unique_database): diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py index f446913575..96b25eca19 100644 --- a/tests/query_test/test_hdfs_caching.py +++ b/tests/query_test/test_hdfs_caching.py @@ -24,13 +24,14 @@ from tests.common.impala_cluster import ImpalaCluster from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_single_exec_option_dimension from tests.util.filesystem_utils import get_fs_path from tests.util.shell_util import exec_process # End to end test that hdfs caching is working. @SkipIfS3.caching # S3: missing coverage: verify SET CACHED gives error +@SkipIfADLS.caching @SkipIfIsilon.caching @SkipIfLocal.caching class TestHdfsCaching(ImpalaTestSuite): @@ -106,6 +107,7 @@ def test_cache_cancellation(self, vector): # run as a part of exhaustive tests which require the workload to be 'functional-query'. # TODO: Move this to TestHdfsCaching once we make exhaustive tests run for other workloads @SkipIfS3.caching +@SkipIfADLS.caching @SkipIfIsilon.caching @SkipIfLocal.caching class TestHdfsCachingFallbackPath(ImpalaTestSuite): @@ -114,6 +116,7 @@ def get_workload(self): return 'functional-query' @SkipIfS3.hdfs_encryption + @SkipIfADLS.hdfs_encryption @SkipIfIsilon.hdfs_encryption @SkipIfLocal.hdfs_encryption def test_hdfs_caching_fallback_path(self, vector, unique_database, testid_checksum): @@ -164,6 +167,7 @@ def test_hdfs_caching_fallback_path(self, vector, unique_database, testid_checks @SkipIfS3.caching +@SkipIfADLS.caching @SkipIfIsilon.caching @SkipIfLocal.caching class TestHdfsCachingDdl(ImpalaTestSuite): diff --git a/tests/query_test/test_hdfs_fd_caching.py b/tests/query_test/test_hdfs_fd_caching.py index 4d72714409..6961f3ba0b 100644 --- a/tests/query_test/test_hdfs_fd_caching.py +++ b/tests/query_test/test_hdfs_fd_caching.py @@ -20,10 +20,11 @@ from tests.common.impala_cluster import ImpalaCluster from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3 +from tests.common.skip import SkipIfS3, SkipIfADLS @SkipIfS3.caching +@SkipIfADLS.caching class TestHdfsFdCaching(ImpalaTestSuite): """ This test suite tests the behavior of HDFS file descriptor caching by evaluating the diff --git a/tests/query_test/test_insert_behaviour.py b/tests/query_test/test_insert_behaviour.py index 1584343502..e70fdee15c 100644 --- a/tests/query_test/test_insert_behaviour.py +++ b/tests/query_test/test_insert_behaviour.py @@ -22,7 +22,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.parametrize import UniqueDatabase -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_S3 @SkipIfLocal.hdfs_client @@ -45,6 +45,7 @@ def teardown_method(self, method): if method.__name__ == "test_insert_select_with_empty_resultset": self.cleanup_db(self.TEST_DB_NAME) + 
@SkipIfADLS.eventually_consistent @pytest.mark.execute_serially def test_insert_removes_staging_files(self): TBL_NAME = "insert_overwrite_nopart" @@ -130,6 +131,7 @@ def test_insert_alter_partition_location(self, unique_database): assert len(self.filesystem_client.ls(part_dir)) == 1 @SkipIfS3.hdfs_acls + @SkipIfADLS.hdfs_acls @SkipIfIsilon.hdfs_acls @pytest.mark.xfail(run=False, reason="Fails intermittently on test clusters") @pytest.mark.execute_serially @@ -190,6 +192,7 @@ def check_has_acls(part, acl=TEST_ACL): check_has_acls("p1=1/p2=2/p3=30", "default:group:new_leaf_group:-w-") @SkipIfS3.hdfs_acls + @SkipIfADLS.hdfs_acls @SkipIfIsilon.hdfs_acls def test_insert_file_permissions(self, unique_database): """Test that INSERT correctly respects file permission (minimum ACLs)""" @@ -240,6 +243,7 @@ def test_insert_file_permissions(self, unique_database): self.execute_query_expect_success(self.client, insert_query) @SkipIfS3.hdfs_acls + @SkipIfADLS.hdfs_acls @SkipIfIsilon.hdfs_acls def test_insert_acl_permissions(self, unique_database): """Test that INSERT correctly respects ACLs""" @@ -317,6 +321,7 @@ def test_insert_acl_permissions(self, unique_database): self.execute_query_expect_success(self.client, insert_query) @SkipIfS3.hdfs_acls + @SkipIfADLS.hdfs_acls @SkipIfIsilon.hdfs_acls def test_load_permissions(self, unique_database): # We rely on test_insert_acl_permissions() to exhaustively check that ACL semantics @@ -369,6 +374,7 @@ def test_load_permissions(self, unique_database): # We expect this to succeed, it's not an error if all files in the dir cannot be read self.execute_query_expect_success(self.client, load_dir_query) + @SkipIfADLS.eventually_consistent @pytest.mark.execute_serially def test_insert_select_with_empty_resultset(self): """Test insert/select query won't trigger partition directory or zero size data file @@ -439,6 +445,7 @@ def check_path_exists(path, should_exist): self.execute_query_expect_failure(self.client, insert_query) @SkipIfS3.hdfs_acls + @SkipIfADLS.hdfs_acls @SkipIfIsilon.hdfs_acls def test_multiple_group_acls(self, unique_database): """Test that INSERT correctly respects multiple group ACLs""" diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py index 9d9eb7b2b3..ee24549b24 100644 --- a/tests/query_test/test_insert_parquet.py +++ b/tests/query_test/test_insert_parquet.py @@ -30,7 +30,7 @@ from tests.common.environ import impalad_basedir from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.parametrize import UniqueDatabase -from tests.common.skip import SkipIfIsilon, SkipIfLocal, SkipIfS3 +from tests.common.skip import SkipIfIsilon, SkipIfLocal, SkipIfS3, SkipIfADLS from tests.common.test_dimensions import create_exec_option_dimension from tests.common.test_vector import ImpalaTestDimension from tests.util.filesystem_utils import get_fs_path @@ -161,6 +161,10 @@ def add_test_dimensions(cls): cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension("compression_codec", *PARQUET_CODECS)) + # ADLS does not have a configurable block size, so the 'PARQUET_FILE_SIZE' option + # that's passed as a hint to Hadoop is ignored for AdlFileSystem. So, we skip this + # test for ADLS. 
+ @SkipIfADLS.hdfs_block_size @SkipIfIsilon.hdfs_block_size @SkipIfLocal.hdfs_client def test_insert_parquet_verify_size(self, vector, unique_database): @@ -297,6 +301,7 @@ def test_set_column_orders(self, vector, unique_database, tmpdir): @SkipIfIsilon.hive @SkipIfLocal.hive @SkipIfS3.hive +@SkipIfADLS.hive # TODO: Should we move this to test_parquet_stats.py? class TestHdfsParquetTableStatsWriter(ImpalaTestSuite): diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py index 631db00fce..db1a4fa1fe 100644 --- a/tests/query_test/test_join_queries.py +++ b/tests/query_test/test_join_queries.py @@ -26,7 +26,8 @@ SkipIfIsilon, SkipIfLocal, SkipIfOldAggsJoins, - SkipIfS3) + SkipIfS3, + SkipIfADLS) from tests.common.test_vector import ImpalaTestDimension class TestJoinQueries(ImpalaTestSuite): @@ -60,6 +61,7 @@ def test_partitioned_joins(self, vector): self.run_test_case('QueryTest/joins-partitioned', vector) @SkipIfS3.hbase + @SkipIfADLS.hbase @SkipIfIsilon.hbase @SkipIf.skip_hbase @SkipIfLocal.hbase diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py index ffed8b5678..563589874b 100644 --- a/tests/query_test/test_nested_types.py +++ b/tests/query_test/test_nested_types.py @@ -22,7 +22,13 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfOldAggsJoins, SkipIfIsilon, SkipIfS3, SkipIfLocal +from tests.common.skip import ( + SkipIfOldAggsJoins, + SkipIfIsilon, + SkipIfS3, + SkipIfADLS, + SkipIfLocal) + from tests.util.filesystem_utils import WAREHOUSE, get_fs_path @SkipIfOldAggsJoins.nested_types @@ -553,6 +559,7 @@ def test_max_nesting_depth(self, vector, unique_database): @SkipIfIsilon.hive @SkipIfS3.hive + @SkipIfADLS.hive @SkipIfLocal.hive def test_load_hive_table(self, vector, unique_database): """Tests that Impala rejects Hive-created tables with complex types that exceed diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index 0f82bd53f2..cdb322d3c7 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -16,7 +16,7 @@ # under the License. from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal class TestObservability(ImpalaTestSuite): @classmethod @@ -55,6 +55,7 @@ def test_broadcast_num_rows(self): @SkipIfS3.hbase @SkipIfLocal.hbase @SkipIfIsilon.hbase + @SkipIfADLS.hbase def test_scan_summary(self): """IMPALA-4499: Checks that the exec summary for scans show the table name.""" # HDFS table diff --git a/tests/query_test/test_partitioning.py b/tests/query_test/test_partitioning.py index 04d6c3a80c..8fc3399085 100644 --- a/tests/query_test/test_partitioning.py +++ b/tests/query_test/test_partitioning.py @@ -19,7 +19,7 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_single_exec_option_dimension # Tests to validate HDFS partitioning. @@ -46,6 +46,7 @@ def test_partition_col_types(self, vector, unique_database): # Missing Coverage: Impala deals with boolean partitions created by Hive on a non-hdfs # filesystem. 
@SkipIfS3.hive + @SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive def test_boolean_partitions(self, vector, unique_database): diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 5dbe02a21e..74a69acb16 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -32,7 +32,12 @@ from testdata.common import widetable from tests.common.impala_test_suite import ImpalaTestSuite, LOG -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfOldAggsJoins, SkipIfLocal +from tests.common.skip import ( + SkipIfS3, + SkipIfADLS, + SkipIfIsilon, + SkipIfOldAggsJoins, + SkipIfLocal) from tests.common.test_dimensions import create_single_exec_option_dimension from tests.common.test_result_verifier import ( parse_column_types, @@ -335,6 +340,7 @@ def test_corrupt_rle_counts(self, vector, unique_database): vector, unique_database) @SkipIfS3.hdfs_block_size + @SkipIfADLS.hdfs_block_size @SkipIfIsilon.hdfs_block_size @SkipIfLocal.multiple_impalad def test_misaligned_parquet_row_groups(self, vector): @@ -390,6 +396,7 @@ def _misaligned_parquet_row_groups_helper( assert total == num_scanners_with_no_reads @SkipIfS3.hdfs_block_size + @SkipIfADLS.hdfs_block_size @SkipIfIsilon.hdfs_block_size @SkipIfLocal.multiple_impalad def test_multiple_blocks(self, vector): @@ -403,6 +410,7 @@ def test_multiple_blocks(self, vector): self._multiple_blocks_helper(table_name, 40000, ranges_per_node=2) @SkipIfS3.hdfs_block_size + @SkipIfADLS.hdfs_block_size @SkipIfIsilon.hdfs_block_size @SkipIfLocal.multiple_impalad def test_multiple_blocks_one_row_group(self, vector): @@ -693,6 +701,7 @@ def test_text_scanner_with_header(self, vector, unique_database): # Missing Coverage: No coverage for truncated files errors or scans. @SkipIfS3.hive +@SkipIfADLS.hive @SkipIfIsilon.hive @SkipIfLocal.hive class TestScanTruncatedFiles(ImpalaTestSuite): diff --git a/tests/stress/test_ddl_stress.py b/tests/stress/test_ddl_stress.py index 9aa6de5d1a..b46f20124d 100644 --- a/tests/stress/test_ddl_stress.py +++ b/tests/stress/test_ddl_stress.py @@ -18,7 +18,7 @@ import pytest from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal +from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal # Number of tables to create per thread NUM_TBLS_PER_THREAD = 10 @@ -48,6 +48,7 @@ def add_test_dimensions(cls): v.get_value('table_format').compression_codec == 'none')) @SkipIfS3.caching + @SkipIfADLS.caching @SkipIfIsilon.caching @SkipIfLocal.caching @pytest.mark.stress diff --git a/tests/util/adls_util.py b/tests/util/adls_util.py new file mode 100644 index 0000000000..f6160741c3 --- /dev/null +++ b/tests/util/adls_util.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +# ADLS access utilities +# +# This file uses the azure-data-lake-store-python client and provides simple +# functions to the Impala test suite to access Azure Data Lake Store. + +from azure.datalake.store import core, lib, multithread, exceptions +from tests.util.filesystem_base import BaseFilesystem +from tests.util.filesystem_utils import ADLS_CLIENT_ID, ADLS_TENANT_ID, ADLS_CLIENT_SECRET + +class ADLSClient(BaseFilesystem): + + def __init__(self, store): + self.token = lib.auth(tenant_id = ADLS_TENANT_ID, + client_secret = ADLS_CLIENT_SECRET, + client_id = ADLS_CLIENT_ID) + self.adlsclient = core.AzureDLFileSystem(self.token, store_name=store) + + def create_file(self, path, file_data, overwrite=True): + if not overwrite and self.exists(path): return False + with self.adlsclient.open(path, 'wb') as f: + num_bytes = f.write(file_data) + assert num_bytes == len(file_data), "ADLS write failed." + return True + + def make_dir(self, path, permission=None): + self.adlsclient.mkdir(path) + return True + + def copy(self, src, dst): + # The ADLS Python client doesn't support cp() yet, so we have to download and + # reupload to the destination. + src_contents = self.adlsclient.cat(src) + self.create_file(dst, src_contents, overwrite=True) + assert self.exists(dst), \ + 'ADLS copy failed: Destination file {dst} does not exist'.format(dst=dst) + + def ls(self, path): + file_paths = self.adlsclient.ls(path) + files= [] + for f in file_paths: + fname = f.split("/")[-1] + if not fname == '': + files += [fname] + return files + + def exists(self, path): + return self.adlsclient.exists(path) + + def delete_file_dir(self, path, recursive=False): + try: + self.adlsclient.rm(path, recursive) + except exceptions.FileNotFoundError as e: + return False + return True + + def get_all_file_sizes(self, path): + """Returns a list of integers which are all the file sizes of files found under + 'path'.""" + return [self.adlsclient.info(f)['length'] for f in self.ls(path)] diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py index d4357200b1..77112bee88 100644 --- a/tests/util/filesystem_utils.py +++ b/tests/util/filesystem_utils.py @@ -29,6 +29,7 @@ IS_ISILON = FILESYSTEM == "isilon" IS_LOCAL = FILESYSTEM == "local" IS_HDFS = FILESYSTEM == "hdfs" +IS_ADLS = FILESYSTEM == "adls" # This condition satisfies both the states where one can assume a default fs # - The environment variable is set to an empty string. # - Tne environment variables is unset ( None ) @@ -42,6 +43,12 @@ # S3 specific values S3_BUCKET_NAME = os.getenv("S3_BUCKET") +# ADLS specific values +ADLS_STORE_NAME = os.getenv("azure_data_lake_store_name") +ADLS_CLIENT_ID = os.getenv("azure_client_id") +ADLS_TENANT_ID = os.getenv("azure_tenant_id") +ADLS_CLIENT_SECRET = os.getenv("azure_client_secret") + def get_fs_path(path): return "%s%s" % (FILESYSTEM_PREFIX, path) From 2ffc86a5b218035cc42fa220f4d33a92b29d3fa6 Mon Sep 17 00:00:00 2001 From: Sailesh Mukil Date: Thu, 25 May 2017 17:58:33 -0700 Subject: [PATCH 3/9] IMPALA-5375: Builds on CentOS 6.4 failing with broken python dependencies Builds on CentOS 6.4 fail due to dependencies not met for the new 'cryptography' python package. The ADLS commit states that the new packages are only required for ADLS and that ADLS on a dev environment is only supported from CentOS 6.7. 
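For illustration only, "passing a compiler to pip" amounts to exporting CC into pip's process environment before installing the compiled packages; a rough sketch (the helper below is hypothetical, the real logic lives in bootstrap_virtualenv.py):

  import os
  import subprocess

  def pip_install_with_cc(pip_path, requirements_path, cc):
    # Run pip with CC pointing at the chosen compiler so that packages with
    # C extensions (pycparser, cffi, cryptography) are built with it.
    env = dict(os.environ)
    env["CC"] = cc
    subprocess.check_call([pip_path, "install", "-r", requirements_path], env=env)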
This patch moves the compiled requirements for ADLS from
compiled-requirements.txt to adls-requirements.txt and passes a compiler to
the pip environment while installing the ADLS requirements.

Testing: Tested on a machine with TARGET_FILESYSTEM='adls', and also on a
CentOS 6.4 machine with the default configuration.

Change-Id: I7d456a861a85edfcad55236aa8b0dbac2ff6fc78
Reviewed-on: http://gerrit.cloudera.org:8080/6998
Reviewed-by: Tim Armstrong
Tested-by: Impala Public Jenkins
---
 infra/python/bootstrap_virtualenv.py        | 14 ++++++++------
 infra/python/deps/adls-requirements.txt     |  5 +++++
 infra/python/deps/compiled-requirements.txt |  5 -----
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/infra/python/bootstrap_virtualenv.py b/infra/python/bootstrap_virtualenv.py
index 4fb8689dfe..e26aaf36b0 100644
--- a/infra/python/bootstrap_virtualenv.py
+++ b/infra/python/bootstrap_virtualenv.py
@@ -218,15 +218,17 @@ def install_compiled_deps_if_possible():
   return True

 def install_adls_deps():
-  # The ADLS dependencies require that the OS is at least CentOS 6.6 or above,
+  # The ADLS dependencies require that the OS is at least CentOS 6.7 or above,
   # which is why we break this into a separate step. If the target filesystem is
-  # ADLS, the expectation is that the dev environment is running at least CentOS 6.6.
-  if reqs_are_installed(ADLS_REQS_PATH):
-    LOG.debug("Skipping ADLS deps: matching adls-installed-requirements.txt found")
-    return True
+  # ADLS, the expectation is that the dev environment is running at least CentOS 6.7.
   if os.environ.get('TARGET_FILESYSTEM') == "adls":
+    if reqs_are_installed(ADLS_REQS_PATH):
+      LOG.debug("Skipping ADLS deps: matching adls-installed-requirements.txt found")
+      return True
+    cc = select_cc()
+    assert cc is not None
     LOG.info("Installing ADLS packages into the virtualenv")
-    exec_pip_install(["-r", ADLS_REQS_PATH])
+    exec_pip_install(["-r", ADLS_REQS_PATH], cc=cc)
     mark_reqs_installed(ADLS_REQS_PATH)

 def install_kudu_client_if_possible():

diff --git a/infra/python/deps/adls-requirements.txt b/infra/python/deps/adls-requirements.txt
index f027a6ef4f..9df42b6367 100644
--- a/infra/python/deps/adls-requirements.txt
+++ b/infra/python/deps/adls-requirements.txt
@@ -18,4 +18,9 @@
 # The following dependencies depend on cffi, so it must be installed after the toolchain
 # is bootstrapped and all requirements in requirements.txt and compiled-requirements.txt
 # are installed into the virtualenv.
+
+pycparser == 2.17
+cffi==1.10.0
+cryptography==1.8.1
+
 scandir == 1.5
 azure-datalake-store == 0.0.9

diff --git a/infra/python/deps/compiled-requirements.txt b/infra/python/deps/compiled-requirements.txt
index b9273d1f92..b3f9f4d2ec 100644
--- a/infra/python/deps/compiled-requirements.txt
+++ b/infra/python/deps/compiled-requirements.txt
@@ -32,11 +32,6 @@ impyla == 0.14.0
   thrift == 0.9.0
   thrift_sasl == 0.1.0
 psutil == 0.7.1
-# Required for ADLS Python client
-  pycparser == 2.17
-  cffi==1.10.0
-  cryptography==1.8.1
-  scandir == 1.5
 # Required for Kudu:
 Cython == 0.23.4
 numpy == 1.10.4

From 117fc388bff2a754be081eae7667627f84f1b33c Mon Sep 17 00:00:00 2001
From: Sailesh Mukil
Date: Tue, 30 May 2017 18:56:43 +0000
Subject: [PATCH 4/9] IMPALA-5383: Fix PARQUET_FILE_SIZE option for ADLS

The PARQUET_FILE_SIZE query option doesn't work with ADLS because the
AdlFileSystem doesn't have a notion of block sizes.
Impala depends on the filesystem remembering the block size, which is then
used as the target Parquet file size (this is done for HDFS so that the
Parquet file size and block size match even if PARQUET_FILE_SIZE isn't a
valid block size). We special-case ADLS just as we do S3 to bypass the
FileSystem block size, and instead use the requested PARQUET_FILE_SIZE as
the output partition's block_size (and consequently as the target Parquet
file size).

Testing: Re-enabled test_insert_parquet_verify_size() for ADLS. Also fixed
a miscellaneous bug in the ADLS client's listing helper function.

Change-Id: I474a913b0ff9b2709f397702b58cb1c74251c25b
Reviewed-on: http://gerrit.cloudera.org:8080/7018
Reviewed-by: Sailesh Mukil
Tested-by: Impala Public Jenkins
---
 be/src/exec/hdfs-table-sink.cc          | 8 +++++---
 be/src/util/hdfs-util.cc                | 7 +++++++
 be/src/util/hdfs-util.h                 | 3 +++
 tests/query_test/test_insert_parquet.py | 4 ----
 tests/util/adls_util.py                 | 3 ++-
 5 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 9da6e57fa9..b49451a9e2 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -390,10 +390,12 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
         output_partition->current_file_name));
   }

-  if (IsS3APath(output_partition->current_file_name.c_str())) {
+  if (IsS3APath(output_partition->current_file_name.c_str()) ||
+      IsADLSPath(output_partition->current_file_name.c_str())) {
     // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block
-    // size reported will be just the filesystem default. So, remember the requested
-    // block size.
+    // size reported will be just the filesystem default. Similarly, the block size
+    // reported for ADLS will be the filesystem default. So, remember the requested block
+    // size.
     output_partition->block_size = block_size;
   } else {
     // HDFS may choose to override the block size that we've recommended, so for non-S3

diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 440b68dc2e..28d318cc23 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -85,6 +85,13 @@ bool IsS3APath(const char* path) {
   return strncmp(path, "s3a://", 6) == 0;
 }

+bool IsADLSPath(const char* path) {
+  if (strstr(path, ":/") == NULL) {
+    return ExecEnv::GetInstance()->default_fs().compare(0, 6, "adl://") == 0;
+  }
+  return strncmp(path, "adl://", 6) == 0;
+}
+
 // Returns the length of the filesystem name in 'path' which is the length of the
 // 'scheme://authority'. Returns 0 if the path is unqualified.
 static int GetFilesystemNameLength(const char* path) {

diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index 32be643d22..b9f415bb72 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -50,6 +50,9 @@ bool IsHdfsPath(const char* path);
 /// Returns true iff the path refers to a location on an S3A filesystem.
 bool IsS3APath(const char* path);

+/// Returns true iff the path refers to a location on an ADL filesystem.
+bool IsADLSPath(const char* path);
+
 /// Returns true iff 'pathA' and 'pathB' are on the same filesystem.
bool FilesystemsMatch(const char* pathA, const char* pathB);
}

diff --git a/tests/query_test/test_insert_parquet.py b/tests/query_test/test_insert_parquet.py
index ee24549b24..c19363f8ed 100644
--- a/tests/query_test/test_insert_parquet.py
+++ b/tests/query_test/test_insert_parquet.py
@@ -161,10 +161,6 @@ def add_test_dimensions(cls):
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension("compression_codec", *PARQUET_CODECS))

-  # ADLS does not have a configurable block size, so the 'PARQUET_FILE_SIZE' option
-  # that's passed as a hint to Hadoop is ignored for AdlFileSystem. So, we skip this
-  # test for ADLS.
-  @SkipIfADLS.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size
   @SkipIfLocal.hdfs_client
   def test_insert_parquet_verify_size(self, vector, unique_database):

diff --git a/tests/util/adls_util.py b/tests/util/adls_util.py
index f6160741c3..b72b4c16b8 100644
--- a/tests/util/adls_util.py
+++ b/tests/util/adls_util.py
@@ -73,4 +73,5 @@ def delete_file_dir(self, path, recursive=False):
   def get_all_file_sizes(self, path):
     """Returns a list of integers which are all the file sizes of files found under
     'path'."""
-    return [self.adlsclient.info(f)['length'] for f in self.ls(path)]
+    return [self.adlsclient.info(f)['length'] for f in self.adlsclient.ls(path) \
+        if self.adlsclient.info(f)['type'] == 'FILE']

From b8558506957dbf44b8ceb29c8b7382bfd8180e05 Mon Sep 17 00:00:00 2001
From: Sailesh Mukil
Date: Tue, 30 May 2017 19:50:13 +0000
Subject: [PATCH 5/9] IMPALA-5378: Disk IO manager needs to understand ADLS

The Disk IO Manager had customized support for S3 and remote HDFS that
allowed these to use a separate queue and a customized number of IO threads.
ADLS did not have this support. Based on the code in DiskIoMgr::Init and
DiskIoMgr::AssignQueue, IOs for ADLS were previously put in local disk
queues. Since local disks are considered rotational unless we can confirm
otherwise by looking at the /sys filesystem, this means that
THREADS_PER_ROTATIONAL_DISK=1 was being applied as the thread count.

This patch adds customized support for ADLS, similar to how it was done for
S3. We set 16 threads as the default number of IO threads for ADLS. For
smaller clusters, setting a higher number like 64 would work better. We keep
the default at the lower value of 16 because ADLS enforces an undocumented
per-cluster concurrency limit of around 500-700 connections; on larger
clusters, higher thread-level parallelism would exhaust that limit and hurt
node-level parallelism.

We also set the default maximum chunk size for ADLS to 128k. Direct reads
aren't supported for ADLS, which means that the JNI array allocation and the
memcpy add significant overhead for larger buffers. 128k was chosen
empirically for S3 for the same reason; since that reason also holds for
ADLS, we keep the same value. A new flag called FLAGS_adls_read_chunk_size
is used to control this value.

TODO: Empirically settle on the most optimal read buffer size.
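As a quick sanity check on the sizing argument above, here is a minimal
Python sketch. The ~500-700 connection cap comes from this commit message
and is otherwise undocumented, so treat the numbers as assumptions:

    # Rough connection-budget check behind the 16-thread default.
    ADLS_CONNECTION_CAP = 600  # midpoint of the observed 500-700 range

    def max_cluster_size(io_threads_per_node):
        """Largest cluster that stays under the cap if every node keeps
        'io_threads_per_node' ADLS reads in flight."""
        return ADLS_CONNECTION_CAP // io_threads_per_node

    for threads in (16, 64):
        print("%2d threads/node -> ~%2d nodes before hitting the cap"
              % (threads, max_cluster_size(threads)))
    # 16 threads/node supports ~37 nodes; 64 threads/node only ~9,
    # which is why 64 is suggested only for small (~10 node) clusters.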
Change-Id: I067f053fec941e3631610c5cc89a384f257ba906
Reviewed-on: http://gerrit.cloudera.org:8080/7033
Reviewed-by: Sailesh Mukil
Tested-by: Impala Public Jenkins
---
 be/src/runtime/disk-io-mgr-scan-range.cc | 11 +++++++++++
 be/src/runtime/disk-io-mgr.cc            |  9 +++++++++
 be/src/runtime/disk-io-mgr.h             |  4 ++++
 3 files changed, 24 insertions(+)

diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 9ccd1da59f..0668eb61b3 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -33,6 +33,13 @@ DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRea
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");

+// TODO: Run perf tests and empirically settle on the most optimal default value for the
+// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
+// due to JNI array allocation and memcpy overhead, 128k was empirically found to have
+// the least overhead.
+DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
+    "reading from ADLS.");
+
 // Implementation of the ScanRange functionality. Each ScanRange contains a queue
 // of ready buffers. For each ScanRange, there is only a single producer and
 // consumer thread, i.e. only one disk thread will push to a scan range at
@@ -389,6 +396,10 @@ int64_t DiskIoMgr::ScanRange::MaxReadChunkSize() const {
     DCHECK(IsS3APath(file()));
     return 128 * 1024;
   }
+  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
+    DCHECK(IsADLSPath(file()));
+    return FLAGS_adls_read_chunk_size;
+  }
   // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
   return numeric_limits<int>::max();
 }

diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 1f28b1a671..536ed88f2f 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -51,6 +51,11 @@ DEFINE_int32(num_remote_hdfs_io_threads, 8, "number of remote HDFS I/O threads")
 // open to S3 and use of multiple CPU cores since S3 reads are relatively compute
 // expensive (SSL and JNI buffer overheads).
 DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
+// The maximum number of ADLS I/O threads. This number is a good default to have for
+// clusters that may vary widely in size, due to an undocumented concurrency limit
+// enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
+// (~10 nodes), 64 threads would be more ideal.
+DEFINE_int32(num_adls_io_threads, 16, "number of ADLS I/O threads");
 // The read size is the size of the reads sent to hdfs/os.
 // There is a trade off of latency and throughput, trying to keep disks busy but
 // not introduce seeks. The literature seems to agree that with 8 MB reads, random
@@ -388,6 +393,8 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
       num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads;
     } else if (i == RemoteS3DiskId()) {
       num_threads_per_disk = FLAGS_num_s3_io_threads;
+    } else if (i == RemoteAdlsDiskId()) {
+      num_threads_per_disk = FLAGS_num_adls_io_threads;
     } else if (num_threads_per_disk_ != 0) {
       num_threads_per_disk = num_threads_per_disk_;
     } else if (DiskInfo::is_rotational(i)) {
@@ -1248,9 +1255,11 @@ int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
       return RemoteDfsDiskId();
     }
     if (IsS3APath(file)) return RemoteS3DiskId();
+    if (IsADLSPath(file)) return RemoteAdlsDiskId();
   }
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file)); // S3 is always remote.
+  DCHECK(!IsADLSPath(file)); // ADLS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);

diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b70c9c8e14..41d3226c86 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -756,6 +756,9 @@ class DiskIoMgr {
   /// The disk ID (and therefore disk_queues_ index) used for S3 accesses.
   int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; }

+  /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
+  int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }
+
   /// Dumps the disk IoMgr queues (for readers and disks)
   std::string DebugString();
@@ -787,6 +790,7 @@ class DiskIoMgr {
   enum {
     REMOTE_DFS_DISK_OFFSET = 0,
     REMOTE_S3_DISK_OFFSET,
+    REMOTE_ADLS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };

From 5141a10ee1f71945dd5d15000796b7cf717e7928 Mon Sep 17 00:00:00 2001
From: hzhelifu
Date: Mon, 28 Aug 2017 15:05:37 +0800
Subject: [PATCH 6/9] Support runtime filters for Kudu scans.
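The patch below boils down to two steps: wait a bounded time for runtime
filters to reach the Kudu scan node, then push each arrived bloom filter
down as a per-column scan predicate. A minimal Python sketch of that flow
(threading.Event and add_bloom_predicate are illustrative stand-ins, not the
Kudu client API used in the diff):

    import time
    import threading

    def wait_for_filters(filters, timeout_ms):
        """'filters' maps column name -> (threading.Event, bloom filter).
        Returns the bloom filters whose events fired before the shared deadline."""
        deadline = time.time() + timeout_ms / 1000.0
        arrived = {}
        for column, (event, bloom) in filters.items():
            # Each wait consumes whatever time is left on the shared deadline.
            if event.wait(max(0.0, deadline - time.time())):
                arrived[column] = bloom
        return arrived

    def open_scanner(scanner, filters, timeout_ms=1000):
        # Filters that miss the deadline are simply not pushed down: the scan
        # stays correct, it is just less selective.
        for column, bloom in wait_for_filters(filters, timeout_ms).items():
            scanner.add_bloom_predicate(column, bloom)  # hypothetical hook
        return scanner.open()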
--- be/src/exec/kudu-scan-node-base.cc | 99 +++++++++++++++++++ be/src/exec/kudu-scan-node-base.h | 12 ++- be/src/exec/kudu-scan-node.cc | 3 + be/src/exec/kudu-scanner.cc | 7 ++ be/src/runtime/coordinator.cc | 3 +- be/src/runtime/runtime-filter.h | 1 + .../planner/RuntimeFilterGenerator.java | 2 +- 7 files changed, 122 insertions(+), 5 deletions(-) diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc index c8a7870402..1914329772 100644 --- a/be/src/exec/kudu-scan-node-base.cc +++ b/be/src/exec/kudu-scan-node-base.cc @@ -18,6 +18,8 @@ #include "exec/kudu-scan-node-base.h" #include +#include + #include #include #include @@ -27,13 +29,17 @@ #include "exec/kudu-scanner.h" #include "exec/kudu-util.h" #include "exprs/expr.h" +#include "exprs/expr-context.h" +#include "runtime/descriptors.h" #include "runtime/exec-env.h" #include "runtime/mem-pool.h" #include "runtime/query-state.h" +#include "runtime/runtime-filter.inline.h" #include "runtime/runtime-state.h" #include "runtime/row-batch.h" #include "runtime/string-value.h" #include "runtime/tuple-row.h" +#include "util/jni-util.h" #include "util/periodic-counter-updater.h" #include "util/runtime-profile-counters.h" @@ -41,6 +47,7 @@ using kudu::client::KuduClient; using kudu::client::KuduTable; +using boost::algorithm::join; namespace impala { @@ -61,6 +68,42 @@ KuduScanNodeBase::~KuduScanNodeBase() { DCHECK(is_closed()); } +Status KuduScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::Init(tnode, state)); + + const TQueryOptions& query_options = state->query_options(); + for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) { + auto it = filter.planid_to_target_ndx.find(tnode.node_id); + DCHECK(it != filter.planid_to_target_ndx.end()); + const TRuntimeFilterTargetDesc& target = filter.targets[it->second]; + if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL && + !target.is_local_target) { + continue; + } + if (query_options.disable_row_runtime_filtering && + !target.is_bound_by_partition_columns) { + continue; + } + + FilterContext filter_ctx; + RETURN_IF_ERROR( + Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr_ctx)); + filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false); + + string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id, + PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES)); + RuntimeProfile* profile = state->obj_pool()->Add( + new RuntimeProfile(state->obj_pool(), filter_profile_title)); + runtime_profile_->AddChild(profile); + filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile, + target.is_bound_by_partition_columns)); + + filter_ctxs_.push_back(filter_ctx); + } + + return Status::OK(); +} + Status KuduScanNodeBase::Prepare(RuntimeState* state) { RETURN_IF_ERROR(ScanNode::Prepare(state)); runtime_state_ = state; @@ -123,6 +166,46 @@ bool KuduScanNodeBase::HasScanToken() { return (next_scan_token_idx_ < scan_tokens_.size()); } +Status KuduScanNodeBase::TransformFilterToKuduBF() { + for (auto& filter_ctx: filter_ctxs_) { + auto it = filter_ctx.filter->filter_desc().planid_to_target_ndx.find(id()); + DCHECK(it != filter_ctx.filter->filter_desc().planid_to_target_ndx.end()); + const TRuntimeFilterTargetDesc& target = filter_ctx.filter->filter_desc().targets[it->second]; + + TExpr expr = target.target_expr; + string col_name; + GetSlotRefColumnName(expr.nodes[0], &col_name); + col_to_bf_[col_name] = 
const_cast(filter_ctx.filter->GetBloomFilter()); + } + return Status::OK(); +} + +bool KuduScanNodeBase::WaitForRuntimeFilters(int32_t time_ms) { + vector arrived_filter_ids; + int32_t start = MonotonicMillis(); + for (auto& ctx: filter_ctxs_) { + if (ctx.filter->WaitForArrival(time_ms)) { + arrived_filter_ids.push_back(Substitute("$0", ctx.filter->id())); + } + } + int32_t end = MonotonicMillis(); + const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS); + + if (arrived_filter_ids.size() == filter_ctxs_.size()) { + runtime_profile()->AddInfoString("Runtime filters", + Substitute("All filters arrived. Waited $0", wait_time)); + VLOG_QUERY << "Filters arrived. Waited " << wait_time; + TransformFilterToKuduBF(); + return true; + } + + const string& filter_str = Substitute("Only following filters arrived: $0, waited $1", + join(arrived_filter_ids, ", "), wait_time); + runtime_profile()->AddInfoString("Runtime filters", filter_str); + VLOG_QUERY << filter_str; + return false; +} + const string* KuduScanNodeBase::GetNextScanToken() { if (!HasScanToken()) return nullptr; const string* token = &scan_tokens_[next_scan_token_idx_++]; @@ -141,4 +224,20 @@ Status KuduScanNodeBase::GetConjunctCtxs(vector* ctxs) { return Expr::CloneIfNotExists(conjunct_ctxs_, runtime_state_, ctxs); } +void KuduScanNodeBase::GetSlotRefColumnName(const TExprNode& node, string* col_name) { + const KuduTableDescriptor* table_desc = + static_cast(tuple_desc_->table_desc()); + TSlotId slot_id = node.slot_ref.slot_id; + BOOST_FOREACH(SlotDescriptor* slot, tuple_desc_->slots()) { + if (slot->id() == slot_id) { + int col_idx = slot->col_pos(); + *col_name = table_desc->col_descs()[col_idx].name(); + return; + } + } + + DCHECK(false) << "Could not find a slot with slot id: " << slot_id; +} + + } // namespace impala diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h index 70e5b947cf..e3638daf71 100644 --- a/be/src/exec/kudu-scan-node-base.h +++ b/be/src/exec/kudu-scan-node-base.h @@ -21,6 +21,7 @@ #include #include +#include "exec/filter-context.h" #include "exec/scan-node.h" #include "runtime/descriptors.h" @@ -38,7 +39,7 @@ class KuduScanNodeBase : public ScanNode { public: KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~KuduScanNodeBase(); - + virtual Status Init(const TPlanNode& tnode, RuntimeState* state); virtual Status Prepare(RuntimeState* state); virtual Status Open(RuntimeState* state); virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0; @@ -56,9 +57,12 @@ class KuduScanNodeBase : public ScanNode { /// Returns the next scan token. Returns NULL if there are no more scan tokens. /// Not thread safe, access must be synchronized. const std::string* GetNextScanToken(); + //Status TransformFilterToKuduBF(impala_kudu::BloomFiltersPB& kudu_bf); + bool WaitForRuntimeFilters(int32_t time_ms); RuntimeState* runtime_state_; - + std::vector filter_ctxs_; + std::map col_to_bf_; /// Stops periodic counters and aggregates counter values for the entire scan node. /// This should be called as soon as the scan node is complete to get the most accurate /// counter values. @@ -101,9 +105,11 @@ class KuduScanNodeBase : public ScanNode { /// Returns a cloned copy of the scan node's conjuncts. Requires that the expressions /// have been open previously. 
Status GetConjunctCtxs(vector* ctxs); - + void GetSlotRefColumnName(const TExprNode& node, string* col_name); + Status TransformFilterToKuduBF(); const TupleDescriptor* tuple_desc() const { return tuple_desc_; } kudu::client::KuduClient* kudu_client() { return client_; } + std::map& kudu_bloom_filter() { return col_to_bf_; } RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; } }; diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index c34574854f..ee28fe182c 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -197,6 +197,9 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to // Here, even though a read of 'done_' may conflict with a write to it, // ProcessScanToken() will return early, as will GetNextScanToken(). while (!done_ && scan_token != NULL) { + if (filter_ctxs_.size() > 0 && col_to_bf_.empty()) { + WaitForRuntimeFilters(1000); + } status = ProcessScanToken(&scanner, *scan_token); if (!status.ok()) break; diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index 8ed160fe6c..4e911ce5c3 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -18,6 +18,7 @@ #include "exec/kudu-scanner.h" #include +#include #include #include #include @@ -154,6 +155,12 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) { uint64_t row_format_flags = kudu::client::KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES; scanner_->SetRowFormatFlags(row_format_flags); + for (auto& kudu_bf: scan_node_->kudu_bloom_filter()) { + kudu::client::KuduValueBloomFilter* bf = kudu::client::KuduValueBloomFilterBuilder().Build(kudu_bf.second); + kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(kudu_bf.first, bf); + scanner_->AddConjunctPredicate(p); + } + { SCOPED_TIMER(state_->total_storage_wait_timer()); KUDU_RETURN_IF_ERROR(scanner_->Open(), "Unable to open scanner"); diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 741da32dec..06c0b30fea 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -350,7 +350,8 @@ void Coordinator::InitFilterRoutingTable() { f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end()); // target plan node of filter - } else if (plan_node.__isset.hdfs_scan_node) { + } else if (plan_node.__isset.hdfs_scan_node || + plan_node.__isset.kudu_scan_node) { auto it = filter.planid_to_target_ndx.find(plan_node.node_id); DCHECK(it != filter.planid_to_target_ndx.end()); const TRuntimeFilterTargetDesc& t_target = filter.targets[it->second]; diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h index 828c36d824..166254ccc4 100644 --- a/be/src/runtime/runtime-filter.h +++ b/be/src/runtime/runtime-filter.h @@ -55,6 +55,7 @@ class RuntimeFilter { /// Sets the internal filter bloom_filter to 'bloom_filter'. Can only legally be called /// once per filter. Does not acquire the memory associated with 'bloom_filter'. inline void SetBloomFilter(BloomFilter* bloom_filter); + const BloomFilter* GetBloomFilter() const { return bloom_filter_; } /// Returns false iff 'bloom_filter_' has been set via SetBloomFilter() and hash[val] is /// not in that 'bloom_filter_'. Otherwise returns true. 
Is safe to call concurrently

diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 133a14eb90..336c94a363 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -536,7 +536,7 @@ private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) {
    * Currently, runtime filters can only be assigned to HdfsScanNodes.
    */
   private void assignRuntimeFilters(Analyzer analyzer, ScanNode scanNode) {
-    if (!(scanNode instanceof HdfsScanNode)) return;
+    if (!((scanNode instanceof HdfsScanNode) || (scanNode instanceof KuduScanNode))) return;
     TupleId tid = scanNode.getTupleIds().get(0);
     if (!runtimeFiltersByTid_.containsKey(tid)) return;
     for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {

From b966906a442c4da9b039c70d96aaee9e39ee37dd Mon Sep 17 00:00:00 2001
From: hzhelifu
Date: Wed, 20 Sep 2017 16:24:29 +0800
Subject: [PATCH 7/9] Tests pass.
---
 be/src/exec/kudu-scan-node-base.cc                 | 41 +++++++++++++++----
 be/src/exec/kudu-scan-node-base.h                  | 14 +++++--
 be/src/exec/kudu-scan-node.cc                      |  4 +-
 be/src/exec/kudu-scanner.cc                        | 30 ++++++++++++--
 .../apache/impala/planner/KuduScanNode.java        |  4 ++
 .../planner/RuntimeFilterGenerator.java            | 17 ++++----
 6 files changed, 84 insertions(+), 26 deletions(-)

diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index 1914329772..6e68aba786 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -49,6 +49,9 @@ using kudu::client::KuduClient;
 using kudu::client::KuduTable;
 using boost::algorithm::join;

+DEFINE_int32(kudu_runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
+    "that a scan node will wait for expected runtime filters to arrive.");
+
 namespace impala {

 const string KuduScanNodeBase::KUDU_ROUND_TRIPS = "TotalKuduScanRoundTrips";
@@ -57,6 +60,7 @@ const string KuduScanNodeBase::KUDU_REMOTE_TOKENS = "KuduRemoteScanTokens";
 KuduScanNodeBase::KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
     const DescriptorTbl& descs)
   : ScanNode(pool, tnode, descs),
+    wait_time_ms_(0),
     tuple_id_(tnode.kudu_scan_node.tuple_id),
     client_(nullptr),
     counters_running_(false),
@@ -148,6 +152,11 @@ Status KuduScanNodeBase::Open(RuntimeState* state) {
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_),
       "Unable to open Kudu table");
+
+  wait_time_ms_ = FLAGS_kudu_runtime_filter_wait_time_ms;
+  if (runtime_state_->query_options().runtime_filter_wait_time_ms > 0) {
+    wait_time_ms_ = runtime_state_->query_options().runtime_filter_wait_time_ms;
+  }
   return Status::OK();
 }
@@ -166,36 +175,50 @@ bool KuduScanNodeBase::HasScanToken() {
   return (next_scan_token_idx_ < scan_tokens_.size());
 }

-Status KuduScanNodeBase::TransformFilterToKuduBF() {
-  for (auto& filter_ctx: filter_ctxs_) {
-    auto it = filter_ctx.filter->filter_desc().planid_to_target_ndx.find(id());
-    DCHECK(it != filter_ctx.filter->filter_desc().planid_to_target_ndx.end());
-    const TRuntimeFilterTargetDesc& target = filter_ctx.filter->filter_desc().targets[it->second];
+Status KuduScanNodeBase::CollectKuduBFs(const std::vector<FilterContext*>& ctxs) {
+  std::vector<FilterContext*>::const_iterator it = ctxs.begin();
+  for (; it != ctxs.end(); ++it) {
+    const TRuntimeFilterDesc& desc =
(*it)->filter->filter_desc(); + const auto iter = desc.planid_to_target_ndx.find(id()); + DCHECK(iter != desc.planid_to_target_ndx.end()); + const TRuntimeFilterTargetDesc& target = desc.targets[iter->second]; - TExpr expr = target.target_expr; + // Is there any other method to get column name? string col_name; - GetSlotRefColumnName(expr.nodes[0], &col_name); - col_to_bf_[col_name] = const_cast(filter_ctx.filter->GetBloomFilter()); + GetSlotRefColumnName(target.target_expr.nodes[0], &col_name); + + // Check whether the column has been collected. + if (column_done.find(col_name) != column_done.end()) continue; + column_done.insert(col_name); + BloomFilter* bf = const_cast((*it)->filter->GetBloomFilter()); + if (bf != nullptr) { + column_to_bf_.insert(std::make_pair(col_name, bf)); + } } + return Status::OK(); } bool KuduScanNodeBase::WaitForRuntimeFilters(int32_t time_ms) { vector arrived_filter_ids; + vector ctxs; int32_t start = MonotonicMillis(); for (auto& ctx: filter_ctxs_) { if (ctx.filter->WaitForArrival(time_ms)) { arrived_filter_ids.push_back(Substitute("$0", ctx.filter->id())); + ctxs.push_back(&ctx); } } int32_t end = MonotonicMillis(); const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS); + // Collect bloom filters from filter contexts. + CollectKuduBFs(ctxs); + if (arrived_filter_ids.size() == filter_ctxs_.size()) { runtime_profile()->AddInfoString("Runtime filters", Substitute("All filters arrived. Waited $0", wait_time)); VLOG_QUERY << "Filters arrived. Waited " << wait_time; - TransformFilterToKuduBF(); return true; } diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h index e3638daf71..53cde21969 100644 --- a/be/src/exec/kudu-scan-node-base.h +++ b/be/src/exec/kudu-scan-node-base.h @@ -62,7 +62,14 @@ class KuduScanNodeBase : public ScanNode { RuntimeState* runtime_state_; std::vector filter_ctxs_; - std::map col_to_bf_; + + // Column name to bloom filter which references to bloom filter pointer in RuntimeFilter. + std::map column_to_bf_; + // The set of column names which has been collected. + std::set column_done; + // Scan node will wait for expected runtime filters to arrive. + int32 wait_time_ms_; + /// Stops periodic counters and aggregates counter values for the entire scan node. /// This should be called as soon as the scan node is complete to get the most accurate /// counter values. @@ -106,11 +113,12 @@ class KuduScanNodeBase : public ScanNode { /// have been open previously. Status GetConjunctCtxs(vector* ctxs); void GetSlotRefColumnName(const TExprNode& node, string* col_name); - Status TransformFilterToKuduBF(); const TupleDescriptor* tuple_desc() const { return tuple_desc_; } kudu::client::KuduClient* kudu_client() { return client_; } - std::map& kudu_bloom_filter() { return col_to_bf_; } RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; } + Status CollectKuduBFs(const std::vector& filter); + std::map& GetKuduBFs() { return column_to_bf_; } + void ClearKuduBFs() { column_to_bf_.clear(); } }; } diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index ee28fe182c..f3c372872f 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -197,8 +197,8 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to // Here, even though a read of 'done_' may conflict with a write to it, // ProcessScanToken() will return early, as will GetNextScanToken(). 
while (!done_ && scan_token != NULL) {
-    if (filter_ctxs_.size() > 0 && col_to_bf_.empty()) {
-      WaitForRuntimeFilters(1000);
+    if (filter_ctxs_.size() > 0) {
+      WaitForRuntimeFilters(wait_time_ms_);
     }
     status = ProcessScanToken(&scanner, *scan_token);
     if (!status.ok()) break;

diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 4e911ce5c3..b1fec7833f 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -155,10 +155,15 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) {
   uint64_t row_format_flags = kudu::client::KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES;
   scanner_->SetRowFormatFlags(row_format_flags);

-  for (auto& kudu_bf: scan_node_->kudu_bloom_filter()) {
-    kudu::client::KuduValueBloomFilter* bf = kudu::client::KuduValueBloomFilterBuilder().Build(kudu_bf.second);
-    kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(kudu_bf.first, bf);
-    scanner_->AddConjunctPredicate(p);
+  // Push down the bloom filter predicates.
+  if (!scan_node_->GetKuduBFs().empty()) {
+    for (auto& one: scan_node_->GetKuduBFs()) {
+      kudu::client::KuduValueBloomFilter* bf = kudu::client::KuduValueBloomFilterBuilder().Build(one.second);
+      kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(one.first, bf);
+      scanner_->AddConjunctPredicate(p);
+    }
+    // Clear the map of bloom filters.
+    scan_node_->ClearKuduBFs();
   }

   {
@@ -255,6 +260,23 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
 Status KuduScanner::GetNextScannerBatch() {
   SCOPED_TIMER(state_->total_storage_wait_timer());
   int64_t now = MonotonicMicros();
+
+  // Wait for the runtime filters.
+  // Stop waiting for runtime filters once the number of bloom filters
+  // collected equals the number of filter contexts.
+  if (scan_node_->filter_ctxs_.size() > 0 &&
+      (scan_node_->filter_ctxs_.size() != scan_node_->column_done.size())) {
+    scan_node_->WaitForRuntimeFilters(scan_node_->wait_time_ms_/10);
+    if (!scan_node_->GetKuduBFs().empty()) {
+      for (auto& one: scan_node_->GetKuduBFs()) {
+        kudu::client::KuduValueBloomFilter* bf = kudu::client::KuduValueBloomFilterBuilder().Build(one.second);
+        kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(one.first, bf);
+        scanner_->AddConjunctPredicate(p);
+      }
+      scan_node_->ClearKuduBFs();
+    }
+  }
+
 KUDU_RETURN_IF_ERROR(scanner_->NextBatch(&cur_kudu_batch_), "Unable to advance iterator");
 COUNTER_ADD(scan_node_->kudu_round_trips(), 1);
 cur_kudu_batch_num_read_ = 0;

diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 4156a7299e..ccef8d132d 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -296,6 +296,10 @@ protected String getNodeExplainString(String prefix, String detailPrefix,
             kuduConjuncts_) + "\n");
       }
     }
+    if (!runtimeFilters_.isEmpty()) {
+      result.append(detailPrefix + "runtime filters: ");
+      result.append(getRuntimeFilterExplainString(false));
+    }
   }
   return result.toString();
 }

diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index 336c94a363..0aa32fd902 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -536,14 +536,15 @@ private void finalizeRuntimeFilter(RuntimeFilter runtimeFilter) {
    * Currently, runtime filters can only be assigned to HdfsScanNodes.
    */
   private void assignRuntimeFilters(Analyzer analyzer, ScanNode scanNode) {
-    if (!((scanNode instanceof HdfsScanNode) || (scanNode instanceof KuduScanNode))) return;
-    TupleId tid = scanNode.getTupleIds().get(0);
-    if (!runtimeFiltersByTid_.containsKey(tid)) return;
-    for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {
-      if (filter.isFinalized()) continue;
-      Expr targetExpr = computeTargetExpr(filter, tid, analyzer);
-      if (targetExpr == null) continue;
-      filter.addTarget(scanNode, analyzer, targetExpr);
+    if (scanNode instanceof HdfsScanNode || scanNode instanceof KuduScanNode) {
+      TupleId tid = scanNode.getTupleIds().get(0);
+      if (!runtimeFiltersByTid_.containsKey(tid)) return;
+      for (RuntimeFilter filter: runtimeFiltersByTid_.get(tid)) {
+        if (filter.isFinalized()) continue;
+        Expr targetExpr = computeTargetExpr(filter, tid, analyzer);
+        if (targetExpr == null) continue;
+        filter.addTarget(scanNode, analyzer, targetExpr);
+      }
     }
   }

From d0f6041997e1cd91162a527dc5d4c1f1ca526bb6 Mon Sep 17 00:00:00 2001
From: hzhelifu
Date: Mon, 25 Sep 2017 14:19:19 +0800
Subject: [PATCH 8/9] Wait for runtime filters.
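The gating this patch adds, where scanner threads stay parked until the
first GetNext() call has waited for runtime filters, can be approximated
with an event in place of the CountingBarrier. A loose Python sketch with
illustrative names, not the C++ below:

    import threading

    filters_issued = threading.Event()  # stands in for runtimefilters_issued_barrier_

    def first_get_next(wait_for_runtime_filters):
        # The first GetNext() call waits (bounded) for runtime filters and
        # then releases the scanner threads exactly once.
        wait_for_runtime_filters()
        filters_issued.set()

    def scanner_thread(process_scan_tokens, cancelled, poll_ms=20):
        # Park with a short timeout so the thread can still notice
        # cancellation while waiting to be released.
        while not filters_issued.wait(poll_ms / 1000.0):
            if cancelled():
                return
        process_scan_tokens()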
--- be/src/exec/kudu-scan-node-base.cc | 64 ++++++---------------------- be/src/exec/kudu-scan-node-base.h | 17 ++++---- be/src/exec/kudu-scan-node.cc | 21 +++++++-- be/src/exec/kudu-scan-node.h | 8 ++++ be/src/exec/kudu-scanner.cc | 68 ++++++++++++++++++++++++------ be/src/exec/kudu-scanner.h | 8 ++++ 6 files changed, 111 insertions(+), 75 deletions(-) diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc index 6e68aba786..1f9b4bf8b7 100644 --- a/be/src/exec/kudu-scan-node-base.cc +++ b/be/src/exec/kudu-scan-node-base.cc @@ -60,7 +60,6 @@ const string KuduScanNodeBase::KUDU_REMOTE_TOKENS = "KuduRemoteScanTokens"; KuduScanNodeBase::KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ScanNode(pool, tnode, descs), - wait_time_ms_(0), tuple_id_(tnode.kudu_scan_node.tuple_id), client_(nullptr), counters_running_(false), @@ -153,10 +152,6 @@ Status KuduScanNodeBase::Open(RuntimeState* state) { KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc->table_name(), &table_), "Unable to open Kudu table"); - wait_time_ms_ = FLAGS_kudu_runtime_filter_wait_time_ms; - if (runtime_state_->query_options().runtime_filter_wait_time_ms > 0) { - wait_time_ms_ = runtime_state_->query_options().runtime_filter_wait_time_ms; - } return Status::OK(); } @@ -175,49 +170,20 @@ bool KuduScanNodeBase::HasScanToken() { return (next_scan_token_idx_ < scan_tokens_.size()); } -Status KuduScanNodeBase::CollectKuduBFs(const std::vector& ctxs) { - std::vector::const_iterator it = ctxs.begin(); - for (; it != ctxs.end(); ++it) { - const TRuntimeFilterDesc& desc = (*it)->filter->filter_desc(); - const auto iter = desc.planid_to_target_ndx.find(id()); - DCHECK(iter != desc.planid_to_target_ndx.end()); - const TRuntimeFilterTargetDesc& target = desc.targets[iter->second]; - - // Is there any other method to get column name? - string col_name; - GetSlotRefColumnName(target.target_expr.nodes[0], &col_name); - - // Check whether the column has been collected. - if (column_done.find(col_name) != column_done.end()) continue; - column_done.insert(col_name); - BloomFilter* bf = const_cast((*it)->filter->GetBloomFilter()); - if (bf != nullptr) { - column_to_bf_.insert(std::make_pair(col_name, bf)); - } - } - - return Status::OK(); -} - bool KuduScanNodeBase::WaitForRuntimeFilters(int32_t time_ms) { vector arrived_filter_ids; - vector ctxs; int32_t start = MonotonicMillis(); for (auto& ctx: filter_ctxs_) { if (ctx.filter->WaitForArrival(time_ms)) { arrived_filter_ids.push_back(Substitute("$0", ctx.filter->id())); - ctxs.push_back(&ctx); } } int32_t end = MonotonicMillis(); const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS); - // Collect bloom filters from filter contexts. - CollectKuduBFs(ctxs); - if (arrived_filter_ids.size() == filter_ctxs_.size()) { runtime_profile()->AddInfoString("Runtime filters", - Substitute("All filters arrived. Waited $0", wait_time)); + Substitute("All filters arrived. Waited $0", wait_time)); VLOG_QUERY << "Filters arrived. 
Waited " << wait_time; return true; } @@ -235,6 +201,18 @@ const string* KuduScanNodeBase::GetNextScanToken() { return token; } +Status KuduScanNodeBase::IssueRuntimeFilters(RuntimeState* state) { + DCHECK(!initial_ranges_issued_); + initial_ranges_issued_ = true; + + int32 wait_time_ms = FLAGS_kudu_runtime_filter_wait_time_ms; + if (state->query_options().runtime_filter_wait_time_ms > 0) { + wait_time_ms = state->query_options().runtime_filter_wait_time_ms; + } + if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(wait_time_ms); + return Status::OK(); +} + void KuduScanNodeBase::StopAndFinalizeCounters() { if (!counters_running_) return; counters_running_ = false; @@ -247,20 +225,4 @@ Status KuduScanNodeBase::GetConjunctCtxs(vector* ctxs) { return Expr::CloneIfNotExists(conjunct_ctxs_, runtime_state_, ctxs); } -void KuduScanNodeBase::GetSlotRefColumnName(const TExprNode& node, string* col_name) { - const KuduTableDescriptor* table_desc = - static_cast(tuple_desc_->table_desc()); - TSlotId slot_id = node.slot_ref.slot_id; - BOOST_FOREACH(SlotDescriptor* slot, tuple_desc_->slots()) { - if (slot->id() == slot_id) { - int col_idx = slot->col_pos(); - *col_name = table_desc->col_descs()[col_idx].name(); - return; - } - } - - DCHECK(false) << "Could not find a slot with slot id: " << slot_id; -} - - } // namespace impala diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h index 53cde21969..43b4e603f4 100644 --- a/be/src/exec/kudu-scan-node-base.h +++ b/be/src/exec/kudu-scan-node-base.h @@ -63,12 +63,14 @@ class KuduScanNodeBase : public ScanNode { RuntimeState* runtime_state_; std::vector filter_ctxs_; - // Column name to bloom filter which references to bloom filter pointer in RuntimeFilter. - std::map column_to_bf_; - // The set of column names which has been collected. - std::set column_done; - // Scan node will wait for expected runtime filters to arrive. - int32 wait_time_ms_; + /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on + /// the first call to GetNext(). The token manager, in a different thread, will read + /// this variable. + bool initial_ranges_issued_; + + /// Waits for runtime filters if necessary. + /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true. + Status IssueRuntimeFilters(RuntimeState* state); /// Stops periodic counters and aggregates counter values for the entire scan node. /// This should be called as soon as the scan node is complete to get the most accurate @@ -116,9 +118,6 @@ class KuduScanNodeBase : public ScanNode { const TupleDescriptor* tuple_desc() const { return tuple_desc_; } kudu::client::KuduClient* kudu_client() { return client_; } RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; } - Status CollectKuduBFs(const std::vector& filter); - std::map& GetKuduBFs() { return column_to_bf_; } - void ClearKuduBFs() { column_to_bf_.clear(); } }; } diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index f3c372872f..19d6f1bb79 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -34,11 +34,16 @@ DEFINE_int32(kudu_max_row_batches, 0, "The maximum size of the row batch queue, " " for Kudu scanners."); +// Amount of time to block waiting for GetNext() to release scanner threads between +// checking if a scanner thread should yield itself back to the global thread pool. 
+const int SCANNER_THREAD_WAIT_TIME_MS = 20; + namespace impala { KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : KuduScanNodeBase(pool, tnode, descs), + runtimefilters_issued_barrier_(1), num_active_scanners_(0), done_(false), thread_avail_cb_id_(-1) { @@ -80,6 +85,14 @@ Status KuduScanNode::Open(RuntimeState* state) { } Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { + if (!initial_ranges_issued_) { + RETURN_IF_ERROR(IssueRuntimeFilters(state)); + runtimefilters_issued_barrier_.Notify(); + } + return GetNextInternal(state, row_batch, eos); +} + +Status KuduScanNode::GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos) { DCHECK(row_batch != NULL); RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); RETURN_IF_CANCELLED(state); @@ -194,12 +207,14 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to const string* scan_token = initial_token; Status status = scanner.Open(); if (status.ok()) { + // Wake up every SCANNER_THREAD_COUNTERS to yield scanner threads back if unused, or + // to return if there's an error. + bool unused = false; + runtimefilters_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused); + // Here, even though a read of 'done_' may conflict with a write to it, // ProcessScanToken() will return early, as will GetNextScanToken(). while (!done_ && scan_token != NULL) { - if (filter_ctxs_.size() > 0) { - WaitForRuntimeFilters(wait_time_ms_); - } status = ProcessScanToken(&scanner, *scan_token); if (!status.ok()) break; diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h index 6341cb62e4..d9eb56f689 100644 --- a/be/src/exec/kudu-scan-node.h +++ b/be/src/exec/kudu-scan-node.h @@ -25,6 +25,8 @@ #include "exec/kudu-scan-node-base.h" #include "runtime/thread-resource-mgr.h" #include "gutil/gscoped_ptr.h" +#include "util/promise.h" +#include "util/counting-barrier.h" #include "util/thread.h" namespace impala { @@ -49,6 +51,9 @@ class KuduScanNode : public KuduScanNodeBase { private: friend class KuduScanner; + /// Released when initial runtime filters are issued in the first call to GetNext(). + CountingBarrier runtimefilters_issued_barrier_; + // Outgoing row batches queue. Row batches are produced asynchronously by the scanner // threads and consumed by the main thread. boost::scoped_ptr materialized_row_batches_; @@ -96,6 +101,9 @@ class KuduScanNode : public KuduScanNodeBase { /// in 'materialized_row_batches_' until the scanner reports eos, an error occurs, or /// the limit is reached. Status ProcessScanToken(KuduScanner* scanner, const std::string& scan_token); + + /// Checks for eos conditions and returns batches from materialized_row_batches_. 
+ Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos); }; } diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index b1fec7833f..2baba88ad5 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -29,6 +29,7 @@ #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" +#include "runtime/runtime-filter.h" #include "runtime/runtime-state.h" #include "runtime/row-batch.h" #include "runtime/string-value.h" @@ -73,6 +74,7 @@ Status KuduScanner::Open() { if (slot->type().type != TYPE_TIMESTAMP) continue; timestamp_slots_.push_back(slot); } + //filter_ctx_pushed_down_.resize(scan_node_->filter_ctxs_.size(), false); return scan_node_->GetConjunctCtxs(&conjunct_ctxs_); } @@ -155,16 +157,8 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) { uint64_t row_format_flags = kudu::client::KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES; scanner_->SetRowFormatFlags(row_format_flags); - // Push down the bloom filter predicates. - if (!scan_node_->GetKuduBFs().empty()) { - for (auto& one: scan_node_->GetKuduBFs()) { - kudu::client::KuduValueBloomFilter* bf = kudu::client::KuduValueBloomFilterBuilder().Build(one.second); - kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(one.first, bf); - scanner_->AddConjunctPredicate(p); - } - // Clear the map of bloom filters. - scan_node_->ClearKuduBFs(); - } + // Apply the runtime filters here. + RETURN_IF_ERROR(ApplyRuntimeFilters()); { SCOPED_TIMER(state_->total_storage_wait_timer()); @@ -173,6 +167,51 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) { return Status::OK(); } +Status KuduScanner::ApplyRuntimeFilters() { + if (scan_node_->filter_ctxs_.empty()) return Status::OK(); + + // Reset 'filter_ctx_pushed_down_'. + filter_ctx_pushed_down_.clear(); + + // Apply runtime filters. + const TupleDescriptor* tuple_desc = scan_node_->tuple_desc_; + const TableDescriptor* table_desc = tuple_desc->table_desc(); + std::vector::iterator it = scan_node_->filter_ctxs_.begin(); + for (int i = 0; it != scan_node_->filter_ctxs_.end(); ++it, ++i) { + // Skip 'not arrived' & 'ALWAYS_TRUE_FILTER' + const RuntimeFilter* rf = it->filter; + if (!(rf->HasBloomFilter())) continue; + BloomFilter* bf = const_cast(rf->GetBloomFilter()); + if (bf == BloomFilter::ALWAYS_TRUE_FILTER) continue; + + // Is there any other way to get column name? + string column_name; + const TRuntimeFilterDesc& desc = it->filter->filter_desc(); + const auto iter = desc.planid_to_target_ndx.find(scan_node_->id()); + CHECK(iter != desc.planid_to_target_ndx.end()); + const TRuntimeFilterTargetDesc& target = desc.targets[iter->second]; + TSlotId slot_id = target.target_expr.nodes[0].slot_ref.slot_id; + std::vector::const_iterator slot = tuple_desc->slots().begin(); + for (; slot != tuple_desc->slots().end(); ++slot){ + if ((*slot)->id() == slot_id) { + int col_idx = (*slot)->col_pos(); + column_name = table_desc->col_descs()[col_idx].name(); + break; + } + } + CHECK(!column_name.empty()); + + kudu::client::KuduValueBloomFilter* b = kudu::client::KuduValueBloomFilterBuilder().Build(bf); + kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(column_name, b); + scanner_->AddConjunctPredicate(p); + + // Record the idx of 'filter_ctxs_'. 
+    filter_ctx_pushed_down_.push_back(i);
+  }
+
+  return Status::OK();
+}
+
 void KuduScanner::CloseCurrentClientScanner() {
   DCHECK_NOTNULL(scanner_.get());
   scanner_->Close();
@@ -233,6 +272,11 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
     }
   }

+    // Evaluate runtime filters that haven't been pushed down to Kudu.
+    /*if (!EvalRuntimeFilters(reinterpret_cast<TupleRow*>(output_row))) {
+      continue;
+    }*/
+
     // Evaluate the conjuncts that haven't been pushed down to Kudu. Conjunct evaluation
     // is performed directly on the Kudu tuple because its memory layout is identical to
     // Impala's. We only copy the surviving tuples to Impala's output row batch.
@@ -264,7 +308,7 @@ Status KuduScanner::GetNextScannerBatch() {
   // Wait for the runtime filters.
   // Stop waiting for runtime filters once the number of bloom filters
   // collected equals the number of filter contexts.
-  if (scan_node_->filter_ctxs_.size() > 0 &&
+  /*if (scan_node_->filter_ctxs_.size() > 0 &&
       (scan_node_->filter_ctxs_.size() != scan_node_->column_done.size())) {
     scan_node_->WaitForRuntimeFilters(scan_node_->wait_time_ms_/10);
     if (!scan_node_->GetKuduBFs().empty()) {
@@ -275,7 +319,7 @@
       scan_node_->ClearKuduBFs();
     }
-  }
+  }*/
 KUDU_RETURN_IF_ERROR(scanner_->NextBatch(&cur_kudu_batch_), "Unable to advance iterator");
 COUNTER_ADD(scan_node_->kudu_round_trips(), 1);

diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 7a6ca767cb..4aab6d48d3 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -59,6 +59,9 @@ class KuduScanner {
   void Close();

  private:
+  /// Apply the runtime filters to KuduScanner.
+  Status ApplyRuntimeFilters();
+
   /// Handles the case where the projection is empty (e.g. count(*)).
   /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one.
   Status HandleEmptyProjection(RowBatch* row_batch);
@@ -105,6 +108,11 @@ class KuduScanner {
   /// Timestamp slots in the tuple descriptor of the scan node. Used to convert Kudu
   /// UNIXTIME_MICRO values inline.
   vector<SlotDescriptor*> timestamp_slots_;
+
+  /// Marks the filter contexts that have already been pushed down.
+  /// The value 'true' means the corresponding filter context has been pushed down.
+  /// The size of this vector equals the size of 'filter_ctxs_'.
+  vector<bool> filter_ctx_pushed_down_;
 };

 } /// namespace impala

From 2e8fe3b33081cdcea05cda1827360a4154e913a0 Mon Sep 17 00:00:00 2001
From: hzhelifu
Date: Thu, 28 Sep 2017 13:29:39 +0800
Subject: [PATCH 9/9] Changes complete, but the performance is not good enough.
---
 be/src/exec/kudu-scan-node-base.h |  2 +-
 be/src/exec/kudu-scanner.cc       | 84 +++++++++++++++++++------------
 be/src/exec/kudu-scanner.h        |  1 +
 3 files changed, 54 insertions(+), 33 deletions(-)

diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 43b4e603f4..a81a394743 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -114,7 +114,7 @@ class KuduScanNodeBase : public ScanNode {
   /// Returns a cloned copy of the scan node's conjuncts. Requires that the expressions
   /// have been open previously.
   Status GetConjunctCtxs(vector<ExprContext*>* ctxs);
-  void GetSlotRefColumnName(const TExprNode& node, string* col_name);
+
   const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
   kudu::client::KuduClient* kudu_client() { return client_; }
   RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }

diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 2baba88ad5..7702347552 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -74,7 +74,8 @@ Status KuduScanner::Open() {
     if (slot->type().type != TYPE_TIMESTAMP) continue;
     timestamp_slots_.push_back(slot);
   }
-  //filter_ctx_pushed_down_.resize(scan_node_->filter_ctxs_.size(), false);
+
+  filter_ctx_pushed_down_.resize(scan_node_->filter_ctxs_.size(), false);
   return scan_node_->GetConjunctCtxs(&conjunct_ctxs_);
 }
@@ -157,7 +158,8 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) {
   uint64_t row_format_flags = kudu::client::KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES;
   scanner_->SetRowFormatFlags(row_format_flags);
-  // Apply the runtime filters here.
+  // Apply the runtime filters.
+  LOG(INFO) << "ApplyRuntimeFilters start ...";
   RETURN_IF_ERROR(ApplyRuntimeFilters());

   {
@@ -170,23 +172,52 @@ Status KuduScanner::ApplyRuntimeFilters() {
   if (scan_node_->filter_ctxs_.empty()) return Status::OK();

-  // Reset 'filter_ctx_pushed_down_'.
-  filter_ctx_pushed_down_.clear();
+  // Reset.
+  vector<bool>::iterator it = filter_ctx_pushed_down_.begin();
+  for (; it != filter_ctx_pushed_down_.end(); ++it) {
+    (*it) = false;
+  }
+
+  return PushDownRuntimeFilters();
+}

-  // Apply runtime filters.
+Status KuduScanner::PushDownRuntimeFilters() {
+  if (scan_node_->filter_ctxs_.empty()) return Status::OK();
+
+  // Tuple descriptor & Table descriptor.
   const TupleDescriptor* tuple_desc = scan_node_->tuple_desc_;
   const TableDescriptor* table_desc = tuple_desc->table_desc();
-  std::vector<FilterContext>::iterator it = scan_node_->filter_ctxs_.begin();
-  for (int i = 0; it != scan_node_->filter_ctxs_.end(); ++it, ++i) {
-    // Skip 'not arrived' & 'ALWAYS_TRUE_FILTER'
-    const RuntimeFilter* rf = it->filter;
+  vector<bool>::iterator it = filter_ctx_pushed_down_.begin();
+  for (int i = 0; it != filter_ctx_pushed_down_.end(); ++it, ++i) {
+    if (*it) continue;
+
+    // Runtime Filter.
+    const RuntimeFilter* rf = scan_node_->filter_ctxs_[i].filter;
+
+    // Skip filters that have not arrived yet.
     if (!(rf->HasBloomFilter())) continue;
+    // Mark as pushed down.
+    (*it) = true;
+
+    // Skip filters that are ALWAYS_TRUE_FILTER.
     BloomFilter* bf = const_cast<BloomFilter*>(rf->GetBloomFilter());
-    if (bf == BloomFilter::ALWAYS_TRUE_FILTER) continue;
+    if (bf == BloomFilter::ALWAYS_TRUE_FILTER) {
+      LOG(INFO) << "Scanner id:" << scan_node_->id()
+          << " -> RuntimeFilter: i:" << i
+          << " -> ALWAYS_TRUE_FILTER.";
+      continue;
+    }
+    // Skip filters whose size is larger than 48 MB (the Kudu RPC max is 50 MB).
+    if (bf->GetHeapSpaceUsed() > 48*1024*1024) {
+      LOG(INFO) << "Scanner id:" << scan_node_->id()
+          << " -> RuntimeFilter: i:" << i
+          << " -> Larger than 48 MB"
+          << " -> size:" << bf->GetHeapSpaceUsed();
+      continue;
+    }

-    // Is there any other way to get column name?
     string column_name;
-    const TRuntimeFilterDesc& desc = it->filter->filter_desc();
+    const TRuntimeFilterDesc& desc = rf->filter_desc();
     const auto iter = desc.planid_to_target_ndx.find(scan_node_->id());
     CHECK(iter != desc.planid_to_target_ndx.end());
     const TRuntimeFilterTargetDesc& target = desc.targets[iter->second];
@@ -201,12 +232,14 @@ Status KuduScanner::ApplyRuntimeFilters() {
     }
     CHECK(!column_name.empty());

-      kudu::client::KuduValueBloomFilter* b = kudu::client::KuduValueBloomFilterBuilder().Build(bf);
-      kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(column_name, b);
+    kudu::client::KuduValueBloomFilter* b = kudu::client::KuduValueBloomFilterBuilder().Build(bf);
+    kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(column_name, b);
     scanner_->AddConjunctPredicate(p);
-
-    // Record the idx of 'filter_ctxs_'.
-    filter_ctx_pushed_down_.push_back(i);
+    LOG(INFO) << "Scanner id:" << scan_node_->id()
+        << " -> RuntimeFilter: i:" << i
+        << " -> Pushed down"
+        << " -> column_name:" << column_name
+        << " -> size:" << bf->GetHeapSpaceUsed();
   }

   return Status::OK();
@@ -305,21 +338,8 @@ Status KuduScanner::GetNextScannerBatch() {
   SCOPED_TIMER(state_->total_storage_wait_timer());
   int64_t now = MonotonicMicros();

-  // Wait for the runtime filters.
-  // Stop waiting for runtime filters once the number of bloom filters
-  // collected equals the number of filter contexts.
-  /*if (scan_node_->filter_ctxs_.size() > 0 &&
-      (scan_node_->filter_ctxs_.size() != scan_node_->column_done.size())) {
-    scan_node_->WaitForRuntimeFilters(scan_node_->wait_time_ms_/10);
-    if (!scan_node_->GetKuduBFs().empty()) {
-      for (auto& one: scan_node_->GetKuduBFs()) {
-        kudu::client::KuduValueBloomFilter* bf = kudu::client::KuduValueBloomFilterBuilder().Build(one.second);
-        kudu::client::KuduPredicate* p = scan_node_->table_->NewBloomFilterPredicate(one.first, bf);
-        scanner_->AddConjunctPredicate(p);
-      }
-      scan_node_->ClearKuduBFs();
-    }
-  }*/
+  // Continue to push down the runtime filters.
+  RETURN_IF_ERROR(PushDownRuntimeFilters());

 KUDU_RETURN_IF_ERROR(scanner_->NextBatch(&cur_kudu_batch_), "Unable to advance iterator");
 COUNTER_ADD(scan_node_->kudu_round_trips(), 1);

diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 4aab6d48d3..a3848fc22b 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -61,6 +61,7 @@ class KuduScanner {
  private:
   /// Apply the runtime filters to KuduScanner.
   Status ApplyRuntimeFilters();
+  Status PushDownRuntimeFilters();

   /// Handles the case where the projection is empty (e.g. count(*)).
   /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one.
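Taken together, the push-down bookkeeping in this last patch reduces to the
following minimal Python sketch. The filter tuples, the 48 MB cap handling,
and the add_predicate hook are assumed stand-ins for FilterContext,
BloomFilter, and the Kudu predicate API:

    def push_down_runtime_filters(filters, pushed, add_predicate,
                                  max_bytes=48 * 1024 * 1024):
        """'filters' is a list of (arrived, bloom, column, size) tuples and
        'pushed' a parallel list of bools, like filter_ctx_pushed_down_."""
        for i, (arrived, bloom, column, size) in enumerate(filters):
            if pushed[i] or not arrived:
                continue               # already handled, or retry on a later batch
            pushed[i] = True           # each filter is pushed at most once
            if bloom is None:
                continue               # ALWAYS_TRUE_FILTER: filters nothing
            if size > max_bytes:
                continue               # too large for a single Kudu RPC (~50 MB cap)
            add_predicate(column, bloom)

Calling this again on every batch (as GetNextScannerBatch() does above) lets
filters that arrive late still be pushed down mid-scan, at the cost of a
cheap per-batch pass over the flags.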