From eeb0ef6931d50aa738ad049e336d0b520b3051b4 Mon Sep 17 00:00:00 2001 From: Ben Mears Date: Tue, 21 Nov 2023 17:51:40 -0800 Subject: [PATCH 1/5] Break up joins --- spark/src/main/scala/ai/chronon/spark/Join.scala | 9 ++++++++- spark/src/main/scala/ai/chronon/spark/TableUtils.scala | 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Join.scala b/spark/src/main/scala/ai/chronon/spark/Join.scala index 0adff47f4..d3bd387b0 100644 --- a/spark/src/main/scala/ai/chronon/spark/Join.scala +++ b/spark/src/main/scala/ai/chronon/spark/Join.scala @@ -67,6 +67,7 @@ class Join(joinConf: api.Join, extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) { private val bootstrapTable = joinConf.metaData.bootstrapTable + private val joinsAtATime = 8 private def padFields(df: DataFrame, structType: sql.types.StructType): DataFrame = { structType.foldLeft(df) { @@ -263,7 +264,13 @@ class Join(joinConf: api.Join, // a bootstrap source can cover a partial date range. 
we combine the columns using coalesce-rule - rightResults + rightResults.zipWithIndex .foldLeft(bootstrapDf) { - case (partialDf, (rightPart, rightDf)) => joinWithLeft(partialDf, rightDf, rightPart) + case (partialDf, ((rightPart, rightDf), i)) => + val next = joinWithLeft(partialDf, rightDf, rightPart) + if (((i + 1) % joinsAtATime) == 0) { + tableUtils.addJoinBreak(next) + } else { + next + } } // drop all processing metadata columns .drop(Constants.MatchedHashes, Constants.TimePartitionColumn) diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 5d9d71144..23de4a3a1 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -324,6 +324,9 @@ case class TableUtils(sparkSession: SparkSession) { df } + def addJoinBreak(dataFrame: DataFrame): DataFrame = + dataFrame.(sparkSession) + def insertUnPartitioned(df: DataFrame, tableName: String, tableProperties: Map[String, String] = null, From 9b787bc3ba8662af552ed90db21cb5c6848f7ad2 Mon Sep 17 00:00:00 2001 From: Ben Mears Date: Tue, 21 Nov 2023 17:54:02 -0800 Subject: [PATCH 2/5] Fix addJoinBreak --- spark/src/main/scala/ai/chronon/spark/TableUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 23de4a3a1..c72a143c5 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -325,7 +325,7 @@ case class TableUtils(sparkSession: SparkSession) { } def addJoinBreak(dataFrame: DataFrame): DataFrame = - dataFrame.(sparkSession) + dataFrame.cache() def insertUnPartitioned(df: DataFrame, tableName: String, From 5096e4a4b2516893077ceccb71e19ff5b163c5a6 Mon Sep 17 00:00:00 2001 From: Ben Mears Date: Wed, 22 Nov 2023 08:38:59 -0800 Subject: [PATCH 3/5] PR feedback --- 
spark/src/main/scala/ai/chronon/spark/Join.scala | 8 ++++++-- spark/src/main/scala/ai/chronon/spark/TableUtils.scala | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/ai/chronon/spark/Join.scala b/spark/src/main/scala/ai/chronon/spark/Join.scala index d3bd387b0..98ac3cd4e 100644 --- a/spark/src/main/scala/ai/chronon/spark/Join.scala +++ b/spark/src/main/scala/ai/chronon/spark/Join.scala @@ -67,7 +67,6 @@ class Join(joinConf: api.Join, extends JoinBase(joinConf, endPartition, tableUtils, skipFirstHole, mutationScan, showDf) { private val bootstrapTable = joinConf.metaData.bootstrapTable - private val joinsAtATime = 8 private def padFields(df: DataFrame, structType: sql.types.StructType): DataFrame = { structType.foldLeft(df) { @@ -262,12 +261,17 @@ class Join(joinConf: api.Join, // combine bootstrap table and join part tables // sequentially join bootstrap table and each join part table. some column may exist both on left and right because // a bootstrap source can cover a partial date range. we combine the columns using coalesce-rule + var previous: Option[DataFrame] = None rightResults .foldLeft(bootstrapDf) { case (partialDf, ((rightPart, rightDf), i)) => val next = joinWithLeft(partialDf, rightDf, rightPart) - if (((i + 1) % joinsAtATime) == 0) { + // Join breaks are added to prevent the Spark app from stalling on a Join that involves too many + // rightParts. 
+ if (((i + 1) % tableUtils.finalJoinParallelism) == 0 && (i != (rightResults.size - 1))) { - tableUtils.addJoinBreak(next) + val cached = tableUtils.addJoinBreak(next) + previous.foreach(_.unpersist()) + previous = Some(cached) + cached } else { next } } diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index c72a143c5..75fe58d61 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -68,6 +68,7 @@ case class TableUtils(sparkSession: SparkSession) { val joinPartParallelism: Int = sparkSession.conf.get("spark.chronon.join.part.parallelism", "1").toInt val aggregationParallelism: Int = sparkSession.conf.get("spark.chronon.group_by.parallelism", "1000").toInt + val finalJoinParallelism: Int = sparkSession.conf.get("spark.chronon.join.final_join_parallelism", "8").toInt val maxWait: Int = sparkSession.conf.get("spark.chronon.wait.hours", "48").toInt sparkSession.sparkContext.setLogLevel("ERROR") @@ -325,7 +326,7 @@ case class TableUtils(sparkSession: SparkSession) { } def addJoinBreak(dataFrame: DataFrame): DataFrame = - dataFrame.cache() + dataFrame.persist(cacheLevel.getOrElse(StorageLevel.MEMORY_AND_DISK)) def insertUnPartitioned(df: DataFrame, tableName: String, From d1ec5d86be9ad11f86e19a6d9eec4d2498a60543 Mon Sep 17 00:00:00 2001 From: Ben Mears Date: Wed, 22 Nov 2023 09:14:14 -0800 Subject: [PATCH 4/5] Empty-Commit From 93ecd2cb7106b1ceaab38b30bcaf5f905d58984d Mon Sep 17 00:00:00 2001 From: Ben Mears Date: Wed, 22 Nov 2023 13:03:58 -0800 Subject: [PATCH 5/5] Empty-Commit