Merge pull request #24 from databrickslabs/dev
Upgrade to Delta 2.1.0 and Spark 3.3
himanishk committed Oct 5, 2022
2 parents c5d6aba + 4bd2135 commit ac6161c
Showing 17 changed files with 205 additions and 92 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -5,11 +5,11 @@

# Delta Operational Metrics Store (DeltaOMS)
Delta Operational Metrics Store (DeltaOMS) is a solution that helps to build a
centralized repository of operational metrics/statistics for your [Lakehouse](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf)
centralized repository of Delta Transaction logs and associated operational metrics/statistics for your [Lakehouse](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf)
built on [Delta Lake](https://github.com/delta-io/delta)

## Project Overview
DeltaOMS provides a solution for automatically collecting operational metrics/statistics from Delta
DeltaOMS provides a solution for automatically collecting Delta Transaction logs and associated operational metrics/statistics from Delta
Lakehouse tables into a separate centralized database. This will enable you to gain centralized access
to the operational metrics for your data in near real-time. This centralized data can be utilized
to gain helpful operational insights, setting up monitoring/alerting and observability for your
21 changes: 11 additions & 10 deletions build.sbt
@@ -37,13 +37,13 @@ ThisBuild / dynverSonatypeSnapshots := true
lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
lazy val testScalastyle = taskKey[Unit]("testScalastyle")

val sparkVersion = "3.1.1"
val deltaVersion = "1.0.0"
val sparkVersion = "3.3.0"
val deltaVersion = "2.1.0"

lazy val commonSettings = Seq(
name := "delta-oms",
organization := "com.databricks.labs",
scalaVersion := "2.12.10",
scalaVersion := "2.12.14",
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
scalacOptions ++= Seq("-target:jvm-1.8"),
javaOptions += "-Xmx1024m",
@@ -90,7 +90,7 @@ lazy val root = (project in file(".")).
"io.delta" %% "delta-core" % deltaVersion % "provided",

// Test Dependencies
"org.scalatest" %% "scalatest" % "3.1.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.9" % Test,
"junit" % "junit" % "4.12" % Test,
"com.novocode" % "junit-interface" % "0.11" % Test,
"org.apache.spark" %% "spark-catalyst" % sparkVersion % Test classifier "tests",
@@ -103,9 +103,9 @@ lazy val root = (project in file(".")).
assembly / test := {},
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false),
assembly / assemblyShadeRules := Seq(
ShadeRule.rename("org.apache.spark.sql.delta.**" ->
"com.databricks.sql.transaction.tahoe.@1").inAll
),
ShadeRule.rename("org.apache.spark.sql.delta.**" ->
"com.databricks.sql.transaction.tahoe.@1").inAll
),
assembly / logLevel := Level.Error,
assembly / artifact := {
val art = (assembly / artifact).value
@@ -116,14 +116,15 @@ lazy val root = (project in file(".")).
)

def versionFmt(out: sbtdynver.GitDescribeOutput): String = {
if(out.isCleanAfterTag) out.ref.dropPrefix
if (out.isCleanAfterTag) out.ref.dropPrefix
else out.ref.dropPrefix + out.commitSuffix.mkString("-", "+", "") + "-SNAPSHOT"
}

def fallbackVersion(d: java.util.Date): String = s"HEAD-${sbtdynver.DynVer timestamp d}"

inThisBuild(List(
version := dynverGitDescribeOutput.value.mkVersion(versionFmt, fallbackVersion(dynverCurrentDate.value)),
version := dynverGitDescribeOutput.value.mkVersion(versionFmt,
fallbackVersion(dynverCurrentDate.value)),
dynver := {
val d = new java.util.Date
sbtdynver.DynVer.getGitDescribeOutput(d).mkVersion(versionFmt, fallbackVersion(d))
@@ -133,6 +134,6 @@ inThisBuild(List(
lazy val distribution = project
.settings(commonSettings,
sonatypeProfileName := "com.databricks",
Compile / packageBin := (root / assembly).value,
Compile / packageBin := (root / assembly).value
)
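The core of this commit is the dependency bump above. For a project building against the upgraded artifact, the matching compile-time pins would look roughly like the following sketch (a consuming project's build.sbt is assumed; the "provided" scope mirrors this repo's own usage):

```scala
// Sketch of a consuming project's build.sbt aligned with this upgrade
// (versions come from the diff above; the "provided" scope is an assumption)
val sparkVersion = "3.3.0"
val deltaVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  "io.delta"         %% "delta-core" % deltaVersion % "provided"
)
```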

17 changes: 10 additions & 7 deletions docs/content/_index.md
@@ -6,14 +6,17 @@ draft: false

# Delta Operational Metrics Store (DeltaOMS)
Delta Operational Metrics Store (DeltaOMS) is a solution that helps to build a
centralized repository of operational metrics/statistics for your [Lakehouse](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf)
centralized repository of Delta Transaction logs and associated
operational metrics/statistics for your [Lakehouse](http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf)
built on [Delta Lake](https://github.com/delta-io/delta)

## Project Overview
DeltaOMS provides a solution for automatically collecting operational metrics/statistics from Delta
Lakehouse tables into a separate centralized database. This will enable you to gain centralized access
to the operational metrics for your data in near real-time. This centralized data can be utilized
to gain helpful operational insights, setting up monitoring/alerting and observability for your
DeltaOMS provides a solution for automatically collecting Delta Transaction logs and associated
operational metrics/statistics from Delta Lakehouse tables into a separate centralized database.
This will enable you to gain centralized access to the Delta log metadata (like file size, file statistics)
and operational metrics for your data in near real-time.

This centralized data can be utilized to gain helpful operational insights, setting up monitoring/alerting and observability for your
Delta Lakehouse ETL Pipelines. It could also help you to identify trends based on data characteristics,
improve ETL pipeline performance, provide capabilities for auditing and traceability of your Delta Lake data etc.

@@ -48,8 +51,8 @@ make it available for analytics.

## How much does it cost ?
DeltaOMS does not have any **direct cost** associated with it other than the cost to run the jobs
on your environment.The overall cost will be determined primarily by the number of Delta Lakehouse
objects tracked and frequency of the OMS data refresh.
on your environment. The overall cost will be determined primarily by the scale of Delta Lakehouse
objects (Delta transaction log files) tracked and frequency of the OMS data refresh.
We have found that the additional insights gained from DeltaOMS helps reduce the total cost of
ownership through better management and optimization of your data pipelines while providing much
improved view on the operational metrics and statistics for the Delta Lakehouse.
24 changes: 17 additions & 7 deletions docs/content/faq/general.md
@@ -29,7 +29,7 @@ This simplifies the process for users looking to gain insights into their Delta
**Q. What typical operational insights would I get from the solution ?**

DeltaOMS centralized repository provides interfaces for custom analysis on the Delta Lake
operational metrics using tools like Apache Spark, Databricks SQL Analytics etc.
operational metrics using tools like Apache Spark, Databricks SQL etc.

For example, it could answer questions like :

@@ -50,15 +50,25 @@ For example, it could answer questions like :
Data Engineering teams, Data Lake Admins and Operational Analysts would be able to
manage and use this feature for operational insights on the Delta Lake.

**Q. Can I run this solution in a non-Databricks environment ?**

This project is distributed under the Databricks license and cannot be used outside of a Databricks environment

**Q. How will I be charged ?**

This solution is fully deployed in the users Databricks environment. The automated jobs for the framework
will run on the Databricks environment.Depending on the configuration set by the users
This solution is fully deployed in the user's Databricks or Spark environment. The jobs for the framework
will run on the execution environment. Depending on the configuration set by the users
(for example, update frequency of the audit logs, number of databases/delta path enabled, number of transactions ingested etc.),
the cost of the automated jobs and associated storage cost will vary.

We ran few simple benchmarks on a cluster with 12 cores , 90 GB memory (On-Demand pricing) and noticed the following:
We ran a few simple ingestion benchmarks on an AWS-based Databricks cluster :

- Initial ingestion and processing of 36,000 Delta transactions took about 12 minutes
- Subsequently, each 1000 incremental transactions got ingested and processed in about 2 minutes
- It costs about $15 for processing 1M transactions
| | Xtra Small | Small | Medium | Large |
| --------------------------------- |------------|--------|--------|--------|
| Initial Txns | 100000 | 87000 | 76400 | 27500 |
| Avg Txns Size | ~1 Kb | ~500 Kb| ~1 MB | ~2.5 MB|
| Approx Total Txns Size | ~100 Mb | ~44 GB | ~76 GB | ~70 GB |
| Cluster Config<br>*- Workers*<br>*- Driver*<br>*- DB Runtime*| <br>**(5) i3.2xl** - 305 GB Mem , 40 Cores <br>**i3.xl** - 61 GB Mem, 8 Cores <br> **DB Runtime** - 11.2 | <br>**(5) i3.4xl** - 610 GB Mem , 80 Cores <br>**i3.2xl** - 61 GB Mem, 8 Cores <br> **DB Runtime** - 11.2 | <br>**(5) i3.4xl** - 610 GB Mem , 80 Cores <br>**i3.2xl** - 61 GB Mem, 8 Cores <br> **DB Runtime** - 11.2 | <br>**(5) i3.4xl** - 610 GB Mem , 80 Cores <br>**i3.2xl** - 61 GB Mem, 8 Cores <br> **DB Runtime** - 11.2 |
| Initial Raw Ingestion Time | ~15 mins | ~ 50 mins | ~ 60 mins | ~ 40 mins |
| Incremental Additional Txns | 1000 | 1000 | 1000 | 1000 |
| Incremental Raw Ingestion Time | ~ 1 min | ~ 2 min | ~ 3 min | ~ 3 mins |
3 changes: 2 additions & 1 deletion docs/content/getting_started/additionalconfigurations.md
@@ -27,7 +27,8 @@ DeltaOMS Spark configuration (spark.conf) details :
| databricks.labs.deltaoms.truncate.path.config | Truncate the internal Path Config table | N | false | false | Configuration |
| databricks.labs.deltaoms.skip.path.config | Skip populating the internal Path Config tables during each streaming ingestion run | N | true | false | Ingestion |
| databricks.labs.deltaoms.skip.initialize | Skip running DeltaOMS initialization for each run | N | true | false | Configuration, Ingestion |
| databricks.labs.deltaoms.trigger.interval | Trigger interval for processing the Delta logs from the configured tables/paths | N | 30s | Once | Ingestion |
| databricks.labs.deltaoms.trigger.interval | Trigger interval for processing the Delta logs from the configured tables/paths | N | 30s | AvailableNow | Ingestion |
| databricks.labs.deltaoms.trigger.max.files | Maximum number of Delta log files to process for each Trigger interval | N | 2048 | 1024 | Ingestion |
| databricks.labs.deltaoms.starting.stream | Starting stream number for the Ingestion Job | N | 10 | 1 | Ingestion |
| databricks.labs.deltaoms.ending.stream | Ending stream number for the Ingestion Job | N | 30 | 50 | Ingestion |
| databricks.labs.deltaoms.use.autoloader | Use Autoloader for the Ingestion Job | N | false | true | Ingestion |
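As a rough illustration of how the ingestion settings above (including the new `databricks.labs.deltaoms.trigger.max.files` key) might be supplied, they can be set as Spark session configuration before the job runs — a sketch with illustrative values only:

```scala
// Sketch: setting DeltaOMS ingestion configs on the active Spark session
// (keys are from the table above; the values shown are only examples)
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
spark.conf.set("databricks.labs.deltaoms.trigger.interval", "AvailableNow")
spark.conf.set("databricks.labs.deltaoms.trigger.max.files", "2048")
spark.conf.set("databricks.labs.deltaoms.skip.path.config", "true")
```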
2 changes: 1 addition & 1 deletion docs/content/getting_started/prerequisites.md
@@ -12,4 +12,4 @@ Make sure you have the following available before proceeding :
- Proper access permissions to create database, tables and write data to the desired OMS location
- Access to create [libraries](https://docs.databricks.com/libraries/index.html) on your Databricks environment. Required to attach the relevant DeltaOMS (delta-oms) libraries
- (Optional) Able to access the DeltaOMS github [repo](https://github.com/databrickslabs/delta-oms) for demo notebooks and scripts
- Databricks Runtime 9.1 LTS+
- Databricks Runtime 11.0+ environment
9 changes: 0 additions & 9 deletions src/main/resources/log4j.properties

This file was deleted.

39 changes: 39 additions & 0 deletions src/main/resources/log4j2.properties
@@ -0,0 +1,39 @@
# Set everything to be logged to the console
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = info
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = info
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error

logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral
@@ -164,9 +164,9 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc
val readStream = streamTargetAndLog._1
val targetInfo = streamTargetAndLog._2
assert(targetInfo.wuid.isDefined, "OMS Readstreams should be associated with WildcardPath")
val triggerInterval = triggerIntervalOption.getOrElse("once")
val trigger = if (triggerInterval.equalsIgnoreCase("once")) {
Trigger.Once()
val triggerInterval = triggerIntervalOption.getOrElse("availableNow")
val trigger = if (triggerInterval.equalsIgnoreCase("availableNow") || triggerInterval.equalsIgnoreCase("once")) { // scalastyle:ignore
Trigger.AvailableNow()
} else {
Trigger.ProcessingTime(triggerInterval)
}
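The change in this hunk replaces Trigger.Once with Trigger.AvailableNow, which drains all currently available Delta log files across multiple micro-batches (so maxFilesPerTrigger is respected) rather than forcing everything into a single batch. A minimal standalone sketch of the same resolution rule:

```scala
// Sketch: mapping the configured interval string to a streaming Trigger,
// mirroring the logic in the diff above
import org.apache.spark.sql.streaming.Trigger

def resolveTrigger(triggerInterval: String): Trigger =
  if (triggerInterval.equalsIgnoreCase("availableNow") ||
      triggerInterval.equalsIgnoreCase("once")) {
    Trigger.AvailableNow()  // batch-like run that still honours maxFilesPerTrigger
  } else {
    Trigger.ProcessingTime(triggerInterval)  // e.g. "30 seconds"
  }
```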
@@ -211,7 +211,7 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc
fetchStreamTargetAndDeltaLogForPath(p,
config.checkpointBase.get,
config.checkpointSuffix.get,
getRawActionsTablePath(config), config.useAutoloader))
getRawActionsTablePath(config), config.useAutoloader, config.maxFilesPerTrigger))
val logWriteStreamQueries = logReadStreams
.map(lrs => processDeltaLogStreams(lrs,
getRawActionsTablePath(config),
@@ -253,14 +253,15 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc

def fetchStreamTargetAndDeltaLogForPath(pathInfo: (String, String),
checkpointBaseDir: String, checkpointSuffix: String, rawActionsTablePath: String,
useAutoLoader: Boolean):
useAutoLoader: Boolean, maxFilesPerTrigger: String):
Option[(DataFrame, StreamTargetInfo)] = {
val wildCardPath = pathInfo._1
val wuid = pathInfo._2
val checkpointPath = checkpointBaseDir + "/_oms_checkpoints/raw_actions_" +
wuid + checkpointSuffix

val readPathStream = fetchStreamingDeltaLogForPath(wildCardPath, useAutoLoader)
val readPathStream = fetchStreamingDeltaLogForPath(wildCardPath, useAutoLoader,
maxFilesPerTrigger)
if(readPathStream.isDefined) {
Some(readPathStream.get,
StreamTargetInfo(path = rawActionsTablePath, checkpointPath = checkpointPath,
@@ -270,33 +271,34 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc
}
}

def fetchStreamingDeltaLogForPath(path: String, useAutoloader: Boolean = true)
def fetchStreamingDeltaLogForPath(path: String, useAutoloader: Boolean = true,
maxFilesPerTrigger: String = "1024")
: Option[DataFrame] = {
val actionSchema: StructType = ScalaReflection.schemaFor[SingleAction].dataType
.asInstanceOf[StructType]
val regex_str = "^(.*)\\/_delta_log\\/(.*)\\.json$"
val file_modification_time = getFileModificationTimeUDF()
val deltaLogDFOpt = if (useAutoloader) {
Some(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.maxFilesPerTrigger", maxFilesPerTrigger)
.option("cloudFiles.useIncrementalListing", "true")
.schema(actionSchema)
.load(path))
.load(path).select("*", "_metadata"))
} else {
getDeltaLogs(actionSchema, path)
getDeltaLogs(actionSchema, path, maxFilesPerTrigger)
}
if (deltaLogDFOpt.nonEmpty) {
val deltaLogDF = deltaLogDFOpt.get
.withColumn(FILE_NAME, col("_metadata.file_path"))
.withColumn(COMMIT_TS, col("_metadata.file_modification_time"))
Some(deltaLogDF
.withColumn(FILE_NAME, input_file_name())
.withColumn(PATH, regexp_extract(col(s"$FILE_NAME"), regex_str, 1))
.withColumn(PUID, substring(sha1(col(s"$PATH")), 0, 7))
.withColumn(COMMIT_VERSION, regexp_extract(col(s"$FILE_NAME"),
regex_str, 2).cast(LongType))
.withColumn(UPDATE_TS, lit(Instant.now()))
.withColumn("modTs", file_modification_time(col(s"$FILE_NAME")))
.withColumn(COMMIT_TS, to_timestamp($"modTs"))
.withColumn(COMMIT_DATE, to_date(col(s"$COMMIT_TS")))
.drop("modTs"))
.drop("_metadata"))
} else {
None
}
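The _metadata-based columns above are a key reason this code targets Spark 3.3: the hidden _metadata struct (with file_path and file_modification_time) replaces input_file_name() and the earlier modification-time UDF. A self-contained sketch of the same pattern, with a placeholder path and stand-in schema:

```scala
// Sketch: deriving file name and commit timestamp from the hidden _metadata
// column of a file streaming source (placeholder path and schema)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()
val actionSchema = StructType(Seq(StructField("commitInfo", StringType)))  // stand-in schema

val deltaLogDF = spark.readStream
  .schema(actionSchema)
  .option("maxFilesPerTrigger", "1024")
  .json("/data/my_table/_delta_log/*.json")  // placeholder wildcard path
  .select("*", "_metadata")
  .withColumn("file_name", col("_metadata.file_path"))
  .withColumn("commit_ts", col("_metadata.file_modification_time"))
  .drop("_metadata")
```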
@@ -503,9 +505,12 @@ trait OMSOperations extends Serializable with SparkSettings with Logging with Sc
snapshotInputFiles
}

def getDeltaLogs(schema: StructType, path: String): Option[DataFrame] = {
def getDeltaLogs(schema: StructType, path: String,
maxFilesPerTrigger: String = "1024"): Option[DataFrame] = {
val deltaLogTry = Try {
spark.readStream.schema(schema).json(path)
spark.readStream.schema(schema)
.option("maxFilesPerTrigger", maxFilesPerTrigger)
.json(path).select("*", "_metadata")
}
deltaLogTry match {
case Success(value) => Some(value)
@@ -64,7 +64,8 @@ trait OMSRunner extends Serializable
val response = httpClient.execute(getClusterByIdGet)
logInfo(EntityUtils.toString(response.getEntity, "UTF-8"))
}
setTrackingHeader()

scala.util.control.Exception.ignoring(classOf[Throwable]) { setTrackingHeader() }

def consolidateAndValidateOMSConfig(args: Array[String], config: OMSConfig): OMSConfig
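The runner now wraps setTrackingHeader() in scala.util.control.Exception.ignoring, so a failure to reach the cluster API is swallowed instead of aborting the job. A tiny standalone illustration of that combinator:

```scala
// Sketch: ignoring(...) evaluates the block and discards any listed exception type
import scala.util.control.Exception.ignoring

ignoring(classOf[Throwable]) {
  throw new RuntimeException("cluster API unreachable")  // silently ignored
}
println("runner continues")  // still reached
```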

@@ -43,6 +43,7 @@ trait OMSSparkConf extends Serializable with SparkSettings {
val SRC_DATABASES = buildConfKey("src.databases")
val TABLE_PATTERN = buildConfKey("table.pattern")
val TRIGGER_INTERVAL = buildConfKey("trigger.interval")
val TRIGGER_MAX_FILES = buildConfKey("trigger.max.files")
val STARTING_STREAM = buildConfKey("starting.stream")
val ENDING_STREAM = buildConfKey("ending.stream")
val USE_AUTOLOADER = buildConfKey("use.autoloader")
@@ -52,7 +53,8 @@ trait OMSSparkConf extends Serializable with SparkSettings {
RAW_ACTION_TABLE, SOURCE_CONFIG_TABLE, PATH_CONFIG_TABLE, PROCESSED_HISTORY_TABLE,
COMMITINFO_SNAPSHOT_TABLE, ACTION_SNAPSHOT_TABLE, CONSOLIDATE_WILDCARD_PATHS,
TRUNCATE_PATH_CONFIG, SKIP_PATH_CONFIG, SKIP_INITIALIZE, SRC_DATABASES,
TABLE_PATTERN, TRIGGER_INTERVAL, STARTING_STREAM, ENDING_STREAM, USE_AUTOLOADER)
TABLE_PATTERN, TRIGGER_INTERVAL, STARTING_STREAM, ENDING_STREAM, USE_AUTOLOADER,
TRIGGER_MAX_FILES)

def consolidateOMSConfigFromSparkConf(config: OMSConfig): OMSConfig = {
configFields.foldLeft(config) {
@@ -103,6 +105,9 @@ trait OMSSparkConf extends Serializable with SparkSettings {
case USE_AUTOLOADER => spark.conf.getOption(USE_AUTOLOADER).
fold(omsSparkConfig) {
scv => omsSparkConfig.copy(useAutoloader = scv.toBoolean)}
case TRIGGER_MAX_FILES => spark.conf.getOption(TRIGGER_MAX_FILES).
fold(omsSparkConfig) {
scv => omsSparkConfig.copy(maxFilesPerTrigger = scv)}
}
}
}
@@ -34,5 +34,6 @@ case class OMSConfig(baseLocation: Option[String] = None,
srcDatabases: Option[String] = None,
tablePattern: Option[String] = None,
triggerInterval: Option[String] = None,
maxFilesPerTrigger : String = "1024",
startingStream: Int = 1,
endingStream: Int = 50)
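Field names below come from the case class in the diff; the values are purely illustrative. A sketch of building the extended config with the new maxFilesPerTrigger knob:

```scala
// Sketch: constructing OMSConfig with the new maxFilesPerTrigger field
// (assumes the OMSConfig case class above is on the classpath; only a few fields shown)
val omsConfig = OMSConfig(
  baseLocation = Some("dbfs:/oms"),
  triggerInterval = Some("availableNow"),
  maxFilesPerTrigger = "2048",
  startingStream = 1,
  endingStream = 50
)
```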
