diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/action/AbstractDBArgumentSetter.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/action/AbstractDBArgumentSetter.java
new file mode 100644
index 000000000..b6eddb99d
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/action/AbstractDBArgumentSetter.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.batch.action;
+
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
+import io.cdap.cdap.etl.api.action.Action;
+import io.cdap.cdap.etl.api.action.ActionContext;
+import io.cdap.cdap.etl.api.action.SettableArguments;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.util.DBUtils;
+import io.cdap.plugin.util.DriverCleanup;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+/**
+ * Action that reads one row from a database table and sets the configured columns of that row as
+ * pipeline arguments.
+ */
+public class AbstractDBArgumentSetter extends Action {
+
+  private static final String JDBC_PLUGIN_ID = "driver";
+  private final ArgumentSetterConfig config;
+
+  public AbstractDBArgumentSetter(ArgumentSetterConfig config) {
+    this.config = config;
+  }
+
+  /**
+   * Runs the argument query and sets the selected column values as pipeline arguments.
+   *
+   * @param context runtime context supplying the driver class, failure collector and arguments
+   * @throws Exception if the query cannot be executed or does not match exactly one record
+   */
+  @Override
+  public void run(ActionContext context) throws Exception {
+    Class driverClass = context.loadPluginClass(JDBC_PLUGIN_ID);
+    FailureCollector failureCollector = context.getFailureCollector();
+    SettableArguments settableArguments = context.getArguments();
+    processArguments(driverClass, failureCollector, settableArguments);
+    // processArguments only records failures; raise them here so that a missing or ambiguous
+    // record aborts the run instead of letting the pipeline continue without its arguments.
+    failureCollector.getOrThrowException();
+  }
+
+  /**
+   * Validates the configuration and, unless a required property is still a macro, executes the
+   * query once to check that it matches exactly one record. Problems are added to the stage's
+   * failure collector.
+   *
+   * @param pipelineConfigurer configurer used to register the JDBC driver and collect failures
+   */
+  @Override
+  public void configurePipeline(PipelineConfigurer pipelineConfigurer)
+    throws IllegalArgumentException {
+    DBUtils.validateJDBCPluginPipeline(pipelineConfigurer, config, JDBC_PLUGIN_ID);
+    // NOTE(review): run() loads the driver with JDBC_PLUGIN_ID while this call passes
+    // JDBC_PLUGIN_TYPE - confirm against DBUtils.getDriverClass that this is intended.
+    Class driverClass = DBUtils.getDriverClass(
+      pipelineConfigurer, config, ConnectionConfig.JDBC_PLUGIN_TYPE);
+    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+    FailureCollector collector = stageConfigurer.getFailureCollector();
+    config.validate(collector);
+    if (config.containsAnyMacro()) {
+      // Properties that are macros have no usable value yet, so the query can only run later.
+      return;
+    }
+    try {
+      processArguments(driverClass, collector, null);
+    } catch (SQLException e) {
+      collector.addFailure("SQL error while executing query: " + e.getMessage(), null)
+        .withStacktrace(e.getStackTrace());
+    } catch (IllegalAccessException | InstantiationException e) {
+      collector.addFailure("Unable to instantiate JDBC driver: " + e.getMessage(), null)
+        .withStacktrace(e.getStackTrace());
+    } catch (Exception e) {
+      collector.addFailure(e.getMessage(), null).withStacktrace(e.getStackTrace());
+    }
+  }
+
+  /**
+   * Connects to the database, executes the query from the config and checks that it matches
+   * exactly one record. When settable arguments are provided, the columns of that record are set
+   * as arguments; otherwise the query is only checked.
+   *
+   * @param driverClass {@link Class} of the JDBC driver
+   * @param failureCollector {@link FailureCollector} that receives "no record" and
+   *   "more than one record" failures
+   * @param settableArguments {@link SettableArguments} to fill, or {@code null} to only check
+   * @throws SQLException is raised when there is sql related exception
+   * @throws IllegalAccessException is raised when there is access related exception
+   * @throws InstantiationException is raised when there is class/driver issue
+   */
+  private void processArguments(Class driverClass,
+    FailureCollector failureCollector, SettableArguments settableArguments)
+    throws SQLException, IllegalAccessException, InstantiationException {
+    DriverCleanup driverCleanup = DBUtils.ensureJDBCDriverIsAvailable(driverClass,
+      config.getConnectionString(), config.jdbcPluginName);
+    try {
+      Properties connectionProperties = new Properties();
+      connectionProperties.putAll(config.getConnectionArguments());
+      // try-with-resources closes the result set, statement and connection on every exit path,
+      // including the early return and a failing query.
+      try (Connection connection = DriverManager
+             .getConnection(config.getConnectionString(), connectionProperties);
+           Statement statement = connection.createStatement();
+           ResultSet resultSet = statement.executeQuery(config.getQuery())) {
+        if (!resultSet.next()) {
+          failureCollector.addFailure("No record found.",
+            "The argument selection conditions must match only one record.");
+          return;
+        }
+        if (settableArguments != null) {
+          setArguments(resultSet, failureCollector, settableArguments);
+        }
+        if (resultSet.next()) {
+          failureCollector
+            .addFailure("More than one records found.",
+              "The argument selection conditions must match only one record.");
+        }
+      }
+    } finally {
+      driverCleanup.destroy();
+    }
+  }
+
+  /**
+   * Converts columns of the current row of the jdbc result set into pipeline arguments.
+   *
+   * @param resultSet - result set from db {@link ResultSet}, positioned on the row to read
+   * @param failureCollector - context failure collector {@link FailureCollector}; currently unused
+   * @param arguments - context argument setter {@link SettableArguments}
+   * @throws SQLException - raises {@link SQLException} when a configured column cannot be read
+   */
+  private void setArguments(ResultSet resultSet, FailureCollector failureCollector,
+    SettableArguments arguments) throws SQLException {
+    for (String column : config.getArgumentsColumns().split(",")) {
+      // Accept lists written as "a, b": surrounding whitespace is not part of the column name.
+      String name = column.trim();
+      if (name.isEmpty()) {
+        continue;
+      }
+      arguments.set(name, resultSet.getString(name));
+    }
+  }
+}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/batch/action/ArgumentSetterConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/batch/action/ArgumentSetterConfig.java
new file mode 100644
index 000000000..51803487b
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/db/batch/action/ArgumentSetterConfig.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.batch.action;
+
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.db.ConnectionConfig;
+
+/**
+ * Config for ArgumentSetter reading from database
+ */
+public abstract class ArgumentSetterConfig extends ConnectionConfig {
+
+  public static final String DATABASE_NAME = "databaseName";
+  public static final String TABLE_NAME = "tableName";
+  public static final String ARGUMENT_SELECTION_CONDITIONS = "argumentSelectionConditions";
+  public static final String ARGUMENTS_COLUMNS = "argumentsColumns";
+
+  @Name(ConnectionConfig.CONNECTION_STRING)
+  @Description("JDBC connection string including database name.")
+  @Macro
+  public String connectionString;
+
+  @Name(DATABASE_NAME)
+  @Description("The name of the database which contains\n" +
+    "the configuration table")
+  @Macro
+  String databaseName;
+
+  @Name(TABLE_NAME)
+  @Description("The name of the table in the database\n" +
+    "containing the configurations for the pipeline")
+  @Macro
+  String tableName;
+
+  @Name(ARGUMENT_SELECTION_CONDITIONS)
+  @Description("A set of conditions for identifying the\n" +
+    "arguments to run a pipeline. Users can\n" +
+    "specify multiple conditions in the format\n" +
+    "column1=;column2=. A particular use case for this\n" +
+    "would be feed=marketing AND\n" +
+    "date=20200427. The conditions specified\n" +
+    "should be logically ANDed to determine the\n" +
+    "arguments for a run. When the conditions are\n" +
+    "applied, the table should return exactly 1 row.\n" +
+    "If it doesn’t return any rows, or if it returns\n" +
+    "multiple rows, the pipeline should abort with\n" +
+    "appropriate errors. Typically, users should\n" +
+    "use macros in this field, so that they can\n" +
+    "specify the conditions at runtime.")
+  @Macro
+  String argumentSelectionConditions;
+
+  @Name(ARGUMENTS_COLUMNS)
+  @Description("Names of the columns that contain the\n" +
+    "arguments for this run. The values of this\n" +
+    "columns in the row that satisfies the argument\n" +
+    "selection conditions determines the\n" +
+    "arguments for the pipeline run")
+  @Macro
+  String argumentsColumns;
+
+  /** Returns the configured database name. */
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  /** Returns the name of the table the query selects from. */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** Returns the raw, {@code ;}-separated argument selection conditions. */
+  public String getArgumentSelectionConditions() {
+    return argumentSelectionConditions;
+  }
+
+  /** Returns the raw, {@code ,}-separated list of argument column names. */
+  public String getArgumentsColumns() {
+    return argumentsColumns;
+  }
+
+  /**
+   * Tells whether a property needed to connect to the database or to build the query is a macro.
+   *
+   * @return {@code true} if the connection string, user, password, table name, arguments columns
+   *   or argument selection conditions contain a macro
+   */
+  public boolean containsAnyMacro() {
+    return containsMacro(CONNECTION_STRING)
+      || containsMacro(ConnectionConfig.USER)
+      || containsMacro(ConnectionConfig.PASSWORD)
+      || containsMacro(TABLE_NAME)
+      || containsMacro(ARGUMENTS_COLUMNS)
+      || containsMacro(ARGUMENT_SELECTION_CONDITIONS);
+  }
+
+  /**
+   * Builds the {@code SELECT} statement that reads the argument columns of the matching row. The
+   * {@code ;}-separated conditions are joined with {@code AND}; blank conditions are ignored.
+   *
+   * <p>Column list, table name and conditions are inserted into the statement verbatim, so they
+   * must come from a trusted pipeline configuration.
+   *
+   * @return the query to execute
+   * @throws IllegalArgumentException if no argument selection condition is specified
+   */
+  public String getQuery() {
+    String selectionConditions = this.getArgumentSelectionConditions();
+    if (Strings.isNullOrEmpty(selectionConditions)) {
+      throw new IllegalArgumentException("Argument selection conditions are empty.");
+    }
+    StringBuilder conditions = new StringBuilder();
+    for (String condition : selectionConditions.split(";")) {
+      String trimmed = condition.trim();
+      if (trimmed.isEmpty()) {
+        // An input such as "a=1;;b=2" must not produce "a=1 AND  AND b=2".
+        continue;
+      }
+      if (conditions.length() > 0) {
+        conditions.append(" AND ");
+      }
+      conditions.append(trimmed);
+    }
+    if (conditions.length() == 0) {
+      throw new IllegalArgumentException("Argument selection conditions are empty.");
+    }
+    return String.format("SELECT %s FROM %s WHERE %s", this.getArgumentsColumns(),
+      this.getTableName(), conditions);
+  }
+
+  /**
+   * Validates config input fields. Properties that contain a macro are skipped. After all checks
+   * {@link FailureCollector#getOrThrowException()} is called on the collector.
+   *
+   * @param collector context failure collector {@link FailureCollector}
+   */
+  public void validate(FailureCollector collector) {
+    if (!containsMacro(CONNECTION_STRING) && Strings.isNullOrEmpty(this.connectionString)) {
+      collector.addFailure("Invalid connection string.", "Connection string cannot be empty.");
+    }
+    if (!containsMacro(ConnectionConfig.USER) && Strings.isNullOrEmpty(this.user)) {
+      collector.addFailure("Invalid username.", "Username cannot be empty.");
+    }
+    if (!containsMacro(ConnectionConfig.PASSWORD) && Strings.isNullOrEmpty(this.password)) {
+      collector.addFailure("Invalid password.", "Password cannot be empty.");
+    }
+    if (!containsMacro(DATABASE_NAME) && Strings.isNullOrEmpty(this.getDatabaseName())) {
+      collector.addFailure("Invalid database.", "Valid database must be specified.");
+    }
+    if (!containsMacro(TABLE_NAME) && Strings.isNullOrEmpty(this.getTableName())) {
+      collector.addFailure("Invalid table.", "Valid table must be specified.");
+    }
+    if (!containsMacro(ARGUMENTS_COLUMNS) && Strings.isNullOrEmpty(this.getArgumentsColumns())) {
+      collector
+        .addFailure("Invalid arguments columns.", "Arguments column names must be specified.");
+    }
+    if (!containsMacro(ARGUMENT_SELECTION_CONDITIONS) && Strings
+      .isNullOrEmpty(this.getArgumentSelectionConditions())) {
+      collector
+        .addFailure("Invalid conditions.", "Filter conditions must be specified.");
+    }
+    collector.getOrThrowException();
+  }
+}
diff --git a/generic-database-plugin/docs/DatabaseArgumentSetter-action.md b/generic-database-plugin/docs/DatabaseArgumentSetter-action.md
new file mode 100644
index 000000000..bf5dcaaf3
--- /dev/null
+++ b/generic-database-plugin/docs/DatabaseArgumentSetter-action.md
@@ -0,0 +1,48 @@
+# Database Action
+
+
+Description
+-----------
+Action that converts table column into pipeline arguments.
+
+
+Use Case
+--------
+The action can be used whenever you want to pass data from database as arguments to a data pipeline.
+For example, you may want to read a particular table at runtime to set arguments for the pipeline.
+
+
+Properties
+----------
+**Driver Name:** Name of the JDBC driver to use.
+
+**Connection String:** JDBC connection string including database name.
+
+**Database:** Database name.
+
+**Table:** Table name.
+
+**Argument selection conditions:** A set of conditions for identifying the arguments to run a pipeline. Multiple conditions can be specified in the format
+ `column1=value1;column2=value2`.
+
+**Username:** User identity for connecting to the specified database.
+
+**Password:** Password to use to connect to the specified database.
+
+**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
+will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
+
+**Arguments column:** Comma-separated names of the columns that contain the arguments.
+
+Example
+-------
+Suppose you have a configuration management database and you want to read a particular table at runtime to set arguments for the pipeline, including the source configuration, the transformations, the sink configuration, etc.
+
+```
+Driver Name: "postgres"
+Connection String: "jdbc:postgresql://localhost:5432/configuration"
+Database: "configuration"
+Table: "files"
+Argument selection conditions: "file_id=1"
+Arguments column: "file_name"
+```
diff --git a/generic-database-plugin/src/main/java/io/cdap/plugin/jdbc/DatabaseArgumentSetter.java b/generic-database-plugin/src/main/java/io/cdap/plugin/jdbc/DatabaseArgumentSetter.java
new file mode 100644
index 000000000..35e9e3be5
--- /dev/null
+++ b/generic-database-plugin/src/main/java/io/cdap/plugin/jdbc/DatabaseArgumentSetter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.jdbc;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.action.Action;
+import io.cdap.plugin.db.batch.action.AbstractDBArgumentSetter;
+import io.cdap.plugin.db.batch.action.ArgumentSetterConfig;
+
+/**
+ * Action that reads a single record from a database table and converts it to pipeline arguments.
+ */
+@Plugin(type = Action.PLUGIN_TYPE)
+@Name("DatabaseArgumentSetter")
+@Description("Reads single record from database table and converts it to arguments for pipeline")
+public class DatabaseArgumentSetter extends AbstractDBArgumentSetter {
+
+  private final DatabaseArgumentSetterConfig config; // NOTE(review): unused here - needed by framework?
+
+  public DatabaseArgumentSetter(DatabaseArgumentSetterConfig config) {
+    super(config);
+    this.config = config;
+  }
+
+  /**
+   * Config of the {@link DatabaseArgumentSetter} plugin; returns the connection string unchanged.
+   */
+  public static class DatabaseArgumentSetterConfig extends ArgumentSetterConfig {
+
+    @Override
+    public String getConnectionString() {
+      return connectionString;
+    }
+  }
+}
diff --git a/generic-database-plugin/widgets/DatabaseArgumentSetter-action.json b/generic-database-plugin/widgets/DatabaseArgumentSetter-action.json
new file mode 100644
index 000000000..699ca6dc1
--- /dev/null
+++ b/generic-database-plugin/widgets/DatabaseArgumentSetter-action.json
@@ -0,0 +1,95 @@
+{
+  "metadata": {
+    "spec-version": "1.5"
+  },
+  "display-name": "Database Argument Setter",
+  "configuration-groups": [
+    {
+      "label": "Connection",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Plugin Name",
+          "name": "jdbcPluginName"
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Plugin Type",
+          "name": "jdbcPluginType",
+          "widget-attributes": {
+            "default": "jdbc"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Connection String",
+          "name": "connectionString"
+        }
+      ]
+    },
+    {
+      "label": "Credentials",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Username",
+          "name": "user"
+        },
+        {
+          "widget-type": "password",
+          "label": "Password",
+          "name": "password"
+        }
+      ]
+    },
+    {
+      "label": "Configuration",
+      "properties": [
+        {
+          "widget-type": "text",
+          "label": "Database",
+          "name": "databaseName"
+        },
+        {
+          "widget-type": "text",
+          "label": "Table",
+          "name": "tableName"
+        },
+        {
+          "widget-type": "keyvalue",
+          "label": "Argument Selection Conditions",
+          "name": "argumentSelectionConditions",
+          "widget-attributes": {
+            "showDelimiter": "false",
+            "key-placeholder": "Key",
+            "value-placeholder": "Value",
+            "kv-delimiter" : "=",
+            "delimiter" : ";"
+          }
+        },
+        {
+          "widget-type": "text",
+          "label": "Arguments column",
+          "name": "argumentsColumns"
+        }
+      ]
+    },
+    {
+      "label": "Advanced",
+      "properties": [
+        {
+          "widget-type": "keyvalue",
+          "label": "Connection Arguments",
+          "name": "connectionArguments",
+          "widget-attributes": {
+            "showDelimiter": "false",
+
"key-placeholder": "Key", + "value-placeholder": "Value", + "kv-delimiter": "=", + "delimiter": ";" + } + } + ] + } + ] +}