Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-14843][e2e] Refactor bucketing sink test to make it more stable and comprehensible #10685

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -74,8 +74,8 @@ public static void main(String[] args) throws Exception {
.setBucketer(new KeyBucketer())
.setBatchSize(Long.MAX_VALUE)
.setBatchRolloverInterval(Long.MAX_VALUE)
.setInactiveBucketCheckInterval(Long.MAX_VALUE)
.setInactiveBucketThreshold(Long.MAX_VALUE);
.setInactiveBucketCheckInterval(50)
.setInactiveBucketThreshold(1000);

// generate data, shuffle, perform stateful operation, sink
sEnv.addSource(new Generator(10, idlenessMs, 60))
Expand Down
11 changes: 3 additions & 8 deletions flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
Expand Up @@ -24,10 +24,7 @@ JOB_OUTPUT_DIR=${TEST_DATA_DIR}/out/result
LOG_DIR=${FLINK_DIR}/log

function get_total_number_of_valid_lines {
# this method assumes that pending files contain valid data.
# That is because close() cannot move files to FINAL state but moves them to PENDING.
# Given this, the job of the test has bucket size = Long.MAX
find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "*.in-progress" -or -iname "part-*" \) -exec cat {} + | sort -g | wc -l
find ${TEST_DATA_DIR}/out -type f \( -iname "part-*" \) -exec cat {} + | sort -g | wc -l
}

function wait_for_complete_result {
Expand Down Expand Up @@ -124,8 +121,6 @@ echo "Restarting 1 TM"
$FLINK_DIR/bin/taskmanager.sh start
wait_for_number_of_running_tms 4

sleep 10

echo "Killing 2 TMs"
kill_random_taskmanager
kill_random_taskmanager
Expand All @@ -149,8 +144,8 @@ wait_job_terminal_state ${JOB_ID} "CANCELED"

echo "Job $JOB_ID was cancelled, time to verify"

# get all lines in pending or part files
find ${TEST_DATA_DIR}/out -type f \( -iname "*.pending" -or -iname "part-*" \) -exec cat {} + > ${TEST_DATA_DIR}/complete_result
# get all lines in part files
find ${TEST_DATA_DIR}/out -type f \( -iname "part-*" \) -exec cat {} + > ${TEST_DATA_DIR}/complete_result

# for debugging purposes
#echo "Checking proper result..."
Expand Down