Skip to content

Commit

Permalink
Flink refine jdbc table name
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenqiu Huang <huangzhenqiu0825@gmail.com>
  • Loading branch information
HuangZhenQiu authored and Peter (ACS) Huang committed Mar 13, 2024
1 parent 4ba9374 commit f0e1c99
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public String getConnectionUrl() {

public Optional<String> getTableName() {
Optional<String> queryOpt = Optional.empty();
Optional<JdbcConnectionOptions> connectionOptionsOpt = getConnectionOptions();
if (source instanceof JdbcRowDataLookupFunction) {
Optional<JdbcConnectionOptions> connectionOptionsOpt = getConnectionOptions();
return connectionOptionsOpt
.map(
connectionOptions ->
Expand All @@ -56,7 +56,9 @@ public Optional<String> getTableName() {

return queryOpt
.flatMap(query -> OpenLineageSql.parse(List.of(query)))
.map(sqlMeta -> sqlMeta.inTables().isEmpty() ? "" : sqlMeta.inTables().get(0).name());
.map(
sqlMeta ->
sqlMeta.inTables().isEmpty() ? "" : sqlMeta.inTables().get(0).qualifiedName());
}

private Optional<JdbcConnectionOptions> getConnectionOptions() {
Expand Down

0 comments on commit f0e1c99

Please sign in to comment.