[FLINK-26595][flink-connector-jdbc][postgreSQL]Improve the PostgresDi… #19053

Closed
wants to merge 3 commits into from

Conversation

hapihu
Contributor

@hapihu hapihu commented Mar 11, 2022

I'm trying to use Flink CDC to synchronize MySQL data to MatrixDB in real time, but I ran into an error.
The error message is as follows:

CIRCULAR REFERENCE:java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO user_1(id, name, address, phone_number, email) VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, address=EXCLUDED.address, phone_number=EXCLUDED.phone_number, email=EXCLUDED.email was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.

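This exception is caused by the getUpsertStatement method of PostgresDialect.
The upsert statement it generates lists every column in the DO UPDATE SET clause, including the unique-key column (id), which MatrixDB rejects here as a modification of a distribution column.
The columns in uniqueKeyFields should be left out of the UPDATE part of the statement.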

I ran the following experiment to test my changes.
I also recompiled and packaged flink-connector-jdbc. With the modified flink-connector-jdbc, my program no longer reports the error.

-- 1. Create a table in MatrixDB.
CREATE TABLE user_1 (
  id int,
  name VARCHAR(255) NOT NULL DEFAULT 'flink',
  address VARCHAR(1024),
  phone_number VARCHAR(512),
  email VARCHAR(255),
  UNIQUE(id)
);


-- 2. Insert a record.
INSERT INTO user_1(id, name, address, phone_number, email) 
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') 
ON CONFLICT (id) 
DO UPDATE SET 
id=EXCLUDED.id, 
name=EXCLUDED.name, 
address=EXCLUDED.address, 
phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email;

-- 3. Executing the INSERT statement above fails with the following error.
ERROR:  modification of distribution columns in OnConflictUpdate is not supported


-- 4. If the statement is changed as follows (id removed from the SET clause), it executes successfully.
INSERT INTO user_1(id, name, address, phone_number, email) 
VALUES ('110'::numeric, 'user_110', 'Shanghai', '123567891234', 'user_110@foo.com') 
ON CONFLICT (id) 
DO UPDATE SET 
name=EXCLUDED.name, 
address=EXCLUDED.address, 
phone_number=EXCLUDED.phone_number, 
email=EXCLUDED.email;

The PostgresDialect class handles upsert statements as follows:

// package org.apache.flink.connector.jdbc.dialect.psql
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + " DO UPDATE SET "
                        + updateClause);
    }
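
To make the problem concrete, here is a minimal standalone sketch that mirrors the logic above and prints the statement it produces for the user_1 table. It is not the connector's code: the class name CurrentUpsertDemo, the pass-through quoteIdentifier (chosen because the logged statement shows unquoted identifiers), and the "?" placeholders in the simplified insert builder are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class CurrentUpsertDemo {
    // Assumption: identifiers are passed through unquoted, as in the logged statement.
    static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Simplified stand-in for getInsertIntoStatement; the "?" placeholders are an assumption.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(CurrentUpsertDemo::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    // Same shape as the method quoted above: every field ends up in the SET clause.
    static String currentUpsert(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(CurrentUpsertDemo::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")"
                + " DO UPDATE SET " + updateClause;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        System.out.println(currentUpsert("user_1", fields, new String[] {"id"}));
    }
}
```

The printed SET clause begins with id=EXCLUDED.id, which is the assignment that the error message complains about.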

To fix this problem, make the following changes to PostgresDialect:

// package org.apache.flink.connector.jdbc.dialect.psql
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(this::quoteIdentifier)
                        .collect(Collectors.joining(", "));
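        // Unique-key columns must not be assigned in the SET clause, so filter them out.
        List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !uniqueKeyList.contains(f))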
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        return Optional.of(
                getInsertIntoStatement(tableName, fieldNames)
                        + " ON CONFLICT ("
                        + uniqueColumns
                        + ")"
                        + " DO UPDATE SET "
                        + updateClause);
    }
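
One edge case the change above does not cover: if every field is part of the unique key, the filtered update clause is empty and the statement would end in a bare "DO UPDATE SET". The sketch below shows one possible way to handle that by falling back to ON CONFLICT ... DO NOTHING, which is valid PostgreSQL syntax; whether MatrixDB accepts it is an assumption that would need checking. The class name FilteredUpsertSketch, the pass-through quoteIdentifier, and the "?" placeholders are hypothetical and are not taken from the connector.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class FilteredUpsertSketch {
    // Assumption: identifiers are passed through unquoted.
    static String quoteIdentifier(String identifier) {
        return identifier;
    }

    // Simplified stand-in for getInsertIntoStatement; the "?" placeholders are an assumption.
    static String insertInto(String tableName, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(FilteredUpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + quoteIdentifier(tableName)
                + "(" + columns + ") VALUES (" + placeholders + ")";
    }

    static String buildUpsert(String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(FilteredUpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        // A set gives constant-time lookups while filtering out the key columns.
        Set<String> uniqueKeys = new HashSet<>(Arrays.asList(uniqueKeyFields));
        String updateClause =
                Arrays.stream(fieldNames)
                        .filter(f -> !uniqueKeys.contains(f))
                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        // Nothing left to update: avoid emitting an empty SET clause.
        String conflictAction =
                updateClause.isEmpty() ? " DO NOTHING" : " DO UPDATE SET " + updateClause;
        return insertInto(tableName, fieldNames)
                + " ON CONFLICT (" + uniqueColumns + ")"
                + conflictAction;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "address", "phone_number", "email"};
        // Key column is filtered out of the SET clause.
        System.out.println(buildUpsert("user_1", fields, new String[] {"id"}));
        // All columns are keys: falls back to DO NOTHING.
        System.out.println(buildUpsert("user_1", new String[] {"id"}, new String[] {"id"}));
    }
}
```

For the user_1 table with id as the only key, the SET clause produced by this sketch contains name, address, phone_number and email but not id, matching step 4 of the experiment above.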

…alect method for getting upsert statements
@flinkbot
Collaborator

flinkbot commented Mar 11, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

hapihu added 2 commits March 15, 2022 11:18
@hailiangyuan

I have the same problem, but the build failed when I tried to recompile flink-connector-jdbc. Where can I get a recompiled flink-connector-jdbc?

@hapihu
Contributor Author

hapihu commented Mar 23, 2022

@hailiangyuan
I compiled from the release-1.13.1 tag,
then developed against the compiled JAR package.

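# fetch the release tag and create a local branch from it
# (the branch name build-1.13.1 is arbitrary; without a start point,
# "git checkout -b" would branch from the current HEAD instead of the tag)
git fetch origin tag release-1.13.1
git checkout -b build-1.13.1 release-1.13.1

# then build, skipping tests
mvn clean install -DskipTests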

@hailiangyuan

Thanks! I'll try it now.

@MartijnVisser
Contributor

We've now moved the code from the JDBC connector to its own dedicated repository; please re-route this PR to https://github.com/apache/flink-connector-jdbc
