Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JSONPath cache inefficient issue #7409

Merged
merged 2 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.function;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.spi.cache.Cache;


public class JsonPathCache implements Cache {
private static final long DEFAULT_CACHE_MAXIMUM_SIZE = 10000;

private final com.google.common.cache.Cache<String, JsonPath> _jsonPathCache =
CacheBuilder.newBuilder().maximumSize(DEFAULT_CACHE_MAXIMUM_SIZE).build();

@Override
public JsonPath get(String key) {
return _jsonPathCache.getIfPresent(key);
}

@Override
public void put(String key, JsonPath value) {
_jsonPathCache.put(key, value);
}

@VisibleForTesting
public long size() {
return _jsonPathCache.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,16 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.Predicate;
import com.jayway.jsonpath.internal.ParseContextImpl;
import com.jayway.jsonpath.spi.cache.CacheProvider;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import com.jayway.jsonpath.spi.mapper.MappingProvider;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.function.JsonPathCache;
import org.apache.pinot.spi.annotations.ScalarFunction;
import org.apache.pinot.spi.utils.JsonUtils;

Expand All @@ -50,32 +47,17 @@
* </code>
*/
public class JsonFunctions {
private static final ParseContext PARSE_CONTEXT;
private static final Predicate[] NO_PREDICATES = new Predicate[0];
static {
Configuration.setDefaults(new Configuration.Defaults() {
private final JsonProvider _jsonProvider = new ArrayAwareJacksonJsonProvider();
private final MappingProvider _mappingProvider = new JacksonMappingProvider();

@Override
public JsonProvider jsonProvider() {
return _jsonProvider;
}

@Override
public MappingProvider mappingProvider() {
return _mappingProvider;
}

@Override
public Set<Option> options() {
return EnumSet.noneOf(Option.class);
}
});
PARSE_CONTEXT = new ParseContextImpl(Configuration.defaultConfiguration());
private JsonFunctions() {
}

private JsonFunctions() {
private static final Predicate[] NO_PREDICATES = new Predicate[0];
private static final ParseContext PARSE_CONTEXT = JsonPath.using(
new Configuration.ConfigurationBuilder().jsonProvider(new ArrayAwareJacksonJsonProvider())
.mappingProvider(new JacksonMappingProvider()).build());

static {
// Set the JsonPath cache before the cache is accessed
CacheProvider.setCache(new JsonPathCache());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,17 @@
*/
package org.apache.pinot.core.operator.transform.function;

import com.google.common.collect.ImmutableSet;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import com.jayway.jsonpath.spi.mapper.MappingProvider;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.datasource.DataSource;


Expand All @@ -48,20 +45,23 @@
*
*/
public class JsonExtractKeyTransformFunction extends BaseTransformFunction {

public static final String FUNCTION_NAME = "jsonExtractKey";
private static final Configuration JSON_PATH_KEY_CONFIG =
Configuration.builder().options(Option.AS_PATH_LIST).build();

private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider())
.mappingProvider(new JacksonMappingProvider()).options(Option.AS_PATH_LIST, Option.SUPPRESS_EXCEPTIONS)
.build());

private TransformFunction _jsonFieldTransformFunction;
private String _jsonPath;
private JsonPath _jsonPath;

@Override
public String getName() {
return FUNCTION_NAME;
}

@Override
public void init(@Nonnull List<TransformFunction> arguments, @Nonnull Map<String, DataSource> dataSourceMap) {
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
// Check that there are exactly 2 arguments
if (arguments.size() != 2) {
throw new IllegalArgumentException(
Expand All @@ -75,7 +75,7 @@ public void init(@Nonnull List<TransformFunction> arguments, @Nonnull Map<String
+ "function");
}
_jsonFieldTransformFunction = firstArgument;
_jsonPath = ((LiteralTransformFunction) arguments.get(1)).getLiteral();
_jsonPath = JsonPath.compile(((LiteralTransformFunction) arguments.get(1)).getLiteral());
}

@Override
Expand All @@ -84,39 +84,17 @@ public TransformResultMetadata getResultMetadata() {
}

@Override
public String[][] transformToStringValuesMV(@Nonnull ProjectionBlock projectionBlock) {
final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
final String[][] results = new String[projectionBlock.getNumDocs()][];
for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
final List<String> stringVals = JsonPath.using(JSON_PATH_KEY_CONFIG).parse(stringValuesMV[i]).read(_jsonPath);
results[i] = new String[stringVals.size()];
for (int j = 0; j < stringVals.size(); j++) {
results[i][j] = stringVals.get(j);
}
public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
if (_stringValuesMV == null) {
_stringValuesMV = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
}
return results;
}

static {
Configuration.setDefaults(new Configuration.Defaults() {

private final JsonProvider _jsonProvider = new JacksonJsonProvider();
private final MappingProvider _mappingProvider = new JacksonMappingProvider();

@Override
public JsonProvider jsonProvider() {
return _jsonProvider;
}

@Override
public MappingProvider mappingProvider() {
return _mappingProvider;
}

@Override
public Set<Option> options() {
return ImmutableSet.of(Option.SUPPRESS_EXCEPTIONS);
}
});
String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
int numDocs = projectionBlock.getNumDocs();
for (int i = 0; i < numDocs; i++) {
List<String> values = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
_stringValuesMV[i] = values.toArray(new String[0]);
}
return _stringValuesMV;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@
*/
package org.apache.pinot.core.operator.transform.function;

import com.google.common.collect.ImmutableSet;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import com.jayway.jsonpath.spi.mapper.MappingProvider;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
Expand All @@ -54,11 +50,14 @@
*/
public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
public static final String FUNCTION_NAME = "jsonExtractScalar";
private static final ParseContext JSON_PARSER_CONTEXT =
JsonPath.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));

private static final ParseContext JSON_PARSER_CONTEXT = JsonPath.using(
new Configuration.ConfigurationBuilder().jsonProvider(new JacksonJsonProvider())
.mappingProvider(new JacksonMappingProvider()).options(Option.SUPPRESS_EXCEPTIONS).build());

private TransformFunction _jsonFieldTransformFunction;
private String _jsonPath;
private String _jsonPathString;
private JsonPath _jsonPath;
private Object _defaultValue = null;
private TransformResultMetadata _resultMetadata;

Expand All @@ -83,7 +82,8 @@ public void init(List<TransformFunction> arguments, Map<String, DataSource> data
+ "function");
}
_jsonFieldTransformFunction = firstArgument;
_jsonPath = ((LiteralTransformFunction) arguments.get(1)).getLiteral();
_jsonPathString = ((LiteralTransformFunction) arguments.get(1)).getLiteral();
_jsonPath = JsonPath.compile(_jsonPathString);
String resultsType = ((LiteralTransformFunction) arguments.get(2)).getLiteral().toUpperCase();
boolean isSingleValue = !resultsType.endsWith("_ARRAY");
try {
Expand Down Expand Up @@ -126,7 +126,7 @@ public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
continue;
}
throw new RuntimeException(
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPathString, jsonStrings[i]));
}
if (result instanceof Number) {
_intValuesSV[i] = ((Number) result).intValue();
Expand Down Expand Up @@ -157,7 +157,7 @@ public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
continue;
}
throw new RuntimeException(
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPathString, jsonStrings[i]));
}
if (result instanceof Number) {
_longValuesSV[i] = ((Number) result).longValue();
Expand Down Expand Up @@ -189,7 +189,7 @@ public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
continue;
}
throw new RuntimeException(
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPathString, jsonStrings[i]));
}
if (result instanceof Number) {
_floatValuesSV[i] = ((Number) result).floatValue();
Expand Down Expand Up @@ -220,7 +220,7 @@ public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
continue;
}
throw new RuntimeException(
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPathString, jsonStrings[i]));
}
if (result instanceof Number) {
_doubleValuesSV[i] = ((Number) result).doubleValue();
Expand Down Expand Up @@ -251,7 +251,7 @@ public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
continue;
}
throw new RuntimeException(
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPathString, jsonStrings[i]));
}
if (result instanceof String) {
_stringValuesSV[i] = (String) result;
Expand Down Expand Up @@ -401,27 +401,4 @@ public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
}
return _stringValuesMV;
}

static {
Configuration.setDefaults(new Configuration.Defaults() {

private final JsonProvider _jsonProvider = new JacksonJsonProvider();
private final MappingProvider _mappingProvider = new JacksonMappingProvider();

@Override
public JsonProvider jsonProvider() {
return _jsonProvider;
}

@Override
public MappingProvider mappingProvider() {
return _mappingProvider;
}

@Override
public Set<Option> options() {
return ImmutableSet.of(Option.SUPPRESS_EXCEPTIONS);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.jayway.jsonpath.spi.cache.Cache;
import com.jayway.jsonpath.spi.cache.CacheProvider;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -35,6 +37,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.function.JsonPathCache;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
Expand Down Expand Up @@ -471,6 +474,13 @@ void testFailedSqlQuery()
Assert.assertEquals(pinotResponse.get("totalDocs").asInt(), 0);
}

@Test
public void testJsonPathCache() {
Cache cache = CacheProvider.getCache();
Assert.assertTrue(cache instanceof JsonPathCache);
Assert.assertTrue(((JsonPathCache) cache).size() > 0);
}

@AfterClass
public void tearDown()
throws Exception {
Expand Down