feat(spark-lineage): make platform for spark lineage inferred and configurable (#6251)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>
danielli-ziprecruiter and treff7es committed Oct 27, 2022
1 parent 615865f commit 7b2a76d
Showing 16 changed files with 74 additions and 31 deletions.
2 changes: 2 additions & 0 deletions metadata-integration/java/spark-lineage/README.md
@@ -132,6 +132,8 @@ The Spark agent can be configured using Databricks Cluster [Spark configuration]
| spark.datahub.metadata.pipeline.platformInstance| | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance| | | Dataset level platform instance |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, will fall back to PROD |
+| spark.datahub.metadata.table.platform | | hive | Platform for tables |
+| spark.datahub.metadata.include_scheme | | true | Include scheme (e.g. hdfs://, s3://) in dataset URN |
| spark.datahub.coalesce_jobs | | false | Only one datajob (task) will be emitted containing all input and output datasets for the spark application |
| spark.datahub.parent.datajob_urn | | | Specified dataset will be set as the upstream dataset for the created datajob. Effective only when spark.datahub.coalesce_jobs is set to true |
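
A minimal sketch of wiring the two new options into a Spark session (values are illustrative; the agent itself, spark.extraListeners and the emitter settings, is assumed to be configured as described earlier in this README):

import org.apache.spark.sql.SparkSession;

public class LineageConfigExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("lineage-config-example")
        // New in this change: platform for catalog tables (default: hive).
        .config("spark.datahub.metadata.table.platform", "hive")
        // New in this change: keep/drop the hdfs:// or s3:// scheme in dataset URNs (default: true).
        .config("spark.datahub.metadata.include_scheme", "false")
        .getOrCreate();
    spark.stop();
  }
}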

@@ -58,6 +58,8 @@ public class DatasetExtractor {
CreateDataSourceTableAsSelectCommand.class, CreateHiveTableAsSelectCommand.class, InsertIntoHiveTable.class);
private static final String DATASET_ENV_KEY = "metadata.dataset.env";
private static final String DATASET_PLATFORM_INSTANCE_KEY = "metadata.dataset.platformInstance";
+ private static final String TABLE_PLATFORM_KEY = "metadata.table.platform";
+ private static final String INCLUDE_SCHEME_KEY = "metadata.include_scheme";
// TODO InsertIntoHiveDirCommand, InsertIntoDataSourceDirCommand

private DatasetExtractor() {
@@ -120,10 +122,12 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
InsertIntoHadoopFsRelationCommand cmd = (InsertIntoHadoopFsRelationCommand) p;
if (cmd.catalogTable().isDefined()) {
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.catalogTable().get(),
- getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+ getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+ getCommonFabricType(datahubConfig))));
}
return Optional.of(Collections.singletonList(new HdfsPathDataset(cmd.outputPath(),
- getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+ getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
+ getCommonFabricType(datahubConfig))));
});

PLAN_TO_DATASET.put(LogicalRelation.class, (p, ctx, datahubConfig) -> {
@@ -153,23 +157,27 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S
CreateDataSourceTableAsSelectCommand cmd = (CreateDataSourceTableAsSelectCommand) p;
// TODO what of cmd.mode()
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(),
- getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+ getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+ getCommonFabricType(datahubConfig))));
});
PLAN_TO_DATASET.put(CreateHiveTableAsSelectCommand.class, (p, ctx, datahubConfig) -> {
CreateHiveTableAsSelectCommand cmd = (CreateHiveTableAsSelectCommand) p;
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableDesc(),
- getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+ getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+ getCommonFabricType(datahubConfig))));
});
PLAN_TO_DATASET.put(InsertIntoHiveTable.class, (p, ctx, datahubConfig) -> {
InsertIntoHiveTable cmd = (InsertIntoHiveTable) p;
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.table(),
- getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+ getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+ getCommonFabricType(datahubConfig))));
});

PLAN_TO_DATASET.put(HiveTableRelation.class, (p, ctx, datahubConfig) -> {
HiveTableRelation cmd = (HiveTableRelation) p;
return Optional.of(Collections.singletonList(new CatalogTableDataset(cmd.tableMeta(),
- getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+ getCommonPlatformInstance(datahubConfig), getTablePlatform(datahubConfig),
+ getCommonFabricType(datahubConfig))));
});

REL_TO_DATASET.put(HadoopFsRelation.class, (r, ctx, datahubConfig) -> {
@@ -178,7 +186,8 @@ Optional<? extends Collection<SparkDataset>> fromSparkPlanNode(SparkPlan plan, S

// TODO mapping to URN TBD
return Optional.of(Collections.singletonList(new HdfsPathDataset(res.get(0),
- getCommonPlatformInstance(datahubConfig), getCommonFabricType(datahubConfig))));
+ getCommonPlatformInstance(datahubConfig), getIncludeScheme(datahubConfig),
+ getCommonFabricType(datahubConfig))));
});
REL_TO_DATASET.put(JDBCRelation.class, (r, ctx, datahubConfig) -> {
JDBCRelation rel = (JDBCRelation) r;
@@ -258,4 +267,14 @@ private static String getCommonPlatformInstance(Config datahubConfig) {
return datahubConfig.hasPath(DATASET_PLATFORM_INSTANCE_KEY) ? datahubConfig.getString(DATASET_PLATFORM_INSTANCE_KEY)
: null;
}

+ private static String getTablePlatform(Config datahubConfig) {
+ return datahubConfig.hasPath(TABLE_PLATFORM_KEY) ? datahubConfig.getString(TABLE_PLATFORM_KEY)
+ : "hive";
+ }

+ private static boolean getIncludeScheme(Config datahubConfig) {
+ return datahubConfig.hasPath(INCLUDE_SCHEME_KEY) ? datahubConfig.getBoolean(INCLUDE_SCHEME_KEY)
+ : true;
+ }
}
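
A hedged sketch of how these lookups resolve (assuming the Typesafe Config API used above, with keys given relative to the spark.datahub. prefix, and Java 9+ Map.of for brevity; "glue" is a hypothetical value, not something this commit ships):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;

// Not part of the commit: overriding both defaults.
Config cfg = ConfigFactory.parseMap(Map.of(
    "metadata.table.platform", "glue",     // getTablePlatform(cfg)  -> "glue" (default "hive")
    "metadata.include_scheme", "false"));  // getIncludeScheme(cfg)  -> false  (default true)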
@@ -9,12 +9,12 @@
@ToString
public class CatalogTableDataset extends SparkDataset {

- public CatalogTableDataset(CatalogTable table, String platformInstance, FabricType fabricType) {
- this(table.qualifiedName(), platformInstance, fabricType);
+ public CatalogTableDataset(CatalogTable table, String platformInstance, String platform, FabricType fabricType) {
+ this(table.qualifiedName(), platformInstance, platform, fabricType);
}

- public CatalogTableDataset(String dsName, String platformInstance, FabricType fabricType) {
- super("hive", platformInstance, dsName, fabricType);
+ public CatalogTableDataset(String dsName, String platformInstance, String platform, FabricType fabricType) {
+ super(platform, platformInstance, dsName, fabricType);
}

}
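
A hedged usage sketch: the platform, previously hard-coded to "hive", is now injected by the caller ("glue" and "my_instance" below are hypothetical values for illustration):

// Emits the table under a caller-chosen platform instead of always "hive".
SparkDataset ds = new CatalogTableDataset("sparktestdb.foo4", "my_instance", "glue", FabricType.PROD);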
@@ -6,17 +6,38 @@

import lombok.ToString;

+ import java.net.URI;

@ToString
public class HdfsPathDataset extends SparkDataset {

- public HdfsPathDataset(Path path, String platformInstance, FabricType fabricType) {
+
+ private static String getPath(Path path, boolean includeScheme) {
+ URI uri = path.toUri();
+
+ if (includeScheme) {
+ return uri.toString();
+ } else {
+ return uri.getHost() + uri.getPath();
+ }
+ }
+
+ private static String getPlatform(Path path) {
+ String scheme = path.toUri().getScheme();
+ if (scheme.equals("s3a") || scheme.equals("s3n")) {
+ return "s3";
+ } else {
+ return scheme;
+ }
+ }
+
+ public HdfsPathDataset(Path path, String platformInstance, boolean includeScheme, FabricType fabricType) {
// TODO check static partitions?
- this(path.toUri().toString(), platformInstance, fabricType);
+ this(getPath(path, includeScheme), platformInstance, getPlatform(path), fabricType);
}

- public HdfsPathDataset(String pathUri, String platformInstance, FabricType fabricType) {
+ public HdfsPathDataset(String pathUri, String platformInstance, String platform, FabricType fabricType) {
// TODO check static partitions?
- super("hdfs", platformInstance, pathUri, fabricType);
+ super(platform, platformInstance, pathUri, fabricType);
}

}
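
To illustrate the inference above, a small hedged sketch (bucket and instance names are made up):

// s3a/s3n normalize to "s3"; any other scheme becomes the platform as-is
// (note a scheme-less Path would hit a null scheme in getPlatform).
Path p = new Path("s3a://my-bucket/events/2022");
// getPlatform(p)    -> "s3"
// getPath(p, true)  -> "s3a://my-bucket/events/2022"
// getPath(p, false) -> "my-bucket/events/2022"
new HdfsPathDataset(p, "my_instance", false, FabricType.PROD);
// -> dataset "my-bucket/events/2022" on platform "s3"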
@@ -86,6 +86,7 @@ public class TestSparkJobsLineage {
private static final FabricType DATASET_ENV = FabricType.DEV;
private static final String PIPELINE_PLATFORM_INSTANCE = "test_machine";
private static final String DATASET_PLATFORM_INSTANCE = "test_dev_dataset";
+ private static final String TABLE_PLATFORM = "hive";

@ClassRule
public static PostgreSQLContainer<?> db;
@@ -221,15 +222,15 @@ private static DatasetLineage dsl(String callSite, SparkDataset sink, SparkDatas
}

private static HdfsPathDataset hdfsDs(String fileName) {
return new HdfsPathDataset("file:" + abs(DATA_DIR + "/" + fileName), DATASET_PLATFORM_INSTANCE, DATASET_ENV);
return new HdfsPathDataset("file:" + abs(DATA_DIR + "/" + fileName), DATASET_PLATFORM_INSTANCE, "hdfs", DATASET_ENV);
}

private static JdbcDataset pgDs(String tbl) {
return new JdbcDataset(db.getJdbcUrl(), tbl, DATASET_PLATFORM_INSTANCE, DATASET_ENV);
}

private static CatalogTableDataset catTblDs(String tbl) {
- return new CatalogTableDataset(tbl(tbl), DATASET_PLATFORM_INSTANCE, DATASET_ENV);
+ return new CatalogTableDataset(tbl(tbl), DATASET_PLATFORM_INSTANCE, TABLE_PLATFORM, DATASET_ENV);
}

private static String tbl(String tbl) {
@@ -1 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_6)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/out.csv,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_6)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/out.csv,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
@@ -1,3 +1,3 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_9)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo4,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_10)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo4,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_11)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo4,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_9)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo4,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_10)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo4,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_11)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo4,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
@@ -1 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_13)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo2,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo3,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_13)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo2,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo3,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
@@ -1,4 +1,4 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_18)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo5,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_18)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo5,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_19)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo5,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.hivetab,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_20)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo5,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.hivetab,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_21)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.foo5,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hive,test_dev_dataset.sparktestdb.hivetab,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
@@ -1 +1 @@
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_24)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:hdfs,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo1,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
{ "proposal" :{"aspectName":"dataJobInputOutput","entityUrn":"urn:li:dataJob:(urn:li:dataFlow:(spark,test_machine.sparkTestApp,local),QueryExecId_24)","entityType":"dataJob","aspect":{"value":"{\"inputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in1.csv,DEV)\",\"urn:li:dataset:(urn:li:dataPlatform:file,test_dev_dataset.file:/src/test/resources/data/in2.csv,DEV)\"],\"outputDatasets\":[\"urn:li:dataset:(urn:li:dataPlatform:postgres,test_dev_dataset.sparktestdb.foo1,DEV)\"]}","contentType":"application/json"},"changeType":"UPSERT"}}
