Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ case class FlowSystemMetadata(
* which is storage/_checkpoints/flow_destination_table/flow_name.
* @return the checkpoint root directory for `flow`
*/
private def flowCheckpointsDirOpt(): Option[Path] = {
def flowCheckpointsDirOpt(): Option[Path] = {
Option(if (graph.table.contains(flow.destinationIdentifier) ||
graph.sink.contains(flow.destinationIdentifier)) {
val checkpointRoot = new Path(context.storageRoot, "_checkpoints")
val flowTableName = flow.destinationIdentifier.table
val flowTableId = flow.destinationIdentifier.nameParts.mkString(Path.SEPARATOR)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*.nameParts.mkString(Path.SEPARATOR) looks a little tacky to me. Could you add a more detailed function comment about this at line 41?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work for Apache Iceberg database and tables, too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 will add a comment.

This is agnostic to the table format; it's just controlling the directory that we're storing streaming checkpoints in. That directory is keyed by the table name, but we're not actually putting it inside the table directory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it later as a follow-up. Let me merge this first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 here's the followup: #53089. I slotted it under the same JIRA because it's a continuation of this PR, but let me know if it's preferred to file a new JIRA.

val flowName = flow.identifier.table
val checkpointDir = new Path(
new Path(checkpointRoot, flowTableName),
new Path(checkpointRoot, flowTableId),
flowName
)
logInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,38 @@ class SystemMetadataSuite
updateContext2
)
}

test("checkpoint dirs for tables with same name but different schema don't collide") {
  val session = spark
  import session.implicits._

  // Create a pipeline with two streaming tables that share the table name "st" but are
  // registered under different schemas. Both are fed by the same streaming view "a".
  val graph = new TestGraphRegistrationContext(spark) {
    implicit val sparkSession: SparkSession = spark
    val mem: MemoryStream[Int] = MemoryStream[Int]
    mem.addData(1, 2, 3)
    registerView("a", query = dfFlowFunc(mem.toDF()))
    registerTable("st")
    registerFlow("st", "st", query = readStreamFlowFunc("a"))
    registerTable("schema2.st")
    registerFlow("schema2.st", "schema2.st", query = readStreamFlowFunc("a"))
  }.toDataflowGraph

  val updateContext = TestPipelineUpdateContext(
    unresolvedGraph = graph,
    spark = spark,
    storageRoot = storageRoot,
    failOnErrorEvent = true
  )

  val stFlow = graph.flow(fullyQualifiedIdentifier("st"))
  val schema2StFlow = graph.flow(fullyQualifiedIdentifier("st", database = Option("schema2")))
  val stCheckpointDir =
    FlowSystemMetadata(updateContext, stFlow, graph).flowCheckpointsDirOpt()
  val schema2StCheckpointDir =
    FlowSystemMetadata(updateContext, schema2StFlow, graph).flowCheckpointsDirOpt()

  // Each flow targets a table registered above, so a checkpoint directory is expected for
  // both. Checking this first keeps the inequality below from passing vacuously when only
  // one of the two directories is missing.
  assert(stCheckpointDir.isDefined)
  assert(schema2StCheckpointDir.isDefined)
  assert(stCheckpointDir != schema2StCheckpointDir)
}
}

trait SystemMetadataTestHelpers {
Expand All @@ -242,8 +274,9 @@ trait SystemMetadataTestHelpers {
): Path = {
val expectedRawCheckPointDir = tableOrSinkElement match {
case t if t.isInstanceOf[Table] || t.isInstanceOf[Sink] =>
val tableId = t.identifier.nameParts.mkString(Path.SEPARATOR)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need this in multiple places, we had better add a utility method for this conversion to be safe, @sryza .

new Path(updateContext.storageRoot)
.suffix(s"/_checkpoints/${t.identifier.table}/${flowElement.identifier.table}")
.suffix(s"/_checkpoints/$tableId/${flowElement.identifier.table}")
.toString
case _ =>
fail(
Expand Down