[Spark] Add RowTrackingBackfillCommand#3449

Merged
tdas merged 5 commits into delta-io:master from longvu-db:stack/delta-row-tracking-backfill-command
Aug 2, 2024

Conversation

@longvu-db
Contributor

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Adds RowTrackingBackfillCommand, which assigns row IDs to the rows of an existing table, i.e. after the table has been created.

How was this patch tested?

Added unit tests.

Does this PR introduce any user-facing changes?

No.

Comment on lines +55 to +69
    def recordBackfillBatchStats(txnId: String, wasSuccessful: Boolean): Unit = {
      if (wasSuccessful) {
        numSuccessfulBatch.incrementAndGet()
      } else {
        numFailedBatch.incrementAndGet()
      }
      val totalExecutionTimeInMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
      val batchStats = BackfillBatchStats(
        origTxn.txnId, txnId, batchId, filesInBatch.size, totalExecutionTimeInMs, wasSuccessful)
      recordDeltaEvent(
        origTxn.deltaLog,
        opType = backfillBatchStatsOpType,
        data = batchStats
      )
    }
Collaborator

nit: define as a private method in BackfillBatch and add a comment for better readability

Contributor Author

@johanl-db I'm not sure that would be more readable, since:

  1. The helper would be defined away from the function that uses it, so I would have to look back and forth.
  2. I would have to pass origTxn, startTimeNs, batchId, numSuccessfulBatch, and numFailedBatch as arguments.

Collaborator

Ah right, nevermind
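
For readers following this thread, the trade-off can be sketched roughly as below. This is not the code from the PR: BatchStats, executeBatch, recordStats, and recordStatsExplicit are hypothetical names, the transaction ID is made up, and swallowing the failure to return a stats record is an assumption made only to keep the example small. Option A is a nested helper that closes over the surrounding state; Option B is a private method that needs every piece of that state passed in.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Hypothetical stand-in for the stats record; field names are illustrative only.
final case class BatchStats(
    parentTxnId: String,
    txnId: String,
    batchId: Int,
    numFiles: Int,
    totalExecutionTimeInMs: Long,
    wasSuccessful: Boolean)

object BatchStatsSketch {
  val numSuccessfulBatch = new AtomicInteger(0)
  val numFailedBatch = new AtomicInteger(0)

  // Option A: the helper is nested, so it can read parentTxnId, batchId,
  // numFiles and startTimeNs directly from the enclosing scope.
  def executeBatch(parentTxnId: String, batchId: Int, numFiles: Int)(work: => Unit): BatchStats = {
    val startTimeNs = System.nanoTime()

    def recordStats(txnId: String, wasSuccessful: Boolean): BatchStats = {
      if (wasSuccessful) numSuccessfulBatch.incrementAndGet()
      else numFailedBatch.incrementAndGet()
      val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
      BatchStats(parentTxnId, txnId, batchId, numFiles, elapsedMs, wasSuccessful)
    }

    val txnId = s"txn-$batchId" // made-up ID for the sketch
    try {
      work
      recordStats(txnId, wasSuccessful = true)
    } catch {
      // Assumption for the sketch: report the failure instead of rethrowing.
      case NonFatal(_) => recordStats(txnId, wasSuccessful = false)
    }
  }

  // Option B: as a private method, the same logic needs all of its state
  // passed explicitly, which is the long argument list mentioned above.
  private def recordStatsExplicit(
      parentTxnId: String,
      txnId: String,
      batchId: Int,
      numFiles: Int,
      startTimeNs: Long,
      successCounter: AtomicInteger,
      failureCounter: AtomicInteger,
      wasSuccessful: Boolean): BatchStats = {
    if (wasSuccessful) successCounter.incrementAndGet()
    else failureCounter.incrementAndGet()
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)
    BatchStats(parentTxnId, txnId, batchId, numFiles, elapsedMs, wasSuccessful)
  }
}
```

With the nested form, a call site only supplies what varies per call (the transaction ID and the outcome), which is the readability argument made in the reply above.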

Comment on lines +92 to +93
val mayInterruptIfRunning = false
futures.foreach(_.cancel(mayInterruptIfRunning))
Collaborator

Suggested change
- val mayInterruptIfRunning = false
- futures.foreach(_.cancel(mayInterruptIfRunning))
+ futures.foreach(_.cancel(mayInterruptIfRunning = false))

Contributor Author

@longvu-db, Jul 31, 2024

@johanl-db This actually doesn't work because cancel is a method on Java's Future, so Scala's named-argument syntax isn't available for it.

Collaborator

Oh I see
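
A minimal sketch of the pattern this thread settled on, for reference. CancelSketch and cancelAll are hypothetical names, not code from the PR. To my understanding, Scala only accepts named arguments for methods defined in Scala, so for a java.util.concurrent.Future a named local value is a common way to keep the boolean flag self-documenting.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, Future => JFuture}

object CancelSketch {
  /** Requests cancellation of every future without interrupting running tasks. */
  def cancelAll(futures: Seq[JFuture[_]]): Seq[Boolean] = {
    // `cancel` is declared in Java, so `_.cancel(mayInterruptIfRunning = false)`
    // is rejected by the compiler. A named local value documents the flag instead.
    val mayInterruptIfRunning = false
    futures.map(_.cancel(mayInterruptIfRunning))
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    val started = new CountDownLatch(1)
    val release = new CountDownLatch(1)

    // The first task occupies the only worker thread until it is released.
    val running = pool.submit(new Callable[String] {
      override def call(): String = {
        started.countDown()
        release.await()
        "done"
      }
    })
    started.await()

    // The second task is still queued when cancellation is requested.
    val queued = pool.submit(new Callable[String] {
      override def call(): String = "never runs"
    })

    val results = cancelAll(Seq(running, queued))
    release.countDown() // let the running task finish on its own
    pool.shutdown()
    println(s"cancel results: $results, queued cancelled: ${queued.isCancelled}")
  }
}
```

Passing false means a task that has already started is not interrupted; it is allowed to run to completion, while tasks that have not started yet never run.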

Collaborator

@johanl-db left a comment

Looks good, some minor comments

@longvu-db requested a review from johanl-db on July 31, 2024 14:35
@tdas merged commit 8eb7a4f into delta-io:master on Aug 2, 2024

3 participants