You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The "download_result_" tasks are failing when a DAG run is created based on the DAG defined in https://github.com/GoogleCloudPlatform/ci-cd-for-data-processing-workflow/blob/master/source-code/workflow-dag/data-pipeline-test.py. The Dataflow pipeline runs successfully, but the sharding behavior has changed since this DAG was first written. All of the output is written to a single shard instead of multiple shards and the download_result_ tasks fail with a "File not found error". The logs from the error message for the download_result_1 task are included below.
*** Reading remote log from gs://us-central1-data-pipeline-c-ae7165ed-bucket/logs/test_word_count/download_result_1/2021-10-18T21:16:11+00:00/1.log.
[2021-10-18 21:22:08,103] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: test_word_count.download_result_1 2021-10-18T21:16:11+00:00 [queued]>
[2021-10-18 21:22:08,176] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: test_word_count.download_result_1 2021-10-18T21:16:11+00:00 [queued]>
[2021-10-18 21:22:08,177] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2021-10-18 21:22:08,178] {taskinstance.py:882} INFO - Starting attempt 1 of 1
[2021-10-18 21:22:08,178] {taskinstance.py:883} INFO -
--------------------------------------------------------------------------------
[2021-10-18 21:22:08,226] {taskinstance.py:902} INFO - Executing <Task(GoogleCloudStorageDownloadOperator): download_result_1> on 2021-10-18T21:16:11+00:00
[2021-10-18 21:22:08,230] {standard_task_runner.py:54} INFO - Started process 817 to run task
[2021-10-18 21:22:08,317] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'test_word_count', 'download_result_1', '2021-10-18T21:16:11+00:00', '--job_id', '10', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/data-pipeline-test.py', '--cfg_path', '/tmp/tmpoA3Yca']
[2021-10-18 21:22:08,318] {standard_task_runner.py:78} INFO - Job 10: Subtask download_result_1
[2021-10-18 21:22:09,010] {logging_mixin.py:120} INFO - Running <TaskInstance: test_word_count.download_result_1 2021-10-18T21:16:11+00:00 [running]> on host airflow-worker-86677b8bb6-dnz5q
[2021-10-18 21:22:09,351] {gcs_download_operator.py:86} INFO - Executing download: qwiklabs-gcp-03-cd2d00dd104f-composer-result-test, output-00000-of-00003, None
[2021-10-18 21:22:09,401] {gcp_api_base_hook.py:145} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2021-10-18 21:22:09,617] {taskinstance.py:1152} ERROR - 404 GET https://storage.googleapis.com/download/storage/v1/b/qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/o/output-00000-of-00003?alt=media: No such object: qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/output-00000-of-00003: (u'Request failed with status code', 404, u'Expected one of', 200, 206)
Traceback (most recent call last):
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 985, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/airflow/airflow/contrib/operators/gcs_download_operator.py", line 94, in execute
object=self.object)
File "/usr/local/lib/airflow/airflow/contrib/hooks/gcs_hook.py", line 179, in download
return blob.download_as_string()
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 1391, in download_as_string
timeout=timeout,
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 1302, in download_as_bytes
checksum=checksum,
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/client.py", line 731, in download_blob_to_file
_raise_from_invalid_response(exc)
File "/usr/local/lib/python2.7/dist-packages/google/cloud/storage/blob.py", line 3936, in _raise_from_invalid_response
raise exceptions.from_http_status(response.status_code, message, response=response)
NotFound: 404 GET https://storage.googleapis.com/download/storage/v1/b/qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/o/output-00000-of-00003?alt=media: No such object: qwiklabs-gcp-03-cd2d00dd104f-composer-result-test/output-00000-of-00003: (u'Request failed with status code', 404, u'Expected one of', 200, 206)
[2021-10-18 21:22:09,668] {taskinstance.py:1196} INFO - Marking task as FAILED. dag_id=test_word_count, task_id=download_result_1, execution_date=20211018T211611, start_date=20211018T212208, end_date=20211018T212209
[2021-10-18 21:22:13,158] {local_task_job.py:102} INFO - Task exited with return code 1
The text was updated successfully, but these errors were encountered:
The "download_result_" tasks are failing when a DAG run is created based on the DAG defined in https://github.com/GoogleCloudPlatform/ci-cd-for-data-processing-workflow/blob/master/source-code/workflow-dag/data-pipeline-test.py. The Dataflow pipeline runs successfully, but the sharding behavior has changed since this DAG was first written. All of the output is written to a single shard instead of multiple shards and the download_result_ tasks fail with a "File not found error". The logs from the error message for the download_result_1 task are included below.
Suggested fix: If we want to keep the parallel tasks for the sake of demonstration, we could change line 185 in https://github.com/GoogleCloudPlatform/ci-cd-for-data-processing-workflow/blob/master/source-code/data-processing-code/src/main/java/org/apache/beam/examples/WordCount.java to
.apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(3));
Error log from download_result_1 task:
The text was updated successfully, but these errors were encountered: