From 9ebd06b2f24243f3c8feebac9e33e949f26a81d6 Mon Sep 17 00:00:00 2001 From: FrankLi Date: Mon, 27 Feb 2017 14:53:58 +0800 Subject: [PATCH 1/7] bean sdks io by cassandra --- sdks/java/io/cassandra/README.txt | 1 + sdks/java/io/cassandra/pom.xml | 137 +++++ .../beam/sdk/io/cassandra/CassandraIO.java | 515 ++++++++++++++++++ .../beam/sdk/io/cassandra/package-info.java | 24 + .../sdk/io/cassandra/CassandraIOTest.java | 267 +++++++++ 5 files changed, 944 insertions(+) create mode 100644 sdks/java/io/cassandra/README.txt create mode 100644 sdks/java/io/cassandra/pom.xml create mode 100644 sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java create mode 100644 sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java create mode 100644 sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java diff --git a/sdks/java/io/cassandra/README.txt b/sdks/java/io/cassandra/README.txt new file mode 100644 index 000000000000..7f9cbabb0207 --- /dev/null +++ b/sdks/java/io/cassandra/README.txt @@ -0,0 +1 @@ +mvn dependency:sources -DdownloadSources=true eclipse:eclipse \ No newline at end of file diff --git a/sdks/java/io/cassandra/pom.xml b/sdks/java/io/cassandra/pom.xml new file mode 100644 index 000000000000..4b2ee09e498f --- /dev/null +++ b/sdks/java/io/cassandra/pom.xml @@ -0,0 +1,137 @@ + + + + + 4.0.0 + + + org.apache.beam + beam-sdks-java-io-parent + 0.6.0-SNAPSHOT + ../pom.xml + + + beam-sdks-java-io-cassandra + Apache Beam :: SDKs :: Java :: IO :: Cassandra + IO to read and write on Cassandra datasource. + + + 3.1.4 + 4.0.30.Final + 16.0.1 + + + + org.apache.beam + beam-sdks-java-core + + + + org.slf4j + slf4j-api + + + + com.google.guava + guava + ${guava.version} + + + com.google.code.findbugs + jsr305 + + + + + + joda-time + joda-time + + + org.apache.commons + commons-pool2 + 2.3 + + + + + com.google.auto.value + auto-value + provided + + + + + + org.apache.beam + beam-runners-direct-java + test + + + junit + junit + test + + + org.hamcrest + hamcrest-all + test + + + org.slf4j + slf4j-jdk14 + test + + + + com.datastax.cassandra + cassandra-driver-core + ${cassandra-driver.version} + + + com.datastax.cassandra + cassandra-driver-extras + ${cassandra-driver.version} + + + com.datastax.cassandra + cassandra-driver-mapping + ${cassandra-driver.version} + + + io.netty + netty-common + ${netty.version} + + + + io.netty + netty-buffer + ${netty.version} + + + + io.netty + netty-transport + ${netty.version} + + + + io.netty + netty-handler + ${netty.version} + + + + + \ No newline at end of file diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java new file mode 100644 index 000000000000..3746fbd154bf --- /dev/null +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -0,0 +1,515 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cassandra; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.google.auto.value.AutoValue; +import java.io.Closeable; +import java.io.Serializable; +import java.util.Random; + +import javax.annotation.Nullable; +import javax.sql.DataSource; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +/** + * IO to read and write data on Cassandra. + * + *

Reading from Cassandra datasource

+ * + *

CassandraIO source returns a bounded collection of {@code T} as a + * {@code PCollection}. T is the type returned by the provided + * {@link RowMapper}. + *

{@code
+ * pipeline.apply(CassandraIO.>read()
+ *   .withClusterConfiguration(CassandraIO.ClusterConfiguration.create(
+ *   .withQuery("select id,name from Person")
+ *   .withRowMapper(new CassandraIO.RowMapper>() {
+ *     public KV mapRow(ResultSet resultSet) throws Exception {
+ *       return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ *     }
+ *   })
+ * }
+ * + * + *

Query parameters can be configured using a user-provided {@link StatementPreparator}. + * For example:

+ * + *
{@code
+ * pipeline.apply(CassandraIO.>read()
+ *   .withClusterConfiguration(CassandraIO.ClusterConfiguration.create(cluster)
+ *   .withQuery("select id,name from Person where name = ?")
+ *   .withStatementPreparator(new CassandraIO.StatementPreparator() {
+ *     public void setParameters(BoundStatementLast boundStatementLast) throws Exception {
+ *       boundStatementLast.bind("Darwin");
+ *     }
+ *   })
+ *   .withRowMapper(new CassandraIO.RowMapper>() {
+ *     public KV mapRow(ResultSet resultSet) throws Exception {
+ *       return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ *     }
+ *   })
+ * }
+ * + *

Writing to Cassandra cluster

+ * + *

Cassandra sink supports writing records into a database. + * It writes a {@link PCollection} to the + * database by converting each T into a {@link PreparedStatement} + * via a user-provided {@link + * PreparedStatementSetter}. + * + *

Like the source, to configure the sink, you have to provide + * a {@link ClusterConfiguration}. + *

{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(CassandraIO.>write()
+ *      .withDataSourceConfiguration(CassandraIO.ClusterConfiguration
+ *      .create(cluster)
+ *      .withStatement("insert into Person values(?, ?)")
+ *      .withPreparedStatementSetter(new CassandraIO.
+ *      PreparedStatementSetter>() {
+ *        public void setParameters(KV element,
+ *        BoundStatement query) {
+ *          boundStatement.bind(1, kv.getKey());
+ *        }
+ *      })
+ * }
+ * + *

NB: in case of transient failures, Beam runners may execute parts of + * CassandraIO.Write multiple times for fault tolerance. Because of that, you + * should avoid using {@code INSERT} statements, since that risks duplicating + * records in the database, or failing due to primary key conflicts. Consider + * using MERGE ("upsert") + * statements supported by your database instead. + */ + +public class CassandraIO { + + private static final Logger LOG = LoggerFactory + .getLogger(CassandraIO.class); + + /** + * Read data from a Cassandra Cluster. + * + * @param + * Type of the data to be read. + */ + public static Read read() { + return new AutoValue_CassandraIO_Read.Builder().build(); + } + + /** + * Write data to a Cassandra Cluster. + * + * @param + * Type of the data to be written. + */ + public static Write write() { + return new AutoValue_CassandraIO_Write.Builder().build(); + } + + private CassandraIO() { + } + + /** + * An interface used by {@link CassandraIO.Read} for converting each row of + * the {@link row} into an element of the resulting + * {@link PCollection}. + */ + public interface RowMapper extends Serializable { + T mapRow(Row row) throws Exception; + } + + /** + * A POJO describing a {@link DataSource}, either providing directly a + * {@link DataSource} or all properties allowing to create a + * {@link DataSource}. + */ + @AutoValue + public abstract static class ClusterConfiguration implements Serializable { + @Nullable + abstract Cluster getCluster(); + + @Nullable + abstract String getKeyspace(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setKeyspace(String keyspace); + + abstract Builder setCluster(Cluster cluster); + + abstract ClusterConfiguration build(); + } + + public static ClusterConfiguration create(Cluster cluster) { + checkArgument(cluster != null, + "ClusterConfiguration.create(cluster) called with " + + "null data cluster"); + checkArgument(cluster instanceof Closeable, + "ClusterConfiguration.create(cluster) called with a cluster not Closeable"); + return new AutoValue_CassandraIO_ClusterConfiguration.Builder() + .setCluster(cluster).build(); + } + public static ClusterConfiguration create(Cluster cluster, String keyspace) { + checkArgument(cluster != null, + "ClusterConfiguration.create(cluster,keyspace) called with " + + "null data cluster"); + checkArgument(cluster != null, + "ClusterConfiguration.create(cluster,keyspace) called with " + + "null data cluster"); + checkArgument(cluster instanceof Closeable, + "ClusterConfiguration.create(cluster,keyspace) " + + "called with a cluster not Closeable"); + return new AutoValue_CassandraIO_ClusterConfiguration.Builder() + .setCluster(cluster).setKeyspace(keyspace).build(); + } + + private void populateDisplayData(DisplayData.Builder builder) { + if (getCluster() != null) { + builder.addIfNotNull(DisplayData.item("cluster", + getCluster().getClass().getName())); + } + } + + Session getSession() throws Exception { + if (getKeyspace() != null) { + return getCluster().connect(getKeyspace()); + } else { + return getCluster().connect(); + } + } + } + + /** + * An interface used by the CassandraIO Write to set the parameters of the + * {@link PreparedStatement} used to setParameters into the database. + */ + public interface StatementPreparator extends Serializable { + void setParameters(BoundStatement boundStatement) + throws Exception; + } + + /** A {@link PTransform} to read data from a Cassandra cluster. */ + @AutoValue + public abstract static class Read + extends PTransform> { + @Nullable + abstract ClusterConfiguration getClusterConfiguration(); + + @Nullable + abstract String getQuery(); + + @Nullable + abstract StatementPreparator getStatementPreparator(); + + @Nullable + abstract RowMapper getRowMapper(); + + @Nullable + abstract Coder getCoder(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setClusterConfiguration( + ClusterConfiguration clusterConfiguration); + + abstract Builder setQuery(String query); + + abstract Builder setStatementPreparator( + StatementPreparator statementPreparator); + + abstract Builder setRowMapper(RowMapper rowMapper); + + abstract Builder setCoder(Coder coder); + + abstract Read build(); + } + + public Read withClusterConfiguration(ClusterConfiguration clusterConfiguration) { + checkArgument(clusterConfiguration != null, + "CassandraIO.read().withClusterConfiguration" + + "(clusterConfiguration) called with null cluster"); + return toBuilder().setClusterConfiguration(clusterConfiguration).build(); + } + + public Read withQuery(String query) { + checkArgument(query != null, + "CassandraIO.read().withQuery(query) called with null query"); + return toBuilder().setQuery(query).build(); + } + + public Read withStatementPrepator( + StatementPreparator statementPreparator) { + checkArgument(statementPreparator != null, + "CassandraIO.read().withStatementPreparator(statementPreparator) called " + + "with null statementPreparator"); + return toBuilder().setStatementPreparator(statementPreparator) + .build(); + } + + public Read withRowMapper(RowMapper rowMapper) { + checkArgument(rowMapper != null, + "CassandraIO.read().withRowMapper(rowMapper) " + + "called with null rowMapper"); + return toBuilder().setRowMapper(rowMapper).build(); + } + + public Read withCoder(Coder coder) { + checkArgument(coder != null, + "CassandraIO.read().withCoder(coder) " + + "called with null coder"); + return toBuilder().setCoder(coder).build(); + } + + @Override + public PCollection expand(PBegin input) { + return input.apply(Create.of(getQuery())) + .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder()) + // generate a random key followed by a GroupByKey and then + // ungroup + // to prevent fusion + // see + // https://cloud.google.com/dataflow/service + //dataflow-service-desc#preventing-fusion + // for details + .apply(ParDo.of(new DoFn>() { + private Random random; + + @Setup + public void setup() { + random = new Random(); + } + + @ProcessElement + public void processElement(ProcessContext context) { + context.output( + KV.of(random.nextInt(), context.element())); + } + })).apply(GroupByKey. create()) + .apply(Values.> create()) + .apply(Flatten. iterables()); + } + + @Override + public void validate(PBegin input) { + checkState(getQuery() != null, + "CassandraIO.read() requires a query to be set via withQuery(query)"); + checkState(getRowMapper() != null, + "CassandraIO.read() requires a rowMapper to be " + + "set via withRowMapper(rowMapper)"); + checkState(getCoder() != null, + "CassandraIO.read() requires a coder to be set via withCoder(coder)"); + checkState(getClusterConfiguration() != null, + "CassandraIO.read() requires a Cluster configuration to be set via " + + "withClusterConfiguration(cluster)"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("query", getQuery())); + builder.add(DisplayData.item("rowMapper", + getRowMapper().getClass().getName())); + builder.add( + DisplayData.item("coder", getCoder().getClass().getName())); + getClusterConfiguration().populateDisplayData(builder); + } + + /** A {@link DoFn} executing the SQL query to read from the cluster. */ + static class ReadFn extends DoFn { + private CassandraIO.Read spec; + private Session session; + + private ReadFn(Read spec) { + this.spec = spec; + } + + @Setup + public void setup() throws Exception { + session = spec.getClusterConfiguration().getSession(); + } + + @ProcessElement + public void processElement(ProcessContext context) + throws Exception { + String query = context.element(); + PreparedStatement statement = session.prepare(query); + BoundStatement boundStatementLast = new BoundStatement(statement); + this.spec.getStatementPreparator().setParameters(boundStatementLast); + ResultSet resultSet = session.execute(boundStatementLast); + for (Row row : resultSet) { + context.output(spec.getRowMapper().mapRow(row)); + } + } + + @Teardown + public void teardown() throws Exception { + if (session != null) { + session.close(); + } + } + } + } + + /** + * An interface used by the CassandraIO Write to set the parameters of the + * {@link BoundStatement} used to setParameters into the cluster. + */ + public interface BoundStatementSetter extends Serializable { + void setParameters(T element, BoundStatement boundStatement) + throws Exception; + } + + /** A {@link PTransform} to write to a Cassandra cluster. */ + @AutoValue + public abstract static class Write + extends PTransform, PDone> { + @Nullable + abstract ClusterConfiguration getClusterConfiguration(); + + @Nullable + abstract String getStatement(); + + @Nullable + abstract BoundStatementSetter getBoundStatementSetter(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setClusterConfiguration( + ClusterConfiguration clusterConfiguration); + + abstract Builder setStatement(String statement); + + abstract Builder setBoundStatementSetter( + BoundStatementSetter setter); + + abstract Write build(); + } + + public Write withClusterConfiguration(ClusterConfiguration clusterConfiguration) { + return toBuilder().setClusterConfiguration(clusterConfiguration).build(); + } + + public Write withStatement(String statement) { + return toBuilder().setStatement(statement).build(); + } + + public Write withBoundStatementSetter( + BoundStatementSetter setter) { + return toBuilder().setBoundStatementSetter(setter).build(); + } + + @Override + public PDone expand(PCollection input) { + input.apply(ParDo.of(new WriteFn(this))); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection input) { + checkArgument(getClusterConfiguration() != null, + "CassandraIO.write() requires a configuration to be " + + "set via " + + ".withClusterConfiguration(configuration)"); + checkArgument(getStatement() != null, "CassandraIO.write() requires" + + " a statement to be set via .withStatement(statement)"); + checkArgument(getBoundStatementSetter() != null, + "CassandraIO.write() requires a BoundStatementSetter" + + " to be set via.withBoundStatementSetter(" + + "BoundStatementSetter)"); + } + + private static class WriteFn extends DoFn { + + private final Write spec; + + private Session session; + private BoundStatement boundStatement; + + public WriteFn(Write spec) { + this.spec = spec; + } + + @Setup + public void setup() throws Exception { + session = spec.getClusterConfiguration().getSession(); + PreparedStatement st = session.prepare(spec.getStatement()); + boundStatement = new BoundStatement(st); + } + + @StartBundle + public void startBundle(Context context) { + + } + + @ProcessElement + public void processElement(ProcessContext context) + throws Exception { + T record = context.element(); + spec.getBoundStatementSetter().setParameters(record, + boundStatement); + finishBundle(context); + } + + @FinishBundle + public void finishBundle(Context context) throws Exception { + session.execute(boundStatement); + } + + @Teardown + public void teardown() throws Exception { + if (session != null) { + session.close(); + } + } + } + } +} diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java new file mode 100644 index 000000000000..47259b43245b --- /dev/null +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines transforms for reading and writing from HBase. + * + * @see org.apache.beam.sdk.io.cassandra.CassandraIO + */ +package org.apache.beam.sdk.io.cassandra; diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java new file mode 100644 index 000000000000..316542c912e4 --- /dev/null +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.cassandra; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.RegularStatement; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.policies.ConstantReconnectionPolicy; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.LatencyAwarePolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.querybuilder.QueryBuilder; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + +/** + * Test CassandraIO. + */ +@RunWith(JUnit4.class) +public class CassandraIOTest { + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private static Cluster cluster; + private CassandraIO.ClusterConfiguration clusterConfiguration; + + private static final String TESTKEYSPACE = "testbeam"; + + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void beforeClass() throws Exception { + + int maxIdle = 5; + int maxTotal = 100; + int minIdle = 3; + int maxWaitMillis = 60 * 100; + + int localCoreConnectionsPerHost = 8; + int localMaxConnectionsPerHost = 8; + int localMaxRequestsPerConnection = 100; + + int poolTimeoutMillis = 60 * 1000; + int connectTimeoutMillis = 60 * 1000; + int readTimeoutMillis = 60 * 1000; + + GenericObjectPoolConfig conf = new GenericObjectPoolConfig(); + conf.setMaxIdle(maxIdle); + conf.setMaxTotal(maxTotal); + conf.setMinIdle(minIdle); + conf.setMaxWaitMillis(maxWaitMillis); + conf.setTestOnBorrow(false); + conf.setTestWhileIdle(false); + conf.setTestOnReturn(false); + int cassandraPort = 9042; + String cassandraHosts = "Cassandra-ip1"; + String cassandraUser = "admin"; + String cassandraPassword = ""; + String[] nodes = cassandraHosts.split(","); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions.setCoreConnectionsPerHost + (HostDistance.LOCAL, localCoreConnectionsPerHost); + poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, + localMaxConnectionsPerHost); + poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, + localMaxRequestsPerConnection); + poolingOptions.setPoolTimeoutMillis(poolTimeoutMillis); + SocketOptions socketOptions = new SocketOptions(); + socketOptions.setKeepAlive(true); + socketOptions.setReceiveBufferSize(100 * 1024 * 1024); + socketOptions.setTcpNoDelay(true); + socketOptions.setReadTimeoutMillis(readTimeoutMillis); + socketOptions.setConnectTimeoutMillis + (connectTimeoutMillis); + cluster = Cluster.builder().addContactPoints(nodes) + .withPort(cassandraPort) + .withLoadBalancingPolicy(LatencyAwarePolicy + .builder(new RoundRobinPolicy()).build()) + .withQueryOptions(new QueryOptions() + .setConsistencyLevel(ConsistencyLevel.ONE)) + .withReconnectionPolicy(new ConstantReconnectionPolicy(1000)) + .withProtocolVersion(ProtocolVersion.V3) + .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) + .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)) + .withPoolingOptions(poolingOptions) + .withSocketOptions(socketOptions) + .withCredentials(cassandraUser, cassandraPassword) + .build(); + + } + + @AfterClass + public static void afterClass() throws Exception { + if (cluster != null) { + cluster.close(); + } + } + + @Before + public void setup() throws Exception { + clusterConfiguration = CassandraIO.ClusterConfiguration.create(cluster); + Session session = cluster.connect(); + session.execute("CREATE KEYSPACE IF NOT EXISTS " + TESTKEYSPACE + + " WITH replication = {'class':'SimpleStrategy'," + + "'replication_factor': 1};"); + session.execute("CREATE TABLE IF NOT EXISTS " + TESTKEYSPACE + + ".person(name text, id INT, c INT, PRIMARY KEY(id));"); + session.execute("CREATE TABLE IF NOT EXISTS " + TESTKEYSPACE + + ".atable(a1 text, a2 text,a3 text, PRIMARY KEY(a1));"); + RegularStatement insert = QueryBuilder.insertInto(TESTKEYSPACE, "person") + .values(new String[] {"id", "name"}, new Object[] {1, "test"}); + session.execute(insert); + } + + @Test + public void testClusterConfiguration() + throws Exception { + + try (Session conn = clusterConfiguration.getSession()) { + assertFalse(conn.isClosed()); + } + } + + @Test + public void testReadBuildsCorrectly() { + String query = "select * from test.person"; + CassandraIO.Read read = CassandraIO.read() + .withClusterConfiguration(clusterConfiguration) + .withQuery(query); + assertEquals(query, read.getQuery()); + // assertNotNull("configuration", read.getClusterConfiguration()); + } + + @Test + @Category(NeedsRunner.class) + public void testRead() throws Exception { + String query = "select * from test.person"; + PCollection>> output = pipeline + .apply(CassandraIO.>> read() + .withClusterConfiguration(clusterConfiguration) + .withQuery(query) + .withRowMapper( +new CassandraIO.RowMapper>>() { +private static final long serialVersionUID = 4304414480924473864L; + + @Override + public List> mapRow( +com.datastax.driver.core.Row row) + throws Exception { +List> rsult = new ArrayList>(); +KV kv = KV.of("name", row.getString("name")); + rsult.add(kv); + return rsult; + } + }) + .withCoder(ListCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))); + + PAssert.thatSingleton(output.apply("Count All", + Count.>> globally())).isEqualTo(1000L); + pipeline.run(); + } + @Test + public void testWriteBuildsCorrectly() { + String statement = "INSERT INTO test.atable (a1,a2,a3) VALUES (?,?,?) "; + CassandraIO.Write write = CassandraIO.write() + .withClusterConfiguration(clusterConfiguration) + .withStatement(statement); + assertNotNull(statement, write.getStatement()); + assertNotNull("configuration", write.getClusterConfiguration()); + } + + @Test + @Category(NeedsRunner.class) + public void testWrite() { + String statement = "INSERT INTO test.atable (a1,a2,a3) VALUES (?,?,?) "; + + List data = new ArrayList(); + for (int i = 0; i < 1000; i++) { + List kv = new ArrayList(); + kv.add("a" + i); + kv.add("b" + i); + kv.add("c" + i); + + data.add(kv); + } + pipeline.apply(Create.of(data)) + .apply(CassandraIO. write() + .withClusterConfiguration(clusterConfiguration) + .withStatement(statement) + .withBoundStatementSetter( + new CassandraIO.BoundStatementSetter() { + @Override + public void setParameters(List element, + BoundStatement boundStatement) + throws Exception { + boundStatement. + bind(element.toArray()); + } + })); + pipeline.run(); + try (Session session = cluster.connect()) { + com.datastax.driver.core.ResultSet resultSet = session + .execute("select count(*) count from test.atable"); + List resultAll = resultSet.all(); + Assert.assertNotNull(resultAll); + Assert.assertEquals(1, resultAll.size()); + int count = resultAll.get(0).getInt("count"); + Assert.assertEquals(1000, count); + } + } + +} From 10219e35ee24083586e8c87a7c26a90ab1266917 Mon Sep 17 00:00:00 2001 From: FrankLi Date: Mon, 27 Feb 2017 15:54:55 +0800 Subject: [PATCH 2/7] modify clusterconfig class --- .../beam/sdk/io/cassandra/CassandraIO.java | 187 +++++++++++++++--- .../sdk/io/cassandra/CassandraIOTest.java | 99 ++-------- 2 files changed, 183 insertions(+), 103 deletions(-) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java index 3746fbd154bf..e66950a3871d 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -21,12 +21,22 @@ import static com.google.common.base.Preconditions.checkState; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.policies.ConstantReconnectionPolicy; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.LatencyAwarePolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; import com.google.auto.value.AutoValue; -import java.io.Closeable; + import java.io.Serializable; import java.util.Random; @@ -46,11 +56,13 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * IO to read and write data on Cassandra. * @@ -169,7 +181,10 @@ public interface RowMapper extends Serializable { @AutoValue public abstract static class ClusterConfiguration implements Serializable { @Nullable - abstract Cluster getCluster(); + abstract String getCassandraHosts(); + abstract Integer getCassandraPort(); + @Nullable abstract String getCassandraUsername(); + @Nullable abstract String getCassandraPasswd(); @Nullable abstract String getKeyspace(); @@ -180,46 +195,168 @@ public abstract static class ClusterConfiguration implements Serializable { abstract static class Builder { abstract Builder setKeyspace(String keyspace); - abstract Builder setCluster(Cluster cluster); + abstract Builder setCassandraHosts(String cassandraHosts); + + abstract Builder setCassandraPort(Integer cassandraPort); + + abstract Builder setCassandraUsername(String cassandraUsername); + + abstract Builder setCassandraPasswd(String cassandraPasswd); abstract ClusterConfiguration build(); } - public static ClusterConfiguration create(Cluster cluster) { - checkArgument(cluster != null, + public static ClusterConfiguration create(String cassandraHosts, Integer cassandraPort) { + checkArgument(cassandraHosts != null, "ClusterConfiguration.create(cluster) called with " + "null data cluster"); - checkArgument(cluster instanceof Closeable, - "ClusterConfiguration.create(cluster) called with a cluster not Closeable"); + checkArgument(cassandraHosts instanceof Serializable, + "ClusterConfiguration.create(cassandraHosts) called with " + + "a cassandraHosts not Serializable"); + if (cassandraPort == null){ + cassandraPort = 9042; + } return new AutoValue_CassandraIO_ClusterConfiguration.Builder() - .setCluster(cluster).build(); + .setCassandraHosts(cassandraHosts) + .setCassandraPort(cassandraPort).build(); } - public static ClusterConfiguration create(Cluster cluster, String keyspace) { - checkArgument(cluster != null, - "ClusterConfiguration.create(cluster,keyspace) called with " - + "null data cluster"); - checkArgument(cluster != null, - "ClusterConfiguration.create(cluster,keyspace) called with " - + "null data cluster"); - checkArgument(cluster instanceof Closeable, - "ClusterConfiguration.create(cluster,keyspace) " - + "called with a cluster not Closeable"); + public static ClusterConfiguration create(String cassandraHosts, int cassandraPort, + String cassandraUsername, String cassandraPasswd) { + checkArgument(cassandraHosts != null, + "ClusterConfiguration.create(cassandraHosts,cassandraPort," + + "cassandraUsername,cassandraPasswd) called with " + + "null data cluster configuration"); + checkArgument(cassandraUsername != null, + "ClusterConfiguration.create(cassandraHosts,cassandraPort," + + "cassandraUsername,cassandraPasswd) called with " + + "null data cluster configuration"); + checkArgument(cassandraPasswd != null, + "ClusterConfiguration.create(cassandraHosts,cassandraPort," + + "cassandraUsername,cassandraPasswd) called with " + + "null data cluster configuration"); + return new AutoValue_CassandraIO_ClusterConfiguration.Builder() - .setCluster(cluster).setKeyspace(keyspace).build(); + .setCassandraHosts(cassandraHosts) + .setCassandraPort(cassandraPort) + .setCassandraUsername(cassandraUsername) + .setCassandraPasswd(cassandraPasswd).build(); + } + + public static ClusterConfiguration create(String cassandraHosts, + int cassandraPort, String cassandraUsername, String cassandraPasswd, + String keyspace) { + checkArgument(cassandraHosts != null, + "ClusterConfiguration.create(cassandraHosts,cassandraPort," + + "cassandraUsername,cassandraPasswd,keyspace) called with " + + "null data cluster configuration"); + checkArgument(cassandraUsername != null, + "ClusterConfiguration.create(cassandraHosts,cassandraPort," + + "cassandraUsername,cassandraPasswd,keyspace) called with " + + "null data cluster configuration"); + checkArgument(cassandraPasswd != null, + "ClusterConfiguration.create(cassandraHosts,cassandraPort," + + "cassandraUsername,cassandraPasswd,keyspace) called with " + + "null data cluster configuration"); + checkArgument(keyspace != null, + "ClusterConfiguration.create(cassandraHosts,cassandraPort," + + "cassandraUsername,cassandraPasswd,keyspace) called with " + + "null data cluster configuration"); + return new AutoValue_CassandraIO_ClusterConfiguration.Builder() + .setCassandraHosts(cassandraHosts) + .setCassandraPort(cassandraPort) + .setCassandraUsername(cassandraUsername) + .setCassandraPasswd(cassandraPasswd) + .setKeyspace(keyspace).build(); } private void populateDisplayData(DisplayData.Builder builder) { - if (getCluster() != null) { - builder.addIfNotNull(DisplayData.item("cluster", - getCluster().getClass().getName())); + if (getCassandraHosts() != null) { + builder.addIfNotNull(DisplayData.item("cassandraHosts", + getCassandraHosts().getClass().getName())); } } Session getSession() throws Exception { - if (getKeyspace() != null) { - return getCluster().connect(getKeyspace()); + int maxIdle = 5; + int maxTotal = 100; + int minIdle = 3; + int maxWaitMillis = 60 * 100; + + int localCoreConnectionsPerHost = 8; + int localMaxConnectionsPerHost = 8; + int localMaxRequestsPerConnection = 100; + + int poolTimeoutMillis = 60 * 1000; + int connectTimeoutMillis = 60 * 1000; + int readTimeoutMillis = 60 * 1000; + + GenericObjectPoolConfig conf = new GenericObjectPoolConfig(); + conf.setMaxIdle(maxIdle); + conf.setMaxTotal(maxTotal); + conf.setMinIdle(minIdle); + conf.setMaxWaitMillis(maxWaitMillis); + conf.setTestOnBorrow(false); + conf.setTestWhileIdle(false); + conf.setTestOnReturn(false); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions.setCoreConnectionsPerHost + (HostDistance.LOCAL, localCoreConnectionsPerHost); + poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, + localMaxConnectionsPerHost); + poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, + localMaxRequestsPerConnection); + poolingOptions.setPoolTimeoutMillis(poolTimeoutMillis); + SocketOptions socketOptions = new SocketOptions(); + socketOptions.setKeepAlive(true); + socketOptions.setReceiveBufferSize(100 * 1024 * 1024); + socketOptions.setTcpNoDelay(true); + socketOptions.setReadTimeoutMillis(readTimeoutMillis); + socketOptions.setConnectTimeoutMillis + (connectTimeoutMillis); + String[] nodes = getCassandraHosts().split(","); + if (getCassandraUsername() == null + || getCassandraUsername().length() == 0) { + Cluster cluster = Cluster.builder() + .addContactPoints(nodes) + .withPort(getCassandraPort()) + .withLoadBalancingPolicy(LatencyAwarePolicy + .builder(new RoundRobinPolicy()).build()) + .withQueryOptions(new QueryOptions() + .setConsistencyLevel(ConsistencyLevel.ONE)) + .withReconnectionPolicy(new ConstantReconnectionPolicy(1000)) + .withProtocolVersion(ProtocolVersion.V3) + .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) + .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)) + .withPoolingOptions(poolingOptions) + .withSocketOptions(socketOptions) + .build(); + if (getKeyspace() != null) { + return cluster.connect(getKeyspace()); + } else { + return cluster.connect(); + } } else { - return getCluster().connect(); + Cluster cluster = Cluster.builder() + .addContactPoints(nodes) + .withPort(getCassandraPort()) + .withCredentials(getCassandraUsername(), + getCassandraPasswd()) + .withLoadBalancingPolicy(LatencyAwarePolicy + .builder(new RoundRobinPolicy()).build()) + .withQueryOptions(new QueryOptions() + .setConsistencyLevel(ConsistencyLevel.ONE)) + .withReconnectionPolicy(new ConstantReconnectionPolicy(1000)) + .withProtocolVersion(ProtocolVersion.V3) + .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) + .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)) + .withPoolingOptions(poolingOptions) + .withSocketOptions(socketOptions) + .build(); + if (getKeyspace() != null) { + return cluster.connect(getKeyspace()); + } else { + return cluster.connect(); + } } } } diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index 316542c912e4..5524138c0943 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -21,26 +21,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.PoolingOptions; -import com.datastax.driver.core.ProtocolVersion; -import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.RegularStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; -import com.datastax.driver.core.SocketOptions; -import com.datastax.driver.core.policies.ConstantReconnectionPolicy; -import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; -import com.datastax.driver.core.policies.LatencyAwarePolicy; -import com.datastax.driver.core.policies.RoundRobinPolicy; import com.datastax.driver.core.querybuilder.QueryBuilder; - import java.util.ArrayList; import java.util.List; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -51,7 +38,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -64,6 +50,8 @@ import org.junit.runners.JUnit4; + + /** * Test CassandraIO. */ @@ -75,86 +63,39 @@ public class CassandraIOTest { @Rule public ExpectedException thrown = ExpectedException.none(); - private static Cluster cluster; private CassandraIO.ClusterConfiguration clusterConfiguration; private static final String TESTKEYSPACE = "testbeam"; + @SuppressWarnings("unused") + private static Integer cassandraPort = 9042; + @SuppressWarnings("unused") + private static String cassandraHosts = "Cassandra-ip1"; + @SuppressWarnings("unused") + private static String cassandraUsername = "admin"; + @SuppressWarnings("unused") + private static String cassandraPasswd = ""; + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @BeforeClass public static void beforeClass() throws Exception { - - int maxIdle = 5; - int maxTotal = 100; - int minIdle = 3; - int maxWaitMillis = 60 * 100; - - int localCoreConnectionsPerHost = 8; - int localMaxConnectionsPerHost = 8; - int localMaxRequestsPerConnection = 100; - - int poolTimeoutMillis = 60 * 1000; - int connectTimeoutMillis = 60 * 1000; - int readTimeoutMillis = 60 * 1000; - - GenericObjectPoolConfig conf = new GenericObjectPoolConfig(); - conf.setMaxIdle(maxIdle); - conf.setMaxTotal(maxTotal); - conf.setMinIdle(minIdle); - conf.setMaxWaitMillis(maxWaitMillis); - conf.setTestOnBorrow(false); - conf.setTestWhileIdle(false); - conf.setTestOnReturn(false); - int cassandraPort = 9042; - String cassandraHosts = "Cassandra-ip1"; - String cassandraUser = "admin"; - String cassandraPassword = ""; - String[] nodes = cassandraHosts.split(","); - PoolingOptions poolingOptions = new PoolingOptions(); - poolingOptions.setCoreConnectionsPerHost - (HostDistance.LOCAL, localCoreConnectionsPerHost); - poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, - localMaxConnectionsPerHost); - poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, - localMaxRequestsPerConnection); - poolingOptions.setPoolTimeoutMillis(poolTimeoutMillis); - SocketOptions socketOptions = new SocketOptions(); - socketOptions.setKeepAlive(true); - socketOptions.setReceiveBufferSize(100 * 1024 * 1024); - socketOptions.setTcpNoDelay(true); - socketOptions.setReadTimeoutMillis(readTimeoutMillis); - socketOptions.setConnectTimeoutMillis - (connectTimeoutMillis); - cluster = Cluster.builder().addContactPoints(nodes) - .withPort(cassandraPort) - .withLoadBalancingPolicy(LatencyAwarePolicy - .builder(new RoundRobinPolicy()).build()) - .withQueryOptions(new QueryOptions() - .setConsistencyLevel(ConsistencyLevel.ONE)) - .withReconnectionPolicy(new ConstantReconnectionPolicy(1000)) - .withProtocolVersion(ProtocolVersion.V3) - .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) - .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)) - .withPoolingOptions(poolingOptions) - .withSocketOptions(socketOptions) - .withCredentials(cassandraUser, cassandraPassword) - .build(); - + cassandraPort = 9042; + cassandraHosts = "Cassandra-ip1"; + cassandraUsername = "admin"; + cassandraPasswd = ""; } @AfterClass public static void afterClass() throws Exception { - if (cluster != null) { - cluster.close(); - } } @Before public void setup() throws Exception { - clusterConfiguration = CassandraIO.ClusterConfiguration.create(cluster); - Session session = cluster.connect(); + clusterConfiguration = CassandraIO.ClusterConfiguration + .create(cassandraHosts, cassandraPort); + Session session = clusterConfiguration.getSession(); session.execute("CREATE KEYSPACE IF NOT EXISTS " + TESTKEYSPACE + " WITH replication = {'class':'SimpleStrategy'," + "'replication_factor': 1};"); @@ -253,7 +194,7 @@ public void setParameters(List element, } })); pipeline.run(); - try (Session session = cluster.connect()) { + try (Session session = clusterConfiguration.getSession()) { com.datastax.driver.core.ResultSet resultSet = session .execute("select count(*) count from test.atable"); List resultAll = resultSet.all(); @@ -261,6 +202,8 @@ public void setParameters(List element, Assert.assertEquals(1, resultAll.size()); int count = resultAll.get(0).getInt("count"); Assert.assertEquals(1000, count); + } catch (Exception e) { + e.printStackTrace(); } } From 1f37cc76f60b1b69eb56481e86a9ce906ad7837d Mon Sep 17 00:00:00 2001 From: FrankLi Date: Mon, 27 Feb 2017 16:25:08 +0800 Subject: [PATCH 3/7] read test success first commit --- .../apache/beam/sdk/io/cassandra/CassandraIO.java | 4 +++- .../beam/sdk/io/cassandra/CassandraIOTest.java | 13 ++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java index e66950a3871d..4d15ec95bd6f 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -518,7 +518,9 @@ public void processElement(ProcessContext context) String query = context.element(); PreparedStatement statement = session.prepare(query); BoundStatement boundStatementLast = new BoundStatement(statement); - this.spec.getStatementPreparator().setParameters(boundStatementLast); + if (this.spec.getStatementPreparator() != null){ + this.spec.getStatementPreparator().setParameters(boundStatementLast); + } ResultSet resultSet = session.execute(boundStatementLast); for (Row row : resultSet) { context.output(spec.getRowMapper().mapRow(row)); diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index 5524138c0943..33ba172700f5 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -25,6 +25,8 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; + +import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -45,7 +47,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -56,12 +57,10 @@ * Test CassandraIO. */ @RunWith(JUnit4.class) -public class CassandraIOTest { +public class CassandraIOTest implements Serializable{ @Rule public final transient TestPipeline p = TestPipeline.create(); - @Rule - public ExpectedException thrown = ExpectedException.none(); private CassandraIO.ClusterConfiguration clusterConfiguration; @@ -119,7 +118,7 @@ public void testClusterConfiguration() @Test public void testReadBuildsCorrectly() { - String query = "select * from test.person"; + String query = "select * from " + TESTKEYSPACE + ".person"; CassandraIO.Read read = CassandraIO.read() .withClusterConfiguration(clusterConfiguration) .withQuery(query); @@ -130,7 +129,7 @@ public void testReadBuildsCorrectly() { @Test @Category(NeedsRunner.class) public void testRead() throws Exception { - String query = "select * from test.person"; + String query = "select * from " + TESTKEYSPACE + ".person"; PCollection>> output = pipeline .apply(CassandraIO.>> read() .withClusterConfiguration(clusterConfiguration) @@ -152,7 +151,7 @@ public List> mapRow( .withCoder(ListCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))); PAssert.thatSingleton(output.apply("Count All", - Count.>> globally())).isEqualTo(1000L); + Count.>> globally())).isEqualTo(1L); pipeline.run(); } @Test From 3827106864924b16cf8ec9a40b67d4287382c426 Mon Sep 17 00:00:00 2001 From: FrankLi Date: Mon, 27 Feb 2017 17:54:52 +0800 Subject: [PATCH 4/7] write test ok --- .../sdk/io/cassandra/CassandraIOTest.java | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index 33ba172700f5..e23584515e18 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -27,9 +27,11 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import java.io.Serializable; +import java.math.BigInteger; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -156,7 +158,7 @@ public List> mapRow( } @Test public void testWriteBuildsCorrectly() { - String statement = "INSERT INTO test.atable (a1,a2,a3) VALUES (?,?,?) "; + String statement = "INSERT INTO " + TESTKEYSPACE + ".atable (a1,a2,a3) VALUES (?,?,?) "; CassandraIO.Write write = CassandraIO.write() .withClusterConfiguration(clusterConfiguration) .withStatement(statement); @@ -167,25 +169,27 @@ public void testWriteBuildsCorrectly() { @Test @Category(NeedsRunner.class) public void testWrite() { - String statement = "INSERT INTO test.atable (a1,a2,a3) VALUES (?,?,?) "; + String statement = "INSERT INTO " + TESTKEYSPACE + ".atable (a1,a2,a3) VALUES (?,?,?) "; List data = new ArrayList(); - for (int i = 0; i < 1000; i++) { - List kv = new ArrayList(); + int countAll = 20000; + for (int i = 0; i < countAll; i++) { + List kv = new ArrayList(); + kv.add("a" + i); kv.add("b" + i); kv.add("c" + i); - data.add(kv); } - pipeline.apply(Create.of(data)) + Coder coder = ListCoder.of(StringUtf8Coder.of()); + pipeline.apply(Create.of(data).withCoder(coder)) .apply(CassandraIO. write() .withClusterConfiguration(clusterConfiguration) .withStatement(statement) .withBoundStatementSetter( - new CassandraIO.BoundStatementSetter() { + new CassandraIO.BoundStatementSetter< List>() { @Override - public void setParameters(List element, + public void setParameters( List element, BoundStatement boundStatement) throws Exception { boundStatement. @@ -195,14 +199,15 @@ public void setParameters(List element, pipeline.run(); try (Session session = clusterConfiguration.getSession()) { com.datastax.driver.core.ResultSet resultSet = session - .execute("select count(*) count from test.atable"); + .execute("select count(*) from " + TESTKEYSPACE + ".atable"); List resultAll = resultSet.all(); Assert.assertNotNull(resultAll); Assert.assertEquals(1, resultAll.size()); - int count = resultAll.get(0).getInt("count"); - Assert.assertEquals(1000, count); + long count = resultAll.get(0).get(0,Long.class); + Assert.assertEquals(countAll, count); } catch (Exception e) { e.printStackTrace(); + Assert.assertNull(e); } } From 4f8201a4b69d0b5fb213fdabdcafbf3cdcd6c430 Mon Sep 17 00:00:00 2001 From: FrankLi Date: Mon, 27 Feb 2017 18:06:01 +0800 Subject: [PATCH 5/7] eclipse test ok ,maven compile FAILURE --- .../beam/sdk/io/cassandra/CassandraIOTest.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index e23584515e18..2b48520bb354 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -171,10 +171,10 @@ public void testWriteBuildsCorrectly() { public void testWrite() { String statement = "INSERT INTO " + TESTKEYSPACE + ".atable (a1,a2,a3) VALUES (?,?,?) "; - List data = new ArrayList(); + List> data = new ArrayList>(); int countAll = 20000; for (int i = 0; i < countAll; i++) { - List kv = new ArrayList(); + List kv = new ArrayList(); kv.add("a" + i); kv.add("b" + i); @@ -183,17 +183,16 @@ public void testWrite() { } Coder coder = ListCoder.of(StringUtf8Coder.of()); pipeline.apply(Create.of(data).withCoder(coder)) - .apply(CassandraIO. write() + .apply(CassandraIO.> write() .withClusterConfiguration(clusterConfiguration) .withStatement(statement) .withBoundStatementSetter( - new CassandraIO.BoundStatementSetter< List>() { + new CassandraIO.BoundStatementSetter< List>() { @Override - public void setParameters( List element, + public void setParameters( List element, BoundStatement boundStatement) throws Exception { - boundStatement. - bind(element.toArray()); + boundStatement.bind(element.toArray()); } })); pipeline.run(); From a80461e6553d68b6b16eb4a5c6d7dc7808fabefd Mon Sep 17 00:00:00 2001 From: FrankLi Date: Mon, 27 Feb 2017 18:32:41 +0800 Subject: [PATCH 6/7] modify test case succesed --- .../sdk/io/cassandra/CassandraIOTest.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index 2b48520bb354..84650c668c13 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -27,10 +27,10 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import java.io.Serializable; -import java.math.BigInteger; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; @@ -171,28 +171,31 @@ public void testWriteBuildsCorrectly() { public void testWrite() { String statement = "INSERT INTO " + TESTKEYSPACE + ".atable (a1,a2,a3) VALUES (?,?,?) "; - List> data = new ArrayList>(); + List>> data = new ArrayList>>(); int countAll = 20000; for (int i = 0; i < countAll; i++) { - List kv = new ArrayList(); - - kv.add("a" + i); - kv.add("b" + i); - kv.add("c" + i); + List> kv = new ArrayList>(); + kv.add(KV.of(i, "a" + i)); + kv.add(KV.of(i, "a" + i)); + kv.add(KV.of(i, "a" + i)); data.add(kv); } - Coder coder = ListCoder.of(StringUtf8Coder.of()); + Coder coder = ListCoder.of(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())); pipeline.apply(Create.of(data).withCoder(coder)) - .apply(CassandraIO.> write() + .apply(CassandraIO.>> write() .withClusterConfiguration(clusterConfiguration) .withStatement(statement) .withBoundStatementSetter( - new CassandraIO.BoundStatementSetter< List>() { + new CassandraIO.BoundStatementSetter< List>>() { @Override - public void setParameters( List element, + public void setParameters(List> element, BoundStatement boundStatement) throws Exception { - boundStatement.bind(element.toArray()); + List d = new ArrayList(); + for (KV kv : element){ + d.add(kv.getValue()); + } + boundStatement.bind(d.toArray()); } })); pipeline.run(); @@ -202,7 +205,7 @@ public void setParameters( List element, List resultAll = resultSet.all(); Assert.assertNotNull(resultAll); Assert.assertEquals(1, resultAll.size()); - long count = resultAll.get(0).get(0,Long.class); + long count = resultAll.get(0).get(0, Long.class); Assert.assertEquals(countAll, count); } catch (Exception e) { e.printStackTrace(); From d090294ae9c2add260ccfcdaf1acebc1eecf9763 Mon Sep 17 00:00:00 2001 From: FrankLi Date: Mon, 27 Feb 2017 21:47:49 +0800 Subject: [PATCH 7/7] modify note and test case --- .../beam/sdk/io/cassandra/CassandraIO.java | 77 ++++++++++--------- 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java index 4d15ec95bd6f..2800c3ec7a94 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -38,12 +38,17 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import javax.annotation.Nullable; import javax.sql.DataSource; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; @@ -66,20 +71,25 @@ /** * IO to read and write data on Cassandra. * - *

Reading from Cassandra datasource

+ *

Reading from Cassandra Cluster

* *

CassandraIO source returns a bounded collection of {@code T} as a * {@code PCollection}. T is the type returned by the provided * {@link RowMapper}. *

{@code
  * pipeline.apply(CassandraIO.>read()
- *   .withClusterConfiguration(CassandraIO.ClusterConfiguration.create(
- *   .withQuery("select id,name from Person")
- *   .withRowMapper(new CassandraIO.RowMapper>() {
- *     public KV mapRow(ResultSet resultSet) throws Exception {
- *       return KV.of(resultSet.getInt(1), resultSet.getString(2));
+ *   .withClusterConfiguration(CassandraIO.ClusterConfiguration
+ *   			.create(cassandraHosts, cassandraPort))
+ *   .withQuery("select id,name from testbeam.person")
+ *   .withRowMapper(new CassandraIO.RowMapper>() {		
+ *     public List> mapRow(Row row) throws Exception {
+ *       List> rsult = new ArrayList>();
+ *       KV kv = KV.of("name", row.getString("name"));
+ *       rsult.add(kv);
+ *       return rsult;
  *     }
  *   })
+ *   .withCoder(ListCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))));
  * }
* * @@ -89,7 +99,7 @@ *
{@code
  * pipeline.apply(CassandraIO.>read()
  *   .withClusterConfiguration(CassandraIO.ClusterConfiguration.create(cluster)
- *   .withQuery("select id,name from Person where name = ?")
+ *   .withQuery("select id,name from testbeam.person where name = ?")
  *   .withStatementPreparator(new CassandraIO.StatementPreparator() {
  *     public void setParameters(BoundStatementLast boundStatementLast) throws Exception {
  *       boundStatementLast.bind("Darwin");
@@ -106,9 +116,9 @@
  *
  * 

Cassandra sink supports writing records into a database. * It writes a {@link PCollection} to the - * database by converting each T into a {@link PreparedStatement} + * database by converting each T into a {@link BoundStatement} * via a user-provided {@link - * PreparedStatementSetter}. + * BoundStatementSetter}. * *

Like the source, to configure the sink, you have to provide * a {@link ClusterConfiguration}. @@ -116,24 +126,28 @@ * pipeline * .apply(...) * .apply(CassandraIO.>write() - * .withDataSourceConfiguration(CassandraIO.ClusterConfiguration - * .create(cluster) - * .withStatement("insert into Person values(?, ?)") - * .withPreparedStatementSetter(new CassandraIO. - * PreparedStatementSetter>() { - * public void setParameters(KV element, - * BoundStatement query) { - * boundStatement.bind(1, kv.getKey()); - * } - * }) + * .withClusterConfiguration(CassandraIO.ClusterConfiguration + * .create(cassandraHosts, cassandraPort)) + * .withStatement("insert into testbeam.person values(?, ?)") + * ..withBoundStatementSetter( + * new CassandraIO.BoundStatementSetter< List>>() { + * @Override + * public void setParameters(List> element, + * BoundStatement boundStatement) + * throws Exception { + * List d = new ArrayList(); + * for (KV kv : element){ + * d.add(kv.getValue()); + * } + * boundStatement.bind(d.toArray()); + * } + * })); * }

* *

NB: in case of transient failures, Beam runners may execute parts of * CassandraIO.Write multiple times for fault tolerance. Because of that, you * should avoid using {@code INSERT} statements, since that risks duplicating - * records in the database, or failing due to primary key conflicts. Consider - * using MERGE ("upsert") - * statements supported by your database instead. + * records in the cluster databases, or failing due to primary key conflicts. */ public class CassandraIO { @@ -174,9 +188,9 @@ public interface RowMapper extends Serializable { } /** - * A POJO describing a {@link DataSource}, either providing directly a - * {@link DataSource} or all properties allowing to create a - * {@link DataSource}. + * A POJO describing a {@link Cluster}, either providing directly a + * {@link Cluster} or all properties allowing to create a + * {@link Cluster}. */ @AutoValue public abstract static class ClusterConfiguration implements Serializable { @@ -277,7 +291,7 @@ private void populateDisplayData(DisplayData.Builder builder) { } Session getSession() throws Exception { - int maxIdle = 5; + int maxIdle = 5; int maxTotal = 100; int minIdle = 3; int maxWaitMillis = 60 * 100; @@ -363,7 +377,7 @@ Session getSession() throws Exception { /** * An interface used by the CassandraIO Write to set the parameters of the - * {@link PreparedStatement} used to setParameters into the database. + * {@link BoundStatement} used to setParameters into the cluster database. */ public interface StatementPreparator extends Serializable { void setParameters(BoundStatement boundStatement) @@ -448,13 +462,6 @@ public Read withCoder(Coder coder) { public PCollection expand(PBegin input) { return input.apply(Create.of(getQuery())) .apply(ParDo.of(new ReadFn<>(this))).setCoder(getCoder()) - // generate a random key followed by a GroupByKey and then - // ungroup - // to prevent fusion - // see - // https://cloud.google.com/dataflow/service - //dataflow-service-desc#preventing-fusion - // for details .apply(ParDo.of(new DoFn>() { private Random random; @@ -484,7 +491,7 @@ public void validate(PBegin input) { "CassandraIO.read() requires a coder to be set via withCoder(coder)"); checkState(getClusterConfiguration() != null, "CassandraIO.read() requires a Cluster configuration to be set via " - + "withClusterConfiguration(cluster)"); + + "withClusterConfiguration(clusterConfiguration)"); } @Override