[FLINK-14843][e2e] Refactor bucketing sink test to make it more stable and comprehensible #10685

Closed
wants to merge 2 commits into from
@@ -74,8 +74,8 @@ public static void main(String[] args) throws Exception {
 .setBucketer(new KeyBucketer())
 .setBatchSize(Long.MAX_VALUE)
 .setBatchRolloverInterval(Long.MAX_VALUE)
-.setInactiveBucketCheckInterval(Long.MAX_VALUE)
-.setInactiveBucketThreshold(Long.MAX_VALUE);
+.setInactiveBucketCheckInterval(2000)

Contributor comment: Why not set these values to something smaller so that the test runs faster?

+.setInactiveBucketThreshold(4000);

 // generate data, shuffle, perform stateful operation, sink
 sEnv.addSource(new Generator(10, idlenessMs, 60))
@@ -27,7 +27,7 @@ function get_total_number_of_valid_lines {
 # this method assumes that pending files contain valid data.

Contributor comment: This comment is now invalid, as we no longer assume that pending files contain valid data.

 # That is because close() cannot move files to FINAL state but moves them to PENDING.
 # Given this, the job of the test has bucket size = Long.MAX
-find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "*.in-progress" -or -iname "part-*" \) -exec cat {} + | sort -g | wc -l
+find ${TEST_DATA_DIR}/out -type f \( -iname "part-*" \) -exec cat {} + | sort -g | wc -l
 }

 function wait_for_complete_result {
@@ -124,8 +124,6 @@ echo "Restarting 1 TM"
 $FLINK_DIR/bin/taskmanager.sh start
 wait_for_number_of_running_tms 4

-sleep 10
-
 echo "Killing 2 TMs"
 kill_random_taskmanager
 kill_random_taskmanager
@@ -150,7 +148,7 @@ wait_job_terminal_state ${JOB_ID} "CANCELED"
 echo "Job $JOB_ID was cancelled, time to verify"

 # get all lines in pending or part files

Contributor comment: This comment is now invalid, as we no longer read the lines in the pending files.

-find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "part-*" \) -exec cat {} + > ${TEST_DATA_DIR}/complete_result
+find ${TEST_DATA_DIR}/out -type f \( -iname "part-*" \) -exec cat {} + > ${TEST_DATA_DIR}/complete_result

 # for debugging purposes
 #echo "Checking proper result..."
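
For readers following the shell changes, here is a minimal sketch of what restricting the `find` pattern to `part-*` does to the line count. It is not part of the PR. The temporary directory, the `bucket-0` layout, the file names (including the underscore-prefixed pending and in-progress names) and the helper `count_finalized_lines` are all assumptions made for illustration.

```shell
#!/usr/bin/env bash
# Illustrative sketch only: layout and file names below are assumptions,
# not taken from the test script in this PR.
set -euo pipefail

TEST_DATA_DIR="$(mktemp -d)"
mkdir -p "${TEST_DATA_DIR}/out/bucket-0"

# A finalized part file (assumed name): its 3 lines should be counted.
printf '1\n2\n3\n' > "${TEST_DATA_DIR}/out/bucket-0/part-0-0"

# Pending and in-progress files (assumed names): these should be ignored.
printf '4\n5\n' > "${TEST_DATA_DIR}/out/bucket-0/_part-0-1.pending"
printf '6\n' > "${TEST_DATA_DIR}/out/bucket-0/_part-0-2.in-progress"

# Hypothetical helper mirroring the narrowed pattern: only files whose
# name starts with "part-" are concatenated and counted.
function count_finalized_lines {
  find "${TEST_DATA_DIR}/out" -type f -iname "part-*" -exec cat {} + | wc -l
}

count_finalized_lines
```

Under these assumed names, only the finalized file matches, so the pending and in-progress lines do not contribute to the count. If pending files were instead named with a `part-` prefix, the pattern would match them too, so the naming is worth checking against the sink configuration.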