Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPEC] Add fileCount to dataset stats facets #2562

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*Adding the ability to support OpenLineage within Spark extensions' code. Refer to [README](https://github.com/OpenLineage/OpenLineage/tree/spark/built-in-lineage/integration/spark-interfaces-scala#readme) for more details.*
* **Flink: support Flink 1.19.0** [`#2545`](https://github.com/OpenLineage/OpenLineage/pull/2545) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Adding the integration test coverage for Flink 1.19.0.*
* **Spec: add "fileCount" to dataset stat facets** [`#2562`](https://github.com/OpenLineage/OpenLineage/pull/2562) [@dolfinus](https://github.com/dolfinus)
*Adds "fileCount" field to DataQualityMetricsInputDatasetFacet and OutputStatisticsOutputDatasetFacet specification*

### Fixed
* **Flink: disable module metadata generation** [`#2507`](https://github.com/OpenLineage/OpenLineage/pull/2531) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void factory() throws JsonProcessingException {
ol.newDataQualityMetricsInputDatasetFacetBuilder()
.rowCount(10L)
.bytes(20L)
.fileCount(5L)
.columnMetrics(
ol.newDataQualityMetricsInputDatasetFacetColumnMetricsBuilder()
.put(
Expand Down Expand Up @@ -258,7 +259,12 @@ void factory() throws JsonProcessingException {
.build())
.outputFacets(
ol.newOutputDatasetOutputFacetsBuilder()
.outputStatistics(ol.newOutputStatisticsOutputDatasetFacet(10L, 20L))
.outputStatistics(
ol.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(10L)
.size(20L)
.fileCount(5L)
.build())
.build())
.build());

Expand Down Expand Up @@ -296,6 +302,7 @@ void factory() throws JsonProcessingException {
inputDataset.getInputFacets().getDataQualityMetrics();
assertEquals((Long) 10L, dq.getRowCount());
assertEquals((Long) 20L, dq.getBytes());
assertEquals((Long) 5L, dq.getFileCount());
DataQualityMetricsInputDatasetFacetColumnMetricsAdditional colMetrics =
dq.getColumnMetrics().getAdditionalProperties().get("mycol");
assertEquals((Double) 10D, colMetrics.getCount());
Expand All @@ -315,6 +322,7 @@ void factory() throws JsonProcessingException {
assertEquals(roundTrip(json), roundTrip(mapper.writeValueAsString(read)));
assertEquals((Long) 10L, outputDataset.getOutputFacets().getOutputStatistics().getRowCount());
assertEquals((Long) 20L, outputDataset.getOutputFacets().getOutputStatistics().getSize());
assertEquals((Long) 5L, outputDataset.getOutputFacets().getOutputStatistics().getFileCount());

assertEquals(json, mapper.writeValueAsString(read));
}
Expand Down
6 changes: 4 additions & 2 deletions client/python/openlineage/client/facet.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ def _get_schema() -> str:

@attr.s
class OutputStatisticsOutputDatasetFacet(BaseFacet):
rowCount: int = attr.ib() # noqa: N815
rowCount: Optional[int] = attr.ib(default=None) # noqa: N815
size: Optional[int] = attr.ib(default=None)
fileCount: Optional[int] = attr.ib(default=None) # noqa: N815

_additional_skip_redact: ClassVar[List[str]] = ["rowCount", "size"]
_additional_skip_redact: ClassVar[List[str]] = ["rowCount", "size", "fileCount"]

@staticmethod
def _get_schema() -> str:
Expand All @@ -170,6 +171,7 @@ class ColumnMetric:
class DataQualityMetricsInputDatasetFacet(BaseFacet):
rowCount: Optional[int] = attr.ib(default=None) # noqa: N815
bytes: Optional[int] = attr.ib(default=None) # noqa: A003
fileCount: Optional[int] = attr.ib(default=None) # noqa: N815
columnMetrics: Dict[str, ColumnMetric] = attr.ib(factory=dict) # noqa: N815

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ protected void build(
"outputStatistics",
context
.getOpenLineage()
.newOutputStatisticsOutputDatasetFacet(
.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(
Optional.of(metrics.get(JobMetricsHolder.Metric.WRITE_RECORDS))
.map(Number::longValue)
.orElse(null),
.orElse(null))
.size(
Optional.of(metrics.get(JobMetricsHolder.Metric.WRITE_BYTES))
.map(Number::longValue)
.orElse(null)));
.orElse(null))
.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void testSerializeRunEvent() throws IOException, URISyntaxException {
ol.newDataQualityMetricsInputDatasetFacetBuilder()
.rowCount(10L)
.bytes(20L)
.fileCount(5L)
.columnMetrics(
ol.newDataQualityMetricsInputDatasetFacetColumnMetricsBuilder()
.put(
Expand All @@ -107,7 +108,12 @@ public void testSerializeRunEvent() throws IOException, URISyntaxException {
"output",
null,
ol.newOutputDatasetOutputFacetsBuilder()
.outputStatistics(ol.newOutputStatisticsOutputDatasetFacet(10L, 20L))
.outputStatistics(
ol.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(10L)
.size(20L)
.fileCount(5L)
.build())
.build()));
OpenLineage.RunEvent runStateUpdate =
ol.newRunEventBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"dataQualityMetrics": {
"rowCount": 10,
"bytes": 20,
"fileCount": 5,
"columnMetrics": {
"mycol": {
"nullCount": 1,
Expand All @@ -70,7 +71,8 @@
"outputFacets": {
"outputStatistics": {
"rowCount": 10,
"size": 20
"size": 20,
"fileCount": 5
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions proxy/fluentd/events/event_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/DataQualityMetricsInputDatasetFacet",
"rowCount": 1000,
"bytes": 1048576
"bytes": 1048576,
"fileCount": 5
},
"dataQualityAssertions": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
Expand Down Expand Up @@ -155,7 +156,8 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"rowCount": 2000,
"size": 2097152
"size": 2097152,
"fileCount": 5
}
},
"facets": {
Expand Down
5 changes: 3 additions & 2 deletions proxy/fluentd/events/event_invalid_input_dataset_facet.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/DataQualityMetricsInputDatasetFacet",
"noRowCount": 1000,
"bytes": 1048576
"bytes": 1048576,
"fileCount": 5
}
}
}
],
"outputs": [ ],
"outputs": [],
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent"
}
7 changes: 4 additions & 3 deletions proxy/fluentd/events/event_invalid_output_dataset_facet.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"namespace": "my-scheduler-namespace",
"name": "myjob"
},
"inputs": [ ],
"inputs": [],
"outputs": [
{
"namespace": "my-datasource-namespace",
Expand All @@ -17,8 +17,9 @@
"outputStatistics": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"notARowCount": 2000,
"size": 2097152
"rowCount": "wrong",
"size": 2097152,
"fileCount": 5
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/fluentd/test/plugin/test_parser_openlineage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class OpenlineageParserTest < Test::Unit::TestCase
err = assert_raise Fluent::ParserError do
@parser.instance.parse(ol_event)
end
assert_match(/Openlineage validation failed: (.+) "rowCount" is a required property/, err.message)
assert_match(/Openlineage validation failed: (.+) path \"\/outputs\/0\/outputFacets\/outputStatistics\/rowCount/, err.message)
end

private
Expand Down
6 changes: 5 additions & 1 deletion spec/facets/DataQualityMetricsInputDatasetFacet.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/DataQualityMetricsInputDatasetFacet.json",
"$id": "https://openlineage.io/spec/facets/1-0-1/DataQualityMetricsInputDatasetFacet.json",
"$defs": {
"DataQualityMetricsInputDatasetFacet": {
"allOf": [
Expand All @@ -19,6 +19,10 @@
"description": "The size in bytes",
"type": "integer"
},
"fileCount": {
"description": "The number of files evaluated",
"type": "integer"
},
"columnMetrics": {
"description": "The property key is the column name",
"type": "object",
Expand Down
9 changes: 6 additions & 3 deletions spec/facets/OutputStatisticsOutputDatasetFacet.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/OutputStatisticsOutputDatasetFacet.json",
"$id": "https://openlineage.io/spec/facets/1-0-1/OutputStatisticsOutputDatasetFacet.json",
"$defs": {
"OutputStatisticsOutputDatasetFacet": {
"allOf": [
Expand All @@ -17,9 +17,12 @@
"size": {
"description": "The size in bytes written to the dataset",
"type": "integer"
},
"fileCount": {
"description": "The number of files written to the dataset",
"type": "integer"
}
},
"required": ["rowCount"]
}
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion spec/tests/DataQualityMetricsInputDatasetFacet/1.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
}
},
"rowCount": 10,

"fileCount": 5,
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DataQualityMetricsInputDatasetFacet.json"
}
Expand Down
3 changes: 2 additions & 1 deletion spec/tests/OutputStatisticsOutputDatasetFacet/1.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"rowCount": 2000,
"size": 2097152
"size": 2097152,
"fileCount": 5
}
}