Skip to content

Commit

Permalink
feat: Add int/bigint/double conversion functions from bytes (#8426)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Dec 3, 2021
1 parent daa3ed0 commit da77c8a
Show file tree
Hide file tree
Showing 17 changed files with 1,200 additions and 0 deletions.
81 changes: 81 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,87 @@ A call to UUID() returns a value conforming to UUID version 4, sometimes called
"random UUID", as described in RFC 4122. The value is a 128-bit number represented
as a string of five hexadecimal numbers _aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee_.

## Bytes

### `BIGINT_FROM_BYTES`

Since: 0.23.1

```sql
BIGINT_FROM_BYTES(col1, [byteOrder])
```

Converts a BYTES value to an BIGINT value according to the specified byte order.
BYTES must be 8 bytes long or a NULL value will be returned.

Byte order values must be 'BIG_ENDIAN' or 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.
A NULL value is returned if an invalid byte order value is used.

Example, where `b` is a bytes value represented as a base64 string `AAAAASoF8gA=`:
```sql
BIGINT_FROM_BYTES(b, 'BIG_ENDIAN') -> 5000000000
```

### `DOUBLE_FROM_BYTES`

Since: 0.23.1

```sql
DOUBLE_FROM_BYTES(col1, [byteOrder])
```

Converts a BYTES value to an DOUBLE value according to the specified byte order.
BYTES must be 8 bytes long or a NULL value will be returned.

Byte order values must be 'BIG_ENDIAN' or 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.
A NULL value is returned if an invalid byte order value is used.

Example, where `b` is a bytes value represented as a base64 string `QICm/ZvJ9YI=`:
```sql
DOUBLE_FROM_BYTES(b, 'BIG_ENDIAN') -> 532.8738323
```

### `FROM_BYTES`

Since: 0.21.0

```sql
FROM_BYTES(col1, encoding)
```

Converts a BYTES value to STRING in the specified encoding.
The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.

### `INT_FROM_BYTES`

Since: 0.23.1

```sql
INT_FROM_BYTES(col1, [byteOrder])
```

Converts a BYTES value to an INT value according to the specified byte order.
BYTES must be 4 bytes long or a NULL value will be returned.

Byte order values must be 'BIG_ENDIAN' or 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.
A NULL value is returned if an invalid byte order value is used.

Examples, where `b_big` is a bytes value represented as a base64 string `AAAH5Q==`:
```sql
INT_FROM_BYTES(b, 'BIG_ENDIAN') -> 2021
```

### `TO_BYTES`

Since: 0.21.0

```sql
TO_BYTES(col1, encoding)
```

Converts a STRING value in the specified encoding to BYTES.
The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.

## Nulls

### `COALESCE`
Expand Down
1 change: 1 addition & 0 deletions docs/reference/user-defined-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ values.
| `DOUBLE` | `double`, `java.lang.Double` |
| `DECIMAL` | `java.math.BigDecimal` |
| `VARCHAR` | `java.lang.String` |
| `BYTES` | `java.nio.ByteBuffer` |
| `TIME` | `java.sql.Time` |
| `DATE` | `java.sql.Date` |
|`TIMESTAMP`| `java.sql.Timestamp` |
Expand Down
23 changes: 23 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -65,6 +66,20 @@ private BytesUtils() {
Encoding.BASE64, v -> base64Decoding(v)
);

public static ByteOrder byteOrderType(final String byteOrderStr) {
if (byteOrderStr != null
&& byteOrderStr.equalsIgnoreCase(ByteOrder.BIG_ENDIAN.toString())) {
return ByteOrder.BIG_ENDIAN;
} else if (byteOrderStr != null
&& byteOrderStr.equalsIgnoreCase(ByteOrder.LITTLE_ENDIAN.toString())) {
return ByteOrder.LITTLE_ENDIAN;
} else {
throw new KsqlException(String.format(
"Byte order must be BIG_ENDIAN or LITTLE_ENDIAN. Unknown byte order '%s'.",
byteOrderStr));
}
}

public static String encode(final byte[] value, final Encoding encoding) {
final Function<byte[], String> encoder = ENCODERS.get(encoding);
if (encoder == null) {
Expand Down Expand Up @@ -154,6 +169,14 @@ public static int indexOf(final byte[] array, final byte[] target, final int fro
return -1;
}

public static void checkBytesSize(final ByteBuffer buffer, final int size) {
final int bufferSize = getByteArray(buffer).length;
if (bufferSize != size) {
throw new KsqlException(
String.format("Number of bytes must be equal to %d, but found %d", size, bufferSize));
}
}

@SuppressWarnings("ParameterName")
private static boolean arrayEquals(
final byte[] a,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.conversions;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

@UdfDescription(
name = "bigint_from_bytes",
category = FunctionCategory.CONVERSIONS,
description = "Converts a BYTES value to an BIGINT value according to the specified"
+ " byte order. BYTES must be 8 bytes long or a NULL value will be returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class BigIntFromBytes {
private static final int BYTES_LENGTH = 8;

@Udf(description = "Converts a BYTES value to an BIGINT value using the 'BIG_ENDIAN' byte order."
+ " BYTES must be 8 bytes long or a NULL value will be returned.")
public Long bigIntFromBytes(
@UdfParameter(description = "The BYTES value to convert.")
final ByteBuffer value
) {
return bigIntFromBytes(value, ByteOrder.BIG_ENDIAN);
}

@Udf(description = "Converts a BYTES value to an BIGINT value according to the specified"
+ " byte order. BYTES must be 8 bytes long or a NULL value will be returned.")
public Long bigIntFromBytes(
@UdfParameter(description = "The BYTES value to convert.")
final ByteBuffer value,
@UdfParameter(description = "The byte order. Valid orders are 'BIG_ENDIAN' and"
+ " 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.")
final String byteOrder
) {
return bigIntFromBytes(value, BytesUtils.byteOrderType(byteOrder));
}

private Long bigIntFromBytes(final ByteBuffer value, final ByteOrder byteOrder) {
if (value == null) {
return null;
}

BytesUtils.checkBytesSize(value, BYTES_LENGTH);
value.rewind();
return value.order(byteOrder).getLong();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.conversions;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

@UdfDescription(
name = "double_from_bytes",
category = FunctionCategory.CONVERSIONS,
description = "Converts a BYTES value to an DOUBLE value according to the specified"
+ " byte order. BYTES must be 8 bytes long or a NULL value will be returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class DoubleFromBytes {
private static final int BYTES_LENGTH = 8;

@Udf(description = "Converts a BYTES value to an DOUBLE value using the 'BIG_ENDIAN' byte order."
+ " BYTES must be 8 bytes long or a NULL value will be returned.")
public Double doubleFromBytes(
@UdfParameter(description = "The BYTES value to convert.")
final ByteBuffer value
) {
return doubleFromBytes(value, ByteOrder.BIG_ENDIAN);
}

@Udf(description = "Converts a BYTES value to an DOUBLE value according to the specified"
+ " byte order. BYTES must be 8 bytes long or a NULL value will be returned.")
public Double doubleFromBytes(
@UdfParameter(description = "The BYTES value to convert.")
final ByteBuffer value,
@UdfParameter(description = "The byte order. Valid orders are 'BIG_ENDIAN' and"
+ " 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.")
final String byteOrder
) {
return doubleFromBytes(value, BytesUtils.byteOrderType(byteOrder));
}

private Double doubleFromBytes(final ByteBuffer value, final ByteOrder byteOrder) {
if (value == null) {
return null;
}

BytesUtils.checkBytesSize(value, BYTES_LENGTH);
value.rewind();
return value.order(byteOrder).getDouble();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.conversions;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

@UdfDescription(
name = "int_from_bytes",
category = FunctionCategory.CONVERSIONS,
description = "Converts a BYTES value to an INT value according to the specified"
+ " byte order. BYTES must be 4 bytes long or a NULL value will be returned.",
author = KsqlConstants.CONFLUENT_AUTHOR
)
public class IntFromBytes {
private static final int BYTES_LENGTH = 4;

@Udf(description = "Converts a BYTES value to an INT value using the 'BIG_ENDIAN' byte order."
+ " BYTES must be 4 bytes long or a NULL value will be returned.")
public Integer intFromBytes(
@UdfParameter(description = "The BYTES value to convert.")
final ByteBuffer value
) {
return intFromBytes(value, ByteOrder.BIG_ENDIAN);
}

@Udf(description = "Converts a BYTES value to an INT value according to the specified"
+ " byte order. BYTES must be 4 bytes long or a NULL value will be returned.")
public Integer intFromBytes(
@UdfParameter(description = "The BYTES value to convert.")
final ByteBuffer value,
@UdfParameter(description = "The byte order. Valid orders are 'BIG_ENDIAN' and"
+ " 'LITTLE_ENDIAN'. If omitted, 'BIG_ENDIAN' is used.")
final String byteOrder
) {
return intFromBytes(value, BytesUtils.byteOrderType(byteOrder));
}

private Integer intFromBytes(final ByteBuffer value, final ByteOrder byteOrder) {
if (value == null) {
return null;
}

BytesUtils.checkBytesSize(value, BYTES_LENGTH);
value.rewind();
return value.order(byteOrder).getInt();
}
}

0 comments on commit da77c8a

Please sign in to comment.