[FLINK] improve Cassandra lineage metadata
Signed-off-by: Zhenqiu Huang <huangzhenqiu0825@gmail.com>
HuangZhenQiu authored and Peter (ACS) Huang committed Feb 29, 2024
1 parent 906f359 commit cd699b5
Showing 9 changed files with 102 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
# Changelog

## [Unreleased](https://github.com/OpenLineage/OpenLineage/compare/1.9.1...HEAD)
* **Flink: improve Cassandra lineage metadata** (https://github.com/OpenLineage/OpenLineage/pull/2479) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Use Cassandra cluster info as dataset namespace, and combine keyspace with table name as dataset name.*
* **Flink: bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18** (https://github.com/OpenLineage/OpenLineage/pull/2472) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18.*

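For context, here is a minimal sketch of the naming scheme the new changelog entry describes; the contact point, keyspace, and table values are hypothetical, chosen to mirror the test data further down in this commit, and the snippet itself is not part of the change:

```java
public class CassandraDatasetNamingExample {
  public static void main(String[] args) {
    // Hypothetical values mirroring the integration-test data in this commit.
    String contactPoint = "127.0.0.1:9042";            // first cluster contact point (host:port)
    String keyspace = "flink";
    String table = "sink_event";

    // The dataset namespace is now derived from the cluster, the name from keyspace + table.
    String namespace = "cassandra://" + contactPoint;  // -> cassandra://127.0.0.1:9042
    String name = String.join(".", keyspace, table);   // -> flink.sink_event

    System.out.println(namespace + " " + name);
  }
}
```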
@@ -5,11 +5,11 @@
"name": "flink_examples_cassandra"
},
"inputs": [{
"namespace": "flink",
"name": "source_event"
"namespace": "${json-unit.any-string}",
"name": "flink.source_event"
}],
"outputs" : [{
"namespace" : "flink",
"name" : "sink_event"
"namespace" : "${json-unit.any-string}",
"name" : "flink.sink_event"
}]
}
@@ -5,13 +5,18 @@

package io.openlineage.flink.utils;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.mapping.annotations.Table;
import io.openlineage.flink.visitor.wrapper.WrapperUtils;
import java.lang.annotation.Annotation;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

@Slf4j
public class CassandraUtils {
public static final String CASSANDRA_MANAGER_CLASS = "com.datastax.driver.core.Cluster$Manager";

public static boolean hasClasses() {
try {
@@ -37,4 +42,45 @@ public static Optional<Table> extractTableAnnotation(Class pojo) {

return Optional.empty();
}

public static Optional<String> findNamespaceFromBuilder(
Optional<ClusterBuilder> clusterBuilderOpt) {
try {
if (clusterBuilderOpt.isPresent()) {
Cluster cluster = clusterBuilderOpt.get().getCluster();
if (cluster != null) {
Optional<Object> managerOpt =
WrapperUtils.<Object>getFieldValue(Cluster.class, cluster, "manager");
if (managerOpt.isPresent()) {
Class managerClass = Class.forName(CASSANDRA_MANAGER_CLASS);
Optional<List<Object>> endpointsOpt =
WrapperUtils.<List<Object>>getFieldValue(
managerClass, managerOpt.get(), "contactPoints");
return CassandraUtils.convertToNamespace(endpointsOpt);
}
}
}
} catch (ClassNotFoundException e) {
log.error("Failed load class required to infer the Cassandra namespace name", e);
}

return Optional.of("");
}

public static ClusterBuilder createClusterBuilder(String contactPoints) {
return new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint(contactPoints).build();
}
};
}

private static Optional<String> convertToNamespace(Optional<List<Object>> endpointsOpt) {
if (endpointsOpt.isPresent() && !endpointsOpt.isEmpty()) {
return Optional.of("cassandra://" + endpointsOpt.get().get(0).toString().split("/")[1]);
}

return Optional.of("");
}
}
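A rough usage sketch of the two helpers added above, assuming the DataStax driver and the Flink Cassandra connector are on the classpath; this snippet is not part of the commit and is not run against a live cluster:

```java
import io.openlineage.flink.utils.CassandraUtils;
import java.util.Optional;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class CassandraNamespaceSketch {
  public static void main(String[] args) {
    // Build a ClusterBuilder for a single contact point, as the updated tests do.
    ClusterBuilder builder = CassandraUtils.createClusterBuilder("127.0.0.1");

    // findNamespaceFromBuilder reflectively reads the contact points from the driver's
    // Cluster$Manager and turns the first one into a namespace such as
    // "cassandra://127.0.0.1:9042"; it falls back to Optional.of("") on failure.
    Optional<String> namespace = CassandraUtils.findNamespaceFromBuilder(Optional.of(builder));
    System.out.println(namespace.orElse(""));
  }
}
```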
@@ -64,7 +64,8 @@ public List<OpenLineage.OutputDataset> apply(Object object) {
}

return Collections.singletonList(
-        createOutputDataset(context, sinkWrapper.getKeySpace(), sinkWrapper.getTableName()));
+        createOutputDataset(
+            context, (String) sinkWrapper.getNamespace().get(), sinkWrapper.getName()));
}

private CassandraSinkWrapper createWrapperForSink(Object object) {
@@ -42,6 +42,7 @@ public List<OpenLineage.InputDataset> apply(Object object) {
}

return Collections.singletonList(
-        createInputDataset(context, sourceWrapper.getKeyspace(), sourceWrapper.getTableName()));
+        createInputDataset(
+            context, (String) sourceWrapper.getNamespace().get(), sourceWrapper.getName()));
}
}
@@ -12,8 +12,14 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

@Slf4j
public class CassandraSinkWrapper<T> {
public static final String CASSANDRA_OUTPUT_FORMAT_BASE_CLASS =
"org.apache.flink.batch.connectors.cassandra.CassandraOutputFormatBase";
public static final String POJO_OUTPUT_CLASS_FIELD_NAME = "outputClass";
public static final String POJO_CLASS_FIELD_NAME = "clazz";
public static final String INSERT_QUERY_FIELD_NAME = "insertQuery";
@@ -37,23 +43,32 @@ public CassandraSinkWrapper(T sink, Class sinkClass, String fieldName, boolean h
this.fieldName = fieldName;
}

-  public String getKeySpace() {
-    if (hasInsertQuery) {
-      return extractFromQuery(1);
-    } else {
-      Class pojoClass = getField(fieldName);
-      Optional<Table> table = CassandraUtils.extractTableAnnotation(pojoClass);
-      return table.map(t -> t.keyspace()).orElseThrow();
+  public Optional<String> getNamespace() {
+    try {
+      Class outputFormatBase = Class.forName(CASSANDRA_OUTPUT_FORMAT_BASE_CLASS);
+      if (outputFormatBase.isAssignableFrom(sink.getClass())) {
+        Optional<ClusterBuilder> clusterBuilderOpt =
+            WrapperUtils.<ClusterBuilder>getFieldValue(outputFormatBase, sink, "builder");
+        return CassandraUtils.findNamespaceFromBuilder(clusterBuilderOpt);
+      } else if (sink instanceof CassandraSinkBase) {
+        Optional<ClusterBuilder> clusterBuilderOpt =
+            WrapperUtils.<ClusterBuilder>getFieldValue(CassandraSinkBase.class, sink, "builder");
+        return CassandraUtils.findNamespaceFromBuilder(clusterBuilderOpt);
+      }
+    } catch (ClassNotFoundException e) {
+      log.error("Failed load class required to infer the Cassandra namespace name", e);
    }

+    return Optional.of("");
  }

-  public String getTableName() {
+  public String getName() {
    if (hasInsertQuery) {
-      return extractFromQuery(2);
+      return String.join(".", extractFromQuery(1), extractFromQuery(2));
    } else {
      Class pojoClass = getField(fieldName);
      Optional<Table> table = CassandraUtils.extractTableAnnotation(pojoClass);
-      return table.map(t -> t.name()).orElseThrow();
+      return table.map(t -> String.join(".", t.keyspace(), t.name())).orElseThrow();
    }
  }

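For the annotated-POJO path the name is now keyspace-qualified as well; a hedged sketch, assuming the test Event POJO carries a @Table annotation along these lines (the actual annotation values are not shown in this diff):

```java
import com.datastax.driver.mapping.annotations.Table;

// Hypothetical POJO mirroring the test data; the real Event class may differ.
@Table(keyspace = "flink", name = "sink_event")
public class SinkEvent {
  // With this change, CassandraSinkWrapper#getName() derives "flink.sink_event" from the
  // annotation (previously getTableName() returned only "sink_event"), matching the
  // keyspace.table format produced by the insert-query path.
}
```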
@@ -12,6 +12,8 @@
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormatBase;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class CassandraSourceWrapper<T> {
private static final Pattern SELECT_REGEXP =
@@ -33,23 +35,24 @@ public static <T> CassandraSourceWrapper of(T source, Class sourceClass, boolean
return new CassandraSourceWrapper(source, sourceClass, hasQuery);
}

-  public String getKeyspace() {
-    if (hasQuery) {
-      return extractFromQuery(1);
-    } else {
-      Class pojoClass = getField(POJO_CLASS_FIELD_NAME);
-      Optional<Table> table = CassandraUtils.extractTableAnnotation(pojoClass);
-      return table.map(t -> t.keyspace()).orElseThrow();
+  public Optional<String> getNamespace() {
+    if (source instanceof CassandraInputFormatBase) {
+      Optional<ClusterBuilder> clusterBuilderOpt =
+          WrapperUtils.<ClusterBuilder>getFieldValue(
+              CassandraInputFormatBase.class, source, "builder");
+      return CassandraUtils.findNamespaceFromBuilder(clusterBuilderOpt);
    }

+    return Optional.of("");
  }

-  public String getTableName() {
+  public String getName() {
    if (hasQuery) {
-      return extractFromQuery(2);
+      return String.join(".", extractFromQuery(1), extractFromQuery(2));
    } else {
      Class pojoClass = getField(POJO_CLASS_FIELD_NAME);
      Optional<Table> table = CassandraUtils.extractTableAnnotation(pojoClass);
-      return table.map(t -> t.name()).orElseThrow();
+      return table.map(t -> String.join(".", t.keyspace(), t.name())).orElseThrow();
}
}

@@ -15,6 +15,7 @@
import io.openlineage.flink.api.OpenLineageContext;
import io.openlineage.flink.client.EventEmitter;
import io.openlineage.flink.pojo.Event;
import io.openlineage.flink.utils.CassandraUtils;
import java.util.List;
import java.util.stream.Stream;
import lombok.SneakyThrows;
@@ -34,7 +35,6 @@
import org.junit.jupiter.params.provider.MethodSource;

public class CassandraSinkVisitorTest {
-  private static ClusterBuilder clusterBuilder = mock(ClusterBuilder.class);
private static String insertQuery = "INSERT INTO flink.sink_event (id) VALUES (uuid());";
OpenLineageContext context = mock(OpenLineageContext.class);
OpenLineage openLineage = new OpenLineage(EventEmitter.OPEN_LINEAGE_CLIENT_URI);
@@ -69,11 +69,12 @@ public void testApply(Object sink) {
List<OpenLineage.OutputDataset> outputDatasets = cassandraSinkVisitor.apply(sink);

assertEquals(1, outputDatasets.size());
assertEquals("flink", outputDatasets.get(0).getNamespace());
assertEquals("sink_event", outputDatasets.get(0).getName());
assertEquals("cassandra://127.0.0.1:9042", outputDatasets.get(0).getNamespace());
assertEquals("flink.sink_event", outputDatasets.get(0).getName());
}

private static Stream<Arguments> provideArguments() {
+    ClusterBuilder clusterBuilder = CassandraUtils.createClusterBuilder("127.0.0.1");
CassandraPojoOutputFormat pojoOutputFormat =
new CassandraPojoOutputFormat(clusterBuilder, Event.class);
CassandraPojoSink pojoSink = new CassandraPojoSink(Event.class, clusterBuilder);
@@ -15,6 +15,7 @@
import io.openlineage.flink.api.OpenLineageContext;
import io.openlineage.flink.client.EventEmitter;
import io.openlineage.flink.pojo.Event;
import io.openlineage.flink.utils.CassandraUtils;
import java.util.List;
import java.util.stream.Stream;
import lombok.SneakyThrows;
@@ -28,7 +29,6 @@
import org.junit.jupiter.params.provider.MethodSource;

class CassandraSourceVisitorTest {
-  private static ClusterBuilder clusterBuilder = mock(ClusterBuilder.class);
private static String query = "SELECT * FROM flink.source_event;";
OpenLineageContext context = mock(OpenLineageContext.class);
OpenLineage openLineage = new OpenLineage(EventEmitter.OPEN_LINEAGE_CLIENT_URI);
@@ -55,11 +55,12 @@ public void testApply(Object source) {
List<OpenLineage.InputDataset> inputDatasets = cassandraSourceVisitor.apply(source);

assertEquals(1, inputDatasets.size());
assertEquals("flink", inputDatasets.get(0).getNamespace());
assertEquals("source_event", inputDatasets.get(0).getName());
assertEquals("cassandra://127.0.0.1:9042", inputDatasets.get(0).getNamespace());
assertEquals("flink.source_event", inputDatasets.get(0).getName());
}

private static Stream<Arguments> provideArguments() {
+    ClusterBuilder clusterBuilder = CassandraUtils.createClusterBuilder("127.0.0.1");
CassandraPojoInputFormat pojoOutputFormat =
new CassandraPojoInputFormat(query, clusterBuilder, Event.class);
CassandraInputFormat inputFormat = new CassandraInputFormat(query, clusterBuilder);