Document for append operator (#619)
What is the current behavior?

In the past, we had a single tutorial that illustrated how to use each of our operators/decorators:
https://github.com/astronomer/astro-sdk/blob/be6280df00ccff0d7a1c0dfb099b2065303dbe88/REFERENCE.md

closes: #590
What is the new behavior?

Add a reference page per operator/decorator, similar to
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/ecs.html#howto-operator-ecsoperator

Each page references parts of an (automatically tested) example DAG that illustrates the usage of that operator/decorator.

Many of these use cases already exist in our example DAGs; we should reference them.
Does this introduce a breaking change?

Nope
utkarsharma2 authored Aug 9, 2022
1 parent 6d68dd6 commit 367f94b
Showing 4 changed files with 96 additions and 0 deletions.
43 changes: 43 additions & 0 deletions docs/astro/sql/operators/append.rst
@@ -0,0 +1,43 @@
======================================
append operator
======================================

.. _append_operator:

When to use the ``append`` operator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use the ``append`` operator when you want to append the rows of a source table to a target table.

.. literalinclude:: ../../../../example_dags/example_append.py
   :language: python
   :start-after: [START append_example]
   :end-before: [END append_example]

When used without the ``columns`` parameter, the Astro SDK assumes that both tables have the same schema.
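
For reference, the ``append_example`` region included above (added to ``example_dags/example_append.py`` further down in this diff) boils down to a single call:

.. code-block:: python

    # Append all rows of `load_append` to `load_main`; both tables share a schema.
    aql.append(
        target_table=load_main,
        source_table=load_append,
    )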

When tables have the same schema
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#. **Case 1:** When the complete table needs to be appended, you can skip the ``columns`` parameter.

   .. literalinclude:: ../../../../example_dags/example_append.py
      :language: python
      :start-after: [START append_example]
      :end-before: [END append_example]

#. **Case 2:** When a subset of columns needs to be appended to the target table, pass a ``list`` of column names in the ``columns`` parameter.

   .. literalinclude:: ../../../../example_dags/example_snowflake_partial_table_with_append.py
      :language: python
      :start-after: [START append_example_with_columns_list]
      :end-before: [END append_example_with_columns_list]

When tables have different schemas
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When tables have different schemas, you can map column names between them by passing a ``dict`` of *source columns to target columns*.

.. literalinclude:: ../../../../example_dags/example_append.py
   :language: python
   :start-after: [START append_example_col_dict]
   :end-before: [END append_example_col_dict]
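
Concretely, the ``append_example_col_dict`` region above maps the source column ``beds`` onto the target column ``baths``:

.. code-block:: python

    # Map the source column `beds` onto the target column `baths` while appending.
    aql.append(
        target_table=load_main,
        source_table=load_append,
        columns={"beds": "baths"},
    )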

Conflicts
~~~~~~~~~
The ``append`` operator doesn't handle conflicts that may arise while appending data. If you need to handle those scenarios, use the ``merge`` operator.
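
For comparison, a minimal ``merge`` sketch is shown below. It assumes the operator accepts ``target_conflict_columns`` and ``if_conflicts`` parameters; see the ``merge`` reference page for the authoritative signature.

.. code-block:: python

    # Hypothetical sketch: upsert rows instead of blindly appending them.
    # `target_conflict_columns` and `if_conflicts` are assumed parameter names;
    # consult the merge operator documentation for the exact API.
    aql.merge(
        target_table=load_main,
        source_table=load_append,
        columns=["sell", "list"],
        target_conflict_columns=["sell"],
        if_conflicts="update",
    )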
50 changes: 50 additions & 0 deletions example_dags/example_append.py
@@ -0,0 +1,50 @@
import pathlib
from datetime import datetime, timedelta

from airflow.models import DAG

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table

CWD = pathlib.Path(__file__).parent

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": 0,
}

dag = DAG(
    dag_id="example_append",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30),
    default_args=default_args,
)

DATA_DIR = str(CWD) + "/data/"

with dag:
    load_main = aql.load_file(
        input_file=File(path=DATA_DIR + "homes.csv"),
        output_table=Table(conn_id="postgres_conn"),
    )
    load_append = aql.load_file(
        input_file=File(path=DATA_DIR + "homes2.csv"),
        output_table=Table(conn_id="postgres_conn"),
    )
    # [START append_example]
    aql.append(
        target_table=load_main,
        source_table=load_append,
    )
    # [END append_example]

    # [START append_example_col_dict]
    aql.append(
        target_table=load_main, source_table=load_append, columns={"beds": "baths"}
    )
    # [END append_example_col_dict]

    aql.cleanup()
2 changes: 2 additions & 0 deletions example_dags/example_snowflake_partial_table_with_append.py
@@ -126,11 +126,13 @@ def example_snowflake_partial_table_with_append():

    # Append transformed & filtered data to the reporting table.
    # The dependency is inferred by passing the previous `filtered_data` task to the `source_table` param.
    # [START append_example_with_columns_list]
    record_results = append(
        source_table=filtered_data,
        target_table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID),
        columns=["sell", "list", "variable", "value"],
    )
    # [END append_example_with_columns_list]
    record_results.set_upstream(create_results_table)

    # We truncate this table only to avoid wasting Snowflake resources
1 change: 1 addition & 0 deletions tests/test_example_dags.py
@@ -32,6 +32,7 @@ def session():
"example_snowflake_partial_table_with_append",
"example_sqlite_load_transform",
"example_dynamic_map_task",
"example_append",
"example_load_file",
],
)
