diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index fa4c7f0f14..c00efa6835 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -136,6 +136,7 @@ fe/src/test/resources/hbase-jaas-client.conf.template
 fe/src/test/resources/hbase-jaas-server.conf.template
 fe/src/test/resources/users.ldif
 java/.mvn/maven.config
+java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md
 java/toolchains.xml.tmpl
 testdata/AllTypesError/*.txt
 testdata/AllTypesErrorNoNulls/*.txt
diff --git a/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java b/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java
index cd0644b7d4..ff041314c7 100644
--- a/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java
+++ b/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java
@@ -18,6 +18,7 @@ package org.apache.impala.extdatasource;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -83,6 +84,9 @@ public class ExternalDataSourceExecutor {
   // Protects cachedClasses_, numClassCacheHits_, and numClassCacheMisses_.
   private final static Object cachedClassesLock_ = new Object();
 
+  // Set up by the constructor and cleared by release().
+  private URLClassLoader classLoader_;
+
   private final ApiVersion apiVersion_;
   private final ExternalDataSource dataSource_;
   private final String jarPath_;
@@ -156,6 +160,8 @@ private Class<?> getDataSourceClass() throws Exception {
       // Only cache the class if the init string starts with CACHE_CLASS_PREFIX
       if (initString_ != null && initString_.startsWith(CACHE_CLASS_PREFIX)) {
         cachedClasses_.put(cacheMapKey, c);
+      } else {
+        classLoader_ = loader;
       }
       if (LOG.isTraceEnabled()) {
         LOG.trace("Loaded jar for class {} at path {}", className_, jarPath_);
@@ -168,6 +174,27 @@ private Class<?> getDataSourceClass() throws Exception {
     return c;
   }
 
+  @Override
+  protected void finalize() throws Throwable {
+    release();
+    super.finalize();
+  }
+
+  /**
+   * Release the class loader we have created if the class is not cached.
+   */
+  public void release() {
+    if (classLoader_ != null) {
+      try {
+        classLoader_.close();
+      } catch (IOException e) {
+        // Log and ignore.
+        LOG.warn("Error closing the URLClassLoader.", e);
+      }
+      classLoader_ = null;
+    }
+  }
+
   public byte[] prepare(byte[] thriftParams) throws ImpalaException {
     TPrepareParams params = new TPrepareParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index 72c66b723e..a62ba3ad1a 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -143,13 +143,17 @@ public void TestGetTablesTypeTable() throws ImpalaException {
     // HiveServer2 GetTables has 5 columns.
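    // (TABLE_CAT, TABLE_SCHEM, TABLE_NAME, TABLE_TYPE, REMARKS)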
     assertEquals(5, resp.schema.columns.size());
     assertEquals(5, resp.rows.get(0).colVals.size());
-    assertEquals(3, resp.rows.size());
+    assertEquals(5, resp.rows.size());
     assertEquals("alltypes_datasource",
         resp.rows.get(0).colVals.get(2).string_val.toLowerCase());
     assertEquals("alltypes_date_partition",
         resp.rows.get(1).colVals.get(2).string_val.toLowerCase());
     assertEquals("alltypes_date_partition_2",
         resp.rows.get(2).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_jdbc_datasource",
+        resp.rows.get(3).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_jdbc_datasource_2",
+        resp.rows.get(4).colVals.get(2).string_val.toLowerCase());
   }
 
   @Test
diff --git a/java/ext-data-source/jdbc/pom.xml b/java/ext-data-source/jdbc/pom.xml
new file mode 100644
index 0000000000..fa5a9db8d7
--- /dev/null
+++ b/java/ext-data-source/jdbc/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.impala</groupId>
+    <artifactId>impala-data-source</artifactId>
+    <version>4.4.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>impala-data-source-jdbc</artifactId>
+  <name>Apache Impala External Data Source JDBC Library</name>
+  <description>JDBC External Data Source</description>
+  <packaging>jar</packaging>
+  <url>.</url>
+
+  <properties>
+    <commons-dbcp2.version>2.9.0</commons-dbcp2.version>
+    <h2database.version>1.3.166</h2database.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.impala</groupId>
+      <artifactId>impala-data-source-api</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.impala</groupId>
+      <artifactId>impala-frontend</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-dbcp2</artifactId>
+      <version>${commons-dbcp2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>${h2database.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.11.0</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
new file mode 100644
index 0000000000..d69ee9f509
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package org.apache.impala.extdatasource.jdbc; + + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig; +import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager; +import org.apache.impala.extdatasource.jdbc.dao.DatabaseAccessor; +import org.apache.impala.extdatasource.jdbc.dao.DatabaseAccessorFactory; +import org.apache.impala.extdatasource.jdbc.dao.JdbcRecordIterator; +import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException; +import org.apache.impala.extdatasource.jdbc.util.QueryConditionUtil; +import org.apache.impala.extdatasource.thrift.TBinaryPredicate; +import org.apache.impala.extdatasource.thrift.TCloseParams; +import org.apache.impala.extdatasource.thrift.TCloseResult; +import org.apache.impala.extdatasource.thrift.TColumnDesc; +import org.apache.impala.extdatasource.thrift.TComparisonOp; +import org.apache.impala.extdatasource.thrift.TGetNextParams; +import org.apache.impala.extdatasource.thrift.TGetNextResult; +import org.apache.impala.extdatasource.thrift.TOpenParams; +import org.apache.impala.extdatasource.thrift.TOpenResult; +import org.apache.impala.extdatasource.thrift.TPrepareParams; +import org.apache.impala.extdatasource.thrift.TPrepareResult; +import org.apache.impala.extdatasource.thrift.TRowBatch; +import org.apache.impala.extdatasource.thrift.TTableSchema; +import org.apache.impala.extdatasource.v1.ExternalDataSource; +import org.apache.impala.thrift.TColumnData; +import org.apache.impala.thrift.TErrorCode; +import org.apache.impala.thrift.TStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * The Jdbc data source returns the data of the underlying database configured by + * initString. + */ +public class JdbcDataSource implements ExternalDataSource { + + private final static Logger LOG = LoggerFactory.getLogger(JdbcDataSource.class); + + /** + * @see org.apache.impala.extdatasource.ExternalDataSourceExecutor + */ + private final static String CACHE_CLASS_PREFIX = "CACHE_CLASS::"; + + private static final TStatus STATUS_OK = + new TStatus(TErrorCode.OK, Lists.newArrayList()); + + private boolean eos_; + private int batchSize_; + private TTableSchema schema_; + private DataSourceState state_; + // Handle to identify JdbcDataSource object. + // It's assigned with random UUID value in open() API, and returned to the caller. + // In getNext() and close() APIs, compare this value with the handle value passed in + // input parameters to make sure the object is valid. + private String scanHandle_; + // Set to true if initString started with "CACHE_CLASS::". + // It is passed to DatabaseAccessor::close() to indicate if dataSourceCache should be + // cleaned when DatabaseAccessor object is closed. + private boolean cacheClass_ = false; + + // Properties of external jdbc table, converted from initString which is specified in + // create table statement. + // Supported properties are defined in JdbcStorageConfig. 
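+  // For example (values illustrative), an initString is a JSON map such as:
+  //   {"database.type":"POSTGRES",
+  //    "jdbc.url":"jdbc:postgresql://127.0.0.1:5432/functional",
+  //    "jdbc.driver":"org.postgresql.Driver",
+  //    "driver.url":"/path/to/postgresql-jdbc.jar",
+  //    "dbcp.username":"impala", "dbcp.password":"password", "table":"alltypes"}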
+  private Configuration tableConfig_;
+  private DatabaseAccessor dbAccessor_ = null;
+  // iterator_ is used when schema_.getColsSize() does not equal 0.
+  private JdbcRecordIterator iterator_ = null;
+  // currRow_ and totalNumberOfRecords_ are used when schema_.getColsSize() equals 0.
+  private int currRow_;
+  private long totalNumberOfRecords_ = 0;
+
+  // Enumerates the states of the data source, which indicate which ExternalDataSource
+  // API has been called. The states are checked in each API to make sure that the APIs
+  // are called in the right order, i.e. state transitions must follow the order:
+  // CREATED -> OPENED -> CLOSED.
+  // Note that the ExternalDataSourceExecutors of the frontend and backend create
+  // separate JdbcDataSource objects, so state set by the frontend is not carried over
+  // to the backend. prepare() is called by the frontend; open(), getNext() and close()
+  // are called by the backend. We don't need to change state in prepare() since the
+  // state is not carried over to the other APIs, and the input state for open() must
+  // be 'CREATED'.
+  private enum DataSourceState {
+    // The object is created.
+    CREATED,
+    // The open() API is called.
+    OPENED,
+    // The close() API is called.
+    CLOSED
+  }
+
+  public JdbcDataSource() {
+    eos_ = false;
+    currRow_ = 0;
+    state_ = DataSourceState.CREATED;
+  }
+
+  @Override
+  public TPrepareResult prepare(TPrepareParams params) {
+    Preconditions.checkState(state_ == DataSourceState.CREATED);
+    if (!convertInitStringToConfiguration(params.getInit_string())) {
+      return new TPrepareResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR,
+              Lists.newArrayList("Invalid init_string value")));
+    }
+    List<Integer> acceptedPredicates = acceptedPredicates(params.getPredicates());
+    return new TPrepareResult(STATUS_OK)
+        .setAccepted_conjuncts(acceptedPredicates);
+  }
+
+  @Override
+  public TOpenResult open(TOpenParams params) {
+    Preconditions.checkState(state_ == DataSourceState.CREATED);
+    state_ = DataSourceState.OPENED;
+    batchSize_ = params.getBatch_size();
+    schema_ = params.getRow_schema();
+    // 1. Check the init string again because the call in prepare() was from
+    // the frontend and used a different instance of this JdbcDataSource class.
+    if (!convertInitStringToConfiguration(params.getInit_string())) {
+      return new TOpenResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR,
+              Lists.newArrayList("Invalid init_string value")));
+    }
+    // 2. Build the query and execute it
+    try {
+      dbAccessor_ = DatabaseAccessorFactory.getAccessor(tableConfig_);
+      buildQueryAndExecute(params);
+    } catch (JdbcDatabaseAccessException e) {
+      return new TOpenResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(e.getMessage())));
+    }
+    scanHandle_ = UUID.randomUUID().toString();
+    return new TOpenResult(STATUS_OK).setScan_handle(scanHandle_);
+  }
+
+  @Override
+  public TGetNextResult getNext(TGetNextParams params) {
+    Preconditions.checkState(state_ == DataSourceState.OPENED);
+    Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
+    if (eos_) return new TGetNextResult(STATUS_OK).setEos(eos_);
+
+    List<TColumnData> cols = Lists.newArrayList();
+    long numRows = 0;
+    if (schema_.getColsSize() != 0) {
+      if (iterator_ == null) {
+        return new TGetNextResult(
+            new TStatus(TErrorCode.INTERNAL_ERROR,
+                Lists.newArrayList("Iterator of JDBC resultset is null")));
+      }
+      for (int i = 0; i < schema_.getColsSize(); ++i) {
+        cols.add(new TColumnData().setIs_null(Lists.newArrayList()));
+      }
+
+      boolean hasNext = true;
+      try {
+        while (numRows < batchSize_ && (hasNext = iterator_.hasNext())) {
+          iterator_.next(schema_.getCols(), cols);
+          ++numRows;
+        }
+      } catch (Exception e) {
+        hasNext = false;
+      }
+      if (!hasNext) eos_ = true;
+    } else { // for count(*)
+      if (currRow_ + batchSize_ <= totalNumberOfRecords_) {
+        numRows = batchSize_;
+      } else {
+        numRows = totalNumberOfRecords_ - currRow_;
+      }
+      currRow_ += numRows;
+      if (currRow_ == totalNumberOfRecords_) eos_ = true;
+    }
+    return new TGetNextResult(STATUS_OK).setEos(eos_)
+        .setRows(new TRowBatch().setCols(cols).setNum_rows(numRows));
+  }
+
+  @Override
+  public TCloseResult close(TCloseParams params) {
+    Preconditions.checkState(state_ == DataSourceState.OPENED);
+    Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
+    try {
+      if (iterator_ != null) iterator_.close();
+      if (dbAccessor_ != null) dbAccessor_.close(!cacheClass_);
+      state_ = DataSourceState.CLOSED;
+      return new TCloseResult(STATUS_OK);
+    } catch (Exception e) {
+      return new TCloseResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(e.getMessage())));
+    }
+  }
+
+  protected boolean convertInitStringToConfiguration(String initString) {
+    Preconditions.checkState(initString != null);
+    if (tableConfig_ == null) {
+      try {
+        TypeReference<HashMap<String, String>> typeRef
+            = new TypeReference<HashMap<String, String>>() {
+        };
+        if (initString.startsWith(CACHE_CLASS_PREFIX)) {
+          initString = initString.substring(CACHE_CLASS_PREFIX.length());
+          cacheClass_ = true;
+        }
+        Map<String, String> config = new ObjectMapper().readValue(initString, typeRef);
+        tableConfig_ = JdbcStorageConfigManager.convertMapToConfiguration(config);
+      } catch (JsonProcessingException e) {
+        String errorMessage = String
+            .format("Invalid JSON from initString_ '%s'", initString);
+        LOG.error(errorMessage, e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private List<Integer> acceptedPredicates(List<List<TBinaryPredicate>> predicates) {
+    // Return the indexes of accepted predicates.
+    List<Integer> acceptedPredicates = Lists.newArrayList();
+    if (predicates == null || predicates.isEmpty()) {
+      return acceptedPredicates;
+    }
+    for (int i = 0; i < predicates.size(); ++i) {
+      boolean accepted = true;
+      for (TBinaryPredicate predicate : predicates.get(i)) {
+        // Don't support 'IS DISTINCT FROM' and 'IS NOT DISTINCT FROM' operators now.
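+        // (IS [NOT] DISTINCT FROM treats two NULL operands as equal, which a plain
+        // pushed-down comparison may not; rejecting these conjuncts here leaves
+        // Impala itself to evaluate them.)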
+        if (predicate.getOp() == TComparisonOp.DISTINCT_FROM
+            || predicate.getOp() == TComparisonOp.NOT_DISTINCT) {
+          accepted = false;
+          break;
+        }
+      }
+      if (accepted) acceptedPredicates.add(i);
+    }
+    return acceptedPredicates;
+  }
+
+  private void buildQueryAndExecute(TOpenParams params)
+      throws JdbcDatabaseAccessException {
+    Map<String, String> columnMapping = getColumnMapping(tableConfig_
+        .get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName()));
+    // Build the query statement
+    StringBuilder sb = new StringBuilder("SELECT ");
+    String project;
+    // If the cols size equals 0, it is a 'select count(*) from tbl' statement.
+    if (schema_.getColsSize() == 0) {
+      project = "*";
+    } else {
+      project =
+          schema_.getCols().stream().map(
+              TColumnDesc::getName).map(
+              name -> columnMapping.getOrDefault(name, name))
+          .collect(Collectors.joining(", "));
+    }
+    sb.append(project);
+    sb.append(" FROM ");
+    // Quote the jdbc table name with double quotes if columnMapping is not empty
+    String jdbcTableName = tableConfig_.get(JdbcStorageConfig.TABLE.getPropertyName());
+    if (!columnMapping.isEmpty() && jdbcTableName.charAt(0) != '\"') {
+      StringBuilder sb2 = new StringBuilder("\"");
+      sb2.append(jdbcTableName);
+      sb2.append("\"");
+      jdbcTableName = sb2.toString();
+    }
+    sb.append(jdbcTableName);
+    String condition = QueryConditionUtil
+        .buildCondition(params.getPredicates(), columnMapping);
+    if (StringUtils.isNotBlank(condition)) {
+      sb.append(" WHERE ").append(condition);
+    }
+    // Execute the query and get the iterator
+    tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString());
+
+    if (schema_.getColsSize() != 0) {
+      int limit = -1;
+      if (params.isSetLimit()) limit = (int) params.getLimit();
+      iterator_ = dbAccessor_.getRecordIterator(tableConfig_, limit, 0);
+    } else {
+      totalNumberOfRecords_ = dbAccessor_.getTotalNumberOfRecords(tableConfig_);
+    }
+  }
+
+  /*
+   * Return the Impala-to-external column mapping, or an empty map if it is not set.
+   */
+  private Map<String, String> getColumnMapping(String columnMapping) {
+    if ((columnMapping == null) || (columnMapping.trim().isEmpty())) {
+      return Maps.newHashMap();
+    }
+
+    Map<String, String> columnMap = Maps.newHashMap();
+    String[] mappingPairs = columnMapping.split(",");
+    for (String mapPair : mappingPairs) {
+      String[] columns = mapPair.split("=");
+      // Quote the jdbc column name with double quotes
+      String jdbcColumnName = columns[1].trim();
+      if (!jdbcColumnName.isEmpty() && jdbcColumnName.charAt(0) != '\"') {
+        StringBuilder sb = new StringBuilder("\"");
+        sb.append(jdbcColumnName);
+        sb.append("\"");
+        jdbcColumnName = sb.toString();
+      }
+      columnMap.put(columns[0].trim(), jdbcColumnName);
+    }
+
+    return columnMap;
+  }
+}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md
new file mode 100644
index 0000000000..9bf02f7ea7
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md
@@ -0,0 +1,60 @@
+This JDBC External Data Source library implements Impala's "External Data Source"
+mechanism to query JDBC tables from the Impala server.
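+
+For example (the table, data source and connection details below are illustrative
+only), an external JDBC table is created with an initString that carries the
+properties defined in conf/JdbcStorageConfig.java as a JSON map:
+
+  CREATE TABLE my_jdbc_tbl (id INT, string_col STRING)
+  PRODUCED BY DATA SOURCE my_jdbc_data_source(
+    '{"database.type":"POSTGRES",
+      "jdbc.url":"jdbc:postgresql://127.0.0.1:5432/functional",
+      "jdbc.driver":"org.postgresql.Driver",
+      "driver.url":"/path/to/postgresql-jdbc.jar",
+      "dbcp.username":"impala",
+      "dbcp.password":"password",
+      "table":"my_jdbc_tbl"}');
+
+Prefixing the initString with "CACHE_CLASS::" makes ExternalDataSourceExecutor
+cache the loaded data source class, and keeps the shared DataSource (connection
+pool) cache in GenericJdbcDatabaseAccessor alive across queries.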
+
+The following two source files contain the Impala-specific logic:
+  JdbcDataSource.java
+  util/QueryConditionUtil.java
+
+The other source files, which add support for accessing external database tables
+through JDBC drivers, are replicated from the Hive JDBC Storage Handler with some
+modifications:
+(https://github.com/apache/hive/tree/master/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc)
+  conf/DatabaseType.java
+    remove dbType for HIVE, DERBY, and METASTORE.
+  conf/JdbcStorageConfig.java
+    don't use org.apache.hadoop.hive.conf.Constants
+  conf/JdbcStorageConfigManager.java
+    add functions: convertMapToConfiguration(), getQueryToExecute(),
+      getOrigQueryToExecute()
+    remove functions: copyConfigurationToJob(), countNonNull(),
+      getPasswordFromProperties(), copySecretsToJob(),
+      convertPropertiesToConfiguration(), resolveMetadata(), getMetastoreDatabaseType(),
+      getMetastoreConnectionURL(), getMetastoreDriver(), getMetastoreJdbcUser(),
+      getMetastoreJdbcPasswd().
+    modify functions: checkRequiredPropertiesAreDefined()
+  dao/DB2DatabaseAccessor.java
+    remove function constructQuery()
+  dao/DatabaseAccessor.java
+    remove the following APIs:
+      getColumnNames(), getColumnTypes(), getRecordWriter(), getBounds(),
+      needColumnQuote().
+    remove the following parameters from the getRecordIterator() API:
+      'partitionColumn', 'lowerBound', and 'upperBound'.
+  dao/DatabaseAccessorFactory.java
+    remove dbType for HIVE, DERBY, and METASTORE.
+  dao/GenericJdbcDatabaseAccessor.java
+    add member variable: dataSourceCache
+    remove member variable: typeInfoTranslator
+    remove functions: getColumnMetadata(), getColumnMetadata(), getColumnNames(),
+      getColumnTypes(), getColNamesFromRS(), getColTypesFromRS(),
+      getMetaDataQuery(), getRecordWriter(), constructQuery(),
+      addBoundaryToQuery(), removeDbcpPrefix(), getFromProperties(), getBounds(),
+      quote(), needColumnQuote(), getQualifiedTableName(), selectAllFromTable(),
+      finalize().
+    remove the following parameters from the getRecordIterator() API:
+      'partitionColumn', 'lowerBound', and 'upperBound'.
+    modify functions: close()
+  dao/JdbcRecordIterator.java
+    remove member variable accessor, remove parameter 'accessor' from ctor,
+    remove functions: remove()
+    modify functions: next(), close()
+  dao/JethroDatabaseAccessor.java
+    remove getMetaDataQuery()
+  dao/MsSqlDatabaseAccessor.java
+  dao/MySqlDatabaseAccessor.java
+    remove function needColumnQuote()
+  dao/OracleDatabaseAccessor.java
+    remove function constructQuery()
+  dao/PostgresDatabaseAccessor.java
+  exception/JdbcDatabaseAccessException.java
+    renamed from exception/HiveJdbcDatabaseAccessException.java
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
new file mode 100644
index 0000000000..9b30350bc5
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.conf;
+
+public enum DatabaseType {
+  MYSQL,
+  H2,
+  DB2,
+  ORACLE,
+  POSTGRES,
+  MSSQL,
+  JETHRO_DATA
+}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
new file mode 100644
index 0000000000..0e1ac5ab30
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.conf;
+
+
+public enum JdbcStorageConfig {
+  // Table properties specified in the create table statement.
+  // The database from which the external table comes, such as MySQL, ORACLE, POSTGRES,
+  // and MSSQL.
+  DATABASE_TYPE("database.type", true),
+  // JDBC connection string, including the database type, IP address, port number, and
+  // database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional".
+  JDBC_URL("jdbc.url", true),
+  // Class name of the JDBC driver. For example, "org.postgresql.Driver".
+  JDBC_DRIVER_CLASS("jdbc.driver", true),
+  // Driver URL for downloading the jar file package that is used to access the external
+  // database.
+  JDBC_DRIVER_URL("driver.url", true),
+  // Username for accessing the external database.
+  DBCP_USERNAME("dbcp.username", false),
+  // Password of the user.
+  DBCP_PASSWORD("dbcp.password", false),
+  // Number of rows to fetch in a batch.
+  JDBC_FETCH_SIZE("jdbc.fetch.size", false),
+  // SQL query which specifies how to get data from the external database.
+  // Users need to specify either "table" or "query" in the create table statement.
+  QUERY("query", false),
+  // Name of the external database table to be mapped in Impala.
+  TABLE("table", true),
+  // Mapping of column names between the external table and Impala.
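+  // Specified as comma-separated "impalaColumn=jdbcColumn" pairs, e.g.
+  // "id=ID, bool_col=boolean_col" (names illustrative); parsed by
+  // JdbcDataSource.getColumnMapping().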
+ COLUMN_MAPPING("column.mapping", false); + + private final String propertyName; + private boolean required = false; + + + JdbcStorageConfig(String propertyName, boolean required) { + this.propertyName = propertyName; + this.required = required; + } + + + public String getPropertyName() { + return propertyName; + } + + + public boolean isRequired() { + return required; + } + +} + diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java new file mode 100644 index 0000000000..e5e2e972fb --- /dev/null +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.extdatasource.jdbc.conf; + + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main configuration handler class + */ +public class JdbcStorageConfigManager { + + private static final Logger LOG = LoggerFactory + .getLogger(JdbcStorageConfigManager.class); + + public static Configuration convertMapToConfiguration(Map props) { + checkRequiredPropertiesAreDefined(props); + Configuration conf = new Configuration(); + + for (Entry entry : props.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + + return conf; + } + + private static void checkRequiredPropertiesAreDefined(Map props) { + + try { + String dbTypeName = props.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()); + DatabaseType.valueOf(dbTypeName); + } catch (Exception e) { + throw new IllegalArgumentException("Unknown database type.", e); + } + // Check the required parameters + for (JdbcStorageConfig config : JdbcStorageConfig.values()) { + if (config.isRequired() && !props.containsKey(config.getPropertyName())) { + throw new IllegalArgumentException(String.format("Required config '%s' was not " + + "present!", config.getPropertyName())); + } + } + } + + public static String getConfigValue(JdbcStorageConfig key, Configuration config) { + return config.get(key.getPropertyName()); + } + + public static String getOrigQueryToExecute(Configuration config) { + String query; + String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName()); + if (tableName != null) { + // We generate query as 'select * from tbl' + query = "select * from " + tableName; + } else { + query = config.get(JdbcStorageConfig.QUERY.getPropertyName()); + } + + return query; + } + + public static String getQueryToExecute(Configuration config) { + String query = 
config.get(JdbcStorageConfig.QUERY.getPropertyName()); + if (query != null) { + // Query has been defined, return it + return query; + } + + // We generate query as 'select * from tbl' + String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName()); + query = "select * from " + tableName; + + return query; + } + + private static boolean isEmptyString(String value) { + return ((value == null) || (value.trim().isEmpty())); + } +} diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DB2DatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DB2DatabaseAccessor.java new file mode 100644 index 0000000000..d8766429f4 --- /dev/null +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DB2DatabaseAccessor.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.extdatasource.jdbc.dao; + +/** + * DB2 specific data accessor. DB2 JDBC drivers works similar to Postgres, so the current + * implementation of DB2DatabaseAccessor is the same as PostgresDatabaseAccessor + */ +public class DB2DatabaseAccessor extends GenericJdbcDatabaseAccessor { + + @Override + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + if (offset == 0) { + return addLimitToQuery(sql, limit); + } else { + if (limit == -1) { + return sql; + } + return sql + " LIMIT " + limit + " OFFSET " + offset; + } + } + + @Override + protected String addLimitToQuery(String sql, int limit) { + if (limit == -1) { + return sql; + } + return sql + " LIMIT " + limit; + } +} diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java new file mode 100644 index 0000000000..597ae99842 --- /dev/null +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.extdatasource.jdbc.dao; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException; + +public interface DatabaseAccessor { + + int getTotalNumberOfRecords(Configuration conf) + throws JdbcDatabaseAccessException; + + JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset) + throws JdbcDatabaseAccessException; + + void close(boolean cleanCache); +} diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java new file mode 100644 index 0000000000..5415b57631 --- /dev/null +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.extdatasource.jdbc.dao; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.impala.extdatasource.jdbc.conf.DatabaseType; +import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig; + +/** + * Factory for creating the correct DatabaseAccessor class for the job + */ +public class DatabaseAccessorFactory { + + private DatabaseAccessorFactory() { + } + + public static DatabaseAccessor getAccessor(DatabaseType dbType) { + DatabaseAccessor accessor; + switch (dbType) { + case MYSQL: + accessor = new MySqlDatabaseAccessor(); + break; + + case JETHRO_DATA: + accessor = new JethroDatabaseAccessor(); + break; + + case POSTGRES: + accessor = new PostgresDatabaseAccessor(); + break; + + case ORACLE: + accessor = new OracleDatabaseAccessor(); + break; + + case MSSQL: + accessor = new MsSqlDatabaseAccessor(); + break; + + case DB2: + accessor = new DB2DatabaseAccessor(); + break; + + default: + accessor = new GenericJdbcDatabaseAccessor(); + break; + } + + return accessor; + } + + + public static DatabaseAccessor getAccessor(Configuration conf) { + DatabaseType dbType = DatabaseType.valueOf( + conf.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName()).toUpperCase()); + return getAccessor(dbType); + } + +} diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java new file mode 100644 index 0000000000..5f74a2a6e8 --- /dev/null +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSourceFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
+import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TCacheJarResult;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+
+/**
+ * A data accessor that should in theory work with all JDBC compliant database drivers.
+ */
+public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
+
+  protected static final Logger LOG = LoggerFactory
+      .getLogger(GenericJdbcDatabaseAccessor.class);
+
+  protected static final String DBCP_CONFIG_PREFIX = "dbcp";
+  protected static final int DEFAULT_FETCH_SIZE = 1000;
+  protected static final int CACHE_EXPIRE_TIMEOUT_S = 1800;
+  protected static final int CACHE_SIZE = 100;
+
+  protected DataSource dbcpDataSource = null;
+  // Cache datasource for sharing
+  public static Cache<String, DataSource> dataSourceCache = CacheBuilder
+      .newBuilder()
+      .removalListener((RemovalListener<String, DataSource>) notification -> {
+        DataSource ds = notification.getValue();
+        if (ds instanceof BasicDataSource) {
+          BasicDataSource dbcpDs = (BasicDataSource) ds;
+          try {
+            dbcpDs.close();
+            LOG.info("Close datasource for '{}'.", notification.getKey());
+          } catch (SQLException e) {
+            LOG.warn("Caught exception during datasource cleanup.", e);
+          }
+        }
+      })
+      .expireAfterAccess(CACHE_EXPIRE_TIMEOUT_S, TimeUnit.SECONDS)
+      .maximumSize(CACHE_SIZE)
+      .build();
+
+  @Override
+  public int getTotalNumberOfRecords(Configuration conf)
+      throws JdbcDatabaseAccessException {
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      initializeDatabaseSource(conf);
+      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      // TODO: If a target database cannot flatten this view query, try to text
+      // replace the generated "select *".
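+      // For a table-backed config this executes, e.g. (table name illustrative):
+      //   SELECT COUNT(*) FROM (select * from my_table) tmptable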
+ String countQuery = "SELECT COUNT(*) FROM (" + sql + ") tmptable"; + LOG.info("Query to execute is [{}]", countQuery); + + conn = dbcpDataSource.getConnection(); + ps = conn.prepareStatement(countQuery); + rs = ps.executeQuery(); + if (rs.next()) { + return rs.getInt(1); + } else { + LOG.warn("The count query '{}' did not return any results.", countQuery); + throw new JdbcDatabaseAccessException( + "Count query did not return any results."); + } + } catch (JdbcDatabaseAccessException he) { + throw he; + } catch (Exception e) { + LOG.error("Caught exception while trying to get the number of records", e); + throw new JdbcDatabaseAccessException(e); + } finally { + cleanupResources(conn, ps, rs); + } + } + + + @Override + public JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset) + throws JdbcDatabaseAccessException { + + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + + try { + initializeDatabaseSource(conf); + String sql = JdbcStorageConfigManager.getQueryToExecute(conf); + String partitionQuery = addLimitAndOffsetToQuery(sql, limit, offset); + + LOG.info("Query to execute is [{}]", partitionQuery); + + conn = dbcpDataSource.getConnection(); + ps = conn.prepareStatement(partitionQuery, ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY); + ps.setFetchSize(getFetchSize(conf)); + rs = ps.executeQuery(); + + return new JdbcRecordIterator(conn, ps, rs, conf); + } catch (Exception e) { + LOG.error("Caught exception while trying to execute query", e); + cleanupResources(conn, ps, rs); + throw new JdbcDatabaseAccessException( + "Caught exception while trying to execute query:" + e.getMessage(), e); + } + } + + + @Override + public void close(boolean cleanCache) { + dbcpDataSource = null; + if (cleanCache && dataSourceCache != null) { + dataSourceCache.invalidateAll(); + dataSourceCache = null; + } + } + + /** + * Uses generic JDBC escape functions to add a limit and offset clause to a query + * string + * + * @param sql + * @param limit + * @param offset + * @return + */ + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + if (offset == 0) { + return addLimitToQuery(sql, limit); + } else if (limit != -1) { + return sql + " {LIMIT " + limit + " OFFSET " + offset + "}"; + } else { + return sql + " {OFFSET " + offset + "}"; + } + } + + /* + * Uses generic JDBC escape functions to add a limit clause to a query string + */ + protected String addLimitToQuery(String sql, int limit) { + if (limit == -1) { + return sql; + } + return sql + " {LIMIT " + limit + "}"; + } + + protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) { + try { + if (rs != null) { + rs.close(); + } + } catch (SQLException e) { + LOG.warn("Caught exception during resultset cleanup.", e); + } + + try { + if (ps != null) { + ps.close(); + } + } catch (SQLException e) { + LOG.warn("Caught exception during statement cleanup.", e); + } + + try { + if (conn != null) { + conn.close(); + } + } catch (SQLException e) { + LOG.warn("Caught exception during connection cleanup.", e); + } + } + + protected void initializeDatabaseSource(Configuration conf) + throws ExecutionException { + if (dbcpDataSource == null) { + synchronized (this) { + if (dbcpDataSource == null) { + Properties props = getConnectionPoolProperties(conf); + String jdbcUrl = props.getProperty("url"); + String username = props.getProperty("username", "-"); + String cacheMapKey = String.format("%s.%s", jdbcUrl, username); + dbcpDataSource = 
dataSourceCache.get(cacheMapKey,
+              () -> {
+                LOG.info("Datasource for '{}' was not cached. "
+                    + "Loading now.", cacheMapKey);
+                BasicDataSource basicDataSource =
+                    BasicDataSourceFactory.createDataSource(props);
+                // Put jdbc driver to cache
+                String driverUrl = props.getProperty("driverUrl");
+                TCacheJarResult cacheResult = FeSupport.CacheJar(driverUrl);
+                TStatus cacheJarStatus = cacheResult.getStatus();
+                if (cacheJarStatus.getStatus_code() != TErrorCode.OK) {
+                  throw new JdbcDatabaseAccessException(String.format(
+                      "Unable to cache jdbc driver jar at location '%s'. "
+                          + "Check that the file exists and is readable. Message: %s",
+                      driverUrl, cacheJarStatus.getError_msgs()));
+                }
+                String driverLocalPath = cacheResult.getLocal_path();
+                // Create class loader for jdbc driver and set it for the
+                // BasicDataSource object so that the driver class could be loaded
+                // from jar file without searching classpath.
+                URL driverJarUrl = new File(driverLocalPath).toURI().toURL();
+                URLClassLoader driverLoader =
+                    URLClassLoader.newInstance(new URL[] { driverJarUrl },
+                        getClass().getClassLoader());
+                basicDataSource.setDriverClassLoader(driverLoader);
+                return basicDataSource;
+              });
+        }
+      }
+    }
+  }
+
+  protected Properties getConnectionPoolProperties(Configuration conf) {
+    // Create the default properties object
+    Properties dbProperties = getDefaultDBCPProperties();
+
+    // user properties
+    Map<String, String> userProperties = conf.getValByRegex(DBCP_CONFIG_PREFIX + "\\.*");
+    if ((userProperties != null) && (!userProperties.isEmpty())) {
+      for (Entry<String, String> entry : userProperties.entrySet()) {
+        dbProperties.put(entry.getKey().replaceFirst(DBCP_CONFIG_PREFIX + "\\.", ""),
+            entry.getValue());
+      }
+    }
+
+    // essential properties
+    dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
+    dbProperties.put("driverClassName",
+        conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
+    dbProperties.put("driverUrl",
+        conf.get(JdbcStorageConfig.JDBC_DRIVER_URL.getPropertyName()));
+    dbProperties.put("type", "javax.sql.DataSource");
+    return dbProperties;
+  }
+
+  protected Properties getDefaultDBCPProperties() {
+    Properties props = new Properties();
+    // Don't set 'initialSize', otherwise the driver class will be loaded in
+    // BasicDataSourceFactory.createDataSource() before the class loader is set
+    // by calling BasicDataSource.setDriverClassLoader.
+    // props.put("initialSize", "1");
+    props.put("maxActive", "3");
+    props.put("maxIdle", "0");
+    props.put("maxWait", "10000");
+    props.put("timeBetweenEvictionRunsMillis", "30000");
+    return props;
+  }
+
+  protected int getFetchSize(Configuration conf) {
+    return conf
+        .getInt(JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName(), DEFAULT_FETCH_SIZE);
+  }
+
+}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JdbcRecordIterator.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JdbcRecordIterator.java
new file mode 100644
index 0000000000..73dafcbaf0
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JdbcRecordIterator.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
+import org.apache.impala.extdatasource.thrift.TColumnDesc;
+import org.apache.impala.extdatasource.util.SerializationUtils;
+import org.apache.impala.thrift.TColumnData;
+import org.apache.impala.thrift.TColumnType;
+import org.apache.impala.thrift.TScalarType;
+import org.apache.impala.thrift.TTypeNodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An iterator that allows iterating through a SQL resultset. Includes methods to
+ * clean up resources.
+ */
+public class JdbcRecordIterator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(JdbcRecordIterator.class);
+
+  private final Connection conn;
+  private final PreparedStatement ps;
+  private final ResultSet rs;
+  private final List<String> jdbcColumnNames;
+
+  public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs,
+      Configuration conf) throws JdbcDatabaseAccessException {
+    this.conn = conn;
+    this.ps = ps;
+    this.rs = rs;
+
+    try {
+      ResultSetMetaData metadata = rs.getMetaData();
+      int numColumns = metadata.getColumnCount();
+      List<String> columnNames = new ArrayList<>(numColumns);
+      List<Integer> jdbcColumnTypes = new ArrayList<>(numColumns);
+      for (int i = 0; i < numColumns; i++) {
+        columnNames.add(metadata.getColumnName(i + 1));
+        jdbcColumnTypes.add(metadata.getColumnType(i + 1));
+      }
+      jdbcColumnNames = columnNames;
+    } catch (Exception e) {
+      LOGGER.error("Error while trying to get column names.", e);
+      throw new JdbcDatabaseAccessException(
+          "Error while trying to get column names: " + e.getMessage(), e);
+    }
+    LOGGER.debug("Iterator ColumnNames = {}", jdbcColumnNames);
+  }
+
+  public boolean hasNext() throws JdbcDatabaseAccessException {
+    try {
+      return rs.next();
+    } catch (Exception e) {
+      LOGGER.warn("hasNext() threw exception", e);
+      throw new JdbcDatabaseAccessException(
+          "Error while retrieving next batch of rows: " + e.getMessage(), e);
+    }
+  }
+
+  public void next(List<TColumnDesc> colDescs, List<TColumnData> colDatas) {
+    Preconditions.checkState(colDescs.size() == colDatas.size());
+    for (int i = 0; i < colDescs.size(); ++i) {
+      TColumnType type = colDescs.get(i).getType();
+      TColumnData colData = colDatas.get(i);
+      if (type.types.get(0).getType() != TTypeNodeType.SCALAR) {
+        // Unsupported non-scalar type.
+        throw new UnsupportedOperationException("Unsupported column type: "
+            + type.types.get(0).getType());
+      }
+      Preconditions.checkState(type.getTypesSize() == 1);
+      TScalarType scalarType = type.types.get(0).scalar_type;
+      try {
+        Object value = rs.getObject(i + 1);
+        if (value == null) {
+          colData.addToIs_null(true);
+          continue;
+        }
+        switch (scalarType.type) {
+          case TINYINT:
+            colData.addToByte_vals(rs.getByte(i + 1));
+            break;
+          case SMALLINT:
+            colData.addToShort_vals(rs.getShort(i + 1));
+            break;
+          case INT:
+            colData.addToInt_vals(rs.getInt(i + 1));
+            break;
+          case DATE:
+            LocalDate localDate = Instant.ofEpochMilli(rs.getDate(i + 1).getTime())
+                .atZone(ZoneId.systemDefault())
+                .toLocalDate();
+            colData.addToInt_vals((int) localDate.toEpochDay());
+            break;
+          case BIGINT:
+            colData.addToLong_vals(rs.getLong(i + 1));
+            break;
+          case DOUBLE:
+            colData.addToDouble_vals(rs.getDouble(i + 1));
+            break;
+          case FLOAT:
+            colData.addToDouble_vals(rs.getFloat(i + 1));
+            break;
+          case STRING:
+            colData.addToString_vals(rs.getString(i + 1));
+            break;
+          case BOOLEAN:
+            colData.addToBool_vals(rs.getBoolean(i + 1));
+            break;
+          case TIMESTAMP:
+            // Use UTC time zone instead of system default time zone
+            colData.addToBinary_vals(
+                SerializationUtils.encodeTimestamp(rs.getTimestamp(i + 1,
+                    Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)))));
+            break;
+          case DECIMAL:
+            // Encode the decimal value itself; truncating it with byteValue() would
+            // corrupt any value that does not fit in a byte.
+            BigDecimal val = rs.getBigDecimal(i + 1);
+            colData.addToBinary_vals(SerializationUtils.encodeDecimal(val));
+            break;
+          case BINARY:
+          case CHAR:
+          case DATETIME:
+          case INVALID_TYPE:
+          case NULL_TYPE:
+          default:
+            // Unsupported.
+            throw new UnsupportedOperationException("Unsupported column type: "
+                + scalarType.getType());
+        }
+        colData.addToIs_null(false);
+      } catch (SQLException e) {
+        colData.addToIs_null(true);
+      }
+    }
+  }
+
+  /**
+   * Release all DB resources.
+   */
+  public void close() throws JdbcDatabaseAccessException {
+    try {
+      rs.close();
+      ps.close();
+      conn.close();
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while trying to close database objects", e);
+      throw new JdbcDatabaseAccessException(
+          "Error while releasing database resources: " + e.getMessage(), e);
+    }
+  }
+
+}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JethroDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JethroDatabaseAccessor.java
new file mode 100644
index 0000000000..8cc2cefe88
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JethroDatabaseAccessor.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * JethroData specific data accessor. This is needed because JethroData JDBC drivers do
+ * not support generic LIMIT and OFFSET escape functions, and have a special
+ * optimization for getting the query metadata using LIMIT 0.
+ */
+public class JethroDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      return sql + " LIMIT " + offset + "," + limit;
+    }
+  }
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    return "Select * from (" + sql + ") as \"tmp\" limit " + limit;
+  }
+
+}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MsSqlDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MsSqlDatabaseAccessor.java
new file mode 100644
index 0000000000..bdbc617012
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MsSqlDatabaseAccessor.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * MSSQL specific data accessor. This is needed because MSSQL JDBC drivers do not
+ * support generic LIMIT and OFFSET escape functions.
+ */
+public class MsSqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      if (limit == -1) {
+        return sql;
+      }
+      // ORDER BY is not strictly necessary, but MS SQL requires it in order to
+      // use FETCH
+      return sql + " ORDER BY 1 OFFSET " + offset + " ROWS FETCH NEXT " + limit
+          + " ROWS ONLY";
+    }
+  }
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    if (limit == -1) {
+      return sql;
+    }
+    return sql + " {LIMIT " + limit + "}";
+  }
+}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
new file mode 100644
index 0000000000..cd9ada13b8
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.extdatasource.jdbc.dao; + +/** + * MySQL specific data accessor. This is needed because MySQL JDBC drivers do not support + * generic LIMIT and OFFSET escape functions + */ +public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor { + + @Override + protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) { + if (offset == 0) { + return addLimitToQuery(sql, limit); + } else { + if (limit != -1) { + return sql + " LIMIT " + offset + "," + limit; + } else { + return sql; + } + } + } + + + @Override + protected String addLimitToQuery(String sql, int limit) { + if (limit != -1) { + return sql + " LIMIT " + limit; + } else { + return sql; + } + } + +} diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/OracleDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/OracleDatabaseAccessor.java new file mode 100644 index 0000000000..bae31f20c2 --- /dev/null +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/OracleDatabaseAccessor.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.extdatasource.jdbc.dao; + +/** + * Oracle specific data accessor. 
+ * This is needed because Oracle JDBC drivers do not support the generic LIMIT and
+ * OFFSET escape functions.
+ */
+public class OracleDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  // Random column name to reduce the chance of a conflict
+  static final String ROW_NUM_COLUMN_NAME = "dummy_rownum_col_rn1938392";
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      if (limit == -1) {
+        return sql;
+      }
+      // A simple ROWNUM > offset AND ROWNUM <= (offset + limit) won't work; it would
+      // return nothing, so ROWNUM is first materialized as a regular column by the
+      // inner query block and only then filtered.
+      return "SELECT * FROM (SELECT t.*, ROWNUM AS " + ROW_NUM_COLUMN_NAME + " FROM ("
+          + sql + ") t) WHERE "
+          + ROW_NUM_COLUMN_NAME + " > " + offset + " AND " + ROW_NUM_COLUMN_NAME + " <= "
+          + (offset + limit);
+    }
+  }
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    if (limit == -1) {
+      return sql;
+    }
+    return "SELECT * FROM (" + sql + ") WHERE ROWNUM <= " + limit;
+  }
+
+}
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
new file mode 100644
index 0000000000..9ec0d7ab64
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Postgres specific data accessor. Postgres JDBC drivers do not support the generic
+ * LIMIT and OFFSET escape functions.
+ */
+public class PostgresDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      if (limit == -1) {
+        return sql;
+      }
+      return sql + " LIMIT " + limit + " OFFSET " + offset;
+    }
+  }
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    if (limit == -1) {
+      return sql;
+    }
+    return sql + " LIMIT " + limit;
+  }
+}
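
To make the Oracle ROWNUM workaround concrete, here is a hypothetical standalone snippet (names are illustrative, not part of the patch) that prints what OracleDatabaseAccessor would emit for limit=10, offset=5:

public class OraclePagingDemo {
  public static void main(String[] args) {
    String sql = "SELECT * FROM alltypes";
    int limit = 10, offset = 5;
    String rn = "dummy_rownum_col_rn1938392";
    // Mirrors OracleDatabaseAccessor.addLimitAndOffsetToQuery for offset > 0.
    String paged = "SELECT * FROM (SELECT t.*, ROWNUM AS " + rn + " FROM ("
        + sql + ") t) WHERE " + rn + " > " + offset + " AND " + rn + " <= "
        + (offset + limit);
    // Selects rows 6..15 of the inner query; the outer filter works because
    // ROWNUM was already materialized as an ordinary column.
    System.out.println(paged);
  }
}
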
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/exception/JdbcDatabaseAccessException.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/exception/JdbcDatabaseAccessException.java
new file mode 100644
index 0000000000..c85e4bb860
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/exception/JdbcDatabaseAccessException.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.exception;
+
+/**
+ * Checked exception wrapping any error raised while accessing the underlying database.
+ */
+public class JdbcDatabaseAccessException extends Exception {
+
+  private static final long serialVersionUID = -4106595742876276803L;
+
+  public JdbcDatabaseAccessException() {
+    super();
+  }
+
+  public JdbcDatabaseAccessException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public JdbcDatabaseAccessException(String message) {
+    super(message);
+  }
+
+  public JdbcDatabaseAccessException(Throwable cause) {
+    super(cause);
+  }
+}
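
A hypothetical call site (not part of this patch) showing the intended catch-and-wrap pattern around raw JDBC calls:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;

public class WrapDemo {
  // Connects to "url", converting any raw SQLException into the checked
  // wrapper used throughout this module; the cause is preserved for logging.
  public static Connection connect(String url) throws JdbcDatabaseAccessException {
    try {
      return DriverManager.getConnection(url);
    } catch (SQLException e) {
      throw new JdbcDatabaseAccessException("Failed to connect to " + url, e);
    }
  }
}
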
diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/util/QueryConditionUtil.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/util/QueryConditionUtil.java
new file mode 100644
index 0000000000..4f7d581563
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/util/QueryConditionUtil.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.BinaryPredicate.Operator;
+import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
+import org.apache.impala.extdatasource.thrift.TComparisonOp;
+import org.apache.impala.thrift.TColumnValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Translates the Impala query predicates into a condition that can be run on the
+ * underlying database.
+ */
+public class QueryConditionUtil {
+
+  private final static Logger LOG = LoggerFactory.getLogger(QueryConditionUtil.class);
+
+  public static String buildCondition(List<List<TBinaryPredicate>> predicates,
+      Map<String, String> columnMapping) {
+    List<String> condition = Lists.newArrayList();
+    for (List<TBinaryPredicate> tBinaryPredicates : predicates) {
+      StringJoiner joiner = new StringJoiner(" OR ", "(", ")");
+      for (TBinaryPredicate predicate : tBinaryPredicates) {
+        String name = predicate.getCol().getName();
+        name = columnMapping.getOrDefault(name, name);
+        String op = converse(predicate.getOp());
+        String value = getTColumnValueAsString(predicate.getValue());
+        joiner.add(String.format("%s %s %s", name, op, value));
+      }
+      condition.add(joiner.toString());
+    }
+    return Joiner.on(" AND ").join(condition);
+  }
+
+  /**
+   * Returns the value of a defined field as a string. If the "value" is null or the
+   * type is not supported, an exception is thrown.
+   *
+   * @see org.apache.impala.planner.DataSourceScanNode#literalToColumnValue
+   */
+  public static String getTColumnValueAsString(TColumnValue value) {
+    Preconditions.checkState(value != null);
+    StringBuilder sb = new StringBuilder();
+    if (value.isSetBool_val()) {
+      sb.append(value.bool_val);
+    } else if (value.isSetByte_val()) {
+      sb.append(value.byte_val);
+    } else if (value.isSetShort_val()) {
+      sb.append(value.short_val);
+    } else if (value.isSetInt_val()) {
+      sb.append(value.int_val);
+    } else if (value.isSetLong_val()) {
+      sb.append(value.long_val);
+    } else if (value.isSetDouble_val()) {
+      sb.append(value.double_val);
+    } else if (value.isSetString_val()) {
+      sb.append(String.format("'%s'", value.string_val));
+    } else {
+      // TODO: Support data types of DECIMAL, TIMESTAMP, DATE and binary for predicates.
+      // Keep in-sync with DataSourceScanNode.literalToColumnValue().
+      throw new IllegalArgumentException("Unsupported data type.");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Maps a Thrift comparison operator to its SQL string form; returns null for an
+   * unknown operator.
+   *
+   * @see BinaryPredicate.Operator
+   */
+  public static String converse(TComparisonOp op) {
+    for (Operator operator : BinaryPredicate.Operator.values()) {
+      if (operator.getThriftOp() == op) {
+        return operator.toString();
+      }
+    }
+    return null;
+  }
+}
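
A quick worked example of the translation (a hypothetical standalone snippet; assumes the Thrift classes generated by this patch's build are on the classpath):

import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
import org.apache.impala.extdatasource.thrift.TColumnDesc;
import org.apache.impala.extdatasource.thrift.TComparisonOp;
import org.apache.impala.extdatasource.jdbc.util.QueryConditionUtil;
import org.apache.impala.thrift.TColumnValue;

public class ConditionDemo {
  public static void main(String[] args) {
    // One conjunct: id <= 3.
    TBinaryPredicate p = new TBinaryPredicate()
        .setCol(new TColumnDesc().setName("id"))
        .setOp(TComparisonOp.LE)
        .setValue(new TColumnValue().setInt_val(3));
    List<List<TBinaryPredicate>> predicates = Lists.newArrayList();
    predicates.add(Lists.newArrayList(p));
    // "id" is renamed to the remote column "strategy_id", as in the unit test below.
    Map<String, String> mapping = ImmutableMap.of("id", "strategy_id");
    // Prints: (strategy_id <= 3)
    System.out.println(QueryConditionUtil.buildCondition(predicates, mapping));
  }
}
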
diff --git a/java/ext-data-source/jdbc/src/test/java/org/apache/impala/extdatasource/jdbc/JdbcDataSourceTest.java b/java/ext-data-source/jdbc/src/test/java/org/apache/impala/extdatasource/jdbc/JdbcDataSourceTest.java
new file mode 100644
index 0000000000..229367c8df
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/test/java/org/apache/impala/extdatasource/jdbc/JdbcDataSourceTest.java
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
+import org.apache.impala.extdatasource.thrift.TCloseParams;
+import org.apache.impala.extdatasource.thrift.TCloseResult;
+import org.apache.impala.extdatasource.thrift.TColumnDesc;
+import org.apache.impala.extdatasource.thrift.TComparisonOp;
+import org.apache.impala.extdatasource.thrift.TGetNextParams;
+import org.apache.impala.extdatasource.thrift.TGetNextResult;
+import org.apache.impala.extdatasource.thrift.TOpenParams;
+import org.apache.impala.extdatasource.thrift.TOpenResult;
+import org.apache.impala.extdatasource.thrift.TPrepareParams;
+import org.apache.impala.extdatasource.thrift.TPrepareResult;
+import org.apache.impala.extdatasource.thrift.TRowBatch;
+import org.apache.impala.extdatasource.thrift.TTableSchema;
+import org.apache.impala.thrift.TColumnData;
+import org.apache.impala.thrift.TColumnType;
+import org.apache.impala.thrift.TColumnValue;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TPrimitiveType;
+import org.apache.impala.thrift.TScalarType;
+import org.apache.impala.thrift.TTypeNode;
+import org.apache.impala.thrift.TTypeNodeType;
+import org.apache.impala.thrift.TUniqueId;
+import org.junit.Assert;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class JdbcDataSourceTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcDataSourceTest.class);
+
+  private static String initString_ = "CACHE_CLASS::{\"database.type\":\"H2\", " +
+      "\"jdbc.url\":\"jdbc:h2:mem:test;MODE=MySQL;INIT=runscript from " +
+      "'classpath:test_script.sql'\", " +
+      "\"jdbc.driver\":\"org.h2.Driver\", " +
+      "\"table\":\"test_strategy\"," +
+      "\"column.mapping\":\"id=strategy_id\"}";
+
+  // Share data between tests
+  private static JdbcDataSource jdbcDataSource_ = new JdbcDataSource();
+  private static String scanHandle_;
+  private static TTableSchema schema_;
+  private static List<List<TBinaryPredicate>> predicates_ = Lists.newArrayList();
+  private static List<List<TBinaryPredicate>> acceptedPredicates_ =
+      Lists.newArrayList();
+  private static long expectReturnRows_ = 5L;
+
+  @Test
+  public void test01Init() {
+    String colName = "id";
+
+    TComparisonOp op = TComparisonOp.LE;
+
+    TTypeNode typeNode = new TTypeNode();
+    typeNode.setType(TTypeNodeType.SCALAR);
+    TScalarType scalarType = new TScalarType();
+    scalarType.setType(TPrimitiveType.INT);
+    typeNode.setScalar_type(scalarType);
+    TColumnType colType = new TColumnType();
+    colType.setTypes(Lists.newArrayList(typeNode));
+    TColumnDesc col = new TColumnDesc().setName(colName).setType(colType);
+    TColumnValue value = new TColumnValue();
+    value.setInt_val(3);
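+
+    // The predicate container is in conjunctive normal form: buildCondition() ANDs
+    // the outer list and ORs each inner list, so the single [[id <= 3]] entry below
+    // becomes the remote condition "(strategy_id <= 3)" once column.mapping is
+    // applied.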
+    TBinaryPredicate idPredicate = new TBinaryPredicate().setCol(col).setOp(op)
+        .setValue(value);
+
+    // predicates filter
+    predicates_.add(Lists.newArrayList(idPredicate));
+    expectReturnRows_ = 3L;
+    LOG.info("setup predicates:{}, expectReturnRows: {}", predicates_,
+        expectReturnRows_);
+
+    boolean ret = jdbcDataSource_.convertInitStringToConfiguration(initString_);
+    Assert.assertTrue(ret);
+  }
+
+  @Test
+  public void test02Prepare() {
+    TPrepareParams params = new TPrepareParams();
+    params.setTable_name("test_strategy");
+    params.setInit_string(initString_);
+    params.setPredicates(predicates_);
+    TPrepareResult resp = jdbcDataSource_.prepare(params);
+    Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+    if (resp.isSetAccepted_conjuncts()) {
+      acceptedPredicates_ = Lists.newArrayList();
+      // @see org.apache.impala.planner.DataSourceScanNode#removeAcceptedConjuncts
+      List<Integer> acceptedPredicatesIdx = resp.getAccepted_conjuncts();
+      // Because conjuncts_ is modified in place using positional indexes from
+      // conjunctsIdx, we remove the accepted predicates in reverse order.
+      for (int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) {
+        int acceptedPredIdx = acceptedPredicatesIdx.get(i);
+        acceptedPredicates_.add(predicates_.remove(acceptedPredIdx));
+      }
+      // Returns a view of the list in the original order as we will print these
+      // in the explain string and it's convenient to have predicates printed
+      // in the same order that they're specified.
+      acceptedPredicates_ = Lists.reverse(acceptedPredicates_);
+    }
+    if (resp.isSetNum_rows_estimate()) {
+      long estimate = resp.getNum_rows_estimate();
+      Assert.assertEquals(5, estimate);
+    }
+  }
+
+  @Test
+  public void test03Open() {
+    TOpenParams params = new TOpenParams();
+    TUniqueId unique_id = new TUniqueId();
+    unique_id.hi = 0xfeedbeeff00d7777L;
+    unique_id.lo = 0x2020202020202020L;
+    // As a string: "feedbeeff00d7777:2020202020202020"
+    params.setQuery_id(unique_id);
+    params.setTable_name("test_strategy");
+    params.setInit_string(initString_);
+    schema_ = initSchema();
+    params.setRow_schema(schema_);
+    params.setBatch_size(5);
+    params.setPredicates(acceptedPredicates_);
+    TOpenResult resp = jdbcDataSource_.open(params);
+    Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+    scanHandle_ = resp.getScan_handle();
+    Assert.assertTrue(StringUtils.isNoneBlank(scanHandle_));
+  }
+
+  @Test
+  public void test04GetNext() {
+    TGetNextParams params = new TGetNextParams();
+    params.setScan_handle(scanHandle_);
+    boolean eos;
+    long totalNumRows = 0;
+    do {
+      TGetNextResult resp = jdbcDataSource_.getNext(params);
+      Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+      eos = resp.isEos();
+      TRowBatch rowBatch = resp.getRows();
+      long numRows = rowBatch.getNum_rows();
+      totalNumRows += numRows;
+      List<TColumnData> cols = rowBatch.getCols();
+      Assert.assertEquals(schema_.getColsSize(), cols.size());
+    } while (!eos);
+    Assert.assertEquals(expectReturnRows_, totalNumRows);
+  }
+
+  @Test
+  public void test05Close() {
+    TCloseParams params = new TCloseParams();
+    params.setScan_handle(scanHandle_);
+    TCloseResult resp = jdbcDataSource_.close(params);
+    Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+  }
+
+  private static TTableSchema initSchema() {
+    // Remote table: strategy_id int, name string, referrer string, landing string,
+    // priority int, implementation string, last_modified timestamp. The scan schema
+    // below selects a subset of these columns.
+    TTableSchema schema = new TTableSchema();
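+    // Each TColumnType below is a flattened type tree: a single SCALAR TTypeNode
+    // carrying the TPrimitiveType is enough for these scalar columns.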
+    TColumnDesc col = new TColumnDesc();
+    col.setName("id");
+    TTypeNode typeNode = new TTypeNode();
+    typeNode.setType(TTypeNodeType.SCALAR);
+    TScalarType scalarType = new TScalarType();
+    scalarType.setType(TPrimitiveType.INT);
+    typeNode.setScalar_type(scalarType);
+    TColumnType colType = new TColumnType();
+    colType.setTypes(Lists.newArrayList(typeNode));
+    col.setType(colType);
+    schema.addToCols(col);
+
+    col = new TColumnDesc();
+    col.setName("name");
+    typeNode = new TTypeNode();
+    typeNode.setType(TTypeNodeType.SCALAR);
+    scalarType = new TScalarType();
+    scalarType.setType(TPrimitiveType.STRING);
+    typeNode.setScalar_type(scalarType);
+    colType = new TColumnType();
+    colType.setTypes(Lists.newArrayList(typeNode));
+    col.setType(colType);
+    schema.addToCols(col);
+
+    col = new TColumnDesc();
+    col.setName("priority");
+    typeNode = new TTypeNode();
+    typeNode.setType(TTypeNodeType.SCALAR);
+    scalarType = new TScalarType();
+    scalarType.setType(TPrimitiveType.INT);
+    typeNode.setScalar_type(scalarType);
+    colType = new TColumnType();
+    colType.setTypes(Lists.newArrayList(typeNode));
+    col.setType(colType);
+    schema.addToCols(col);
+
+    col = new TColumnDesc();
+    col.setName("implementation");
+    typeNode = new TTypeNode();
+    typeNode.setType(TTypeNodeType.SCALAR);
+    scalarType = new TScalarType();
+    scalarType.setType(TPrimitiveType.STRING);
+    typeNode.setScalar_type(scalarType);
+    colType = new TColumnType();
+    colType.setTypes(Lists.newArrayList(typeNode));
+    col.setType(colType);
+    schema.addToCols(col);
+
+    col = new TColumnDesc();
+    col.setName("last_modified");
+    typeNode = new TTypeNode();
+    typeNode.setType(TTypeNodeType.SCALAR);
+    scalarType = new TScalarType();
+    scalarType.setType(TPrimitiveType.TIMESTAMP);
+    typeNode.setScalar_type(scalarType);
+    colType = new TColumnType();
+    colType.setTypes(Lists.newArrayList(typeNode));
+    col.setType(colType);
+    schema.addToCols(col);
+    return schema;
+  }
+
+  public static void printData(List<TColumnDesc> colDescs, List<TColumnData> colDatas) {
+    for (int i = 0; i < colDatas.size(); ++i) {
+      TColumnDesc colDesc = colDescs.get(i);
+      TColumnData colData = colDatas.get(i);
+      System.out.println("idx: " + i);
+      System.out.println("  Name: " + colDesc);
+      System.out.println("  Data: " + colData);
+    }
+  }
+
+}
diff --git a/testdata/bin/copy-data-sources.sh b/java/ext-data-source/jdbc/src/test/resources/log4j.properties
old mode 100755
new mode 100644
similarity index 58%
rename from testdata/bin/copy-data-sources.sh
rename to java/ext-data-source/jdbc/src/test/resources/log4j.properties
index c218e15ddb..31e2a489ea
--- a/testdata/bin/copy-data-sources.sh
+++ b/java/ext-data-source/jdbc/src/test/resources/log4j.properties
@@ -1,5 +1,3 @@
-#!/bin/bash
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
@@ -16,17 +14,15 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# This script copies the test data source library into hdfs.
-
-set -euo pipefail
-. $IMPALA_HOME/bin/report_build_error.sh
-setup_report_build_error
-. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+# Define some default values that can be overridden by system properties
+# Don't use hadoop.root.logger because Hadoop's config scripts override it
+impala.hadoop.root.logger=INFO,console
-hadoop fs -mkdir -p ${FILESYSTEM_PREFIX}/test-warehouse/data-sources/
+# Define the root logger to the system property "impala.hadoop.root.logger".
+log4j.rootLogger=${impala.hadoop.root.logger}
-hadoop fs -put -f \
-  ${IMPALA_HOME}/java/ext-data-source/test/target/impala-data-source-test-*.jar \
-  ${FILESYSTEM_PREFIX}/test-warehouse/data-sources/test-data-source.jar
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
diff --git a/java/ext-data-source/jdbc/src/test/resources/test_script.sql b/java/ext-data-source/jdbc/src/test/resources/test_script.sql
new file mode 100644
index 0000000000..f88cb51443
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/test/resources/test_script.sql
@@ -0,0 +1,47 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+DROP TABLE IF EXISTS test_strategy;
+
+CREATE TABLE IF NOT EXISTS test_strategy (
+  strategy_id int(11) NOT NULL,
+  name varchar(50) NOT NULL,
+  referrer varchar(1024) DEFAULT NULL,
+  landing varchar(1024) DEFAULT NULL,
+  priority int(11) DEFAULT NULL,
+  implementation varchar(512) DEFAULT NULL,
+  last_modified timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  PRIMARY KEY (strategy_id)
+);
+
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
+  last_modified)
+VALUES (1, 'S1', 'aaa', 'abc', 1000, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
+  last_modified)
+VALUES (2, 'S2', 'bbb', 'def', 990, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
+  last_modified)
+VALUES (3, 'S3', 'ccc', 'ghi', 1000, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
+  last_modified)
+VALUES (4, 'S4', 'ddd', 'jkl', 980, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, implementation,
+  last_modified)
+VALUES (5, 'S5', 'eee', NULL, NULL, NULL, '2012-05-08 15:01:15');
diff --git a/java/ext-data-source/pom.xml b/java/ext-data-source/pom.xml
index febc41a055..d16b0882d1 100644
--- a/java/ext-data-source/pom.xml
+++ b/java/ext-data-source/pom.xml
@@ -33,5 +33,6 @@
     <module>api</module>
     <module>sample</module>
     <module>test</module>
+    <module>jdbc</module>
diff --git a/testdata/bin/copy-ext-data-sources.sh b/testdata/bin/copy-ext-data-sources.sh
new file mode 100755
index 0000000000..7b651914f2
--- /dev/null
+++ b/testdata/bin/copy-ext-data-sources.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This script copies the external data source libraries and the JDBC drivers
+# they need into HDFS.
+
+set -euo pipefail
+. $IMPALA_HOME/bin/report_build_error.sh
+setup_report_build_error
+
+. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+
+EXT_DATA_SOURCE_SRC_PATH=${IMPALA_HOME}/java/ext-data-source
+EXT_DATA_SOURCES_HDFS_PATH=${FILESYSTEM_PREFIX}/test-warehouse/data-sources
+JDBC_DRIVERS_HDFS_PATH=${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-drivers
+
+hadoop fs -mkdir -p ${EXT_DATA_SOURCES_HDFS_PATH}
+hadoop fs -mkdir -p ${JDBC_DRIVERS_HDFS_PATH}
+
+# Copy libraries of external data sources to HDFS
+hadoop fs -put -f \
+  ${EXT_DATA_SOURCE_SRC_PATH}/test/target/impala-data-source-test-*.jar \
+  ${EXT_DATA_SOURCES_HDFS_PATH}/test-data-source.jar
+
+echo "Copied" ${EXT_DATA_SOURCE_SRC_PATH}/test/target/impala-data-source-test-*.jar \
+  "into HDFS" ${EXT_DATA_SOURCES_HDFS_PATH}
+
+hadoop fs -put -f \
+  ${EXT_DATA_SOURCE_SRC_PATH}/jdbc/target/impala-data-source-jdbc-*.jar \
+  ${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-data-source.jar
+
+echo "Copied" ${EXT_DATA_SOURCE_SRC_PATH}/jdbc/target/impala-data-source-jdbc-*.jar \
+  "into HDFS" ${EXT_DATA_SOURCES_HDFS_PATH}
+
+# Copy Postgres JDBC driver to HDFS
+hadoop fs -put -f \
+  ${IMPALA_HOME}/fe/target/dependency/postgresql-*.jar \
+  ${JDBC_DRIVERS_HDFS_PATH}/postgresql-jdbc.jar
+
+echo "Copied" ${IMPALA_HOME}/fe/target/dependency/postgresql-*.jar \
+  "into HDFS" ${JDBC_DRIVERS_HDFS_PATH}
diff --git a/testdata/bin/create-data-source-table.sql b/testdata/bin/create-data-source-table.sql
deleted file mode 100644
index 5d9c032566..0000000000
--- a/testdata/bin/create-data-source-table.sql
+++ /dev/null
@@ -1,47 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing,
--- software distributed under the License is distributed on an
--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
--- KIND, either express or implied. See the License for the
--- specific language governing permissions and limitations
--- under the License.
-
--- Create test data sources and tables
-
-USE functional;
-
-DROP DATA SOURCE IF EXISTS AllTypesDataSource;
-CREATE DATA SOURCE AllTypesDataSource
-LOCATION '/test-warehouse/data-sources/test-data-source.jar'
-CLASS 'org.apache.impala.extdatasource.AllTypesDataSource'
-API_VERSION 'V1';
-
-DROP TABLE IF EXISTS alltypes_datasource;
-CREATE TABLE alltypes_datasource (
-  id INT,
-  bool_col BOOLEAN,
-  tinyint_col TINYINT,
-  smallint_col SMALLINT,
-  int_col INT,
-  bigint_col BIGINT,
-  float_col FLOAT,
-  double_col DOUBLE,
-  timestamp_col TIMESTAMP,
-  string_col STRING,
-  dec_col1 DECIMAL(9,0),
-  dec_col2 DECIMAL(10,0),
-  dec_col3 DECIMAL(20,10),
-  dec_col4 DECIMAL(38,37),
-  dec_col5 DECIMAL(10,5),
-  date_col DATE)
-PRODUCED BY DATA SOURCE AllTypesDataSource("TestInitString");
diff --git a/testdata/bin/create-ext-data-source-table.sql b/testdata/bin/create-ext-data-source-table.sql
new file mode 100644
index 0000000000..84bd02e174
--- /dev/null
+++ b/testdata/bin/create-ext-data-source-table.sql
@@ -0,0 +1,98 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.
+-- The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Create test data sources and tables
+
+USE functional;
+
+DROP DATA SOURCE IF EXISTS AllTypesDataSource;
+CREATE DATA SOURCE AllTypesDataSource
+LOCATION '/test-warehouse/data-sources/test-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.AllTypesDataSource'
+API_VERSION 'V1';
+
+DROP TABLE IF EXISTS alltypes_datasource;
+CREATE TABLE alltypes_datasource (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  timestamp_col TIMESTAMP,
+  string_col STRING,
+  dec_col1 DECIMAL(9,0),
+  dec_col2 DECIMAL(10,0),
+  dec_col3 DECIMAL(20,10),
+  dec_col4 DECIMAL(38,37),
+  dec_col5 DECIMAL(10,5),
+  date_col DATE)
+PRODUCED BY DATA SOURCE AllTypesDataSource("TestInitString");
+
+DROP DATA SOURCE IF EXISTS JdbcDataSource;
+CREATE DATA SOURCE JdbcDataSource
+LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+API_VERSION 'V1';
+
+DROP TABLE IF EXISTS alltypes_jdbc_datasource;
+CREATE TABLE alltypes_jdbc_datasource (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE JdbcDataSource(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://localhost:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password":"password",
+"table":"alltypes"}');
+
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_2;
+CREATE TABLE alltypes_jdbc_datasource_2 (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE JdbcDataSource(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://localhost:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"hdfs://localhost:20500/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password":"password",
+"table":"AllTypesWithQuote",
+"column.mapping":"id=id, bool_col=Bool_col, tinyint_col=Tinyint_col, smallint_col=Smallint_col, int_col=Int_col, bigint_col=Bigint_col, float_col=Float_col, double_col=Double_col, date_string_col=Date_string_col, string_col=String_col, timestamp_col=Timestamp_col"}');
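
The column.mapping property above renames Impala column names to the case-sensitive, quoted names of the remote Postgres table. A hypothetical sketch of how such a mapping entry is consulted, mirroring the getOrDefault lookup in QueryConditionUtil.buildCondition (names here are illustrative):

import java.util.Map;
import com.google.common.collect.ImmutableMap;

public class ColumnMappingDemo {
  public static void main(String[] args) {
    Map<String, String> mapping = ImmutableMap.of(
        "bool_col", "Bool_col",            // quoted, case-sensitive remote name
        "timestamp_col", "Timestamp_col");
    // Mapped columns are rewritten; unmapped ones pass through unchanged.
    System.out.println(mapping.getOrDefault("bool_col", "bool_col")); // Bool_col
    System.out.println(mapping.getOrDefault("id", "id"));             // id
  }
}
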
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index e9d6183a67..9d87c06cf8 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -478,10 +478,12 @@ function custom-post-load-steps {
 
 function copy-and-load-ext-data-source {
   # Copy the test data source library into HDFS
-  ${IMPALA_HOME}/testdata/bin/copy-data-sources.sh
+  ${IMPALA_HOME}/testdata/bin/copy-ext-data-sources.sh
+  # Load the underlying data of the data source
+  ${IMPALA_HOME}/testdata/bin/load-ext-data-sources.sh
   # Create data sources table.
   ${IMPALA_HOME}/bin/impala-shell.sh -i ${IMPALAD} -f\
-    ${IMPALA_HOME}/testdata/bin/create-data-source-table.sql
+    ${IMPALA_HOME}/testdata/bin/create-ext-data-source-table.sql
 }
 
 function check-hdfs-health {
diff --git a/testdata/bin/load-ext-data-sources.sh b/testdata/bin/load-ext-data-sources.sh
new file mode 100755
index 0000000000..0476bdae29
--- /dev/null
+++ b/testdata/bin/load-ext-data-sources.sh
@@ -0,0 +1,83 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This script creates and loads the JDBC data source target tables in Postgres.
+
+set -euo pipefail
+. $IMPALA_HOME/bin/report_build_error.sh
+setup_report_build_error
+
+. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+
+# Recreate the 'functional' Postgres database that backs the alltypes tables
+dropdb -U hiveuser functional || true
+createdb -U hiveuser functional
+
+# Create jdbc table
+cat > /tmp/jdbc_alltypes.sql <<__EOT__
+DROP TABLE IF EXISTS alltypes;
+CREATE TABLE alltypes
+(
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col SMALLINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE PRECISION,
+  date_string_col VARCHAR(8),
+  string_col VARCHAR(10),
+  timestamp_col TIMESTAMP
+);
+__EOT__
+sudo -u postgres psql -U hiveuser -d functional -f /tmp/jdbc_alltypes.sql
+
+# Create jdbc table with case-sensitive table and column names.
+cat > /tmp/jdbc_alltypes_with_quote.sql <<__EOT__
+DROP TABLE IF EXISTS "AllTypesWithQuote";
+CREATE TABLE "AllTypesWithQuote"
+(
+  "id" INT,
+  "Bool_col" BOOLEAN,
+  "Tinyint_col" SMALLINT,
+  "Smallint_col" SMALLINT,
+  "Int_col" INT,
+  "Bigint_col" BIGINT,
+  "Float_col" FLOAT,
+  "Double_col" DOUBLE PRECISION,
+  "Date_string_col" VARCHAR(8),
+  "String_col" VARCHAR(10),
+  "Timestamp_col" TIMESTAMP
+);
+__EOT__
+sudo -u postgres psql -U hiveuser -d functional -f /tmp/jdbc_alltypes_with_quote.sql
+
+# Load data into the jdbc tables
+cat ${IMPALA_HOME}/testdata/target/AllTypes/* > /tmp/jdbc_alltypes.csv
+loadCmd="COPY alltypes FROM '/tmp/jdbc_alltypes.csv' DELIMITER ',' CSV"
+sudo -u postgres psql -d functional -c "$loadCmd"
+
+loadCmd="COPY \"AllTypesWithQuote\" FROM '/tmp/jdbc_alltypes.csv' DELIMITER ',' CSV"
+sudo -u postgres psql -d functional -c "$loadCmd"
+
+# Clean up tmp files
+rm /tmp/jdbc_alltypes.*
+rm /tmp/jdbc_alltypes_with_quote.*
diff --git a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
new file mode 100644
index 0000000000..2bd0682340
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
@@ -0,0 +1,105 @@
+====
+---- QUERY
+# Test the jdbc data source:
+# count(*) with a predicate evaluated by Impala
+select count(*) from alltypes_jdbc_datasource
+where float_col = 0 and string_col is not NULL
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# count(*) with no predicates has no materialized slots
+select count(*) from alltypes_jdbc_datasource
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Gets all types including a row with a NULL value. The predicates are pushed to
+# the data source.
+select *
+from alltypes_jdbc_datasource
+where id > 10 and int_col < 5 limit 5
+---- RESULTS
+11,false,1,1,1,10,1.100000023841858,10.1,'01/02/09','1',2009-01-02 00:11:00.450000000
+12,true,2,2,2,20,2.200000047683716,20.2,'01/02/09','2',2009-01-02 00:12:00.460000000
+13,false,3,3,3,30,3.299999952316284,30.3,'01/02/09','3',2009-01-02 00:13:00.480000000
+14,true,4,4,4,40,4.400000095367432,40.4,'01/02/09','4',2009-01-02 00:14:00.510000000
+20,true,0,0,0,0,0,0,'01/03/09','0',2009-01-03 00:20:00.900000000
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP
+====
+---- QUERY
+# Gets specified columns.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource
+where id > 10 and int_col < 5 limit 5
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+====
+---- QUERY
+# Gets specified columns from an external jdbc table with case-sensitive column
+# names and table name.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource_2
+where id > 10 and int_col < 5 limit 5
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+====
+---- QUERY
+# Inner join with a non-jdbc table
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join functional.alltypes b on (a.id = b.id)
+where a.id = 1
+---- RESULTS
+1,1
+---- TYPES
+INT, INT
+====
+---- QUERY
+# Inner join with another jdbc table
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join alltypes_jdbc_datasource_2 b on (a.id = b.id)
+where a.id < 3 group by a.id, b.int_col
+---- RESULTS
+0,0
+1,1
+2,2
+---- TYPES
+INT, INT
+====
+---- QUERY
+# Cross join
+select a.id, b.id
+from alltypes_jdbc_datasource a cross join alltypes_jdbc_datasource b
+where (a.id < 3 and b.id < 3)
+order by a.id, b.id limit 10
+---- RESULTS
+0,0
+0,1
+0,2
+1,0
+1,1
+1,2
+2,0
+2,1
+2,2
+---- TYPES
+INT, INT
+====
diff --git a/tests/query_test/test_ext_data_sources.py b/tests/query_test/test_ext_data_sources.py
new file mode 100644
index 0000000000..b32d39cd93
--- /dev/null
+++ b/tests/query_test/test_ext_data_sources.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfCatalogV2
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+
+
+class TestExtDataSources(ImpalaTestSuite):
+  """Impala query tests for external data sources."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestExtDataSources, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+      create_uncompressed_text_dimension(cls.get_workload()))
+
+  def _get_tbl_properties(self, table_name):
+    """Extracts the table properties mapping from the output of DESCRIBE FORMATTED"""
+    return self._get_properties('Table Parameters:', table_name)
+
+  def _get_properties(self, section_name, name, is_db=False):
+    """Extracts the db/table properties mapping from the output of DESCRIBE FORMATTED"""
+    result = self.client.execute("describe {0} formatted {1}".format(
+      "database" if is_db else "", name))
+    match = False
+    properties = dict()
+    for row in result.data:
+      fields = row.split("\t")
+      if fields[0] != '':
+        # Start of new section.
+        if match:
+          # Finished processing matching section.
+          break
+        match = section_name in fields[0]
+      elif match:
+        if fields[1] == 'NULL':
+          break
+        properties[fields[1].rstrip()] = fields[2].rstrip()
+    return properties
+
+  @SkipIfCatalogV2.data_sources_unsupported()
+  def test_verify_jdbc_table_properties(self, vector):
+    jdbc_tbl_name = "functional.alltypes_jdbc_datasource"
+    properties = self._get_tbl_properties(jdbc_tbl_name)
+    # Verify data source related table properties
+    assert properties['__IMPALA_DATA_SOURCE_NAME'] == 'jdbcdatasource'
+    assert properties['__IMPALA_DATA_SOURCE_LOCATION'] == \
+      'hdfs://localhost:20500/test-warehouse/data-sources/jdbc-data-source.jar'
+    assert properties['__IMPALA_DATA_SOURCE_CLASS'] == \
+      'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+    assert properties['__IMPALA_DATA_SOURCE_API_VERSION'] == 'V1'
+    assert 'database.type\\":\\"POSTGRES' \
+      in properties['__IMPALA_DATA_SOURCE_INIT_STRING']
+    assert 'table\\":\\"alltypes' \
+      in properties['__IMPALA_DATA_SOURCE_INIT_STRING']
+
+  @SkipIfCatalogV2.data_sources_unsupported()
+  def test_data_source_tables(self, vector):
+    self.run_test_case('QueryTest/data-source-tables', vector)
+
+  @SkipIfCatalogV2.data_sources_unsupported()
+  def test_jdbc_data_source(self, vector):
+    self.run_test_case('QueryTest/jdbc-data-source', vector)
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 1c7d9de67a..0ae78b304d 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -236,10 +236,6 @@ def test_strict_mode(self, vector):
     vector.get_value('exec_option')['abort_on_error'] = 1
     self.run_test_case('QueryTest/strict-mode-abort', vector)
 
-  @SkipIfCatalogV2.data_sources_unsupported()
-  def test_data_source_tables(self, vector):
-    self.run_test_case('QueryTest/data-source-tables', vector)
-
   def test_range_constant_propagation(self, vector):
     self.run_test_case('QueryTest/range-constant-propagation', vector)