diff --git a/mariadb-plugin/docs/Mariadb-action.md b/mariadb-plugin/docs/Mariadb-action.md
new file mode 100644
index 000000000..08344b927
--- /dev/null
+++ b/mariadb-plugin/docs/Mariadb-action.md
@@ -0,0 +1,69 @@
+# MariaDB Action
+
+
+Description
+-----------
+Action that runs a MariaDB command.
+
+
+Use Case
+--------
+The action can be used whenever you want to run a MariaDB command before or after a data pipeline.
+For example, you may want to run a sql update command on a database before the pipeline source pulls data from tables.
+
+
+Properties
+----------
+**Driver Name:** Name of the JDBC driver to use.
+
+**Database Query:** Database query to execute.
+
+**Host:** Host that MariaDB is running on.
+
+**Port:** Port that MariaDB is running on.
+
+**Database:** MariaDB database name.
+
+**Username:** User identity for connecting to the specified database.
+
+**Password:** Password to use to connect to the specified database.
+
+**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
+will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
+
+**Auto Reconnect:** Should the driver try to re-establish stale and/or dead connections.
+
+**Use SSL:** Turns on SSL encryption. The connection will fail if SSL is not available.
+
+**Keystore URL:** URL to the client certificate KeyStore (if not specified, use defaults). Must be accessible at the
+same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN daemon
+role is running.
+
+**Keystore Password:** Password for the client certificates KeyStore.
+
+**Truststore URL:** URL to the trusted root certificate KeyStore (if not specified, use defaults). Must be accessible at
+the same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN
+daemon role is running.
+
+**Truststore Password:** Password for the trusted root certificates KeyStore
+
+**Use Compression:** Use zlib compression when communicating with the server. Select this option for WAN
+connections.
+
+**Use ANSI Quotes:** Treats " as an identifier quote character and not as a string quote character.
+
+**SQL_MODE:** Override the default SQL_MODE session variable used by the server.
+
+
+Example
+-------
+Suppose you want to execute a query against a MariaDB database named "prod" that is running on "localhost"
+port 3306, then configure the plugin with:
+
+```
+Driver Name: "mariadb"
+Database Query: "UPDATE table_name SET price = 20 WHERE ID = 6"
+Host: "localhost"
+Port: 3306
+Database: "prod"
+```
diff --git a/mariadb-plugin/docs/Mariadb-batchsink.md b/mariadb-plugin/docs/Mariadb-batchsink.md
new file mode 100644
index 000000000..11176c0db
--- /dev/null
+++ b/mariadb-plugin/docs/Mariadb-batchsink.md
@@ -0,0 +1,113 @@
+# MariaDB Batch Sink
+
+
+Description
+-----------
+Writes records to a MariaDB table. Each record will be written to a row in the table.
+
+
+Use Case
+--------
+This sink is used whenever you need to write to a MariaDB table.
+Suppose you periodically build a recommendation model for products on your online store.
+The model is stored in a FileSet and you want to export the contents
+of the FileSet to a MariaDB table where it can be served to your users.
+
+Column names would be autodetected from input schema.
+
+Properties
+----------
+**Reference Name:** Name used to uniquely identify this sink for lineage, annotating metadata, etc.
+
+**Driver Name:** Name of the JDBC driver to use.
+
+**Host:** Host that MariaDB is running on.
+
+**Port:** Port that MariaDB is running on.
+
+**Database:** MariaDB database name.
+
+**Table Name:** Name of the table to export to.
+
+**Username:** User identity for connecting to the specified database.
+
+**Password:** Password to use to connect to the specified database.
+
+**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
+will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
+
+**Auto Reconnect:** Should the driver try to re-establish stale and/or dead connections.
+
+**Use SSL:** Turns on SSL encryption. The connection will fail if SSL is not available.
+
+**Keystore URL:** URL to the client certificate KeyStore (if not specified, use defaults). Must be accessible at the
+same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN daemon
+role is running.
+
+**Keystore Password:** Password for the client certificates KeyStore.
+
+**Truststore URL:** URL to the trusted root certificate KeyStore (if not specified, use defaults). Must be accessible at
+the same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN
+daemon role is running.
+
+**Truststore Password:** Password for the trusted root certificates KeyStore
+
+**Use Compression:** Use zlib compression when communicating with the server. Select this option for WAN
+connections.
+
+**SQL_MODE:** Override the default SQL_MODE session variable used by the server.
+
+
+Data Types Mapping
+----------
+ +--------------------------------+-----------------------+------------------------------------+
+ | MariaDB Data Type | CDAP Schema Data Type | Comment |
+ +--------------------------------+-----------------------+------------------------------------+
+ | TINYINT | int | |
+ | BOOLEAN, BOOL | boolean | |
+ | SMALLINT | int | |
+ | MEDIUMINT | int | |
+ | INT, INTEGER | int | |
+ | BIGINT | long | |
+ | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
+ | FLOAT | float | |
+ | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
+ | BIT | boolean | |
+ | CHAR | string | |
+ | VARCHAR | string | |
+ | BINARY | bytes | |
+ | CHAR BYTE | bytes | |
+ | VARBINARY | bytes | |
+ | TINYBLOB | bytes | |
+ | BLOB | bytes | |
+ | MEDIUMBLOB | bytes | |
+ | LONGBLOB | bytes | |
+ | TINYTEXT | string | |
+ | TEXT | string | |
+ | MEDIUMTEXT | string | |
+ | LONGTEXT | string | |
+ | JSON | string | In MariaDB it is alias to LONGTEXT |
+ | ENUM | string | Mapping to String by default |
+ | SET | string | |
+ | DATE | date | |
+ | TIME | time_micros | |
+ | DATETIME | timestamp_micros | |
+ | TIMESTAMP | timestamp_micros | |
+ | YEAR | date | |
+ +--------------------------------+-----------------------+------------------------------------+
+
+Example
+-------
+Suppose you want to write output records to "users" table of MariaDB database named "prod" that is running on "localhost",
+port 3306, as "root" user with "root" password, then configure the plugin with:
+
+```
+Reference Name: "snk1"
+Driver Name: "mariadb"
+Host: "localhost"
+Port: 3306
+Database: "prod"
+Table Name: "users"
+Username: "root"
+Password: "root"
+```
diff --git a/mariadb-plugin/docs/Mariadb-batchsource.md b/mariadb-plugin/docs/Mariadb-batchsource.md
new file mode 100644
index 000000000..44c9db79d
--- /dev/null
+++ b/mariadb-plugin/docs/Mariadb-batchsource.md
@@ -0,0 +1,145 @@
+# MariaDB Batch Source
+
+
+Description
+-----------
+Reads from a MariaDB instance using a configurable SQL query.
+Outputs one record for each row returned by the query.
+
+
+Use Case
+--------
+The source is used whenever you need to read from a MariaDB instance. For example, you may want
+to create daily snapshots of a database table by using this source and writing to
+a TimePartitionedFileSet.
+
+
+Properties
+----------
+**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
+
+**Driver Name:** Name of the JDBC driver to use.
+
+**Host:** Host that MariaDB is running on.
+
+**Port:** Port that MariaDB is running on.
+
+**Database:** MariaDB database name.
+
+**Import Query:** The SELECT query to use to import data from the specified table.
+You can specify an arbitrary number of columns to import, or import all columns using \*. The Query should
+contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDITIONS'.
+The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query.
+The '$CONDITIONS' string is not required if numSplits is set to one.
+
+**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field.
+For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one.
+
+**Split-By Field Name:** Field Name which will be used to generate splits. Not required if numSplits is set to one.
+
+**Number of Splits to Generate:** Number of splits to generate.
+
+**Username:** User identity for connecting to the specified database.
+
+**Password:** Password to use to connect to the specified database.
+
+**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
+will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
+
+**Auto Reconnect:** Should the driver try to re-establish stale and/or dead connections.
+
+**Schema:** The schema of records output by the source. This will be used in place of whatever schema comes
+back from the query. However, it must match the schema that comes back from the query,
+except it can mark fields as nullable and can contain a subset of the fields.
+
+**Use SSL:** Turns on SSL encryption. The connection will fail if SSL is not available.
+
+**Keystore URL:** URL to the client certificate KeyStore (if not specified, use defaults). Must be accessible at the
+same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN daemon
+role is running.
+
+**Keystore Password:** Password for the client certificates KeyStore.
+
+**Truststore URL:** URL to the trusted root certificate KeyStore (if not specified, use defaults). Must be accessible at
+the same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN
+daemon role is running.
+
+**Truststore Password:** Password for the trusted root certificates KeyStore
+
+**Use Compression:** Use zlib compression when communicating with the server. Select this option for WAN
+connections.
+
+**Use ANSI Quotes:** Treats " as an identifier quote character and not as a string quote character.
+
+**SQL_MODE:** Override the default SQL_MODE session variable used by the server.
+
+
+Data Types Mapping
+----------
+
+ +--------------------------------+-----------------------+------------------------------------+
+ | MariaDB Data Type | CDAP Schema Data Type | Comment |
+ +--------------------------------+-----------------------+------------------------------------+
+ | TINYINT | int | |
+ | BOOLEAN, BOOL | boolean | |
+ | SMALLINT | int | |
+ | MEDIUMINT | int | |
+ | INT, INTEGER | int | |
+ | BIGINT | long | |
+ | DECIMAL, DEC, NUMERIC, FIXED | decimal | |
+ | FLOAT | float | |
+ | DOUBLE, DOUBLE PRECISION, REAL | decimal | |
+ | BIT | boolean | |
+ | CHAR | string | |
+ | VARCHAR | string | |
+ | BINARY | bytes | |
+ | CHAR BYTE | bytes | |
+ | VARBINARY | bytes | |
+ | TINYBLOB | bytes | |
+ | BLOB | bytes | |
+ | MEDIUMBLOB | bytes | |
+ | LONGBLOB | bytes | |
+ | TINYTEXT | string | |
+ | TEXT | string | |
+ | MEDIUMTEXT | string | |
+ | LONGTEXT | string | |
+ | JSON | string | In MariaDB it is alias to LONGTEXT |
+ | ENUM | string | Mapping to String by default |
+ | SET | string | |
+ | DATE | date | |
+ | TIME | time_micros | |
+ | DATETIME | timestamp_micros | |
+ | TIMESTAMP | timestamp_micros | |
+ | YEAR | date | |
+ +--------------------------------+-----------------------+------------------------------------+
+
+
+Example
+------
+Suppose you want to read data from MariaDB database named "prod" that is running on "localhost" port 3306,
+as "root" user with "root" password, then configure plugin with:
+
+
+```
+Reference Name: "src1"
+Driver Name: "mariadb"
+Host: "localhost"
+Port: 3306
+Database: "prod"
+Import Query: "select id, name, email, phone from users;"
+Number of Splits to Generate: 1
+Username: "root"
+Password: "root"
+```
+
+For example, if the 'id' column is a primary key of type int and the other columns are
+non-nullable varchars, output records will have this schema:
+
+ +----------------+---------------------+
+ | Field Name | Type |
+ +----------------+---------------------+
+ | id | int |
+ | name | string |
+ | email | string |
+ | phone | string |
+ +----------------+---------------------+
diff --git a/mariadb-plugin/docs/Mariadb-postaction.md b/mariadb-plugin/docs/Mariadb-postaction.md
new file mode 100644
index 000000000..73c6310d5
--- /dev/null
+++ b/mariadb-plugin/docs/Mariadb-postaction.md
@@ -0,0 +1,83 @@
+# MariaDB Query Post-run Action
+
+
+Description
+-----------
+Runs a MariaDB query at the end of the pipeline run.
+Can be configured to run only on success, only on failure, or always at the end of the run.
+
+
+Use Case
+--------
+The action is used whenever you need to run a query at the end of a pipeline run.
+For example, you may have a pipeline that imports data from a database table to
+hdfs files. At the end of the run, you may want to run a query that deletes the data
+that was read from the table.
+
+
+Properties
+----------
+**Run Condition:** When to run the action. Must be 'completion', 'success', or 'failure'. Defaults to 'success'.
+If set to 'completion', the action will be executed regardless of whether the pipeline run succeeded or failed.
+If set to 'success', the action will only be executed if the pipeline run succeeded.
+If set to 'failure', the action will only be executed if the pipeline run failed.
+
+**Driver Name:** Name of the JDBC driver to use.
+
+**Query:** Query to run.
+
+**Host:** Host that MariaDB is running on.
+
+**Port:** Port that MariaDB is running on.
+
+**Database:** MariaDB database name.
+
+**Username:** User identity for connecting to the specified database.
+
+**Password:** Password to use to connect to the specified database.
+
+**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
+will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.
+
+**Auto Reconnect:** Should the driver try to re-establish stale and/or dead connections.
+
+**Enable Auto-Commit:** Whether to enable auto-commit for queries run by this source. Defaults to 'false'.
+Normally this setting does not matter. It only matters if you are using a jdbc driver -- like the Hive
+driver -- that will error when the commit operation is run, or a driver that will error when auto-commit is
+set to false. For drivers like those, you will need to set this to 'true'.
+
+**Use SSL:** Turns on SSL encryption. The connection will fail if SSL is not available.
+
+**Keystore URL:** URL to the client certificate KeyStore (if not specified, use defaults). Must be accessible at the
+same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN daemon
+role is running.
+
+**Keystore Password:** Password for the client certificates KeyStore.
+
+**Truststore URL:** URL to the trusted root certificate KeyStore (if not specified, use defaults). Must be accessible at
+the same location on host where CDAP Master is running and all hosts on which at least one HDFS, MapReduce, or YARN
+daemon role is running.
+
+**Truststore Password:** Password for the trusted root certificates KeyStore
+
+**Use Compression:** Use zlib compression when communicating with the server. Select this option for WAN
+connections.
+
+**Use ANSI Quotes:** Treats " as an identifier quote character and not as a string quote character.
+
+**SQL_MODE:** Override the default SQL_MODE session variable used by the server.
+
+
+Example
+-------
+Suppose you want to delete all records from MariaDB table "userEvents" of database "prod" running on localhost, port 3306,
+without authentication using driver "mariadb" if the pipeline completes successfully, then configure the plugin with:
+
+```
+Run Condition: "success"
+Driver Name: "maridb"
+Query: "delete * from userEvents"
+Host: "localhost"
+Port: 3306
+Database: "prod"
+```
diff --git a/mariadb-plugin/icons/Mariadb-action.png b/mariadb-plugin/icons/Mariadb-action.png
new file mode 100644
index 000000000..5cb7e19c2
Binary files /dev/null and b/mariadb-plugin/icons/Mariadb-action.png differ
diff --git a/mariadb-plugin/icons/Mariadb-batchsink.png b/mariadb-plugin/icons/Mariadb-batchsink.png
new file mode 100644
index 000000000..5cb7e19c2
Binary files /dev/null and b/mariadb-plugin/icons/Mariadb-batchsink.png differ
diff --git a/mariadb-plugin/icons/Mariadb-batchsource.png b/mariadb-plugin/icons/Mariadb-batchsource.png
new file mode 100644
index 000000000..5cb7e19c2
Binary files /dev/null and b/mariadb-plugin/icons/Mariadb-batchsource.png differ
diff --git a/mariadb-plugin/icons/Mariadb-postaction.png b/mariadb-plugin/icons/Mariadb-postaction.png
new file mode 100644
index 000000000..5cb7e19c2
Binary files /dev/null and b/mariadb-plugin/icons/Mariadb-postaction.png differ
diff --git a/mariadb-plugin/pom.xml b/mariadb-plugin/pom.xml
new file mode 100644
index 000000000..f7dfa7c2e
--- /dev/null
+++ b/mariadb-plugin/pom.xml
@@ -0,0 +1,118 @@
+
+
+
+
+ database-plugins
+ io.cdap.plugin
+ 1.3.0-SNAPSHOT
+
+
+ Maria DB plugin
+ mariadb-plugin
+ 1.3.0-SNAPSHOT
+ 4.0.0
+
+
+
+ io.cdap.cdap
+ cdap-etl-api
+
+
+ io.cdap.plugin
+ database-commons
+ ${project.version}
+
+
+ io.cdap.plugin
+ hydrator-common
+
+
+ com.google.guava
+ guava
+
+
+
+
+ io.cdap.plugin
+ database-commons
+ ${project.version}
+ test-jar
+ test
+
+
+ io.cdap.cdap
+ hydrator-test
+
+
+ io.cdap.cdap
+ cdap-data-pipeline
+
+
+ junit
+ junit
+
+
+ io.cdap.cdap
+ cdap-api
+ provided
+
+
+ org.jetbrains
+ annotations
+ RELEASE
+ compile
+
+
+
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ 3.3.0
+ true
+
+
+ <_exportcontents>
+ io.cdap.plugin.*;
+ org.apache.commons.lang;
+ org.apache.commons.logging.*;
+ org.codehaus.jackson.*
+
+ *;inline=false;scope=compile
+ true
+ lib
+
+
+
+
+ package
+
+ bundle
+
+
+
+
+
+ io.cdap
+ cdap-maven-plugin
+
+
+
+
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbAction.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbAction.java
new file mode 100644
index 000000000..f71eedb6c
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbAction.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.action.Action;
+import io.cdap.plugin.db.batch.action.AbstractDBAction;
+import io.cdap.plugin.db.batch.config.DBSpecificQueryConfig;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * MariaDB action that runs MariaDB command
+ */
+@Plugin(type = Action.PLUGIN_TYPE)
+@Name(MariadbConstants.PLUGIN_NAME)
+@Description("Action that runs a MariaDB command")
+public class MariadbAction extends AbstractDBAction {
+
+ private final MariadbActionConfig mariadbActionConfig;
+
+ public MariadbAction(MariadbActionConfig mariadbActionConfig) {
+ super(mariadbActionConfig, false);
+ this.mariadbActionConfig = mariadbActionConfig;
+ }
+
+ /**
+ * MariaDB Action Config
+ */
+ public static class MariadbActionConfig extends DBSpecificQueryConfig {
+
+ @Name(MariadbConstants.AUTO_RECONNECT)
+ @Description("Should the driver try to re-establish stale and/or dead connections")
+ @Nullable
+ public Boolean autoReconnect;
+
+ @Name(MariadbConstants.USE_COMPRESSION)
+ @Description("Select this option for WAN connections")
+ @Nullable
+ public Boolean useCompression;
+
+ @Name(MariadbConstants.USE_SSL)
+ @Description("Turns on SSL encryption. Connection will fail if SSL is not available")
+ @Nullable
+ public String useSSL;
+
+ @Name(MariadbConstants.USE_ANSI_QUOTES)
+ @Description("Treats \" as an identifier quote character and not as a string quote character")
+ @Nullable
+ public Boolean useAnsiQuotes;
+
+ @Name(MariadbConstants.KEY_STORE)
+ @Description("URL to the client certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String keyStore;
+
+ @Name(MariadbConstants.KEY_STORE_PASSWORD)
+ @Description("Password for the client certificates KeyStore")
+ @Nullable
+ public String keyStorePassword;
+
+ @Name(MariadbConstants.TRUST_STORE)
+ @Description("URL to the trusted root certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String trustStore;
+
+ @Name(MariadbConstants.TRUST_STORE_PASSWORD)
+ @Description("Password for the trusted root certificates KeyStore")
+ @Nullable
+ public String trustStorePassword;
+
+ @Override
+ public String getConnectionString() {
+ return MariadbUtil.getConnectionString(host, port, database);
+ }
+
+ @Override
+ public Map getDBSpecificArguments() {
+ return MariadbUtil.composeDbSpecificArgumentsMap(autoReconnect, useCompression, useSSL,
+ keyStore,
+ keyStorePassword,
+ trustStore,
+ trustStorePassword);
+ }
+
+ @Override
+ public List getInitQueries() {
+ return MariadbUtil.composeDbInitQueries(useAnsiQuotes);
+ }
+ }
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbConstants.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbConstants.java
new file mode 100644
index 000000000..8eb30b5cd
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+/**
+ * MariaDB constants
+ */
+public final class MariadbConstants {
+ private MariadbConstants() {
+ throw new AssertionError("Should not instantiate static utility class.");
+ }
+
+ public static final String PLUGIN_NAME = "Mariadb";
+ public static final String AUTO_RECONNECT = "autoReconnect";
+ public static final String USE_COMPRESSION = "useCompression";
+ public static final String SQL_MODE = "sqlMode";
+ public static final String USE_SSL = "useSSL";
+ public static final String USE_ANSI_QUOTES = "useAnsiQuotes";
+ public static final String NO_SSL_OPTION = "No";
+ public static final String REQUIRE_SSL_OPTION = "Yes";
+ public static final String KEY_STORE = "keyStore";
+ public static final String KEY_STORE_PASSWORD = "keyStorePassword";
+ public static final String TRUST_STORE = "trustStore";
+ public static final String TRUST_STORE_PASSWORD = "trustStorePassword";
+ public static final String MARIADB_CONNECTION_STRING_FORMAT = "jdbc:mariadb://%s:%s/%s";
+
+ /**
+ * Query to set SQL_MODE system variable.
+ */
+ public static final String SET_SQL_MODE_QUERY_FORMAT = "SET SESSION sql_mode = '%s';";
+
+ /**
+ * Query to append 'ANSI_QUOTES' sql mode to the current value of SQL_MODE system variable.
+ */
+ public static final String ANSI_QUOTES_QUERY = "SET SESSION sql_mode = (CONCAT(@@sql_mode , ',', 'ANSI_QUOTES'));";
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbPostAction.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbPostAction.java
new file mode 100644
index 000000000..8d438a40c
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbPostAction.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.batch.PostAction;
+import io.cdap.plugin.db.batch.action.AbstractQueryAction;
+import io.cdap.plugin.db.batch.config.DBSpecificQueryActionConfig;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Represents MariaDB post action
+ */
+@Plugin(type = PostAction.PLUGIN_TYPE)
+@Name(MariadbConstants.PLUGIN_NAME)
+@Description("Runs a MariaDB query after a pipeline run.")
+public class MariadbPostAction extends AbstractQueryAction {
+
+ private final MariadbQueryActionConfig mariadbQueryActionConfig;
+
+ public MariadbPostAction(MariadbQueryActionConfig mariadbQueryActionConfig) {
+ super(mariadbQueryActionConfig, false);
+ this.mariadbQueryActionConfig = mariadbQueryActionConfig;
+ }
+
+ /**
+ * MariaDB post action mariadbQueryActionConfig
+ */
+ public static class MariadbQueryActionConfig extends DBSpecificQueryActionConfig {
+
+ @Name(MariadbConstants.AUTO_RECONNECT)
+ @Description("Should the driver try to re-establish stale and/or dead connections")
+ @Nullable
+ public Boolean autoReconnect;
+
+ @Name(MariadbConstants.USE_COMPRESSION)
+ @Description("Select this option for WAN connections")
+ @Nullable
+ public Boolean useCompression;
+
+ @Name(MariadbConstants.USE_SSL)
+ @Description("Turns on SSL encryption. Connection will fail if SSL is not available")
+ @Nullable
+ public String useSSL;
+
+ @Name(MariadbConstants.USE_ANSI_QUOTES)
+ @Description("Treats \" as an identifier quote character and not as a string quote character")
+ @Nullable
+ public Boolean useAnsiQuotes;
+
+ @Name(MariadbConstants.KEY_STORE)
+ @Description("URL to the client certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String keyStore;
+
+ @Name(MariadbConstants.KEY_STORE_PASSWORD)
+ @Description("Password for the client certificates KeyStore")
+ @Nullable
+ public String keyStorePassword;
+
+ @Name(MariadbConstants.TRUST_STORE)
+ @Description("URL to the trusted root certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String trustStore;
+
+ @Name(MariadbConstants.TRUST_STORE_PASSWORD)
+ @Description("Password for the trusted root certificates KeyStore")
+ @Nullable
+ public String trustStorePassword;
+
+ @Override
+ public String getConnectionString() {
+ return MariadbUtil.getConnectionString(host, port, database);
+ }
+
+ @Override
+ public Map getDBSpecificArguments() {
+ return MariadbUtil.composeDbSpecificArgumentsMap(autoReconnect, useCompression, useSSL,
+ keyStore,
+ keyStorePassword,
+ trustStore,
+ trustStorePassword);
+ }
+
+ @Override
+ public List getInitQueries() {
+ return MariadbUtil.composeDbInitQueries(useAnsiQuotes);
+ }
+ }
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
new file mode 100644
index 000000000..773210b88
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSink.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.plugin.db.batch.config.DBSpecificSinkConfig;
+import io.cdap.plugin.db.batch.sink.AbstractDBSink;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * MariaDB Sink
+ */
+@Plugin(type = BatchSink.PLUGIN_TYPE)
+@Name(MariadbConstants.PLUGIN_NAME)
+@Description("Writes records to a MariaDB table. Each record will be written in a row in the table")
+public class MariadbSink extends AbstractDBSink {
+
+ private final MariadbSinkConfig mariadbSinkConfig;
+
+ public MariadbSink(MariadbSinkConfig mariadbSinkConfig) {
+ super(mariadbSinkConfig);
+ this.mariadbSinkConfig = mariadbSinkConfig;
+ }
+
+ /**
+ * MariaDB Sink Config
+ */
+ public static class MariadbSinkConfig extends DBSpecificSinkConfig {
+
+ @Name(MariadbConstants.AUTO_RECONNECT)
+ @Description("Should the driver try to re-establish stale and/or dead connections")
+ @Nullable
+ public Boolean autoReconnect;
+
+ @Name(MariadbConstants.USE_COMPRESSION)
+ @Description("Select this option for WAN connections")
+ @Nullable
+ public Boolean useCompression;
+
+ @Name(MariadbConstants.USE_SSL)
+ @Description("Turns on SSL encryption. Connection will fail if SSL is not available")
+ @Nullable
+ public String useSSL;
+
+ @Name(MariadbConstants.KEY_STORE)
+ @Description("URL to the client certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String keyStore;
+
+ @Name(MariadbConstants.KEY_STORE_PASSWORD)
+ @Description("Password for the client certificates KeyStore")
+ @Nullable
+ public String keyStorePassword;
+
+ @Name(MariadbConstants.TRUST_STORE)
+ @Description("URL to the trusted root certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String trustStore;
+
+ @Name(MariadbConstants.TRUST_STORE_PASSWORD)
+ @Description("Password for the trusted root certificates KeyStore")
+ @Nullable
+ public String trustStorePassword;
+
+ @Override
+ public String getConnectionString() {
+ return MariadbUtil.getConnectionString(host, port, database);
+ }
+
+ @Override
+ public Map getDBSpecificArguments() {
+ return MariadbUtil.composeDbSpecificArgumentsMap(autoReconnect, useCompression, useSSL,
+ keyStore,
+ keyStorePassword,
+ trustStore,
+ trustStorePassword);
+ }
+ }
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
new file mode 100644
index 000000000..e21d618eb
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbSource.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig;
+import io.cdap.plugin.db.batch.source.AbstractDBSource;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * MariaDB source
+ */
+@Plugin(type = BatchSource.PLUGIN_TYPE)
+@Name(MariadbConstants.PLUGIN_NAME)
+@Description("Reads from a database table(s) using a configurable SQL query." +
+ " Outputs one record for each row returned by the query.")
+public class MariadbSource extends AbstractDBSource {
+
+ private final MariadbSourceConfig mariadbSourceConfig;
+
+ public MariadbSource(MariadbSourceConfig mariadbSourceConfig) {
+ super(mariadbSourceConfig);
+ this.mariadbSourceConfig = mariadbSourceConfig;
+ }
+
+ @Override
+ protected String createConnectionString() {
+ return String.format(MariadbConstants.MARIADB_CONNECTION_STRING_FORMAT,
+ mariadbSourceConfig.host, mariadbSourceConfig.port, mariadbSourceConfig.database);
+ }
+
+ /**
+ * MaraiDB source mariadbSourceConfig
+ */
+ public static class MariadbSourceConfig extends DBSpecificSourceConfig {
+
+ @Name(MariadbConstants.AUTO_RECONNECT)
+ @Description("Should the driver try to re-establish stale and/or dead connections")
+ @Nullable
+ public Boolean autoReconnect;
+
+ @Name(MariadbConstants.USE_COMPRESSION)
+ @Description("Select this option for WAN connections")
+ @Nullable
+ public Boolean useCompression;
+
+ @Name(MariadbConstants.USE_SSL)
+ @Description("Turns on SSL encryption. Connection will fail if SSL is not available")
+ @Nullable
+ public String useSSL;
+
+ @Name(MariadbConstants.USE_ANSI_QUOTES)
+ @Description("Treats \" as an identifier quote character and not as a string quote character")
+ @Nullable
+ public Boolean useAnsiQuotes;
+
+ @Name(MariadbConstants.KEY_STORE)
+ @Description("URL to the client certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String keyStore;
+
+ @Name(MariadbConstants.KEY_STORE_PASSWORD)
+ @Description("Password for the client certificates KeyStore")
+ @Nullable
+ public String keyStorePassword;
+
+ @Name(MariadbConstants.TRUST_STORE)
+ @Description("URL to the trusted root certificate KeyStore (if not specified, use defaults)")
+ @Nullable
+ public String trustStore;
+
+ @Name(MariadbConstants.TRUST_STORE_PASSWORD)
+ @Description("Password for the trusted root certificates KeyStore")
+ @Nullable
+ public String trustStorePassword;
+
+ @Override
+ public String getConnectionString() {
+ return MariadbUtil.getConnectionString(host, port, database);
+ }
+
+ @Override
+ public Map getDBSpecificArguments() {
+ return MariadbUtil.composeDbSpecificArgumentsMap(autoReconnect, useCompression, useSSL,
+ keyStore,
+ keyStorePassword,
+ trustStore,
+ trustStorePassword);
+ }
+
+ @Override
+ public List getInitQueries() {
+ return MariadbUtil.composeDbInitQueries(useAnsiQuotes);
+ }
+ }
+}
diff --git a/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbUtil.java b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbUtil.java
new file mode 100644
index 000000000..f373dbcdb
--- /dev/null
+++ b/mariadb-plugin/src/main/java/io/cdap/plugin/mariadb/MariadbUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * MariaDB util methods
+ */
+public final class MariadbUtil {
+ private MariadbUtil() {
+ throw new AssertionError("Should not instantiate static utility class.");
+ }
+
+ /**
+ * Composes immutable map of the MariaDB specific arguments.
+ *
+ * @param autoReconnect should the driver try to re-establish stale and/or dead connections.
+ * @param useCompression specifies if compression must be enabled.
+ * @param useSSL specifies if SSL encryption must be turned on.
+ * @param keyStore URL of the client certificate KeyStore.
+ * @param keyStorePassword password for the client certificates KeyStore.
+ * @param trustStore URL of the trusted root certificate KeyStore.
+ * @param trustStorePassword password for the trusted root certificates KeyStore.
+ * @return immutable map of the MariaDB specific arguments
+ */
+ public static Map composeDbSpecificArgumentsMap(Boolean autoReconnect,
+ Boolean useCompression,
+ String useSSL,
+ String keyStore,
+ String keyStorePassword,
+ String trustStore,
+ String trustStorePassword) {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+
+ if (autoReconnect != null) {
+ builder.put(MariadbConstants.AUTO_RECONNECT, String.valueOf(autoReconnect));
+ }
+ if (useCompression != null) {
+ builder.put(MariadbConstants.USE_COMPRESSION, String.valueOf(useCompression));
+ }
+ if (MariadbConstants.REQUIRE_SSL_OPTION.equals(useSSL)) {
+ builder.put(MariadbConstants.USE_SSL, "true");
+ } else if (MariadbConstants.NO_SSL_OPTION.equals(useSSL)) {
+ builder.put(MariadbConstants.USE_SSL, "false");
+ }
+ if (keyStore != null) {
+ builder.put(MariadbConstants.KEY_STORE, String.valueOf(keyStore));
+ }
+ if (keyStorePassword != null) {
+ builder.put(MariadbConstants.KEY_STORE_PASSWORD, String.valueOf(keyStorePassword));
+ }
+ if (trustStore != null) {
+ builder.put(MariadbConstants.TRUST_STORE, String.valueOf(trustStore));
+ }
+ if (trustStorePassword != null) {
+ builder.put(MariadbConstants.TRUST_STORE_PASSWORD, String.valueOf(trustStorePassword));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Composes immutable list of initial queries to the DB
+ *
+ * @param useAnsiQuotes ANSI_QUOTES mode
+ * @return immutable list of initial commands
+ */
+ public static List composeDbInitQueries(@Nullable Boolean useAnsiQuotes) {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ if (useAnsiQuotes != null && useAnsiQuotes) {
+ builder.add(MariadbConstants.ANSI_QUOTES_QUERY);
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Creates MariaDB specific JDBC connection string
+ *
+ * @param host server host
+ * @param port server port
+ * @param database database name
+ * @return MariaDB specific JDBC connection string
+ */
+ public static String getConnectionString(String host, Integer port, String database) {
+ return String.format(MariadbConstants.MARIADB_CONNECTION_STRING_FORMAT, host, port, database);
+ }
+}
diff --git a/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbActionTestRun.java b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbActionTestRun.java
new file mode 100644
index 000000000..6dd6532c3
--- /dev/null
+++ b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbActionTestRun.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.etl.api.action.Action;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.plugin.db.batch.action.QueryConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class MariadbActionTestRun extends MariadbPluginTestBase {
+
+ @Test
+ public void testDBAction() throws Exception {
+
+ ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput"));
+ ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput"));
+ ETLStage action = new ETLStage("action", new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ Action.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(QueryConfig.QUERY, "delete from dbActionTest where day = '${logicalStartTime(yyyy-MM-dd,0m,UTC)}'")
+ .build(),
+ null));
+
+ ETLBatchConfig config = ETLBatchConfig.builder()
+ .addStage(source)
+ .addStage(sink)
+ .addStage(action)
+ .addConnection(sink.getName(), action.getName())
+ .addConnection(source.getName(), sink.getName())
+ .build();
+
+ AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config);
+ ApplicationId appId = NamespaceId.DEFAULT.app("actionTest");
+ ApplicationManager appManager = deployApplication(appId, appRequest);
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+ try (Connection connection = createConnection();
+ Statement statement = connection.createStatement();
+ ResultSet results = statement.executeQuery("select * from dbActionTest")) {
+ Assert.assertFalse(results.next());
+ }
+ }
+}
diff --git a/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPluginTestBase.java b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPluginTestBase.java
new file mode 100644
index 000000000..6cb2ddf5e
--- /dev/null
+++ b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPluginTestBase.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.plugin.PluginClass;
+import io.cdap.cdap.datapipeline.DataPipelineApp;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.TestConfiguration;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.DBRecord;
+import io.cdap.plugin.db.batch.DatabasePluginTestBase;
+import io.cdap.plugin.db.batch.sink.ETLDBOutputFormat;
+import io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TimeZone;
+import javax.sql.rowset.serial.SerialBlob;
+
+public abstract class MariadbPluginTestBase extends DatabasePluginTestBase {
+ protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0");
+ protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0");
+ protected static final long CURRENT_TS = System.currentTimeMillis();
+ protected static final String DRIVER_CLASS = "org.mariadb.jdbc.Driver";
+ protected static final String JDBC_DRIVER_NAME = "mariadb";
+
+ protected static String connectionUrl;
+ protected static final int YEAR;
+ protected static final int PRECISION = 10;
+ protected static final int SCALE = 6;
+ protected static final ZoneId UTC_ZONE = ZoneId.ofOffset("UTC", ZoneOffset.UTC);
+ protected static boolean tearDown = true;
+ private static int startCount;
+
+ @ClassRule
+ public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false);
+
+ static {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(new Date(CURRENT_TS));
+ YEAR = calendar.get(Calendar.YEAR);
+ }
+
+ protected static final Map BASE_PROPS = ImmutableMap.builder()
+ .put(ConnectionConfig.HOST, System.getProperty("mariadb.host", "localhost"))
+ .put(ConnectionConfig.PORT, System.getProperty("mariadb.port", "3306"))
+ .put(ConnectionConfig.DATABASE, System.getProperty("mariadb.database", "mydb"))
+ .put(ConnectionConfig.USER, System.getProperty("mariadb.username", "root"))
+ .put(ConnectionConfig.PASSWORD, System.getProperty("mariadb.password", "123Qwe123"))
+ .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME)
+ .build();
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ if (startCount++ > 0) {
+ return;
+ }
+
+ setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class);
+
+ addPluginArtifact(NamespaceId.DEFAULT.artifact(JDBC_DRIVER_NAME, "1.0.0"),
+ DATAPIPELINE_ARTIFACT_ID,
+ MariadbSource.class, MariadbSink.class, DBRecord.class, ETLDBOutputFormat.class,
+ DataDrivenETLDBInputFormat.class, DBRecord.class, MariadbPostAction.class, MariadbAction.class);
+
+ Class> driverClass = Class.forName(DRIVER_CLASS);
+
+ // add mariadb 3rd party plugin
+ PluginClass mariadbDrive = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE,
+ JDBC_DRIVER_NAME,
+ "mariadb driver class",
+ driverClass.getName(),
+ null,
+ Collections.emptyMap());
+ addPluginArtifact(NamespaceId.DEFAULT.artifact("mariadb-jdbc-connector", "1.0.0"),
+ DATAPIPELINE_ARTIFACT_ID,
+ Sets.newHashSet(mariadbDrive), driverClass);
+
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
+ connectionUrl = "jdbc:mariadb://" + BASE_PROPS.get(ConnectionConfig.HOST) + ":" +
+ BASE_PROPS.get(ConnectionConfig.PORT) + "/" + BASE_PROPS.get(ConnectionConfig.DATABASE);
+ Connection conn = createConnection();
+ createTestTables(conn);
+ prepareTestData(conn);
+ }
+
+ protected static void createTestTables(Connection conn) throws SQLException {
+ try (Statement stmt = conn.createStatement()) {
+ // create a table that the action will truncate at the end of the run
+ stmt.execute("CREATE TABLE dbActionTest (x int, day varchar(10))");
+ // create a table that the action will truncate at the end of the run
+ stmt.execute("CREATE TABLE postActionTest (x int, day varchar(10))");
+
+ stmt.execute("CREATE TABLE my_table" +
+ "(" +
+ "ID INT NOT NULL, " +
+ "NAME VARCHAR(40) NOT NULL, " +
+ "SCORE DOUBLE, " +
+ "GRADUATED BOOLEAN, " +
+ "NOT_IMPORTED VARCHAR(30), " +
+ "TINY TINYINT, " +
+ "SMALL SMALLINT, " +
+ "MEDIUMINT_COL MEDIUMINT, " +
+ "BIG BIGINT, " +
+ "FLOAT_COL FLOAT, " +
+ "REAL_COL REAL, " +
+ "NUMERIC_COL NUMERIC(" + PRECISION + "," + SCALE + "), " +
+ "DECIMAL_COL DECIMAL(" + PRECISION + "," + SCALE + "), " +
+ "BIT_COL BIT, " +
+ "DATE_COL DATE, " +
+ "TIME_COL TIME, " +
+ "TIMESTAMP_COL TIMESTAMP(3), " +
+ "DATETIME_COL DATETIME(3), " +
+ "YEAR_COL YEAR, " +
+ "TEXT_COL TEXT," +
+ "TINYTEXT_COL TINYTEXT," +
+ "MEDIUMTEXT_COL MEDIUMTEXT," +
+ "LONGTEXT_COL LONGTEXT," +
+ "CHAR_COL CHAR(100)," +
+ "BINARY_COL BINARY(5)," +
+ "VARBINARY_COL VARBINARY(20)," +
+ "TINYBLOB_COL TINYBLOB, " +
+ "BLOB_COL BLOB(100), " +
+ "MEDIUMBLOB_COL MEDIUMBLOB, " +
+ "LONGBLOB_COL LONGBLOB, " +
+ "ENUM_COL ENUM('First', 'Second', 'Third')," +
+ "SET_COL SET('a', 'b', 'c', 'd')" +
+ ")");
+ stmt.execute("CREATE TABLE MY_DEST_TABLE AS " +
+ "SELECT * FROM my_table");
+ stmt.execute("CREATE TABLE your_table AS " +
+ "SELECT * FROM my_table");
+ }
+ }
+
+ protected static void prepareTestData(Connection conn) throws SQLException {
+ try (
+ Statement stmt = conn.createStatement();
+ PreparedStatement pStmt1 =
+ conn.prepareStatement("INSERT INTO my_table " +
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
+ " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
+ " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+ PreparedStatement pStmt2 =
+ conn.prepareStatement("INSERT INTO your_table " +
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
+ " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?," +
+ " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
+
+ stmt.execute("insert into dbActionTest values (1, '1970-01-01')");
+ stmt.execute("insert into postActionTest values (1, '1970-01-01')");
+
+ populateData(pStmt1, pStmt2);
+ }
+ }
+
+ private static void populateData(PreparedStatement ...stmts) throws SQLException {
+ // insert the same data into both tables: my_table and your_table
+ for (PreparedStatement pStmt : stmts) {
+ for (int i = 1; i <= 5; i++) {
+ String name = "user" + i;
+ pStmt.setInt(1, i);
+ pStmt.setString(2, name);
+ pStmt.setDouble(3, 123.45 + i);
+ pStmt.setBoolean(4, (i % 2 == 0));
+ pStmt.setString(5, "random" + i);
+ pStmt.setShort(6, (short) i);
+ pStmt.setShort(7, (short) i);
+ pStmt.setInt(8, (short) i);
+ pStmt.setLong(9, (long) i);
+ pStmt.setFloat(10, (float) 123.45 + i);
+ pStmt.setFloat(11, (float) 123.45 + i);
+ pStmt.setBigDecimal(12, new BigDecimal(123.45).add(new BigDecimal(i)));
+ if ((i % 2 == 0)) {
+ pStmt.setNull(13, Types.DECIMAL);
+ } else {
+ pStmt.setBigDecimal(13, new BigDecimal(123.45).add(new BigDecimal(i)));
+ }
+ pStmt.setBoolean(14, (i % 2 == 1));
+ pStmt.setDate(15, new Date(CURRENT_TS));
+ pStmt.setTime(16, new Time(CURRENT_TS));
+ pStmt.setTimestamp(17, new Timestamp(CURRENT_TS));
+ pStmt.setTimestamp(18, new Timestamp(CURRENT_TS));
+ pStmt.setShort(19, (short) YEAR);
+ pStmt.setString(20, name);
+ pStmt.setString(21, name);
+ pStmt.setString(22, name);
+ pStmt.setString(23, name);
+ pStmt.setString(24, "char" + i);
+ pStmt.setBytes(25, name.getBytes(Charsets.UTF_8));
+ pStmt.setBytes(26, name.getBytes(Charsets.UTF_8));
+ pStmt.setBlob(27, new SerialBlob(name.getBytes(Charsets.UTF_8)));
+ pStmt.setBlob(28, new SerialBlob(name.getBytes(Charsets.UTF_8)));
+ pStmt.setBlob(29, new SerialBlob(name.getBytes(Charsets.UTF_8)));
+ pStmt.setBlob(30, new SerialBlob(name.getBytes(Charsets.UTF_8)));
+ pStmt.setString(31, "Second");
+ pStmt.setString(32, "a,b");
+ pStmt.executeUpdate();
+ }
+ }
+ }
+
+ public static Connection createConnection() {
+ try {
+ Class.forName(DRIVER_CLASS);
+ return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER),
+ BASE_PROPS.get(ConnectionConfig.PASSWORD));
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @AfterClass
+ public static void tearDownDB() throws SQLException {
+ if (!tearDown) {
+ return;
+ }
+
+ try (Connection conn = createConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("DROP TABLE my_table");
+ stmt.execute("DROP TABLE your_table");
+ stmt.execute("DROP TABLE postActionTest");
+ stmt.execute("DROP TABLE dbActionTest");
+ stmt.execute("DROP TABLE MY_DEST_TABLE");
+ }
+ }
+}
diff --git a/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPluginTestSuite.java b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPluginTestSuite.java
new file mode 100644
index 000000000..96aa18095
--- /dev/null
+++ b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPluginTestSuite.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import io.cdap.cdap.common.test.TestSuite;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+
+/**
+ * This is a test suite that runs all the tests for Database plugins.
+ */
+@RunWith(TestSuite.class)
+@Suite.SuiteClasses({
+ MariadbSinkTestRun.class,
+ MariadbSourceTestRun.class,
+ MariadbActionTestRun.class,
+ MariadbPostActionTestRun.class
+})
+public class MariadbPluginTestSuite extends MariadbPluginTestBase {
+
+ @BeforeClass
+ public static void setup() {
+ tearDown = false;
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ tearDown = true;
+ }
+}
diff --git a/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPostActionTestRun.java b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPostActionTestRun.java
new file mode 100644
index 000000000..53dd52baa
--- /dev/null
+++ b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbPostActionTestRun.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.etl.api.batch.PostAction;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.plugin.common.batch.action.Condition;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.batch.action.QueryActionConfig;
+import io.cdap.plugin.db.batch.action.QueryConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class MariadbPostActionTestRun extends MariadbPluginTestBase {
+
+ @Test
+ public void testAction() throws Exception {
+
+ ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput"));
+ ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput"));
+ ETLStage action = new ETLStage("action", new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ PostAction.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(QueryConfig.QUERY, "delete from postActionTest where day = '${logicalStartTime(yyyy-MM-dd,0m,UTC)}'")
+ .put(ConnectionConfig.ENABLE_AUTO_COMMIT, "false")
+ .put(QueryActionConfig.RUN_CONDITION, Condition.SUCCESS.name())
+ .build(),
+ null));
+
+ ETLBatchConfig config = ETLBatchConfig.builder()
+ .addStage(source)
+ .addStage(sink)
+ .addPostAction(action)
+ .addConnection(source.getName(), sink.getName())
+ .build();
+
+ AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config);
+ ApplicationId appId = NamespaceId.DEFAULT.app("postActionTest");
+ ApplicationManager appManager = deployApplication(appId, appRequest);
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0"));
+
+ try (Connection connection = createConnection();
+ Statement statement = connection.createStatement();
+ ResultSet results = statement.executeQuery("select * from postActionTest")) {
+ Assert.assertFalse(results.next());
+ }
+ }
+}
diff --git a/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbSinkTestRun.java b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbSinkTestRun.java
new file mode 100644
index 000000000..edce93024
--- /dev/null
+++ b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbSinkTestRun.java
@@ -0,0 +1,252 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.mock.batch.MockSource;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.CustomAssertions;
+import io.cdap.plugin.db.batch.sink.AbstractDBSink;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for ETL using databases.
+ */
+public class MariadbSinkTestRun extends MariadbPluginTestBase {
+
+ private static final Schema SCHEMA = Schema.recordOf(
+ "dbRecord",
+ Schema.Field.of("ID", Schema.of(Schema.Type.INT)),
+ Schema.Field.of("NAME", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("SCORE", Schema.of(Schema.Type.DOUBLE)),
+ Schema.Field.of("GRADUATED", Schema.of(Schema.Type.BOOLEAN)),
+ Schema.Field.of("TINY", Schema.of(Schema.Type.INT)),
+ Schema.Field.of("SMALL", Schema.of(Schema.Type.INT)),
+ Schema.Field.of("BIG", Schema.of(Schema.Type.LONG)),
+ Schema.Field.of("MEDIUMINT_COL", Schema.of(Schema.Type.INT)),
+ Schema.Field.of("FLOAT_COL", Schema.of(Schema.Type.FLOAT)),
+ Schema.Field.of("REAL_COL", Schema.of(Schema.Type.DOUBLE)),
+ Schema.Field.of("NUMERIC_COL", Schema.decimalOf(PRECISION, SCALE)),
+ Schema.Field.of("DECIMAL_COL", Schema.decimalOf(PRECISION, SCALE)),
+ Schema.Field.of("BIT_COL", Schema.of(Schema.Type.BOOLEAN)),
+ Schema.Field.of("DATE_COL", Schema.of(Schema.LogicalType.DATE)),
+ Schema.Field.of("TIME_COL", Schema.of(Schema.LogicalType.TIME_MICROS)),
+ Schema.Field.of("TIMESTAMP_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
+ Schema.Field.of("DATETIME_COL", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
+ Schema.Field.of("YEAR_COL", Schema.of(Schema.LogicalType.DATE)),
+ Schema.Field.of("TEXT_COL", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("TINYTEXT_COL", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("MEDIUMTEXT_COL", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("LONGTEXT_COL", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("CHAR_COL", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("BINARY_COL", Schema.of(Schema.Type.BYTES)),
+ Schema.Field.of("VARBINARY_COL", Schema.of(Schema.Type.BYTES)),
+ Schema.Field.of("TINYBLOB_COL", Schema.of(Schema.Type.BYTES)),
+ Schema.Field.of("BLOB_COL", Schema.of(Schema.Type.BYTES)),
+ Schema.Field.of("MEDIUMBLOB_COL", Schema.of(Schema.Type.BYTES)),
+ Schema.Field.of("LONGBLOB_COL", Schema.of(Schema.Type.BYTES)),
+ Schema.Field.of("ENUM_COL", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("SET_COL", Schema.of(Schema.Type.STRING))
+ );
+
+ @Before
+ public void setup() throws Exception {
+ try (Connection connection = createConnection();
+ Statement stmt = connection.createStatement()) {
+ stmt.execute("TRUNCATE TABLE MY_DEST_TABLE");
+ }
+ }
+
+ @Test
+ public void testDBSinkWithInvalidFieldType() throws Exception {
+ testDBInvalidFieldType("ID", Schema.Type.STRING, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+ }
+
+ @Test
+ public void testDBSinkWithInvalidFieldLogicalType() throws Exception {
+ testDBInvalidFieldLogicalType("TIMESTAMP_COL", Schema.Type.LONG, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+ }
+
+ @Test
+ public void testDBSinkWithDBSchemaAndInvalidData() throws Exception {
+ // MariaDB JDBC connector allows to write integer values to STRING column. Use ENUM column instead.
+ String enumColumnName = "ENUM_COL";
+ startPipelineAndWriteInvalidData(enumColumnName, getSinkConfig(), DATAPIPELINE_ARTIFACT);
+ try (Connection conn = createConnection();
+ Statement stmt = conn.createStatement();
+ ResultSet resultSet = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE")) {
+ testInvalidDataWrite(resultSet, enumColumnName);
+ }
+ }
+
+ @Test
+ public void testDBSinkWithExplicitInputSchema() throws Exception {
+ testDBSink("testDBSinkWithExplicitInputSchema", "input-dbsinktest-explicit", SCHEMA);
+ }
+
+ @Test
+ public void testDBSinkWithInferredInputSchema() throws Exception {
+ testDBSink("testDBSinkWithInferredInputSchema", "input-dbsinktest-inferred", null);
+ }
+
+ private void testDBSink(String appName, String inputDatasetName, Schema schema) throws Exception {
+ ETLPlugin sourceConfig = (schema != null)
+ ? MockSource.getPlugin(inputDatasetName, schema)
+ : MockSource.getPlugin(inputDatasetName);
+
+ ETLPlugin sinkConfig = getSinkConfig();
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig, DATAPIPELINE_ARTIFACT, appName);
+
+ // Prepare test input data
+ List inputRecords = createInputData();
+ DataSetManager inputManager = getDataset(inputDatasetName);
+ MockSource.writeInput(inputManager, inputRecords);
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+ try (Connection conn = createConnection();
+ Statement stmt = conn.createStatement();
+ ResultSet actual = stmt.executeQuery("SELECT * FROM MY_DEST_TABLE ORDER BY ID")) {
+
+ for (StructuredRecord expected : inputRecords) {
+ Assert.assertTrue(actual.next());
+
+ // Verify data
+ CustomAssertions.assertObjectEquals(expected.get("ID"), actual.getInt("ID"));
+ CustomAssertions.assertObjectEquals(expected.get("NAME"), actual.getString("NAME"));
+ CustomAssertions.assertObjectEquals(expected.get("TEXT_COL"), actual.getString("TEXT_COL"));
+ CustomAssertions.assertObjectEquals(expected.get("TINYTEXT_COL"), actual.getString("TINYTEXT_COL"));
+ CustomAssertions.assertObjectEquals(expected.get("MEDIUMTEXT_COL"), actual.getString("MEDIUMTEXT_COL"));
+ CustomAssertions.assertObjectEquals(expected.get("LONGTEXT_COL"), actual.getString("LONGTEXT_COL"));
+ CustomAssertions.assertObjectEquals(expected.get("CHAR_COL"), actual.getString("CHAR_COL").trim());
+ CustomAssertions.assertObjectEquals(expected.get("GRADUATED"), actual.getBoolean("GRADUATED"));
+ Assert.assertNull(actual.getString("NOT_IMPORTED"));
+ CustomAssertions.assertObjectEquals(expected.get("ENUM_COL"), actual.getString("ENUM_COL"));
+ CustomAssertions.assertObjectEquals(expected.get("SET_COL"), actual.getString("SET_COL"));
+ CustomAssertions.assertObjectEquals(expected.get("TINY"), actual.getInt("TINY"));
+ CustomAssertions.assertObjectEquals(expected.get("SMALL"), actual.getInt("SMALL"));
+ CustomAssertions.assertObjectEquals(expected.get("BIG"), actual.getLong("BIG"));
+ CustomAssertions.assertObjectEquals(expected.get("MEDIUMINT_COL"), actual.getInt("MEDIUMINT_COL"));
+ CustomAssertions.assertNumericEquals(expected.get("SCORE"), actual.getDouble("SCORE"));
+ CustomAssertions.assertNumericEquals(expected.get("FLOAT_COL"), actual.getFloat("FLOAT_COL"));
+ CustomAssertions.assertNumericEquals(expected.get("REAL_COL"), actual.getDouble("REAL_COL"));
+ CustomAssertions.assertObjectEquals(expected.getDecimal("NUMERIC_COL"), actual.getBigDecimal("NUMERIC_COL"));
+ CustomAssertions.assertObjectEquals(expected.getDecimal("DECIMAL_COL"), actual.getBigDecimal("DECIMAL_COL"));
+ CustomAssertions.assertObjectEquals(expected.get("BIT_COL"), actual.getBoolean("BIT_COL"));
+
+ // Verify binary columns
+ Assert.assertArrayEquals(expected.get("BINARY_COL"), actual.getBytes("BINARY_COL"));
+ Assert.assertArrayEquals(expected.get("VARBINARY_COL"), actual.getBytes("VARBINARY_COL"));
+ Assert.assertArrayEquals(expected.get("BLOB_COL"), actual.getBytes("BLOB_COL"));
+ Assert.assertArrayEquals(expected.get("MEDIUMBLOB_COL"), actual.getBytes("MEDIUMBLOB_COL"));
+ Assert.assertArrayEquals(expected.get("TINYBLOB_COL"), actual.getBytes("TINYBLOB_COL"));
+ Assert.assertArrayEquals(expected.get("LONGBLOB_COL"), actual.getBytes("LONGBLOB_COL"));
+
+ // Verify time columns
+ Assert.assertEquals(expected.getDate("DATE_COL"), actual.getDate("DATE_COL").toLocalDate());
+
+ // compare seconds, since mariadb 'time' type does not store milliseconds but 'LocalTime' does
+ Assert.assertEquals(expected.getTime("TIME_COL").toSecondOfDay(),
+ actual.getTime("TIME_COL").toLocalTime().toSecondOfDay());
+ Assert.assertEquals(expected.getDate("YEAR_COL").getYear(), actual.getInt("YEAR_COL"));
+ Assert.assertEquals(expected.getTimestamp("DATETIME_COL"),
+ actual.getTimestamp("DATETIME_COL").toInstant().atZone(UTC_ZONE));
+ Assert.assertEquals(expected.getTimestamp("TIMESTAMP_COL"),
+ actual.getTimestamp("TIMESTAMP_COL").toInstant().atZone(UTC_ZONE));
+ }
+ }
+ }
+
+ private List createInputData() throws Exception {
+ List inputRecords = new ArrayList<>();
+ LocalDateTime localDateTime = new Timestamp(CURRENT_TS).toLocalDateTime();
+ for (int i = 1; i <= 2; i++) {
+ String name = "user" + i;
+ StructuredRecord.Builder builder = StructuredRecord.builder(SCHEMA)
+ .set("ID", i)
+ .set("NAME", name)
+ .set("SCORE", 3.451)
+ .set("GRADUATED", (i % 2 == 0))
+ .set("TINY", i)
+ .set("SMALL", i)
+ .set("BIG", 3456987L)
+ .set("MEDIUMINT_COL", 8388607)
+ .set("FLOAT_COL", 3.456f)
+ .set("REAL_COL", 3.457)
+ .setDecimal("NUMERIC_COL", new BigDecimal(3.458d, new MathContext(PRECISION)).setScale(SCALE))
+ .setDecimal("DECIMAL_COL", new BigDecimal(3.459d, new MathContext(PRECISION)).setScale(SCALE))
+ .set("BIT_COL", (i % 2 == 1))
+ .setDate("DATE_COL", localDateTime.toLocalDate())
+ .setTime("TIME_COL", localDateTime.toLocalTime())
+ .setTimestamp("TIMESTAMP_COL", localDateTime.atZone(UTC_ZONE))
+ .setTimestamp("DATETIME_COL", localDateTime.atZone(UTC_ZONE))
+ .setDate("YEAR_COL", localDateTime.toLocalDate())
+ .set("TEXT_COL", name)
+ .set("TINYTEXT_COL", name)
+ .set("MEDIUMTEXT_COL", name)
+ .set("LONGTEXT_COL", name)
+ .set("CHAR_COL", "char" + i)
+ .set("BINARY_COL", name.getBytes(Charsets.UTF_8))
+ .set("VARBINARY_COL", name.getBytes(Charsets.UTF_8))
+ .set("TINYBLOB_COL", name.getBytes(Charsets.UTF_8))
+ .set("BLOB_COL", name.getBytes(Charsets.UTF_8))
+ .set("MEDIUMBLOB_COL", name.getBytes(Charsets.UTF_8))
+ .set("LONGBLOB_COL", name.getBytes(Charsets.UTF_8))
+ .set("ENUM_COL", "Second")
+ .set("SET_COL", "a,b,c,d");
+
+ inputRecords.add(builder.build());
+ }
+
+ return inputRecords;
+ }
+
+ private ETLPlugin getSinkConfig() {
+ return new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ BatchSink.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(AbstractDBSink.DBSinkConfig.TABLE_NAME, "MY_DEST_TABLE")
+ .put(Constants.Reference.REFERENCE_NAME, "DBTest")
+ .build(),
+ null);
+ }
+}
diff --git a/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbSourceTestRun.java b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbSourceTestRun.java
new file mode 100644
index 000000000..09ee5681a
--- /dev/null
+++ b/mariadb-plugin/src/test/java/io/cdap/plugin/mariadb/MariadbSourceTestRun.java
@@ -0,0 +1,376 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.mariadb;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.common.Bytes;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.db.batch.source.AbstractDBSource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MariadbSourceTestRun extends MariadbPluginTestBase {
+
+ @Test
+ @SuppressWarnings("ConstantConditions")
+ public void testDBMacroSupport() throws Exception {
+ String importQuery = "SELECT * FROM my_table WHERE DATE_COL <= '${logicalStartTime(yyyy-MM-dd,1d)}' " +
+ "AND $CONDITIONS";
+ String boundingQuery = "SELECT MIN(ID),MAX(ID) from my_table";
+ String splitBy = "ID";
+
+ ImmutableMap sourceProps = ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBTestSource").build();
+
+ ETLPlugin sourceConfig = new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ sourceProps
+ );
+
+ ETLPlugin sinkConfig = MockSink.getPlugin("macroOutputTable");
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+ DATAPIPELINE_ARTIFACT, "testDBMacro");
+ runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+ DataSetManager outputManager = getDataset("macroOutputTable");
+ Assert.assertTrue(MockSink.readOutput(outputManager).isEmpty());
+ }
+
+ @Test
+ @SuppressWarnings("ConstantConditions")
+ public void testDBSource() throws Exception {
+ String importQuery = "SELECT ID, NAME, SCORE, GRADUATED, TINY, MEDIUMINT_COL, SMALL, BIG, FLOAT_COL, " +
+ "REAL_COL, NUMERIC_COL, CHAR_COL, DECIMAL_COL, BIT_COL, BINARY_COL, DATE_COL, TIME_COL, DATETIME_COL, " +
+ "TIMESTAMP_COL, VARBINARY_COL, BLOB_COL, MEDIUMBLOB_COL, TINYBLOB_COL, YEAR_COL, LONGBLOB_COL, TINYTEXT_COL, " +
+ "MEDIUMTEXT_COL, TEXT_COL, LONGTEXT_COL, ENUM_COL, SET_COL FROM " +
+ "my_table WHERE ID < 3 AND $CONDITIONS";
+ String boundingQuery = "SELECT MIN(ID),MAX(ID) from my_table";
+ String splitBy = "ID";
+ ETLPlugin sourceConfig = new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBSourceTest")
+ .build(),
+ null
+ );
+
+ String outputDatasetName = "output-dbsourcetest";
+ ETLPlugin sinkConfig = MockSink.getPlugin(outputDatasetName);
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+ DATAPIPELINE_ARTIFACT, "testDBSource");
+ runETLOnce(appManager);
+
+ DataSetManager outputManager = getDataset(outputDatasetName);
+ List outputRecords = MockSink.readOutput(outputManager);
+
+ Assert.assertEquals(2, outputRecords.size());
+ String userid = outputRecords.get(0).get("NAME");
+ StructuredRecord row1 = "user1".equals(userid) ? outputRecords.get(0) : outputRecords.get(1);
+ StructuredRecord row2 = "user1".equals(userid) ? outputRecords.get(1) : outputRecords.get(0);
+
+ // Verify data
+ Assert.assertEquals("user1", row1.get("NAME"));
+ Assert.assertEquals("user2", row2.get("NAME"));
+ Assert.assertEquals("user1", row1.get("TINYTEXT_COL"));
+ Assert.assertEquals("user2", row2.get("TINYTEXT_COL"));
+ Assert.assertEquals("user1", row1.get("MEDIUMTEXT_COL"));
+ Assert.assertEquals("user2", row2.get("MEDIUMTEXT_COL"));
+ Assert.assertEquals("user1", row1.get("TEXT_COL"));
+ Assert.assertEquals("user2", row2.get("TEXT_COL"));
+ Assert.assertEquals("user1", row1.get("LONGTEXT_COL"));
+ Assert.assertEquals("user2", row2.get("LONGTEXT_COL"));
+ Assert.assertEquals("char1", ((String) row1.get("CHAR_COL")).trim());
+ Assert.assertEquals("char2", ((String) row2.get("CHAR_COL")).trim());
+ Assert.assertEquals(124.45, row1.get("SCORE"), 0.000001);
+ Assert.assertEquals(125.45, row2.get("SCORE"), 0.000001);
+ Assert.assertEquals(false, row1.get("GRADUATED"));
+ Assert.assertEquals(true, row2.get("GRADUATED"));
+ Assert.assertNull(row1.get("NOT_IMPORTED"));
+ Assert.assertNull(row2.get("NOT_IMPORTED"));
+ Assert.assertEquals("Second", row1.get("ENUM_COL"));
+ Assert.assertEquals("a,b", row1.get("SET_COL"));
+
+ Assert.assertEquals(1, (int) row1.get("TINY"));
+ Assert.assertEquals(2, (int) row2.get("TINY"));
+ Assert.assertEquals(1, (int) row1.get("SMALL"));
+ Assert.assertEquals(2, (int) row2.get("SMALL"));
+ Assert.assertEquals(1, (long) row1.get("BIG"));
+ Assert.assertEquals(2, (long) row2.get("BIG"));
+ Assert.assertEquals(1, (int) row1.get("MEDIUMINT_COL"));
+ Assert.assertEquals(2, (int) row2.get("MEDIUMINT_COL"));
+
+ Assert.assertEquals(124.45, (float) row1.get("FLOAT_COL"), 0.00001);
+ Assert.assertEquals(125.45, (float) row2.get("FLOAT_COL"), 0.00001);
+ Assert.assertEquals(124.45, (double) row1.get("REAL_COL"), 0.00001);
+ Assert.assertEquals(125.45, (double) row2.get("REAL_COL"), 0.00001);
+ Assert.assertEquals(new BigDecimal(124.45, new MathContext(PRECISION)).setScale(SCALE),
+ row1.getDecimal("NUMERIC_COL"));
+ Assert.assertEquals(new BigDecimal(125.45, new MathContext(PRECISION)).setScale(SCALE),
+ row2.getDecimal("NUMERIC_COL"));
+ Assert.assertEquals(new BigDecimal(124.45, new MathContext(PRECISION)).setScale(SCALE),
+ row1.getDecimal("DECIMAL_COL"));
+ Assert.assertNull(row2.getDecimal("DECIMAL_COL"));
+ Assert.assertTrue((boolean) row1.get("BIT_COL"));
+ Assert.assertFalse((boolean) row2.get("BIT_COL"));
+ // Verify time columns
+ java.util.Date date = new java.util.Date(CURRENT_TS);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ LocalDate expectedDate = Date.valueOf(sdf.format(date)).toLocalDate();
+ sdf = new SimpleDateFormat("H:mm:ss");
+ LocalTime expectedTime = Time.valueOf(sdf.format(date)).toLocalTime();
+ ZonedDateTime expectedTs = date.toInstant().atZone(UTC_ZONE);
+ Assert.assertEquals(expectedDate, row1.getDate("DATE_COL"));
+ Assert.assertEquals(expectedTime, row1.getTime("TIME_COL"));
+ Assert.assertEquals(expectedDate.getYear(), (int) row1.getDate("YEAR_COL").getYear());
+ Assert.assertEquals(expectedTs, row1.getTimestamp("DATETIME_COL", UTC_ZONE));
+ Assert.assertEquals(expectedTs, row1.getTimestamp("TIMESTAMP_COL", UTC_ZONE));
+
+ // verify binary columns
+ Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("BINARY_COL")).array(), 0, 5));
+ Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("BINARY_COL")).array(), 0, 5));
+ Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("VARBINARY_COL")).array(), 0, 5));
+ Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("VARBINARY_COL")).array(), 0, 5));
+ Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("BLOB_COL")).array(), 0, 5));
+ Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("BLOB_COL")).array(), 0, 5));
+ Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("MEDIUMBLOB_COL")).array(), 0, 5));
+ Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("MEDIUMBLOB_COL")).array(), 0, 5));
+ Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("TINYBLOB_COL")).array(), 0, 5));
+ Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("TINYBLOB_COL")).array(), 0, 5));
+ Assert.assertEquals("user1", Bytes.toString(((ByteBuffer) row1.get("LONGBLOB_COL")).array(), 0, 5));
+ Assert.assertEquals("user2", Bytes.toString(((ByteBuffer) row2.get("LONGBLOB_COL")).array(), 0, 5));
+ }
+
+ @Test
+ public void testDbSourceMultipleTables() throws Exception {
+ String importQuery = "SELECT my_table.ID, your_table.NAME FROM my_table, your_table " +
+ "WHERE my_table.ID < 3 and my_table.ID = your_table.ID and $CONDITIONS ";
+ String boundingQuery = "SELECT LEAST(MIN(my_table.ID), MIN(your_table.ID)), " +
+ "GREATEST(MAX(my_table.ID), MAX(your_table.ID))";
+ String splitBy = "my_table.ID";
+ ETLPlugin sourceConfig = new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBMultipleTest")
+ .build(),
+ null
+ );
+
+ String outputDatasetName = "output-multitabletest";
+ ETLPlugin sinkConfig = MockSink.getPlugin(outputDatasetName);
+
+ ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+ DATAPIPELINE_ARTIFACT, "testDBSourceWithMultipleTables");
+ runETLOnce(appManager);
+
+ // records should be written
+ DataSetManager outputManager = getDataset(outputDatasetName);
+ List outputRecords = MockSink.readOutput(outputManager);
+ Assert.assertEquals(2, outputRecords.size());
+ String userid = outputRecords.get(0).get("NAME");
+ StructuredRecord row1 = "user1".equals(userid) ? outputRecords.get(0) : outputRecords.get(1);
+ StructuredRecord row2 = "user1".equals(userid) ? outputRecords.get(1) : outputRecords.get(0);
+ // Verify data
+ Assert.assertEquals("user1", row1.get("NAME"));
+ Assert.assertEquals("user2", row2.get("NAME"));
+ Assert.assertEquals(1, row1.get("ID").intValue());
+ Assert.assertEquals(2, row2.get("ID").intValue());
+ }
+
+ @Test
+ public void testUserNamePasswordCombinations() throws Exception {
+ String importQuery = "SELECT * FROM my_table WHERE $CONDITIONS";
+ String boundingQuery = "SELECT MIN(ID),MAX(ID) from my_table";
+ String splitBy = "ID";
+
+ ETLPlugin sinkConfig = MockSink.getPlugin("outputTable");
+
+ Map baseSourceProps = ImmutableMap.builder()
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(ConnectionConfig.HOST, BASE_PROPS.get(ConnectionConfig.HOST))
+ .put(ConnectionConfig.PORT, BASE_PROPS.get(ConnectionConfig.PORT))
+ .put(ConnectionConfig.DATABASE, BASE_PROPS.get(ConnectionConfig.DATABASE))
+ .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME)
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "UserPassDBTest")
+ .build();
+
+ ApplicationId appId = NamespaceId.DEFAULT.app("dbTest");
+
+ // null user name, null password. Should succeed.
+ // as source
+ ETLPlugin dbConfig = new ETLPlugin(MariadbConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, baseSourceProps, null);
+ ETLStage table = new ETLStage("uniqueTableSink", sinkConfig);
+ ETLStage database = new ETLStage("databaseSource", dbConfig);
+ ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+ .addStage(database)
+ .addStage(table)
+ .addConnection(database.getName(), table.getName())
+ .build();
+ AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlConfig);
+ deployApplication(appId, appRequest);
+
+ // null user name, non-null password. Should fail.
+ // as source
+ Map noUser = new HashMap<>(baseSourceProps);
+ noUser.put(ConnectionConfig.PASSWORD, "password");
+ database = new ETLStage("databaseSource",
+ new ETLPlugin(MariadbConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, noUser, null));
+ etlConfig = ETLBatchConfig.builder()
+ .addStage(database)
+ .addStage(table)
+ .addConnection(database.getName(), table.getName())
+ .build();
+ assertDeploymentFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
+ "Deploying DB Source with null username but non-null password should have failed.");
+
+ // non-null username, non-null, but empty password. Should succeed.
+ // as source
+ Map emptyPassword = new HashMap<>(baseSourceProps);
+ emptyPassword.put(ConnectionConfig.USER, "root");
+ emptyPassword.put(ConnectionConfig.PASSWORD, "");
+ database = new ETLStage("databaseSource",
+ new ETLPlugin(MariadbConstants.PLUGIN_NAME, BatchSource.PLUGIN_TYPE, emptyPassword, null));
+ etlConfig = ETLBatchConfig.builder()
+ .addStage(database)
+ .addStage(table)
+ .addConnection(database.getName(), table.getName())
+ .build();
+ appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, etlConfig);
+ deployApplication(appId, appRequest);
+ }
+
+ @Test
+ public void testNonExistentDBTable() throws Exception {
+ // source
+ String importQuery = "SELECT ID, NAME FROM dummy WHERE ID < 3 AND $CONDITIONS";
+ String boundingQuery = "SELECT MIN(ID),MAX(ID) FROM dummy";
+ String splitBy = "ID";
+ ETLPlugin sinkConfig = MockSink.getPlugin("table");
+ ETLPlugin sourceBadNameConfig = new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .putAll(BASE_PROPS)
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "DBNonExistentTest")
+ .build(),
+ null);
+ ETLStage sink = new ETLStage("sink", sinkConfig);
+ ETLStage sourceBadName = new ETLStage("sourceBadName", sourceBadNameConfig);
+
+ ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+ .addStage(sourceBadName)
+ .addStage(sink)
+ .addConnection(sourceBadName.getName(), sink.getName())
+ .build();
+ ApplicationId appId = NamespaceId.DEFAULT.app("dbSourceNonExistingTest");
+ assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
+ "ETL Application with DB Source should have failed because of a " +
+ "non-existent source table.", 1);
+
+ // Bad connection
+ ETLPlugin sourceBadConnConfig = new ETLPlugin(
+ MariadbConstants.PLUGIN_NAME,
+ BatchSource.PLUGIN_TYPE,
+ ImmutableMap.builder()
+ .put(MariadbConstants.AUTO_RECONNECT, "true")
+ .put(MariadbConstants.USE_COMPRESSION, "true")
+ .put(MariadbConstants.ANSI_QUOTES_QUERY, "true")
+ .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME)
+ .put(ConnectionConfig.HOST, BASE_PROPS.get(ConnectionConfig.HOST))
+ .put(ConnectionConfig.PORT, BASE_PROPS.get(ConnectionConfig.PORT))
+ .put(ConnectionConfig.DATABASE, "dumDB")
+ .put(ConnectionConfig.USER, BASE_PROPS.get(ConnectionConfig.USER))
+ .put(ConnectionConfig.PASSWORD, BASE_PROPS.get(ConnectionConfig.PASSWORD))
+ .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+ .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, boundingQuery)
+ .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, splitBy)
+ .put(Constants.Reference.REFERENCE_NAME, "MariaDBTest")
+ .build(),
+ null);
+ ETLStage sourceBadConn = new ETLStage("sourceBadConn", sourceBadConnConfig);
+ etlConfig = ETLBatchConfig.builder()
+ .addStage(sourceBadConn)
+ .addStage(sink)
+ .addConnection(sourceBadConn.getName(), sink.getName())
+ .build();
+ assertRuntimeFailure(appId, etlConfig, DATAPIPELINE_ARTIFACT,
+ "ETL Application with DB Source should have failed because of a " +
+ "non-existent source database.", 2);
+ }
+}
diff --git a/mariadb-plugin/widgets/Mariadb-action.json b/mariadb-plugin/widgets/Mariadb-action.json
new file mode 100644
index 000000000..bb78abb27
--- /dev/null
+++ b/mariadb-plugin/widgets/Mariadb-action.json
@@ -0,0 +1,161 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "MariaDB Execute",
+ "configuration-groups": [
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Host",
+ "name": "host",
+ "widget-attributes": {
+ "default": "localhost"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Port",
+ "name": "port",
+ "widget-attributes": {
+ "default": "3306"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Database",
+ "name": "database"
+ },
+ {
+ "widget-type": "textarea",
+ "label": "Database Query",
+ "name": "query"
+ }
+ ]
+ },
+ {
+ "label": "Credentials",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "user"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ }
+ ]
+ },
+ {
+ "label": "SSL",
+ "properties": [
+ {
+ "label": "Use SSL",
+ "name": "useSSL",
+ "widget-type": "select",
+ "widget-attributes": {
+ "default": "If available",
+ "values": ["If available", "Yes", "No"]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keystore URL",
+ "name": "keyStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Keystore Password",
+ "name": "keyStorePassword"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Truststore URL",
+ "name": "trustStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Truststore Password",
+ "name": "trustStorePassword"
+ }
+ ]
+ },
+ {
+ "label": "Advanced",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Driver Name",
+ "name": "jdbcPluginName",
+ "widget-attributes": {
+ "default": "mariadb"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Use Compression",
+ "name": "useCompression",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Use ANSI Quotes",
+ "name": "useAnsiQuotes",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Connection Arguments",
+ "name": "connectionArguments",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "key-placeholder": "Key",
+ "value-placeholder": "Value",
+ "kv-delimiter": "=",
+ "delimiter": ";"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Auto Reconnect",
+ "name": "autoReconnect",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ }
+ ]
+ }
+ ]
+}
diff --git a/mariadb-plugin/widgets/Mariadb-batchsink.json b/mariadb-plugin/widgets/Mariadb-batchsink.json
new file mode 100644
index 000000000..5ef662436
--- /dev/null
+++ b/mariadb-plugin/widgets/Mariadb-batchsink.json
@@ -0,0 +1,161 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "MariaDB",
+ "configuration-groups": [
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Reference Name",
+ "name": "referenceName",
+ "widget-attributes": {
+ "placeholder": "Name used to identify this sink for lineage"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Host",
+ "name": "host",
+ "widget-attributes": {
+ "default": "localhost"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Port",
+ "name": "port",
+ "widget-attributes": {
+ "default": "3306"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Database",
+ "name": "database"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Table Name",
+ "name": "tableName"
+ }
+ ]
+ },
+ {
+ "label": "Credentials",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "user"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ }
+ ]
+ },
+ {
+ "label": "SSL",
+ "properties": [
+ {
+ "label": "Use SSL",
+ "name": "useSSL",
+ "widget-type": "select",
+ "widget-attributes": {
+ "default": "If available",
+ "values": ["If available", "Yes", "No"]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keystore URL",
+ "name": "keyStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Keystore Password",
+ "name": "keyStorePassword"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Truststore URL",
+ "name": "trustStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Truststore Password",
+ "name": "trustStorePassword"
+ }
+ ]
+ },
+ {
+ "label": "Advanced",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Driver Name",
+ "name": "jdbcPluginName",
+ "widget-attributes": {
+ "default": "mariadb"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Use Compression",
+ "name": "useCompression",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Connection Arguments",
+ "name": "connectionArguments",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "key-placeholder": "Key",
+ "value-placeholder": "Value",
+ "kv-delimiter": "=",
+ "delimiter": ";"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Auto Reconnect",
+ "name": "autoReconnect",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": [],
+ "jump-config": {
+ "datasets": [
+ {
+ "ref-property-name": "referenceName"
+ }
+ ]
+ }
+}
diff --git a/mariadb-plugin/widgets/Mariadb-batchsource.json b/mariadb-plugin/widgets/Mariadb-batchsource.json
new file mode 100644
index 000000000..7952a732e
--- /dev/null
+++ b/mariadb-plugin/widgets/Mariadb-batchsource.json
@@ -0,0 +1,238 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "MariaDB",
+ "configuration-groups": [
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Reference Name",
+ "name": "referenceName",
+ "widget-attributes": {
+ "placeholder": "Name used to identify this source for lineage"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Host",
+ "name": "host",
+ "widget-attributes": {
+ "default": "localhost"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Port",
+ "name": "port",
+ "widget-attributes": {
+ "default": "3306"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Database",
+ "name": "database"
+ },
+ {
+ "widget-type": "textarea",
+ "label": "Import Query",
+ "name": "importQuery",
+ "widget-attributes": {
+ "rows": "4"
+ },
+ "plugin-function": {
+ "widget": "outputSchema",
+ "output-property": "schema",
+ "omit-properties": [
+ {
+ "name": "schema"
+ }
+ ],
+ "add-properties": [
+ {
+ "name": "query",
+ "plugin-property-for-value": "importQuery",
+ "widget-type": "textarea",
+ "label": "Query",
+ "widget-attributes": {
+ "rows": "4"
+ }
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textarea",
+ "label": "Bounding Query",
+ "name": "boundingQuery",
+ "widget-attributes": {
+ "rows": "4"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Split-By Field Name",
+ "name": "splitBy"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Number of Splits to Generate",
+ "name": "numSplits",
+ "widget-attributes": {
+ "default": "1"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Credentials",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "user"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ }
+ ]
+ },
+ {
+ "label": "SSL",
+ "properties": [
+ {
+ "label": "Use SSL",
+ "name": "useSSL",
+ "widget-type": "select",
+ "widget-attributes": {
+ "default": "If available",
+ "values": ["If available", "Yes", "No"]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keystore URL",
+ "name": "keyStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Keystore Password",
+ "name": "keyStorePassword"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Truststore URL",
+ "name": "trustStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Truststore Password",
+ "name": "trustStorePassword"
+ }
+ ]
+ },
+ {
+ "label": "Advanced",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Driver Name",
+ "name": "jdbcPluginName",
+ "widget-attributes": {
+ "default": "mariadb"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Use Compression",
+ "name": "useCompression",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Use ANSI Quotes",
+ "name": "useAnsiQuotes",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Connection Arguments",
+ "name": "connectionArguments",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "key-placeholder": "Key",
+ "value-placeholder": "Value",
+ "kv-delimiter": "=",
+ "delimiter": ";"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Auto Reconnect",
+ "name": "autoReconnect",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "name": "schema",
+ "widget-type": "schema",
+ "widget-attributes": {
+ "schema-types": [
+ "boolean",
+ "int",
+ "long",
+ "float",
+ "double",
+ "bytes",
+ "string"
+ ],
+ "schema-default-type": "string"
+ }
+ }
+ ],
+ "jump-config": {
+ "datasets": [
+ {
+ "ref-property-name": "referenceName"
+ }
+ ]
+ }
+}
diff --git a/mariadb-plugin/widgets/Mariadb-postaction.json b/mariadb-plugin/widgets/Mariadb-postaction.json
new file mode 100644
index 000000000..8ecbaf011
--- /dev/null
+++ b/mariadb-plugin/widgets/Mariadb-postaction.json
@@ -0,0 +1,176 @@
+{
+ "metadata": {
+ "spec-version": "1.0"
+ },
+ "configuration-groups": [
+ {
+ "label": "Basic",
+ "properties": [
+ {
+ "widget-type": "select",
+ "label": "Run Condition",
+ "name": "runCondition",
+ "widget-attributes": {
+ "values": [
+ "completion",
+ "success",
+ "failure"
+ ],
+ "default": "success"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Host",
+ "name": "host",
+ "widget-attributes": {
+ "default": "localhost"
+ }
+ },
+ {
+ "widget-type": "number",
+ "label": "Port",
+ "name": "port",
+ "widget-attributes": {
+ "default": "3306"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Database",
+ "name": "database"
+ },
+ {
+ "widget-type": "textarea",
+ "label": "Query",
+ "name": "query",
+ "widget-attributes": {
+ "rows": "4"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Credentials",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Username",
+ "name": "user"
+ },
+ {
+ "widget-type": "password",
+ "label": "Password",
+ "name": "password"
+ }
+ ]
+ },
+ {
+ "label": "SSL",
+ "properties": [
+ {
+ "label": "Use SSL",
+ "name": "useSSL",
+ "widget-type": "select",
+ "widget-attributes": {
+ "default": "If available",
+ "values": ["If available", "Yes", "No"]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Keystore URL",
+ "name": "keyStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Keystore Password",
+ "name": "keyStorePassword"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Truststore URL",
+ "name": "trustStore"
+ },
+ {
+ "widget-type": "password",
+ "label": "Truststore Password",
+ "name": "trustStorePassword"
+ }
+ ]
+ },
+ {
+ "label": "Advanced",
+ "properties": [
+ {
+ "widget-type": "textbox",
+ "label": "Driver Name",
+ "name": "jdbcPluginName",
+ "widget-attributes": {
+ "default": "mariadb"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Use Compression",
+ "name": "useCompression",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Use ANSI Quotes",
+ "name": "useAnsiQuotes",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "keyvalue",
+ "label": "Connection Arguments",
+ "name": "connectionArguments",
+ "widget-attributes": {
+ "showDelimiter": "false",
+ "key-placeholder": "Key",
+ "value-placeholder": "Value",
+ "kv-delimiter": "=",
+ "delimiter": ";"
+ }
+ },
+ {
+ "widget-type": "toggle",
+ "label": "Auto Reconnect",
+ "name": "autoReconnect",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "Yes"
+ },
+ "off": {
+ "value": "false",
+ "label": "No"
+ },
+ "default": "false"
+ }
+ }
+ ]
+ }
+ ]
+}
diff --git a/pom.xml b/pom.xml
index 6d07b86e5..06f3f1eea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
database-commons
generic-database-plugin
mysql-plugin
+ mariadb-plugin
postgresql-plugin
mssql-plugin
oracle-plugin