
[FLINK-28513] Fix Flink Table API CSV streaming sink throws SerializedThrowable exception #21458

Merged
merged 1 commit into from
Sep 4, 2023

Conversation

Samrat002
Contributor

@Samrat002 Samrat002 commented Dec 6, 2022

What is the purpose of the change

CsvBulkWriter calls the sync() function at close time. sync() works for file systems that are syncable in nature, such as HDFS. S3 currently does not support any sync() function.

Brief change log

This change modifies the sync() method to flush all buffered data, close the file, and commit the write.
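The flush-then-commit behavior described above can be sketched as follows. This is an illustrative sketch only: the class and method names are hypothetical stand-ins, not the actual Flink CsvBulkWriter or S3 writer internals.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not the real Flink classes): a bulk writer whose
 * close path flushes all buffered data and commits it, rather than relying
 * on a sync() call that the underlying file system may not support.
 */
class SketchCsvBulkWriter {
    private final StringBuilder buffer = new StringBuilder();
    private final List<String> committedParts = new ArrayList<>(); // stands in for S3 objects

    void addRow(String... fields) {
        buffer.append(String.join(",", fields)).append('\n');
    }

    /** Flush buffered data and commit the write, instead of calling sync(). */
    void close() {
        if (buffer.length() > 0) {
            committedParts.add(buffer.toString()); // "upload" the remaining bytes
            buffer.setLength(0);                   // buffer is now empty
        }
    }

    String committedData() {
        return String.join("", committedParts);
    }
}
```

Note that close() here is idempotent: a second call finds an empty buffer and commits nothing further, which matters when a close path can be reached more than once.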

Verifying this change

  1. Added a unit test for sync().
  2. Verified on a sample EMR cluster.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no) no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no) no
  • The serializers: (yes / no / don't know) no
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
  • The S3 file system connector: (yes / no / don't know) no

Documentation

  • Does this pull request introduce a new feature? (yes / no) no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) no

@Samrat002 Samrat002 marked this pull request as draft December 6, 2022 07:24
@flinkbot
Collaborator

flinkbot commented Dec 6, 2022

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure — re-run the last Azure build

@Samrat002 Samrat002 changed the title [FLINK-28513][hotfix] stream.sync is not supported for all fileformats [FLINK-28513][hotfix] stream.sync is not supported for s3 fileformat Dec 7, 2022
@Samrat002 Samrat002 marked this pull request as ready for review December 8, 2022 07:22
Comment on lines 129 to 139

    fileStream.sync();
    // for s3 there is no sync supported.
    // instead calling persist() to put data into s3.
    persist();
    }
Contributor
It does not look like these are equivalent. It seems that .sync() is blocking while persist() is async. Is there a way to wait for persist() to complete so that the semantics are retained here?

Also, no tests failed or were added for this change. Can we add a test, please?
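If persist() were indeed asynchronous, one way to retain the blocking semantics of fileStream.sync() would be to block on the upload's completion before returning. The sketch below is hypothetical: the names and the future-returning persist signature are illustrative assumptions, not the actual Flink S3 writer API.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Hypothetical sketch: sync() waits for an asynchronous persist to finish,
 * so callers still observe blocking behavior. Names are illustrative only.
 */
class BlockingPersistSketch {
    final StringBuilder store = new StringBuilder(); // stands in for S3

    CompletableFuture<Void> persistAsync(String data) {
        // simulated asynchronous upload to the object store
        return CompletableFuture.runAsync(() -> store.append(data));
    }

    void sync(String buffered) throws IOException {
        try {
            persistAsync(buffered).get(); // block until the upload completes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while persisting", e);
        } catch (ExecutionException e) {
            throw new IOException("persist failed", e.getCause());
        }
    }
}
```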

Contributor Author

Added a test and validated manually on an EMR cluster by writing CSV data (10 GB and 70 GB) to S3.

@Samrat002 Samrat002 changed the title [FLINK-28513][hotfix] stream.sync is not supported for s3 fileformat [FLINK-28513] Fix Flink Table API CSV streaming sink throws SerializedThrowable exception May 1, 2023
@Samrat002
Contributor Author

Samrat002 commented May 1, 2023

Made changes and added a test.
@dannycranmer please review when you have time.

@Samrat002 Samrat002 requested a review from hlteoh37 May 4, 2023 04:18
Contributor

@hlteoh37 hlteoh37 left a comment


LGTM!

@Samrat002
Contributor Author

@dannycranmer please review when you have time.

@@ -126,7 +126,16 @@ public long getPos() throws IOException {

    @Override
    public void sync() throws IOException {
        fileStream.sync();
Contributor

@Samrat002 this is concerning me: "The S3 file system connector: (yes / no / don't know) maybe". When is this method called? Is it possible that we violate the semantics of the 2-phase-commit File Sink here?

Contributor Author

The sync() method is called along the following path:

  1. S3RecoverableWriter
  2. FlinkS3FileSystem creates a new instance of S3RecoverableWriter when its createRecoverableWriter() method is called.
  3. CsvBulkWriter uses FlinkS3FileSystem and calls the recoverable writer.
  4. BulkWriter

This change will not alter any processing guarantees.

In the current change, the sync() method first takes the lock, then calls the filesystem flush and commits the remaining blocks (writes them to S3). This flow preserves exactly-once semantics. The same code flow is implemented in AzureBlobFsRecoverableDataOutputStream.

From the class BlockBlobAppendStream:

    public void hsync() throws IOException {
        if (this.compactionEnabled) {
            this.flush();
        }

    }
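The lock-then-flush-then-commit flow described in the comment above can be sketched like this. The names are hypothetical stand-ins for illustration, not the actual Flink S3 recoverable stream implementation.

```java
/**
 * Illustrative sketch of the described flow: sync() takes the lock first,
 * then flushes the buffer and commits the remaining blocks to the store.
 * Names are hypothetical, not the real Flink S3 stream classes.
 */
class LockedSyncSketch {
    private final Object lock = new Object();
    private final StringBuilder buffer = new StringBuilder();
    private final StringBuilder committedBlocks = new StringBuilder(); // stands in for S3

    void write(String data) {
        synchronized (lock) {
            buffer.append(data);
        }
    }

    void sync() {
        synchronized (lock) {               // take the lock first
            committedBlocks.append(buffer); // flush and commit remaining blocks
            buffer.setLength(0);
        }
    }

    String committed() {
        return committedBlocks.toString();
    }
}
```

Because both write() and sync() serialize on the same lock, no record can slip in between the flush and the commit, which is what keeps the flow consistent with exactly-once delivery at a checkpoint boundary.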

@Samrat002 Samrat002 force-pushed the FLINK-28513 branch 2 times, most recently from 3ecd545 to dd9b2db Compare August 4, 2023 15:22
@Samrat002
Contributor Author

@dannycranmer please review when you have time.

@Samrat002
Contributor Author

I have taken an example where a datagen table is created with two fields, fname and lname. I also created another table of type filesystem that points to a specific S3 path and uses the csv format.

 -- create a generator table
CREATE TABLE generator (
    fname STRING,
    lname STRING
) WITH (
  'connector' = 'datagen'
  
);

-- create a sample dynamic table with connector filesystem. It supports csv as format. 
CREATE TABLE `name_table` (
  `fname` STRING,
  `lname` STRING
) with (
'connector'='filesystem',
'format' = 'csv',
'path' = 's3://dbsamrat-flink-dev/data/default/name_table'
);

-- run a job to insert data in table (s3)
insert into name_table select * from generator;

Below is the flink-conf file used for the cluster (these configs are also picked up by the job).

Attaching the jobmanager log for the insertion of data into the CSV-formatted S3 path, which uses CsvBulkWriter and maintains the 2-phase commit.
jobmanager.log

It can be noted that the 2-phase commit happens at the checkpoint trigger.

An additional job was executed separately to read data from name_table.
count_jobmanager.log

@dannycranmer @hlteoh37 please review whether this satisfies the exactly-once guarantee.

@hlteoh37
Contributor

hlteoh37 commented Sep 4, 2023

Ok this looks good to me. Thanks for fixing and testing @Samrat002

Contributor

@dannycranmer dannycranmer left a comment


Thanks for the deep dive @Samrat002

@Samrat002 Samrat002 deleted the FLINK-28513 branch September 4, 2023 16:10
@MartijnVisser
Contributor

In hindsight I'm quite concerned that we merged this without any change to the tests. We run nightly tests for the FileSink and StreamingFileSink against S3. Why did those not fail? Why didn't we improve them before merging this?

@hlteoh37
Contributor

hlteoh37 commented Jan 2, 2024

Thanks for flagging, @MartijnVisser. I agree that it would be good to update the tests to reflect this discovered bug in the S3 filesystem integration. I had forgotten that we have a test suite for the S3 filesystem integration!

I see it has already been flagged in the newer PR #23725 (review). Let's use that JIRA + PR to track the test suite updates.
