diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/KernelEngineException.java b/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/KernelEngineException.java new file mode 100644 index 00000000000..41caee8d4f9 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/exceptions/KernelEngineException.java @@ -0,0 +1,34 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.exceptions; + +/** + * TODO + */ +public class KernelEngineException extends RuntimeException { + + public KernelEngineException(String attemptedOperation, Throwable cause) { + super( + String.format( + "Encountered an error from the underlying engine implementation while trying " + + "to %s: %s", + attemptedOperation, + cause.getMessage() + ), + cause + ); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java index 1c98f650283..d6f4020f287 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java @@ -37,6 +37,7 @@ import io.delta.kernel.internal.skipping.DataSkippingPredicate; import io.delta.kernel.internal.skipping.DataSkippingUtils; import io.delta.kernel.internal.util.*; +import static io.delta.kernel.internal.client.WrappedExpressionHandler.wrapExpressionHandler; import static io.delta.kernel.internal.skipping.StatsSchemaHelper.getStatsSchema; import static io.delta.kernel.internal.util.PartitionUtils.rewritePartitionPredicateOnCheckpointFileSchema; import static io.delta.kernel.internal.util.PartitionUtils.rewritePartitionPredicateOnScanFileSchema; @@ -231,14 +232,21 @@ public boolean hasNext() { public FilteredColumnarBatch next() { FilteredColumnarBatch next = scanFileIter.next(); if (predicateEvaluator == null) { - predicateEvaluator = - engine.getExpressionHandler().getPredicateEvaluator( - next.getData().getSchema(), - predicateOnScanFileBatch); + predicateEvaluator = wrapExpressionHandler( + engine, + String.format( + "Evaluate partition predicate %s on schema %s", + predicateOnScanFileBatch, + next.getData().getSchema()) + ).getPredicateEvaluator( + next.getData().getSchema(), + predicateOnScanFileBatch + ); } ColumnVector newSelectionVector = predicateEvaluator.eval( - next.getData(), - next.getSelectionVector()); + next.getData(), + next.getSelectionVector()); + return new FilteredColumnarBatch( next.getData(), Optional.of(newSelectionVector)); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/client/ClientExceptionWrapping.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/client/ClientExceptionWrapping.java new file mode 100644 index 00000000000..53ef2b242ab --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/client/ClientExceptionWrapping.java @@ -0,0 +1,32 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.client; + +import java.util.concurrent.Callable; + +import io.delta.kernel.exceptions.KernelEngineException; + +public interface ClientExceptionWrapping { + + // TODO make operation string construction lazy + default T wrapWithEngineException(Callable s, String operation) { + try { + return s.call(); + } catch (Exception e) { + throw new KernelEngineException(operation, e); + } + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/client/WrappedExpressionHandler.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/client/WrappedExpressionHandler.java new file mode 100644 index 00000000000..13b798d2c43 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/client/WrappedExpressionHandler.java @@ -0,0 +1,88 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.client; + +import java.util.Optional; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.engine.ExpressionHandler; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.expressions.ExpressionEvaluator; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.expressions.PredicateEvaluator; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.StructType; + +public class WrappedExpressionHandler implements ExpressionHandler, ClientExceptionWrapping { + + public static ExpressionHandler wrapExpressionHandler(Engine engine, String operation) { + return new WrappedExpressionHandler(engine.getExpressionHandler(), operation); + } + + private final ExpressionHandler baseHandler; + private final String operationMessage; + + // TODO make operation string construction lazy + private WrappedExpressionHandler(ExpressionHandler baseHandler, String operationMessage) { + this.baseHandler = baseHandler; + this.operationMessage = operationMessage; + } + + @Override + public ExpressionEvaluator getEvaluator(StructType inputSchema, Expression expression, DataType outputType) { + ExpressionEvaluator baseEvaluator = wrapWithEngineException( + () -> baseHandler.getEvaluator(inputSchema, expression, outputType), + operationMessage); + return new ExpressionEvaluator() { + @Override + public void close() throws Exception { + baseEvaluator.close(); + } + + @Override + public ColumnVector eval(ColumnarBatch input) { + return wrapWithEngineException( + () -> baseEvaluator.eval(input), + operationMessage); + } + }; + } + + @Override + public PredicateEvaluator getPredicateEvaluator(StructType inputSchema, Predicate predicate) { + PredicateEvaluator baseEvaluator = wrapWithEngineException( + () -> baseHandler.getPredicateEvaluator(inputSchema, predicate), + operationMessage); + return new PredicateEvaluator() { + @Override + public ColumnVector eval(ColumnarBatch inputData, Optional existingSelectionVector) { + return wrapWithEngineException( + () -> baseEvaluator.eval(inputData, existingSelectionVector), + operationMessage); + } + }; + } + + @Override + public ColumnVector createSelectionVector(boolean[] values, int from, int to) { + return wrapWithEngineException( + () -> baseHandler.createSelectionVector(values, from, to), + operationMessage + ); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java index 23847402adb..1bf27d2e7e7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java @@ -33,6 +33,7 @@ import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.replay.LogReplayUtils.UniqueFileActionTuple; import io.delta.kernel.internal.util.Utils; +import static io.delta.kernel.internal.client.WrappedExpressionHandler.wrapExpressionHandler; import static io.delta.kernel.internal.replay.LogReplay.ADD_FILE_DV_ORDINAL; import static io.delta.kernel.internal.replay.LogReplay.ADD_FILE_ORDINAL; import static io.delta.kernel.internal.replay.LogReplay.ADD_FILE_PATH_ORDINAL; @@ -217,11 +218,14 @@ private void prepareNext() { // Step 4: TODO: remove this step. This is a temporary requirement until the path // in `add` is converted to absolute path. if (tableRootVectorGenerator == null) { - tableRootVectorGenerator = engine.getExpressionHandler() - .getEvaluator( - scanAddFiles.getSchema(), - Literal.ofString(tableRoot.toUri().toString()), - StringType.STRING); + tableRootVectorGenerator = wrapExpressionHandler( + engine, + "Evaluate the table root literal" + ).getEvaluator( + scanAddFiles.getSchema(), + Literal.ofString(tableRoot.toUri().toString()), + StringType.STRING + ); } ColumnVector tableRootVector = tableRootVectorGenerator.eval(scanAddFiles); scanAddFiles = scanAddFiles.withNewColumn(