diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
index 3e154039242c..d52e3aac64fa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
@@ -20,14 +20,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.pinot.common.evaluator.FunctionEvaluatorFactory;
import org.apache.pinot.common.utils.ThrottledLogger;
import org.apache.pinot.segment.local.utils.SchemaUtils;
@@ -44,10 +42,13 @@
/**
- * The {@code ExpressionTransformer} class will evaluate the function expressions.
+ * {@link RecordTransformer} that evaluates ingestion transform functions from table config and field specs.
+ *
+ * Per-row evaluation logic is shared with the enricher pipeline via
+ * {@link IngestionFunctionEvaluation#applyFunctionEvaluations} and {@link IngestionFunctionEvaluation.Policy};
+ * see {@link org.apache.pinot.segment.local.recordtransformer.enricher.function.CustomFunctionEnricher} for
+ * enricher-specific configuration (JSON {@code fieldToFunctionMap}).
*
 * NOTE: should put this before the {@link DataTypeTransformer}. After this, transformed column can be treated as
* regular column for other record transformers.
- * TODO: Merge this and CustomFunctionEnricher
*/
public class ExpressionTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);
@@ -177,43 +178,9 @@ public Set<String> getInputColumns() {
@Override
public void transform(GenericRow record) {
- for (Map.Entry<String, FunctionEvaluator> entry : _expressionEvaluators.entrySet()) {
- String column = entry.getKey();
- FunctionEvaluator transformFunctionEvaluator = entry.getValue();
- Object existingValue = record.getValue(column);
- boolean shouldApplyTransform = _overwriteExistingValues || existingValue == null || record.isNullValue(column);
- if (shouldApplyTransform) {
- try {
- // Apply transformation when overwriting is enabled or when the current value is null.
- // NOTE: column value might already exist for OFFLINE data. For backward compatibility, we may override
- // nested fields like arrays, collections, or maps since they were not included in the record
- // transformation before.
- Object transformedValue = transformFunctionEvaluator.evaluate(record);
- applyTransformedValue(record, column, transformedValue);
- } catch (Exception e) {
- if (!_continueOnError) {
- throw new RuntimeException("Caught exception while evaluation transform function for column: " + column, e);
- }
- _throttledLogger.warn("Caught exception while evaluation transform function for column: " + column, e);
- record.markIncomplete();
- }
- } else if (existingValue.getClass().isArray() || existingValue instanceof Collection
- || existingValue instanceof Map) {
- try {
- Object transformedValue = transformFunctionEvaluator.evaluate(record);
- if (transformedValue == null && _implicitMapTransformColumns.contains(column)) {
- continue;
- }
- // For backward compatibility, The only exception here is that we will override nested field like array,
- // collection or map since they were not included in the record transformation before.
- if (!isTypeCompatible(existingValue, transformedValue)) {
- applyTransformedValue(record, column, transformedValue);
- }
- } catch (Exception e) {
- LOGGER.debug("Caught exception while evaluation transform function for column: {}", column, e);
- }
- }
- }
+ IngestionFunctionEvaluation.applyFunctionEvaluations(record, _expressionEvaluators,
+ IngestionFunctionEvaluation.Policy.expressionTransformation(_continueOnError, _overwriteExistingValues,
+ _implicitMapTransformColumns, _throttledLogger));
}
private static boolean isImplicitMapTransform(FieldSpec fieldSpec) {
@@ -224,30 +191,4 @@ private static boolean isImplicitMapTransform(FieldSpec fieldSpec) {
return fieldName.endsWith(SchemaUtils.MAP_KEY_COLUMN_SUFFIX)
|| fieldName.endsWith(SchemaUtils.MAP_VALUE_COLUMN_SUFFIX);
}
-
- private void applyTransformedValue(GenericRow record, String column, @Nullable Object transformedValue) {
- if (transformedValue != null) {
- record.removeNullValueField(column);
- record.putValue(column, transformedValue);
- } else {
- record.removeValue(column);
- record.addNullValueField(column);
- }
- }
-
- private boolean isTypeCompatible(Object existingValue, @Nullable Object transformedValue) {
- if (transformedValue == null || existingValue == null) {
- return transformedValue == existingValue;
- }
- if (transformedValue.getClass() == existingValue.getClass()) {
- return true;
- }
- if (transformedValue instanceof Collection && existingValue instanceof Collection) {
- return true;
- }
- if (transformedValue instanceof Map && existingValue instanceof Map) {
- return true;
- }
- return transformedValue.getClass().isArray() && existingValue.getClass().isArray();
- }
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/IngestionFunctionEvaluation.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/IngestionFunctionEvaluation.java
new file mode 100644
index 000000000000..5423db8ed2a1
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/IngestionFunctionEvaluation.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.function.FunctionEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Shared per-row application of {@link FunctionEvaluator}s during ingestion. A single entry point,
+ * {@link #applyFunctionEvaluations}, iterates evaluators; {@link Policy} selects semantics for each pipeline:
+ *
+ * - {@link Policy#enricher()} — used by
+ * {@link org.apache.pinot.segment.local.recordtransformer.enricher.function.CustomFunctionEnricher}: always
+ * {@code putValue(column, evaluate(record))} in map order (exceptions propagate).
+ * - {@link Policy#expressionTransformation} — used by {@link ExpressionTransformer}: null / overwrite /
+ * implicit MAP rules and {@code continueOnError} as in table/schema-driven transforms.
+ *
+ */
+public final class IngestionFunctionEvaluation {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IngestionFunctionEvaluation.class);
+
+ private IngestionFunctionEvaluation() {
+ }
+
+ /**
+ * How each {@code (column, evaluator)} pair is applied to the row. Only one policy is active per call.
+ */
+ public static final class Policy {
+ private final boolean _enricherOverwrite;
+ private final boolean _continueOnError;
+ private final boolean _overwriteExistingValues;
+ private final Set<String> _implicitMapTransformColumns;
+ private final ThrottledLogger _throttledLogger;
+
+ private Policy(boolean enricherOverwrite, boolean continueOnError, boolean overwriteExistingValues,
+ Set<String> implicitMapTransformColumns, ThrottledLogger throttledLogger) {
+ _enricherOverwrite = enricherOverwrite;
+ _continueOnError = continueOnError;
+ _overwriteExistingValues = overwriteExistingValues;
+ _implicitMapTransformColumns = implicitMapTransformColumns;
+ _throttledLogger = throttledLogger;
+ }
+
+ /**
+ * JSON {@code fieldToFunctionMap} enricher: every field is overwritten with {@code evaluate(record)}.
+ */
+ public static Policy enricher() {
+ return new Policy(true, false, false, Set.of(), null);
+ }
+
+ /**
+ * Table/schema-driven {@link ExpressionTransformer} transforms (including post-upsert overrides).
+ */
+ public static Policy expressionTransformation(boolean continueOnError, boolean overwriteExistingValues,
+ Set<String> implicitMapTransformColumns, ThrottledLogger throttledLogger) {
+ return new Policy(false, continueOnError, overwriteExistingValues, implicitMapTransformColumns,
+ Objects.requireNonNull(throttledLogger, "throttledLogger"));
+ }
+ }
+
+ /**
+ * Single loop over {@code evaluators}: behavior is determined by {@code policy} (enricher vs. expression transform).
+ */
+ public static void applyFunctionEvaluations(GenericRow record, Map<String, FunctionEvaluator> evaluators,
+ Policy policy) {
+ for (Map.Entry<String, FunctionEvaluator> entry : evaluators.entrySet()) {
+ String column = entry.getKey();
+ FunctionEvaluator transformFunctionEvaluator = entry.getValue();
+
+ if (policy._enricherOverwrite) {
+ record.putValue(column, transformFunctionEvaluator.evaluate(record));
+ continue;
+ }
+
+ Object existingValue = record.getValue(column);
+ boolean shouldApplyTransform =
+ policy._overwriteExistingValues || existingValue == null || record.isNullValue(column);
+ if (shouldApplyTransform) {
+ try {
+ Object transformedValue = transformFunctionEvaluator.evaluate(record);
+ applyTransformedValue(record, column, transformedValue);
+ } catch (Exception e) {
+ if (!policy._continueOnError) {
+ throw new RuntimeException("Caught exception while evaluation transform function for column: " + column, e);
+ }
+ policy._throttledLogger.warn("Caught exception while evaluation transform function for column: " + column, e);
+ record.markIncomplete();
+ }
+ } else if (existingValue.getClass().isArray() || existingValue instanceof Collection
+ || existingValue instanceof Map) {
+ try {
+ Object transformedValue = transformFunctionEvaluator.evaluate(record);
+ if (transformedValue == null && policy._implicitMapTransformColumns.contains(column)) {
+ continue;
+ }
+ if (!isTypeCompatible(existingValue, transformedValue)) {
+ applyTransformedValue(record, column, transformedValue);
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Caught exception while evaluation transform function for column: {}", column, e);
+ }
+ }
+ }
+ }
+
+ private static void applyTransformedValue(GenericRow record, String column, @Nullable Object transformedValue) {
+ if (transformedValue != null) {
+ record.removeNullValueField(column);
+ record.putValue(column, transformedValue);
+ } else {
+ record.removeValue(column);
+ record.addNullValueField(column);
+ }
+ }
+
+ private static boolean isTypeCompatible(Object existingValue, @Nullable Object transformedValue) {
+ if (transformedValue == null || existingValue == null) {
+ return transformedValue == existingValue;
+ }
+ if (transformedValue.getClass() == existingValue.getClass()) {
+ return true;
+ }
+ if (transformedValue instanceof Collection && existingValue instanceof Collection) {
+ return true;
+ }
+ if (transformedValue instanceof Map && existingValue instanceof Map) {
+ return true;
+ }
+ return transformedValue.getClass().isArray() && existingValue.getClass().isArray();
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricher.java
index 047528505e8a..4027e253b839 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricher.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/enricher/function/CustomFunctionEnricher.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.evaluator.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordtransformer.IngestionFunctionEvaluation;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.function.FunctionEvaluator;
import org.apache.pinot.spi.recordtransformer.enricher.RecordEnricher;
@@ -33,8 +34,10 @@
/**
- * Enriches the record with custom functions.
- * TODO: Merge this and ExpressionTransformer
+ * {@link RecordEnricher} that applies a JSON {@code fieldToFunctionMap} of transform expressions. Evaluation is
+ * delegated to {@link IngestionFunctionEvaluation#applyFunctionEvaluations} with
+ * {@link IngestionFunctionEvaluation.Policy#enricher()}; for table/schema-driven transforms and
+ * dependency ordering, see {@link org.apache.pinot.segment.local.recordtransformer.ExpressionTransformer}.
*/
public class CustomFunctionEnricher implements RecordEnricher {
 private final LinkedHashMap<String, FunctionEvaluator> _fieldToFunctionEvaluator;
@@ -61,8 +64,7 @@ public List<String> getInputColumns() {
@Override
public void enrich(GenericRow record) {
- _fieldToFunctionEvaluator.forEach((field, evaluator) -> {
- record.putValue(field, evaluator.evaluate(record));
- });
+ IngestionFunctionEvaluation.applyFunctionEvaluations(record, _fieldToFunctionEvaluator,
+ IngestionFunctionEvaluation.Policy.enricher());
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/IngestionFunctionEvaluationTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/IngestionFunctionEvaluationTest.java
new file mode 100644
index 000000000000..7a63839d0457
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/IngestionFunctionEvaluationTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordtransformer;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.function.FunctionEvaluator;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Parity tests for {@link IngestionFunctionEvaluation} (shared by {@link ExpressionTransformer} and
+ * {@link org.apache.pinot.segment.local.recordtransformer.enricher.function.CustomFunctionEnricher}).
+ */
+public class IngestionFunctionEvaluationTest {
+
+ @Test
+ public void testApplyEnricherEvaluationsAlwaysOverwrites() {
+ FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
+ when(evaluator.evaluate(any(GenericRow.class))).thenReturn("enriched");
+ LinkedHashMap<String, FunctionEvaluator> map = new LinkedHashMap<>();
+ map.put("out", evaluator);
+ GenericRow record = new GenericRow();
+ record.putValue("out", "stale");
+ IngestionFunctionEvaluation.applyFunctionEvaluations(record, map, IngestionFunctionEvaluation.Policy.enricher());
+ assertEquals(record.getValue("out"), "enriched");
+ verify(evaluator).evaluate(record);
+ }
+
+ @Test
+ public void testApplyExpressionTransformationsFillsWhenColumnNull() {
+ FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
+ when(evaluator.evaluate(any(GenericRow.class))).thenReturn(99L);
+ LinkedHashMap<String, FunctionEvaluator> map = new LinkedHashMap<>();
+ map.put("c", evaluator);
+ GenericRow record = new GenericRow();
+ assertNull(record.getValue("c"));
+ ThrottledLogger throttledLogger = mock(ThrottledLogger.class);
+ IngestionFunctionEvaluation.applyFunctionEvaluations(record, map,
+ IngestionFunctionEvaluation.Policy.expressionTransformation(false, false, Collections.emptySet(),
+ throttledLogger));
+ assertEquals(record.getValue("c"), 99L);
+ }
+
+ @Test
+ public void testApplyExpressionTransformationsSkipsWhenPrimitivePresent() {
+ FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
+ LinkedHashMap<String, FunctionEvaluator> map = new LinkedHashMap<>();
+ map.put("c", evaluator);
+ GenericRow record = new GenericRow();
+ record.putValue("c", 1);
+ ThrottledLogger throttledLogger = mock(ThrottledLogger.class);
+ IngestionFunctionEvaluation.applyFunctionEvaluations(record, map,
+ IngestionFunctionEvaluation.Policy.expressionTransformation(false, false, Collections.emptySet(),
+ throttledLogger));
+ assertEquals(record.getValue("c"), 1);
+ verifyNoInteractions(evaluator);
+ }
+
+ @Test
+ public void testApplyExpressionTransformationsContinueOnErrorMarksIncomplete() {
+ FunctionEvaluator evaluator = mock(FunctionEvaluator.class);
+ when(evaluator.evaluate(any(GenericRow.class)))
+ .thenThrow(new RuntimeException("eval failed"));
+ LinkedHashMap<String, FunctionEvaluator> map = new LinkedHashMap<>();
+ map.put("c", evaluator);
+ GenericRow record = new GenericRow();
+ ThrottledLogger throttledLogger = mock(ThrottledLogger.class);
+ IngestionFunctionEvaluation.applyFunctionEvaluations(record, map,
+ IngestionFunctionEvaluation.Policy.expressionTransformation(true, false, Collections.emptySet(),
+ throttledLogger));
+ assertTrue(record.isIncomplete());
+ }
+}