From b68411f1b98378a528141baf568a14f9bfb7587d Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Thu, 20 Apr 2017 17:54:36 -0700
Subject: [PATCH 01/11] [FLINK-6225] [Cassandra Connector] add CassandraTableSink

---
 .../flink-connector-cassandra/pom.xml         |   5 +
 .../cassandra/CassandraRowSink.java           |  59 ++++++++++
 .../connectors/cassandra/CassandraSink.java   |  34 +++++-
 .../cassandra/CassandraTableSink.java         | 101 ++++++++++++++++++
 .../cassandra/CassandraConnectorITCase.java   |  50 ++++++++-
 5 files changed, 244 insertions(+), 5 deletions(-)
 create mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
 create mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java

diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index f006f3ab576e6..fdd304d77445c 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -187,5 +187,10 @@ under the License.
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>1.3-SNAPSHOT</version>
+		</dependency>

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
new file mode 100644
index 0000000000000..69c474063ec2d
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
+/**
+ * Flink Sink to save data into a Cassandra cluster.
+ *
+ * @param <IN> Type of the elements emitted by this sink, it must extend {@link Row}
+ */
+public class CassandraRowSink<IN extends Row> extends CassandraSinkBase<IN, ResultSet> {
+	private final String insertQuery;
+	private transient PreparedStatement ps;
+
+	public CassandraRowSink(String insertQuery, ClusterBuilder builder) {
+		super(builder);
+		this.insertQuery = insertQuery;
+	}
+
+	@Override
+	public void open(Configuration configuration) {
+		super.open(configuration);
+		this.ps = session.prepare(insertQuery);
+	}
+
+	@Override
+	public ListenableFuture<ResultSet> send(IN value) {
+		Object[] fields = extract(value);
+		return session.executeAsync(ps.bind(fields));
+	}
+
+	private Object[] extract(IN record) {
+		Object[] al = new Object[record.getArity()];
+		for (int i = 0; i < record.getArity(); i++) {
+			al[i] = record.getField(i);
+		}
+		return al;
+	}
+}

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index af138c53e7ed4..442c41ce6802a 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
@@ -32,6 +33,7 @@
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 
@@ -205,12 +207,17 @@ public static CassandraSinkBuilder addSink(org.apache.flink.streaming.a
 	 * @param <IN> input type
 	 * @return CassandraSinkBuilder, to further configure the sink
	 */
-	public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+
+	public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
 		TypeInformation<IN> typeInfo = input.getType();
 		if (typeInfo instanceof TupleTypeInfo) {
 			DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
 		}
+		if (typeInfo instanceof RowTypeInfo) {
+			DataStream<Row> rowInput = (DataStream<Row>) input;
+			return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder<>(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
+		}
 		if (typeInfo instanceof PojoTypeInfo) {
 			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
 		}
@@ -378,6 +385,31 @@ protected CassandraSink createWriteAheadSink() throws Exception {
	/**
	 * Builder for a {@link CassandraPojoSink}.
	 * @param <IN>
	 */
+	public static class CassandraRowSinkBuilder<IN extends Row> extends CassandraSinkBuilder<IN> {
+		public CassandraRowSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
+			}
+		}
+
+		@Override
+		protected CassandraSink<IN> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink"));
+
+		}
+
+		@Override
+		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+			throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+		}
+	}
+
 	public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
 		public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
 			super(input, typeInfo, serializer);

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
new file mode 100644
index 0000000000000..6b22478eb7020
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * A cassandra {@link StreamTableSink}.
+ *
+ */
+class CassandraTableSink implements StreamTableSink<Row> {
+	private final List<InetSocketAddress> hostAddrs;
+	private final String cql;
+	private final String[] fieldNames;
+	private final TypeInformation[] fieldTypes;
+	private final Properties properties;
+
+	public CassandraTableSink(List<InetSocketAddress> hostAddrs, String cql, String[] fieldNames, TypeInformation[] fieldTypes, Properties properties) {
+		this.hostAddrs = Preconditions.checkNotNull(hostAddrs, "hostAddrs");
+		this.cql = Preconditions.checkNotNull(cql, "cql");
+		this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+		this.properties = Preconditions.checkNotNull(properties, "properties");
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(fieldTypes);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return this.fieldNames;
+	}
+
+	@Override
+	public TypeInformation[] getFieldTypes() {
+		return this.fieldTypes;
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+		fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+		fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+			"Number of provided field names and types does not match.");
+		return new CassandraTableSink(this.hostAddrs, this.cql, fieldNames, fieldTypes, this.properties);
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		try {
+			CassandraSink.addSink(dataStream)
+				.setClusterBuilder(new CassandraClusterBuilder(this.hostAddrs))
+				.setQuery(this.cql)
+				.build();
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private class CassandraClusterBuilder extends ClusterBuilder {
+		private final Collection<InetSocketAddress> hostAddrs;
+
+		CassandraClusterBuilder(Collection<InetSocketAddress> hostAddrs) {
+			this.hostAddrs = hostAddrs;
+		}
+
+		@Override
+		protected Cluster buildCluster(Cluster.Builder builder) {
+			return builder.addContactPointsWithPorts(hostAddrs).build();
+		}
+	}
+}

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f52a42c6b8e19..42f4131dadc3b 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -35,6 +35,7 @@
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
@@ -45,6 +46,7 @@
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -53,21 +55,22 @@
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 import java.util.Scanner;
 import java.util.UUID;
 
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -83,11 +86,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase
 	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+	private static final ArrayList<org.apache.flink.types.Row> rowCollection = new ArrayList<>(20);
+
+	private static final String[] FIELD_NAME = new String[] {"id, counter, batch_id"};
+	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
 
 	static {
 		for (int i = 0; i < 20; i++) {
 			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+			rowCollection.add(org.apache.flink.types.Row.of(UUID.randomUUID().toString(), i, 0));
 		}
 	}
@@ -379,6 +392,20 @@ public void testCassandraTupleAtLeastOnceSink() throws Exception {
 		Assert.assertEquals(20, rs.all().size());
 	}
 
+	@Test
+	public void testCassandraRowAtLeastOnceSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<org.apache.flink.types.Row> source = env.fromCollection(rowCollection);
+		source.addSink(new CassandraRowSink(INSERT_DATA_QUERY, builder));
+
+		env.execute();
+
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
+	}
+
 	@Test
 	public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
@@ -397,6 +424,21 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		Assert.assertEquals(20, rs.all().size());
 	}
 
+	@Test
+	public void testCassandraTableSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStreamSource<org.apache.flink.types.Row> source = env.fromCollection(rowCollection);
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(ImmutableList.of(new InetSocketAddress(HOST, PORT)), INSERT_DATA_QUERY, FIELD_NAME, FIELD_TYPES, new Properties());
+
+		cassandraTableSink.emitDataStream(source);
+
+		env.execute();
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
+	}
+
 	@Test
 	public void testCassandraBatchFormats() throws Exception {
 		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
From 0e868e025e8e3b2b5465012ed25b11fa4e2979b2 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Tue, 25 Apr 2017 14:03:08 -0700
Subject: [PATCH 02/11] [FLINK-6225] [Cassandra Connector] modify constructor & ut

---
 .../cassandra/CassandraTableSink.java         | 38 +++++--------------
 .../cassandra/CassandraConnectorITCase.java   | 15 ++++++--
 2 files changed, 21 insertions(+), 32 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
index 6b22478eb7020..df0d280c44836 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import com.datastax.driver.core.Cluster;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -26,9 +25,6 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
 import java.util.Properties;
 
 /**
@@ -36,17 +32,15 @@
  *
  */
 class CassandraTableSink implements StreamTableSink<Row> {
-	private final List<InetSocketAddress> hostAddrs;
+	private final ClusterBuilder builder;
 	private final String cql;
-	private final String[] fieldNames;
-	private final TypeInformation[] fieldTypes;
+	private String[] fieldNames;
+	private TypeInformation[] fieldTypes;
 	private final Properties properties;
 
-	public CassandraTableSink(List<InetSocketAddress> hostAddrs, String cql, String[] fieldNames, TypeInformation[] fieldTypes, Properties properties) {
-		this.hostAddrs = Preconditions.checkNotNull(hostAddrs, "hostAddrs");
+	public CassandraTableSink(ClusterBuilder builder, String cql, Properties properties) {
+		this.builder = Preconditions.checkNotNull(builder, "builder");
 		this.cql = Preconditions.checkNotNull(cql, "cql");
-		this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
-		this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
 		this.properties = Preconditions.checkNotNull(properties, "properties");
 	}
 
@@ -67,35 +61,23 @@ public TypeInformation[] getFieldTypes() {
 
 	@Override
 	public TableSink<Row> configure(String[] fieldNames, TypeInformation[] fieldTypes) {
-		fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
-		fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(this.builder, this.cql, this.properties);
+		cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+		cassandraTableSink.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
 		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
 			"Number of provided field names and types does not match.");
-		return new CassandraTableSink(this.hostAddrs, this.cql, fieldNames, fieldTypes, this.properties);
+		return cassandraTableSink;
 	}
 
 	@Override
 	public void emitDataStream(DataStream<Row> dataStream) {
 		try {
 			CassandraSink.addSink(dataStream)
-				.setClusterBuilder(new CassandraClusterBuilder(this.hostAddrs))
+				.setClusterBuilder(this.builder)
 				.setQuery(this.cql)
 				.build();
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
 	}
-
-	private class CassandraClusterBuilder extends ClusterBuilder {
-		private final Collection<InetSocketAddress> hostAddrs;
-
-		CassandraClusterBuilder(Collection<InetSocketAddress> hostAddrs) {
-			this.hostAddrs = hostAddrs;
-		}
-
-		@Override
-		protected Cluster buildCluster(Cluster.Builder builder) {
-			return builder.addContactPointsWithPorts(hostAddrs).build();
-		}
-	}
 }

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 42f4131dadc3b..e97b322958468 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -46,7 +46,6 @@
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -117,7 +116,7 @@ protected Cluster buildCluster(Cluster.Builder builder) {
 	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
 	private static final ArrayList<org.apache.flink.types.Row> rowCollection = new ArrayList<>(20);
 
-	private static final String[] FIELD_NAME = new String[] {"id, counter, batch_id"};
+	private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"};
 	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
 		BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
@@ -398,7 +397,9 @@ public void testCassandraRowAtLeastOnceSink() throws Exception {
 		env.setParallelism(1);
 
 		DataStreamSource<org.apache.flink.types.Row> source = env.fromCollection(rowCollection);
-		source.addSink(new CassandraRowSink(INSERT_DATA_QUERY, builder));
+
+		new CassandraSink.CassandraRowSinkBuilder(source, source.getType(),
+			source.getType().createSerializer(source.getExecutionEnvironment().getConfig())).setQuery(INSERT_DATA_QUERY).setClusterBuilder(builder).build();
 
 		env.execute();
@@ -430,7 +431,13 @@ public void testCassandraTableSink() throws Exception {
 		env.setParallelism(1);
 
 		DataStreamSource<org.apache.flink.types.Row> source = env.fromCollection(rowCollection);
-		CassandraTableSink cassandraTableSink = new CassandraTableSink(ImmutableList.of(new InetSocketAddress(HOST, PORT)), INSERT_DATA_QUERY, FIELD_NAME, FIELD_TYPES, new Properties());
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
+			@Override
+			protected Cluster buildCluster(Cluster.Builder builder) {
+				return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
+			}
+		}, INSERT_DATA_QUERY, new Properties());
+		cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
 
 		cassandraTableSink.emitDataStream(source);
From 9564bf14faa941d780e6aaecde7489254e56a785 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Thu, 27 Apr 2017 14:33:38 -0700
Subject: [PATCH 03/11] [FLINK-6225] [Cassandra Connector] fix style

---
 .../flink-connector-cassandra/pom.xml         |  3 +-
 .../connectors/cassandra/CassandraSink.java   |  7 +--
 .../cassandra/CassandraTableSink.java         | 20 +++---
 .../cassandra/CassandraConnectorITCase.java   | 62 ++++++++++++-------
 4 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index fdd304d77445c..22a409edf64c2 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -190,7 +190,8 @@ under the License.
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table_2.10</artifactId>
-			<version>1.3-SNAPSHOT</version>
+			<version>${project.version}</version>
+			<scope>provided</scope>
 		</dependency>

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 442c41ce6802a..e3b28ba036782 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -207,15 +207,14 @@
 	 * @param <IN> input type
 	 * @return CassandraSinkBuilder, to further configure the sink
 	 */
-
-	public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+	public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
 		TypeInformation<IN> typeInfo = input.getType();
 		if (typeInfo instanceof TupleTypeInfo) {
-			DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
+			DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
 		}
 		if (typeInfo instanceof RowTypeInfo) {
-			DataStream<Row> rowInput = (DataStream<Row>) input;
+			DataStream<Row> rowInput = (DataStream<Row>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder<>(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
 		}
 		if (typeInfo instanceof PojoTypeInfo) {

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
index df0d280c44836..58d1ce4ebbe13 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
@@ -20,18 +20,16 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Properties;
 
 /**
- * A cassandra {@link StreamTableSink}.
- *
+ * A cassandra {@link AppendStreamTableSink}.
  */
-class CassandraTableSink implements StreamTableSink<Row> {
+class CassandraTableSink implements AppendStreamTableSink<Row> {
 	private final ClusterBuilder builder;
 	private final String cql;
 	private String[] fieldNames;
@@ -39,9 +37,9 @@
 	private final Properties properties;
 
 	public CassandraTableSink(ClusterBuilder builder, String cql, Properties properties) {
-		this.builder = Preconditions.checkNotNull(builder, "builder");
-		this.cql = Preconditions.checkNotNull(cql, "cql");
-		this.properties = Preconditions.checkNotNull(properties, "properties");
+		this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null");
+		this.cql = Preconditions.checkNotNull(cql, "Cql must not be null");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null");
 	}
 
 	@Override
@@ -60,10 +58,10 @@
 	}
 
 	@Override
-	public TableSink<Row> configure(String[] fieldNames, TypeInformation[] fieldTypes) {
+	public CassandraTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) {
 		CassandraTableSink cassandraTableSink = new CassandraTableSink(this.builder, this.cql, this.properties);
-		cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
-		cassandraTableSink.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+		cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "FieldNames must not be null");
+		cassandraTableSink.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes must not be null");
 		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
 			"Number of provided field names and types does not match.");
 		return cassandraTableSink;

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index e97b322958468..094aaed9e561a 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -39,14 +39,17 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import org.apache.cassandra.service.CassandraDaemon;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -85,15 +88,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase
 	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
-	private static final ArrayList<org.apache.flink.types.Row> rowCollection = new ArrayList<>(20);
+	private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
 
-	private static final String[] FIELD_NAMES = new String[] {"id", "counter", "batch_id"};
-	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] {BasicTypeInfo.STRING_TYPE_INFO,
-		BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
+	private static final String[] FIELD_NAMES = {"id", "counter", "batch_id"};
+	private static final TypeInformation[] FIELD_TYPES = {
+		BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
 
 	static {
 		for (int i = 0; i < 20; i++) {
 			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
-			rowCollection.add(org.apache.flink.types.Row.of(UUID.randomUUID().toString(), i, 0));
+			rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
 		}
 	}
@@ -257,7 +260,7 @@ protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink
-		ArrayList<Integer> actual = new ArrayList<>();
-		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
-		for (Row s : result) {
+		ArrayList<Integer> actual = new ArrayList<>();ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
+		for (com.datastax.driver.core.Row s : result) {
 			actual.add(s.getInt("counter"));
 		}
@@ -396,7 +398,7 @@ public void testCassandraRowAtLeastOnceSink() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 
-		DataStreamSource<org.apache.flink.types.Row> source = env.fromCollection(rowCollection);
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
 
 		new CassandraSink.CassandraRowSinkBuilder(source, source.getType(),
 			source.getType().createSerializer(source.getExecutionEnvironment().getConfig())).setQuery(INSERT_DATA_QUERY).setClusterBuilder(builder).build();
@@ -430,16 +432,34 @@ public void testCassandraTableSink() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 
-		DataStreamSource<org.apache.flink.types.Row> source = env.fromCollection(rowCollection);
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
 		CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
 			@Override
 			protected Cluster buildCluster(Cluster.Builder builder) {
 				return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
 			}
 		}, INSERT_DATA_QUERY, new Properties());
-		cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+		CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+
+		newCassandrTableSink.emitDataStream(source);
+
+		env.execute();
+		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		Assert.assertEquals(20, rs.all().size());
+	}
+
+	@Test
+	public void testCassandraTableSinkE2E() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
+
+		tEnv.registerDataStreamInternal("testFlinkTable", source);
 
-		cassandraTableSink.emitDataStream(source);
+		tEnv.sql("select * from testFlinkTable").writeToSink(
+			new CassandraTableSink(builder, INSERT_DATA_QUERY, new Properties()));
 
 		env.execute();
 		ResultSet rs = session.execute(SELECT_DATA_QUERY);
 		Assert.assertEquals(20, rs.all().size());
 	}
@@ -514,10 +534,10 @@ public void testCassandraScalaTupleAtLeastSink() throws Exception {
 		sink.close();
 
 		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
-		List<Row> rows = rs.all();
+		List<com.datastax.driver.core.Row> rows = rs.all();
 		Assert.assertEquals(scalaTupleCollection.size(), rows.size());
 
-		for (Row row : rows) {
+		for (com.datastax.driver.core.Row row : rows) {
 			scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
 		}
 		Assert.assertEquals(0, scalaTupleCollection.size());
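[Editorial sketch, not part of the patch series] The E2E test added in patch 03 reaches the sink through registerDataStreamInternal, which is internal test-only API. Outside Flink's own test tree, the equivalent registration would go through the public 1.3-era Table API, roughly as below; the table name "sensors" is illustrative and the sketch reuses the ITCase helpers (builder, rowCollection, INSERT_DATA_QUERY) as assumed context.

	// Rough public-API equivalent of testCassandraTableSinkE2E; assumes the same
	// helpers as the surrounding ITCase.
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

	DataStream<Row> source = env.fromCollection(rowCollection,
		new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
	tEnv.registerDataStream("sensors", source, "id, counter, batch_id");

	tEnv.sql("SELECT id, counter, batch_id FROM sensors").writeToSink(
		new CassandraTableSink(builder, INSERT_DATA_QUERY, new Properties()));

	env.execute();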
From c5f5ef45ea0ddf0614440f9c77816270349e6003 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Mon, 1 May 2017 11:14:37 -0700
Subject: [PATCH 04/11] [FLINK-6225] [Cassandra Connector] fix compile

---
 .../flink-connector-cassandra/pom.xml         |  6 ++++++
 .../cassandra/CassandraTableSink.java         |  3 ++-
 .../cassandra/CassandraConnectorITCase.java   | 18 ++++++++++--------
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index 22a409edf64c2..3b5556a9f50b1 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -193,5 +193,11 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
index 58d1ce4ebbe13..8dbc60ad4b396 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
@@ -29,7 +29,8 @@
 /**
  * A cassandra {@link AppendStreamTableSink}.
  */
-class CassandraTableSink implements AppendStreamTableSink<Row> {
+public class CassandraTableSink implements AppendStreamTableSink<Row> {
+
 	private final ClusterBuilder builder;
 	private final String cql;
 	private String[] fieldNames;

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 094aaed9e561a..2e85397a69232 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -313,7 +313,9 @@ protected void verifyResultsWhenReScaling(
 			expected.add(i);
 		}
 
-		ArrayList<Integer> actual = new ArrayList<>();ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
+		ArrayList<Integer> actual = new ArrayList<>();
+		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
+
 		for (com.datastax.driver.core.Row s : result) {
 			actual.add(s.getInt("counter"));
 		}
@@ -401,11 +403,11 @@ public void testCassandraRowAtLeastOnceSink() throws Exception {
 		DataStreamSource<Row> source = env.fromCollection(rowCollection);
 
 		new CassandraSink.CassandraRowSinkBuilder(source, source.getType(),
-			source.getType().createSerializer(source.getExecutionEnvironment().getConfig())).setQuery(INSERT_DATA_QUERY).setClusterBuilder(builder).build();
+			source.getType().createSerializer(source.getExecutionEnvironment().getConfig())).setQuery(injectTableName(INSERT_DATA_QUERY)).setClusterBuilder(builder).build();
 
 		env.execute();
 
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
 		Assert.assertEquals(20, rs.all().size());
 	}
@@ -438,13 +440,13 @@ public void testCassandraTableSink() throws Exception {
 			protected Cluster buildCluster(Cluster.Builder builder) {
 				return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
 			}
-		}, INSERT_DATA_QUERY, new Properties());
+		}, injectTableName(INSERT_DATA_QUERY), new Properties());
 		CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
 
 		newCassandrTableSink.emitDataStream(source);
 
 		env.execute();
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
 		Assert.assertEquals(20, rs.all().size());
 	}
@@ -452,17 +454,17 @@ public void testCassandraTableSinkE2E() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(4);
-		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
 
 		DataStreamSource<Row> source = env.fromCollection(rowCollection);
 
 		tEnv.registerDataStreamInternal("testFlinkTable", source);
 
 		tEnv.sql("select * from testFlinkTable").writeToSink(
-			new CassandraTableSink(builder, INSERT_DATA_QUERY, new Properties()));
+			new CassandraTableSink(builder, injectTableName(INSERT_DATA_QUERY), new Properties()));
 
 		env.execute();
-		ResultSet rs = session.execute(SELECT_DATA_QUERY);
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
 		Assert.assertEquals(20, rs.all().size());
 	}

From 350d17fd5f7e223d835c30adb40df291eaba2032 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Mon, 15 May 2017 14:57:23 -0700
Subject: [PATCH 05/11] refactor ut

---
 .../cassandra/CassandraConnectorITCase.java   | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 2e85397a69232..f902618b565b0 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -397,15 +397,15 @@ public void testCassandraTupleAtLeastOnceSink() throws Exception {
 
 	@Test
 	public void testCassandraRowAtLeastOnceSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
+		CassandraRowSink sink = new CassandraRowSink<>(injectTableName(INSERT_DATA_QUERY), builder);
 
-		DataStreamSource<Row> source = env.fromCollection(rowCollection);
+		sink.open(new Configuration());
 
-		new CassandraSink.CassandraRowSinkBuilder(source, source.getType(),
-			source.getType().createSerializer(source.getExecutionEnvironment().getConfig())).setQuery(injectTableName(INSERT_DATA_QUERY)).setClusterBuilder(builder).build();
+		for (Row value : rowCollection) {
+			sink.send(value);
+		}
 
-		env.execute();
+		sink.close();
 
 		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
 		Assert.assertEquals(20, rs.all().size());
From aa66f76ebf499c2f9f6b6843e9d9374540709be4 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Thu, 5 Oct 2017 19:21:50 -0700
Subject: [PATCH 06/11] [FLINK-6225] [Cassandra Connector] address comments

---
 .../cassandra/CassandraRowSink.java           |  7 +++---
 .../connectors/cassandra/CassandraSink.java   | 13 +++++-----
 .../cassandra/CassandraConnectorITCase.java   | 24 +++++++------------
 3 files changed, 17 insertions(+), 27 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
index 69c474063ec2d..55c7f6fa8b32f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -26,9 +26,8 @@
 /**
  * Flink Sink to save data into a Cassandra cluster.
  *
- * @param <IN> Type of the elements emitted by this sink, it must extend {@link Row}
  */
-public class CassandraRowSink<IN extends Row> extends CassandraSinkBase<IN, ResultSet> {
+public class CassandraRowSink extends CassandraSinkBase<Row, ResultSet> {
 	private final String insertQuery;
 	private transient PreparedStatement ps;
 
@@ -44,12 +43,12 @@ public void open(Configuration configuration) {
 	}
 
 	@Override
-	public ListenableFuture<ResultSet> send(IN value) {
+	public ListenableFuture<ResultSet> send(Row value) {
 		Object[] fields = extract(value);
 		return session.executeAsync(ps.bind(fields));
 	}
 
-	private Object[] extract(IN record) {
+	private Object[] extract(Row record) {
 		Object[] al = new Object[record.getArity()];
 		for (int i = 0; i < record.getArity(); i++) {
 			al[i] = record.getField(i);

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index e3b28ba036782..ce0c340a9a144 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -215,7 +215,7 @@ public static CassandraSinkBuilder addSink(DataStream input) {
 		}
 		if (typeInfo instanceof RowTypeInfo) {
 			DataStream<Row> rowInput = (DataStream<Row>) input;
-			return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder<>(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
+			return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
 		}
 		if (typeInfo instanceof PojoTypeInfo) {
@@ -382,10 +382,9 @@ protected CassandraSink createWriteAheadSink() throws Exception {
 	/**
 	 * Builder for a {@link CassandraPojoSink}.
-	 * @param <IN>
 	 */
-	public static class CassandraRowSinkBuilder<IN extends Row> extends CassandraSinkBuilder<IN> {
-		public CassandraRowSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
+	public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
+		public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
 			super(input, typeInfo, serializer);
 		}
 
@@ -398,13 +397,13 @@ protected void sanityCheck() {
 		}
 
 		@Override
-		protected CassandraSink<IN> createSink() throws Exception {
-			return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink"));
+		protected CassandraSink<Row> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink"));
 
 		}
 
 		@Override
-		protected CassandraSink<IN> createWriteAheadSink() throws Exception {
+		protected CassandraSink<Row> createWriteAheadSink() throws Exception {
 			throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
 		}
 	}

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f902618b565b0..4a9f16f67145c 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.InputFormat;
@@ -40,16 +46,7 @@
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.StreamTableEnvironment;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.types.Row;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import org.apache.cassandra.service.CassandraDaemon;
-
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -397,7 +394,7 @@ public void testCassandraTupleAtLeastOnceSink() throws Exception {
 
 	@Test
 	public void testCassandraRowAtLeastOnceSink() throws Exception {
-		CassandraRowSink sink = new CassandraRowSink<>(injectTableName(INSERT_DATA_QUERY), builder);
+		CassandraRowSink sink = new CassandraRowSink(injectTableName(INSERT_DATA_QUERY), builder);
 
 		sink.open(new Configuration());
 
@@ -435,12 +432,7 @@ public void testCassandraTableSink() throws Exception {
 		env.setParallelism(1);
 
 		DataStreamSource<Row> source = env.fromCollection(rowCollection);
-		CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() {
-			@Override
-			protected Cluster buildCluster(Cluster.Builder builder) {
-				return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build();
-			}
-		}, injectTableName(INSERT_DATA_QUERY), new Properties());
+		CassandraTableSink cassandraTableSink = new CassandraTableSink(builder, injectTableName(INSERT_DATA_QUERY), new Properties());
 		CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
 
 		newCassandrTableSink.emitDataStream(source);

From e4499b14b1c49026d4c2b356a29a18512a415a0b Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Thu, 5 Oct 2017 22:42:36 -0700
Subject: [PATCH 07/11] [FLINK-6225] [Cassandra Connector] fix checkstyle

---
 .../connectors/cassandra/CassandraRowSink.java |  5 +++--
 .../connectors/cassandra/CassandraSink.java    |  8 ++++++--
 .../cassandra/CassandraConnectorITCase.java    | 18 ++++++++++--------
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
index 55c7f6fa8b32f..7e962bf877c29 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -17,11 +17,12 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Row;
 
 /**
  * Flink Sink to save data into a Cassandra cluster.

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index ce0c340a9a144..756cb6578d462 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -22,8 +22,8 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
@@ -381,7 +381,7 @@
 	/**
-	 * Builder for a {@link CassandraPojoSink}.
+	 * Builder for a {@link CassandraRowSink}.
 	 */
 	public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
 		public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
@@ -408,6 +408,10 @@
 	}
 
+	/**
+	 * Builder for a {@link CassandraPojoSink}.
+	 * @param <IN>
+	 */
 	public static class CassandraPojoSinkBuilder<IN> extends CassandraSinkBuilder<IN> {
 		public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
 			super(input, typeInfo, serializer);

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 4a9f16f67145c..cc61c8bdbae28 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.InputFormat;
@@ -41,6 +35,13 @@
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.StreamTableEnvironment;
 import org.apache.flink.types.Row;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.service.CassandraDaemon;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -54,8 +55,6 @@
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -70,6 +69,9 @@
 import java.util.Scanner;
 import java.util.UUID;
 
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
 import static org.junit.Assert.assertTrue;
 
 /**

From 625e03fdd156bb6c218adccb624952439c103120 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Mon, 9 Oct 2017 15:07:01 -0700
Subject: [PATCH 08/11] [FLINK-6225] [Cassandra Connector] add suppression

---
 tools/maven/suppressions.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index b19435eff4cc4..e273e19e47e4b 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -30,7 +30,7 @@ under the License.
org.apache.flink - flink-table_2.10 - ${project.version} - provided - - - org.apache.flink - flink-streaming-scala_2.10 + flink-table_${scala.binary.version} ${project.version} provided diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java index 7e962bf877c29..d473800f6705f 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java @@ -28,28 +28,13 @@ * Flink Sink to save data into a Cassandra cluster. * */ -public class CassandraRowSink extends CassandraSinkBase { - private final String insertQuery; - private transient PreparedStatement ps; - +public class CassandraRowSink extends AbstractCassandraTupleSink { public CassandraRowSink(String insertQuery, ClusterBuilder builder) { - super(builder); - this.insertQuery = insertQuery; - } - - @Override - public void open(Configuration configuration) { - super.open(configuration); - this.ps = session.prepare(insertQuery); + super(insertQuery, builder); } @Override - public ListenableFuture send(Row value) { - Object[] fields = extract(value); - return session.executeAsync(ps.bind(fields)); - } - - private Object[] extract(Row record) { + protected Object[] extract(Row record) { Object[] al = new Object[record.getArity()]; for (int i = 0; i < record.getArity(); i++) { al[i] = record.getField(i); diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index cc61c8bdbae28..bcd60e8f2036a 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -530,10 +530,10 @@ public void testCassandraScalaTupleAtLeastSink() throws Exception { sink.close(); ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); - List rows = rs.all(); + List rows = rs.all(); Assert.assertEquals(scalaTupleCollection.size(), rows.size()); - for (com.datastax.driver.core.Row row : rows) { + for (com.datastax.driver.core.Row row : rows) { scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id"))); } Assert.assertEquals(0, scalaTupleCollection.size()); From a701b3dbcdcb4b5a5be4d8f9967ad7f1fafa82d9 Mon Sep 17 00:00:00 2001 From: Jing Fan Date: Wed, 18 Oct 2017 11:50:44 -0700 Subject: [PATCH 10/11] [FLINK-6225] [Cassandra Connector] fix checkstyle --- .../streaming/connectors/cassandra/CassandraRowSink.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java index d473800f6705f..07a4e19c98da8 100644 --- 
From a701b3dbcdcb4b5a5be4d8f9967ad7f1fafa82d9 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Wed, 18 Oct 2017 11:50:44 -0700
Subject: [PATCH 10/11] [FLINK-6225] [Cassandra Connector] fix checkstyle

---
 .../streaming/connectors/cassandra/CassandraRowSink.java | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
index d473800f6705f..07a4e19c98da8 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -17,13 +17,8 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Row;
 
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * Flink Sink to save data into a Cassandra cluster.
  *
From 269139f8cd009af4a36d74fa7babfd4ebed36a52 Mon Sep 17 00:00:00 2001
From: Jing Fan
Date: Mon, 30 Oct 2017 16:46:05 -0700
Subject: [PATCH 11/11] [FLINK-6225] [Cassandra Connector] add
 CassandraRowWriteAheadSink

---
 .../cassandra/CassandraRowWriteAheadSink.java | 160 ++++++++++++++++++
 .../connectors/cassandra/CassandraSink.java   |   7 +-
 .../cassandra/CassandraTableSink.java         |   2 +-
 .../cassandra/CassandraConnectorITCase.java   |  16 --
 .../java/typeutils/runtime/RowSerializer.java |   7 +
 tools/maven/suppressions.xml                  |   2 +-
 6 files changed, 173 insertions(+), 21 deletions(-)
 create mode 100644 flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
new file mode 100644
index 0000000000000..f8c0d7477e480
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.types.Row;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to Cassandra
+ * if a checkpoint is completed.
+ *
+ */
+public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
+	private static final long serialVersionUID = 1L;
+
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	private final String insertQuery;
+	private transient PreparedStatement preparedStatement;
+
+	private ClusterBuilder builder;
+
+	private transient Object[] fields;
+
+	protected CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+		this.insertQuery = insertQuery;
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	public void open() throws Exception {
+		super.open();
+		if (!getRuntimeContext().isCheckpointingEnabled()) {
+			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+		}
+		cluster = builder.getCluster();
+		session = cluster.connect();
+		preparedStatement = session.prepare(insertQuery);
+
+		fields = new Object[((RowSerializer) serializer).getArity()];
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			if (session != null) {
+				session.close();
+			}
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			if (cluster != null) {
+				cluster.close();
+			}
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+
+	@Override
+	protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp) throws Exception {
+		final AtomicInteger updatesCount = new AtomicInteger(0);
+		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+		final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+		FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
+			@Override
+			public void onSuccess(ResultSet resultSet) {
+				updatesConfirmed.incrementAndGet();
+				if (updatesCount.get() > 0) { // only set if all updates have been sent
+					if (updatesCount.get() == updatesConfirmed.get()) {
+						synchronized (updatesConfirmed) {
+							updatesConfirmed.notifyAll();
+						}
+					}
+				}
+			}
+
+			@Override
+			public void onFailure(Throwable throwable) {
+				if (exception.compareAndSet(null, throwable)) {
+					LOG.error("Error while sending value.", throwable);
+					synchronized (updatesConfirmed) {
+						updatesConfirmed.notifyAll();
+					}
+				}
+			}
+		};
+
+		//set values for prepared statement
+		int updatesSent = 0;
+		for (Row value : values) {
+			for (int x = 0; x < value.getArity(); x++) {
+				fields[x] = value.getField(x);
+			}
+			//insert values and send to cassandra
+			BoundStatement s = preparedStatement.bind(fields);
+			s.setDefaultTimestamp(timestamp);
+			ResultSetFuture result = session.executeAsync(s);
+			updatesSent++;
+			if (result != null) {
+				//add callback to detect errors
+				Futures.addCallback(result, callback);
+			}
+		}
+		updatesCount.set(updatesSent);
+
+		synchronized (updatesConfirmed) {
+			while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
+				updatesConfirmed.wait();
+			}
+		}
+
+		if (exception.get() != null) {
+			LOG.warn("Sending a value failed.", exception.get());
+			return false;
+		} else {
+			return true;
+		}
+	}
+}
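The sendValues() method above coordinates asynchronous inserts with a counting handshake: each driver callback bumps updatesConfirmed, the sender publishes the total in updatesCount only after the last statement has been issued (hence the updatesCount.get() > 0 guard), and the caller blocks on the updatesConfirmed monitor until the counts meet or a failure is recorded. A standalone sketch of the same pattern follows, with CompletableFuture standing in for the driver's ResultSetFuture; all names here are illustrative, not part of the patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    public final class AckBarrierSketch {

    	/** Issues 'updates' fake async writes and blocks until all are acknowledged or one fails. */
    	static boolean sendAll(int updates) throws InterruptedException {
    		final AtomicInteger updatesCount = new AtomicInteger(0);     // published after all sends
    		final AtomicInteger updatesConfirmed = new AtomicInteger(0); // bumped by each callback
    		final AtomicReference<Throwable> exception = new AtomicReference<>();

    		for (int i = 0; i < updates; i++) {
    			CompletableFuture.runAsync(() -> { /* pretend to write to Cassandra */ })
    				.whenComplete((ignored, failure) -> {
    					if (failure != null) {
    						if (exception.compareAndSet(null, failure)) {
    							synchronized (updatesConfirmed) {
    								updatesConfirmed.notifyAll();
    							}
    						}
    						return;
    					}
    					updatesConfirmed.incrementAndGet();
    					// Only signal once the sender has published the final count.
    					if (updatesCount.get() > 0 && updatesCount.get() == updatesConfirmed.get()) {
    						synchronized (updatesConfirmed) {
    							updatesConfirmed.notifyAll();
    						}
    					}
    				});
    		}
    		updatesCount.set(updates);

    		synchronized (updatesConfirmed) {
    			// Recheck under the monitor, so a callback that fires between the check
    			// and the wait() cannot produce a lost wakeup.
    			while (exception.get() == null && updatesConfirmed.get() != updatesCount.get()) {
    				updatesConfirmed.wait();
    			}
    		}
    		return exception.get() == null;
    	}

    	public static void main(String[] args) throws InterruptedException {
    		System.out.println(sendAll(100)); // prints true once all 100 callbacks have fired
    	}
    }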
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 756cb6578d462..a70fce619f34a 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -23,7 +23,6 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -213,7 +212,7 @@ public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
 			DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
 		}
-		if (typeInfo instanceof RowTypeInfo) {
+		if (Row.class.equals(typeInfo.getTypeClass())) {
 			DataStream<Row> rowInput = (DataStream<Row>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder<>(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
 		}
@@ -404,7 +403,9 @@ protected CassandraSink<Row> createWriteAheadSink() throws Exception {
-			throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
+			return committer == null
+				? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, new CassandraCommitter(builder))))
+				: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, committer)));
 		}
 	}
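With createWriteAheadSink() implemented, exactly-once delivery for Row streams becomes a builder toggle. Continuing the hypothetical sketch shown after PATCH 09 (same rows stream, same clusterBuilder, same illustrative query), the only additions are checkpointing and the WAL flag:

    // Continues the earlier RowSinkExample sketch; interval and query remain assumptions.
    env.enableCheckpointing(5000); // the WAL sink's open() rejects jobs without checkpointing

    CassandraSink.addSink(rows)
    	.setQuery("INSERT INTO example.messages (id, counter) VALUES (?, ?);")
    	.setClusterBuilder(clusterBuilder)
    	.enableWriteAheadLog() // routes through createWriteAheadSink() to CassandraRowWriteAheadSink
    	.build();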
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
index 8dbc60ad4b396..de4e4bb236cdf 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java
@@ -27,7 +27,7 @@
 import java.util.Properties;
 
 /**
- * A cassandra {@link AppendStreamTableSink}.
+ * A Cassandra {@link AppendStreamTableSink}.
  */
 public class CassandraTableSink implements AppendStreamTableSink<Row> {

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index bcd60e8f2036a..784bd19798595 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -428,22 +428,6 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		Assert.assertEquals(20, rs.all().size());
 	}
 
-	@Test
-	public void testCassandraTableSink() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStreamSource<Row> source = env.fromCollection(rowCollection);
-		CassandraTableSink cassandraTableSink = new CassandraTableSink(builder, injectTableName(INSERT_DATA_QUERY), new Properties());
-		CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES);
-
-		newCassandrTableSink.emitDataStream(source);
-
-		env.execute();
-		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
-		Assert.assertEquals(20, rs.all().size());
-	}
-
 	@Test
 	public void testCassandraTableSinkE2E() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
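For completeness, a sketch of how the table sink is driven end to end, mirroring the shape of the surviving testCassandraTableSinkE2E test. The constructor shape (builder, insert query, properties) matches the removed test above; the table name, field list, and SQL are illustrative guesses, and the Table API calls use the 1.3/1.4-era names (sql(), writeToSink()):

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.CassandraTableSink;
    import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    import com.datastax.driver.core.Cluster;

    public class TableSinkSketch {
    	public static void main(String[] args) throws Exception {
    		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    		DataStream<Row> rows = env.fromCollection(
    			Arrays.asList(Row.of("msg-1", 1), Row.of("msg-2", 2)),
    			new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));

    		// Register the stream as a table with named fields (names are illustrative).
    		tEnv.registerDataStream("source", rows, "id, counter");

    		ClusterBuilder clusterBuilder = new ClusterBuilder() {
    			@Override
    			protected Cluster buildCluster(Cluster.Builder builder) {
    				return builder.addContactPoint("127.0.0.1").build();
    			}
    		};

    		CassandraTableSink sink = new CassandraTableSink(
    			clusterBuilder, "INSERT INTO example.messages (id, counter) VALUES (?, ?);", new Properties());

    		// writeToSink() calls configure() and emitDataStream() under the hood.
    		tEnv.sql("SELECT id, counter FROM source").writeToSink(sink);

    		env.execute("CassandraTableSink sketch");
    	}
    }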
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index bd08b04cf9681..7f9cc2145be66 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -50,11 +50,14 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 	private final TypeSerializer<Object>[] fieldSerializers;
 
+	private final int arity;
+
 	private transient boolean[] nullMask;
 
 	@SuppressWarnings("unchecked")
 	public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
 		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+		this.arity = fieldSerializers.length;
 		this.nullMask = new boolean[fieldSerializers.length];
 	}
 
@@ -135,6 +138,10 @@ public int getLength() {
 		return -1;
 	}
 
+	public int getArity() {
+		return arity;
+	}
+
 	@Override
 	public void serialize(Row record, DataOutputView target) throws IOException {
 		int len = fieldSerializers.length;

diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index e273e19e47e4b..0691ac7eedb69 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -30,7 +30,7 @@ under the License.