
Commit bbdb61f

neilbest-db and gueniai authored
Add descriptive NamedTransformations to Spark UI (#1223)
* Initial commit

* Add descriptive job group IDs and named transformations

  This makes the Spark UI more developer-friendly when analyzing Overwatch runs. Job group IDs have the form <workspace name>:<OW module name>.

  Any use of `.transform(df => df)` may be replaced with `.transformWithDescription(nt)` after instantiating a `val nt = NamedTransformation(df => df)` as its argument; a minimal usage sketch follows below. This commit contains one such application of the new extension method (see `val jobRunsAppendClusterName` in `WorkflowsTransforms.scala`).

  Some logic in `GoldTransforms` falls through to elements of the special job-run-action form of Job Group ID emitted by the platform, but the impact is minimal relative to the benefit to Overwatch development and troubleshooting. Even so, this form of Job Group ID is still present in the initial Spark events emitted before the OW ETL modules begin to execute.

* Improve TransformationDescriberTest

* Flip transformation names to the beginning of the label for greater visibility in the Spark UI. The `NamedTransformation` type name now appears in the label's second position.

  (cherry picked from commit 2ead752)

* Revert the modified Spark UI Job Group labels

  TODO: enumerate the regressions this would introduce when the labels set by the platform are replaced this way.

---------

Co-authored-by: Guenia <guenia.izquierdo@databricks.com>
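A minimal sketch of the usage described above. The local `SparkSession`, the `keepFoo` transformation, and the input frame are illustrative only; `NamedTransformation` and `transformWithDescription` come from the `TransformationDescriber` utilities added in this commit.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.databricks.labs.overwatch.utils.TransformationDescriber._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical transformation; any Dataset[T] => Dataset[U] function works.
val keepFoo = NamedTransformation { (df: DataFrame) => df.select($"foo") }

val in = Seq(("a", "b")).toDF("foo", "bar")

// Equivalent to in.transform(keepFoo.transformation), but also sets the
// Spark job description to "keepFoo: NamedTransformation" and the call
// site, so the step is easy to spot in the Spark UI.
val out = in.transformWithDescription(keepFoo)
```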
1 parent 3055a22 commit bbdb61f

File tree

6 files changed: +207 -25 lines


build.sbt

Lines changed: 2 additions & 1 deletion
@@ -18,6 +18,7 @@ libraryDependencies += "com.databricks" % "dbutils-api_2.12" % "0.0.5" % Provide
 libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.595" % Provided
 libraryDependencies += "io.delta" % "delta-core_2.12" % "1.0.0" % Provided
 libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
+libraryDependencies += "com.lihaoyi" %% "sourcecode" % "0.4.1"
 
 //libraryDependencies += "org.apache.hive" % "hive-metastore" % "2.3.9"
 
@@ -51,4 +52,4 @@ assemblyMergeStrategy in assembly := {
   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
   case x => MergeStrategy.first
 }
-assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
+assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)

src/main/scala/com/databricks/labs/overwatch/pipeline/ETLDefinition.scala

Lines changed: 12 additions & 1 deletion
@@ -27,7 +27,18 @@ class ETLDefinition(
 
     val transformedDF = transforms.foldLeft(verifiedSourceDF) {
       case (df, transform) =>
-        df.transform(transform)
+        /*
+         * reverting Spark UI Job Group labels for now
+         *
+         * TODO: enumerate the regressions this would introduce
+         * when the labels set by the platform are replaced
+         * this way.
+         *   df.sparkSession.sparkContext.setJobGroup(
+         *     s"${module.pipeline.config.workspaceName}:${module.moduleName}",
+         *     transform.toString)
+         */
+
+        df.transform( transform)
     }
     write(transformedDF, module)
   }

src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala

Lines changed: 3 additions & 1 deletion
@@ -12,6 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame}
 
 trait SilverTransforms extends SparkSessionWrapper {
 
+  import TransformationDescriber._
   import spark.implicits._
 
   private val logger: Logger = Logger.getLogger(this.getClass)
@@ -1461,7 +1462,8 @@ trait SilverTransforms extends SparkSessionWrapper {
 
     // caching before structifying
     jobRunsDeriveRunsBase(jobRunsLag30D, etlUntilTime)
-      .transform(jobRunsAppendClusterName(jobRunsLookups))
+      .transformWithDescription(
+        jobRunsAppendClusterName( jobRunsLookups))
       .transform(jobRunsAppendJobMeta(jobRunsLookups))
      .transform(jobRunsStructifyLookupMeta(optimalCacheParts))
      .transform(jobRunsAppendTaskAndClusterDetails)

src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala

Lines changed: 30 additions & 22 deletions
@@ -12,6 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame}
 
 object WorkflowsTransforms extends SparkSessionWrapper {
 
+  import TransformationDescriber._
   import spark.implicits._
 
   /**
@@ -991,31 +992,38 @@ object WorkflowsTransforms extends SparkSessionWrapper {
   }
 
   /**
-   * looks up the cluster_name based on id first from job_status_silver and if not present there fallback to latest
-   * snapshot prior to the run
-   */
-  def jobRunsAppendClusterName(lookups: Map[String, DataFrame])(df: DataFrame): DataFrame = {
-
-    val runsWClusterNames1 = if (lookups.contains("cluster_spec_silver")) {
-      df.toTSDF("timestamp", "organization_id", "clusterId")
-        .lookupWhen(
-          lookups("cluster_spec_silver")
-            .toTSDF("timestamp", "organization_id", "clusterId")
-        ).df
-    } else df
-
-    val runsWClusterNames2 = if (lookups.contains("clusters_snapshot_bronze")) {
-      runsWClusterNames1
-        .toTSDF("timestamp", "organization_id", "clusterId")
-        .lookupWhen(
-          lookups("clusters_snapshot_bronze")
-            .toTSDF("timestamp", "organization_id", "clusterId")
-        ).df
-    } else runsWClusterNames1
+   * Look up the cluster_name based on id first from
+   * `job_status_silver`. If not present there fallback to latest
+   * snapshot prior to the run
+   */
+
+  val jobRunsAppendClusterName = (lookups: Map[String,DataFrame]) => NamedTransformation {
+
+    (df: DataFrame) => {
+
+      val runsWClusterNames1 = if (lookups.contains("cluster_spec_silver")) {
+        df.toTSDF("timestamp", "organization_id", "clusterId")
+          .lookupWhen(
+            lookups("cluster_spec_silver")
+              .toTSDF("timestamp", "organization_id", "clusterId")
+          ).df
+      } else df
+
+      val runsWClusterNames2 = if (lookups.contains("clusters_snapshot_bronze")) {
+        runsWClusterNames1
+          .toTSDF("timestamp", "organization_id", "clusterId")
+          .lookupWhen(
+            lookups("clusters_snapshot_bronze")
+              .toTSDF("timestamp", "organization_id", "clusterId")
+          ).df
+      } else runsWClusterNames1
+
+      runsWClusterNames2
+    }
 
-    runsWClusterNames2
   }
 
+
   /**
    * looks up the job name based on id first from job_status_silver and if not present there fallback to latest
    * snapshot prior to the run
src/main/scala/com/databricks/labs/overwatch/utils/TransformationDescriber.scala

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+package com.databricks.labs.overwatch.utils
+
+import org.apache.spark.sql.Dataset
+
+// TODO: implement this as a `trait`. Initial attempts would not
+// compile because of the dependencies among other `trait`s and
+// `object`s that would have to be refactored.
+
+object TransformationDescriber {
+
+
+  class NamedTransformation[T,U](
+    val transformation: Dataset[T] => Dataset[U])(
+    implicit _name: sourcecode.Name) {
+
+    final val name: String = _name.value
+
+    override def toString = s"${_name.value}: NamedTransformation"
+
+  }
+
+
+  object NamedTransformation {
+
+    def apply[T,U](
+      transformation: Dataset[T] => Dataset[U])(
+      implicit name: sourcecode.Name) =
+      new NamedTransformation( transformation)( name)
+
+  }
+
+
+  implicit class TransformationDescriber[T,U]( ds: Dataset[T]) {
+
+    def transformWithDescription[U](
+      namedTransformation: NamedTransformation[T,U])(
+      implicit
+      // enclosing: sourcecode.Enclosing,
+      name: sourcecode.Name,
+      fileName: sourcecode.FileName,
+      line: sourcecode.Line
+    ): Dataset[U] = {
+
+      // println( s"Inside TransformationDescriber.transformWithDescription: $enclosing")
+
+      val callSite = s"${name.value} at ${fileName.value}:${line.value}"
+
+      val sc = ds.sparkSession.sparkContext
+      sc.setJobDescription( namedTransformation.toString)
+      sc.setCallSite( callSite)
+
+      ds.transform( namedTransformation.transformation)
+
+    }
+
+  }
+
+}
src/test/scala/com/databricks/labs/overwatch/utils/TransformationDescriberTest.scala

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+package com.databricks.labs.overwatch.utils
+
+import com.databricks.labs.overwatch.SparkSessionTestWrapper
+import org.apache.spark.sql.DataFrame
+import org.scalatest.funspec.AnyFunSpec
+import org.scalatest.GivenWhenThen
+import java.io.ByteArrayOutputStream
+
+class TransformationDescriberTest
+    extends AnyFunSpec
+    with GivenWhenThen
+    with SparkSessionTestWrapper {
+
+  import TransformationDescriber._
+  import spark.implicits._
+  spark.conf.set("spark.sql.session.timeZone", "UTC")
+
+  val t = (df: DataFrame) => df.select( $"foo")
+
+  val nt = NamedTransformation( t)
+
+  // TODO: replace use of `s` and `Console.withOut` with an abstraction
+
+  val s = new ByteArrayOutputStream
+
+  describe( "A NamedTransformation") {
+
+    it( "wraps a function literal") {
+
+      info( s"nt.transformation: ${nt.transformation}")
+
+      assert( nt.transformation === t)
+
+    }
+
+    it( "knows its own name") {
+
+      info( s"`nt.name`: ${nt.name}")
+      info( s"`nt.toString`: ${nt.toString}")
+
+      assert( nt.name === "nt")
+      assert( nt.toString === "nt: NamedTransformation")
+
+    }
+
+    Given( "a Spark `Dataset` (including `DataFrame`s)")
+
+    val in = Seq( ("foo", "bar")).toDF( "foo", "bar")
+
+    Console.withOut( s) {
+      in.show(numRows= 1, truncate= 0, vertical= true)
+    }
+    // info( s.toString)
+    s.toString.linesIterator.foreach( info(_))
+    s.reset
+
+    When( "a `NamedTransformation` is applied")
+
+    val out = in.transformWithDescription( nt)
+
+    // val s = new ByteArrayOutputStream
+    Console.withOut( s) {
+      out.show(numRows= 1, truncate= 0, vertical= true)
+    }
+    // info( s.toString)
+    s.toString.linesIterator.foreach( info(_))
+
+
+
+    Then( "the resulting Spark jobs have a matching description (pending)")
+
+    // info( s"""spark.jobGroup.id: ${out.sparkSession.sparkContext.getLocalProperty( "spark.jobGroup.id")}""")
+
+    val sjd = out.sparkSession.sparkContext.getLocalProperty( "spark.job.description")
+
+    info( s"spark.job.description: ${sjd}")
+
+    assert( sjd === "nt: NamedTransformation")
+
+    // info( s"""spark.callSite.short: ${out.sparkSession.sparkContext.getLocalProperty( "spark.callSite.short")}""")
+    // info( s"""spark.callSite.long: ${out.sparkSession.sparkContext.getLocalProperty( "spark.callSite.long")}""")
+
+
+
+
+
+
+    And( "the result of the transformation is correct")
+
+    assertResult( "`foo` STRING") {
+      out.schema.toDDL
+    }
+
+    assertResult( "foo") {
+      out.first.getString(0)
+    }
+
+
+  }
+
+
+}
