
Astro SDK + Task Mapping #377

Closed
kaxil opened this issue May 19, 2022 · 9 comments · Fixed by #575
Labels
documentation Improvements or additions to documentation research/question Issues requiring a research or an Open Question

Comments

@kaxil
Collaborator

kaxil commented May 19, 2022

Check the possibility of using task mapping with Astro SDK, something like

@sql
def get_campaigns():
    return """select campaign_id from active_campaigns"""

summarize_campaign.expand(campaign_id=get_campaigns())

Add an example DAG, or a task in an existing example DAG, to show this usage.
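A plain-Python sketch of what the proposed .expand() call would do: run the query once, then create one mapped task instance per returned campaign_id. This is illustrative only (no Airflow or Astro SDK imports); the list literal stands in for the query result, and the function names mirror the snippet above.

```python
def get_campaigns():
    # stands in for "select campaign_id from active_campaigns"
    return [1, 2, 3]

def summarize_campaign(campaign_id):
    # stands in for the downstream task being mapped
    return f"summary for campaign {campaign_id}"

# .expand(campaign_id=get_campaigns()) would behave roughly like:
mapped = [summarize_campaign(campaign_id=c) for c in get_campaigns()]
print(mapped)
```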

@kaxil kaxil added the research/question Issues requiring a research or an Open Question label May 19, 2022
@kaxil kaxil added the documentation Improvements or additions to documentation label Jun 28, 2022
@kaxil kaxil added this to the 1.0.0 milestone Jun 28, 2022
@kaxil
Collaborator Author

kaxil commented Jul 20, 2022

Any progress on this @pankajastro ?

@pankajastro
Contributor

> Any progress on this @pankajastro ?

No @kaxil, I only learned about this last week, but I will wrap it up this week.

@kaxil
Collaborator Author

kaxil commented Jul 23, 2022

Sounds good, let's try to get it done on Monday so we can cut 1.0.0b1 on Mon/Tue

@phanikumv
Collaborator

@pankajastro is facing the issue below while spinning up Airflow in a Docker container; we are debugging it.

ERROR: You need to upgrade the database. Please run airflow db upgrade. Make sure the command is run using Airflow version 2.3.3+astro.1.

@pankajastro
Contributor

pankajastro commented Jul 25, 2022

The @sql decorator does not work directly and throws the error below. I'm importing it with from astro import sql.

Broken DAG: [/usr/local/airflow/dags/task_map_test.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/usr/local/airflow/dags/task_map_test.py", line 31, in <module>
    def get_campaigns(table: Table):
TypeError: 'module' object is not callable  

@pankajastro
Contributor

The potential decorators I can use for this are sql.transform or sql.run_raw_sql.

I get the error below when the GOOGLE_APPLICATION_CREDENTIALS env var is not set and only the Airflow connection is used:

[2022-07-26, 07:52:55 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-07-26, 07:52:55 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-07-26, 07:52:57 UTC] {bigquery.py:1053} INFO - Dataset Resource: Dataset(DatasetReference('astronomer-airflow-providers', 'tmp_astro'))
[2022-07-26, 07:52:57 UTC] {base.py:68} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-07-26, 07:53:01 UTC] {_metadata.py:99} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
[2022-07-26, 07:53:04 UTC] {_metadata.py:99} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
[2022-07-26, 07:53:04 UTC] {_metadata.py:99} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 111] Connection refused
[2022-07-26, 07:53:04 UTC] {_default.py:290} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
[2022-07-26, 07:53:04 UTC] {taskinstance.py:1909} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/astro/sql/operators/transform.py", line 16, in execute
    self.database_impl.drop_table(self.output_table)
  File "/usr/local/lib/python3.9/site-packages/astro/databases/base.py", line 204, in drop_table
    self.run_sql(statement)
  File "/usr/local/lib/python3.9/site-packages/astro/databases/base.py", line 93, in run_sql
    result = self.connection.execute(sqlalchemy.text(sql_statement), parameters)
  File "/usr/local/lib/python3.9/site-packages/astro/databases/base.py", line 68, in connection
    return self.sqlalchemy_engine.connect()
  File "/usr/local/lib/python3.9/site-packages/astro/databases/google/bigquery.py", line 43, in sqlalchemy_engine
    return create_engine(uri)
  File "<string>", line 2, in create_engine
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/deprecations.py", line 309, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/create.py", line 576, in create_engine
    (cargs, cparams) = dialect.create_connect_args(u)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy_bigquery/base.py", line 815, in create_connect_args
    client = _helpers.create_bigquery_client(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy_bigquery/_helpers.py", line 58, in create_bigquery_client
    credentials, default_project = google.auth.default(scopes=SCOPES)
  File "/usr/local/lib/python3.9/site-packages/google/auth/_default.py", line 616, in default
    raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started
[2022-07-26, 07:53:04 UTC] {taskinstance.py:1415} INFO - Marking task as FAILED. dag_id=example_dynamic_map_task, task_id=get_campaigns, execution_date=20220726T075249, start_date=20220726T075253, end_date=20220726T075304
[2022-07-26, 07:53:04 UTC] {manager.py:53} WARNING - Unable to find an extractor. task_type=TransformOperator ***_dag_id=example_dynamic_map_task task_id=get_campaigns ***_run_id=manual__2022-07-26T07:52:49.528508+00:00 
[2022-07-26, 07:53:04 UTC] {console.py:21} INFO - {"eventTime": "2022-07-26T07:53:04.243050Z", "eventType": "FAIL", "inputs": [], "job": {"facets": {}, "name": "example_dynamic_map_task.get_campaigns", "namespace": "default"}, "outputs": [], "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/***", "run": {"facets": {"unknownSourceAttribute": {"_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/***", "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet", "unknownItems": [{"name": "TransformOperator", "properties": {"_BaseOperator__from_mapped": false, "_BaseOperator__init_kwargs": {"conn_id": "", "multiple_outputs": false, "op_args": [], "op_kwargs": {}, "python_callable": "<<non-serializable: function>>", "task_id": "get_campaigns"}, "_BaseOperator__instantiated": true, "_dag": "<<non-serializable: DAG>>", "_inlets": [], "_log": "<<non-serializable: Logger>>", "_outlets": [], "conn_id": "", "depends_on_past": false, "do_xcom_push": true, "downstream_task_ids": [], "email_on_failure": true, "email_on_retry": true, "executor_config": {}, "ignore_first_depends_on_past": true, "inlets": [], "kwargs": {"default_args": {}, "multiple_outputs": false, "op_args": [], "op_kwargs": {}, "python_callable": "<<non-serializable: function>>", "task_id": "get_campaigns"}, "multiple_outputs": false, "op_args": [], "op_kwargs": {}, "outlets": [], "output_table": "<<non-serializable: Table>>", "owner": "***", "parameters": {}, "params": "<<non-serializable: ParamsDict>>", "pool": "default_pool", "pool_slots": 1, "priority_weight": 1, "python_callable": "<<non-serializable: function>>", "queue": "default", "retries": 0, "retry_delay": "<<non-serializable: timedelta>>", "retry_exponential_backoff": false, "sql": "", "start_date": "<<non-serializable: DateTime>>", "task_group": "<<non-serializable: weakproxy>>", "task_id": "get_campaigns", "trigger_rule": 
"all_success", "upstream_task_ids": [], "wait_for_downstream": false, "weight_rule": "downstream"}, "type": "operator"}]}}, "runId": "f2a20953-7b2c-4c8a-9dbd-f28f64e3751f"}}
[2022-07-26, 07:53:04 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 50 for task get_campaigns (Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started; 218)
[2022-07-26, 07:53:04 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-26, 07:53:04 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
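The DefaultCredentialsError comes from google.auth.default() falling back to Application Default Credentials when no key is configured. The workaround used in the next comment is to export the variable inside the containers; a sketch, with an illustrative key path:

```shell
# Point Application Default Credentials at a service-account key file
# inside the container (the path below is an example, not from the thread).
export GOOGLE_APPLICATION_CREDENTIALS=/usr/local/airflow/include/gcp-key.json
```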

@pankajastro
Contributor

pankajastro commented Jul 26, 2022

Using sql.transform and exporting GOOGLE_APPLICATION_CREDENTIALS in the containers, I get the error below. Related docs: https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#what-data-types-can-be-expanded

[2022-07-26, 08:21:20 UTC] {taskinstance.py:1909} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2370, in _record_task_map_for_downstreams
    raise UnmappableXComTypePushed(value)
airflow.exceptions.UnmappableXComTypePushed: unmappable return type 'Table'
[2022-07-26, 08:21:20 UTC] {taskinstance.py:1415} INFO - Marking task as FAILED. dag_id=example_dynamic_map_task, task_id=get_campaigns, execution_date=20220726T082057, start_date=20220726T082100, end_date=20220726T082120
[2022-07-26, 08:21:20 UTC] {manager.py:53} WARNING - Unable to find an extractor. task_type=TransformOperator ***_dag_id=example_dynamic_map_task task_id=get_campaigns ***_run_id=manual__2022-07-26T08:20:57.134603+00:00 
[2022-07-26, 08:21:20 UTC] {console.py:21} INFO - {"eventTime": "2022-07-26T08:21:20.564416Z", "eventType": "FAIL", "inputs": [], "job": {"facets": {}, "name": "example_dynamic_map_task.get_campaigns", "namespace": "default"}, "outputs": [], "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/***", "run": {"facets": {"unknownSourceAttribute": {"_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/***", "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet", "unknownItems": [{"name": "TransformOperator", "properties": {"_BaseOperator__from_mapped": false, "_BaseOperator__init_kwargs": {"conn_id": "", "multiple_outputs": false, "op_args": [], "op_kwargs": {"table": "<<non-serializable: Table>>"}, "python_callable": "<<non-serializable: function>>", "task_id": "get_campaigns"}, "_BaseOperator__instantiated": true, "_dag": "<<non-serializable: DAG>>", "_inlets": [], "_log": "<<non-serializable: Logger>>", "_outlets": [], "conn_id": "", "depends_on_past": false, "do_xcom_push": true, "downstream_task_ids": [], "email_on_failure": true, "email_on_retry": true, "executor_config": {}, "ignore_first_depends_on_past": true, "inlets": [], "kwargs": {"default_args": {}, "multiple_outputs": false, "op_args": [], "op_kwargs": {"table": "<<non-serializable: Table>>"}, "python_callable": "<<non-serializable: function>>", "task_id": "get_campaigns"}, "multiple_outputs": false, "op_args": [], "op_kwargs": {"table": "<<non-serializable: Table>>"}, "outlets": [], "output_table": "<<non-serializable: Table>>", "owner": "***", "parameters": {}, "params": "<<non-serializable: ParamsDict>>", "pool": "default_pool", "pool_slots": 1, "priority_weight": 1, "python_callable": "<<non-serializable: function>>", "queue": "default", "retries": 0, "retry_delay": "<<non-serializable: timedelta>>", "retry_exponential_backoff": false, "sql": "", "start_date": 
"<<non-serializable: DateTime>>", "task_group": "<<non-serializable: weakproxy>>", "task_id": "get_campaigns", "trigger_rule": "all_success", "upstream_task_ids": [], "wait_for_downstream": false, "weight_rule": "downstream"}, "type": "operator"}]}}, "runId": "0d157b9f-764f-4571-865b-976e31edd751"}}
[2022-07-26, 08:21:20 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 54 for task get_campaigns (unmappable return type 'Table'; 494)
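The UnmappableXComTypePushed error reflects the constraint in the linked docs: Airflow can only .expand() over XComs that are lists or dicts, so a custom object like the Astro SDK's Table is rejected. A simplified, runnable illustration of that rule (not Airflow's actual code):

```python
class Table:
    """Stand-in for astro's Table; any non-list/dict XCom hits the same check."""
    pass

def is_mappable(xcom_value):
    # Simplified version of the type check behind UnmappableXComTypePushed
    return isinstance(xcom_value, (list, dict))

print(is_mappable([{"campaign_id": 1}]))  # True  -> can be expanded
print(is_mappable(Table()))               # False -> UnmappableXComTypePushed
```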

@pankajastro
Contributor

With some transformation, I'm able to run https://github.com/astronomer/astro-sdk/tree/example_dynamic_map, but the changes are significant, so I'm not sure whether we should add this example.

@pankajastro
Contributor

It looks better with run_raw_sql: #575. Thank you @tatiana for the help.
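The reason run_raw_sql fits task mapping is that its handler can turn the cursor into a plain list, which is a mappable XCom type, unlike a Table. A hypothetical illustration of that handler pattern using sqlite3; the names mirror the SDK but this is not Astro SDK code:

```python
import sqlite3

def run_raw_sql(sql, handler):
    """Toy stand-in for the SDK decorator: run SQL, pass the cursor to a handler."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE active_campaigns (campaign_id INTEGER)")
    conn.executemany("INSERT INTO active_campaigns VALUES (?)", [(1,), (2,)])
    cursor = conn.execute(sql)
    return handler(cursor)

# The handler converts rows to a list of ids: a mappable XCom, safe to expand over.
campaign_ids = run_raw_sql(
    "SELECT campaign_id FROM active_campaigns ORDER BY campaign_id",
    handler=lambda cur: [row[0] for row in cur.fetchall()],
)
print(campaign_ids)  # [1, 2]
```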
