Skip to content

Commit

Permalink
Merge pull request #9534: [BEAM-8119] Properly cleanup Pubsub in rele…
Browse files Browse the repository at this point in the history
…ase script
  • Loading branch information
markflyhigh committed Sep 10, 2019
2 parents f1d48fa + 2b3599c commit 361a7b7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
Expand Up @@ -218,9 +218,10 @@ function create_pubsub() {
# None
#######################################
function cleanup_pubsub() {
gcloud pubsub topics delete --project=$PROJECT_ID $PUBSUB_TOPIC1
gcloud pubsub topics delete --project=$PROJECT_ID $PUBSUB_TOPIC2
gcloud pubsub subscriptions delete --project=$PROJECT_ID $PUBSUB_SUBSCRIPTION
# Suppress error since topic/subscription may not exist
gcloud pubsub topics delete --project=$PROJECT_ID $PUBSUB_TOPIC1 2> /dev/null
gcloud pubsub topics delete --project=$PROJECT_ID $PUBSUB_TOPIC2 2> /dev/null
gcloud pubsub subscriptions delete --project=$PROJECT_ID $PUBSUB_SUBSCRIPTION 2> /dev/null
}


Expand Down
Expand Up @@ -37,6 +37,9 @@ BEAM_PYTHON_SDK=$BEAM_PYTHON_SDK_ZIP
ASC_FILE_NAME=$BEAM_PYTHON_SDK_ZIP".asc"
SHA512_FILE_NAME=$BEAM_PYTHON_SDK_ZIP".sha512"

# Cleanup Pubsub once script exits
trap cleanup_pubsub EXIT


#######################################
# Remove temp directory when complete.
Expand Down Expand Up @@ -116,7 +119,7 @@ function verify_wordcount_dataflow() {
--num_workers $NUM_WORKERS \
--sdk_location $BEAM_PYTHON_SDK

# verify results.
# verify results.
wordcount_output_in_gcs="gs://$BUCKET_NAME/$WORDCOUNT_OUTPUT"
gcs_pull_result=$(gsutil ls gs://$BUCKET_NAME)
if [[ $gcs_pull_result != *$wordcount_output_in_gcs* ]]; then
Expand All @@ -139,6 +142,7 @@ function verify_wordcount_dataflow() {
# None
#######################################
function verify_streaming_wordcount_direct() {
cleanup_pubsub
create_pubsub
print_separator "Running Streaming wordcount example with DirectRunner"
python -m apache_beam.examples.streaming_wordcount \
Expand All @@ -148,12 +152,10 @@ function verify_streaming_wordcount_direct() {
pid=$!
sleep 15

# verify result
# verify result
run_pubsub_publish
verify_steaming_result "DirectRunner" $pid

# Delete the pubsub topics and subscription before running the second job. Will recreate them in the second job.
cleanup_pubsub
kill -9 $pid
sleep 10
}
Expand All @@ -168,6 +170,7 @@ function verify_streaming_wordcount_direct() {
# None
#######################################
function verify_streaming_wordcount_dataflow() {
cleanup_pubsub
create_pubsub
print_separator "Running Streaming wordcount example with DataflowRunner "
python -m apache_beam.examples.streaming_wordcount \
Expand All @@ -193,7 +196,6 @@ function verify_streaming_wordcount_dataflow() {

kill -9 $pid
gcloud dataflow jobs cancel $running_job
cleanup_pubsub
}


Expand Down

0 comments on commit 361a7b7

Please sign in to comment.