From e0da30d6411e30009214ddb662e45fb01ce99fbc Mon Sep 17 00:00:00 2001 From: Himanish Kushary Date: Wed, 24 May 2023 13:04:53 -0400 Subject: [PATCH] Bug fix for duplicate source config entries --- .../com/databricks/labs/deltaoms/common/OMSOperations.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/com/databricks/labs/deltaoms/common/OMSOperations.scala b/src/main/scala/com/databricks/labs/deltaoms/common/OMSOperations.scala index 2f308db..9fd5d56 100644 --- a/src/main/scala/com/databricks/labs/deltaoms/common/OMSOperations.scala +++ b/src/main/scala/com/databricks/labs/deltaoms/common/OMSOperations.scala @@ -51,7 +51,7 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc def fetchSourceConfigForProcessing(config: OMSConfig): Array[SourceConfig] = { val sourceConfigs = spark.read.table(getSourceConfigTableName(config)) - .where(s"$SKIP_PROCESSING <> true").select(PATH, SKIP_PROCESSING) + .where(s"$SKIP_PROCESSING <> true").select(PATH, SKIP_PROCESSING).distinct() processWildcardDirectories(sourceConfigs).collect() } @@ -106,7 +106,6 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc s"""pathconfig.$PUID = pathconfig_updates.$PUID and |pathconfig.$WUID = pathconfig_updates.$WUID |""".stripMargin) - .whenMatched.updateExpr(Map(s"$UPDATE_TS" -> s"pathconfig_updates.$UPDATE_TS")) .whenNotMatched.insertAll().execute() case Failure(ex) => throw new RuntimeException(s"Unable to update the Path Config table. $ex") }