Skip to content

Commit

Permalink
spec: Add fileCount to dataset input & output stats
Browse files: browse the repository at this point in the history
Signed-off-by: Martynov Maxim <martinov_m_s_@mail.ru>
  • Loading branch information
dolfinus committed Apr 3, 2024
1 parent 9cb16d0 commit 35f9b65
Show file tree
Hide file tree
Showing 13 changed files with 55 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void factory() throws JsonProcessingException {
ol.newDataQualityMetricsInputDatasetFacetBuilder()
.rowCount(10L)
.bytes(20L)
.fileCount(5L)
.columnMetrics(
ol.newDataQualityMetricsInputDatasetFacetColumnMetricsBuilder()
.put(
Expand Down Expand Up @@ -258,7 +259,12 @@ void factory() throws JsonProcessingException {
.build())
.outputFacets(
ol.newOutputDatasetOutputFacetsBuilder()
.outputStatistics(ol.newOutputStatisticsOutputDatasetFacet(10L, 20L))
.outputStatistics(
ol.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(10L)
.size(20L)
.fileCount(5L)
.build())
.build())
.build());

Expand Down Expand Up @@ -296,6 +302,7 @@ void factory() throws JsonProcessingException {
inputDataset.getInputFacets().getDataQualityMetrics();
assertEquals((Long) 10L, dq.getRowCount());
assertEquals((Long) 20L, dq.getBytes());
assertEquals((Long) 5L, dq.getFileCount());
DataQualityMetricsInputDatasetFacetColumnMetricsAdditional colMetrics =
dq.getColumnMetrics().getAdditionalProperties().get("mycol");
assertEquals((Double) 10D, colMetrics.getCount());
Expand All @@ -315,6 +322,7 @@ void factory() throws JsonProcessingException {
assertEquals(roundTrip(json), roundTrip(mapper.writeValueAsString(read)));
assertEquals((Long) 10L, outputDataset.getOutputFacets().getOutputStatistics().getRowCount());
assertEquals((Long) 20L, outputDataset.getOutputFacets().getOutputStatistics().getSize());
assertEquals((Long) 5L, outputDataset.getOutputFacets().getOutputStatistics().getFileCount());

assertEquals(json, mapper.writeValueAsString(read));
}
Expand Down
6 changes: 4 additions & 2 deletions client/python/openlineage/client/facet.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ def _get_schema() -> str:

@attr.s
class OutputStatisticsOutputDatasetFacet(BaseFacet):
rowCount: int = attr.ib() # noqa: N815
rowCount: Optional[int] = attr.ib(default=None) # noqa: N815
size: Optional[int] = attr.ib(default=None)
fileCount: Optional[int] = attr.ib(default=None) # noqa: N815

_additional_skip_redact: ClassVar[List[str]] = ["rowCount", "size"]
_additional_skip_redact: ClassVar[List[str]] = ["rowCount", "size", "fileCount"]

@staticmethod
def _get_schema() -> str:
Expand All @@ -170,6 +171,7 @@ class ColumnMetric:
class DataQualityMetricsInputDatasetFacet(BaseFacet):
rowCount: Optional[int] = attr.ib(default=None) # noqa: N815
bytes: Optional[int] = attr.ib(default=None) # noqa: A003
fileCount: Optional[int] = attr.ib(default=None) # noqa: N815
columnMetrics: Dict[str, ColumnMetric] = attr.ib(factory=dict) # noqa: N815

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ protected void build(
"outputStatistics",
context
.getOpenLineage()
.newOutputStatisticsOutputDatasetFacet(
.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(
Optional.of(metrics.get(JobMetricsHolder.Metric.WRITE_RECORDS))
.map(Number::longValue)
.orElse(null),
.orElse(null))
.size(
Optional.of(metrics.get(JobMetricsHolder.Metric.WRITE_BYTES))
.map(Number::longValue)
.orElse(null)));
.orElse(null))
.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public void testSerializeRunEvent() throws IOException, URISyntaxException {
ol.newDataQualityMetricsInputDatasetFacetBuilder()
.rowCount(10L)
.bytes(20L)
.fileCount(5L)
.columnMetrics(
ol.newDataQualityMetricsInputDatasetFacetColumnMetricsBuilder()
.put(
Expand All @@ -107,7 +108,12 @@ public void testSerializeRunEvent() throws IOException, URISyntaxException {
"output",
null,
ol.newOutputDatasetOutputFacetsBuilder()
.outputStatistics(ol.newOutputStatisticsOutputDatasetFacet(10L, 20L))
.outputStatistics(
ol.newOutputStatisticsOutputDatasetFacetBuilder()
.rowCount(10L)
.size(20L)
.fileCount(5L)
.build())
.build()));
OpenLineage.RunEvent runStateUpdate =
ol.newRunEventBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"dataQualityMetrics": {
"rowCount": 10,
"bytes": 20,
"fileCount": 5,
"columnMetrics": {
"mycol": {
"nullCount": 1,
Expand All @@ -70,7 +71,8 @@
"outputFacets": {
"outputStatistics": {
"rowCount": 10,
"size": 20
"size": 20,
"fileCount": 5
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions proxy/fluentd/events/event_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/DataQualityMetricsInputDatasetFacet",
"rowCount": 1000,
"bytes": 1048576
"bytes": 1048576,
"fileCount": 5
},
"dataQualityAssertions": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
Expand Down Expand Up @@ -155,7 +156,8 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"rowCount": 2000,
"size": 2097152
"size": 2097152,
"fileCount": 5
}
},
"facets": {
Expand Down
5 changes: 3 additions & 2 deletions proxy/fluentd/events/event_invalid_input_dataset_facet.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/DataQualityMetricsInputDatasetFacet",
"noRowCount": 1000,
"bytes": 1048576
"bytes": 1048576,
"fileCount": 5
}
}
}
],
"outputs": [ ],
"outputs": [],
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent"
}
7 changes: 4 additions & 3 deletions proxy/fluentd/events/event_invalid_output_dataset_facet.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"namespace": "my-scheduler-namespace",
"name": "myjob"
},
"inputs": [ ],
"inputs": [],
"outputs": [
{
"namespace": "my-datasource-namespace",
Expand All @@ -17,8 +17,9 @@
"outputStatistics": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"notARowCount": 2000,
"size": 2097152
"rowCount": false,
"size": 2097152,
"fileCount": 5
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion proxy/fluentd/test/plugin/test_parser_openlineage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class OpenlineageParserTest < Test::Unit::TestCase
err = assert_raise Fluent::ParserError do
@parser.instance.parse(ol_event)
end
assert_match(/Openlineage validation failed: (.+) "rowCount" is a required property/, err.message)
assert_match(/Openlineage validation failed: (.+) path \"\/outputs\/0\/outputFacets\/outputStatistics\/rowCount/, err.message)
end

private
Expand Down
6 changes: 5 additions & 1 deletion spec/facets/DataQualityMetricsInputDatasetFacet.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/DataQualityMetricsInputDatasetFacet.json",
"$id": "https://openlineage.io/spec/facets/1-0-1/DataQualityMetricsInputDatasetFacet.json",
"$defs": {
"DataQualityMetricsInputDatasetFacet": {
"allOf": [
Expand All @@ -19,6 +19,10 @@
"description": "The size in bytes",
"type": "integer"
},
"fileCount": {
"description": "The number of files evaluated",
"type": "integer"
},
"columnMetrics": {
"description": "The property key is the column name",
"type": "object",
Expand Down
9 changes: 6 additions & 3 deletions spec/facets/OutputStatisticsOutputDatasetFacet.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-0/OutputStatisticsOutputDatasetFacet.json",
"$id": "https://openlineage.io/spec/facets/1-0-1/OutputStatisticsOutputDatasetFacet.json",
"$defs": {
"OutputStatisticsOutputDatasetFacet": {
"allOf": [
Expand All @@ -17,9 +17,12 @@
"size": {
"description": "The size in bytes written to the dataset",
"type": "integer"
},
"fileCount": {
"description": "The number of files written to the dataset",
"type": "integer"
}
},
"required": ["rowCount"]
}
}
]
}
Expand Down
2 changes: 1 addition & 1 deletion spec/tests/DataQualityMetricsInputDatasetFacet/1.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
}
},
"rowCount": 10,

"fileCount": 5,
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/facets/1-0-0/DataQualityMetricsInputDatasetFacet.json"
}
Expand Down
3 changes: 2 additions & 1 deletion spec/tests/OutputStatisticsOutputDatasetFacet/1.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/OutputStatisticsOutputDatasetFacet",
"rowCount": 2000,
"size": 2097152
"size": 2097152,
"fileCount": 5
}
}

0 comments on commit 35f9b65

Please sign in to comment.