Apache Airflow version: 1.10.9+composer
Kubernetes version: 1.16.13-gke.401
Environment:
- Cloud provider or hardware configuration: Google Cloud Composer
- OS (e.g. from /etc/os-release):
VERSION="16.04.7 LTS (Xenial Xerus)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 16.04.7 LTS"
VERSION_ID="16.04"
- Kernel:
Linux airflow-worker-7d8654b878-5kxxr 4.19.112+ #1 SMP Fri Sep 4 12:00:04 PDT 2020 x86_64 x86_64 x86_64 GNU/Linux
- Install tools: gcloud
- Others:
What happened:
Use-case: ETL DAG triggered by GCS Object Change notification (GCS -> Pub/Sub -> Cloud Function).
The triggering function reports the following error when multiple DAG runs are triggered within a 1-second interval:
'{"error":"Run id manual__2020-10-09T14:12:13+00:00 already exists for dag id example-generation"}
Full stack trace:
2020-10-09 09:12:13.723 CDT generate_training_examples gdlq2rp9sgmd
Traceback (most recent call last):
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/functions_framework/__init__.py", line 102, in view_func
    function(data, context)
  File "/workspace/main.py", line 49, in generate_training_examples
    make_iap_request(
  File "/workspace/main.py", line 88, in make_iap_request
    raise Exception(
Exception: Bad response from application: 400 / {'Date': 'Fri, 09 Oct 2020 14:12:13 GMT', 'Content-Type': 'application/json', 'Content-Length': '98', 'Server': 'gunicorn/19.10.0', 'Via': '1.1 google', 'Alt-Svc': 'h3-Q050=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-27=":443"; ma=2592000,h3-T051=":443"; ma=2592000,h3-T050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"'} / '{"error":"Run id manual__2020-10-09T14:12:13+00:00 already exists for dag id <redacted>"}\n'
Trigger request code (runs in Cloud Function):
client_id = os.getenv("CLIENT_ID")
# This should be part of your webserver's URL:
# {tenant-project-id}.appspot.com
webserver_id = os.getenv("TENANT_PROJECT")
# The name of the DAG you wish to trigger
dag_name = os.getenv("DAG_NAME")
webserver_url = (
    'https://'
    + webserver_id
    + '.appspot.com/api/experimental/dags/'
    + dag_name
    + '/dag_runs'
)
# Make a POST request to IAP which then Triggers the DAG
data['run_id'] = datetime.utcnow().strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f')
data['replace_microseconds'] = False
conf = {"conf": data}
print(f"JSON body = {conf}")
make_iap_request(
    webserver_url, client_id, method='POST', json={"conf": data})
The underlying cause is that the default run_id is generated with only one-second resolution in its timestamp, e.g. manual__2020-10-09T14:12:13+00:00. If multiple DAG runs are triggered within the same second, they get the same run_id and the request fails as observed.
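The collision can be illustrated without Airflow. The sketch below only mimics the observed run_id format by dropping microseconds; `default_style_run_id` is a hypothetical helper written for this example, not Airflow's actual implementation.

```python
from datetime import datetime, timezone


def default_style_run_id(ts):
    """Mimic the observed default format: 'manual__' + second-resolution timestamp.

    Hypothetical helper for illustration only; not Airflow's own code.
    """
    return "manual__" + ts.replace(microsecond=0).isoformat()


# Two trigger times 800 ms apart, inside the same wall-clock second.
first = datetime(2020, 10, 9, 14, 12, 13, 100000, tzinfo=timezone.utc)
second = datetime(2020, 10, 9, 14, 12, 13, 900000, tzinfo=timezone.utc)

# Both map to the same second-resolution run_id, so the second trigger collides.
print(default_style_run_id(first))
print(default_style_run_id(first) == default_style_run_id(second))

# Keeping microseconds would make the two IDs distinct.
print(first.isoformat() == second.isoformat())
```

Any two triggers that land in the same second produce identical IDs under this scheme, which matches the "already exists" error above.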
What you expected to happen:
I tried two workarounds to fix this issue.
Workaround 1 - pass a custom "run_id" in the trigger DAG call, following the guidance here.
Added data['run_id'] = datetime.utcnow().strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f') to request body.
I confirmed that this setting is being received as part of dag_run["conf"]:
[2020-10-09 15:18:45,862] {logging_mixin.py:112} INFO - Config = {'@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', 'attributes': {'bucketId': '...', 'eventTime': '2020-10-09T14:46:45.145167Z', 'eventType': 'OBJECT_FINALIZE', 'notificationConfig': '...', 'objectGeneration': '...', 'objectId': '...', 'payloadFormat': 'JSON_API_V1'}, 'data': '...', 'run_id': 'alpaca_2020-10-09T14:46:50.490986', 'replace_microseconds': False}
Workaround 2 - pass "replace_microseconds"
Added data['replace_microseconds'] = False to the Python code triggering the DAG run. I also tried data['replace_microseconds'] = "false".
I again confirmed the setting is present in dag_run["conf"].
Neither of these workarounds changed the run_id from its default, so subsequent DAG runs still have the format "manual__2020-10-09T15:28:17+00:00" and the error persists.
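One possible explanation, not confirmed here, is that both settings are sent nested inside "conf" (the request body is {"conf": data}), so they reach the DAG as configuration rather than being read by the trigger endpoint. A minimal sketch of the alternative body shape follows. It assumes the experimental endpoint reads "run_id" and "replace_microseconds" from the top level of the JSON body, which should be checked against the installed Airflow version; `build_trigger_body` is a hypothetical helper.

```python
from datetime import datetime, timezone


def build_trigger_body(event, prefix="alpaca"):
    """Build a JSON body with run_id as a sibling of conf, not inside it.

    Assumption: the trigger endpoint reads 'run_id' and 'replace_microseconds'
    from the top level of the body; verify for your Airflow version.
    """
    now = datetime.now(timezone.utc)
    return {
        # Microsecond-resolution ID, same format as in the workaround above.
        "run_id": now.strftime(prefix + "_%Y-%m-%dT%H:%M:%S.%f"),
        # Assumed to be honored only at the top level, if supported at all.
        "replace_microseconds": False,
        # The Pub/Sub event is passed through untouched as DAG configuration.
        "conf": event,
    }


body = build_trigger_body({"eventType": "OBJECT_FINALIZE"})
print(sorted(body))
```

The resulting dict would be passed as json=body to make_iap_request in place of json={"conf": data}.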
How to reproduce it:
Attempt to trigger multiple DAG runs within a 1-second interval.
Anything else we need to know: