Skip to content

Commit

Permalink
Revert "[FLINK-4177] Harden CassandraConnectorITCase"
Browse files Browse the repository at this point in the history
This reverts commit 62523ac.
  • Loading branch information
StephanEwen committed Nov 16, 2016
1 parent 95d640b commit 02c10d3
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 351 deletions.
11 changes: 2 additions & 9 deletions flink-streaming-connectors/flink-connector-cassandra/pom.xml
Expand Up @@ -37,8 +37,8 @@ under the License.

<!-- Allow users to pass custom connector versions -->
<properties>
<cassandra.version>2.2.7</cassandra.version>
<driver.version>3.0.3</driver.version>
<cassandra.version>2.2.5</cassandra.version>
<driver.version>3.0.0</driver.version>
</properties>

<build>
Expand Down Expand Up @@ -159,13 +159,6 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- we need this dependency for the EmbeddedCassandraService-->
<dependency>
<groupId>org.caffinitas.ohc</groupId>
<artifactId>ohc-core</artifactId>
<version>0.4.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
Expand Down
Expand Up @@ -29,8 +29,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}.
Expand All @@ -42,13 +40,11 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
protected transient Cluster cluster;
protected transient Session session;

protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
protected transient Throwable exception = null;
protected transient FutureCallback<V> callback;

private final ClusterBuilder builder;

protected final AtomicInteger updatesPending = new AtomicInteger();

protected CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
Expand All @@ -59,24 +55,11 @@ public void open(Configuration configuration) {
this.callback = new FutureCallback<V>() {
@Override
public void onSuccess(V ignored) {
int pending = updatesPending.decrementAndGet();
if (pending == 0) {
synchronized (updatesPending) {
updatesPending.notifyAll();
}
}
}

@Override
public void onFailure(Throwable t) {
int pending = updatesPending.decrementAndGet();
if (pending == 0) {
synchronized (updatesPending) {
updatesPending.notifyAll();
}
}
exception.set(t);

exception = t;
LOG.error("Error while sending value.", t);
}
};
Expand All @@ -86,27 +69,17 @@ public void onFailure(Throwable t) {

@Override
public void invoke(IN value) throws Exception {
Throwable e = exception.get();
if (e != null) {
throw new IOException("Error while sending value.", e);
if (exception != null) {
throw new IOException("invoke() failed", exception);
}
ListenableFuture<V> result = send(value);
updatesPending.incrementAndGet();
Futures.addCallback(result, callback);
}

public abstract ListenableFuture<V> send(IN value);

@Override
public void close() {
while (updatesPending.get() > 0) {
synchronized (updatesPending) {
try {
updatesPending.wait();
} catch (InterruptedException e) {
}
}
}
try {
if (session != null) {
session.close();
Expand All @@ -121,9 +94,5 @@ public void close() {
} catch (Exception e) {
LOG.error("Error while closing cluster.", e);
}
Throwable e = exception.get();
if (e != null) {
LOG.error("Error while sending value.", e);
}
}
}

0 comments on commit 02c10d3

Please sign in to comment.