Skip to content

Commit

Permalink
feat: Add is_json_string UDF (#8600)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrrr committed Jan 21, 2022
1 parent f351ce8 commit c12a745
Show file tree
Hide file tree
Showing 10 changed files with 569 additions and 25 deletions.
23 changes: 23 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,29 @@ multiple elements, like those containing wildcards, aren't supported.

`CREATE STREAM LOGS (LOG STRUCT<CLOUD STRING, APP STRING, INSTANCE INT>, ...) WITH (VALUE_FORMAT='JSON', ...)`

### `IS_JSON_STRING`

Since: 0.25.0

```sql
is_json_string(json_string) -> Boolean
```

Given a string, returns `true` if it can be parsed as a valid JSON value, `false` otherwise.

Examples:

```sql
is_json_string("[1, 2, 3]") => true
is_json_string("{}") => true
is_json_string("1") => true
is_json_string("\"abc\"") => true
is_json_string("null") => true
is_json_string("") => false
is_json_string("abc") => false
is_json_string(NULL) => false
```

### `INITCAP`

Since: 0.6.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;

@UdfDescription(
name = "IS_JSON_STRING",
category = FunctionCategory.JSON,
description = "Given a string, returns true if it can be parsed as a valid JSON value, false"
+ " otherwise.",
author = KsqlConstants.CONFLUENT_AUTHOR)
public class IsJsonString {

@Udf
public boolean check(@UdfParameter(description = "The input JSON string") final String input) {
if (input == null) {
return false;
}

try {
return !UdfJsonMapper.parseJson(input).isMissingNode();
} catch (KsqlFunctionException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class JsonArrayContains {
private static final JsonFactory PARSER_FACTORY = new JsonFactoryBuilder()
.disable(CANONICALIZE_FIELD_NAMES)
.build()
.setCodec(UdfJsonMapper.INSTANCE.get());
.setCodec(UdfJsonMapper.INSTANCE);

private static final EnumMap<JsonToken, Predicate<Object>> TOKEN_COMPAT;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.json.JsonPathTokenizer;
import java.io.IOException;
import java.util.List;

@UdfDescription(
Expand All @@ -35,8 +32,6 @@
+ " JSONPath or NULL if the specified path does not exist.")
public class JsonExtractString {

private static final ObjectReader OBJECT_READER = UdfJsonMapper.INSTANCE.get().reader();

private String latestPath = null;
private List<String> latestTokens = null;

Expand All @@ -56,7 +51,7 @@ public String extract(
latestPath = path;
}

JsonNode currentNode = parseJsonDoc(input);
JsonNode currentNode = UdfJsonMapper.parseJson(input);
for (final String token : latestTokens) {
if (currentNode instanceof ArrayNode) {
try {
Expand All @@ -80,12 +75,4 @@ public String extract(
return currentNode.toString();
}
}

private static JsonNode parseJsonDoc(final String jsonString) {
try {
return OBJECT_READER.readTree(jsonString);
} catch (final IOException e) {
throw new KsqlFunctionException("Invalid JSON format:" + jsonString, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,57 @@

package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import io.confluent.ksql.function.KsqlFunctionException;

/**
* Shared Object mapper used by JSON processing UDFs
*/
public enum UdfJsonMapper {
final class UdfJsonMapper {

INSTANCE;
private UdfJsonMapper() {}

private final ObjectMapper mapper = new ObjectMapper()
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
/**
* It is thread-safe to share an instance of the configured ObjectMapper
* (see https://fasterxml.github.io/jackson-databind/javadoc/2.12/com/fasterxml/jackson/databind/ObjectReader.html for more details).
* The object is configured as part of static initialization, so it is published safely
* as well.
*/
public static final ObjectMapper INSTANCE;
/**
* Akin to the {@link UdfJsonMapper#INSTANCE}, the reader is fully thread-safe, so there is
* no need to construct more than one instance. See https://fasterxml.github.io/jackson-databind/javadoc/2.12/com/fasterxml/jackson/databind/ObjectReader.html
* for more details.
*/
private static final ObjectReader OBJECT_READER;

public ObjectMapper get() {
return mapper.copy();
static {
INSTANCE = new ObjectMapper()
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
.enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)
.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
OBJECT_READER = INSTANCE.reader();
}

/**
* Parses string into a {@link JsonNode}; throws {@link KsqlFunctionException} on invalid JSON.
*
* @param jsonString the string to parse
* @return a JSON node
*/
public static JsonNode parseJson(final String jsonString) {
try {
return OBJECT_READER.readTree(jsonString);
} catch (final JacksonException e) {
throw new KsqlFunctionException("Invalid JSON format:" + jsonString, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.json;


import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Before;
import org.junit.Test;

public class IsJsonStringTest {
private IsJsonString udf;

@Before
public void setUp() {
udf = new IsJsonString();
}

@Test
public void shouldInterpretNumber() {
assertTrue(udf.check("1"));
}

@Test
public void shouldInterpretString() {
assertTrue(udf.check("\"abc\""));
}

@Test
public void shouldInterpretNullString() {
assertTrue(udf.check("null"));
}

@Test
public void shouldInterpretArray() {
assertTrue(udf.check("[1, 2, 3]"));
}

@Test
public void shouldInterpretObject() {
assertTrue(udf.check("{\"1\": 2}"));
}

@Test
public void shouldNotInterpretUnquotedString() {
assertFalse(udf.check("abc"));
}

@Test
public void shouldNotInterpretEmptyString() {
assertFalse(udf.check(""));
}

@Test
public void shouldNotInterpretNull() {
assertFalse(udf.check(null));
}

@Test
public void shouldNotInterpretStringWithSyntaxErrors() {
assertFalse(udf.check("{1:2]"));
}
}
Loading

0 comments on commit c12a745

Please sign in to comment.