Skip to content

Commit 40ba346

Browse files
[Kernel][Metrics][PR#2] Add SnapshotReport for reporting snapshot construction (#3903)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Adds a `SnapshotReport` for reporting snapshot construction. We record a `SnapshotReport` after successfully constructing a snapshot or if an exception is thrown during construction. We use `SnapshotQueryContext` to propagate and update information about the snapshot construction. For example, we update the "version" as soon as it's resolved for time-travel-by-timestamp or latest-snapshot queries. This means that in the case of an exception we include as much information as is available. ## How was this patch tested? Adds a test suite `MetricsReportSuite` with unit tests. ## Does this PR introduce _any_ user-facing changes? No.
1 parent ecc5109 commit 40ba346

File tree

13 files changed

+926
-40
lines changed

13 files changed

+926
-40
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@
2626
import io.delta.kernel.internal.actions.Metadata;
2727
import io.delta.kernel.internal.actions.Protocol;
2828
import io.delta.kernel.internal.fs.Path;
29+
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
30+
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
2931
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
3032
import io.delta.kernel.internal.replay.LogReplay;
3133
import io.delta.kernel.internal.snapshot.LogSegment;
3234
import io.delta.kernel.internal.util.VectorUtils;
35+
import io.delta.kernel.metrics.SnapshotReport;
3336
import io.delta.kernel.types.StructType;
3437
import java.util.List;
3538
import java.util.Map;
@@ -45,13 +48,15 @@ public class SnapshotImpl implements Snapshot {
4548
private final Metadata metadata;
4649
private final LogSegment logSegment;
4750
private Optional<Long> inCommitTimestampOpt;
51+
private final SnapshotReport snapshotReport;
4852

4953
public SnapshotImpl(
5054
Path dataPath,
5155
LogSegment logSegment,
5256
LogReplay logReplay,
5357
Protocol protocol,
54-
Metadata metadata) {
58+
Metadata metadata,
59+
SnapshotQueryContext snapshotContext) {
5560
this.logPath = new Path(dataPath, "_delta_log");
5661
this.dataPath = dataPath;
5762
this.version = logSegment.version;
@@ -60,6 +65,7 @@ public SnapshotImpl(
6065
this.protocol = protocol;
6166
this.metadata = metadata;
6267
this.inCommitTimestampOpt = Optional.empty();
68+
this.snapshotReport = SnapshotReportImpl.forSuccess(snapshotContext);
6369
}
6470

6571
/////////////////
@@ -107,6 +113,7 @@ public StructType getSchema(Engine engine) {
107113

108114
@Override
109115
public ScanBuilder getScanBuilder(Engine engine) {
116+
// TODO when we add ScanReport we will pass the SnapshotReport downstream here
110117
return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(engine), logReplay, engine);
111118
}
112119

@@ -130,6 +137,10 @@ public List<String> getPartitionColumnNames(Engine engine) {
130137
return VectorUtils.toJavaList(getMetadata().getPartitionColumns());
131138
}
132139

140+
public SnapshotReport getSnapshotReport() {
141+
return snapshotReport;
142+
}
143+
133144
/**
134145
* Get the domain metadata map from the log replay, which lazily loads and replays a history of
135146
* domain metadata actions, resolving them to produce the current state of the domain metadata.

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import io.delta.kernel.exceptions.TableNotFoundException;
2727
import io.delta.kernel.internal.actions.Protocol;
2828
import io.delta.kernel.internal.fs.Path;
29+
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
30+
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
2931
import io.delta.kernel.internal.snapshot.SnapshotManager;
3032
import io.delta.kernel.internal.util.Clock;
33+
import io.delta.kernel.metrics.SnapshotReport;
3134
import io.delta.kernel.types.StructField;
3235
import io.delta.kernel.types.StructType;
3336
import io.delta.kernel.utils.CloseableIterator;
@@ -90,19 +93,39 @@ public String getPath(Engine engine) {
9093

9194
// Builds the latest snapshot of this table. A SnapshotQueryContext for a latest-snapshot query is
// created up front and handed to the snapshot manager. If construction throws, an error
// SnapshotReport is recorded through recordSnapshotErrorReport using that same context, and the
// exception is then rethrown unchanged to the caller.
@Override
9295
public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException {
93-
return snapshotManager.buildLatestSnapshot(engine);
96+
SnapshotQueryContext snapshotContext = SnapshotQueryContext.forLatestSnapshot(tablePath);
97+
try {
98+
return snapshotManager.buildLatestSnapshot(engine, snapshotContext);
99+
} catch (Exception e) {
100+
recordSnapshotErrorReport(engine, snapshotContext, e);
101+
throw e;
102+
}
94103
}
95104

96105
// Builds the snapshot at the requested table version. The SnapshotQueryContext is created with
// the provided versionId and handed to the snapshot manager. If construction throws, an error
// SnapshotReport is recorded through recordSnapshotErrorReport using that same context, and the
// exception is then rethrown unchanged to the caller.
@Override
97106
public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId)
98107
throws TableNotFoundException {
99-
return snapshotManager.getSnapshotAt(engine, versionId);
108+
SnapshotQueryContext snapshotContext =
109+
SnapshotQueryContext.forVersionSnapshot(tablePath, versionId);
110+
try {
111+
return snapshotManager.getSnapshotAt(engine, versionId, snapshotContext);
112+
} catch (Exception e) {
113+
recordSnapshotErrorReport(engine, snapshotContext, e);
114+
throw e;
115+
}
100116
}
101117

102118
// Builds the snapshot for the requested timestamp. The SnapshotQueryContext is created with the
// provided millisSinceEpochUTC and handed to the snapshot manager. If construction throws, an
// error SnapshotReport is recorded through recordSnapshotErrorReport using that same context, and
// the exception is then rethrown unchanged to the caller.
@Override
103119
public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
104120
throws TableNotFoundException {
105-
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC);
121+
SnapshotQueryContext snapshotContext =
122+
SnapshotQueryContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
123+
try {
124+
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext);
125+
} catch (Exception e) {
126+
recordSnapshotErrorReport(engine, snapshotContext, e);
127+
throw e;
128+
}
106129
}
107130

108131
@Override
@@ -316,4 +339,11 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
316339
logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema);
317340
return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema);
318341
}
342+
343+
/** Creates a {@link SnapshotReport} and pushes it to any {@link MetricsReporter}s. */
344+
private void recordSnapshotErrorReport(
345+
Engine engine, SnapshotQueryContext snapshotContext, Exception e) {
346+
SnapshotReport snapshotReport = SnapshotReportImpl.forError(snapshotContext, e);
347+
engine.getMetricsReporters().forEach(reporter -> reporter.report(snapshotReport));
348+
}
319349
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import io.delta.kernel.exceptions.TableNotFoundException;
3232
import io.delta.kernel.internal.actions.*;
3333
import io.delta.kernel.internal.fs.Path;
34+
import io.delta.kernel.internal.metrics.SnapshotMetrics;
35+
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
3436
import io.delta.kernel.internal.replay.LogReplay;
3537
import io.delta.kernel.internal.snapshot.LogSegment;
3638
import io.delta.kernel.internal.snapshot.SnapshotHint;
@@ -105,8 +107,11 @@ public Transaction build(Engine engine) {
105107
// Table doesn't exist yet. Create an initial snapshot with the new schema.
106108
Metadata metadata = getInitialMetadata();
107109
Protocol protocol = getInitialProtocol();
108-
LogReplay logReplay = getEmptyLogReplay(engine, metadata, protocol);
109-
snapshot = new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol);
110+
SnapshotQueryContext snapshotContext = SnapshotQueryContext.forVersionSnapshot(tablePath, -1);
111+
LogReplay logReplay =
112+
getEmptyLogReplay(engine, metadata, protocol, snapshotContext.getSnapshotMetrics());
113+
snapshot =
114+
new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol, snapshotContext);
110115
}
111116

112117
boolean isNewTable = snapshot.getVersion(engine) < 0;
@@ -204,8 +209,19 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
204209
}
205210

206211
private class InitialSnapshot extends SnapshotImpl {
207-
InitialSnapshot(Path dataPath, LogReplay logReplay, Metadata metadata, Protocol protocol) {
208-
super(dataPath, LogSegment.empty(table.getLogPath()), logReplay, protocol, metadata);
212+
InitialSnapshot(
213+
Path dataPath,
214+
LogReplay logReplay,
215+
Metadata metadata,
216+
Protocol protocol,
217+
SnapshotQueryContext snapshotContext) {
218+
super(
219+
dataPath,
220+
LogSegment.empty(table.getLogPath()),
221+
logReplay,
222+
protocol,
223+
metadata,
224+
snapshotContext);
209225
}
210226

211227
@Override
@@ -214,14 +230,16 @@ public long getTimestamp(Engine engine) {
214230
}
215231
}
216232

217-
private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) {
233+
private LogReplay getEmptyLogReplay(
234+
Engine engine, Metadata metadata, Protocol protocol, SnapshotMetrics snapshotMetrics) {
218235
return new LogReplay(
219236
table.getLogPath(),
220237
table.getDataPath(),
221238
-1,
222239
engine,
223240
LogSegment.empty(table.getLogPath()),
224-
Optional.empty()) {
241+
Optional.empty(),
242+
snapshotMetrics) {
225243

226244
@Override
227245
protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.metrics;
17+
18+
import io.delta.kernel.metrics.SnapshotMetricsResult;
19+
import java.util.Optional;
20+
21+
/**
22+
* Stores the metrics for an ongoing snapshot construction. These metrics are updated and recorded
23+
* throughout the snapshot query using this class.
24+
*
25+
* <p>At report time, we create an immutable {@link SnapshotMetricsResult} from an instance of
26+
* {@link SnapshotMetrics} to capture the metrics collected during the query. The {@link
27+
* SnapshotMetricsResult} interface exposes getters for any metrics collected in this class.
28+
*/
29+
public class SnapshotMetrics {
30+
31+
public final Timer timestampToVersionResolutionTimer = new Timer();
32+
33+
public final Timer loadInitialDeltaActionsTimer = new Timer();
34+
35+
public SnapshotMetricsResult captureSnapshotMetricsResult() {
36+
return new SnapshotMetricsResult() {
37+
38+
final Optional<Long> timestampToVersionResolutionDurationResult =
39+
timestampToVersionResolutionTimer.totalDurationIfRecorded();
40+
final long loadInitialDeltaActionsDurationResult =
41+
loadInitialDeltaActionsTimer.totalDurationNs();
42+
43+
@Override
44+
public Optional<Long> timestampToVersionResolutionDurationNs() {
45+
return timestampToVersionResolutionDurationResult;
46+
}
47+
48+
@Override
49+
public long loadInitialDeltaActionsDurationNs() {
50+
return loadInitialDeltaActionsDurationResult;
51+
}
52+
};
53+
}
54+
55+
@Override
56+
public String toString() {
57+
return String.format(
58+
"SnapshotMetrics(timestampToVersionResolutionTimer=%s, "
59+
+ "loadInitialDeltaActionsTimer=%s)",
60+
timestampToVersionResolutionTimer, loadInitialDeltaActionsTimer);
61+
}
62+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.metrics;
17+
18+
import io.delta.kernel.metrics.SnapshotReport;
19+
import java.util.Optional;
20+
21+
/**
22+
* Stores the context for a given Snapshot query. This includes information about the query
23+
* parameters (i.e. table path, time travel parameters), updated state as the snapshot query
24+
* progresses (i.e. resolved version), and metrics.
25+
*
26+
* <p>This is used to generate a {@link SnapshotReport}. It exists from snapshot query initiation
27+
* until either successful snapshot construction or failure.
28+
*/
29+
public class SnapshotQueryContext {
30+
31+
/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a latest snapshot query */
32+
public static SnapshotQueryContext forLatestSnapshot(String tablePath) {
33+
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.empty());
34+
}
35+
36+
/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a AS OF VERSION query */
37+
public static SnapshotQueryContext forVersionSnapshot(String tablePath, long version) {
38+
return new SnapshotQueryContext(tablePath, Optional.of(version), Optional.empty());
39+
}
40+
41+
/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a AS OF TIMESTAMP query */
42+
public static SnapshotQueryContext forTimestampSnapshot(String tablePath, long timestamp) {
43+
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.of(timestamp));
44+
}
45+
46+
private final String tablePath;
47+
private Optional<Long> version;
48+
private final Optional<Long> providedTimestamp;
49+
private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics();
50+
51+
/**
52+
* @param tablePath the table path for the table being queried
53+
* @param providedVersion the provided version for a time-travel-by-version query, empty if this
54+
* is not a time-travel-by-version query
55+
* @param providedTimestamp the provided timestamp for a time-travel-by-timestamp query, empty if
56+
* this is not a time-travel-by-timestamp query
57+
*/
58+
private SnapshotQueryContext(
59+
String tablePath, Optional<Long> providedVersion, Optional<Long> providedTimestamp) {
60+
this.tablePath = tablePath;
61+
this.version = providedVersion;
62+
this.providedTimestamp = providedTimestamp;
63+
}
64+
65+
public String getTablePath() {
66+
return tablePath;
67+
}
68+
69+
public Optional<Long> getVersion() {
70+
return version;
71+
}
72+
73+
public Optional<Long> getProvidedTimestamp() {
74+
return providedTimestamp;
75+
}
76+
77+
public SnapshotMetrics getSnapshotMetrics() {
78+
return snapshotMetrics;
79+
}
80+
81+
/**
82+
* Updates the {@code version} stored in this snapshot context. This version should be updated
83+
* upon version resolution for non time-travel-by-version queries. For latest snapshot queries
84+
* this is after log segment construction. For time-travel by timestamp queries this is after
85+
* timestamp to version resolution.
86+
*/
87+
public void setVersion(long updatedVersion) {
88+
version = Optional.of(updatedVersion);
89+
}
90+
91+
@Override
92+
public String toString() {
93+
return String.format(
94+
"SnapshotQueryContext(tablePath=%s, version=%s, providedTimestamp=%s, snapshotMetric=%s)",
95+
tablePath, version, providedTimestamp, snapshotMetrics);
96+
}
97+
}

0 commit comments

Comments
 (0)