Skip to content

Commit

Permalink
Feat: Print Topic sets auto.offset.reset as default
Browse files Browse the repository at this point in the history
  • Loading branch information
Guttz committed Dec 5, 2022
1 parent 4624e6e commit 38122a2
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,33 @@ private KafkaConsumer<Bytes, Bytes> createTopicConsumer(final ServiceContext ser
throw new KsqlException("Could not list existing kafka topics" + e);
}

Map<String, Object> populatedConsumerProperties = populateKsqlStreamConfigProps(ksqlConfig,
Map<String, Object> ksqlStreamConfigProps = overrideDefaultKsqlStreamConfigProps(
ksqlConfig);

Map<String, Object> finalConsumerProperties = populateKsqlStreamConfigProps(
ksqlStreamConfigProps,
consumerProperties);
return PrintTopicUtil.createTopicConsumer(serviceContext, populatedConsumerProperties,
return PrintTopicUtil.createTopicConsumer(serviceContext, finalConsumerProperties,
printTopic);
}

private Map<String, Object> populateKsqlStreamConfigProps(KsqlConfig ksqlConfig,
private Map<String, Object> populateKsqlStreamConfigProps(Map<String, Object> ksqlConfig,
Map<String, Object> properties) {
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.putAll(ksqlConfig.getKsqlStreamConfigProps());
Map<String, Object> consumerProperties = new HashMap<>(ksqlConfig);
consumerProperties.putAll(properties);

return consumerProperties;
}

private Map<String, Object> overrideDefaultKsqlStreamConfigProps(KsqlConfig ksqlConfig) {
Map<String, Object> overriddenProperties = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
// We override the default value of auto.offset.reset to latest because that's the default
// behavior for the print topic command, unlike the default behavior for push and pull queries.
overriddenProperties.put("auto.offset.reset", "latest");

return overriddenProperties;
}

private boolean isClosed() {
return closed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ private void shouldFailToExecuteQuery(final String sql, final String message) {

private void shouldFailToExecutePrint(final String sql, final String message) {
// When:
PrintResponse response = executePrintQuery(sql);
PrintResponse response = executePrintTopic(sql);

// Then:
assertThat(response.getRows(), hasSize(0));
Expand All @@ -674,15 +674,13 @@ private void shouldFailToExecutePrint(final String sql, final String message) {
@Test
public void shouldExecutePrint() {
// Given:
String sql = "PRINT " + TEST_TOPIC + " LIMIT 1;";
String sql = "PRINT " + TEST_TOPIC + " FROM BEGINNING LIMIT 1;";

// Create a write stream to capture the incomplete response
ReceiveStream writeStream = new ReceiveStream(vertx);

// Make the request to stream a print
JsonObject printProperties = new JsonObject().put("auto.offset.reset", "earliest");
JsonObject printRequestBody = new JsonObject().put("sql", sql)
.put("properties", printProperties);
JsonObject printRequestBody = new JsonObject().put("sql", sql);
VertxCompletableFuture<HttpResponse<Void>> responseFuture = new VertxCompletableFuture<>();

// When:
Expand Down Expand Up @@ -711,15 +709,13 @@ public void shouldExecutePrint() {
@Test
public void shouldExecutePrintQueryNoLimit() {
// Given:
String sql = "PRINT " + TEST_TOPIC + ";";
String sql = "PRINT " + TEST_TOPIC + " FROM BEGINNING;";

// Create a write stream to capture the incomplete response
ReceiveStream writeStream = new ReceiveStream(vertx);

// Make the request to stream a print
JsonObject printProperties = new JsonObject().put("auto.offset.reset", "earliest");
JsonObject printRequestBody = new JsonObject().put("sql", sql)
.put("properties", printProperties);
JsonObject printRequestBody = new JsonObject().put("sql", sql);
VertxCompletableFuture<HttpResponse<Void>> responseFuture = new VertxCompletableFuture<>();

// When:
Expand Down Expand Up @@ -747,14 +743,42 @@ public void shouldExecutePrintQueryNoLimit() {
}

@Test
public void shouldExecutePrintQueryFromEarliestOffsetWithLimit() {
public void shouldExecutePrintQueryFromBeginningWithLimit() {
// Given:
String sql = "PRINT " + TEST_TOPIC + " FROM BEGINNING LIMIT 3;";

// When:
AtomicReference<PrintResponse> atomicReference = new AtomicReference<>();
assertThatEventually(() -> {
PrintResponse printResponse = executePrintQuery(sql);
PrintResponse printResponse = executePrintTopic(sql);
atomicReference.set(printResponse);
return printResponse.getRows().size();
}, is(3));

PrintResponse printResponse = atomicReference.get();

// Then:
assertThat(printResponse.getRows().get(0), containsString("rowtime:"));
assertThat(printResponse.getRows().get(0), containsString(
"key: {\"F1\":[\"a\"]}, value: {\"STR\":\"FOO\",\"LONG\":1,\"DEC\":1.11,\"BYTES_\":\"AQ==\",\"ARRAY\":[\"a\"],\"MAP\":{\"k1\":\"v1\"},\"STRUCT\":{\"F1\":2},\"COMPLEX\":{\"DECIMAL\":0.0,\"STRUCT\":{\"F1\":\"v0\",\"F2\":0},\"ARRAY_ARRAY\":[[\"foo\"]],\"ARRAY_STRUCT\":[{\"F1\":\"v0\"}],\"ARRAY_MAP\":[{\"k1\":0}],\"MAP_ARRAY\":{\"k\":[\"v0\"]},\"MAP_MAP\":{\"k\":{\"k\":0}},\"MAP_STRUCT\":{\"k\":{\"F1\":\"v0\"}}},\"TIMESTAMP\":1,\"DATE\":1,\"TIME\":0}, partition: 0"));
assertThat(printResponse.getRows().get(1), containsString("rowtime:"));
assertThat(printResponse.getRows().get(1), containsString(
"key: {\"F1\":[\"b\"]}, value: {\"STR\":\"BAR\",\"LONG\":2,\"DEC\":2.22,\"BYTES_\":\"Ag==\",\"ARRAY\":[],\"MAP\":{},\"STRUCT\":{\"F1\":3},\"COMPLEX\":{\"DECIMAL\":1.0,\"STRUCT\":{\"F1\":\"v1\",\"F2\":1},\"ARRAY_ARRAY\":[[\"foo\"]],\"ARRAY_STRUCT\":[{\"F1\":\"v1\"}],\"ARRAY_MAP\":[{\"k1\":1}],\"MAP_ARRAY\":{\"k\":[\"v1\"]},\"MAP_MAP\":{\"k\":{\"k\":1}},\"MAP_STRUCT\":{\"k\":{\"F1\":\"v1\"}}},\"TIMESTAMP\":2,\"DATE\":2,\"TIME\":1}, partition: 0"));
assertThat(printResponse.getRows().get(2), containsString("rowtime:"));
assertThat(printResponse.getRows().get(2), containsString(
"key: {\"F1\":[\"c\"]}, value: {\"STR\":\"BAZ\",\"LONG\":3,\"DEC\":30.33,\"BYTES_\":\"Aw==\",\"ARRAY\":[\"b\"],\"MAP\":{},\"STRUCT\":{\"F1\":null},\"COMPLEX\":{\"DECIMAL\":2.0,\"STRUCT\":{\"F1\":\"v2\",\"F2\":2},\"ARRAY_ARRAY\":[[\"foo\"]],\"ARRAY_STRUCT\":[{\"F1\":\"v2\"}],\"ARRAY_MAP\":[{\"k1\":2}],\"MAP_ARRAY\":{\"k\":[\"v2\"]},\"MAP_MAP\":{\"k\":{\"k\":2}},\"MAP_STRUCT\":{\"k\":{\"F1\":\"v2\"}}},\"TIMESTAMP\":3,\"DATE\":3,\"TIME\":2}, partition: 0"));
}

@Test
public void shouldExecutePrintQueryFromEarliestOffsetWithLimit() {
// Given:
String sql = "PRINT " + TEST_TOPIC + " LIMIT 3;";

// When:
AtomicReference<PrintResponse> atomicReference = new AtomicReference<>();
assertThatEventually(() -> {
JsonObject printProperties = new JsonObject().put("auto.offset.reset", "earliest");
PrintResponse printResponse = executePrintTopicWithProperties(sql, printProperties);
atomicReference.set(printResponse);
return printResponse.getRows().size();
}, is(3));
Expand Down Expand Up @@ -797,15 +821,13 @@ public void shouldFailToPrintWithInvalidSql() {
@Test
public void shouldFailToPrintInJsonFormat() {
// Given:
String sql = "PRINT " + TEST_TOPIC + " LIMIT 1;";
String sql = "PRINT " + TEST_TOPIC + " FROM BEGINNING LIMIT 1;";

// Create a write stream to capture the incomplete response
ReceiveStream writeStream = new ReceiveStream(vertx);

// Make the request to stream a print
JsonObject printProperties = new JsonObject().put("auto.offset.reset", "earliest");
JsonObject printRequestBody = new JsonObject().put("sql", sql)
.put("properties", printProperties);
JsonObject printRequestBody = new JsonObject().put("sql", sql);
VertxCompletableFuture<HttpResponse<Void>> responseFuture = new VertxCompletableFuture<>();

// When:
Expand Down Expand Up @@ -835,13 +857,18 @@ private QueryResponse executeQueryWithVariables(final String sql, final JsonObje
return new QueryResponse(response.bodyAsString());
}

private PrintResponse executePrintQuery(final String sql) {
private PrintResponse executePrintTopic(final String sql) {
JsonObject properties = new JsonObject();
return executePrintTopicWithProperties(sql, properties);
}

private PrintResponse executePrintTopicWithProperties(final String sql, final JsonObject properties) {
JsonObject requestBody = new JsonObject().put("sql", sql).put("properties", properties);
HttpResponse<Buffer> response = sendRequest("/query-stream", requestBody.toBuffer());
return new PrintResponse(response.bodyAsString());
}


private void shouldFailToInsert(final JsonObject row, final int errorCode, final String message) {
final HttpResponse<Buffer> response = makeInsertsRequest(TEST_STREAM, row);

Expand Down

0 comments on commit 38122a2

Please sign in to comment.