Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4238] Only allow/require query for Tuple Stream in CassandraSink #2273

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -271,9 +271,6 @@ public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committe
public abstract CassandraSink<IN> build() throws Exception;

protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
if (builder == null) {
throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
}
Expand All @@ -285,6 +282,14 @@ public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeI
super(input, typeInfo, serializer);
}

@Override
protected void sanityCheck() {
super.sanityCheck();
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must not be null or empty.");
}
}

@Override
public CassandraSink<IN> build() throws Exception {
sanityCheck();
Expand All @@ -303,6 +308,14 @@ public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeIn
super(input, typeInfo, serializer);
}

@Override
protected void sanityCheck() {
super.sanityCheck();
if (query != null) {
throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input.");
}
}

@Override
public CassandraSink<IN> build() throws Exception {
sanityCheck();
Expand Down