diff --git a/docs/developer-guide/ksqldb-reference/scalar-functions.md b/docs/developer-guide/ksqldb-reference/scalar-functions.md index e3086564a1fb..5053a471221e 100644 --- a/docs/developer-guide/ksqldb-reference/scalar-functions.md +++ b/docs/developer-guide/ksqldb-reference/scalar-functions.md @@ -1060,6 +1060,87 @@ A call to UUID() returns a value conforming to UUID version 4, sometimes called "random UUID", as described in RFC 4122. The value is a 128-bit number represented as a string of five hexadecimal numbers _aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee_. +## Bytes + +### `BIGINT_FROM_BYTES` + +Since: 0.23.1 + +```sql +BIGINT_FROM_BYTES(col1, [byteOrder]) +``` + +Converts a BYTES value to an BIGINT value according to the specified byte order. +BYTES must be 8 bytes long or a NULL value will be returned. + +Byte order values must be 'BIG_ENDIAN' or 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used. +A NULL value is returned if an invalid byte order value is used. + +Example, where `b` is a bytes value represented as a base64 string `AAAAASoF8gA=`: +```sql +BIGINT_FROM_BYTES(b, 'BIG_ENDIAN') -> 5000000000 +``` + +### `DOUBLE_FROM_BYTES` + +Since: 0.23.1 + +```sql +DOUBLE_FROM_BYTES(col1, [byteOrder]) +``` + +Converts a BYTES value to an DOUBLE value according to the specified byte order. +BYTES must be 8 bytes long or a NULL value will be returned. + +Byte order values must be 'BIG_ENDIAN' or 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used. +A NULL value is returned if an invalid byte order value is used. + +Example, where `b` is a bytes value represented as a base64 string `QICm/ZvJ9YI=`: +```sql +DOUBLE_FROM_BYTES(b, 'BIG_ENDIAN') -> 532.8738323 +``` + +### `FROM_BYTES` + +Since: 0.21.0 + +```sql +FROM_BYTES(col1, encoding) +``` + +Converts a BYTES value to STRING in the specified encoding. +The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'. + +### `INT_FROM_BYTES` + +Since: 0.23.1 + +```sql +INT_FROM_BYTES(col1, [byteOrder]) +``` + +Converts a BYTES value to an INT value according to the specified byte order. +BYTES must be 4 bytes long or a NULL value will be returned. + +Byte order values must be 'BIG_ENDIAN' or 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used. +A NULL value is returned if an invalid byte order value is used. + +Examples, where `b_big` is a bytes value represented as a base64 string `AAAH5Q==`: +```sql +INT_FROM_BYTES(b, 'BIG_ENDIAN') -> 2021 +``` + +### `TO_BYTES` + +Since: 0.21.0 + +```sql +TO_BYTES(col1, encoding) +``` + +Converts a STRING value in the specified encoding to BYTES. +The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'. + ## Nulls ### `COALESCE` diff --git a/docs/reference/user-defined-functions.md b/docs/reference/user-defined-functions.md index 7b521b841ac7..69bfacc2de7f 100644 --- a/docs/reference/user-defined-functions.md +++ b/docs/reference/user-defined-functions.md @@ -27,6 +27,7 @@ values. | `DOUBLE` | `double`, `java.lang.Double` | | `DECIMAL` | `java.math.BigDecimal` | | `VARCHAR` | `java.lang.String` | +| `BYTES` | `java.nio.ByteBuffer` | | `TIME` | `java.sql.Time` | | `DATE` | `java.sql.Date` | |`TIMESTAMP`| `java.sql.Timestamp` | diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java index 1a1b9d5c22c1..a16d359a85ad 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -65,6 +66,20 @@ private BytesUtils() { Encoding.BASE64, v -> base64Decoding(v) ); + public static ByteOrder byteOrderType(final String byteOrderStr) { + if (byteOrderStr != null + && byteOrderStr.equalsIgnoreCase(ByteOrder.BIG_ENDIAN.toString())) { + return ByteOrder.BIG_ENDIAN; + } else if (byteOrderStr != null + && byteOrderStr.equalsIgnoreCase(ByteOrder.LITTLE_ENDIAN.toString())) { + return ByteOrder.LITTLE_ENDIAN; + } else { + throw new KsqlException(String.format( + "Byte order must be BIG_ENDIAN or LITTLE_ENDIAN. Unknown byte order '%s'.", + byteOrderStr)); + } + } + public static String encode(final byte[] value, final Encoding encoding) { final Function encoder = ENCODERS.get(encoding); if (encoder == null) { @@ -154,6 +169,14 @@ public static int indexOf(final byte[] array, final byte[] target, final int fro return -1; } + public static void checkBytesSize(final ByteBuffer buffer, final int size) { + final int bufferSize = getByteArray(buffer).length; + if (bufferSize != size) { + throw new KsqlException( + String.format("Number of bytes must be equal to %d, but found %d", size, bufferSize)); + } + } + @SuppressWarnings("ParameterName") private static boolean arrayEquals( final byte[] a, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/BigIntFromBytes.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/BigIntFromBytes.java new file mode 100644 index 000000000000..3cfdbc64b511 --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/BigIntFromBytes.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.conversions; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.BytesUtils; +import io.confluent.ksql.util.KsqlConstants; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +@UdfDescription( + name = "bigint_from_bytes", + category = FunctionCategory.CONVERSIONS, + description = "Converts a BYTES value to an BIGINT value according to the specified" + + " byte order. BYTES must be 8 bytes long or a NULL value will be returned.", + author = KsqlConstants.CONFLUENT_AUTHOR +) +public class BigIntFromBytes { + private static final int BYTES_LENGTH = 8; + + @Udf(description = "Converts a BYTES value to an BIGINT value using the 'BIG_ENDIAN' byte order." + + " BYTES must be 8 bytes long or a NULL value will be returned.") + public Long bigIntFromBytes( + @UdfParameter(description = "The BYTES value to convert.") + final ByteBuffer value + ) { + return bigIntFromBytes(value, ByteOrder.BIG_ENDIAN); + } + + @Udf(description = "Converts a BYTES value to an BIGINT value according to the specified" + + " byte order. BYTES must be 8 bytes long or a NULL value will be returned.") + public Long bigIntFromBytes( + @UdfParameter(description = "The BYTES value to convert.") + final ByteBuffer value, + @UdfParameter(description = "The byte order. Valid orders are 'BIG_ENDIAN' and" + + " 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.") + final String byteOrder + ) { + return bigIntFromBytes(value, BytesUtils.byteOrderType(byteOrder)); + } + + private Long bigIntFromBytes(final ByteBuffer value, final ByteOrder byteOrder) { + if (value == null) { + return null; + } + + BytesUtils.checkBytesSize(value, BYTES_LENGTH); + value.rewind(); + return value.order(byteOrder).getLong(); + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/DoubleFromBytes.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/DoubleFromBytes.java new file mode 100644 index 000000000000..22e3faa1bf8a --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/DoubleFromBytes.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.conversions; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.BytesUtils; +import io.confluent.ksql.util.KsqlConstants; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +@UdfDescription( + name = "double_from_bytes", + category = FunctionCategory.CONVERSIONS, + description = "Converts a BYTES value to an DOUBLE value according to the specified" + + " byte order. BYTES must be 8 bytes long or a NULL value will be returned.", + author = KsqlConstants.CONFLUENT_AUTHOR +) +public class DoubleFromBytes { + private static final int BYTES_LENGTH = 8; + + @Udf(description = "Converts a BYTES value to an DOUBLE value using the 'BIG_ENDIAN' byte order." + + " BYTES must be 8 bytes long or a NULL value will be returned.") + public Double doubleFromBytes( + @UdfParameter(description = "The BYTES value to convert.") + final ByteBuffer value + ) { + return doubleFromBytes(value, ByteOrder.BIG_ENDIAN); + } + + @Udf(description = "Converts a BYTES value to an DOUBLE value according to the specified" + + " byte order. BYTES must be 8 bytes long or a NULL value will be returned.") + public Double doubleFromBytes( + @UdfParameter(description = "The BYTES value to convert.") + final ByteBuffer value, + @UdfParameter(description = "The byte order. Valid orders are 'BIG_ENDIAN' and" + + " 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.") + final String byteOrder + ) { + return doubleFromBytes(value, BytesUtils.byteOrderType(byteOrder)); + } + + private Double doubleFromBytes(final ByteBuffer value, final ByteOrder byteOrder) { + if (value == null) { + return null; + } + + BytesUtils.checkBytesSize(value, BYTES_LENGTH); + value.rewind(); + return value.order(byteOrder).getDouble(); + } +} diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/IntFromBytes.java b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/IntFromBytes.java new file mode 100644 index 000000000000..8566a56d191c --- /dev/null +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/function/udf/conversions/IntFromBytes.java @@ -0,0 +1,66 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.conversions; + +import io.confluent.ksql.function.FunctionCategory; +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.util.BytesUtils; +import io.confluent.ksql.util.KsqlConstants; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +@UdfDescription( + name = "int_from_bytes", + category = FunctionCategory.CONVERSIONS, + description = "Converts a BYTES value to an INT value according to the specified" + + " byte order. BYTES must be 4 bytes long or a NULL value will be returned.", + author = KsqlConstants.CONFLUENT_AUTHOR +) +public class IntFromBytes { + private static final int BYTES_LENGTH = 4; + + @Udf(description = "Converts a BYTES value to an INT value using the 'BIG_ENDIAN' byte order." + + " BYTES must be 4 bytes long or a NULL value will be returned.") + public Integer intFromBytes( + @UdfParameter(description = "The BYTES value to convert.") + final ByteBuffer value + ) { + return intFromBytes(value, ByteOrder.BIG_ENDIAN); + } + + @Udf(description = "Converts a BYTES value to an INT value according to the specified" + + " byte order. BYTES must be 4 bytes long or a NULL value will be returned.") + public Integer intFromBytes( + @UdfParameter(description = "The BYTES value to convert.") + final ByteBuffer value, + @UdfParameter(description = "The byte order. Valid orders are 'BIG_ENDIAN' and" + + " 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.") + final String byteOrder + ) { + return intFromBytes(value, BytesUtils.byteOrderType(byteOrder)); + } + + private Integer intFromBytes(final ByteBuffer value, final ByteOrder byteOrder) { + if (value == null) { + return null; + } + + BytesUtils.checkBytesSize(value, BYTES_LENGTH); + value.rewind(); + return value.order(byteOrder).getInt(); + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/BigIntFromBytesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/BigIntFromBytesTest.java new file mode 100644 index 000000000000..8c89a6ff987f --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/BigIntFromBytesTest.java @@ -0,0 +1,102 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.conversions; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThrows; + +import io.confluent.ksql.util.KsqlException; +import org.junit.Test; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class BigIntFromBytesTest { + private BigIntFromBytes udf = new BigIntFromBytes(); + + @Test + public void shouldConvertBytesToBigInteger() { + assertThat(udf.bigIntFromBytes(toByteBuffer(5000000000L)), is(5000000000L)); + } + + @Test + public void shouldConvertBytesToNegativeBigInteger() { + assertThat(udf.bigIntFromBytes(toByteBuffer(-5000000000L)), is(-5000000000L)); + } + + @Test + public void shouldConvertBytesToMaxBigInteger() { + assertThat(udf.bigIntFromBytes(toByteBuffer(Long.MAX_VALUE)), is(Long.MAX_VALUE)); + } + + @Test + public void shouldConvertBytesToMinBigInteger() { + assertThat(udf.bigIntFromBytes(toByteBuffer(Long.MIN_VALUE)), is(Long.MIN_VALUE)); + } + + @Test + public void shouldReturnNullOnNullBytes() { + assertThat(udf.bigIntFromBytes(null), is(nullValue())); + } + + @Test + public void shouldThrowOnInvalidBytesSizes() { + final Exception e1 = assertThrows( + KsqlException.class, + () -> udf.bigIntFromBytes(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}))); + + assertThat(e1.getMessage(), is("Number of bytes must be equal to 8, but found 4")); + + final Exception e2 = assertThrows( + KsqlException.class, + () -> udf.bigIntFromBytes(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}))); + + assertThat(e2.getMessage(), is("Number of bytes must be equal to 8, but found 9")); + } + + @Test + public void shouldConvertLittleEndianBytesToInteger() { + final ByteBuffer buffer = toByteBuffer(5000000000L, ByteOrder.LITTLE_ENDIAN); + assertThat(udf.bigIntFromBytes(buffer, "LITTLE_ENDIAN"), is(5000000000L)); + } + + @Test + public void shouldConvertBigEndianBytesToInteger() { + final ByteBuffer buffer = toByteBuffer(5000000000L).order(ByteOrder.BIG_ENDIAN); + assertThat(udf.bigIntFromBytes(buffer, "BIG_ENDIAN"), is(5000000000L)); + } + + @Test + public void shouldThrowOnUnknownByteOrder() { + final Exception e = assertThrows( + KsqlException.class, + () -> udf.bigIntFromBytes(toByteBuffer(5000000000L), "weep!")); + + assertThat(e.getMessage(), + is("Byte order must be BIG_ENDIAN or LITTLE_ENDIAN. Unknown byte order 'weep!'.")); + } + + private ByteBuffer toByteBuffer(final long n) { + return toByteBuffer(n, ByteOrder.BIG_ENDIAN); + } + + private ByteBuffer toByteBuffer(final long n, final ByteOrder byteOrder) { + final ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.order(byteOrder); + buffer.putLong(n); + return buffer; + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/DoubleFromBytesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/DoubleFromBytesTest.java new file mode 100644 index 000000000000..25e19877b891 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/DoubleFromBytesTest.java @@ -0,0 +1,102 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.conversions; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThrows; + +import io.confluent.ksql.util.KsqlException; +import org.junit.Test; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class DoubleFromBytesTest { + private DoubleFromBytes udf = new DoubleFromBytes(); + + @Test + public void shouldConvertBytesToDouble() { + assertThat(udf.doubleFromBytes(toByteBuffer(532.8738323)), is(532.8738323)); + } + + @Test + public void shouldConvertBytesToNegativeDouble() { + assertThat(udf.doubleFromBytes(toByteBuffer(-532.8734)), is(-532.8734)); + } + + @Test + public void shouldConvertBytesToMaxDouble() { + assertThat(udf.doubleFromBytes(toByteBuffer(Double.MAX_VALUE)), is(Double.MAX_VALUE)); + } + + @Test + public void shouldConvertBytesToMinDouble() { + assertThat(udf.doubleFromBytes(toByteBuffer(Double.MIN_VALUE)), is(Double.MIN_VALUE)); + } + + @Test + public void shouldReturnNullOnNullBytes() { + assertThat(udf.doubleFromBytes(null), is(nullValue())); + } + + @Test + public void shouldThrowOnInvalidBytesSizes() { + final Exception e1 = assertThrows( + KsqlException.class, + () -> udf.doubleFromBytes(ByteBuffer.wrap(new byte[]{1, 2, 3, 4}))); + + assertThat(e1.getMessage(), is("Number of bytes must be equal to 8, but found 4")); + + final Exception e2 = assertThrows( + KsqlException.class, + () -> udf.doubleFromBytes(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}))); + + assertThat(e2.getMessage(), is("Number of bytes must be equal to 8, but found 9")); + } + + @Test + public void shouldConvertLittleEndianBytesToInteger() { + final ByteBuffer buffer = toByteBuffer(532.8738323, ByteOrder.LITTLE_ENDIAN); + assertThat(udf.doubleFromBytes(buffer, "LITTLE_ENDIAN"), is(532.8738323)); + } + + @Test + public void shouldConvertBigEndianBytesToInteger() { + final ByteBuffer buffer = toByteBuffer(532.8738323).order(ByteOrder.BIG_ENDIAN); + assertThat(udf.doubleFromBytes(buffer, "BIG_ENDIAN"), is(532.8738323)); + } + + @Test + public void shouldThrowOnUnknownByteOrder() { + final Exception e = assertThrows( + KsqlException.class, + () -> udf.doubleFromBytes(toByteBuffer(532.8738323), "weep!")); + + assertThat(e.getMessage(), + is("Byte order must be BIG_ENDIAN or LITTLE_ENDIAN. Unknown byte order 'weep!'.")); + } + + private ByteBuffer toByteBuffer(final double n) { + return toByteBuffer(n, ByteOrder.BIG_ENDIAN); + } + + private ByteBuffer toByteBuffer(final double n, final ByteOrder byteOrder) { + final ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.order(byteOrder); + buffer.putDouble(n); + return buffer; + } +} diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/IntFromBytesTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/IntFromBytesTest.java new file mode 100644 index 000000000000..bfaf83a1cc00 --- /dev/null +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/function/udf/conversions/IntFromBytesTest.java @@ -0,0 +1,102 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License; you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.conversions; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThrows; + +import io.confluent.ksql.util.KsqlException; +import org.junit.Test; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class IntFromBytesTest { + private IntFromBytes udf = new IntFromBytes(); + + @Test + public void shouldConvertBytesToInteger() { + assertThat(udf.intFromBytes(toByteBuffer(2021)), is(2021)); + } + + @Test + public void shouldConvertBytesToNegativeInteger() { + assertThat(udf.intFromBytes(toByteBuffer(-2021)), is(-2021)); + } + + @Test + public void shouldConvertBytesToMaxInteger() { + assertThat(udf.intFromBytes(toByteBuffer(Integer.MAX_VALUE)), is(Integer.MAX_VALUE)); + } + + @Test + public void shouldConvertBytesToMinInteger() { + assertThat(udf.intFromBytes(toByteBuffer(Integer.MIN_VALUE)), is(Integer.MIN_VALUE)); + } + + @Test + public void shouldReturnNullOnNullBytes() { + assertThat(udf.intFromBytes(null), is(nullValue())); + } + + @Test + public void shouldThrowOnInvalidBytesSizes() { + final Exception e1 = assertThrows( + KsqlException.class, + () -> udf.intFromBytes(ByteBuffer.wrap(new byte[]{1, 2}))); + + assertThat(e1.getMessage(), is("Number of bytes must be equal to 4, but found 2")); + + final Exception e2 = assertThrows( + KsqlException.class, + () -> udf.intFromBytes(ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}))); + + assertThat(e2.getMessage(), is("Number of bytes must be equal to 4, but found 5")); + } + + @Test + public void shouldConvertLittleEndianBytesToInteger() { + final ByteBuffer buffer = toByteBuffer(2021, ByteOrder.LITTLE_ENDIAN); + assertThat(udf.intFromBytes(buffer, "LITTLE_ENDIAN"), is(2021)); + } + + @Test + public void shouldConvertBigEndianBytesToInteger() { + final ByteBuffer buffer = toByteBuffer(2021).order(ByteOrder.BIG_ENDIAN); + assertThat(udf.intFromBytes(buffer, "BIG_ENDIAN"), is(2021)); + } + + @Test + public void shouldThrowOnUnknownByteOrder() { + final Exception e = assertThrows( + KsqlException.class, + () -> udf.intFromBytes(toByteBuffer(5), "weep!")); + + assertThat(e.getMessage(), + is("Byte order must be BIG_ENDIAN or LITTLE_ENDIAN. Unknown byte order 'weep!'.")); + } + + private ByteBuffer toByteBuffer(final int n) { + return toByteBuffer(n, ByteOrder.BIG_ENDIAN); + } + + private ByteBuffer toByteBuffer(final int n, final ByteOrder byteOrder) { + final ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.order(byteOrder); + buffer.putInt(n); + return buffer; + } +} diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/plan.json new file mode 100644 index 000000000000..a1f65e37a7c8 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/plan.json @@ -0,0 +1,170 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (A BYTES, B BYTES, C BYTES) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false, + "isSource" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT\n INT_FROM_BYTES(TEST.A) A,\n BIGINT_FROM_BYTES(TEST.B) B,\n DOUBLE_FROM_BYTES(TEST.C) C\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`A` INTEGER, `B` BIGINT, `C` DOUBLE", + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "pseudoColumnVersion" : 1 + }, + "selectExpressions" : [ "INT_FROM_BYTES(A) AS A", "BIGINT_FROM_BYTES(B) AS B", "DOUBLE_FROM_BYTES(C) AS C" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TS" + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.headers.columns.enabled" : "false", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.stream.enabled" : "true", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.push.v2.interpreter.enabled" : "true", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.query.cleanup.shutdown.timeout.ms" : "30000", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.query.pull.range.scan.enabled" : "true", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.query.pull.consistency.token.enabled" : "false", + "ksql.lambdas.enabled" : "true", + "ksql.source.table.materialization.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.query.push.v2.new.node.continuity" : "false", + "ksql.query.push.v2.new.latest.delay.ms" : "5000", + "ksql.query.push.v2.enabled" : "false", + "ksql.query.push.v2.latest.reset.age.ms" : "30000", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.rowpartition.rowoffset.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "true", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.router.thread.pool.size" : "50", + "ksql.query.push.v2.registry.installed" : "false", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.runtime.feature.shared.enabled" : "false", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "50", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/spec.json new file mode 100644 index 000000000000..7cab6c5928dc --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/spec.json @@ -0,0 +1,94 @@ +{ + "version" : "7.2.0", + "timestamp" : 1638229348360, + "path" : "query-validation-tests/bytes-and-strings.json", + "schemas" : { + "CSAS_TS_0.TS" : { + "schema" : "`A` INTEGER, `B` BIGINT, `C` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TS_0.KsqlTopic.Source" : { + "schema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "convert bytes to INT, BIGINT, and DOUBLE using BIG ENDIAN", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : "AAAH5Q==,AAAAASoF8gA=,QICm/ZvJ9YI=" + } ], + "outputs" : [ { + "topic" : "TS", + "key" : null, + "value" : "2021,5000000000,532.8738323" + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "TS", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (a BYTES, b BYTES, c BYTES) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM TS AS select INT_FROM_BYTES(a) as a, BIGINT_FROM_BYTES(b) as b, DOUBLE_FROM_BYTES(c) AS c from test;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + }, { + "name" : "TS", + "type" : "STREAM", + "schema" : "`A` INTEGER, `B` BIGINT, `C` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TS", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_BIG_ENDIAN/7.2.0_1638229348360/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/plan.json new file mode 100644 index 000000000000..d4bf7afdc8fb --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/plan.json @@ -0,0 +1,170 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (A BYTES, B BYTES, C BYTES) WITH (KAFKA_TOPIC='test_topic', KEY_FORMAT='KAFKA', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false, + "isSource" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TS AS SELECT\n INT_FROM_BYTES(TEST.A, 'LITTLE_ENDIAN') A,\n BIGINT_FROM_BYTES(TEST.B, 'LITTLE_ENDIAN') B,\n DOUBLE_FROM_BYTES(TEST.C, 'LITTLE_ENDIAN') C\nFROM TEST TEST\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TS", + "schema" : "`A` INTEGER, `B` BIGINT, `C` DOUBLE", + "topicName" : "TS", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "TS", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "TS" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "pseudoColumnVersion" : 1 + }, + "selectExpressions" : [ "INT_FROM_BYTES(A, 'LITTLE_ENDIAN') AS A", "BIGINT_FROM_BYTES(B, 'LITTLE_ENDIAN') AS B", "DOUBLE_FROM_BYTES(C, 'LITTLE_ENDIAN') AS C" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "TS" + }, + "queryId" : "CSAS_TS_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.headers.columns.enabled" : "false", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.stream.enabled" : "true", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.push.v2.interpreter.enabled" : "true", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.query.cleanup.shutdown.timeout.ms" : "30000", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.query.pull.range.scan.enabled" : "true", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.query.pull.consistency.token.enabled" : "false", + "ksql.lambdas.enabled" : "true", + "ksql.source.table.materialization.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.query.push.v2.new.node.continuity" : "false", + "ksql.query.push.v2.new.latest.delay.ms" : "5000", + "ksql.query.push.v2.enabled" : "false", + "ksql.query.push.v2.latest.reset.age.ms" : "30000", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.rowpartition.rowoffset.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "true", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.router.thread.pool.size" : "50", + "ksql.query.push.v2.registry.installed" : "false", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.runtime.feature.shared.enabled" : "false", + "ksql.nested.error.set.null" : "true", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "50", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/spec.json new file mode 100644 index 000000000000..460064dad30e --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/spec.json @@ -0,0 +1,94 @@ +{ + "version" : "7.2.0", + "timestamp" : 1638229348474, + "path" : "query-validation-tests/bytes-and-strings.json", + "schemas" : { + "CSAS_TS_0.TS" : { + "schema" : "`A` INTEGER, `B` BIGINT, `C` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "CSAS_TS_0.KsqlTopic.Source" : { + "schema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "testCase" : { + "name" : "convert bytes to INT, BIGINT, and DOUBLE using LITTLE ENDIAN", + "inputs" : [ { + "topic" : "test_topic", + "key" : null, + "value" : "5QcAAA==,APIFKgEAAAA=,gvXJm/2mgEA=" + } ], + "outputs" : [ { + "topic" : "TS", + "key" : null, + "value" : "2021,5000000000,532.8738323" + } ], + "topics" : [ { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "TS", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM TEST (a BYTES, b BYTES, c BYTES) WITH (kafka_topic='test_topic', value_format='DELIMITED');", "CREATE STREAM TS AS select INT_FROM_BYTES(a, 'LITTLE_ENDIAN') as a, BIGINT_FROM_BYTES(b, 'LITTLE_ENDIAN') as b, DOUBLE_FROM_BYTES(c, 'LITTLE_ENDIAN') AS c from test;" ], + "post" : { + "sources" : [ { + "name" : "TEST", + "type" : "STREAM", + "schema" : "`A` BYTES, `B` BYTES, `C` BYTES", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + }, { + "name" : "TS", + "type" : "STREAM", + "schema" : "`A` INTEGER, `B` BIGINT, `C` DOUBLE", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : "DELIMITED", + "keyFeatures" : [ ], + "valueFeatures" : [ ], + "isSource" : false + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + }, { + "name" : "TS", + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/topology new file mode 100644 index 000000000000..aabd9cade152 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/bytes-and-strings_-_convert_bytes_to_INT,_BIGINT,_and_DOUBLE_using_LITTLE_ENDIAN/7.2.0_1638229348474/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: TS) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/bytes-and-strings.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/bytes-and-strings.json index a639e70e31f4..3250cc8083f7 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/bytes-and-strings.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/bytes-and-strings.json @@ -84,6 +84,42 @@ {"name": "TS", "type": "stream", "schema": "base64 STRING, b1 BYTES, s1 STRING"} ] } + }, + { + "name": "convert bytes to INT, BIGINT, and DOUBLE using BIG ENDIAN", + "statements": [ + "CREATE STREAM TEST (a BYTES, b BYTES, c BYTES) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TS AS select INT_FROM_BYTES(a) as a, BIGINT_FROM_BYTES(b) as b, DOUBLE_FROM_BYTES(c) AS c from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": null, "value": "AAAH5Q==,AAAAASoF8gA=,QICm/ZvJ9YI="} + ], + "outputs": [ + {"topic": "TS", "key": null, "value": "2021,5000000000,532.8738323"} + ], + "post": { + "sources": [ + {"name": "TS", "type": "stream", "schema": "a INT, b BIGINT, c DOUBLE"} + ] + } + }, + { + "name": "convert bytes to INT, BIGINT, and DOUBLE using LITTLE ENDIAN", + "statements": [ + "CREATE STREAM TEST (a BYTES, b BYTES, c BYTES) WITH (kafka_topic='test_topic', value_format='DELIMITED');", + "CREATE STREAM TS AS select INT_FROM_BYTES(a, 'LITTLE_ENDIAN') as a, BIGINT_FROM_BYTES(b, 'LITTLE_ENDIAN') as b, DOUBLE_FROM_BYTES(c, 'LITTLE_ENDIAN') AS c from test;" + ], + "inputs": [ + {"topic": "test_topic", "key": null, "value": "5QcAAA==,APIFKgEAAAA=,gvXJm/2mgEA="} + ], + "outputs": [ + {"topic": "TS", "key": null, "value": "2021,5000000000,532.8738323"} + ], + "post": { + "sources": [ + {"name": "TS", "type": "stream", "schema": "a INT, b BIGINT, c DOUBLE"} + ] + } } ] } \ No newline at end of file diff --git a/ksqldb-udf/src/main/java/io/confluent/ksql/function/FunctionCategory.java b/ksqldb-udf/src/main/java/io/confluent/ksql/function/FunctionCategory.java index 26c5f3fc17d4..67b79dddeadc 100644 --- a/ksqldb-udf/src/main/java/io/confluent/ksql/function/FunctionCategory.java +++ b/ksqldb-udf/src/main/java/io/confluent/ksql/function/FunctionCategory.java @@ -33,4 +33,5 @@ private FunctionCategory() { public static final String AGGREGATE = "AGGREGATE"; public static final String TABLE = "TABLE"; public static final String LAMBDA = "LAMBDA"; + public static final String CONVERSIONS = "CONVERSIONS"; }