Skip to content

Commit

Permalink
fix: do not throw error if VALUE_DELIMITER is set on non-DELIMITED to…
Browse files Browse the repository at this point in the history
…pic (#4366)
  • Loading branch information
agavra committed Jan 23, 2020
1 parent d15026f commit 2b59b8b
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ private FormatInfo(
}

this.delimiter = Objects.requireNonNull(delimiter, "delimiter");

if (format != Format.DELIMITED && delimiter.isPresent()) {
throw new KsqlException("Delimeter only supported with DELIMITED format");
}

}

public Format getFormat() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,4 @@ public void shouldGetAvroSchemaName() {
assertThat(FormatInfo.of(AVRO, Optional.empty(), Optional.empty()).getFullSchemaName(),
is(Optional.empty()));
}

@Test
public void shouldThrowWhenAttemptingToUseValueDelimeterWithAvroFormat() {
// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Delimeter only supported with DELIMITED format");

// When:
FormatInfo.of(Format.AVRO, Optional.of("something"), Optional.of(Delimiter.of('x')));
}

@Test
public void shouldThrowWhenAttemptingToUseValueDelimeterWithJsonFormat() {
// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Delimeter only supported with DELIMITED format");

// When:
FormatInfo.of(Format.JSON, Optional.empty(), Optional.of(Delimiter.of('x')));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ private Optional<Delimiter> getValueDelimiter(final Sink sink) {
if (sink.getProperties().getValueDelimiter().isPresent()) {
return sink.getProperties().getValueDelimiter();
}

return analysis
.getFromDataSources()
.get(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@
{"topic": "S2", "key": "100", "value": "100,100,100", "timestamp": 0}
]
},
{
"name": "select delimited value_format into another format",
"format": ["JSON", "AVRO"],
"statements": [
"CREATE STREAM TEST (ID bigint, NAME varchar, VALUE integer) WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter=',');",
"CREATE STREAM S2 WITH(value_format='{FORMAT}') as SELECT id, name, value FROM test;"
],
"inputs": [
{"topic": "test_topic", "key": "0", "value": "0,zero,0", "timestamp": 0}
],
"outputs": [
{"topic": "S2", "key": "0", "value": {"ID": 0, "NAME": "zero", "VALUE": 0}, "timestamp": 0}
]
},
{
"name": "validate value_delimiter to be single character",
"statements": [
Expand All @@ -35,7 +49,7 @@
"outputs": []
},
{
"name": "validate delimeter is not empty",
"name": "validate delimiter is not empty",
"statements": [
"CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='');"
],
Expand All @@ -53,7 +67,7 @@
"outputs": []
},
{
"name": "validate delimeter is not a space",
"name": "validate delimiter is not a space",
"statements": [
"CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter=' ');"
],
Expand All @@ -65,7 +79,7 @@
"outputs": []
},
{
"name": "validate delimeter is not a tab character",
"name": "validate delimiter is not a tab character",
"statements": [
"CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='DELIMITED', value_delimiter='\t');"
],
Expand Down Expand Up @@ -151,24 +165,6 @@
{"topic": "S2", "key": "100", "value": "100\t100\t500", "timestamp": 0},
{"topic": "S2", "key": "100", "value": "100\t100\t100", "timestamp": 0}
]
},
{
"name": "validate cannot specify delimeter with json format",
"statements": [
"CREATE STREAM TEST WITH (kafka_topic='test_topic', value_format='JSON', value_delimiter='|');"
],
"topics": [
{
"name": "test_topic",
"format": "JSON"
}
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Delimeter only supported with DELIMITED format"
},
"inputs": [],
"outputs": []
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,16 @@ public void shouldSerializeNegativeDecimalWithPaddedZeros() {
}

@Test
public void shouldSerializeRowCorrectlyWithTabDelimeter() {
shouldSerializeRowCorrectlyWithNonDefaultDelimeter('\t');
public void shouldSerializeRowCorrectlyWithTabDelimiter() {
shouldSerializeRowCorrectlyWithNonDefaultDelimiter('\t');
}

@Test
public void shouldSerializeRowCorrectlyWithBarDelimeter() {
shouldSerializeRowCorrectlyWithNonDefaultDelimeter('|');
public void shouldSerializeRowCorrectlyWithBarDelimiter() {
shouldSerializeRowCorrectlyWithNonDefaultDelimiter('|');
}

private void shouldSerializeRowCorrectlyWithNonDefaultDelimeter(final char delimiter) {
private void shouldSerializeRowCorrectlyWithNonDefaultDelimiter(final char delimiter) {
// Given:
final Struct data = new Struct(SCHEMA)
.put("ORDERTIME", 1511897796092L)
Expand Down

0 comments on commit 2b59b8b

Please sign in to comment.