Cleanup Delta table properties to be set as Table Properties
himanishk committed May 24, 2023
1 parent d2f3ec9 commit 1292075
Showing 8 changed files with 81 additions and 29 deletions.
@@ -77,19 +77,19 @@ trait OMSInitializer extends Serializable with Logging {
 
   def cleanupOMS(config: OMSConfig): Unit = {
     if (!isUCEnabled) {
-      val delSchemaPath = getOMSSchemaPath(config)
-      val deleteSchemaPath = Try {
-        deleteDirectory(delSchemaPath)
-      }
-      deleteSchemaPath match {
-        case Success(value) =>
-          logInfo(s"Successfully deleted the directory ${getOMSSchemaPath(config)}")
-        case Failure(exception) =>
-          throw new RuntimeException(s"Unable to delete $delSchemaPath : $exception")
-      }
       dropDatabase(config.schemaName.get)
     } else {
       dropCatalog(config.catalogName.get)
     }
+    val delSchemaPath = getOMSSchemaPath(config)
+    val deleteSchemaPath = Try {
+      deleteDirectory(delSchemaPath)
+    }
+    deleteSchemaPath match {
+      case Success(value) =>
+        logInfo(s"Successfully deleted the directory ${getOMSSchemaPath(config)}")
+      case Failure(exception) =>
+        throw new RuntimeException(s"Unable to delete $delSchemaPath : $exception")
+    }
   }
 }
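With this change the OMS schema directory is deleted on both the Unity Catalog and non-UC paths, since the Try block now runs after the if/else instead of inside the non-UC branch only. Below is a minimal, self-contained sketch of the same Try/Success/Failure idiom; this deleteDirectory is a stand-in for the project's own helper, not the real implementation:

import java.nio.file.{Files, Path, Paths}
import java.util.Comparator
import scala.util.{Failure, Success, Try}

// Stand-in for Delta OMS's deleteDirectory helper: recursively
// removes a local directory tree (children before parents).
def deleteDirectory(path: String): Unit = {
  val root: Path = Paths.get(path)
  if (Files.exists(root)) {
    Files.walk(root)
      .sorted(Comparator.reverseOrder[Path]())
      .forEach(p => Files.delete(p))
  }
}

// Same shape as cleanupOMS: wrap the side effect in Try and
// turn a Failure into a RuntimeException that names the path.
def cleanupPath(delSchemaPath: String): Unit =
  Try(deleteDirectory(delSchemaPath)) match {
    case Success(_) =>
      println(s"Successfully deleted the directory $delSchemaPath")
    case Failure(exception) =>
      throw new RuntimeException(s"Unable to delete $delSchemaPath : $exception")
  }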
@@ -26,7 +26,7 @@ import com.databricks.labs.deltaoms.utils.UtilityOperations._
 import io.delta.tables._
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.delta.actions.SingleAction
 import org.apache.spark.sql.expressions.Window
@@ -50,14 +50,12 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc
 }
 
 def fetchSourceConfigForProcessing(config: OMSConfig): Array[SourceConfig] = {
-  val spark = SparkSession.active
   val sourceConfigs = spark.read.table(getSourceConfigTableName(config))
     .where(s"$SKIP_PROCESSING <> true").select(PATH, SKIP_PROCESSING)
   processWildcardDirectories(sourceConfigs).collect()
 }
 
 def processWildcardDirectories(sourceConfigs: DataFrame): Dataset[SourceConfig] = {
-  val spark = SparkSession.active
   val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
 
   val nonWildCardSourcePaths = sourceConfigs
@@ -222,7 +220,6 @@ }
 }
 
 def fetchPathConfigForProcessing(pathConfigTableUrl: String): Dataset[PathConfig] = {
-  val spark = SparkSession.active
   spark.read.format("delta").load(pathConfigTableUrl).as[PathConfig]
 }

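Dropping the SparkSession.active lookups works because OMSOperations mixes in SparkSettings, which already exposes a shared spark handle (see the SparkSettings change further down). A rough sketch of that pattern, simplified from the real trait:

import org.apache.spark.sql.SparkSession

// Simplified sketch of the SparkSettings pattern: one lazily created
// session shared by every trait that mixes this in.
trait SparkSettingsSketch extends Serializable {
  private lazy val sparkSession: SparkSession =
    SparkSession.builder().appName("Delta OMS").getOrCreate()

  def spark: SparkSession = sparkSession
}

// Methods can use `spark` directly instead of fetching
// SparkSession.active at the top of every function body.
trait OMSOperationsSketch extends SparkSettingsSketch {
  def tableRowCount(tableName: String): Long =
    spark.read.table(tableName).count()
}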
14 changes: 8 additions & 6 deletions src/main/scala/com/databricks/labs/deltaoms/common/Utils.scala
@@ -26,6 +26,8 @@ trait Utils extends Serializable with Logging with Schemas {
   val puidCommitDatePartitions = Seq(PUID, COMMIT_DATE)
   private val omsProperties: Map[String, String] =
     Map("entity" -> s"$ENTITY_NAME", "oms.version" -> s"$OMS_VERSION")
+  private val tableProperties = Map("delta.autoOptimize.autoCompact" -> "auto",
+    "delta.autoOptimize.optimizeWrite" -> "true", "delta.enableChangeDataFeed" -> "true")
 
   def pathConfigTableDefinition(omsConfig: OMSConfig): TableDefinition = {
     TableDefinition(tableName = getPathConfigTableName(omsConfig),
@@ -35,7 +37,7 @@
       locationUrl = getPathConfigTableUrl(omsConfig),
       schema = pathConfig,
       comment = Some("Delta OMS Path Config Table"),
-      properties = omsProperties
+      properties = omsProperties ++ tableProperties
     )
   }
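On immutable Scala maps, ++ merges two maps with right-hand entries winning on duplicate keys; here the key sets are disjoint, so omsProperties ++ tableProperties is simply the union of the OMS metadata and the Delta tuning properties. A quick illustration (the version string is a placeholder, not the project's actual value):

val omsProperties = Map("entity" -> "oms", "oms.version" -> "x.y.z")
val tableProperties = Map(
  "delta.autoOptimize.autoCompact" -> "auto",
  "delta.autoOptimize.optimizeWrite" -> "true",
  "delta.enableChangeDataFeed" -> "true")

// Disjoint keys, so the merge is a plain union of both maps.
val merged = omsProperties ++ tableProperties
assert(merged.size == 5)
assert(merged("delta.enableChangeDataFeed") == "true")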

@@ -53,7 +55,7 @@
       locationUrl = getSourceConfigTableUrl(omsConfig),
       schema = sourceConfig,
       comment = Some("Delta OMS Source Config Table"),
-      properties = omsProperties
+      properties = omsProperties ++ tableProperties
     )
   }

@@ -79,7 +81,7 @@
       locationUrl = getRawActionsTableUrl(omsConfig),
       schema = rawAction,
       comment = Some("Delta OMS Raw Actions Table"),
-      properties = omsProperties,
+      properties = omsProperties ++ tableProperties,
       partitionColumnNames = puidCommitDatePartitions)
   }

@@ -97,7 +99,7 @@
       locationUrl = getProcessedHistoryTableUrl(omsConfig),
       schema = processedHistory,
       comment = Some("Delta OMS Processed History Table"),
-      properties = omsProperties
+      properties = omsProperties ++ tableProperties
     )
   }

@@ -115,7 +117,7 @@
       locationUrl = getActionSnapshotsTableUrl(omsConfig),
       schema = actionSnapshot,
       comment = Some("Delta OMS Action Snapshots Table"),
-      properties = omsProperties,
+      properties = omsProperties ++ tableProperties,
       partitionColumnNames = puidCommitDatePartitions
     )
   }
@@ -134,7 +136,7 @@
       locationUrl = getCommitSnapshotsTableUrl(omsConfig),
       schema = commitSnapshot,
       comment = Some("Delta OMS Commit Snapshot Table"),
-      properties = omsProperties,
+      properties = omsProperties ++ tableProperties,
       partitionColumnNames = puidCommitDatePartitions
     )
   }
@@ -28,14 +28,10 @@ trait SparkSettings extends Serializable with ConfigurationSettings {
       .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
       .appName("DELTA_OMS_INBUILT").getOrCreate()
-    case _ => val spark = SparkSession.builder().appName("Delta OMS").getOrCreate()
-      spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed",
-        value = true)
-      spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", value = true)
-      spark.conf.set("spark.databricks.delta.autoCompact.enabled", value = true)
-      spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", value = true)
-      spark.conf.set("spark.databricks.labs.deltaoms.version", value = Schemas.OMS_VERSION)
-      spark
+    case _ => val currentSparkSession = SparkSession.active
+      currentSparkSession.conf
+        .set("spark.databricks.labs.deltaoms.version", value = Schemas.OMS_VERSION)
+      currentSparkSession
   }

def spark: SparkSession = sparkSession
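The non-Databricks branch no longer builds its own session or forces Delta defaults through session configuration; it reuses the active session and records only the OMS version. The session-scoped settings it used to apply are superseded by the per-table TBLPROPERTIES introduced in Utils.scala, which travel with each table instead of depending on whoever created the session. A sketch of the difference (the table name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active

// Session-scoped (old approach): only tables created while this
// session default is in effect inherit Change Data Feed.
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

// Table-scoped (new approach): the property is pinned to the table
// itself and applies regardless of which session touches it later.
spark.sql(
  """CREATE TABLE IF NOT EXISTS oms_demo (id LONG) USING DELTA
    |TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')""".stripMargin)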
@@ -191,6 +191,11 @@ trait UtilityOperations extends Serializable with Logging {
     if (tableDef.locationUrl.nonEmpty) {
       tableCreateSQL.append(s"""LOCATION '${tableDef.locationUrl}' """)
     }
+    if (tableDef.properties.nonEmpty) {
+      val tableProperties = tableDef.properties.map(_.productIterator.mkString("'", "'='", "'"))
+        .mkString(",")
+      tableCreateSQL.append(s"TBLPROPERTIES($tableProperties) ")
+    }
     if (tableDef.comment.nonEmpty) {
       tableCreateSQL.append(s"COMMENT '${tableDef.comment.get}'")
     }
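Each map entry is a (key, value) tuple, so _.productIterator.mkString("'", "'='", "'") renders it as 'key'='value'; joining the fragments with "," yields the body of the TBLPROPERTIES clause. For example:

val props = Map("entity" -> "oms", "delta.enableChangeDataFeed" -> "true")

// Each (key, value) tuple renders as 'key'='value' ...
val body = props.map(_.productIterator.mkString("'", "'='", "'")).mkString(",")

// ... producing: TBLPROPERTIES('entity'='oms','delta.enableChangeDataFeed'='true')
println(s"TBLPROPERTIES($body)")

Note that values are not escaped, so this rendering assumes property values contain no single quotes, which holds for the fixed maps above.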
@@ -278,6 +278,6 @@ class OMSOperationsSuite extends QueryTest
 
   test("getLatestRawActionsVersion commit version") {
     val rawActions = getUpdatedRawActions(0, getRawActionsTableUrl(omsConfig))
-    assert(getLatestRawActionsVersion(rawActions) == 0)
+    assert(getLatestRawActionsVersion(rawActions) == 2)
   }
 }
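The expected latest version rises from 0 to 2 here, presumably because the raw actions table now accumulates extra commits during suite setup once the tables carry the new properties (for example, auto compaction following the initial write); the exact count depends on the test fixtures.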
@@ -151,18 +151,28 @@ class UtilityOperationsSuite extends QueryTest with SharedSparkSession with Delt
       s"CREATE TABLE IF NOT EXISTS deltaoms.`deltaoms_test`" +
       s".`sourceconfig` (${Schemas.sourceConfig.toDDL}) " +
       s"LOCATION '/deltaoms/deltaoms/deltaoms/deltaoms_test/sourceconfig' " +
+      s"TBLPROPERTIES('delta.enableChangeDataFeed'='true'," +
+      s"'delta.autoOptimize.autoCompact'='auto'," +
+      s"'entity'='${Schemas.ENTITY_NAME}','oms.version'='${Schemas.OMS_VERSION}'," +
+      s"'delta.autoOptimize.optimizeWrite'='true') " +
       s"COMMENT 'Delta OMS Source Config Table'"
     )
     assert(srcConfigTableQuery._2 == "TABLE")
 
+    // Path Config Table
     val pathConfigTableQuery = UtilityOperations
       .tableCreateQuery(pathConfigTableDefinition(testConfig))
     assert(pathConfigTableQuery._1 == s"CREATE TABLE IF NOT EXISTS " +
       s"${testConfig.catalogName.get}.`${testConfig.schemaName.get}`" +
       s".`${testConfig.pathConfigTable}` (${Schemas.pathConfig.toDDL}) " +
       s"LOCATION '/deltaoms/deltaoms/deltaoms/deltaoms_test/pathconfig' " +
+      s"TBLPROPERTIES('delta.enableChangeDataFeed'='true'," +
+      s"'delta.autoOptimize.autoCompact'='auto'," +
+      s"'entity'='${Schemas.ENTITY_NAME}','oms.version'='${Schemas.OMS_VERSION}'," +
+      s"'delta.autoOptimize.optimizeWrite'='true') " +
       s"COMMENT 'Delta OMS Path Config Table'"
     )
 
+    // Raw Actions Table
     val rawActionsTableQuery = UtilityOperations
       .tableCreateQuery(rawActionsTableDefinition(testConfig))
@@ -171,8 +181,13 @@ class UtilityOperationsSuite extends QueryTest with SharedSparkSession with Delt
       s"(${Schemas.rawAction.toDDL}) " +
       s"PARTITIONED BY (${puidCommitDatePartitions.mkString(",")}) " +
       s"LOCATION '/deltaoms/deltaoms/deltaoms/deltaoms_test/rawactions' " +
+      s"TBLPROPERTIES('delta.enableChangeDataFeed'='true'," +
+      s"'delta.autoOptimize.autoCompact'='auto'," +
+      s"'entity'='${Schemas.ENTITY_NAME}','oms.version'='${Schemas.OMS_VERSION}'," +
+      s"'delta.autoOptimize.optimizeWrite'='true') " +
       s"COMMENT 'Delta OMS Raw Actions Table'"
     )
 
+    // Processing History Table
     val processingHistoryTableQuery = UtilityOperations
       .tableCreateQuery(processedHistoryTableDefinition(testConfig))
@@ -181,8 +196,44 @@ class UtilityOperationsSuite extends QueryTest with SharedSparkSession with Delt
     assert(processingHistoryTableQuery._1 == s"CREATE TABLE IF NOT EXISTS " +
       s"${testConfig.catalogName.get}" +
       s".`${testConfig.schemaName.get}`.`${testConfig.processedHistoryTable}` " +
       s"(${Schemas.processedHistory.toDDL}) " +
       s"LOCATION '/deltaoms/deltaoms/deltaoms/deltaoms_test/processedhistory' " +
+      s"TBLPROPERTIES('delta.enableChangeDataFeed'='true'," +
+      s"'delta.autoOptimize.autoCompact'='auto'," +
+      s"'entity'='${Schemas.ENTITY_NAME}','oms.version'='${Schemas.OMS_VERSION}'," +
+      s"'delta.autoOptimize.optimizeWrite'='true') " +
       s"COMMENT 'Delta OMS Processed History Table'"
     )
+
+    // Commit Snapshot Table
+    val commitSnapshotTableQuery = UtilityOperations
+      .tableCreateQuery(commitSnapshotsTableDefinition(testConfig))
+    assert(commitSnapshotTableQuery._1 == s"CREATE TABLE IF NOT EXISTS " +
+      s"${testConfig.catalogName.get}" +
+      s".`${testConfig.schemaName.get}`.`${testConfig.commitInfoSnapshotTable}` " +
+      s"(${Schemas.commitSnapshot.toDDL}) " +
+      s"PARTITIONED BY (${puidCommitDatePartitions.mkString(",")}) " +
+      s"LOCATION '/deltaoms/deltaoms/deltaoms/deltaoms_test/commitinfosnapshots' " +
+      s"TBLPROPERTIES('delta.enableChangeDataFeed'='true'," +
+      s"'delta.autoOptimize.autoCompact'='auto'," +
+      s"'entity'='${Schemas.ENTITY_NAME}','oms.version'='${Schemas.OMS_VERSION}'," +
+      s"'delta.autoOptimize.optimizeWrite'='true') " +
+      s"COMMENT 'Delta OMS Commit Snapshot Table'"
+    )
+
+    // Action Snapshot Table
+    val actionSnapshotsTableQuery = UtilityOperations
+      .tableCreateQuery(actionSnapshotsTableDefinition(testConfig))
+    assert(actionSnapshotsTableQuery._1 == s"CREATE TABLE IF NOT EXISTS " +
+      s"${testConfig.catalogName.get}" +
+      s".`${testConfig.schemaName.get}`.`${testConfig.actionSnapshotTable}` " +
+      s"(${Schemas.actionSnapshot.toDDL}) " +
+      s"PARTITIONED BY (${puidCommitDatePartitions.mkString(",")}) " +
+      s"LOCATION '/deltaoms/deltaoms/deltaoms/deltaoms_test/actionsnapshots' " +
+      s"TBLPROPERTIES('delta.enableChangeDataFeed'='true'," +
+      s"'delta.autoOptimize.autoCompact'='auto'," +
+      s"'entity'='${Schemas.ENTITY_NAME}','oms.version'='${Schemas.OMS_VERSION}'," +
+      s"'delta.autoOptimize.optimizeWrite'='true') " +
+      s"COMMENT 'Delta OMS Action Snapshots Table'"
+    )
+  }
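One detail worth noting in these expected strings: the property order is fixed, but it is not the insertion order of omsProperties ++ tableProperties. With five entries, Scala's default immutable Map switches from the small insertion-ordered Map1 through Map4 implementations to a hash-based map, so iteration order follows key hashing; it is stable for a given key set, which is why the tests can hard-code it. A quick demonstration:

val four = Map("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4)
val five = four + ("e" -> 5)

// Up to four entries the default immutable Map preserves insertion order.
println(four.keys.mkString(","))  // a,b,c,d
// At five entries it becomes hash-ordered: deterministic for a given
// key set, but generally not the insertion order.
println(five.keys.mkString(","))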

test("resolveDeltaLocation Exception for hive_metastore") {
@@ -42,6 +42,7 @@ trait DeltaTestSharedSession {
     session.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "delta")
     session.conf.set(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key, value = false)
     session.conf.set("spark.databricks.labs.deltaoms.ucenabled", value = false)
+    session.conf.set("spark.databricks.delta.allowArbitraryProperties.enabled", value = true)
     // session.conf.set(SQLConf.CONVERT_CTAS.key, value = true)
     session
   }
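The new allowArbitraryProperties setting is needed because open-source Delta rejects delta.-prefixed table properties it does not recognize at CREATE or ALTER time; the Databricks-specific delta.autoOptimize.* keys fall into that category for the Delta version these tests run against, so the shared test session opts in.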