Skip to content

Conversation

@liangyu-1
Copy link
Contributor

What is the purpose of the change

This PR is to find out what makes the hadoop-fs UT unstable.

Brief change log

  • add some log in file AbstractRecoverableWriterTest.java

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@liangyu-1
Copy link
Contributor Author

@lsyldliu is this pr OK?

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 5, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@lsyldliu lsyldliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liangyu-1 Thanks for your contribution, I left some comments.

recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
} catch (IOException e) {
System.err.println("Unable to open file for writing " + path.toString());
throw e;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apr 28 12:19:16 java.io.IOException: All datanodes [DatanodeInfoWithStorage[127.0.0.1:46278,DS-26d47d25-42de-4eef-a409-8a700a8bc82a,DISK]] are bad. Aborting...
Apr 28 12:19:16 	at org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1537)
Apr 28 12:19:16 	at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1472)
Apr 28 12:19:16 	at org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1244)
Apr 28 12:19:16 	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:663)

Based on the original error message, we need to locate why the HDFS DataNode node is broken, can throwing an exception on the client side find the root cause? I don't know much about HDFS, so I'm not sure. Do we need to turn on the logging on the Server side when we pull up the HDFS cluster and observe the behavior on the Server side?

try {
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
System.err.println("Initial write failed: " + e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use LOG.info to print the message?

recoverables.put(INIT_EMPTY_PERSIST, stream.persist());

stream.write(testData1.getBytes(StandardCharsets.UTF_8));
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should simplify the print log logic, only print it the catch block as following:

        // This is just for locate  the root cause:
        // https://issues.apache.org/jira/browse/FLINK-37703
        // After the fix, this logic should be reverted.
        int branch = 0;
        try {
            branch++;
            stream = initWriter.open(path);
            branch++;
            recoverables.put(INIT_EMPTY_PERSIST, stream.persist());

            branch++;
            stream.write(testData1.getBytes(StandardCharsets.UTF_8));

            branch++;
            recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
            branch++;
            recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());

            // and write some more data
            branch++;
            stream.write(testData2.getBytes(StandardCharsets.UTF_8));

            branch++;
            recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
        } catch (IOException e) {
            LOG.info(
                    "The exception branch was: {}, detail exception msg: {}",
                    branch,
                    e.getMessage());
            throw e;
        } finally {


recoveredStream.write(testData3.getBytes(StandardCharsets.UTF_8));
recoveredStream.closeForCommit().commit();
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor

@lsyldliu lsyldliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
I'm just curious that why the history commits exists here? we don't need it.

You should use the git rebase to rebase the master branch.

*/
public abstract class AbstractRecoverableWriterTest {

private static final Logger Log = LoggerFactory.getLogger(AbstractRecoverableWriterTest.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static final Logger Log = LoggerFactory.getLogger(AbstractRecoverableWriterTest.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractRecoverableWriterTest.class);

@liangyu-1 liangyu-1 closed this Jun 6, 2025
@liangyu-1 liangyu-1 deleted the FLINK-37703 branch June 6, 2025 07:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants