Skip to content

Commit

Permalink
spec: Add fileCount to dataset input & output stats (#2562)
Browse files Browse the repository at this point in the history
Signed-off-by: Martynov Maxim <martinov_m_s_@mail.ru>
  • Loading branch information
dolfinus committed Apr 4, 2024
1 parent a353a75 commit ba870d4
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*Adding the ability to support OpenLineage within Spark extensions' code. Refer to [README](https://github.com/OpenLineage/OpenLineage/tree/spark/built-in-lineage/integration/spark-interfaces-scala#readme) for more details.*
* **Flink: support Flink 1.19.0** [`#2545`](https://github.com/OpenLineage/OpenLineage/pull/2545) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Adding the integration test coverage for Flink 1.19.0.*
* **Spec: add "fileCount" to dataset stat facets** [`#2562`](https://github.com/OpenLineage/OpenLineage/pull/2562) [@dolfinus](https://github.com/dolfinus)
*Adds the "fileCount" field to the DataQualityMetricsInputDatasetFacet and OutputStatisticsOutputDatasetFacet specifications.*

### Fixed
* **Flink: disable module metadata generation** [`#2507`](https://github.com/OpenLineage/OpenLineage/pull/2531) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void factory() throws JsonProcessingException {
ol.newDataQualityMetricsInputDatasetFacetBuilder()
.rowCount(10L)
.bytes(20L)
.fileCount(5L)
.columnMetrics(
ol.newDataQualityMetricsInputDatasetFacetColumnMetricsBuilder()
.put(
Expand Down Expand Up @@ -258,7 +259,12 @@ void factory() throws JsonProcessingException {
.build())
.outputFacets(
ol.newOutputDatasetOutputFacetsBuilder()
.outputStatistics(ol.newOutputStatisticsOutputDatasetFacet(10L, 20L))
.outputStatistics(
ol.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(10L)
.size(20L)
.fileCount(5L)
.build())
.build())
.build());

Expand Down Expand Up @@ -296,6 +302,7 @@ void factory() throws JsonProcessingException {
inputDataset.getInputFacets().getDataQualityMetrics();
assertEquals((Long) 10L, dq.getRowCount());
assertEquals((Long) 20L, dq.getBytes());
assertEquals((Long) 5L, dq.getFileCount());
DataQualityMetricsInputDatasetFacetColumnMetricsAdditional colMetrics =
dq.getColumnMetrics().getAdditionalProperties().get("mycol");
assertEquals((Double) 10D, colMetrics.getCount());
Expand All @@ -315,6 +322,7 @@ void factory() throws JsonProcessingException {
assertEquals(roundTrip(json), roundTrip(mapper.writeValueAsString(read)));
assertEquals((Long) 10L, outputDataset.getOutputFacets().getOutputStatistics().getRowCount());
assertEquals((Long) 20L, outputDataset.getOutputFacets().getOutputStatistics().getSize());
assertEquals((Long) 5L, outputDataset.getOutputFacets().getOutputStatistics().getFileCount());

assertEquals(json, mapper.writeValueAsString(read));
}
Expand Down
6 changes: 4 additions & 2 deletions client/python/openlineage/client/facet.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ def _get_schema() -> str:

@attr.s
class OutputStatisticsOutputDatasetFacet(BaseFacet):
rowCount: int = attr.ib() # noqa: N815
rowCount: Optional[int] = attr.ib(default=None) # noqa: N815
size: Optional[int] = attr.ib(default=None)
fileCount: Optional[int] = attr.ib(default=None) # noqa: N815

_additional_skip_redact: ClassVar[List[str]] = ["rowCount", "size"]
_additional_skip_redact: ClassVar[List[str]] = ["rowCount", "size", "fileCount"]

@staticmethod
def _get_schema() -> str:
Expand All @@ -170,6 +171,7 @@ class ColumnMetric:
class DataQualityMetricsInputDatasetFacet(BaseFacet):
rowCount: Optional[int] = attr.ib(default=None) # noqa: N815
bytes: Optional[int] = attr.ib(default=None) # noqa: A003
fileCount: Optional[int] = attr.ib(default=None) # noqa: N815
columnMetrics: Dict[str, ColumnMetric] = attr.ib(factory=dict) # noqa: N815

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ protected void build(
"outputStatistics",
context
.getOpenLineage()
.newOutputStatisticsOutputDatasetFacet(
.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(
Optional.of(metrics.get(JobMetricsHolder.Metric.WRITE_RECORDS))
.map(Number::longValue)
.orElse(null),
.orElse(null))
.size(
Optional.of(metrics.get(JobMetricsHolder.Metric.WRITE_BYTES))
.map(Number::longValue)
.orElse(null)));
.orElse(null))
.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void testSerializeRunEvent() throws IOException, URISyntaxException {
ol.newDataQualityMetricsInputDatasetFacetBuilder()
.rowCount(10L)
.bytes(20L)
.fileCount(5L)
.columnMetrics(
ol.newDataQualityMetricsInputDatasetFacetColumnMetricsBuilder()
.put(
Expand All @@ -107,7 +108,12 @@ public void testSerializeRunEvent() throws IOException, URISyntaxException {
"output",
null,
ol.newOutputDatasetOutputFacetsBuilder()
.outputStatistics(ol.newOutputStatisticsOutputDatasetFacet(10L, 20L))
.outputStatistics(
ol.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(10L)
.size(20L)
.fileCount(5L)
.build())
.build()));
OpenLineage.RunEvent runStateUpdate =
ol.newRunEventBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"dataQualityMetrics": {
"rowCount": 10,
"bytes": 20,
"fileCount": 5,
"columnMetrics": {
"mycol": {
"nullCount": 1,
Expand All @@ -70,7 +71,8 @@
"outputFacets": {
"outputStatistics": {
"rowCount": 10,
"size": 20
"size": 20,
"fileCount": 5
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions proxy/fluentd/events/event_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/DataQualityMetricsInputDatasetFacet",
"rowCount": 1000,
"bytes": 1048576
"bytes": 1048576,
"fileCount": 5
},
"dataQualityAssertions": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
Expand Down Expand Up @@ -155,7 +156,8 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"rowCount": 2000,
"size": 2097152
"size": 2097152,
"fileCount": 5
}
},
"facets": {
Expand Down
5 changes: 3 additions & 2 deletions proxy/fluentd/events/event_invalid_input_dataset_facet.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/DataQualityMetricsInputDatasetFacet",
"noRowCount": 1000,
"bytes": 1048576
"bytes": 1048576,
"fileCount": 5
}
}
}
],
"outputs": [ ],
"outputs": [],
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent"
}
7 changes: 4 additions & 3 deletions proxy/fluentd/events/event_invalid_output_dataset_facet.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"namespace": "my-scheduler-namespace",
"name": "myjob"
},
"inputs": [ ],
"inputs": [],
"outputs": [
{
"namespace": "my-datasource-namespace",
Expand All @@ -17,8 +17,9 @@
"outputStatistics": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"notARowCount": 2000,
"size": 2097152
"rowCount": "wrong",
"size": 2097152,
"fileCount": 5
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/fluentd/test/plugin/test_parser_openlineage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class OpenlineageParserTest < Test::Unit::TestCase
err = assert_raise Fluent::ParserError do
@parser.instance.parse(ol_event)
end
assert_match(/Openlineage validation failed: (.+) "rowCount" is a required property/, err.message)
assert_match(/Openlineage validation failed: (.+) path \"\/outputs\/0\/outputFacets\/outputStatistics\/rowCount/, err.message)
end

private
Expand Down
6 changes: 5 additions & 1 deletion spec/facets/DataQualityMetricsInputDatasetFacet.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/DataQualityMetricsInputDatasetFacet.json",
"$id": "https://openlineage.io/spec/facets/1-0-1/DataQualityMetricsInputDatasetFacet.json",
"$defs": {
"DataQualityMetricsInputDatasetFacet": {
"allOf": [
Expand All @@ -19,6 +19,10 @@
"description": "The size in bytes",
"type": "integer"
},
"fileCount": {
"description": "The number of files evaluated",
"type": "integer"
},
"columnMetrics": {
"description": "The property key is the column name",
"type": "object",
Expand Down
9 changes: 6 additions & 3 deletions spec/facets/OutputStatisticsOutputDatasetFacet.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/OutputStatisticsOutputDatasetFacet.json",
"$id": "https://openlineage.io/spec/facets/1-0-1/OutputStatisticsOutputDatasetFacet.json",
"$defs": {
"OutputStatisticsOutputDatasetFacet": {
"allOf": [
Expand All @@ -17,9 +17,12 @@
"size": {
"description": "The size in bytes written to the dataset",
"type": "integer"
},
"fileCount": {
"description": "The number of files written to the dataset",
"type": "integer"
}
},
"required": ["rowCount"]
}
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion spec/tests/DataQualityMetricsInputDatasetFacet/1.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
}
},
"rowCount": 10,

"fileCount": 5,
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DataQualityMetricsInputDatasetFacet.json"
}
Expand Down
3 changes: 2 additions & 1 deletion spec/tests/OutputStatisticsOutputDatasetFacet/1.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"rowCount": 2000,
"size": 2097152
"size": 2097152,
"fileCount": 5
}
}

0 comments on commit ba870d4

Please sign in to comment.