Added 'extend_existing' to Sqla Table object (#626)
What is the current behavior?

Quite often, this test group fails in CI:

nox -s "test-3.8(airflow='2.3')" -- --splits 12 --group 1 --cov=src --cov-report=xml --cov-branch

With:

=========================== short test summary info ============================
FAILED tests/test_example_dags.py::test_example_dag[example_snowflake_partial_table_with_append]
==== 1 failed, 29 passed, 329 deselected, 22 warnings in 294.40s (0:04:54) =====

This happens even when the pull request contains no changes that affect this test or our code base, for instance:
#480
https://github.com/astronomer/astro-sdk/runs/7231300739

closes: #516
What is the new behavior?

The issue was with SQLAlchemy's reflection cache, which keeps track of the tables created and maintains them in a map, schema_columns[self.normalize_name(table_name)]. When the test is run individually on a local machine there is no issue, because the cache is initialized just for that test run; in CI, where the cache is shared across tests, this surfaced as an intermittent failure. The fix introduces the extend_existing parameter, which ensures the stale cached definition is not reused.
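For illustration, a minimal, self-contained sketch of the SQLAlchemy behavior in question (this is not astro-sdk code; the in-memory SQLite database and the table/column names are placeholders):

from sqlalchemy import Column, Integer, MetaData, Table, create_engine

engine = create_engine("sqlite://")  # throwaway in-memory database
metadata = MetaData()

# Create a table in the database so there is something to reflect.
Table("homes_reporting", MetaData(), Column("sell", Integer)).create(engine)

# The first reflection populates the MetaData cache (metadata.tables).
first = Table("homes_reporting", metadata, autoload_with=engine)

# Reflecting the same name against the same MetaData a second time raises
# InvalidRequestError unless extend_existing=True is passed, which instead
# refreshes the cached Table object in place.
second = Table("homes_reporting", metadata, autoload_with=engine, extend_existing=True)
assert first is second  # the cached object was redefined, not duplicated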

Ran the test multiple times to ensure the fix works:
https://github.com/astronomer/astro-sdk/actions/runs/2832783413
Does this introduce a breaking change?

Nope
utkarsharma2 committed Aug 12, 2022
1 parent bdc66e8 commit 6b3a053
Showing 4 changed files with 28 additions and 31 deletions.
docs/astro/sql/operators/drop.rst (1 addition, 1 deletion)
@@ -8,7 +8,7 @@ When to Use drop operator
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The drop operator is used to delete tables from the database if they exist. It can be used on both Temporary as well as Persistent Tables.
 
-.. literalinclude:: ../../../../example_dags/example_snowflake_partial_table_with_append.py
+.. literalinclude:: ../../../../example_dags/example_sqlite_load_transform.py
    :language: python
    :start-after: [START drop_table_example]
    :end-before: [END drop_table_example]
example_dags/example_snowflake_partial_table_with_append.py (9 additions, 26 deletions)
@@ -5,15 +5,7 @@
 from airflow.decorators import dag
 
 from astro.files import File
-from astro.sql import (
-    append,
-    cleanup,
-    dataframe,
-    drop_table,
-    load_file,
-    run_raw_sql,
-    transform,
-)
+from astro.sql import append, cleanup, dataframe, load_file, run_raw_sql, transform
 from astro.sql.table import Metadata, Table
 
 """
@@ -66,10 +58,10 @@ def filter_data(homes_long: Table):
 
 
 @run_raw_sql
-def create_table():
+def create_table(table: Table):
    """Create the reporting data which will be the target of the append method"""
    return """
-    CREATE TABLE IF NOT EXISTS homes_reporting (
+    CREATE TABLE IF NOT EXISTS {{table}} (
        sell number,
        list number,
        variable varchar,
@@ -85,7 +77,6 @@ def example_snowflake_partial_table_with_append():
    homes_data1 = load_file(
        input_file=File(path=FILE_PATH + "homes.csv"),
        output_table=Table(
-            name="homes",
            conn_id=SNOWFLAKE_CONN_ID,
            metadata=Metadata(
                database=os.getenv("SNOWFLAKE_DATABASE"),
@@ -97,7 +88,6 @@ def example_snowflake_partial_table_with_append():
    homes_data2 = load_file(
        input_file=File(path=FILE_PATH + "homes2.csv"),
        output_table=Table(
-            name="homes2",
            conn_id=SNOWFLAKE_CONN_ID,
            metadata=Metadata(
                database=os.getenv("SNOWFLAKE_DATABASE"),
@@ -119,31 +109,24 @@ def example_snowflake_partial_table_with_append():
 
    filtered_data = filter_data(
        homes_long=transformed_data,
-        output_table=Table(name="expensive_homes_long"),
+        output_table=Table(),
    )
+    homes_reporting = Table(conn_id=SNOWFLAKE_CONN_ID)
+    create_results_table = create_table(
+        table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID
+    )
 
-    create_results_table = create_table(conn_id=SNOWFLAKE_CONN_ID)
-
    # Append transformed & filtered data to reporting table
    # Dependency is inferred by passing the previous `filtered_data` task to `append_table` param
    # [START append_example_with_columns_list]
    record_results = append(
        source_table=filtered_data,
-        target_table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID),
+        target_table=homes_reporting,
        columns=["sell", "list", "variable", "value"],
    )
    # [END append_example_with_columns_list]
    record_results.set_upstream(create_results_table)
 
-    # We truncate this table only to avoid wasting Snowflake resources
-    # Why? Between 2022-03-25 and 2022-04-11 it accumulated 301G (89 million rows) because
-    # this example DAG used to append rows without deleting them
-    # [START drop_table_example]
-    truncate_results = drop_table(
-        table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID)
-    )
-    # [END drop_table_example]
-    truncate_results.set_upstream(record_results)
    cleanup()
example_dags/example_sqlite_load_transform.py (14 additions, 3 deletions)
@@ -1,16 +1,18 @@
+import time
 from datetime import datetime
 
 from airflow import DAG
 
 from astro import sql as aql
 from astro.files import File
+from astro.sql import drop_table
 from astro.sql.table import Table
 
 START_DATE = datetime(2000, 1, 1)
 
 
 @aql.transform()
-def top_five_animations(input_table: Table):
+def get_top_five_animations(input_table: Table):
    return """
        SELECT Title, Rating
        FROM {{input_table}}
@@ -20,6 +22,8 @@ def top_five_animations(input_table: Table):
    """
 
 
+imdb_movies_name = "imdb_movies" + str(int(time.time()))
+
 with DAG(
    "example_sqlite_load_transform",
    schedule_interval=None,
@@ -32,14 +36,21 @@ def top_five_animations(input_table: Table):
            path="https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb.csv"
        ),
        task_id="load_csv",
-        output_table=Table(name="imdb_movies", conn_id="sqlite_default"),
+        output_table=Table(name=imdb_movies_name, conn_id="sqlite_default"),
    )
 
-    top_five_animations(
+    top_five_animations = get_top_five_animations(
        input_table=imdb_movies,
        output_table=Table(
            name="top_animation",
            conn_id="sqlite_default",
        ),
    )
+    # Note - Using persistent table just to showcase drop_table operator.
+    # [START drop_table_example]
+    truncate_results = drop_table(
+        table=Table(name=imdb_movies_name, conn_id="sqlite_default")
+    )
+    # [END drop_table_example]
+    truncate_results.set_upstream(top_five_animations)
    aql.cleanup()
src/astro/databases/base.py (4 additions, 1 deletion)
@@ -477,7 +477,10 @@ def get_sqla_table(self, table: Table) -> SqlaTable:
        :param table: Astro Table to be converted to SQLAlchemy table instance
        """
        return SqlaTable(
-            table.name, table.sqlalchemy_metadata, autoload_with=self.sqlalchemy_engine
+            table.name,
+            table.sqlalchemy_metadata,
+            autoload_with=self.sqlalchemy_engine,
+            extend_existing=True,
        )
 
    # ---------------------------------------------------------
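As a follow-up illustration of why extend_existing matters for this flaky test: if another test mutates a table after it has been reflected once, re-reflection with the flag picks up the new definition instead of tripping over the cached one. Again a hedged sketch in plain SQLAlchemy, not astro-sdk code (the table name and columns are invented):

from sqlalchemy import MetaData, Table, create_engine, text

engine = create_engine("sqlite://")  # throwaway in-memory database
metadata = MetaData()

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE homes (sell INTEGER)"))

homes = Table("homes", metadata, autoload_with=engine)
print([c.name for c in homes.columns])  # ['sell']

# Simulate another test changing the table out from under the cache.
with engine.begin() as conn:
    conn.execute(text("DROP TABLE homes"))
    conn.execute(text("CREATE TABLE homes (sell INTEGER, list INTEGER)"))

# Without extend_existing=True this second reflection would raise
# InvalidRequestError; with it, the cached Table is refreshed in place.
homes = Table("homes", metadata, autoload_with=engine, extend_existing=True)
print([c.name for c in homes.columns])  # ['sell', 'list']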
