From 7470bb4ad95019b2ea2ce383ba0b7983607b4113 Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 5 Sep 2016 11:03:00 +0200 Subject: [PATCH 1/9] [FLINK-4177] Harden CassandraConnectorTest --- .../flink-connector-cassandra/pom.xml | 11 +- .../cassandra/CassandraSinkBase.java | 39 +- .../cassandra/CassandraConnectorITCase.java | 355 ++++++++++-------- .../cassandra/EmbeddedCassandraService.java | 43 +++ .../src/test/resources/cassandra.yaml | 41 +- 5 files changed, 313 insertions(+), 176 deletions(-) create mode 100644 flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java diff --git a/flink-streaming-connectors/flink-connector-cassandra/pom.xml b/flink-streaming-connectors/flink-connector-cassandra/pom.xml index 3a1731c7e8186..07cdc09c0925e 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/pom.xml +++ b/flink-streaming-connectors/flink-connector-cassandra/pom.xml @@ -37,8 +37,8 @@ under the License. - 2.2.5 - 3.0.0 + 2.2.7 + 3.0.3 @@ -159,6 +159,13 @@ under the License. ${project.version} test + + + org.caffinitas.ohc + ohc-core + 0.4.5 + test + org.apache.cassandra cassandra-all diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 49b1efacbe576..9c4c430293ffb 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}. @@ -40,11 +42,13 @@ public abstract class CassandraSinkBase extends RichSinkFunction { protected transient Cluster cluster; protected transient Session session; - protected transient Throwable exception = null; + protected transient final AtomicReference exception = new AtomicReference<>(); protected transient FutureCallback callback; private final ClusterBuilder builder; + protected final AtomicInteger updatesPending = new AtomicInteger(); + protected CassandraSinkBase(ClusterBuilder builder) { this.builder = builder; ClosureCleaner.clean(builder, true); @@ -55,11 +59,24 @@ public void open(Configuration configuration) { this.callback = new FutureCallback() { @Override public void onSuccess(V ignored) { + int pending = updatesPending.decrementAndGet(); + if (pending == 0) { + synchronized (updatesPending) { + updatesPending.notifyAll(); + } + } } @Override public void onFailure(Throwable t) { - exception = t; + int pending = updatesPending.decrementAndGet(); + if (pending == 0) { + synchronized (updatesPending) { + updatesPending.notifyAll(); + } + } + exception.set(t); + LOG.error("Error while sending value.", t); } }; @@ -69,10 +86,12 @@ public void onFailure(Throwable t) { @Override public void invoke(IN value) throws Exception { - if (exception != null) { - throw new IOException("invoke() failed", exception); + Throwable e = exception.get(); + if (e != null) { + throw new IOException("Error while sending value.", e); } ListenableFuture result = send(value); + updatesPending.incrementAndGet(); Futures.addCallback(result, callback); } @@ -80,6 +99,14 @@ public void invoke(IN value) throws Exception { @Override public void close() { + while (updatesPending.get() > 0) { + synchronized (updatesPending) { + try { + updatesPending.wait(); + } catch (InterruptedException e) { + } + } + } try { if (session != null) { session.close(); @@ -94,5 +121,9 @@ public void close() { } catch (Exception e) { LOG.error("Error while closing cluster.", e); } + Throwable e = exception.get(); + if (e != null) { + LOG.error("Error while sending value.", e); + } } } diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index d58b0af0ce1b3..26c5bf8858b24 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -20,38 +20,35 @@ import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; - -import org.apache.cassandra.service.CassandraDaemon; - +import com.datastax.driver.core.SocketOptions; +import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.runtime.testutils.CommonTestUtils.PipeForwarder; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.test.util.TestEnvironment; -import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -70,31 +67,38 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.io.StringWriter; import java.util.ArrayList; +import java.util.List; +import java.util.Random; import java.util.Scanner; import java.util.UUID; import static org.junit.Assert.*; +import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; +import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; @SuppressWarnings("serial") -@PowerMockIgnore("javax.management.*") @RunWith(PowerMockRunner.class) @PrepareForTest(ResultPartitionWriter.class) -public class CassandraConnectorITCase extends WriteAheadSinkTestBase, CassandraTupleWriteAheadSink>> { +public class CassandraConnectorITCase extends WriteAheadSinkTestBase, CassandraConnectorITCase.TestCassandraTupleWriteAheadSink>> { private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); private static File tmpDir; private static final boolean EMBEDDED = true; - private static EmbeddedCassandraService cassandra; + private static Process cassandra; private static ClusterBuilder builder = new ClusterBuilder() { @Override protected Cluster buildCluster(Cluster.Builder builder) { return builder .addContactPoint("127.0.0.1") + .withSocketOptions(new SocketOptions().setConnectTimeoutMillis(30000)) .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)) + .withPoolingOptions(new PoolingOptions().setConnectionsPerHost(HostDistance.LOCAL, 32, 32).setMaxRequestsPerConnection(HostDistance.LOCAL, 2048).setPoolTimeoutMillis(15000)) .withoutJMXReporting() .withoutMetrics().build(); } @@ -104,11 +108,9 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static Session session; private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; - private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE flink;"; - private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink.test (id text PRIMARY KEY, counter int, batch_id int);"; - private static final String CLEAR_TABLE_QUERY = "TRUNCATE flink.test;"; - private static final String INSERT_DATA_QUERY = "INSERT INTO flink.test (id, counter, batch_id) VALUES (?, ?, ?)"; - private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.test;"; + private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS flink.$TABLE (id text PRIMARY KEY, counter int, batch_id int);"; + private static final String INSERT_DATA_QUERY = "INSERT INTO flink.$TABLE (id, counter, batch_id) VALUES (?, ?, ?)"; + private static final String SELECT_DATA_QUERY = "SELECT * FROM flink.$TABLE;"; private static final ArrayList> collection = new ArrayList<>(20); @@ -118,26 +120,13 @@ protected Cluster buildCluster(Cluster.Builder builder) { } } - private static class EmbeddedCassandraService { - CassandraDaemon cassandraDaemon; - - public void start() throws IOException { - this.cassandraDaemon = new CassandraDaemon(); - this.cassandraDaemon.init(null); - this.cassandraDaemon.start(); - } - - public void stop() { - this.cassandraDaemon.stop(); - } - } + private static StringWriter sw; + + private static Random random = new Random(); + private int lastInt; private static LocalFlinkMiniCluster flinkCluster; - // ------------------------------------------------------------------------ - // Cluster Setup (Cassandra & Flink) - // ------------------------------------------------------------------------ - @BeforeClass public static void startCassandra() throws IOException { @@ -166,49 +155,86 @@ public static void startCassandra() throws IOException { } } - - // Tell cassandra where the configuration files are. - // Use the test configuration file. - System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString()); - if (EMBEDDED) { - cassandra = new EmbeddedCassandraService(); - cassandra.start(); + String javaCommand = getJavaCommandPath(); + + // create a logging file for the process + File tempLogFile = File.createTempFile("testlogconfig", "properties"); + CommonTestUtils.printLog4jDebugConfig(tempLogFile); + + // start the task manager process + String[] command = new String[]{ + javaCommand, + "-Dlog.level=DEBUG", + "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), + "-Dcassandra.config=" + tmp.toURI(), + // these options were taken directly from the jvm.options file in the cassandra repo + "-XX:+UseThreadPriorities", + "-Xss256k", + "-XX:+AlwaysPreTouch", + "-XX:+UseTLAB", + "-XX:+ResizeTLAB", + "-XX:+UseNUMA", + "-XX:+PerfDisableSharedMem", + "-XX:+UseParNewGC", + "-XX:+UseConcMarkSweepGC", + "-XX:+CMSParallelRemarkEnabled", + "-XX:SurvivorRatio=8", + "-XX:MaxTenuringThreshold=1", + "-XX:CMSInitiatingOccupancyFraction=75", + "-XX:+UseCMSInitiatingOccupancyOnly", + "-XX:CMSWaitDuration=10000", + "-XX:+CMSParallelInitialMarkEnabled", + "-XX:+CMSEdenChunksRecordAlways", + "-XX:+CMSClassUnloadingEnabled", + + "-classpath", getCurrentClasspath(), + EmbeddedCassandraService.class.getName() + }; + + ProcessBuilder bld = new ProcessBuilder(command); + cassandra = bld.start(); + sw = new StringWriter(); + new PipeForwarder(cassandra.getErrorStream(), sw); } - try { - Thread.sleep(1000 * 10); - } catch (InterruptedException e) { //give cassandra a few seconds to start up + int attempt = 0; + while (true) { + try { + attempt++; + cluster = builder.getCluster(); + session = cluster.connect(); + break; + } catch (Exception e) { + if (attempt > 30) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + } + } } - - cluster = builder.getCluster(); - session = cluster.connect(); + LOG.debug("Connection established after " + attempt + " attempts."); session.execute(CREATE_KEYSPACE_QUERY); - session.execute(CREATE_TABLE_QUERY); - } + session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial")); - @BeforeClass - public static void startFlink() throws Exception { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - - flinkCluster = new LocalFlinkMiniCluster(config); - flinkCluster.start(); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } } - @AfterClass - public static void stopFlink() { - if (flinkCluster != null) { - flinkCluster.stop(); - flinkCluster = null; - } + @Before + public void createTable() { + lastInt = random.nextInt(Integer.MAX_VALUE); + session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_" + lastInt)); } @AfterClass public static void closeCassandra() { if (session != null) { - session.executeAsync(DROP_KEYSPACE_QUERY); session.close(); } @@ -216,29 +242,24 @@ public static void closeCassandra() { cluster.close(); } - if (cassandra != null) { - cassandra.stop(); - } - - if (tmpDir != null) { - //noinspection ResultOfMethodCallIgnored - tmpDir.delete(); + if (isProcessAlive(cassandra)) { + try { + cassandra.getOutputStream().write(1); + cassandra.getOutputStream().flush(); + } catch (IOException e) { + } + cassandra.destroy(); } - } - - // ------------------------------------------------------------------------ - // Test preparation & cleanup - // ------------------------------------------------------------------------ - @Before - public void initializeExecutionEnvironment() { - TestStreamEnvironment.setAsContext(flinkCluster, 4); - new TestEnvironment(flinkCluster, 4, false).setAsContext(); - } - @After - public void deleteSchema() throws Exception { - session.executeAsync(CLEAR_TABLE_QUERY); + try { + FileUtils.deleteDirectory(tmpDir); + } catch (IOException e) { + LOG.error("Failed to delete tmp directory.", e); + } + if (sw != null) { + //System.out.println(sw.toString()); + } } // ------------------------------------------------------------------------ @@ -246,9 +267,9 @@ public void deleteSchema() throws Exception { // ------------------------------------------------------------------------ @Override - protected CassandraTupleWriteAheadSink> createSink() throws Exception { - return new CassandraTupleWriteAheadSink<>( - INSERT_DATA_QUERY, + protected TestCassandraTupleWriteAheadSink> createSink() throws Exception { + return new TestCassandraTupleWriteAheadSink<>( + "flink_" + lastInt, TypeExtractor.getForObject(new Tuple3<>("", 0, 0)).createSerializer(new ExecutionConfig()), builder, new CassandraCommitter(builder)); @@ -267,9 +288,9 @@ protected Tuple3 generateValue(int counter, int checkp @Override protected void verifyResultsIdealCircumstances( OneInputStreamOperatorTestHarness, Tuple3> harness, - CassandraTupleWriteAheadSink> sink) { + TestCassandraTupleWriteAheadSink> sink) { - ResultSet result = session.execute(SELECT_DATA_QUERY); + ResultSet result = session.execute(SELECT_DATA_QUERY.replace("$TABLE", sink.tableName)); ArrayList list = new ArrayList<>(); for (int x = 1; x <= 60; x++) { list.add(x); @@ -284,9 +305,9 @@ protected void verifyResultsIdealCircumstances( @Override protected void verifyResultsDataPersistenceUponMissedNotify( OneInputStreamOperatorTestHarness, Tuple3> harness, - CassandraTupleWriteAheadSink> sink) { + TestCassandraTupleWriteAheadSink> sink) { - ResultSet result = session.execute(SELECT_DATA_QUERY); + ResultSet result = session.execute(SELECT_DATA_QUERY.replace("$TABLE", sink.tableName)); ArrayList list = new ArrayList<>(); for (int x = 1; x <= 60; x++) { list.add(x); @@ -301,9 +322,8 @@ protected void verifyResultsDataPersistenceUponMissedNotify( @Override protected void verifyResultsDataDiscardingUponRestore( OneInputStreamOperatorTestHarness, Tuple3> harness, - CassandraTupleWriteAheadSink> sink) { - - ResultSet result = session.execute(SELECT_DATA_QUERY); + TestCassandraTupleWriteAheadSink> sink) { + ResultSet result = session.execute(SELECT_DATA_QUERY.replace("$TABLE", sink.tableName)); ArrayList list = new ArrayList<>(); for (int x = 1; x <= 20; x++) { list.add(x); @@ -320,18 +340,19 @@ protected void verifyResultsDataDiscardingUponRestore( @Test public void testCassandraCommitter() throws Exception { - CassandraCommitter cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); + String jobID = new JobID().toString(); + CassandraCommitter cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc1.setJobId(jobID); cc1.setOperatorId("operator"); cc1.setOperatorSubtaskId(0); - CassandraCommitter cc2 = new CassandraCommitter(builder); - cc2.setJobId("job"); + CassandraCommitter cc2 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc2.setJobId(jobID); cc2.setOperatorId("operator"); cc2.setOperatorSubtaskId(1); - CassandraCommitter cc3 = new CassandraCommitter(builder); - cc3.setJobId("job"); + CassandraCommitter cc3 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc3.setJobId(jobID); cc3.setOperatorId("operator1"); cc3.setOperatorSubtaskId(0); @@ -358,8 +379,8 @@ public void testCassandraCommitter() throws Exception { cc2.close(); cc3.close(); - cc1 = new CassandraCommitter(builder); - cc1.setJobId("job"); + cc1 = new CassandraCommitter(builder, "flink_auxiliary_cc"); + cc1.setJobId(jobID); cc1.setOperatorId("operator"); cc1.setOperatorSubtaskId(0); @@ -378,70 +399,94 @@ public void testCassandraCommitter() throws Exception { @Test public void testCassandraTupleAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); + CassandraTupleSink> sink = new CassandraTupleSink<>(INSERT_DATA_QUERY.replace("$TABLE", "flink_" + lastInt), builder); - DataStream> source = env.fromCollection(collection); - source.addSink(new CassandraTupleSink>(INSERT_DATA_QUERY, builder)); + sink.open(new Configuration()); + + for (Tuple3 value : collection) { + sink.send(value); + } - env.execute(); + sink.close(); - ResultSet rs = session.execute(SELECT_DATA_QUERY); - Assert.assertEquals(20, rs.all().size()); + synchronized (sink.updatesPending) { + if (sink.updatesPending.get() != 0) { + sink.updatesPending.wait(); + } + } + + ResultSet rs = session.execute(SELECT_DATA_QUERY.replace("$TABLE", "flink_" + lastInt)); + try { + Assert.assertEquals(20, rs.all().size()); + } catch (Throwable e) { + LOG.error("test failed.", e); + } } @Test public void testCassandraPojoAtLeastOnceSink() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataStreamSource source = env - .addSource(new SourceFunction() { - - private boolean running = true; - private volatile int cnt = 0; - - @Override - public void run(SourceContext ctx) throws Exception { - while (running) { - ctx.collect(new Pojo(UUID.randomUUID().toString(), cnt, 0)); - cnt++; - if (cnt == 20) { - cancel(); - } - } - } + session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "test")); - @Override - public void cancel() { - running = false; - } - }); + CassandraPojoSink sink = new CassandraPojoSink<>(Pojo.class, builder); + + sink.open(new Configuration()); - source.addSink(new CassandraPojoSink<>(Pojo.class, builder)); + for (int x = 0; x < 20; x++) { + sink.send(new Pojo(UUID.randomUUID().toString(), x, 0)); + } + + sink.close(); - env.execute(); + synchronized (sink.updatesPending) { + while (sink.updatesPending.get() != 0) { + sink.updatesPending.wait(); + } + } - ResultSet rs = session.execute(SELECT_DATA_QUERY); - Assert.assertEquals(20, rs.all().size()); + ResultSet rs = session.execute(SELECT_DATA_QUERY.replace("$TABLE", "test")); + try { + assertEquals(20, rs.all().size()); + } catch (Throwable e) { + LOG.error("test failed.", e); + } } @Test public void testCassandraBatchFormats() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataSet> dataSet = env.fromCollection(collection); - dataSet.output(new CassandraOutputFormat>(INSERT_DATA_QUERY, builder)); - - env.execute("Write data"); - - DataSet> inputDS = env.createInput( - new CassandraInputFormat>(SELECT_DATA_QUERY, builder), - TypeInformation.of(new TypeHint>(){})); + OutputFormat> sink = new CassandraOutputFormat<>(INSERT_DATA_QUERY.replace("$TABLE", "flink_" + lastInt), builder); + sink.configure(new Configuration()); + sink.open(0, 1); + + for (Tuple3 value : collection) { + sink.writeRecord(value); + } + + sink.close(); + + InputFormat, InputSplit> source = new CassandraInputFormat<>(SELECT_DATA_QUERY.replace("$TABLE", "flink_" + lastInt), builder); + source.configure(new Configuration()); + source.open(null); + + List> result = new ArrayList<>(); + + while (!source.reachedEnd()) { + result.add(source.nextRecord(new Tuple3())); + } + + source.close(); + try { + assertEquals(20, result.size()); + } catch (Throwable e) { + LOG.error("test failed.", e); + } + } + protected static class TestCassandraTupleWriteAheadSink extends CassandraTupleWriteAheadSink { + private String tableName; - long count = inputDS.count(); - Assert.assertEquals(count, 20L); + private TestCassandraTupleWriteAheadSink(String tableName, TypeSerializer serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception { + super(INSERT_DATA_QUERY.replace("$TABLE", tableName), serializer, builder, committer); + this.tableName = tableName; + } } } diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java new file mode 100644 index 0000000000000..bc2522ca9865f --- /dev/null +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.cassandra.service.CassandraDaemon; + +import java.io.IOException; + +public class EmbeddedCassandraService { + public static void main(String[] args) { + final CassandraDaemon cassandraDaemon = new CassandraDaemon(); + + cassandraDaemon.activate(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + cassandraDaemon.deactivate(); + } + }); + + try { + System.in.read(); + } catch (IOException e) { + } + cassandraDaemon.deactivate(); + } +} diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml index 0594ea3fc49f3..77ee0ac6281cd 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/cassandra.yaml @@ -15,29 +15,40 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ + +auto_snapshot: false + cluster_name: 'Test Cluster' -commitlog_sync: 'periodic' -commitlog_sync_period_in_ms: 10000 -commitlog_segment_size_in_mb: 16 -partitioner: 'org.apache.cassandra.dht.RandomPartitioner' -endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch' commitlog_directory: $PATH/commit' +commitlog_sync: periodic +commitlog_sync_period_in_ms: 5000 + data_file_directories: - $PATH/data' -saved_caches_directory: $PATH/cache' +disk_access_mode: mmap + +endpoint_snitch: 'org.apache.cassandra.locator.SimpleSnitch' + listen_address: '127.0.0.1' -seed_provider: - - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider' - parameters: - - seeds: '127.0.0.1' + +memtable_allocation_type: offheap_objects + native_transport_port: 9042 -concurrent_reads: 8 -concurrent_writes: 8 +partitioner: org.apache.cassandra.dht.Murmur3Partitioner -auto_bootstrap: false -auto_snapshot: false +read_request_timeout_in_ms: 15000 +request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler +request_scheduler_id: keyspace +rpc_port: 9170 +saved_caches_directory: $PATH/cache' +seed_provider: + - class_name: 'org.apache.cassandra.locator.SimpleSeedProvider' + parameters: + - seeds: '127.0.0.1' start_rpc: false start_native_transport: true -native_transport_max_threads: 8 +storage_port: 7010 + +write_request_timeout_in_ms: 15000 From 1dc08b7b37eb014edf3a1581ed5c14f86adf3286 Mon Sep 17 00:00:00 2001 From: zentol Date: Fri, 9 Sep 2016 15:55:50 +0200 Subject: [PATCH 2/9] small fix --- .../cassandra/CassandraConnectorITCase.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 26c5bf8858b24..b5553100cb152 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -251,11 +251,12 @@ public static void closeCassandra() { cassandra.destroy(); } - - try { - FileUtils.deleteDirectory(tmpDir); - } catch (IOException e) { - LOG.error("Failed to delete tmp directory.", e); + if (tmpDir != null) { + try { + FileUtils.deleteDirectory(tmpDir); + } catch (IOException e) { + LOG.error("Failed to delete tmp directory.", e); + } } if (sw != null) { //System.out.println(sw.toString()); From 16dbeee16c14a6521c3e919cc836737c9f85fca3 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 2 Nov 2016 12:34:15 +0100 Subject: [PATCH 3/9] misc smaller changes --- .../connectors/cassandra/CassandraConnectorITCase.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index b5553100cb152..631f7856001f7 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -56,7 +56,6 @@ import org.junit.Test; import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -125,8 +124,6 @@ protected Cluster buildCluster(Cluster.Builder builder) { private static Random random = new Random(); private int lastInt; - private static LocalFlinkMiniCluster flinkCluster; - @BeforeClass public static void startCassandra() throws IOException { From 22b1f8a4b91807b0211d212c8eb99d6c03ccc13d Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 2 Nov 2016 12:34:04 +0100 Subject: [PATCH 4/9] Integrate TestJvmProcess --- .../cassandra/CassandraConnectorITCase.java | 101 ++------------- .../cassandra/CassandraService.java | 116 ++++++++++++++++++ .../cassandra/EmbeddedCassandraService.java | 43 ------- 3 files changed, 124 insertions(+), 136 deletions(-) create mode 100644 flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java delete mode 100644 flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 631f7856001f7..e22442f7abcdf 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -27,7 +27,6 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.SocketOptions; -import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.io.InputFormat; @@ -42,9 +41,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.runtime.testutils.CommonTestUtils.PipeForwarder; import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -62,21 +58,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; -import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.Scanner; import java.util.UUID; import static org.junit.Assert.*; -import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; -import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; -import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive; @SuppressWarnings("serial") @RunWith(PowerMockRunner.class) @@ -84,11 +72,10 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase, CassandraConnectorITCase.TestCassandraTupleWriteAheadSink>> { private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); - private static File tmpDir; private static final boolean EMBEDDED = true; - private static Process cassandra; + private static CassandraService cassandra; private static ClusterBuilder builder = new ClusterBuilder() { @Override @@ -118,8 +105,6 @@ protected Cluster buildCluster(Cluster.Builder builder) { collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0)); } } - - private static StringWriter sw; private static Random random = new Random(); private int lastInt; @@ -130,69 +115,15 @@ public static void startCassandra() throws IOException { // check if we should run this test, current Cassandra version requires Java >= 1.8 org.apache.flink.core.testutils.CommonTestUtils.assumeJava8(); - // generate temporary files - tmpDir = CommonTestUtils.createTempDirectory(); - ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader(); - File file = new File(classLoader.getResource("cassandra.yaml").getFile()); - File tmp = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml"); - - assertTrue(tmp.createNewFile()); - - try ( - BufferedWriter b = new BufferedWriter(new FileWriter(tmp)); - - //copy cassandra.yaml; inject absolute paths into cassandra.yaml - Scanner scanner = new Scanner(file); - ) { - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); - line = line.replace("$PATH", "'" + tmp.getParentFile()); - b.write(line + "\n"); - b.flush(); - } + try { + cassandra = new CassandraService(); + } catch (Exception e) { + LOG.error("Failed to instantiate cassandra service.", e); + fail("Failed to instantiate cassandra service."); } if (EMBEDDED) { - String javaCommand = getJavaCommandPath(); - - // create a logging file for the process - File tempLogFile = File.createTempFile("testlogconfig", "properties"); - CommonTestUtils.printLog4jDebugConfig(tempLogFile); - - // start the task manager process - String[] command = new String[]{ - javaCommand, - "-Dlog.level=DEBUG", - "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), - "-Dcassandra.config=" + tmp.toURI(), - // these options were taken directly from the jvm.options file in the cassandra repo - "-XX:+UseThreadPriorities", - "-Xss256k", - "-XX:+AlwaysPreTouch", - "-XX:+UseTLAB", - "-XX:+ResizeTLAB", - "-XX:+UseNUMA", - "-XX:+PerfDisableSharedMem", - "-XX:+UseParNewGC", - "-XX:+UseConcMarkSweepGC", - "-XX:+CMSParallelRemarkEnabled", - "-XX:SurvivorRatio=8", - "-XX:MaxTenuringThreshold=1", - "-XX:CMSInitiatingOccupancyFraction=75", - "-XX:+UseCMSInitiatingOccupancyOnly", - "-XX:CMSWaitDuration=10000", - "-XX:+CMSParallelInitialMarkEnabled", - "-XX:+CMSEdenChunksRecordAlways", - "-XX:+CMSClassUnloadingEnabled", - - "-classpath", getCurrentClasspath(), - EmbeddedCassandraService.class.getName() - }; - - ProcessBuilder bld = new ProcessBuilder(command); - cassandra = bld.start(); - sw = new StringWriter(); - new PipeForwarder(cassandra.getErrorStream(), sw); + cassandra.startProcess(); } int attempt = 0; @@ -239,25 +170,9 @@ public static void closeCassandra() { cluster.close(); } - if (isProcessAlive(cassandra)) { - try { - cassandra.getOutputStream().write(1); - cassandra.getOutputStream().flush(); - } catch (IOException e) { - } + if (EMBEDDED) { cassandra.destroy(); } - - if (tmpDir != null) { - try { - FileUtils.deleteDirectory(tmpDir); - } catch (IOException e) { - LOG.error("Failed to delete tmp directory.", e); - } - } - if (sw != null) { - //System.out.println(sw.toString()); - } } // ------------------------------------------------------------------------ diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java new file mode 100644 index 0000000000000..33a63badb3140 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.TestJvmProcess; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Scanner; + +import static org.junit.Assert.assertTrue; + +public class CassandraService extends TestJvmProcess { + private File tmpDir; + private File tmpCassandraYaml; + + public CassandraService() throws Exception { + // generate temporary files + tmpDir = CommonTestUtils.createTempDirectory(); + ClassLoader classLoader = CassandraConnectorITCase.class.getClassLoader(); + File file = new File(classLoader.getResource("cassandra.yaml").getFile()); + tmpCassandraYaml = new File(tmpDir.getAbsolutePath() + File.separator + "cassandra.yaml"); + + assertTrue(tmpCassandraYaml.createNewFile()); + BufferedWriter b = new BufferedWriter(new FileWriter(tmpCassandraYaml)); + + //copy cassandra.yaml; inject absolute paths into cassandra.yaml + Scanner scanner = new Scanner(file); + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + line = line.replace("$PATH", "'" + tmpCassandraYaml.getParentFile()); + b.write(line + "\n"); + b.flush(); + } + scanner.close(); + setJVMMemory(512); + } + + @Override + public String getName() { + return "CassandraService"; + } + + @Override + public String[] getJvmArgs() { + return new String[]{ + tmpCassandraYaml.toURI().toString(), + // these options were taken directly from the jvm.options file in the cassandra repo + "-XX:+UseThreadPriorities", + "-Xss256k", + "-XX:+AlwaysPreTouch", + "-XX:+UseTLAB", + "-XX:+ResizeTLAB", + "-XX:+UseNUMA", + "-XX:+PerfDisableSharedMem", + "-XX:+UseParNewGC", + "-XX:+UseConcMarkSweepGC", + "-XX:+CMSParallelRemarkEnabled", + "-XX:SurvivorRatio=8", + "-XX:MaxTenuringThreshold=1", + "-XX:CMSInitiatingOccupancyFraction=75", + "-XX:+UseCMSInitiatingOccupancyOnly", + "-XX:CMSWaitDuration=10000", + "-XX:+CMSParallelInitialMarkEnabled", + "-XX:+CMSEdenChunksRecordAlways", + "-XX:+CMSClassUnloadingEnabled",}; + } + + @Override + public String getEntryPointClassName() { + return CassandraServiceEntryPoint.class.getName(); + } + + public static class CassandraServiceEntryPoint { + public static void main(String[] args) { + final CassandraDaemon cassandraDaemon = new CassandraDaemon(); + + System.setProperty("cassandra.config", args[0]); + + cassandraDaemon.activate(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + cassandraDaemon.deactivate(); + } + }); + + try { + System.in.read(); + } catch (IOException e) { + } + cassandraDaemon.deactivate(); + } + + } +} diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java deleted file mode 100644 index bc2522ca9865f..0000000000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/EmbeddedCassandraService.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import org.apache.cassandra.service.CassandraDaemon; - -import java.io.IOException; - -public class EmbeddedCassandraService { - public static void main(String[] args) { - final CassandraDaemon cassandraDaemon = new CassandraDaemon(); - - cassandraDaemon.activate(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - cassandraDaemon.deactivate(); - } - }); - - try { - System.in.read(); - } catch (IOException e) { - } - cassandraDaemon.deactivate(); - } -} From 188cf45b1ffd96c7812d371d299dbc35f2fe2491 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 2 Nov 2016 12:35:35 +0100 Subject: [PATCH 5/9] Wait for time instead of # of attempts --- .../connectors/cassandra/CassandraConnectorITCase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index e22442f7abcdf..6ee99b2b2e910 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -126,15 +126,15 @@ public static void startCassandra() throws IOException { cassandra.startProcess(); } - int attempt = 0; + long start = System.currentTimeMillis(); + long deadline = start + 1000 * 30; while (true) { try { - attempt++; cluster = builder.getCluster(); session = cluster.connect(); break; } catch (Exception e) { - if (attempt > 30) { + if (System.currentTimeMillis() > deadline) { throw e; } try { @@ -143,7 +143,7 @@ public static void startCassandra() throws IOException { } } } - LOG.debug("Connection established after " + attempt + " attempts."); + LOG.debug("Connection established after {}ms.", System.currentTimeMillis() - start); session.execute(CREATE_KEYSPACE_QUERY); session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial")); From d5cab239e6af600aff493897bf4bb66230366055 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 2 Nov 2016 14:56:10 +0100 Subject: [PATCH 6/9] remove 5 second sleep --- .../connectors/cassandra/CassandraConnectorITCase.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 6ee99b2b2e910..ede5313687404 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -147,11 +147,6 @@ public static void startCassandra() throws IOException { session.execute(CREATE_KEYSPACE_QUERY); session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial")); - - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - } } @Before From 7c68d3fd33a404e955645c1db628bb16957c68b2 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 2 Nov 2016 15:15:41 +0100 Subject: [PATCH 7/9] use CountDownLatch --- .../connectors/cassandra/CassandraService.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java index 33a63badb3140..eb89ef82999eb 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java @@ -26,6 +26,7 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Scanner; +import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertTrue; @@ -91,7 +92,7 @@ public String getEntryPointClassName() { } public static class CassandraServiceEntryPoint { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { final CassandraDaemon cassandraDaemon = new CassandraDaemon(); System.setProperty("cassandra.config", args[0]); @@ -105,11 +106,8 @@ public void run() { } }); - try { - System.in.read(); - } catch (IOException e) { - } - cassandraDaemon.deactivate(); + // Run forever + new CountDownLatch(1).await(); } } From 8de60bcb303f91d43723af5664436b9f1e58d0a5 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 2 Nov 2016 15:17:11 +0100 Subject: [PATCH 8/9] remove unused import --- .../flink/streaming/connectors/cassandra/CassandraService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java index eb89ef82999eb..126be5b8bb568 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraService.java @@ -24,7 +24,6 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.util.Scanner; import java.util.concurrent.CountDownLatch; From 86786e818f36dcdd93e3e2dbc82417d18a5a7cb8 Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 14 Nov 2016 12:27:46 +0100 Subject: [PATCH 9/9] prevent NP in close() --- .../connectors/cassandra/CassandraConnectorITCase.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index ede5313687404..5c5fd485ffbce 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -166,7 +166,9 @@ public static void closeCassandra() { } if (EMBEDDED) { - cassandra.destroy(); + if (cassandra != null) { + cassandra.destroy(); + } } }