Skip to content

Conversation

@sundargates
Copy link
Contributor

@sundargates sundargates commented Dec 2, 2021

Context

We have observed that the IcebergWriterStage can produce corrupted data files when the committer worker changes. This is because changes to the committer worker result in new subscriptions to the data file stream (Observable<DataFIle>) produced by the writer stage. The new subscriptions result in the input hot stream being consumed from a different thread than before, and since the data structures that parquet manages are not thread-safe, this leads to data file corruption.

As part of this change, we ensure that all the writes to the ParquetFileWriter are made from the same thread.

Checklist

  • ./gradlew build compiles code correctly
  • Added new tests where applicable
  • ./gradlew test passes all tests
  • Extended README or added javadocs where applicable
  • Added copyright headers for new files from CONTRIBUTING.md

@jeffchao
Copy link
Contributor

jeffchao commented Dec 2, 2021

@sundargates looks good so far. I'll take a closer look tomorrow. Question: Did reproducing it via resubmitting/killing the Committer stage work? I'm wondering if my hunch that we discussed offline helped.

Copy link
Contributor

@liuml07 liuml07 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me; I'm still learning the root cause and how this is being tested.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, why do we need to force the Parquet version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added reasoning for why we need to depend on this parquet version

Comment on lines +155 to +159
Copy link
Contributor

@liuml07 liuml07 Dec 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if this can be in a loop so the test can fail fast.

for (int i = 0; i < 2 * size / 100; i++) {
  Thread.sleep(100);
  if (failure.get() != null) {
    LOG.error(failure.get());
    fail(....);
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants