Commit eca5a7f

[Spark] InCommitTimestamp: Use clock.currentTimeMillis() instead of nanoTime() in commitLarge (#3111)
## Description

We currently use `NANOSECONDS.toMillis(System.nanoTime())` to generate the in-commit timestamp (ICT) when `commitLarge` is called. This usage of `System.nanoTime()` is incorrect: it should only be used for measuring elapsed time, not for obtaining an approximate wall-clock time. Because its origin is arbitrary, some systems return a very small value, and the resulting ICT can end up near the epoch (e.g. 1 Jan 1970). This PR switches the code to use `clock.getTimeMillis()` instead.

## How was this patch tested?

Added a test case to ensure that `clock.getTimeMillis()` is being used.
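
As background on the bug described above: `System.nanoTime()` counts from an arbitrary origin, so converting it to milliseconds does not yield a point on the wall clock. A minimal standalone sketch of the contrast (illustration only, not code from this PR; `NanoTimeVsWallClock` is a made-up name):

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

object NanoTimeVsWallClock {
  def main(args: Array[String]): Unit = {
    // Wall-clock time: milliseconds since the Unix epoch (1970-01-01).
    val wallClockMs = System.currentTimeMillis()

    // Monotonic time: nanoseconds from an arbitrary origin; the JDK only
    // guarantees it is useful for measuring elapsed time.
    val monotonicAsMs = NANOSECONDS.toMillis(System.nanoTime())

    println(s"System.currentTimeMillis()            = $wallClockMs")
    println(s"NANOSECONDS.toMillis(System.nanoTime) = $monotonicAsMs")
    // On many systems nanoTime() is based on uptime, so the second value is
    // tiny compared to the first -- read as an epoch timestamp it lands near
    // 1 Jan 1970, which is exactly the bad ICT described in this commit.
  }
}
```
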
1 parent e15132b commit eca5a7f

File tree

2 files changed: 41 additions & 2 deletions

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 3 additions & 2 deletions
@@ -1337,10 +1337,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite

     try {
       val tags = Map.empty[String, String]
+      val commitTimestampMs = clock.getTimeMillis()
       val commitInfo = CommitInfo(
-        NANOSECONDS.toMillis(commitStartNano),
+        commitTimestampMs,
         operation = op.name,
-        generateInCommitTimestampForFirstCommitAttempt(NANOSECONDS.toMillis(commitStartNano)),
+        generateInCommitTimestampForFirstCommitAttempt(commitTimestampMs),
         operationParameters = op.jsonEncodedValues,
         context,
         readVersion = Some(readVersion),
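
Note that the fix reads `clock.getTimeMillis()` rather than calling `System.currentTimeMillis()` directly. Routing time through the transaction's injected clock is what lets the new test below pin the timestamp with a manual clock. A simplified, standalone sketch of that injection pattern (the `Clock`/`ManualClock` shapes are modeled loosely on Spark's `org.apache.spark.util` classes; `Committer` and `ClockInjectionDemo` are hypothetical stand-ins, not Delta's code):

```scala
// Minimal clock abstraction, modeled on Spark's org.apache.spark.util.Clock
// (which also exposes getTimeMillis()); redeclared here so the sketch stands alone.
trait Clock {
  def getTimeMillis(): Long
}

// Production implementation: real wall-clock time.
class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
}

// Test implementation: time is whatever the test sets it to.
class ManualClock(private var timeMs: Long) extends Clock {
  override def getTimeMillis(): Long = timeMs
  def advance(deltaMs: Long): Unit = { timeMs += deltaMs }
}

// Hypothetical commit path that stamps commits from the injected clock,
// mirroring the shape of the fix above.
class Committer(clock: Clock) {
  def commitTimestampMs(): Long = clock.getTimeMillis()
}

object ClockInjectionDemo extends App {
  val pinned = 1700000000000L                      // arbitrary fixed wall-clock value
  val committer = new Committer(new ManualClock(pinned))
  assert(committer.commitTimestampMs() == pinned)  // deterministic under test
  println("commit timestamp came from the injected clock")
}
```
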

spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala

Lines changed: 38 additions & 0 deletions
@@ -143,6 +143,44 @@ class InCommitTimestampSuite
    }
  }

+  for (useCommitLarge <- BOOLEAN_DOMAIN)
+    test("txn.commit should use clock.currentTimeMillis() for ICT" +
+        s" [useCommitLarge: $useCommitLarge]") {
+      withTempDir { tempDir =>
+        spark.range(2).write.format("delta").save(tempDir.getAbsolutePath)
+        // Clear the DeltaLog cache so that a new DeltaLog is created with the manual clock.
+        DeltaLog.clearCache()
+        val expectedCommit1Time = System.currentTimeMillis()
+        val clock = new ManualClock(expectedCommit1Time)
+        val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+        val ver0Snapshot = deltaLog.snapshot
+        assert(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(ver0Snapshot.metadata))
+        val usageRecords = Log4jUsageLogger.track {
+          if (useCommitLarge) {
+            deltaLog.startTransaction().commitLarge(
+              spark,
+              Seq(createTestAddFile("1")).toIterator,
+              newProtocolOpt = None,
+              DeltaOperations.ManualUpdate,
+              context = Map.empty,
+              metrics = Map.empty)
+          } else {
+            deltaLog.startTransaction().commit(
+              Seq(createTestAddFile("1")),
+              DeltaOperations.ManualUpdate,
+              tags = Map.empty
+            )
+          }
+        }
+        val ver1Snapshot = deltaLog.snapshot
+        val retrievedTimestamp = getInCommitTimestamp(deltaLog, version = 1)
+        assert(ver1Snapshot.timestamp == retrievedTimestamp)
+        assert(ver1Snapshot.timestamp == expectedCommit1Time)
+        val expectedOpType = if (useCommitLarge) "delta.commit.large" else "delta.commit"
+        assert(filterUsageRecords(usageRecords, expectedOpType).length == 1)
+      }
+    }
+
  test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") {
    withTempDir { tempDir =>
      spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
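
A side note on the test's structure: the `for (useCommitLarge <- BOOLEAN_DOMAIN)` loop runs while the suite is constructed, so `test(...)` registers one named test case per boolean value. A standalone sketch of that ScalaTest pattern (assuming `AnyFunSuite`, and assuming `BOOLEAN_DOMAIN` is `Seq(true, false)` as in Delta's test utilities; `LoopRegisteredTestsSuite` and `stampCommit` are made-up names for illustration):

```scala
import org.scalatest.funsuite.AnyFunSuite

// Standalone sketch of registering one test per parameter value by looping
// at suite-construction time.
class LoopRegisteredTestsSuite extends AnyFunSuite {
  private val BOOLEAN_DOMAIN: Seq[Boolean] = Seq(true, false)

  // The for-loop body executes while the suite is being constructed, so
  // test(...) is called twice and registers two independently named tests.
  for (useCommitLarge <- BOOLEAN_DOMAIN) {
    test(s"timestamp comes from the wall clock [useCommitLarge: $useCommitLarge]") {
      val before = System.currentTimeMillis()
      val stampedMs = stampCommit(useCommitLarge)  // hypothetical code under test
      assert(stampedMs >= before)
    }
  }

  // Hypothetical stand-in: both code paths stamp wall-clock milliseconds.
  private def stampCommit(useCommitLarge: Boolean): Long = System.currentTimeMillis()
}
```
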
