-
Notifications
You must be signed in to change notification settings - Fork 34
Database-based Argument Setter #77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
CuriousVini
merged 1 commit into
data-integrations:develop
from
AdaptiveScale:feature/db-argument-setter
Aug 6, 2020
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
138 changes: 138 additions & 0 deletions
138
database-commons/src/main/java/io/cdap/plugin/db/batch/action/AbstractDBArgumentSetter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
* Copyright © 2020 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.plugin.db.batch.action; | ||
|
||
import io.cdap.cdap.etl.api.FailureCollector; | ||
import io.cdap.cdap.etl.api.PipelineConfigurer; | ||
import io.cdap.cdap.etl.api.StageConfigurer; | ||
import io.cdap.cdap.etl.api.action.Action; | ||
import io.cdap.cdap.etl.api.action.ActionContext; | ||
import io.cdap.cdap.etl.api.action.SettableArguments; | ||
import io.cdap.plugin.db.ConnectionConfig; | ||
import io.cdap.plugin.util.DBUtils; | ||
import io.cdap.plugin.util.DriverCleanup; | ||
|
||
import java.sql.Connection; | ||
import java.sql.Driver; | ||
import java.sql.DriverManager; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
import java.sql.Statement; | ||
import java.util.Properties; | ||
|
||
/** | ||
* Action that converts db column into pipeline argument. | ||
*/ | ||
public class AbstractDBArgumentSetter extends Action { | ||
|
||
private static final String JDBC_PLUGIN_ID = "driver"; | ||
private final ArgumentSetterConfig config; | ||
|
||
public AbstractDBArgumentSetter(ArgumentSetterConfig config) { | ||
this.config = config; | ||
} | ||
|
||
@Override | ||
public void run(ActionContext context) throws Exception { | ||
Class<? extends Driver> driverClass = context.loadPluginClass(JDBC_PLUGIN_ID); | ||
FailureCollector failureCollector = context.getFailureCollector(); | ||
SettableArguments settableArguments = context.getArguments(); | ||
processArguments(driverClass, failureCollector, settableArguments); | ||
} | ||
|
||
@Override | ||
public void configurePipeline(PipelineConfigurer pipelineConfigurer) | ||
throws IllegalArgumentException { | ||
DBUtils.validateJDBCPluginPipeline(pipelineConfigurer, config, JDBC_PLUGIN_ID); | ||
Class<? extends Driver> driverClass = DBUtils.getDriverClass( | ||
pipelineConfigurer, config, ConnectionConfig.JDBC_PLUGIN_TYPE); | ||
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer(); | ||
FailureCollector collector = stageConfigurer.getFailureCollector(); | ||
config.validate(collector); | ||
try { | ||
processArguments(driverClass, collector, null); | ||
} catch (SQLException e) { | ||
collector.addFailure("SQL error while executing query: " + e.getMessage(), null) | ||
.withStacktrace(e.getStackTrace()); | ||
} catch (IllegalAccessException | InstantiationException e) { | ||
collector.addFailure("Unable to instantiate JDBC driver: " + e.getMessage(), null) | ||
.withStacktrace(e.getStackTrace()); | ||
} catch (Exception e) { | ||
collector.addFailure(e.getMessage(), null).withStacktrace(e.getStackTrace()); | ||
} | ||
} | ||
|
||
/** | ||
* Creates connection to database. Reads row from database based on selection conditions and | ||
* depending on whether settable arguments is provided or not set the argument from row columns. | ||
* | ||
* @param driverClass {@link Class<? extends Driver>} | ||
* @param failureCollector {@link FailureCollector} | ||
* @param settableArguments {@link SettableArguments} | ||
* @throws SQLException is raised when there is sql related exception | ||
* @throws IllegalAccessException is raised when there is access related exception | ||
* @throws InstantiationException is raised when there is class/driver issue | ||
*/ | ||
private void processArguments(Class<? extends Driver> driverClass, | ||
FailureCollector failureCollector, SettableArguments settableArguments) | ||
throws SQLException, IllegalAccessException, InstantiationException { | ||
DriverCleanup driverCleanup; | ||
|
||
driverCleanup = DBUtils.ensureJDBCDriverIsAvailable(driverClass, config.getConnectionString(), | ||
config.jdbcPluginName); | ||
Properties connectionProperties = new Properties(); | ||
connectionProperties.putAll(config.getConnectionArguments()); | ||
try { | ||
Connection connection = DriverManager | ||
.getConnection(config.getConnectionString(), connectionProperties); | ||
Statement statement = connection.createStatement(); | ||
ResultSet resultSet = statement.executeQuery(config.getQuery()); | ||
boolean hasRecord = resultSet.next(); | ||
if (!hasRecord) { | ||
failureCollector.addFailure("No record found.", | ||
"The argument selection conditions must match only one record."); | ||
return; | ||
} | ||
if (settableArguments != null) { | ||
setArguments(resultSet, failureCollector, settableArguments); | ||
} | ||
if (resultSet.next()) { | ||
failureCollector | ||
.addFailure("More than one records found.", | ||
"The argument selection conditions must match only one record."); | ||
} | ||
} finally { | ||
driverCleanup.destroy(); | ||
} | ||
} | ||
|
||
/** | ||
* Converts column from jdbc results set into pipeline arguments | ||
* | ||
* @param resultSet - result set from db {@link ResultSet} | ||
* @param failureCollector - context failure collector @{link FailureCollector} | ||
* @param arguments - context argument setter {@link SettableArguments} | ||
* @throws SQLException - raises {@link SQLException} when configuration is not valid | ||
*/ | ||
private void setArguments(ResultSet resultSet, FailureCollector failureCollector, | ||
SettableArguments arguments) throws SQLException { | ||
String[] columns = config.getArgumentsColumns().split(","); | ||
for (String column : columns) { | ||
arguments.set(column, resultSet.getString(column)); | ||
} | ||
} | ||
} |
141 changes: 141 additions & 0 deletions
141
database-commons/src/main/java/io/cdap/plugin/db/batch/action/ArgumentSetterConfig.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
/* | ||
* Copyright © 2020 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.plugin.db.batch.action; | ||
|
||
import com.google.common.base.Strings; | ||
import io.cdap.cdap.api.annotation.Description; | ||
import io.cdap.cdap.api.annotation.Macro; | ||
import io.cdap.cdap.api.annotation.Name; | ||
import io.cdap.cdap.etl.api.FailureCollector; | ||
import io.cdap.plugin.db.ConnectionConfig; | ||
|
||
/** | ||
* Config for ArgumentSetter reading from database | ||
*/ | ||
public abstract class ArgumentSetterConfig extends ConnectionConfig { | ||
|
||
public static final String DATABASE_NAME = "databaseName"; | ||
public static final String TABLE_NAME = "tableName"; | ||
public static final String ARGUMENT_SELECTION_CONDITIONS = "argumentSelectionConditions"; | ||
public static final String ARGUMENTS_COLUMNS = "argumentsColumns"; | ||
|
||
@Name(ConnectionConfig.CONNECTION_STRING) | ||
@Description("JDBC connection string including database name.") | ||
@Macro | ||
public String connectionString; | ||
|
||
@Name(DATABASE_NAME) | ||
@Description("The name of the database which contains\n" | ||
+ "the configuration table") | ||
@Macro | ||
String databaseName; | ||
|
||
@Name(TABLE_NAME) | ||
@Description("The name of the table in the database\n" | ||
+ "containing the configurations for the pipeline") | ||
@Macro | ||
String tableName; | ||
|
||
@Name(ARGUMENT_SELECTION_CONDITIONS) | ||
@Description("A set of conditions for identifying the\n" | ||
+ "arguments to run a pipeline. Users can\n" | ||
+ "specify multiple conditions in the format\n" | ||
+ "column1=<column1-value>;column2=<colum\n" | ||
+ "n2-value>. A particular use case for this\n" | ||
+ "would be feed=marketing AND\n" | ||
+ "date=20200427. The conditions specified\n" | ||
+ "should be logically ANDed to determine the\n" | ||
+ "arguments for a run. When the conditions are\n" | ||
+ "applied, the table should return exactly 1 row.\n" | ||
+ "If it doesn’t return any rows, or if it returns\n" | ||
+ "multiple rows, the pipeline should abort with\n" | ||
+ "appropriate errors. Typically, users should\n" | ||
+ "use macros in this field, so that they can\n" | ||
+ "specify the conditions at runtime.") | ||
@Macro | ||
String argumentSelectionConditions; | ||
|
||
@Name(ARGUMENTS_COLUMNS) | ||
@Description("Names of the columns that contain the\n" | ||
+ "arguments for this run. The values of this\n" | ||
+ "columns in the row that satisfies the argument\n" | ||
+ "selection conditions determines the\n" | ||
+ "arguments for the pipeline run") | ||
@Macro | ||
String argumentsColumns; | ||
|
||
public String getDatabaseName() { | ||
return databaseName; | ||
} | ||
|
||
public String getTableName() { | ||
return tableName; | ||
} | ||
|
||
public String getArgumentSelectionConditions() { | ||
return argumentSelectionConditions; | ||
} | ||
|
||
public String getArgumentsColumns() { | ||
return argumentsColumns; | ||
} | ||
|
||
public String getQuery() { | ||
if (this.getArgumentSelectionConditions() == null) { | ||
throw new IllegalArgumentException("Argument selection conditions are empty."); | ||
} | ||
String[] split = this.getArgumentSelectionConditions().split(";"); | ||
String conditions = String.join(" AND ", split); | ||
|
||
return String | ||
.format("SELECT %s FROM %s WHERE %s", this.getArgumentsColumns(), this.getTableName(), | ||
conditions); | ||
} | ||
|
||
/** | ||
* Validates config input fields. | ||
* | ||
* @param collector context failure collector {@link FailureCollector} | ||
*/ | ||
public void validate(FailureCollector collector) { | ||
if (!containsMacro(CONNECTION_STRING) && Strings.isNullOrEmpty(this.connectionString)) { | ||
collector.addFailure("Invalid connection string.", "Connection string cannot be empty."); | ||
} | ||
if (!containsMacro(ConnectionConfig.USER) && Strings.isNullOrEmpty(this.user)) { | ||
collector.addFailure("Invalid username.", "Username cannot be empty."); | ||
} | ||
if (!containsMacro(ConnectionConfig.PASSWORD) && Strings.isNullOrEmpty(this.password)) { | ||
collector.addFailure("Invalid password.", "Password cannot be empty."); | ||
} | ||
if (!containsMacro(DATABASE_NAME) && Strings.isNullOrEmpty(this.getDatabaseName())) { | ||
collector.addFailure("Invalid database.", "Valid database must be specified."); | ||
} | ||
if (!containsMacro(TABLE_NAME) && Strings.isNullOrEmpty(this.getTableName())) { | ||
collector.addFailure("Invalid table.", "Valid table must be specified."); | ||
} | ||
if (!containsMacro(ARGUMENTS_COLUMNS) && Strings.isNullOrEmpty(this.getArgumentsColumns())) { | ||
collector | ||
.addFailure("Invalid arguments columns.", "Arguments column names must be specified."); | ||
} | ||
if (!containsMacro(ARGUMENT_SELECTION_CONDITIONS) && Strings | ||
.isNullOrEmpty(this.getArgumentSelectionConditions())) { | ||
collector | ||
.addFailure("Invalid conditions.", "Filter conditions must be specified."); | ||
} | ||
collector.getOrThrowException(); | ||
} | ||
} |
48 changes: 48 additions & 0 deletions
48
generic-database-plugin/docs/DatabaseArgumentSetter-action.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
# Database Action | ||
|
||
|
||
Description | ||
----------- | ||
Action that converts table column into pipeline arguments. | ||
|
||
|
||
Use Case | ||
-------- | ||
The action can be used whenever you want to pass data from database as arguments to a data pipeline. | ||
For example, you may want to read a particular table at runtime to set arguments for the pipeline. | ||
|
||
|
||
Properties | ||
---------- | ||
**Driver Name:** Name of the JDBC driver to use. | ||
|
||
**Connection String:** JDBC connection string including database name. | ||
|
||
**Database:** Database name. | ||
|
||
**Table:** Table name. | ||
|
||
**Argument selection conditions:** A set of conditions for identifying the arguments to run a pipeline. Multiple conditions can be specified in the format | ||
column1=<column1-value>;column2=<column2-value>. | ||
|
||
**Username:** User identity for connecting to the specified database. | ||
|
||
**Password:** Password to use to connect to the specified database. | ||
|
||
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments | ||
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. | ||
|
||
**Arguments column:** Name of the column that contains the arguments. | ||
|
||
Example | ||
------- | ||
Suppose you have a configuration management database and you want to read a particular table at runtime to set arguments for the pipeline, including the source configuration, the transformations, the sink configuration, etc. | ||
|
||
``` | ||
Driver Name: "postgres" | ||
Connection String: "jdbc:postgresql://localhost:5432/configuration" | ||
Database: "configuration" | ||
Table: "files" | ||
Argument selection conditions: "file_id=1" | ||
Arguments column: "file_name" | ||
``` |
51 changes: 51 additions & 0 deletions
51
generic-database-plugin/src/main/java/io/cdap/plugin/jdbc/DatabaseArgumentSetter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright © 2020 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.plugin.jdbc; | ||
|
||
import io.cdap.cdap.api.annotation.Description; | ||
import io.cdap.cdap.api.annotation.Name; | ||
import io.cdap.cdap.api.annotation.Plugin; | ||
import io.cdap.cdap.etl.api.action.Action; | ||
import io.cdap.plugin.db.batch.action.AbstractDBArgumentSetter; | ||
import io.cdap.plugin.db.batch.action.ArgumentSetterConfig; | ||
|
||
/** | ||
* Action that runs a db command | ||
*/ | ||
@Plugin(type = Action.PLUGIN_TYPE) | ||
@Name("DatabaseArgumentSetter") | ||
@Description("Reads single record from database table and converts it to arguments for pipeline") | ||
public class DatabaseArgumentSetter extends AbstractDBArgumentSetter { | ||
|
||
private final DatabaseArgumentSetterConfig config; | ||
|
||
public DatabaseArgumentSetter(DatabaseArgumentSetterConfig config) { | ||
super(config); | ||
this.config = config; | ||
} | ||
|
||
/** | ||
* Database action databaseActionConfig. | ||
*/ | ||
public static class DatabaseArgumentSetterConfig extends ArgumentSetterConfig { | ||
|
||
@Override | ||
public String getConnectionString() { | ||
return connectionString; | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does resultset has -
hasNext()
method? we should use that method.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@CuriousVini resulset does not have hasNext() instead it has next() which moves the cursor to next record and returns true/false based on whether record is available or not.