
improve MetadataCollectingFilter spec and documentation #698

Merged
merged 1 commit on Jun 13, 2023
65 changes: 54 additions & 11 deletions README.md
@@ -439,9 +439,9 @@ MetadataCollectingFilter provides a way to add additional data to lineage produced

Data can be added to the following lineage entities: `executionPlan`, `executionEvent`, `operation`, `read` and `write`.

Inside each entity two places can be used for user data: `labels` and `extra`.
Labels are intended for identification and filtering on the server.
Extra is for any other user defined data.
Inside each entity there is a dedicated map named `extra` that can store any additional user data.

`executionPlan` and `executionEvent` have an additional map `labels`. Labels are intended for identification and filtering on the server.

Example usage:

@@ -458,12 +458,15 @@ json-with-rules.json could look like this:
"extra": {
"my-extra-1": 42,
"my-extra-2": [ "aaa", "bbb", "ccc" ]
}
},
"labels": {
"my-label": "my-value"
}
},
"write": {
"labels": {
"my-label": "my-value"
}
"extra": {
"foo": "extra-value"
}
}
}
```
@@ -482,17 +485,57 @@ See the following example:
"my-extra-2": [ "aaa", "bbb", "ccc" ],
"bar": { "$env": "BAR_HOME" },
"baz": { "$jvm": "some.jvm.prop" },
"daz": { "$js": "session.conf().get('k')" }
"daz": { "$js": "session.conf().get('k')" },
"appName": { "$js":"session.sparkContext().appName()" }
}
}
}
```

For the javascript evaluation `org.apache.spark.sql.SparkSession` is available as `session`,
the internal Spark logical plan `org.apache.spark.sql.catalyst.plans.logical.LogicalPlan` is available as `logicalPlan` and
`Option[org.apache.spark.sql.execution.SparkPlan]` as `executedPlanOpt`.
For the javascript evaluation the following variables are available by default:

| variable | Scala Type |
|-------------------|:-----------------------------------------------------------|
| `session` | `org.apache.spark.sql.SparkSession` |
| `logicalPlan` | `org.apache.spark.sql.catalyst.plans.logical.LogicalPlan` |
| `executedPlanOpt` | `Option[org.apache.spark.sql.execution.SparkPlan]` |

Using those objects it should be possible to extract almost any relevant information from Spark.
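For example, a rule like the following could capture the Spark version and the root node name of the logical plan. This is a hypothetical sketch, not part of the shipped examples — the keys `sparkVersion` and `rootNode` are illustrative, while `session.version()` and `logicalPlan.nodeName()` are standard Spark accessors:

```json
{
  "executionPlan": {
    "extra": {
      "sparkVersion": { "$js": "session.version()" },
      "rootNode": { "$js": "logicalPlan.nodeName()" }
    }
  }
}
```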

The rules can be conditional, meaning the specified params will be added only when some condition is met.
See the following example:

```json
{
"executionEvent[@.timestamp > 65]": {
"extra": { "tux": 1 }
},
"executionEvent[@.extra['foo'] == 'a' && @.extra['bar'] == 'x']": {
"extra": { "bux": 2 }
},
"executionEvent[@.extra['foo'] == 'a' && !@.extra['bar']]": {
"extra": { "dux": 3 }
},
"executionEvent[@.extra['baz'][2] >= 3]": {
"extra": { "mux": 4 }
},
"executionEvent[@.extra['baz'][2] < 3]": {
"extra": { "fux": 5 }
},
"executionEvent[session.sparkContext.conf['spark.ui.enabled'] == 'false']": {
"extra": { "tux": 1 }
}
}
```

The condition is enclosed in `[]` after the entity name.
Here `@` serves as a reference to the currently processed entity, in this case `executionEvent`.
The `[]` inside the condition statement can also be used to access maps and sequences.
Logical and comparison operators are available.

`session` and the other variables available for the javascript evaluation can be used in conditions as well.
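Conditions and value expansion can also be combined in one rule. The sketch below is hypothetical (the key `appName` is illustrative); it injects the application name only when the Spark UI is disabled, reusing the condition shown in the example above:

```json
{
  "executionEvent[session.sparkContext.conf['spark.ui.enabled'] == 'false']": {
    "extra": {
      "appName": { "$js": "session.sparkContext().appName()" }
    }
  }
}
```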


For more examples of usage please see the `MetadataCollectingFilterSpec` test class.

<a id="spark-coverage"></a>
MetadataCollectingFilterSpec.scala
@@ -35,6 +35,7 @@ class MetadataCollectingFilterSpec extends AnyFlatSpec with EnvFixture with Matchers
private val logicalPlan = mock[LogicalPlan]
private val sparkSession = SparkSession.builder
.master("local")
.appName("testApp")
.config("spark.driver.host", "localhost")
.config("spark.ui.enabled", "false")
.config("k", "nice")
@@ -68,7 +69,9 @@ class MetadataCollectingFilterSpec extends AnyFlatSpec with EnvFixture with Matchers
| "foo": { "$js": "executionPlan.name()" },
| "bar": { "$env": "BAR_HOME" },
| "baz": { "$jvm": "some.jvm.prop" },
| "daz": { "$js": "session.conf().get('k')" }
| "daz": { "$js": "session.conf().get('k')" },
| "appName1": { "$js": "session.conf().get('spark.app.name')" },
| "appName2": { "$js":"session.sparkContext().appName()" }
| }
| }
|}
@@ -93,8 +96,37 @@ class MetadataCollectingFilterSpec extends AnyFlatSpec with EnvFixture with Matchers
extra("bar") shouldBe "rabbit"
extra("baz") shouldBe "123"
extra("daz") shouldBe "nice"
extra("appName1") shouldBe "testApp"
extra("appName2") shouldBe "testApp"
}

it should "support extra in Write" in {
val configString =
"""
|{
| "write": {
| "extra": {
| "foo": "extra-value",
| "appName": { "$js":"session.sparkContext().appName()" }
| }
| }
|}
|""".stripMargin

val config = new BaseConfiguration {
addPropertyDirect(InjectRulesKey, configString)
}

val filter = new MetadataCollectingFilter(config)

val processedWrite = filter.processWriteOperation(wop, harvestingContext)

val extra = processedWrite.extra
extra("foo") shouldBe "extra-value"
extra("appName") shouldBe "testApp"
}


it should "support labels" in {
val configString =
"""
…