improve MetadataCollectingFilter spec and documentation
cerveada committed Jun 13, 2023
1 parent 22f5856 commit 6afb5c1
Showing 2 changed files with 87 additions and 12 deletions.
README.md (65 changes: 54 additions & 11 deletions)
MetadataCollectingFilter provides a way to add additional data to lineage produced…

Data can be added to the following lineage entities: `executionPlan`, `executionEvent`, `operation`, `read` and `write`.

Inside each entity there is a dedicated map named `extra` that can store any additional user data.

`executionPlan` and `executionEvent` have an additional map, `labels`. Labels are intended for identification and filtering on the server.

Example usage:

`json-with-rules.json` could look like this:
"extra": {
"my-extra-1": 42,
"my-extra-2": [ "aaa", "bbb", "ccc" ]
}
},
"labels": {
"my-label": "my-value"
}
},
"write": {
"labels": {
"my-label": "my-value"
}
"extra": {
"foo": "extra-value"
}
}
}
```
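
The same structure applies to the other entities. For instance, a minimal sketch (keys and values here are illustrative, not taken from the project) that attaches labels to `executionEvent` and extra data to `read`:

```json
{
  "executionEvent": {
    "labels": {
      "pipeline": "nightly-run"
    },
    "extra": {
      "note": "anything the user wants to record"
    }
  },
  "read": {
    "extra": {
      "read-marker": true
    }
  }
}
```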
The values do not have to be static: `$env` resolves an environment variable, `$jvm` a JVM system property, and `$js` a JavaScript expression evaluated when the lineage is captured. See the following example:
"my-extra-2": [ "aaa", "bbb", "ccc" ],
"bar": { "$env": "BAR_HOME" },
"baz": { "$jvm": "some.jvm.prop" },
"daz": { "$js": "session.conf().get('k')" }
"daz": { "$js": "session.conf().get('k')" },
"appName": { "$js":"session.sparkContext().appName()" }
}
}
}
```

For the JavaScript evaluation, the following variables are available by default:

| Variable          | Scala type                                                 |
|-------------------|------------------------------------------------------------|
| `session`         | `org.apache.spark.sql.SparkSession`                        |
| `logicalPlan`     | `org.apache.spark.sql.catalyst.plans.logical.LogicalPlan`  |
| `executedPlanOpt` | `Option[org.apache.spark.sql.execution.SparkPlan]`         |

Using those objects it should be possible to extract almost any relevant information from Spark.
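
For illustration, a minimal sketch (the keys and expressions below are assumptions, not part of the project's examples) that pulls a few such values out of those objects via `$js`:

```json
{
  "executionPlan": {
    "extra": {
      "sparkMaster": { "$js": "session.sparkContext().master()" },
      "outputColumns": { "$js": "logicalPlan.output().size()" },
      "hasPhysicalPlan": { "$js": "!executedPlanOpt.isEmpty()" }
    }
  }
}
```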

The rules can be conditional, meaning the specified params will be added only when some condition is met.
See the following example:

```json
{
  "executionEvent[@.timestamp > 65]": {
    "extra": { "tux": 1 }
  },
  "executionEvent[@.extra['foo'] == 'a' && @.extra['bar'] == 'x']": {
    "extra": { "bux": 2 }
  },
  "executionEvent[@.extra['foo'] == 'a' && !@.extra['bar']]": {
    "extra": { "dux": 3 }
  },
  "executionEvent[@.extra['baz'][2] >= 3]": {
    "extra": { "mux": 4 }
  },
  "executionEvent[@.extra['baz'][2] < 3]": {
    "extra": { "fux": 5 }
  },
  "executionEvent[session.sparkContext.conf['spark.ui.enabled'] == 'false']": {
    "extra": { "tux": 1 }
  }
}
```

The condition is enclosed in `[]` after the entity name.
Here `@` serves as a reference to the currently processed entity, in this case `executionEvent`.
The `[]` inside the condition statement can also be used to access maps and sequences.
Logical and comparison operators are available.

`session` and the other variables available for JavaScript evaluation can be used here as well.
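
For instance, a minimal sketch (the `capturedAppName` key is made up for this illustration) that combines a condition with a dynamically resolved value:

```json
{
  "executionEvent[@.timestamp > 65]": {
    "extra": {
      "capturedAppName": { "$js": "session.sparkContext().appName()" }
    }
  }
}
```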


For more usage examples, please see the `MetadataCollectingFilterSpec` test class.

<a id="spark-coverage"></a>
The second changed file is the `MetadataCollectingFilterSpec` test class; the relevant excerpts after this commit:

```scala
class MetadataCollectingFilterSpec extends AnyFlatSpec with EnvFixture with Matchers /* ... */ {

  private val logicalPlan = mock[LogicalPlan]
  private val sparkSession = SparkSession.builder
    .master("local")
    .appName("testApp")
    .config("spark.driver.host", "localhost")
    .config("spark.ui.enabled", "false")
    .config("k", "nice")
    // ...

  // (excerpt from the test covering dynamic values in "extra")
| "foo": { "$js": "executionPlan.name()" },
| "bar": { "$env": "BAR_HOME" },
| "baz": { "$jvm": "some.jvm.prop" },
| "daz": { "$js": "session.conf().get('k')" }
| "daz": { "$js": "session.conf().get('k')" },
| "appName1": { "$js": "session.conf().get('spark.app.name')" },
| "appName2": { "$js":"session.sparkContext().appName()" }
| }
| }
|}
    // ...
extra("bar") shouldBe "rabbit"
extra("baz") shouldBe "123"
extra("daz") shouldBe "nice"
extra("appName1") shouldBe "testApp"
extra("appName2") shouldBe "testApp"
}

it should "support extra in Write" in {
val configString =
"""
|{
| "write": {
| "extra": {
| "foo": "extra-value",
| "appName": { "$js":"session.sparkContext().appName()" }
| }
| }
|}
|""".stripMargin

val config = new BaseConfiguration {
addPropertyDirect(InjectRulesKey, configString)
}

val filter = new MetadataCollectingFilter(config)

val processedWrite = filter.processWriteOperation(wop, harvestingContext)

val extra = processedWrite.extra
extra("foo") shouldBe "extra-value"
extra("appName") shouldBe "testApp"
}


it should "support labels" in {
val configString =
"""
Expand Down
