From 8404faec4ac851f33a7663defe56ddc9e36d79a3 Mon Sep 17 00:00:00 2001 From: Jicaar Date: Thu, 5 Jul 2018 10:29:25 -0600 Subject: [PATCH 1/6] Adding functionality and tests for CassandraPojoInputFormat object. --- .../cassandra/CassandraPojoInputFormat.java | 136 ++++++++++++++++++ .../CustomCassandraAnnotatedPojo.java | 58 ++++++++ .../cassandra/example/BatchPojoExample.java | 95 ++++++++++++ 3 files changed, 289 insertions(+) create mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java create mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java create mode 100644 flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java new file mode 100644 index 0000000000000..bb1049239759e --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. 
+ * @param type of inputClass + */ +public class CassandraPojoInputFormat extends RichInputFormat implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class); + + private final String query; + private final ClusterBuilder builder; + + private transient Cluster cluster; + private transient Session session; + private transient Result resultSet; + private Class inputClass; + + public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class inputClass) { + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + Preconditions.checkArgument(inputClass != null, "InputClass cannot be null"); + + this.query = query; + this.builder = builder; + this.inputClass = inputClass; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + /** + * Opens a Session and executes the query. 
+ * + * @param ignored + * @throws IOException + */ + @Override + public void open(InputSplit ignored) throws IOException { + this.session = cluster.connect(); + MappingManager manager = new MappingManager(session); + + Mapper mapper = manager.mapper(inputClass); + + this.resultSet = mapper.map(session.execute(query)); + } + + @Override + public boolean reachedEnd() throws IOException { + return resultSet.isExhausted(); + } + + @Override + public OUT nextRecord(OUT reuse) throws IOException { + return resultSet.one(); + } + + @Override + public InputSplit[] createInputSplits(int minNumSplits) throws IOException { + GenericInputSplit[] split = {new GenericInputSplit(0, 1)}; + return split; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + /** + * Closes all resources used. + */ + @Override + public void close() throws IOException { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + + try { + if (cluster != null) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } +} diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java new file mode 100644 index 0000000000000..a4f19738e83a6 --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.Table; + +/** + * Example of Cassandra Annotated POJO class for use with CassandraPojoInputFormat. + */ +@Table(name = "$TABLE", keyspace = "flink") +public class CustomCassandraAnnotatedPojo { + @Column(name = "id") + private String id; + @Column(name = "counter") + private Integer counter; + @Column(name = "batch_id") + private Integer batchId; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Integer getCounter() { + return counter; + } + + public void setCounter(Integer counter) { + this.counter = counter; + } + + public Integer getBatchId() { + return batchId; + } + + public void setBatchId(Integer batchId) { + this.batchId = batchId; + } +} diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java new file mode 100644 index 0000000000000..856d4f2a8f20f --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java @@ -0,0 +1,95 @@ +/* + * Licensed to the
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.cassandra.example; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; + +import java.util.ArrayList; + +/** + * This is an example showing how to use the CassandraPojoInput-/CassandraOutputFormats in the Batch API. + * + *

The example assumes that a table exists in a local cassandra database, according to the following queries: + * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; + * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY KEY(number, strings)); + */ +public class BatchPojoExample { + private static final String INSERT_QUERY = "INSERT INTO test.batches (number, strings) VALUES (?,?);"; + private static final String SELECT_QUERY = "SELECT number, strings FROM test.batches;"; + + /* + * table script: "CREATE TABLE test.batches (number int, strings text, PRIMARY KEY(number, strings));" + */ + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + ArrayList> collection = new ArrayList<>(20); + for (int i = 0; i < 20; i++) { + collection.add(new Tuple2<>(i, "string " + i)); + } + + DataSet> dataSet = env.fromCollection(collection); + + dataSet.output(new CassandraTupleOutputFormat>(INSERT_QUERY, new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + })); + + env.execute("Write"); + + /*DataSet> inputDS = env + .createInput(new CassandraInputFormat>(SELECT_QUERY, new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + }), TupleTypeInfo.of(new TypeHint>() { + })); + + inputDS.print();*/ + + DataSet inputDS = env + .createInput(new CassandraPojoInputFormat<>(SELECT_QUERY, new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPoints("127.0.0.1").build(); + } + }, CustomCassandraAnnotatedPojo.class)); + + /*DataSet testStep = inputDS.map(new MapFunction() { + @Override + public String map(CustomCassandraAnnotatedPojo value)
throws Exception { + return "numberVal: " + value.getNumber() + ", stringsVal: " + value.getStrings(); + } + });*/ + + //testStep.print(); + } +} From 65d15fa64946ecba1a966bf6902f8e3d10595935 Mon Sep 17 00:00:00 2001 From: Jicaar Date: Tue, 4 Sep 2018 17:40:10 -0600 Subject: [PATCH 2/6] Added CassandraInputFormatBase to combine the code shared by the CassandraInputFormatClass and CassandraPojoInputFormat class. --- .../cassandra/CassandraInputFormat.java | 72 +------------ .../cassandra/CassandraInputFormatBase.java | 102 ++++++++++++++++++ .../cassandra/CassandraPojoInputFormat.java | 78 +------------- 3 files changed, 109 insertions(+), 143 deletions(-) create mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java index e0806fe2056fc..bbd31963f3544 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java @@ -17,25 +17,12 @@ package org.apache.flink.batch.connectors.cassandra; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.NonParallelInput; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; import 
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; -import org.apache.flink.util.Preconditions; -import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.google.common.base.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; @@ -44,32 +31,12 @@ * * @param type of Tuple */ -public class CassandraInputFormat extends RichInputFormat implements NonParallelInput { - private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class); +public class CassandraInputFormat extends CassandraInputFormatBase { - private final String query; - private final ClusterBuilder builder; - - private transient Cluster cluster; - private transient Session session; private transient ResultSet resultSet; public CassandraInputFormat(String query, ClusterBuilder builder) { - Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); - Preconditions.checkArgument(builder != null, "Builder cannot be null"); - - this.query = query; - this.builder = builder; - } - - @Override - public void configure(Configuration parameters) { - this.cluster = builder.getCluster(); - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return cachedStatistics; + super(query, builder); } /** @@ -86,7 +53,7 @@ public void open(InputSplit ignored) throws IOException { @Override public boolean reachedEnd() throws IOException { - return resultSet.isExhausted(); + return false; } @Override @@ -97,37 +64,4 @@ public OUT nextRecord(OUT reuse) throws IOException { } return reuse; } - - @Override - public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - GenericInputSplit[] split = {new GenericInputSplit(0, 1)}; - return split; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { 
- return new DefaultInputSplitAssigner(inputSplits); - } - - /** - * Closes all resources used. - */ - @Override - public void close() throws IOException { - try { - if (session != null) { - session.close(); - } - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - - try { - if (cluster != null) { - cluster.close(); - } - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } } diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java new file mode 100644 index 0000000000000..eb1cbd90cdfff --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.NonParallelInput; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. + * @param type of inputClass + */ +public abstract class CassandraInputFormatBase extends RichInputFormat implements NonParallelInput { + private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormatBase.class); + + protected final String query; + protected final ClusterBuilder builder; + + protected transient Cluster cluster; + protected transient Session session; + + public CassandraInputFormatBase(String query, ClusterBuilder builder){ + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + + this.query = query; + this.builder = builder; + } + + @Override + public void configure(Configuration parameters) { + this.cluster = builder.getCluster(); + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return cachedStatistics; + } + + @Override + public InputSplit[] createInputSplits(int minNumSplits) throws 
IOException { + GenericInputSplit[] split = {new GenericInputSplit(0, 1)}; + return split; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + /** + * Closes all resources used. + */ + @Override + public void close() throws IOException { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + + try { + if (cluster != null) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } + +} diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java index bb1049239759e..518fd8473c042 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java @@ -17,26 +17,13 @@ package org.apache.flink.batch.connectors.cassandra; -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.NonParallelInput; -import org.apache.flink.api.common.io.RichInputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; -import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import org.apache.flink.util.Preconditions; -import org.apache.flink.shaded.guava18.com.google.common.base.Strings; - -import com.datastax.driver.core.Cluster; -import 
com.datastax.driver.core.Session; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import com.datastax.driver.mapping.Result; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; @@ -44,45 +31,21 @@ * InputFormat to read data from Apache Cassandra and generate a custom Cassandra annotated object. * @param type of inputClass */ -public class CassandraPojoInputFormat extends RichInputFormat implements NonParallelInput { - private static final Logger LOG = LoggerFactory.getLogger(CassandraInputFormat.class); - - private final String query; - private final ClusterBuilder builder; +public class CassandraPojoInputFormat extends CassandraInputFormatBase { - private transient Cluster cluster; - private transient Session session; private transient Result resultSet; private Class inputClass; public CassandraPojoInputFormat(String query, ClusterBuilder builder, Class inputClass) { - Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); - Preconditions.checkArgument(builder != null, "Builder cannot be null"); + super(query, builder); + Preconditions.checkArgument(inputClass != null, "InputClass cannot be null"); - this.query = query; - this.builder = builder; this.inputClass = inputClass; } @Override - public void configure(Configuration parameters) { - this.cluster = builder.getCluster(); - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return cachedStatistics; - } - - /** - * Opens a Session and executes the query. 
- * - * @param ignored - * @throws IOException - */ - @Override - public void open(InputSplit ignored) throws IOException { + public void open(InputSplit split) throws IOException { this.session = cluster.connect(); MappingManager manager = new MappingManager(session); @@ -100,37 +63,4 @@ public boolean reachedEnd() throws IOException { public OUT nextRecord(OUT reuse) throws IOException { return resultSet.one(); } - - @Override - public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - GenericInputSplit[] split = {new GenericInputSplit(0, 1)}; - return split; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - /** - * Closes all resources used. - */ - @Override - public void close() throws IOException { - try { - if (session != null) { - session.close(); - } - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - - try { - if (cluster != null) { - cluster.close(); - } - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); - } - } } From 7791713602cd943d5dd2540230cb4a50e73b5fb7 Mon Sep 17 00:00:00 2001 From: Jicaar Date: Wed, 5 Sep 2018 11:44:02 -0600 Subject: [PATCH 3/6] Fixed bug with imports formatted improperly. 
--- .../connectors/cassandra/CassandraInputFormatBase.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java index eb1cbd90cdfff..d8212dff3142d 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormatBase.java @@ -17,8 +17,6 @@ package org.apache.flink.batch.connectors.cassandra; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.NonParallelInput; import org.apache.flink.api.common.io.RichInputFormat; @@ -27,9 +25,13 @@ import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.shaded.guava18.com.google.common.base.Strings; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.base.Strings; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 335230a00609e05343f14268a13734c47a0fa182 Mon Sep 17 00:00:00 2001 From: Jicaar Date: Wed, 5 Sep 2018 12:39:08 -0600 Subject: [PATCH 4/6] Cleaned up BatchPojoExample. Added test that was missing for CassandraPojoInputFormat to CassandraConnectorITCase. Added reason for parameter being ignored in CassandraInputFormat. 
--- .../cassandra/CassandraInputFormat.java | 2 +- .../cassandra/CassandraPojoInputFormat.java | 2 ++ .../cassandra/example/BatchPojoExample.java | 27 ++++----------- .../cassandra/CassandraConnectorITCase.java | 33 ++++++++++++++++--- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java index bbd31963f3544..cc57538cee46a 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java @@ -42,7 +42,7 @@ public CassandraInputFormat(String query, ClusterBuilder builder) { /** * Opens a Session and executes the query. * - * @param ignored + * @param ignored because parameter is not parallelizable. 
* @throws IOException */ @Override diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java index 518fd8473c042..538b96df6813e 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraPojoInputFormat.java @@ -33,6 +33,8 @@ */ public class CassandraPojoInputFormat extends CassandraInputFormatBase { + private static final long serialVersionUID = 1992091320180905115L; + private transient Result resultSet; private Class inputClass; diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java index 856d4f2a8f20f..d82816ae836c5 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java @@ -62,19 +62,9 @@ protected Cluster buildCluster(Cluster.Builder builder) { } })); - env.execute("Write"); - - /*DataSet> inputDS = env - .createInput(new CassandraInputFormat>(SELECT_QUERY, new ClusterBuilder() { - @Override - protected Cluster buildCluster(Cluster.Builder builder) { - return builder.addContactPoints("127.0.0.1").build(); - } - }), TupleTypeInfo.of(new TypeHint>() { - })); - - inputDS.print();*/ - + /* + * This is for the purpose of showing an example of creating a DataSet using CassandraPojoInputFormat. 
+ */ DataSet inputDS = env .createInput(new CassandraPojoInputFormat<>(SELECT_QUERY, new ClusterBuilder() { @Override @@ -83,13 +73,10 @@ protected Cluster buildCluster(Cluster.Builder builder) { } }, CustomCassandraAnnotatedPojo.class)); - /*DataSet testStep = inputDS.map(new MapFunction() { - @Override - public String map(CustomCassandraAnnotatedPojo value) throws Exception { - return "numberVal: " + value.getNumber() + ", stringsVal: " + value.getStrings(); - } - });*/ + inputDS.print(); + + env.execute("Write"); + - //testStep.print(); } } diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 4444fa6fd169c..6d931f8c4b8cd 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -29,10 +29,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo; -import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; -import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; -import org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat; -import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.*; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -458,6 +455,34 @@ public void testCassandraTableSink() throws Exception { Assert.assertTrue("The input data was not 
completely written to Cassandra", input.isEmpty()); } + @Test + public void testCassandraBatchPojoFormat() throws Exception { + + OutputFormat> sink = new CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder); + sink.configure(new Configuration()); + sink.open(0, 1); + + for (Tuple3 value : collection) { + sink.writeRecord(value); + } + + sink.close(); + + InputFormat source = new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, CustomCassandraAnnotatedPojo.class); + source.configure(new Configuration()); + source.open(null); + + List result = new ArrayList<>(); + + while (!source.reachedEnd()) { + CustomCassandraAnnotatedPojo temp = source.nextRecord(new CustomCassandraAnnotatedPojo()); + result.add(temp); + } + + source.close(); + Assert.assertEquals(20, result.size()); + } + @Test public void testCassandraBatchTupleFormat() throws Exception { OutputFormat> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder); From 5c15b77131e8faae68db96373d9c0bc843315760 Mon Sep 17 00:00:00 2001 From: Jicaar Date: Wed, 5 Sep 2018 13:05:44 -0600 Subject: [PATCH 5/6] Fixed formatting issues. 
--- .../connectors/cassandra/example/BatchPojoExample.java | 2 -- .../connectors/cassandra/CassandraConnectorITCase.java | 7 ++++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java index d82816ae836c5..7c3c101c4fc16 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java @@ -76,7 +76,5 @@ protected Cluster buildCluster(Cluster.Builder builder) { inputDS.print(); env.execute("Write"); - - } } diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 6d931f8c4b8cd..1f4dc38704ed1 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -29,7 +29,12 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo; -import org.apache.flink.batch.connectors.cassandra.*; +import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; +import 
org.apache.flink.batch.connectors.cassandra.CassandraRowOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat; +import org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.testutils.CommonTestUtils; From e2102deb41c398f4eb047f637f3290dd85fcb6c1 Mon Sep 17 00:00:00 2001 From: Jicaar Date: Sun, 9 Sep 2018 15:33:19 -0600 Subject: [PATCH 6/6] Fixed that resultSet.exhausted wasn't being used in the reachedEnd method in CassandraInputFormat. --- .../flink/batch/connectors/cassandra/CassandraInputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java index cc57538cee46a..60e30669267ef 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java @@ -53,7 +53,7 @@ public void open(InputSplit ignored) throws IOException { @Override public boolean reachedEnd() throws IOException { - return false; + return resultSet.isExhausted(); } @Override