Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.evaluator.FunctionEvaluatorFactory;
import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.segment.local.utils.SchemaUtils;
Expand All @@ -44,10 +42,13 @@


/**
 * {@link RecordTransformer} that evaluates ingestion transform functions from table config and field specs.
 * <p>Per-row evaluation logic is shared with the enricher pipeline via
 * {@link IngestionFunctionEvaluation#applyFunctionEvaluations} and {@link IngestionFunctionEvaluation.Policy};
 * see {@link org.apache.pinot.segment.local.recordtransformer.enricher.function.CustomFunctionEnricher} for
 * enricher-specific configuration (JSON {@code fieldToFunctionMap}).
 * <p>NOTE: this transformer should be placed before the {@link DataTypeTransformer}; after it runs, a transformed
 * column can be treated as a regular column by other record transformers.
*/
public class ExpressionTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);
Expand Down Expand Up @@ -177,43 +178,9 @@ public Set<String> getInputColumns() {

@Override
public void transform(GenericRow record) {
  // Delegate per-column evaluation to the shared helper. The expression-transformation policy carries the
  // fill-when-missing / forced-overwrite semantics, the implicit MAP transform columns, and the
  // continue-on-error handling (throttled warn + markIncomplete) that used to be inlined here.
  IngestionFunctionEvaluation.applyFunctionEvaluations(record, _expressionEvaluators,
      IngestionFunctionEvaluation.Policy.expressionTransformation(_continueOnError, _overwriteExistingValues,
          _implicitMapTransformColumns, _throttledLogger));
}

private static boolean isImplicitMapTransform(FieldSpec fieldSpec) {
Expand All @@ -224,30 +191,4 @@ private static boolean isImplicitMapTransform(FieldSpec fieldSpec) {
return fieldName.endsWith(SchemaUtils.MAP_KEY_COLUMN_SUFFIX)
|| fieldName.endsWith(SchemaUtils.MAP_VALUE_COLUMN_SUFFIX);
}

private void applyTransformedValue(GenericRow record, String column, @Nullable Object transformedValue) {
  // Keep the row's null-value bookkeeping consistent with the value being stored.
  if (transformedValue == null) {
    record.removeValue(column);
    record.addNullValueField(column);
  } else {
    record.removeNullValueField(column);
    record.putValue(column, transformedValue);
  }
}

private boolean isTypeCompatible(Object existingValue, @Nullable Object transformedValue) {
  // With a null on either side, the values are compatible only when both are null.
  if (existingValue == null || transformedValue == null) {
    return existingValue == transformedValue;
  }
  // Compatible when the concrete classes match, or both sides are collections / maps / arrays.
  return transformedValue.getClass() == existingValue.getClass()
      || (transformedValue instanceof Collection && existingValue instanceof Collection)
      || (transformedValue instanceof Map && existingValue instanceof Map)
      || (transformedValue.getClass().isArray() && existingValue.getClass().isArray());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.recordtransformer;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.function.FunctionEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared per-row application of {@link FunctionEvaluator}s during ingestion. The single entry point
 * {@link #applyFunctionEvaluations} walks the evaluator map in iteration order; the supplied {@link Policy} decides
 * how each {@code (column, evaluator)} pair is applied:
 * <ul>
 *   <li>{@link Policy#enricher()} &mdash; used by
 *   {@link org.apache.pinot.segment.local.recordtransformer.enricher.function.CustomFunctionEnricher}: every column
 *   is unconditionally overwritten with {@code evaluate(record)}; evaluation exceptions propagate.</li>
 *   <li>{@link Policy#expressionTransformation} &mdash; used by {@link ExpressionTransformer}: null / overwrite /
 *   implicit MAP rules and {@code continueOnError} handling for table/schema-driven transforms.</li>
 * </ul>
 */
public final class IngestionFunctionEvaluation {
  private static final Logger LOGGER = LoggerFactory.getLogger(IngestionFunctionEvaluation.class);

  private IngestionFunctionEvaluation() {
    // Static helper; not meant to be instantiated.
  }

  /**
   * Per-pipeline semantics for applying a {@code (column, evaluator)} pair to a row. Exactly one policy is active per
   * call to {@link #applyFunctionEvaluations}.
   */
  public static final class Policy {
    private final boolean _enricherOverwrite;
    private final boolean _continueOnError;
    private final boolean _overwriteExistingValues;
    private final Set<String> _implicitMapTransformColumns;
    private final ThrottledLogger _throttledLogger;

    private Policy(boolean enricherOverwrite, boolean continueOnError, boolean overwriteExistingValues,
        Set<String> implicitMapTransformColumns, ThrottledLogger throttledLogger) {
      _enricherOverwrite = enricherOverwrite;
      _continueOnError = continueOnError;
      _overwriteExistingValues = overwriteExistingValues;
      _implicitMapTransformColumns = implicitMapTransformColumns;
      _throttledLogger = throttledLogger;
    }

    /**
     * Enricher semantics (JSON {@code fieldToFunctionMap}): every field is overwritten with {@code evaluate(record)}.
     */
    public static Policy enricher() {
      return new Policy(true, false, false, Set.of(), null);
    }

    /**
     * Table/schema-driven {@link ExpressionTransformer} semantics (including post-upsert overrides).
     */
    public static Policy expressionTransformation(boolean continueOnError, boolean overwriteExistingValues,
        Set<String> implicitMapTransformColumns, ThrottledLogger throttledLogger) {
      return new Policy(false, continueOnError, overwriteExistingValues, implicitMapTransformColumns,
          Objects.requireNonNull(throttledLogger, "throttledLogger"));
    }
  }

  /**
   * Applies every evaluator in {@code evaluators} (map iteration order) to {@code record} under {@code policy}.
   */
  public static void applyFunctionEvaluations(GenericRow record, Map<String, FunctionEvaluator> evaluators,
      Policy policy) {
    for (Map.Entry<String, FunctionEvaluator> entry : evaluators.entrySet()) {
      String column = entry.getKey();
      FunctionEvaluator evaluator = entry.getValue();
      if (policy._enricherOverwrite) {
        // Enricher pipeline: unconditional overwrite; exceptions propagate to the caller.
        record.putValue(column, evaluator.evaluate(record));
      } else {
        evaluateExpressionTransform(record, column, evaluator, policy);
      }
    }
  }

  /**
   * Expression-transform semantics for a single column: fill when the value is missing (or overwrite is forced),
   * otherwise only reconcile type-incompatible nested values (arrays / collections / maps).
   */
  private static void evaluateExpressionTransform(GenericRow record, String column, FunctionEvaluator evaluator,
      Policy policy) {
    Object existingValue = record.getValue(column);
    if (policy._overwriteExistingValues || existingValue == null || record.isNullValue(column)) {
      try {
        applyTransformedValue(record, column, evaluator.evaluate(record));
      } catch (Exception e) {
        if (!policy._continueOnError) {
          throw new RuntimeException("Caught exception while evaluation transform function for column: " + column, e);
        }
        policy._throttledLogger.warn("Caught exception while evaluation transform function for column: " + column, e);
        record.markIncomplete();
      }
      return;
    }
    boolean isNestedValue =
        existingValue.getClass().isArray() || existingValue instanceof Collection || existingValue instanceof Map;
    if (!isNestedValue) {
      // A scalar value is already present and overwrite is not forced: leave it untouched.
      return;
    }
    try {
      Object transformedValue = evaluator.evaluate(record);
      if (transformedValue == null && policy._implicitMapTransformColumns.contains(column)) {
        // Implicit MAP columns keep their existing nested value when the transform yields null.
        return;
      }
      // For backward compatibility, nested values are overridden only when the types are incompatible.
      if (!isTypeCompatible(existingValue, transformedValue)) {
        applyTransformedValue(record, column, transformedValue);
      }
    } catch (Exception e) {
      LOGGER.debug("Caught exception while evaluation transform function for column: {}", column, e);
    }
  }

  /**
   * Writes {@code transformedValue} into {@code record}, keeping the null-value bookkeeping consistent.
   */
  private static void applyTransformedValue(GenericRow record, String column, @Nullable Object transformedValue) {
    if (transformedValue == null) {
      record.removeValue(column);
      record.addNullValueField(column);
    } else {
      record.removeNullValueField(column);
      record.putValue(column, transformedValue);
    }
  }

  /**
   * Two values are compatible when they share a concrete class, or are both collections, both maps or both arrays.
   */
  private static boolean isTypeCompatible(Object existingValue, @Nullable Object transformedValue) {
    if (existingValue == null || transformedValue == null) {
      // Only both-null counts as compatible when a null is involved.
      return existingValue == transformedValue;
    }
    return transformedValue.getClass() == existingValue.getClass()
        || (transformedValue instanceof Collection && existingValue instanceof Collection)
        || (transformedValue instanceof Map && existingValue instanceof Map)
        || (transformedValue.getClass().isArray() && existingValue.getClass().isArray());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.evaluator.FunctionEvaluatorFactory;
import org.apache.pinot.segment.local.recordtransformer.IngestionFunctionEvaluation;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.function.FunctionEvaluator;
import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricher;
import org.apache.pinot.spi.utils.JsonUtils;


/**
 * {@link RecordEnricher} that applies a JSON {@code fieldToFunctionMap} of transform expressions. Evaluation is
 * delegated to {@link IngestionFunctionEvaluation#applyFunctionEvaluations} with
 * {@link IngestionFunctionEvaluation.Policy#enricher()}; for table/schema-driven transforms and
 * dependency ordering, see {@link org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer}.
*/
public class CustomFunctionEnricher implements RecordEnricher {
private final LinkedHashMap<String, FunctionEvaluator> _fieldToFunctionEvaluator;
Expand All @@ -61,8 +64,7 @@ public List<String> getInputColumns() {

@Override
public void enrich(GenericRow record) {
  // Delegate to the shared evaluation helper with enricher semantics: every mapped field is unconditionally
  // overwritten with evaluate(record), in map iteration order; evaluation exceptions propagate to the caller.
  IngestionFunctionEvaluation.applyFunctionEvaluations(record, _fieldToFunctionEvaluator,
      IngestionFunctionEvaluation.Policy.enricher());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.recordtransformer;

import java.util.Collections;
import java.util.LinkedHashMap;
import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.function.FunctionEvaluator;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;


/**
 * Parity tests for the shared {@link IngestionFunctionEvaluation} helper (used by {@link ExpressionTransformer} and
 * {@link org.apache.pinot.segment.local.recordtransformer.enricher.function.CustomFunctionEnricher}).
 */
public class IngestionFunctionEvaluationTest {

  // Builds a single-entry ordered evaluator map for the given column.
  private static LinkedHashMap<String, FunctionEvaluator> evaluatorMap(String column, FunctionEvaluator evaluator) {
    LinkedHashMap<String, FunctionEvaluator> evaluators = new LinkedHashMap<>();
    evaluators.put(column, evaluator);
    return evaluators;
  }

  @Test
  public void testApplyEnricherEvaluationsAlwaysOverwrites() {
    FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
    when(evaluator.evaluate(any(GenericRow.class))).thenReturn("enriched");
    GenericRow row = new GenericRow();
    row.putValue("out", "stale");
    IngestionFunctionEvaluation.applyFunctionEvaluations(row, evaluatorMap("out", evaluator),
        IngestionFunctionEvaluation.Policy.enricher());
    assertEquals(row.getValue("out"), "enriched");
    verify(evaluator).evaluate(row);
  }

  @Test
  public void testApplyExpressionTransformationsFillsWhenColumnNull() {
    FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
    when(evaluator.evaluate(any(GenericRow.class))).thenReturn(99L);
    GenericRow row = new GenericRow();
    assertNull(row.getValue("c"));
    IngestionFunctionEvaluation.applyFunctionEvaluations(row, evaluatorMap("c", evaluator),
        IngestionFunctionEvaluation.Policy.expressionTransformation(false, false, Collections.emptySet(),
            mock(ThrottledLogger.class)));
    assertEquals(row.getValue("c"), 99L);
  }

  @Test
  public void testApplyExpressionTransformationsSkipsWhenPrimitivePresent() {
    FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
    GenericRow row = new GenericRow();
    row.putValue("c", 1);
    IngestionFunctionEvaluation.applyFunctionEvaluations(row, evaluatorMap("c", evaluator),
        IngestionFunctionEvaluation.Policy.expressionTransformation(false, false, Collections.emptySet(),
            mock(ThrottledLogger.class)));
    assertEquals(row.getValue("c"), 1);
    // Existing non-null scalar: the evaluator must never run.
    verifyNoInteractions(evaluator);
  }

  @Test
  public void testApplyExpressionTransformationsContinueOnErrorMarksIncomplete() {
    FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
    when(evaluator.evaluate(any(GenericRow.class))).thenThrow(new RuntimeException("eval failed"));
    GenericRow row = new GenericRow();
    IngestionFunctionEvaluation.applyFunctionEvaluations(row, evaluatorMap("c", evaluator),
        IngestionFunctionEvaluation.Policy.expressionTransformation(true, false, Collections.emptySet(),
            mock(ThrottledLogger.class)));
    // continueOnError=true swallows the failure but flags the row.
    assertTrue(row.isIncomplete());
  }
}
Loading