getClass(FIELD_TYPE_KEY + i, null, cl);
- if (clazz == null) {
- throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
- + "No type class for parameter " + i);
- }
- this.fieldClasses[i] = clazz;
- }
- }
- catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not load data type classes.", e);
- }
- }
-
- /**
- * Connects to the target database and initializes the prepared statement.
- *
- * @param taskNumber The number of the parallel instance.
- * @throws IOException Thrown, if the output could not be opened due to an
- * I/O problem.
- */
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- try {
- establishConnection();
- upload = dbConn.prepareStatement(query);
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("open() failed:\t!", sqe);
- } catch (ClassNotFoundException cnfe) {
- throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
- }
- }
-
- private void establishConnection() throws SQLException, ClassNotFoundException {
- Class.forName(driverName);
- if (username == null) {
- dbConn = DriverManager.getConnection(dbURL);
- } else {
- dbConn = DriverManager.getConnection(dbURL, username, password);
- }
- }
-
- /**
- * Adds a record to the prepared statement.
- *
- * When this method is called, the output format is guaranteed to be opened.
- *
- * @param record The records to add to the output.
- * @throws IOException Thrown, if the records could not be added due to an
- * I/O problem.
- */
-
- @Override
- public void writeRecord(Record record) throws IOException {
- try {
- for (int x = 0; x < record.getNumFields(); x++) {
- Value temp = record.getField(x, fieldClasses[x]);
- addValue(x + 1, temp);
- }
- upload.addBatch();
- batchCount++;
- if(batchCount >= batchInterval) {
- upload.executeBatch();
- batchCount = 0;
- }
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("writeRecord() failed:\t", sqe);
- } catch (IllegalArgumentException iae) {
- throw new IllegalArgumentException("writeRecord() failed:\t", iae);
- }
- }
-
- private enum pactType {
- BooleanValue,
- ByteValue,
- CharValue,
- DoubleValue,
- FloatValue,
- IntValue,
- LongValue,
- ShortValue,
- StringValue
- }
-
- private void addValue(int index, Value value) throws SQLException {
- pactType type;
- try {
- type = pactType.valueOf(value.getClass().getSimpleName());
- } catch (IllegalArgumentException iae) {
- throw new IllegalArgumentException("PactType not supported:\t", iae);
- }
- switch (type) {
- case BooleanValue:
- upload.setBoolean(index, ((BooleanValue) value).getValue());
- break;
- case ByteValue:
- upload.setByte(index, ((ByteValue) value).getValue());
- break;
- case CharValue:
- upload.setString(index, String.valueOf(((CharValue) value).getValue()));
- break;
- case DoubleValue:
- upload.setDouble(index, ((DoubleValue) value).getValue());
- break;
- case FloatValue:
- upload.setFloat(index, ((FloatValue) value).getValue());
- break;
- case IntValue:
- upload.setInt(index, ((IntValue) value).getValue());
- break;
- case LongValue:
- upload.setLong(index, ((LongValue) value).getValue());
- break;
- case ShortValue:
- upload.setShort(index, ((ShortValue) value).getValue());
- break;
- case StringValue:
- upload.setString(index, ((StringValue) value).getValue());
- break;
- }
- }
-
- /**
- * Executes prepared statement and closes all resources of this instance.
- *
- * @throws IOException Thrown, if the input could not be closed properly.
- */
- @Override
- public void close() throws IOException {
- try {
- upload.executeBatch();
- batchCount = 0;
- upload.close();
- dbConn.close();
- } catch (SQLException sqe) {
- throw new IllegalArgumentException("close() failed:\t", sqe);
- }
- }
-
- /**
- * Creates a configuration builder that can be used to set the
- * output format's parameters to the config in a fluent fashion.
- *
- * @return A config builder for setting parameters.
- */
- public static ConfigBuilder configureOutputFormat(GenericDataSink target) {
- return new ConfigBuilder(target.getParameters());
- }
-
- /**
- * Abstract builder used to set parameters to the output format's
- * configuration in a fluent way.
- */
- protected static abstract class AbstractConfigBuilder
- extends FileOutputFormat.AbstractConfigBuilder {
-
- /**
- * Creates a new builder for the given configuration.
- *
- * @param config The configuration into which the parameters will be written.
- */
- protected AbstractConfigBuilder(Configuration config) {
- super(config);
- }
-
- /**
- * Sets the query field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setQuery(String value) {
- this.config.setString(QUERY_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the url field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setUrl(String value) {
- this.config.setString(URL_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the username field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setUsername(String value) {
- this.config.setString(USERNAME_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the password field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setPassword(String value) {
- this.config.setString(PASSWORD_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the driver field.
- * @param value value to be set.
- * @return The builder itself.
- */
- public T setDriver(String value) {
- this.config.setString(DRIVER_KEY, value);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
-
- /**
- * Sets the type of a column.
- * Types are applied in the order they were set.
- * @param type PactType to apply.
- * @return The builder itself.
- */
- public T setClass(Class extends Value> type) {
- final int numYet = this.config.getInteger(FIELD_COUNT_KEY, 0);
- this.config.setClass(FIELD_TYPE_KEY + numYet, type);
- this.config.setInteger(FIELD_COUNT_KEY, numYet + 1);
- @SuppressWarnings("unchecked")
- T ret = (T) this;
- return ret;
- }
- }
-
- /**
- * A builder used to set parameters to the output format's configuration in a fluent way.
- */
- public static final class ConfigBuilder extends AbstractConfigBuilder {
- /**
- * Creates a new builder for the given configuration.
- *
- * @param targetConfig The configuration into which the parameters will be written.
- */
- protected ConfigBuilder(Configuration targetConfig) {
- super(targetConfig);
- }
- }
-}
diff --git a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 213fd6a714738..0000000000000
--- a/flink-staging/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc.example;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
-
-/**
- * Stand-alone example for the JDBC connector.
- *
- * NOTE: To run this example, you need the apache derby code in your classpath.
- * See the Maven file (pom.xml) for a reference to the derby dependency. You can
- * simply Change the scope of the Maven dependency from test to compile.
- */
-public class JDBCExample implements Program, ProgramDescription {
-
- @Override
- public Plan getPlan(String[] args) {
- /*
- * In this example we use the constructor where the url contains all the settings that are needed.
- * You could also use the default constructor and deliver a Configuration with all the needed settings.
- * You also could set the settings to the source-instance.
- */
- GenericDataSource source = new GenericDataSource(
- new JDBCInputFormat(
- "org.apache.derby.jdbc.EmbeddedDriver",
- "jdbc:derby:memory:ebookshop",
- "select * from books"),
- "Data Source");
-
- GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output");
- JDBCOutputFormat.configureOutputFormat(sink)
- .setDriver("org.apache.derby.jdbc.EmbeddedDriver")
- .setUrl("jdbc:derby:memory:ebookshop")
- .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
- .setClass(IntValue.class)
- .setClass(StringValue.class)
- .setClass(StringValue.class)
- .setClass(FloatValue.class)
- .setClass(IntValue.class);
-
- sink.addInput(source);
- return new Plan(sink, "JDBC Example Job");
- }
-
- @Override
- public String getDescription() {
- return "Parameter:";
- }
-
- /*
- * To run this example, you need the apache derby code in your classpath!
- */
- public static void main(String[] args) throws Exception {
-
- prepareTestDb();
- JDBCExample tut = new JDBCExample();
- JobExecutionResult res = LocalExecutor.execute(tut, args);
- System.out.println("runtime: " + res.getNetRuntime() + " ms");
-
- System.exit(0);
- }
-
- private static void prepareTestDb() throws Exception {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- Connection conn = DriverManager.getConnection(dbURL);
-
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
-
- conn.close();
- }
-}
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
deleted file mode 100644
index 172f58573493d..0000000000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.OutputStream;
-
-public class DevNullLogStream {
-
- public static final OutputStream DEV_NULL = new OutputStream() {
- public void write(int b) {}
- };
-
-}
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
deleted file mode 100644
index 8e0a2c5636993..0000000000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCInputFormatTest {
- JDBCInputFormat jdbcInputFormat;
- Configuration config;
- static Connection conn;
- static final Value[][] dbData = {
- {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
- {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
- {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
- {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
- {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
- @BeforeClass
- public static void setUpClass() {
- try {
- prepareDerbyDatabase();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyDatabase() throws ClassNotFoundException {
- System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- createConnection(dbURL);
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- /*
- Loads JDBC derby driver ; creates(if necessary) and populates database.
- */
- private static void createConnection(String dbURL) {
- try {
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTable();
- insertDataToSQLTables();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void createTable() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTables() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
-
- sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent (id, title, content) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', CAST(X'7f454c4602' AS BLOB)),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', CAST(X'7f454c4602' AS BLOB))");
-
- stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
-
- @After
- public void tearDown() {
- jdbcInputFormat = null;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidConnection() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:idontexist", "select * from books;");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidQuery() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "abc");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidDBType() {
- jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", "jdbc:derby:memory:ebookshop", "select * from books;");
- jdbcInputFormat.configure(null);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testUnsupportedSQLType() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from bookscontent");
- jdbcInputFormat.configure(null);
- jdbcInputFormat.nextRecord(new Record());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNotConfiguredFormatNext() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.nextRecord(new Record());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testNotConfiguredFormatEnd() {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.reachedEnd();
- }
-
- @Test
- public void testJDBCInputFormat() throws IOException {
- jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
- jdbcInputFormat.configure(null);
- Record record = new Record();
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- Assert.assertEquals(5, record.getNumFields());
- Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
- Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
- Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
- Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
- Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
- int[] pos = {0, 1, 2, 3, 4};
- Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
- Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
-
- cleanUpDerbyDatabases();
- }
-
-}
diff --git a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java b/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
deleted file mode 100644
index c824ea1a61703..0000000000000
--- a/flink-staging/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormatTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io.jdbc;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.junit.Assert;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class JDBCOutputFormatTest {
- private JDBCInputFormat jdbcInputFormat;
- private JDBCOutputFormat jdbcOutputFormat;
-
- private static Connection conn;
-
- static final Value[][] dbData = {
- {new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
- {new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
- {new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
- {new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
- {new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
-
- @BeforeClass
- public static void setUpClass() {
- try {
- System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
- prepareDerbyInputDatabase();
- prepareDerbyOutputDatabase();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void cleanUpDerbyDatabases() {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- Statement stat = conn.createStatement();
- stat.executeUpdate("DROP TABLE books");
- stat.executeUpdate("DROP TABLE newbooks");
- stat.close();
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyInputDatabase() throws ClassNotFoundException {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTableBooks();
- insertDataToSQLTables();
- conn.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- } catch (SQLException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void prepareDerbyOutputDatabase() throws ClassNotFoundException {
- try {
- String dbURL = "jdbc:derby:memory:ebookshop;create=true";
- Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
- conn = DriverManager.getConnection(dbURL);
- createTableNewBooks();
- conn.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- Assert.fail();
- } catch (SQLException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- private static void createTableBooks() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void createTableNewBooks() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
- sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
- sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
- sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
- sqlQueryBuilder.append("qty INT DEFAULT NULL,");
- sqlQueryBuilder.append("PRIMARY KEY (id))");
-
- Statement stat = conn.createStatement();
- stat.executeUpdate(sqlQueryBuilder.toString());
- stat.close();
- }
-
- private static void insertDataToSQLTables() throws SQLException {
- StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
- sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
- sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
- sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
- sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
- sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
- Statement stat = conn.createStatement();
- stat.execute(sqlQueryBuilder.toString());
- stat.close();
- }
-
-
- @After
- public void tearDown() {
- jdbcOutputFormat = null;
- cleanUpDerbyDatabases();
- }
-
- @Test
- public void testJDBCOutputFormat() throws IOException {
- String sourceTable = "books";
- String targetTable = "newbooks";
- String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
- String dbUrl = "jdbc:derby:memory:ebookshop";
-
- Configuration cfg = new Configuration();
- cfg.setString("driver", driverPath);
- cfg.setString("url", dbUrl);
- cfg.setString("query", "insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)");
- cfg.setInteger("fields", 5);
- cfg.setClass("type0", IntValue.class);
- cfg.setClass("type1", StringValue.class);
- cfg.setClass("type2", StringValue.class);
- cfg.setClass("type3", FloatValue.class);
- cfg.setClass("type4", IntValue.class);
-
- jdbcOutputFormat = new JDBCOutputFormat();
- jdbcOutputFormat.configure(cfg);
- jdbcOutputFormat.open(0,1);
-
- jdbcInputFormat = new JDBCInputFormat(
- driverPath,
- dbUrl,
- "select * from " + sourceTable);
- jdbcInputFormat.configure(null);
-
- Record record = new Record();
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- jdbcOutputFormat.writeRecord(record);
- }
-
- jdbcOutputFormat.close();
- jdbcInputFormat.close();
-
- jdbcInputFormat = new JDBCInputFormat(
- driverPath,
- dbUrl,
- "select * from " + targetTable);
- jdbcInputFormat.configure(null);
-
- int recordCount = 0;
- while (!jdbcInputFormat.reachedEnd()) {
- jdbcInputFormat.nextRecord(record);
- Assert.assertEquals(5, record.getNumFields());
- Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
- Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
- Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
- Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
- Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
-
- int[] pos = {0, 1, 2, 3, 4};
- Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
- Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
-
- recordCount++;
- }
- Assert.assertEquals(5, recordCount);
-
- jdbcInputFormat.close();
- }
-}