Skip to content

Commit

Permalink
[FLINK-4238] Only allow/require query for Tuple Stream in CassandraSink
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Jul 20, 2016
1 parent 19dae21 commit 89021cb
Showing 1 changed file with 10 additions and 2 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.hadoop.yarn.webapp.hamlet.HamletSpec;

/**
* This class wraps different Cassandra sink implementations to provide a common interface for all of them.
Expand Down Expand Up @@ -271,8 +272,15 @@ public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committe
public abstract CassandraSink<IN> build() throws Exception;

protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
if (input.getType() instanceof TupleTypeInfo) {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
} else {
if (query != null) {
throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input.");
}

}
if (builder == null) {
throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
Expand Down

0 comments on commit 89021cb

Please sign in to comment.