[FLINK-4123] Cassandra sink checks for exceptions in ack phase
add serialVersionUID

switch to AtomicReference

wait-notify

disable logging

add test case for leaving ackPhaseLoopOnException

prevent infinite loop in test

This closes #2183.
zentol authored and tillrohrmann committed Jul 12, 2016
1 parent 508965e commit 5c2da21
Showing 6 changed files with 221 additions and 35 deletions.
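
At its core, the patch replaces the ack phase's poll-and-sleep loop over shared mutable fields with per-call state: an AtomicInteger for confirmations, an AtomicReference<Throwable> for the first failure, and wait/notify on the counter so the waiting thread wakes on the last acknowledgement or the first error. The following stand-alone sketch is not part of the commit; it restates that pattern with illustrative names, using CompletableFuture in place of the Guava futures the sink actually works with:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class AckPhaseSketch {

	static boolean sendAll(List<CompletableFuture<String>> futures) throws InterruptedException {
		final int sent = futures.size();
		final AtomicInteger confirmed = new AtomicInteger(0);
		final AtomicReference<Throwable> failure = new AtomicReference<>();

		for (CompletableFuture<String> f : futures) {
			f.whenComplete((result, error) -> {
				if (error != null) {
					// record only the first failure, then wake the waiting ack loop
					if (failure.compareAndSet(null, error)) {
						synchronized (confirmed) {
							confirmed.notifyAll();
						}
					}
				} else if (confirmed.incrementAndGet() == sent) {
					// last acknowledgement arrived, wake the waiting ack loop
					synchronized (confirmed) {
						confirmed.notifyAll();
					}
				}
			});
		}

		// ack phase: block until everything is confirmed or something failed
		synchronized (confirmed) {
			while (confirmed.get() != sent && failure.get() == null) {
				confirmed.wait();
			}
		}
		return failure.get() == null;
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(sendAll(Arrays.asList(
				CompletableFuture.completedFuture("a"),
				CompletableFuture.completedFuture("b"))));  // true

		CompletableFuture<String> failed = new CompletableFuture<>();
		failed.completeExceptionally(new RuntimeException("expected"));
		System.out.println(sendAll(Arrays.asList(failed))); // false
	}
}

Because the check and the wait both happen while holding the counter's monitor, a callback that fires before the loop enters the synchronized block cannot cause a lost wakeup.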
CassandraTupleWriteAheadSink.java
@@ -31,9 +31,11 @@
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.types.IntValue;
 
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a
@@ -43,20 +45,16 @@
  * @param <IN> Type of the elements emitted by this sink
  */
 public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> {
+	private static final long serialVersionUID = 1L;
+
 	protected transient Cluster cluster;
 	protected transient Session session;
 
 	private final String insertQuery;
 	private transient PreparedStatement preparedStatement;
 
-	private transient Throwable exception = null;
-	private transient FutureCallback<ResultSet> callback;
-
 	private ClusterBuilder builder;
 
-	private int updatesSent = 0;
-	private AtomicInteger updatesConfirmed = new AtomicInteger(0);
-
 	private transient Object[] fields;
 
 	protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
@@ -71,18 +69,6 @@ public void open() throws Exception {
 		if (!getRuntimeContext().isCheckpointingEnabled()) {
 			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
 		}
-		this.callback = new FutureCallback<ResultSet>() {
-			@Override
-			public void onSuccess(ResultSet resultSet) {
-				updatesConfirmed.incrementAndGet();
-			}
-
-			@Override
-			public void onFailure(Throwable throwable) {
-				exception = throwable;
-				LOG.error("Error while sending value.", throwable);
-			}
-		};
 		cluster = builder.getCluster();
 		session = cluster.connect();
 		preparedStatement = session.prepare(insertQuery);
@@ -110,12 +96,38 @@ public void close() throws Exception {
 	}
 
 	@Override
-	protected void sendValues(Iterable<IN> values, long timestamp) throws Exception {
-		//verify that no query failed until now
-		if (exception != null) {
-			throw new Exception(exception);
-		}
+	protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception {
+		final IntValue updatesCount = new IntValue(0);
+		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+		final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+		FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
+			@Override
+			public void onSuccess(ResultSet resultSet) {
+				updatesConfirmed.incrementAndGet();
+				if (updatesCount.getValue() > 0) { // only set if all updates have been sent
+					if (updatesCount.getValue() == updatesConfirmed.get()) {
+						synchronized (updatesConfirmed) {
+							updatesConfirmed.notifyAll();
+						}
+					}
+				}
+			}
+
+			@Override
+			public void onFailure(Throwable throwable) {
+				if (exception.compareAndSet(null, throwable)) {
+					LOG.error("Error while sending value.", throwable);
+					synchronized (updatesConfirmed) {
+						updatesConfirmed.notifyAll();
+					}
+				}
+			}
+		};
+
 		//set values for prepared statement
+		int updatesSent = 0;
 		for (IN value : values) {
 			for (int x = 0; x < value.getArity(); x++) {
 				fields[x] = value.getField(x);
@@ -130,13 +142,18 @@ protected void sendValues(Iterable<IN> values, long timestamp) throws Exception
 				Futures.addCallback(result, callback);
 			}
 		}
-		try {
+		updatesCount.setValue(updatesSent);
+
+		synchronized (updatesConfirmed) {
 			while (updatesSent != updatesConfirmed.get()) {
-				Thread.sleep(100);
+				if (exception.get() != null) { // verify that no query failed until now
+					LOG.warn("Sending a value failed.", exception.get());
+					break;
+				}
+				updatesConfirmed.wait();
 			}
-		} catch (InterruptedException e) {
 		}
-		updatesSent = 0;
-		updatesConfirmed.set(0);
+		boolean success = updatesSent == updatesConfirmed.get();
+		return success;
 	}
 }
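
The reason sendValues now returns a boolean rather than rethrowing becomes visible further down in GenericWriteAheadSink: a false result leaves the checkpoint uncommitted, so the same values are replayed on the next notification. Below is a small self-contained sketch of that retry contract; it is not from the patch, and names such as RetryContractSketch and pendingCheckpoints are hypothetical:

import java.util.ArrayDeque;
import java.util.Queue;

public class RetryContractSketch {

	// stand-in for the sink's sendValues: fails twice, then succeeds
	private static int attempts = 0;

	static boolean sendValues(long checkpointId) {
		attempts++;
		return attempts >= 3;
	}

	public static void main(String[] args) {
		Queue<Long> pendingCheckpoints = new ArrayDeque<>();
		pendingCheckpoints.add(17L);

		// each iteration models one notifyOfCompletedCheckpoint(...) call
		for (int notify = 0; notify < 5 && !pendingCheckpoints.isEmpty(); notify++) {
			long pending = pendingCheckpoints.peek();
			if (sendValues(pending)) {
				// commit and prune only on success, as GenericWriteAheadSink now does
				pendingCheckpoints.remove();
				System.out.println("checkpoint " + pending + " committed on notify #" + notify);
			} else {
				System.out.println("checkpoint " + pending + " failed, retrying on next notify");
			}
		}
	}
}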
CassandraConnectorUnitTest.java (new file)
@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.IterableIterator;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.powermock.api.mockito.PowerMockito.doAnswer;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

@RunWith(PowerMockRunner.class)
@PrepareForTest({ResultPartitionWriter.class, CassandraTupleWriteAheadSink.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class CassandraConnectorUnitTest {
	@Test
	public void testAckLoopExitOnException() throws Exception {
		final AtomicReference<Runnable> callback = new AtomicReference<>();

		final ClusterBuilder clusterBuilder = new ClusterBuilder() {
			@Override
			protected Cluster buildCluster(Cluster.Builder builder) {
				try {
					BoundStatement boundStatement = mock(BoundStatement.class);
					when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement);

					PreparedStatement preparedStatement = mock(PreparedStatement.class);
					when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement);

					ResultSetFuture future = mock(ResultSetFuture.class);
					when(future.get()).thenThrow(new RuntimeException("Expected exception."));

					doAnswer(new Answer<Void>() {
						@Override
						public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
							callback.set((((Runnable) invocationOnMock.getArguments()[0])));
							return null;
						}
					}).when(future).addListener(any(Runnable.class), any(Executor.class));

					Session session = mock(Session.class);
					when(session.prepare(anyString())).thenReturn(preparedStatement);
					when(session.executeAsync(any(BoundStatement.class))).thenReturn(future);

					Cluster cluster = mock(Cluster.class);
					when(cluster.connect()).thenReturn(session);
					return cluster;
				} catch (Exception e) {
					throw new RuntimeException(e);
				}
			}
		};

		final IterableIterator<Tuple0> iter = new IterableIterator<Tuple0>() {
			private boolean exhausted = false;

			@Override
			public boolean hasNext() {
				return !exhausted;
			}

			@Override
			public Tuple0 next() {
				exhausted = true;
				return new Tuple0();
			}

			@Override
			public void remove() {
			}

			@Override
			public Iterator<Tuple0> iterator() {
				return this;
			}
		};

		final AtomicReference<Boolean> exceptionCaught = new AtomicReference<>();

		Thread t = new Thread() {
			public void run() {
				try {
					CheckpointCommitter cc = mock(CheckpointCommitter.class);
					final CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>(
						"abc",
						TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()),
						clusterBuilder,
						cc
					);

					OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness(sink);
					harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true);

					harness.setup();
					sink.open();
					boolean result = sink.sendValues(iter, 0L);
					sink.close();
					exceptionCaught.set(result == false);
				} catch (Exception e) {
					throw new RuntimeException(e);
				}
			}
		};
		t.start();

		int count = 0;
		while (t.getState() != Thread.State.WAITING && count < 100) { // 10 second timeout 10 * 10 * 100ms
			Thread.sleep(100);
			count++;
		}

		callback.get().run();

		t.join();

		Assert.assertTrue(exceptionCaught.get());
	}
}
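
The test's timing trick is worth noting: rather than sleeping for a fixed interval and hoping the sink is ready, it polls the worker thread until it parks in Object.wait() (Thread.State.WAITING) and only then fires the captured listener by hand. A minimal sketch of just that mechanism, independent of the commit's classes (WaitStateSketch and its names are illustrative, not from the patch):

import java.util.concurrent.atomic.AtomicReference;

public class WaitStateSketch {
	public static void main(String[] args) throws Exception {
		final Object lock = new Object();
		final AtomicReference<Runnable> callback = new AtomicReference<>();

		Thread worker = new Thread() {
			@Override
			public void run() {
				// register the wake-up hook, then block the way the ack loop does
				callback.set(new Runnable() {
					@Override
					public void run() {
						synchronized (lock) {
							lock.notifyAll();
						}
					}
				});
				synchronized (lock) {
					try {
						lock.wait();
					} catch (InterruptedException ignored) {
					}
				}
				System.out.println("ack loop woke up");
			}
		};
		worker.start();

		// poll the thread state instead of sleeping a fixed amount
		while (worker.getState() != Thread.State.WAITING) {
			Thread.sleep(10);
		}
		callback.get().run(); // simulates the mocked future firing its listener
		worker.join();
	}
}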
log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target= System.err
GenericWriteAheadSink.java
@@ -49,6 +49,8 @@
  * @param <IN> Type of the elements emitted by this sink
  */
 public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+	private static final long serialVersionUID = 1L;
+
 	protected static final Logger LOG = LoggerFactory.getLogger(GenericWriteAheadSink.class);
 	private final CheckpointCommitter committer;
 	private transient AbstractStateBackend.CheckpointStateOutputView out;
@@ -140,10 +142,14 @@ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
 				if (!committer.isCheckpointCommitted(pastCheckpointId)) {
 					Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
 					DataInputView in = handle.f1.getState(getUserCodeClassloader());
-					sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0);
-					committer.commitCheckpoint(pastCheckpointId);
+					boolean success = sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0);
+					if (success) { //if the sending has failed we will retry on the next notify
+						committer.commitCheckpoint(pastCheckpointId);
+						checkpointsToRemove.add(pastCheckpointId);
+					}
+				} else {
+					checkpointsToRemove.add(pastCheckpointId);
 				}
-				checkpointsToRemove.add(pastCheckpointId);
 			}
 		}
 		for (Long toRemove : checkpointsToRemove) {
@@ -159,10 +165,10 @@ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
 	 * Write the given element into the backend.
 	 *
 	 * @param value value to be written
+	 * @return true, if the sending was successful, false otherwise
 	 * @throws Exception
 	 */
-
-	protected abstract void sendValues(Iterable<IN> value, long timestamp) throws Exception;
+	protected abstract boolean sendValues(Iterable<IN> value, long timestamp) throws Exception;
 
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
@@ -117,10 +117,11 @@ public ListSink() throws Exception {
 		}
 
 		@Override
-		protected void sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
+		protected boolean sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception {
 			for (Tuple1<Integer> value : values) {
 				this.values.add(value.f0);
 			}
+			return true;
 		}
 	}
 
OneInputStreamOperatorTestHarness.java
@@ -147,6 +147,10 @@ public Object getCheckpointLock() {
 		return mockTask.getCheckpointLock();
 	}
 
+	public Environment getEnvironment() {
+		return this.mockTask.getEnvironment();
+	}
+
 	public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
 		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);
