Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Handle KernelEngineException when reading the _last_checkpoint file #3086

Merged
merged 5 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;

import io.delta.kernel.engine.Engine;

/**
 * Thrown when the {@link Engine} encounters an error while executing an operation.
 *
 * <p>The exception message has the form
 * {@code "Encountered an error from the underlying engine implementation while trying to
 * <attemptedOperation>: <cause message>"}, and the original failure is preserved as the
 * {@linkplain #getCause() cause}.
 */
public class KernelEngineException extends RuntimeException {
    private static final String MSG_TEMPLATE = "Encountered an error from the underlying engine " +
        "implementation while trying to %s: %s";

    /**
     * Creates an exception describing a failed engine operation.
     *
     * @param attemptedOperation short description of what was being attempted; it is
     *                           substituted into the message after "while trying to"
     * @param cause              the underlying failure reported by the engine. May be
     *                           {@code null}, in which case the message ends with
     *                           {@code "null"} and no cause is recorded.
     */
    public KernelEngineException(String attemptedOperation, Throwable cause) {
        super(String.format(MSG_TEMPLATE, attemptedOperation, messageOf(cause)), cause);
    }

    /**
     * Returns the message of {@code cause}, or {@code null} when there is no cause.
     * Guards against a {@link NullPointerException} being thrown while this exception is
     * still being constructed, which would hide the operation that actually failed.
     */
    private static String messageOf(Throwable cause) {
        return cause == null ? null : cause.getMessage();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
*/
package io.delta.kernel.internal.checkpoints;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.*;
import java.util.*;
import java.util.stream.Collectors;

Expand All @@ -26,6 +25,7 @@
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;

Expand Down Expand Up @@ -239,22 +239,39 @@ private Optional<CheckpointMetaData> loadMetadataFromFile(Engine engine, int tri
"Retrying after 1sec. (current attempt = {})",
lastCheckpointFilePath,
tries);
Thread.sleep(1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Optional.empty();
}
return loadMetadataFromFile(engine, tries + 1);
}
} catch (FileNotFoundException ex) {
return Optional.empty(); // there is no point retrying
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (IOException ex) {
// TODO: this exception is thrown when `readJsonFiles` is called and not before
// the file is actually read. In current implementation this never happens as the file
// is actually read when the data is fetched from the returned `CloseableIterator`
// from `readJsonFiles`. Once we wrap all calls to engine and throw
// {@link KernelEngineException} instead of {@link IOException}, we can remove this
// catch block.
return Optional.empty();
} catch (Exception ex) {
String msg = String.format(
"Failed to load checkpoint metadata from file %s. " +
"Retrying after 1sec. (current attempt = %s)",
lastCheckpointFilePath, tries);
logger.warn(msg, ex);
// we can retry until max tries are exhausted
return loadMetadataFromFile(engine, tries + 1);
} catch (KernelEngineException ex) {
Throwable cause = ex.getCause();
if (cause instanceof FileNotFoundException) {
return Optional.empty(); // there is no point retrying
} else if (cause instanceof Exception) {
String msg = String.format(
"Failed to load checkpoint metadata from file %s. " +
"It must be in the process of being written. " +
"Retrying after 1sec. (current attempt of %s (max 3)",
lastCheckpointFilePath, tries);
logger.warn(msg, cause);
// we can retry until max tries are exhausted. It saves latency as the alternative
// is to list files and find the last checkpoint file. And the `_last_checkpoint`
// file is possibly being written to.
return loadMetadataFromFile(engine, tries + 1);
}
throw ex;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private Optional<CloseableIterator<FileStatus>> listFromOrNone(
} catch (FileNotFoundException e) {
return Optional.empty();
} catch (IOException io) {
throw new RuntimeException("Failed to list the files in delta log", io);
throw new UncheckedIOException("Failed to list the files in delta log", io);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.function.Function;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.exceptions.KernelException;

import io.delta.kernel.internal.util.Utils;

Expand All @@ -34,6 +37,41 @@
*/
@Evolving
public interface CloseableIterator<T> extends Iterator<T>, Closeable {

/**
* Returns true if the iteration has more elements. (In other words, returns true if next would
* return an element rather than throwing an exception.)
*
* @return true if the iteration has more elements
* @throws KernelEngineException For any underlying exception occurs in {@link Engine} while
* trying to execute the operation. The original exception is (if
* any) wrapped in this exception as cause. E.g.
* {@link IOException} thrown while trying to read from a Delta
* log file. It will be wrapped in this exception as cause.
* @throws KernelException When encountered an operation or state that is invalid or
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For KernelEngineException you state it's for exceptions that occur in the Engine.

Am I correct in assuming that KernelException is for exceptions that occur in the Kernel, not in the engine? Can you add that very short clarification here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. Clarified.

* unsupported.
*/
@Override
boolean hasNext();

/**
* Returns the next element in the iteration.
*
* @return the next element in the iteration
* @throws NoSuchElementException if the iteration has no more elements
* @throws KernelEngineException For any underlying exception occurs in {@link Engine} while
* trying to execute the operation. The original exception is (if
* any) wrapped in this exception as cause. E.g.
* {@link IOException} thrown while trying to read from a Delta
* log file. It will be wrapped in this exception as cause.
* @throws KernelException When encountered an operation or state that is invalid or
* unsupported in Kernel. For example, trying to read from a
* Delta table that has advanced features which are not yet
* supported by Kernel.
*/
@Override
T next();

default <U> CloseableIterator<U> map(Function<T, U> mapper) {
CloseableIterator<T> delegate = this;
return new CloseableIterator<U>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
package io.delta.kernel.internal.checkpoints

import io.delta.kernel.data.{ColumnVector, ColumnarBatch}
import io.delta.kernel.exceptions.KernelEngineException
import io.delta.kernel.expressions.Predicate
import io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBeforeHelper
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.FileNames.checkpointFileSingular
import io.delta.kernel.internal.util.Utils
import io.delta.kernel.test.{BaseMockJsonHandler, MockFileSystemClientUtils, MockEngineUtils, VectorTestUtils}
import io.delta.kernel.test.{BaseMockJsonHandler, MockFileSystemClientUtils, VectorTestUtils}
import io.delta.kernel.types.StructType
import io.delta.kernel.utils.{CloseableIterator, FileStatus}
import org.scalatest.funsuite.AnyFunSuite

import java.io.{FileNotFoundException, IOException}
import java.util.Optional
import scala.util.control.NonFatal

class CheckpointerSuite extends AnyFunSuite with MockFileSystemClientUtils {
import CheckpointerSuite._
Expand Down Expand Up @@ -264,20 +266,25 @@ class MockLastCheckpointMetadataFileReader(maxFailures: Int) extends BaseMockJso
val file = fileIter.next()
val path = new Path(file.getPath)

if (currentFailCount < maxFailures) {
currentFailCount += 1
throw new IOException("Retryable exception")
}

Utils.singletonCloseableIterator(
path.getParent match {
case VALID_LAST_CHECKPOINT_FILE_TABLE => SAMPLE_LAST_CHECKPOINT_FILE_CONTENT
case ZERO_SIZED_LAST_CHECKPOINT_FILE_TABLE => ZERO_ENTRIES_COLUMNAR_BATCH
case INVALID_LAST_CHECKPOINT_FILE_TABLE =>
throw new IOException("Invalid last checkpoint file")
case LAST_CHECKPOINT_FILE_NOT_FOUND_TABLE =>
throw new FileNotFoundException("File not found")
case _ => throw new IOException("Unknown table")
})
try {
if (currentFailCount < maxFailures) {
currentFailCount += 1
throw new IOException("Retryable exception")
}

path.getParent match {
case VALID_LAST_CHECKPOINT_FILE_TABLE => SAMPLE_LAST_CHECKPOINT_FILE_CONTENT
case ZERO_SIZED_LAST_CHECKPOINT_FILE_TABLE => ZERO_ENTRIES_COLUMNAR_BATCH
case INVALID_LAST_CHECKPOINT_FILE_TABLE =>
throw new IOException("Invalid last checkpoint file")
case LAST_CHECKPOINT_FILE_NOT_FOUND_TABLE =>
throw new FileNotFoundException("File not found")
case _ => throw new IOException("Unknown table")
}
} catch {
case NonFatal(e) => throw new KernelEngineException("Failed to read last checkpoint", e);
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.*;
import static java.lang.String.format;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.*;
Expand All @@ -29,6 +30,7 @@

import io.delta.kernel.data.*;
import io.delta.kernel.engine.JsonHandler;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterator;
Expand Down Expand Up @@ -89,7 +91,7 @@ public StructType deserializeStructType(String structTypeJson) {
return DataTypeParser.parseSchema(defaultObjectReader.readTree(structTypeJson));
} catch (JsonProcessingException ex) {
throw new RuntimeException(
String.format("Could not parse JSON: %s", structTypeJson), ex);
format("Could not parse JSON: %s", structTypeJson), ex);
}
}

Expand Down Expand Up @@ -127,7 +129,8 @@ public boolean hasNext() {
}
}
} catch (IOException ex) {
throw new RuntimeException(ex);
throw new KernelEngineException(
format("Error reading JSON file: %s", currentFile.getPath()), ex);
}

return nextLine != null;
Expand Down Expand Up @@ -215,7 +218,7 @@ private Row parseJson(String json, StructType readSchema) {
final JsonNode jsonNode = objectReaderReadBigDecimals.readTree(json);
return new DefaultJsonRow((ObjectNode) jsonNode, readSchema);
} catch (JsonProcessingException ex) {
throw new RuntimeException(String.format("Could not parse JSON: %s", json), ex);
throw new KernelEngineException(format("Could not parse JSON: %s", json), ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.delta.kernel.defaults.internal.parquet;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.*;
import static java.util.Objects.requireNonNull;
Expand All @@ -34,6 +33,7 @@
import static org.apache.parquet.hadoop.ParquetInputFormat.*;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.exceptions.KernelEngineException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
Expand Down Expand Up @@ -85,8 +85,8 @@ public boolean hasNext() {
hasNotConsumedNextElement = reader.nextKeyValue() &&
reader.getCurrentValue() != null;
return hasNotConsumedNextElement;
} catch (IOException | InterruptedException ie) {
throw new RuntimeException(ie);
} catch (IOException | InterruptedException ex) {
throw new KernelEngineException("Error reading Parquet file: " + path, ex);
}
}

Expand Down Expand Up @@ -148,7 +148,7 @@ private void initParquetReaderIfRequired() {
reader.initialize(fileReader, confCopy);
} catch (IOException e) {
Utils.closeCloseablesSilently(fileReader, reader);
throw new UncheckedIOException(e);
throw new KernelEngineException("Error reading Parquet file: " + path, e);
}
}
}
Expand Down
Loading
Loading