[FLINK-27429] Add Vertica JDBC dialect #33

Closed
jredzepovic wants to merge 17 commits

@jredzepovic jredzepovic commented Mar 30, 2023

What is the purpose of the change

Add the implementation of the Vertica dialect

Brief change log

Added:
VerticaDialect
VerticaDialectFactory
VerticaRowConverter

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests for end-to-end deployment
  • Some cases already covered with tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (docs)

boring-cyborg bot commented Mar 30, 2023

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Member

@eskabetxe eskabetxe left a comment

Hi @jredzepovic, thanks for your contribution.

It would be great if you could add a description of what you are adding to the PR description.
Also, you need to rebase the PR, as it has some conflicts with the current master branch.

I left some comments.

<td><code>BIGINT</code></td>
</tr>
<tr>
<td><code>BIGINT UNSIGNED</code></td>
<td></td>
<td></td>
<td></td>
<td><code>DECIMAL(20, 0)</code></td>
Member

Why are we removing this?

Author

I did a little cleanup here; no information was removed (though it looks that way in the change log).
This is what it currently looks like on the master branch for the Chinese version (3 table rows instead of 2):

<tr>
  <td>
    <code>BIGINT</code><br>
    <code>INT UNSIGNED</code></td>
  <td></td>
  <td>
    <code>BIGINT</code><br>
    <code>BIGSERIAL</code></td>
  <td><code>BIGINT</code></td>
  <td><code>BIGINT</code></td>
</tr>
<tr>
  <td><code>BIGINT UNSIGNED</code></td>
  <td></td>
  <td></td>
  <td></td>
  <td><code>DECIMAL(20, 0)</code></td>
</tr>
<tr>
  <td><code>BIGINT</code></td>
  <td></td>
  <td><code>BIGINT</code></td>
  <td><code>BIGINT</code></td>
</tr>

My change merged the two BIGINT rows, so it now looks like this (2 table rows, as in the English version):

<tr>
  <td>
    <code>BIGINT</code><br>
    <code>INT UNSIGNED</code></td>
  <td></td>
  <td>
    <code>BIGINT</code><br>
    <code>BIGSERIAL</code></td>
  <td><code>BIGINT</code></td>
  <td>
    <code>INTEGER</code><br>
    <code>INT</code><br>
    <code>BIGINT</code><br>
    <code>INT8</code><br>
    <code>SMALLINT</code><br>
    <code>TINYINT</code></td>
  <td><code>BIGINT</code></td>
</tr>
<tr>
  <td><code>BIGINT UNSIGNED</code></td>
  <td></td>
  <td></td>
  <td></td>
  <td></td>
  <td><code>DECIMAL(20, 0)</code></td>
</tr>

@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
throw new UnsupportedOperationException(
Member

Author

Even though Vertica does support the MERGE statement, I wasn't able to get it working with Flink. This is the implementation I originally wrote, based on the OracleDialect example:

@Override
public Optional<String> getUpsertStatement(
        String tableName, String[] fieldNames, String[] uniqueKeyFields) {

    String valuesBinding =
            Arrays.stream(fieldNames)
                    .map(f -> ":" + f + " " + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));

    String usingClause = String.format("SELECT %s", valuesBinding);

    String onClause =
            Arrays.stream(uniqueKeyFields)
                    .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
                    .collect(Collectors.joining(" AND "));

    final Set<String> uniqueKeyFieldsSet =
            Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
    String updateClause =
            Arrays.stream(fieldNames)
                    .filter(f -> !uniqueKeyFieldsSet.contains(f))
                    .map(f -> quoteIdentifier(f) + " = s." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));

    String insertFields =
            Arrays.stream(fieldNames)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(", "));

    String valuesClause =
            Arrays.stream(fieldNames)
                    .map(f -> "s." + quoteIdentifier(f))
                    .collect(Collectors.joining(", "));

    String mergeQuery =
            "MERGE INTO %s t USING (%s) s "
                    + "ON %s "
                    + "WHEN MATCHED THEN UPDATE SET %s "
                    + "WHEN NOT MATCHED THEN INSERT (%s) "
                    + "VALUES (%s)";

    return Optional.of(
            String.format(
                    mergeQuery,
                    quoteIdentifier(tableName),
                    usingClause,
                    onClause,
                    updateClause,
                    insertFields,
                    valuesClause));
}
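
To make the shape of the generated statement concrete, here is a standalone sketch that repeats the same string-building steps outside of Flink. The class name `MergeStatementDemo`, the table `users`, its columns, and the double-quote implementation of `quoteIdentifier` are assumptions made for illustration only; the real dialect's quoting may differ.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class MergeStatementDemo {

    // Assumption: identifiers are wrapped in double quotes.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // Same construction steps as the method above, without Optional or Flink types.
    static String buildMerge(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        // Untyped named placeholders aliased to column names: :id "id", :name "name"
        String usingClause =
                Arrays.stream(fieldNames)
                        .map(f -> ":" + f + " " + quoteIdentifier(f))
                        .collect(Collectors.joining(", ", "SELECT ", ""));
        String onClause =
                Arrays.stream(uniqueKeyFields)
                        .map(f -> "t." + quoteIdentifier(f) + "=s." + quoteIdentifier(f))
                        .collect(Collectors.joining(" AND "));
        Set<String> keys = Arrays.stream(uniqueKeyFields).collect(Collectors.toSet());
        // Only non-key columns are updated on a match.
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !keys.contains(f))
                        .map(f -> quoteIdentifier(f) + " = s." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        String insertFields =
                Arrays.stream(fieldNames)
                        .map(MergeStatementDemo::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String valuesClause =
                Arrays.stream(fieldNames)
                        .map(f -> "s." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return String.format(
                "MERGE INTO %s t USING (%s) s ON %s "
                        + "WHEN MATCHED THEN UPDATE SET %s "
                        + "WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)",
                quoteIdentifier(tableName),
                usingClause,
                onClause,
                updateClause,
                insertFields,
                valuesClause);
    }

    public static void main(String[] args) {
        // Hypothetical table with key column "id" and one value column "name".
        System.out.println(
                buildMerge("users", new String[] {"id", "name"}, new String[] {"id"}));
    }
}
```

For this hypothetical input the program prints `MERGE INTO "users" t USING (SELECT :id "id", :name "name") s ON t."id"=s."id" WHEN MATCHED THEN UPDATE SET "name" = s."name" WHEN NOT MATCHED THEN INSERT ("id", "name") VALUES (s."id", s."name")`. Note that the placeholders in the `USING` subquery carry no type information.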

The constructed MERGE query was syntactically correct; I validated this by running it directly on a Vertica database instance (after replacing the :placeholders with actual values).
When the query ran within Flink as the upsert statement, I got this exception, originating from the Vertica JDBC driver:
java.sql.SQLException: [Vertica][VJDBC](3376) ERROR: Failed to find conversion function from unknown to int
While looking for the cause of the exception, I came across this StackOverflow post:
https://stackoverflow.com/questions/18073901/failed-to-find-conversion-function-from-unknown-to-text.

My guess is that the Vertica JDBC driver does not support parameterized MERGE statements. That could also be why the Kafka Connect JDBC connector supports only insert, not upsert, with Vertica.
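
If the "unknown" in the error message refers to the untyped placeholders in the `USING (SELECT ...)` subquery, one direction that might be worth trying is giving each placeholder an explicit type with `CAST`. The sketch below only builds such a clause as a string; it was not run against Vertica in this thread, so whether the driver accepts it inside a parameterized MERGE is unverified. The class name, the `typedUsingClause` helper, the `fieldTypes` map, and the double-quote `quoteIdentifier` are all hypothetical, and the `getUpsertStatement` signature shown earlier receives only field names, so the type names would have to come from somewhere else.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class TypedUsingClauseSketch {

    // Assumption: identifiers are wrapped in double quotes.
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // Hypothetical helper: fieldTypes maps each column name to a SQL type name
    // supplied by the caller. Iteration order of the map defines column order.
    static String typedUsingClause(Map<String, String> fieldTypes) {
        return fieldTypes.entrySet().stream()
                .map(e -> "CAST(:" + e.getKey() + " AS " + e.getValue() + ") "
                        + quoteIdentifier(e.getKey()))
                .collect(Collectors.joining(", ", "SELECT ", ""));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output is deterministic.
        Map<String, String> fieldTypes = new LinkedHashMap<>();
        fieldTypes.put("id", "INT");
        fieldTypes.put("name", "VARCHAR(64)");
        System.out.println(typedUsingClause(fieldTypes));
    }
}
```

With the example map this prints `SELECT CAST(:id AS INT) "id", CAST(:name AS VARCHAR(64)) "name"`, which could replace the untyped `usingClause` in the MERGE template for an experiment.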

return new VerticaMetadata(CONTAINER);
}

static Connection getConnection() throws SQLException, ClassNotFoundException {
Member

DatabaseMetadata already has a getConnection method; is this one really needed?

Author

You are right, looks like I've missed that one. I will remove this method.

@jredzepovic
Author

Hi @eskabetxe, thank you for your review!

I went through the open pull requests to check whether there is a common format for the PR description and ended up copying yours for the Trino dialect. 🙂
In addition to that, I've resolved the conflicts with the master branch.

@MartijnVisser
Contributor

@jredzepovic Could you rebase and take into account the latest changes from @eskabetxe that were introduced via #22?

reswqa and others added 10 commits June 11, 2023 15:46
… for testing. This closes apache#22

* [FLINK-30790] Introduce DatabaseExtension

* [FLINK-30790] Adapt MySql to DatabaseExtension

* [FLINK-30790] Adapt Oracle to DatabaseExtension

* [FLINK-30790] Adapt Postgres to DatabaseExtension

* [FLINK-30790] Adapt SqlServer to DatabaseExtension

* [FLINK-30790] Adapt test databases (Derby and H2) to DatabaseExtension

* [FLINK-15462] Cleaning and fixing JdbcExactlyOnceSinkE2eTest

* [FLINK-15462] Cleaning and fixing JdbcDynamicTableSinkITCase

* [FLINK-15462] Cleaning and fixing JdbcDynamicTableSourceITCase

[FLINK-27429] Refactor tests
… reusing resources. This closes apache#4

* [FLINK-29750][Connector/JDBC] Improve PostgresCatalog#listTables() by reusing resources
@jredzepovic
Author

It doesn't look easy to rebase all those changes - will open a new PR instead 😅
