Skip to content

Commit

Permalink
fix: issue 7948. Allow insert into table using Java API (#8114)
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaosky committed Sep 17, 2021
1 parent a722184 commit a6c2cac
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.confluent.ksql.api.client.util.ClientTestUtil.awaitLatch;
import static io.confluent.ksql.api.client.util.ClientTestUtil.subscribeAndWait;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_REQUEST;
import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -595,7 +596,7 @@ public void shouldInsertInto() throws Exception {
@Test
public void shouldHandleErrorResponseFromInsertInto() {
// Given
KsqlApiException exception = new KsqlApiException("Cannot insert into a table", ERROR_CODE_BAD_REQUEST);
KsqlApiException exception = new KsqlApiException("Invalid target name", ERROR_CODE_BAD_STATEMENT);
testEndpoints.setCreateInsertsSubscriberException(exception);

// When
Expand All @@ -607,7 +608,7 @@ public void shouldHandleErrorResponseFromInsertInto() {
// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table"));
assertThat(e.getCause().getMessage(), containsString("Invalid target name"));
}

@Test
Expand Down Expand Up @@ -671,7 +672,7 @@ public void shouldStreamInserts() throws Exception {
@Test
public void shouldHandleErrorResponseFromStreamInserts() {
// Given
KsqlApiException exception = new KsqlApiException("Cannot insert into a table", ERROR_CODE_BAD_REQUEST);
KsqlApiException exception = new KsqlApiException("Invalid target name", ERROR_CODE_BAD_STATEMENT);
testEndpoints.setCreateInsertsSubscriberException(exception);

// When
Expand All @@ -683,7 +684,7 @@ public void shouldHandleErrorResponseFromStreamInserts() {
// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table"));
assertThat(e.getCause().getMessage(), containsString("Invalid target name"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public class ClientMutationIntegrationTest {
private static final Format KEY_FORMAT = FormatFactory.JSON;
private static final Format VALUE_FORMAT = FormatFactory.JSON;

private static final String TEST_TABLE = "TEST_TABLE";
private static final String NON_EXIST_TABLE = "NON_EXIST_TABLE";
private static final String AGG_TABLE = "AGG_TABLE";
private static final PhysicalSchema AGG_SCHEMA = PhysicalSchema.from(
LogicalSchema.builder()
Expand Down Expand Up @@ -179,6 +181,9 @@ public static void setUpClass() throws Exception {
makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS "
+ "SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;"
);
makeKsqlRequest(
"CREATE TABLE " + TEST_TABLE + " (K STRING PRIMARY KEY, LONG BIGINT) WITH (KAFKA_TOPIC='"
+ TEST_TABLE + "', VALUE_FORMAT='json', PARTITIONS=1);");

TEST_HARNESS.verifyAvailableUniqueRows(
AGG_TABLE,
Expand Down Expand Up @@ -410,13 +415,42 @@ public void shouldHandleErrorResponseFromInsertInto() {
// When
final Exception e = assertThrows(
ExecutionException.class, // thrown from .get() when the future completes exceptionally
() -> client.insertInto(AGG_TABLE, insertRow).get()
() -> client.insertInto(NON_EXIST_TABLE, insertRow).get()
);

// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table"));
assertThat(e.getCause().getMessage(), containsString("Cannot insert values into an unknown stream/table"));
}

@Test
public void shouldInsertIntoTable() throws Exception {
// Given
final Map<String, Object> properties = new HashMap<>();
properties.put("auto.offset.reset", "earliest");

final KsqlObject insertRow = new KsqlObject()
.put("K", "my_key")
.put("LONG", 11L);

// When
final String query = "SELECT * from " + TEST_TABLE + " WHERE K='my_key' EMIT CHANGES LIMIT 1;";
StreamedQueryResult queryResult = client.streamQuery(query, properties).get();

final Row row = assertThatEventually(() -> {
// Potentially try inserting multiple times, in case the query wasn't started by the first time
try {
client.insertInto(TEST_TABLE, insertRow).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return queryResult.poll(Duration.ofMillis(10));
}, is(notNullValue()));

// Then: a newly inserted row arrives
assertThat(row.getString("K"), is("my_key"));
assertThat(row.getLong("LONG"), is(11L));
}

@Test
Expand Down Expand Up @@ -494,13 +528,14 @@ public void shouldHandleErrorResponseFromStreamInserts() {
// When
final Exception e = assertThrows(
ExecutionException.class, // thrown from .get() when the future completes exceptionally
() -> client.streamInserts(AGG_TABLE, new InsertsPublisher()).get()
() -> client.streamInserts(NON_EXIST_TABLE, new InsertsPublisher()).get()
);

// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table"));
assertThat(e.getCause().getMessage(),
containsString("Cannot insert values into an unknown stream/table"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public boolean topicExists(final String topicName) {
/**
* Ensure topics with the given {@code topicNames} exist.
*
* <p>Topics will be creates, if they do not already exist, with a single partition and replica.
* <p>Topics will be created, if they do not already exist, with a single partition and replica.
*
* @param topicNames the names of the topics to create.
*/
Expand All @@ -144,7 +144,7 @@ public void ensureTopics(final String... topicNames) {
/**
* Ensure topics with the given {@code topicNames} exist.
*
* <p>Topics will be creates, if they do not already exist, with the specified
* <p>Topics will be created, if they do not already exist, with the specified
* {@code partitionCount}.
*
* @param topicNames the names of the topics to create.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.Identifiers;
Expand Down Expand Up @@ -73,11 +72,9 @@ public InsertsStreamSubscriber createInsertsSubscriber(final String caseInsensit
"Invalid target name: " + e.getMessage(), ERROR_CODE_BAD_STATEMENT);
}


final DataSource dataSource = getDataSource(ksqlEngine.getMetaStore(),
SourceName.of(target));
if (dataSource.getDataSourceType() == DataSourceType.KTABLE) {
throw new KsqlApiException("Cannot insert into a table", ERROR_CODE_BAD_STATEMENT);
}
return InsertsSubscriber.createInsertsSubscriber(serviceContext, properties, dataSource,
ksqlConfig, context, acksSubscriber, workerExecutor);
}
Expand All @@ -89,12 +86,13 @@ private DataSource getDataSource(
final DataSource dataSource = metaStore.getSource(sourceName);
if (dataSource == null) {
throw new KsqlApiException(
"Cannot insert values into an unknown stream: " + sourceName, ERROR_CODE_BAD_STATEMENT);
"Cannot insert values into an unknown stream/table: " + sourceName,
ERROR_CODE_BAD_STATEMENT);
}

if (dataSource.getKsqlTopic().getKeyFormat().isWindowed()) {
throw new KsqlApiException(
"Cannot insert values into windowed stream", ERROR_CODE_BAD_STATEMENT);
"Cannot insert values into windowed stream/table", ERROR_CODE_BAD_STATEMENT);
}

if (reservedInternalTopics.isReadOnly(dataSource.getKafkaTopicName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,8 @@ public void shouldTreatInsertTargetAsCaseSensitiveIfQuotedWithBackticks() {
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// Then: request fails because stream name is invalid
shouldRejectInsertRequest(target, row, "Cannot insert values into an unknown stream: " + target);
shouldRejectInsertRequest(target, row,
"Cannot insert values into an unknown stream/table: " + target);
}

@Test
Expand All @@ -533,7 +534,8 @@ public void shouldTreatInsertTargetAsCaseSensitiveIfQuotedWithDoubleQuotes() {
.put("COMPLEX", COMPLEX_FIELD_VALUE);

// Then: request fails because stream name is invalid
shouldRejectInsertRequest(target, row, "Cannot insert values into an unknown stream: `" + TEST_STREAM.toLowerCase() + "`");
shouldRejectInsertRequest(target, row,
"Cannot insert values into an unknown stream/table: `" + TEST_STREAM.toLowerCase() + "`");
}

@Test
Expand Down

0 comments on commit a6c2cac

Please sign in to comment.