diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala
index a9db28c33124..e3c93f97a54d 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala
@@ -40,14 +40,14 @@ case class FlowSystemMetadata(
-   * which is storage/_checkpoints/flow_destination_table/flow_name.
+   * which is storage/_checkpoints/<fully qualified destination name>/flow_name.
    * @return the checkpoint root directory for `flow`
    */
-  private def flowCheckpointsDirOpt(): Option[Path] = {
+  def flowCheckpointsDirOpt(): Option[Path] = {
     Option(if (graph.table.contains(flow.destinationIdentifier) ||
       graph.sink.contains(flow.destinationIdentifier)) {
       val checkpointRoot = new Path(context.storageRoot, "_checkpoints")
-      val flowTableName = flow.destinationIdentifier.table
+      val flowTableId = flow.destinationIdentifier.nameParts.mkString(Path.SEPARATOR)
       val flowName = flow.identifier.table
       val checkpointDir = new Path(
-        new Path(checkpointRoot, flowTableName),
+        new Path(checkpointRoot, flowTableId),
         flowName
       )
       logInfo(
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala
index 71a4b7f68404..c37a6fb52f95 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala
@@ -227,6 +227,38 @@ class SystemMetadataSuite
       updateContext2
     )
   }
+
+  test("checkpoint dirs for tables with same name but different schema don't collide") {
+    val session = spark
+    import session.implicits._
+
+    // create a pipeline with two STs that share a name but live in different schemas
+    val graph = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
+      val mem: MemoryStream[Int] = MemoryStream[Int]
+      mem.addData(1, 2, 3)
+      registerView("a", query = dfFlowFunc(mem.toDF()))
+      registerTable("st")
+      registerFlow("st", "st", query = readStreamFlowFunc("a"))
+      registerTable("schema2.st")
+      registerFlow("schema2.st", "schema2.st", query = readStreamFlowFunc("a"))
+    }.toDataflowGraph
+
+    val updateContext = TestPipelineUpdateContext(
+      unresolvedGraph = graph,
+      spark = spark,
+      storageRoot = storageRoot,
+      failOnErrorEvent = true
+    )
+
+    val stFlow = graph.flow(fullyQualifiedIdentifier("st"))
+    val schema2StFlow = graph.flow(fullyQualifiedIdentifier("st", database = Option("schema2")))
+    val stSystemMetadata = FlowSystemMetadata(updateContext, stFlow, graph)
+    val schema2StSystemMetadata = FlowSystemMetadata(updateContext, schema2StFlow, graph)
+    assert(
+      stSystemMetadata.flowCheckpointsDirOpt() != schema2StSystemMetadata.flowCheckpointsDirOpt()
+    )
+  }
 }
 
 trait SystemMetadataTestHelpers {
@@ -242,8 +274,9 @@ trait SystemMetadataTestHelpers {
   ): Path = {
     val expectedRawCheckPointDir = tableOrSinkElement match {
       case t if t.isInstanceOf[Table] || t.isInstanceOf[Sink] =>
+        val tableId = t.identifier.nameParts.mkString(Path.SEPARATOR)
        new Path(updateContext.storageRoot)
-          .suffix(s"/_checkpoints/${t.identifier.table}/${flowElement.identifier.table}")
+          .suffix(s"/_checkpoints/$tableId/${flowElement.identifier.table}")
          .toString
      case _ =>
        fail(
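
A minimal, self-contained sketch of why joining all name parts avoids the
collision (hypothetical, not part of the patch: the object name, identifiers,
and storage root are made up; only org.apache.hadoop.fs.Path from hadoop-common
is assumed):

    import org.apache.hadoop.fs.Path

    object CheckpointPathSketch {
      def main(args: Array[String]): Unit = {
        // Name parts of two tables that share a table name but live in
        // different schemas, e.g. default.st and schema2.st.
        val st = Seq("spark_catalog", "default", "st")
        val schema2St = Seq("spark_catalog", "schema2", "st")

        val checkpointRoot = new Path("/tmp/storage", "_checkpoints")

        // Old behavior: only the unqualified table name is used, so both
        // tables resolve to the same checkpoint directory.
        val oldSt = new Path(checkpointRoot, st.last)
        val oldSchema2St = new Path(checkpointRoot, schema2St.last)
        assert(oldSt == oldSchema2St) // both /tmp/storage/_checkpoints/st

        // New behavior: each name part becomes a path segment, so the fully
        // qualified identifier keeps the directories distinct.
        val newSt = new Path(checkpointRoot, st.mkString(Path.SEPARATOR))
        val newSchema2St = new Path(checkpointRoot, schema2St.mkString(Path.SEPARATOR))
        assert(newSt != newSchema2St)
        println(newSt)        // /tmp/storage/_checkpoints/spark_catalog/default/st
        println(newSchema2St) // /tmp/storage/_checkpoints/spark_catalog/schema2/st
      }
    }

In the patched code the name parts come from flow.destinationIdentifier.nameParts
and the flow name is appended as a final path segment, so two flows writing to
default.st and schema2.st now checkpoint under separate directories.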