Skip to content

Commit

Permalink
Fix parsing JDBCOptions(table=...) containing subquery
Browse files Browse the repository at this point in the history
Signed-off-by: Martynov Maxim <martinov_m_s_@mail.ru>
  • Loading branch information
dolfinus committed Mar 29, 2024
1 parent 1c60584 commit 5682e76
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ public static Optional<SqlMeta> extractQueryFromSpark(JDBCRelation relation) {
Optional<String> table =
ScalaConversionUtils.asJavaOptional(
relation.jdbcOptions().parameters().get(JDBCOptions$.MODULE$.JDBC_TABLE_NAME()));
if (table.isPresent()) {
// in some cases table value can be "(SELECT col1, col2 FROM table_name WHERE some='filter')
// ALIAS"
if (table.isPresent() && !table.get().startsWith("(")) {
DbTableMeta origin = new DbTableMeta(null, null, table.get());
return Optional.of(
new SqlMeta(
Expand All @@ -97,28 +99,31 @@ public static Optional<SqlMeta> extractQueryFromSpark(JDBCRelation relation) {
Collections.singletonList(new ColumnMeta(origin, field.name()))))
.collect(Collectors.toList()),
Collections.emptyList()));
} else {
String tableOrQuery = relation.jdbcOptions().tableOrQuery();
String query =
tableOrQuery.substring(0, tableOrQuery.lastIndexOf(")")).replaceFirst("\\(", "");

String dialect = extractDialectFromJdbcUrl(relation.jdbcOptions().url());
Optional<SqlMeta> sqlMeta = OpenLineageSql.parse(Collections.singletonList(query), dialect);

if (!sqlMeta.get().errors().isEmpty()) { // error return nothing
log.error(
String.format(
"error while parsing query: %s",
sqlMeta.get().errors().stream()
.map(ExtractionError::toString)
.collect(Collectors.joining(","))));
return Optional.empty();
} else if (sqlMeta.get().inTables().isEmpty()) {
log.error("no tables defined in query, this should not happen");
return Optional.empty();
}
return Optional.of(sqlMeta.get());
}

String tableOrQuery = relation.jdbcOptions().tableOrQuery();
String query = tableOrQuery.substring(0, tableOrQuery.lastIndexOf(")")).replaceFirst("\\(", "");

String dialect = extractDialectFromJdbcUrl(relation.jdbcOptions().url());
Optional<SqlMeta> sqlMeta = OpenLineageSql.parse(Collections.singletonList(query), dialect);

if (!sqlMeta.isPresent()) { // missing JNI library
return sqlMeta;
}
if (!sqlMeta.get().errors().isEmpty()) { // error return nothing
log.error(
String.format(
"error while parsing query: %s",
sqlMeta.get().errors().stream()
.map(ExtractionError::toString)
.collect(Collectors.joining(","))));
return Optional.empty();
}
if (sqlMeta.get().inTables().isEmpty()) {
log.error("no tables defined in query, this should not happen");
return Optional.empty();
}
return sqlMeta;
}

private static String extractDialectFromJdbcUrl(String jdbcUrl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class JdbcRelationHandlerTest {
String invalidJdbc = "(test) SPARK_GEN_SUBQ_0";
String url = "postgresql://localhost:5432/test";
String mysqlUrl = "mysql://localhost:3306/test";
String unknownUrl = "unknown://localhost:1234/test";
StructType schema =
new StructType().add("k", DataTypes.IntegerType).add("j", DataTypes.StringType);

Expand Down Expand Up @@ -91,6 +92,12 @@ void testHandlingJdbcTable() {

@Test
void testHandlingJdbcDbTableAsSubQuery() {
CaseInsensitiveMap params =
CaseInsensitiveMap$.MODULE$.apply(
ScalaConversionUtils.fromJavaMap(
Collections.singletonMap(
JDBCOptions$.MODULE$.JDBC_TABLE_NAME(), jdbcDbTableAsSubQuery)));
when(jdbcOptions.parameters()).thenReturn(params);
when(jdbcOptions.tableOrQuery()).thenReturn(jdbcDbTableAsSubQuery);
StructType schema1 =
new StructType().add("k", DataTypes.IntegerType).add("j1", DataTypes.StringType);
Expand Down Expand Up @@ -120,6 +127,22 @@ void testMysqlDialect() {
.getDataset("test.jdbc_source2", "mysql://localhost:3306", schema2);
}

@Test
void testUnknownDialect() {
when(jdbcOptions.tableOrQuery()).thenReturn(jdbcQuery);
when(jdbcOptions.url()).thenReturn("jdbc:" + unknownUrl);
StructType schema1 =
new StructType().add("k", DataTypes.IntegerType).add("j1", DataTypes.StringType);
StructType schema2 = new StructType().add("j2", DataTypes.StringType);

jdbcRelationHandler.getDatasets(relation, unknownUrl);

verify(datasetFactory, times(1))
.getDataset("test.jdbc_source1", "unknown://localhost:1234", schema1);
verify(datasetFactory, times(1))
.getDataset("test.jdbc_source2", "unknown://localhost:1234", schema2);
}

@Test
void testInvalidJdbcString() {
when(jdbcOptions.tableOrQuery()).thenReturn(invalidJdbc);
Expand Down

0 comments on commit 5682e76

Please sign in to comment.