Flink | fix JDBC dataset naming #2508

Merged · 1 commit · Mar 13, 2024
Changes from all commits
CHANGELOG.md: 2 additions & 0 deletions
@@ -8,6 +8,8 @@
   *Use Cassandra cluster info as dataset namespace, and combine keyspace with table name as dataset name.*
 * **Flink: bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18** (https://github.com/OpenLineage/OpenLineage/pull/2472) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
   *Bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18.*
+* **Flink: Fix dataset naming for JDBC datasets** (https://github.com/OpenLineage/OpenLineage/pull/2508) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
+  *Use common code for naming JDBC datasets in both the Flink and Spark integrations, and adjust the names to the dataset naming convention.*
 
 ### Fixed
 * **Flink: fix 1.9.1 version.** [`#2507`](https://github.com/OpenLineage/OpenLineage/pull/2507) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
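
For a concrete picture of the new convention, the expected-event JSON later in this diff switches from the raw JDBC URL to a scheme://authority namespace and qualifies the table name with its database:

    before: namespace "jdbc:postgresql://postgres:5432/postgres", name "source_event"
    after:  namespace "postgres://postgres:5432",                 name "postgres.source_event"
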
New file: JdbcUtils.java (io.openlineage.client.utils), 90 additions
@@ -0,0 +1,90 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.utils;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JdbcUtils {
  public static final String SLASH_DELIMITER_USER_PASSWORD_REGEX =
      "[A-Za-z0-9_%]+//?[A-Za-z0-9_%]*@";
  public static final String COLON_DELIMITER_USER_PASSWORD_REGEX =
      "([/|,])[A-Za-z0-9_%]+:?[A-Za-z0-9_%]*@";
  public static final String ALPHANUMERIC = "[A-Za-z0-9]+";

  /**
   * A JDBC URL can contain a username and password. This method removes credentials from the
   * JDBC URL and strips the "jdbc:" prefix.
   *
   * @param jdbcUrl the JDBC URL to sanitize
   * @return the sanitized URL
   */
  public static String sanitizeJdbcUrl(String jdbcUrl) {
    // Strip the "jdbc:" prefix, normalize "postgresql" to "postgres", remove
    // "user[:/]password@" credentials, remove credential query parameters, and
    // finally drop whatever query string remains.
    return jdbcUrl
        .replaceFirst("^jdbc:", "")
        .replaceFirst("^postgresql:", "postgres:")
        .replaceAll(SLASH_DELIMITER_USER_PASSWORD_REGEX, "@")
        .replaceAll(COLON_DELIMITER_USER_PASSWORD_REGEX, "$1")
        .replaceAll("(?<=[?,;&:)=])\\(?(?i)(?:user|username|password)=[^;&,)]+(?:[;&;)]|$)", "")
        .replaceAll("\\?.*$", "");
  }
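
  // Illustration on a hypothetical URL: the chain above maps
  //   "jdbc:postgresql://user:secret@localhost:5432/db?ssl=true"
  // to
  //   "postgres://localhost:5432/db"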

  public static DatasetIdentifier getDatasetIdentifierFromJdbcUrl(String jdbcUrl, String name) {
    List<String> parts = Arrays.stream(name.split("\\.")).collect(Collectors.toList());
    return getDatasetIdentifierFromJdbcUrl(jdbcUrl, parts);
  }

  /**
   * The algorithm of this method is as follows. First, we parse the URI and check whether it
   * includes a path. If it does, we check whether the path contains a database name - the first
   * segment after the slash, the "db" in something like postgres://host:5432/db. If a database
   * is present in the URL and the provided parts list has at most three elements, we prepend it
   * to the dataset name - this indicates that the database is the default one in this context.
   * Otherwise, the database is taken from the parts list alone.
   *
   * @param jdbcUrl String URI we want to take the dataset identifier from
   * @param parts delimited parts of the table's qualified name; can include the database name.
   *     Must be mutable, because the database parsed from the URL may be prepended to it.
   * @return DatasetIdentifier
   */
  public static DatasetIdentifier getDatasetIdentifierFromJdbcUrl(
      String jdbcUrl, List<String> parts) {
    jdbcUrl = sanitizeJdbcUrl(jdbcUrl);
    String namespace = jdbcUrl;
    String urlDatabase = null;

    try {
      URI uri = new URI(jdbcUrl);
      String path = uri.getPath();
      if (path != null) {
        namespace = String.format("%s://%s", uri.getScheme(), uri.getAuthority());

        if (path.startsWith("/")) {
          path = path.substring(1);
        }

        // a single alphanumeric path segment is treated as the database name
        if (path.length() > 1 && path.matches(ALPHANUMERIC)) {
          urlDatabase = path;
        }
      }
    } catch (URISyntaxException ignored) {
      // If URI parsing fails, we can't do anything smart - let's return the provided URI
      // as the dataset namespace
    }

    // prepend the database from the URL unless the provided name is already fully qualified
    if (urlDatabase != null && parts.size() <= 3) {
      parts.add(0, urlDatabase);
    }

    String name = String.join(".", parts);

    return new DatasetIdentifier(name, namespace);
  }
}
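
A minimal usage sketch of the new helper (the URL and table name here are hypothetical; the Flink visitors further down show the real call sites):

    import io.openlineage.client.utils.DatasetIdentifier;
    import io.openlineage.client.utils.JdbcUtils;

    class JdbcNamingExample {
      public static void main(String[] args) {
        DatasetIdentifier di =
            JdbcUtils.getDatasetIdentifierFromJdbcUrl(
                "jdbc:postgresql://user:secret@localhost:5432/mydb", "events");
        System.out.println(di.getNamespace()); // postgres://localhost:5432
        System.out.println(di.getName()); // mydb.events
      }
    }
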
New file: JdbcUtilsTest.java (io.openlineage.client.utils), 46 additions
@@ -0,0 +1,46 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.utils;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

public class JdbcUtilsTest {

  @Test
  void testSanitizeJdbc() {
    assertThat(JdbcUtils.sanitizeJdbcUrl("postgres://localhost:5432"))
        .isEqualTo("postgres://localhost:5432");

    assertThat(JdbcUtils.sanitizeJdbcUrl("jdbc:postgresql://localhost:5432"))
        .isEqualTo("postgres://localhost:5432");

    assertThat(
            JdbcUtils.sanitizeJdbcUrl(
                "jdbc:postgresql://localhost:5432?user=postgres&password=postgres"))
        .isEqualTo("postgres://localhost:5432");

    assertThat(
            JdbcUtils.sanitizeJdbcUrl(
                "jdbc:postgresql://localhost:5432?username=postgres&password=postgres"))
        .isEqualTo("postgres://localhost:5432");
  }

  @Test
  void testGetDatasetIdentifierFromJdbcUrl() {
    assertThat(
            JdbcUtils.getDatasetIdentifierFromJdbcUrl("jdbc:postgresql://localhost:5432/", "table"))
        .hasFieldOrPropertyWithValue("namespace", "postgres://localhost:5432")
        .hasFieldOrPropertyWithValue("name", "table");

    assertThat(
            JdbcUtils.getDatasetIdentifierFromJdbcUrl(
                "jdbc:postgresql://localhost:5432/db", "table"))
        .hasFieldOrPropertyWithValue("namespace", "postgres://localhost:5432")
        .hasFieldOrPropertyWithValue("name", "db.table");
  }
}
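
A possible further assertion, sketched here rather than taken from the PR, checking that credentials embedded in a hypothetical URL are stripped before the namespace is built:

    assertThat(
            JdbcUtils.getDatasetIdentifierFromJdbcUrl(
                "jdbc:postgresql://user:secret@localhost:5432/db", "table"))
        .hasFieldOrPropertyWithValue("namespace", "postgres://localhost:5432")
        .hasFieldOrPropertyWithValue("name", "db.table");
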
Flink JDBC example expected-event JSON
@@ -5,11 +5,11 @@
   "name": "flink_examples_jdbc"
 },
 "inputs": [{
-  "namespace": "jdbc:postgresql://postgres:5432/postgres",
-  "name": "source_event"
+  "namespace": "postgres://postgres:5432",
+  "name": "postgres.source_event"
 }],
 "outputs" : [{
-  "namespace" : "jdbc:postgresql://postgres:5432/postgres",
-  "name" : "sink_event"
+  "namespace" : "postgres://postgres:5432",
+  "name" : "postgres.sink_event"
 }]
 }
Flink JDBC sink visitor test (first of three in this diff)
@@ -58,8 +58,8 @@ public void testApply(Object source) {
     List<OpenLineage.OutputDataset> outputDatasets = jdbcSinkVisitor.apply(source);

     assertEquals(1, outputDatasets.size());
-    assertEquals("jdbc:postgresql://host:port/database", outputDatasets.get(0).getNamespace());
-    assertEquals("jdbc_table", outputDatasets.get(0).getName());
+    assertEquals("postgres://host:port", outputDatasets.get(0).getNamespace());
+    assertEquals("database.jdbc_table", outputDatasets.get(0).getName());
   }

   private static Stream<Arguments> provideArguments() throws Exception {
Flink JDBC source visitor test (first of three in this diff)
@@ -62,8 +62,8 @@ public void testApply(Object source) {
     List<OpenLineage.InputDataset> inputDatasets = jdbcSourceVisitor.apply(source);

     assertEquals(1, inputDatasets.size());
-    assertEquals("jdbc:postgresql://host:port/database", inputDatasets.get(0).getNamespace());
-    assertEquals("jdbc_table", inputDatasets.get(0).getName());
+    assertEquals("postgres://host:port", inputDatasets.get(0).getNamespace());
+    assertEquals("database.jdbc_table", inputDatasets.get(0).getName());
   }

   private static Stream<Arguments> provideArguments() {
Flink JDBC sink visitor test (second)
@@ -58,8 +58,8 @@ public void testApply(Object source) {
     List<OpenLineage.OutputDataset> outputDatasets = jdbcSinkVisitor.apply(source);

     assertEquals(1, outputDatasets.size());
-    assertEquals("jdbc:postgresql://host:port/database", outputDatasets.get(0).getNamespace());
-    assertEquals("jdbc_table", outputDatasets.get(0).getName());
+    assertEquals("postgres://host:port", outputDatasets.get(0).getNamespace());
+    assertEquals("database.jdbc_table", outputDatasets.get(0).getName());
   }

   private static Stream<Arguments> provideArguments() throws Exception {
Flink JDBC source visitor test (second)
@@ -61,8 +61,8 @@ public void testApply(Object source) {
     List<OpenLineage.InputDataset> inputDatasets = jdbcSourceVisitor.apply(source);

     assertEquals(1, inputDatasets.size());
-    assertEquals("jdbc:postgresql://host:port/database", inputDatasets.get(0).getNamespace());
-    assertEquals("jdbc_table", inputDatasets.get(0).getName());
+    assertEquals("postgres://host:port", inputDatasets.get(0).getNamespace());
+    assertEquals("database.jdbc_table", inputDatasets.get(0).getName());
   }

   private static Stream<Arguments> provideArguments() {
Flink JDBC sink visitor test (third)
@@ -58,8 +58,8 @@ public void testApply(Object source) {
     List<OpenLineage.OutputDataset> outputDatasets = jdbcSinkVisitor.apply(source);

     assertEquals(1, outputDatasets.size());
-    assertEquals("jdbc:postgresql://host:port/database", outputDatasets.get(0).getNamespace());
-    assertEquals("jdbc_table", outputDatasets.get(0).getName());
+    assertEquals("postgres://host:port", outputDatasets.get(0).getNamespace());
+    assertEquals("database.jdbc_table", outputDatasets.get(0).getName());
   }

   private static Stream<Arguments> provideArguments() throws Exception {
Flink JDBC source visitor test (third)
@@ -62,8 +62,8 @@ public void testApply(Object source) {
     List<OpenLineage.InputDataset> inputDatasets = jdbcSourceVisitor.apply(source);

     assertEquals(1, inputDatasets.size());
-    assertEquals("jdbc:postgresql://host:port/database", inputDatasets.get(0).getNamespace());
-    assertEquals("jdbc_table", inputDatasets.get(0).getName());
+    assertEquals("postgres://host:port", inputDatasets.get(0).getNamespace());
+    assertEquals("database.jdbc_table", inputDatasets.get(0).getName());
   }

   private static Stream<Arguments> provideArguments() {
Flink JDBC sink visitor (io.openlineage.flink.visitor)
@@ -6,6 +6,8 @@
 package io.openlineage.flink.visitor;

 import io.openlineage.client.OpenLineage;
+import io.openlineage.client.utils.DatasetIdentifier;
+import io.openlineage.client.utils.JdbcUtils;
 import io.openlineage.flink.api.OpenLineageContext;
 import io.openlineage.flink.visitor.wrapper.JdbcSinkWrapper;
 import java.util.Collections;
@@ -53,8 +55,9 @@ public List<OpenLineage.OutputDataset> apply(Object object) {
           String.format("Unsupported JDBC sink type %s", object.getClass().getCanonicalName()));
     }

-    return Collections.singletonList(
-        createOutputDataset(
-            context, sinkWrapper.getConnectionUrl(), sinkWrapper.getTableName().get()));
+    DatasetIdentifier di =
+        JdbcUtils.getDatasetIdentifierFromJdbcUrl(
+            sinkWrapper.getConnectionUrl(), sinkWrapper.getTableName().get());
+    return Collections.singletonList(createOutputDataset(context, di.getNamespace(), di.getName()));
   }
 }
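
Tracing the new sink code path with hypothetical wrapper values (chosen to match the visitor test expectations above):

    // sinkWrapper.getConnectionUrl()   -> "jdbc:postgresql://host:5432/database"
    // sinkWrapper.getTableName().get() -> "jdbc_table"
    DatasetIdentifier di =
        JdbcUtils.getDatasetIdentifierFromJdbcUrl(
            "jdbc:postgresql://host:5432/database", "jdbc_table");
    // di.getNamespace() -> "postgres://host:5432"
    // di.getName()      -> "database.jdbc_table"
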
Flink JDBC source visitor (io.openlineage.flink.visitor)
@@ -6,6 +6,8 @@
 package io.openlineage.flink.visitor;

 import io.openlineage.client.OpenLineage;
+import io.openlineage.client.utils.DatasetIdentifier;
+import io.openlineage.client.utils.JdbcUtils;
 import io.openlineage.flink.api.OpenLineageContext;
 import io.openlineage.flink.visitor.wrapper.JdbcSourceWrapper;
 import java.util.Collections;
@@ -48,8 +50,9 @@ public List<OpenLineage.InputDataset> apply(Object object) {
           String.format("Unsupported JDBC Source type %s", object.getClass().getCanonicalName()));
     }

-    return Collections.singletonList(
-        createInputDataset(
-            context, sourceWrapper.getConnectionUrl(), sourceWrapper.getTableName().get()));
+    DatasetIdentifier di =
+        JdbcUtils.getDatasetIdentifierFromJdbcUrl(
+            sourceWrapper.getConnectionUrl(), sourceWrapper.getTableName().get());
+    return Collections.singletonList(createInputDataset(context, di.getNamespace(), di.getName()));
   }
 }
Spark JDBC relation handler
@@ -7,7 +7,8 @@

 import io.openlineage.client.OpenLineage;
 import io.openlineage.client.utils.DatasetIdentifier;
-import io.openlineage.spark.agent.util.JdbcUtils;
+import io.openlineage.client.utils.JdbcUtils;
+import io.openlineage.spark.agent.util.JdbcSparkUtils;
 import io.openlineage.spark.api.DatasetFactory;
 import io.openlineage.sql.ColumnMeta;
 import io.openlineage.sql.DbTableMeta;
@@ -45,7 +46,7 @@ public List<D> handleRelation(LogicalRelation x) {
   }

   public List<D> getDatasets(JDBCRelation relation, String url) {
-    Optional<SqlMeta> sqlMeta = JdbcUtils.extractQueryFromSpark(relation);
+    Optional<SqlMeta> sqlMeta = JdbcSparkUtils.extractQueryFromSpark(relation);
     if (!sqlMeta.isPresent()) {
       return Collections.emptyList();
     }
JdbcUtils.java renamed to JdbcSparkUtils.java (io.openlineage.spark.agent.util)
@@ -5,6 +5,8 @@

 package io.openlineage.spark.agent.util;

+import static io.openlineage.client.utils.JdbcUtils.sanitizeJdbcUrl;
+
 import com.google.common.base.CharMatcher;
 import io.openlineage.client.utils.DatasetIdentifier;
 import io.openlineage.sql.ColumnLineage;
@@ -27,25 +29,7 @@
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation;

 @Slf4j
-public class JdbcUtils {
-  /**
-   * JdbcUrl can contain username and password this method clean-up credentials from jdbcUrl and
-   * strip the jdbc prefix from the url
-   */
-  public static String sanitizeJdbcUrl(String jdbcUrl) {
-    return jdbcUrl
-        .replaceFirst("^jdbc:", "")
-        .replaceFirst("^postgresql:", "postgres:")
-        .replaceAll(PlanUtils.SLASH_DELIMITER_USER_PASSWORD_REGEX, "@")
-        .replaceAll(PlanUtils.COLON_DELIMITER_USER_PASSWORD_REGEX, "$1")
-        .replaceAll("(?<=[?,;&:)=])\\(?(?i)(?:user|username|password)=[^;&,)]+(?:[;&;)]|$)", "")
-        .replaceAll("\\?.+$", "");
-  }
-
-  public static DatasetIdentifier getDatasetIdentifierFromJdbcUrl(String jdbcUrl, String name) {
-    List<String> parts = Arrays.stream(name.split("\\.")).collect(Collectors.toList());
-    return getDatasetIdentifierFromJdbcUrl(jdbcUrl, parts);
-  }
+public class JdbcSparkUtils {

   /**
    * The algorithm for this method is as follows. First we parse URI and check if it includes path
PlanUtils.java (io.openlineage.spark.agent.util)
@@ -37,12 +37,6 @@
  */
 @Slf4j
 public class PlanUtils {
-
-  public static final String SLASH_DELIMITER_USER_PASSWORD_REGEX =
-      "[A-Za-z0-9_%]+//?[A-Za-z0-9_%]*@";
-  public static final String COLON_DELIMITER_USER_PASSWORD_REGEX =
-      "([/|,])[A-Za-z0-9_%]+:?[A-Za-z0-9_%]*@";
-
   /**
    * Merge a list of {@link PartialFunction}s and return the first value where the function is
    * defined or empty list if no function matches the input.
Spark JDBC catalog handler (io.openlineage.spark3.agent.lifecycle.plan.catalog)
@@ -6,7 +6,7 @@
 package io.openlineage.spark3.agent.lifecycle.plan.catalog;

 import io.openlineage.client.utils.DatasetIdentifier;
-import io.openlineage.spark.agent.util.JdbcUtils;
+import io.openlineage.spark.agent.util.JdbcSparkUtils;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +45,7 @@ public DatasetIdentifier getDatasetIdentifier(
         Stream.concat(Arrays.stream(identifier.namespace()), Stream.of(identifier.name()))
             .collect(Collectors.toList());

-    return JdbcUtils.getDatasetIdentifierFromJdbcUrl(options.url(), parts);
+    return JdbcSparkUtils.getDatasetIdentifierFromJdbcUrl(options.url(), parts);
   }

   @Override
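
For reference, a sketch of the equivalent direct call through the shared client-side helper (values are hypothetical). Note the parts list must be mutable, because the database parsed out of the URL may be prepended to it:

    List<String> parts = new ArrayList<>(Arrays.asList("myschema", "mytable"));
    DatasetIdentifier di =
        JdbcUtils.getDatasetIdentifierFromJdbcUrl("jdbc:postgresql://host:5432/db", parts);
    // di.getNamespace() -> "postgres://host:5432"
    // di.getName()      -> "db.myschema.mytable"
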
Spark column-level lineage collector
@@ -7,10 +7,11 @@

 import com.google.cloud.spark.bigquery.BigQueryRelation;
 import io.openlineage.client.utils.DatasetIdentifier;
+import io.openlineage.client.utils.JdbcUtils;
 import io.openlineage.spark.agent.lifecycle.Rdds;
 import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageBuilder;
 import io.openlineage.spark.agent.util.BigQueryUtils;
-import io.openlineage.spark.agent.util.JdbcUtils;
+import io.openlineage.spark.agent.util.JdbcSparkUtils;
 import io.openlineage.spark.agent.util.PathUtils;
 import io.openlineage.spark.agent.util.PlanUtils;
 import io.openlineage.spark.agent.util.ScalaConversionUtils;
@@ -135,7 +136,7 @@ private static List<DatasetIdentifier> extractDatasetIdentifier(
   }

   private static List<DatasetIdentifier> extractDatasetIdentifier(JDBCRelation relation) {
-    Optional<SqlMeta> sqlMeta = JdbcUtils.extractQueryFromSpark(relation);
+    Optional<SqlMeta> sqlMeta = JdbcSparkUtils.extractQueryFromSpark(relation);
     return sqlMeta
         .map(
             meta ->