diff --git a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala index 3e86e2fb9..c1af322fe 100644 --- a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala +++ b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala @@ -410,16 +410,6 @@ abstract class JoinBase(joinConf: api.Join, s"groupBy.metaData.team needs to be set for joinPart ${jp.groupBy.metaData.name}") } - val source = joinConf.left - if (useBootstrapForLeft) { - logger.info("Overwriting left side to use saved Bootstrap table...") - source.overwriteTable(bootstrapTable) - val query = source.query - // sets map and where clauses already applied to bootstrap transformation - query.setSelects(null) - query.setWheres(null) - } - // Run validations before starting the job val today = tableUtils.partitionSpec.at(System.currentTimeMillis()) val analyzer = new Analyzer(tableUtils, joinConf, today, today, silenceMode = true) @@ -439,9 +429,21 @@ abstract class JoinBase(joinConf: api.Join, // First run command to archive tables that have changed semantically since the last run val archivedAtTs = Instant.now() + // TODO: We should not archive the output table in the case of selected join parts mode tablesToRecompute(joinConf, outputTable, tableUtils).foreach( tableUtils.archiveOrDropTableIfExists(_, Some(archivedAtTs))) + // Check semantic hash before overwriting left side + val source = joinConf.left + if (useBootstrapForLeft) { + logger.info("Overwriting left side to use saved Bootstrap table...") + source.overwriteTable(bootstrapTable) + val query = source.query + // sets map and where clauses already applied to bootstrap transformation + query.setSelects(null) + query.setWheres(null) + } + // detect holes and chunks to fill // OverrideStartPartition is used to replace the start partition of the join config. This is useful when // 1 - User would like to test run with different start partition