diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java new file mode 100644 index 0000000000000..d9357c2215e74 --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/RangeInputSplit.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.apache.flink.annotation.Public; +import org.apache.flink.core.io.InputSplit; + +/** + * A range input split provides information about a particular range of keys, each with a min and a max value. + */ +@Public +public class RangeInputSplit implements InputSplit{ + + private static final long serialVersionUID = 4310893817447171721L; + + private long min; + private long max; + + /** The number of the split. */ + private final int splitNumber; + + public RangeInputSplit(int splitNumber, long min, long max){ + this.min = min; + this.max = max; + this.splitNumber = splitNumber; + } + + @Override + public int getSplitNumber() { + return splitNumber; + } + + public long getMin() { + return min; + } + + public long getMax() { + return max; + } + + +} diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index b7643508f17d5..b094af490ed4a 100644 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -26,13 +26,12 @@ import java.sql.SQLException; import java.sql.Statement; -import org.apache.flink.api.common.io.NonParallelInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.io.RangeInputSplit; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; @@ -44,28 +43,38 @@ * InputFormat to read data from a database and generate tuples. * The InputFormat has to be configured using the supplied InputFormatBuilder. * + * Remark: split (if set) works only if the split column is numeric + * * @param * @see Tuple * @see DriverManager */ -public class JDBCInputFormat extends RichInputFormat implements NonParallelInput { - private static final long serialVersionUID = 1L; +public class JDBCInputFormat extends RichInputFormat { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); private String username; private String password; private String drivername; private String dbURL; - private String query; + private String queryTemplate; private int resultSetType; private int resultSetConcurrency; private transient Connection dbConn; private transient Statement statement; private transient ResultSet resultSet; + + private int[] columnTypes; - private int[] columnTypes = null; + private String splitColumnName; + private long max; + private long min; + private long fetchSize; + + private static final String BETWEEN = "(%s BETWEEN %s AND %s)"; + public static final String CONDITIONS = "$CONDITIONS"; public JDBCInputFormat() { } @@ -81,19 +90,34 @@ public void configure(Configuration parameters) { * @throws IOException */ @Override - public void open(InputSplit ignored) throws IOException { + public void open(InputSplit inputSplit) throws IOException { try { + //TODO is this performed once per Task Manager..? establishConnection(); statement = dbConn.createStatement(resultSetType, resultSetConcurrency); + String query = queryTemplate; + if(isSplitConfigured()){ + RangeInputSplit jdbcInputSplit = (RangeInputSplit) inputSplit; + long start = jdbcInputSplit.getMin(); + long end = jdbcInputSplit.getMax(); + if(isSplitConfigured()){ + query = queryTemplate.replace(CONDITIONS, String.format(BETWEEN, splitColumnName, start, end)); + } + } + LOG.debug(query); resultSet = statement.executeQuery(query); } catch (SQLException se) { - close(); + //close(); already closed by the caller throw new IllegalArgumentException("open() failed." + se.getMessage(), se); } catch (ClassNotFoundException cnfe) { throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe); } } + private boolean isSplitConfigured() { + return splitColumnName!=null; + } + private void establishConnection() throws SQLException, ClassNotFoundException { Class.forName(drivername); if (username == null) { @@ -139,13 +163,10 @@ public void close() throws IOException { @Override public boolean reachedEnd() throws IOException { try { - if (resultSet.isLast()) { - close(); - return true; - } - return false; + return !resultSet.next(); } catch (SQLException se) { - throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se); + //close();already closed by the caller + throw new IOException("ResultSet error during next() - " + se.getMessage(), se); } } @@ -159,17 +180,16 @@ public boolean reachedEnd() throws IOException { @Override public OUT nextRecord(OUT tuple) throws IOException { try { - resultSet.next(); if (columnTypes == null) { extractTypes(tuple); } addValue(tuple); return tuple; } catch (SQLException se) { - close(); + //close();already closed by the caller throw new IOException("Couldn't read data - " + se.getMessage(), se); } catch (NullPointerException npe) { - close(); + //close();already closed by the caller throw new IOException("Couldn't access resultSet", npe); } } @@ -178,8 +198,8 @@ private void extractTypes(OUT tuple) throws SQLException, IOException { ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); columnTypes = new int[resultSetMetaData.getColumnCount()]; if (tuple.getArity() != columnTypes.length) { - close(); - throw new IOException("Tuple size does not match columncount"); + //close();already closed by the caller + throw new IOException("Tuple size does not match column count"); } for (int pos = 0; pos < columnTypes.length; pos++) { columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1); @@ -193,6 +213,8 @@ private void extractTypes(OUT tuple) throws SQLException, IOException { */ private void addValue(OUT reuse) throws IOException, SQLException { for (int pos = 0; pos < columnTypes.length; pos++) { + //TODO what if null?? use strings for now. Maybe use Row for JDBC?? + Object o = resultSet.getObject(pos + 1); try { switch (columnTypes[pos]) { case java.sql.Types.NULL: @@ -211,7 +233,8 @@ private void addValue(OUT reuse) throws IOException, SQLException { reuse.setField(resultSet.getString(pos + 1), pos); break; case java.sql.Types.VARCHAR: - reuse.setField(resultSet.getString(pos + 1), pos); + //TODO manage null fields + reuse.setField(o == null ? "" :resultSet.getString(pos + 1), pos); break; case java.sql.Types.LONGVARCHAR: reuse.setField(resultSet.getString(pos + 1), pos); @@ -232,7 +255,7 @@ private void addValue(OUT reuse) throws IOException, SQLException { reuse.setField(resultSet.getInt(pos + 1), pos); break; case java.sql.Types.FLOAT: - reuse.setField(resultSet.getDouble(pos + 1), pos); + reuse.setField(resultSet.getFloat(pos + 1), pos); break; case java.sql.Types.REAL: reuse.setField(resultSet.getFloat(pos + 1), pos); @@ -241,10 +264,29 @@ private void addValue(OUT reuse) throws IOException, SQLException { reuse.setField(resultSet.getDouble(pos + 1), pos); break; case java.sql.Types.DECIMAL: - reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); - break; + //TODO manage null fields + try { + if (o == null) + reuse.setField("", pos); + else + reuse.setField(resultSet.getBigDecimal(pos + 1).toPlainString(), pos); + } catch (SQLException e) { + System.err.println("error reading at position " + pos + " setting blank field!"); + reuse.setField("", pos); + // throw e; + } case java.sql.Types.NUMERIC: - reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos); + //TODO manage null fields + try { + if (o == null) + reuse.setField("", pos); + else + reuse.setField(resultSet.getBigDecimal(pos + 1).toPlainString(), pos); + } catch (SQLException e) { + System.err.println("error reading at position " + pos + " setting blank field!"); + reuse.setField("", pos); + // throw e; + } break; case java.sql.Types.DATE: reuse.setField(resultSet.getDate(pos + 1).toString(), pos); @@ -253,7 +295,11 @@ private void addValue(OUT reuse) throws IOException, SQLException { reuse.setField(resultSet.getTime(pos + 1).getTime(), pos); break; case java.sql.Types.TIMESTAMP: - reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos); + //TODO manage null fields + if (o == null) + reuse.setField("", pos + 1); + else + reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos); break; case java.sql.Types.SQLXML: reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos); @@ -289,17 +335,38 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOEx @Override public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - GenericInputSplit[] split = { - new GenericInputSplit(0, 1) - }; - return split; + if(!isSplitConfigured()){ + GenericInputSplit[] split = { + new GenericInputSplit(0, 1) + }; + return split; + } + + double maxEelemCount = (max - min) +1; + if(min==0) + maxEelemCount = max + 1; + int size = new Double(Math.ceil( maxEelemCount / fetchSize)).intValue(); + if(minNumSplits > size){ + size = minNumSplits; + fetchSize = new Double(Math.ceil( maxEelemCount / minNumSplits)).intValue(); + } + RangeInputSplit[] ret = new RangeInputSplit[size]; + int count = 0; + for (long i = min; i < max; i += fetchSize, count++) { + long currentLimit = i + fetchSize - 1; + ret[count] = new RangeInputSplit(count, i, currentLimit); + if (currentLimit + 1 + fetchSize > max) { + ret[count+1] = new RangeInputSplit(count, currentLimit + 1, max); + break; + } + } + return ret; } @Override public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { return new DefaultInputSplitAssigner(inputSplits); } - /** * A builder used to set parameters to the output format's configuration in a fluent way. @@ -310,8 +377,9 @@ public static JDBCInputFormatBuilder buildJDBCInputFormat() { } public static class JDBCInputFormatBuilder { - private final JDBCInputFormat format; + private final JDBCInputFormat format; + @SuppressWarnings("rawtypes") public JDBCInputFormatBuilder() { this.format = new JDBCInputFormat(); this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY; @@ -339,7 +407,7 @@ public JDBCInputFormatBuilder setDBUrl(String dbURL) { } public JDBCInputFormatBuilder setQuery(String query) { - format.query = query; + format.queryTemplate = query; return this; } @@ -352,8 +420,15 @@ public JDBCInputFormatBuilder setResultSetConcurrency(int resultSetConcurrency) format.resultSetConcurrency = resultSetConcurrency; return this; } + public JDBCInputFormatBuilder setSplitConfig(String splitColumnName,long fetchSize, long min, long max) { + format.splitColumnName = splitColumnName; + format.fetchSize = fetchSize; + format.min = min; + format.max = max; + return this; + } - public JDBCInputFormat finish() { + public JDBCInputFormat finish() { if (format.username == null) { LOG.info("Username was not supplied separately."); } @@ -363,14 +438,29 @@ public JDBCInputFormat finish() { if (format.dbURL == null) { throw new IllegalArgumentException("No database URL supplied"); } - if (format.query == null) { + if (format.queryTemplate == null) { throw new IllegalArgumentException("No query supplied"); } if (format.drivername == null) { throw new IllegalArgumentException("No driver supplied"); } + adjustQueryTemplateIfNecessary(); return format; } + + /** Try to add $CONDITIONS token automatically (at least for straightforward cases) */ + private void adjustQueryTemplateIfNecessary() { + if(!format.isSplitConfigured()) + return; + if(!format.queryTemplate.toLowerCase().contains("where")){ + if(format.queryTemplate.contains(";")) + format.queryTemplate = format.queryTemplate.replace(";", ""); + format.queryTemplate += " WHERE "+CONDITIONS; + }else if(!format.queryTemplate.contains(CONDITIONS)){ + //if not simple query and no $CONDITIONS avoid dangerous heuristics + throw new IllegalArgumentException("Usage of splits requires to specify "+CONDITIONS+" in the query for their generation"); + } + } } } diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java index 523b8b56ca868..cfd4a9eff1d50 100644 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java @@ -19,7 +19,6 @@ import java.io.OutputStream; -@SuppressWarnings("unused") /** * Utility class to disable derby logging */ diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java index 3fb0278c4ed31..098ebe7dabe36 100644 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -27,7 +27,7 @@ import org.junit.Assert; - +import org.apache.flink.api.java.io.jdbc.example.JDBCExample; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.junit.After; @@ -36,16 +36,10 @@ import org.junit.Test; public class JDBCInputFormatTest { - JDBCInputFormat jdbcInputFormat; - - static Connection conn; - - static final Object[][] dbData = { - {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11}, - {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, - {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, - {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, - {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}}; + + private static Connection conn; + + private JDBCInputFormat jdbcInputFormat; @BeforeClass public static void setUpClass() { @@ -58,41 +52,28 @@ public static void setUpClass() { private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL"); - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); + Class.forName(JDBCExample.DRIVER_CLASS); + conn = DriverManager.getConnection(JDBCExample.DB_URL+";create=true"); createTable(); insertDataToSQLTable(); conn.close(); } private static void createTable() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); + stat.executeUpdate(JDBCExample.getCreateQuery()); stat.close(); } - private static void insertDataToSQLTable() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); + private static void insertDataToSQLTable() throws SQLException { Statement stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); + stat.execute(JDBCExample.getInsertQuery()); stat.close(); } + + @AfterClass public static void tearDownClass() { cleanUpDerbyDatabases(); @@ -100,12 +81,10 @@ public static void tearDownClass() { private static void cleanUpDerbyDatabases() { try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - - conn = DriverManager.getConnection(dbURL); + Class.forName(JDBCExample.DRIVER_CLASS); + conn = DriverManager.getConnection(JDBCExample.DB_URL); Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE books"); + stat.execute("DROP TABLE books"); stat.close(); conn.close(); } catch (Exception e) { @@ -115,7 +94,9 @@ private static void cleanUpDerbyDatabases() { } @After - public void tearDown() { + public void tearDown() throws IOException { + if(jdbcInputFormat!=null) + jdbcInputFormat.close(); jdbcInputFormat = null; } @@ -123,8 +104,8 @@ public void tearDown() { public void testInvalidDriver() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername("org.apache.derby.jdbc.idontexist") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") + .setDBUrl(JDBCExample.DB_URL) + .setQuery(JDBCExample.SELECT_ALL_BOOKS) .finish(); jdbcInputFormat.open(null); } @@ -132,9 +113,9 @@ public void testInvalidDriver() throws IOException { @Test(expected = IllegalArgumentException.class) public void testInvalidURL() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDrivername(JDBCExample.DRIVER_CLASS) .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") - .setQuery("select * from books") + .setQuery(JDBCExample.SELECT_ALL_BOOKS) .finish(); jdbcInputFormat.open(null); } @@ -142,8 +123,8 @@ public void testInvalidURL() throws IOException { @Test(expected = IllegalArgumentException.class) public void testInvalidQuery() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") + .setDrivername(JDBCExample.DRIVER_CLASS) + .setDBUrl(JDBCExample.DB_URL) .setQuery("iamnotsql") .finish(); jdbcInputFormat.open(null); @@ -152,28 +133,28 @@ public void testInvalidQuery() throws IOException { @Test(expected = IllegalArgumentException.class) public void testIncompleteConfiguration() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setQuery("select * from books") + .setDrivername(JDBCExample.DRIVER_CLASS) + .setQuery(JDBCExample.SELECT_ALL_BOOKS) .finish(); } @Test(expected = IOException.class) public void testIncompatibleTuple() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") + .setDrivername(JDBCExample.DRIVER_CLASS) + .setDBUrl(JDBCExample.DB_URL) + .setQuery(JDBCExample.SELECT_ALL_BOOKS) .finish(); jdbcInputFormat.open(null); jdbcInputFormat.nextRecord(new Tuple2()); } @Test - public void testJDBCInputFormat() throws IOException { + public void testJDBCInputFormatWithoutParallelism() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") + .setDrivername(JDBCExample.DRIVER_CLASS) + .setDBUrl(JDBCExample.DB_URL) + .setQuery(JDBCExample.SELECT_ALL_BOOKS) .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) .finish(); jdbcInputFormat.open(null); @@ -181,18 +162,20 @@ public void testJDBCInputFormat() throws IOException { int recordCount = 0; while (!jdbcInputFormat.reachedEnd()) { jdbcInputFormat.nextRecord(tuple); - Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass()); - Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass()); - Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass()); - Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass()); - Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass()); + if(tuple.getField(0)!=null) Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass()); + if(tuple.getField(1)!=null) Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass()); + if(tuple.getField(2)!=null) Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass()); + if(tuple.getField(3)!=null) Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass()); + if(tuple.getField(4)!=null) Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass()); for (int x = 0; x < 5; x++) { - Assert.assertEquals(dbData[recordCount][x], tuple.getField(x)); + //TODO how to handle null for double??? + if(JDBCExample.testData[recordCount][x]!=null) + Assert.assertEquals(JDBCExample.testData[recordCount][x], tuple.getField(x)); } recordCount++; } - Assert.assertEquals(5, recordCount); + Assert.assertEquals(10, recordCount); } } diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java index 840a31411ddf4..06214b05841e7 100644 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java @@ -29,28 +29,62 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder; import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; public class JDBCExample { - + + public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver"; + public static final String DB_URL = "jdbc:derby:memory:ebookshop"; + public static final String INPUT_TABLE = "books"; + public static final String SELECT_ALL_BOOKS = "select * from books"; + + public static final boolean exploitParallelism = true; + + public static final Object[][] testData = { + {1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11}, + {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, + {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, + {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, + {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}, + {1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66}, + {1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77}, + {1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88}, + {1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99}, + {1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}}; + + public static void main(String[] args) throws Exception { prepareTestDb(); ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); + JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS); + + if(exploitParallelism){ + final String splitColumnName = "id"; + final int fetchSize = 1; + final Long min = new Long(testData[0][0]+""); + final Long max = new Long(testData[testData.length-fetchSize][0]+""); + //rewrite query and add $CONDITIONS token to generate splits (sqoop-like) + inputBuilder = inputBuilder + // WARNING: ONLY when query does not contains the WHERE clause we can keep the next line commented + //.setQuery(SELECT_ALL_BOOKS + " WHERE " +JDBCInputFormat.CONDITIONS) + .setSplitConfig(splitColumnName, fetchSize, min, max); + } DataSet source - = environment.createInput(JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") - .finish(), + = environment.createInput( + inputBuilder.finish(), new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO) ); source.output(JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") .finish()); environment.execute(); @@ -58,45 +92,52 @@ public static void main(String[] args) throws Exception { private static void prepareTestDb() throws Exception { System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL"); - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - Connection conn = DriverManager.getConnection(dbURL); - - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); + Class.forName(DRIVER_CLASS); + Connection conn = DriverManager.getConnection(DB_URL+";create=true"); + //create output table Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); + stat.executeUpdate(getCreateQuery().replace("books", "newbooks")); stat.close(); - sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - + //create input table stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); + stat.executeUpdate(getCreateQuery()); stat.close(); - sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - + //prepare input data stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); + stat.execute(getInsertQuery()); stat.close(); conn.close(); } + + + public static String getCreateQuery() { + StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + return sqlQueryBuilder.toString(); + } + + public static String getInsertQuery() { + StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); + for (int i = 0; i < testData.length; i++) { + sqlQueryBuilder.append("(") + .append(testData[i][0]).append(",'") + .append(testData[i][1]).append("','") + .append(testData[i][2]).append("',") + .append(testData[i][3]).append(",") + .append(testData[i][4]).append(")"); + if(i