Added 'extend_existing' to Sqla Table object #626

Merged (3 commits, Aug 11, 2022)
2 changes: 1 addition & 1 deletion docs/astro/sql/operators/drop.rst
@@ -8,7 +8,7 @@ When to Use drop operator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The drop operator is used to delete tables from the database if they exist. It can be used on both temporary and persistent tables.

.. literalinclude:: ../../../../example_dags/example_snowflake_partial_table_with_append.py
.. literalinclude:: ../../../../example_dags/example_sqlite_load_transform.py
:language: python
:start-after: [START drop_table_example]
:end-before: [END drop_table_example]
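In short, the snippet now included here (taken from example_sqlite_load_transform.py, shown in full further down this diff) demonstrates dropping a persistent table; a minimal sketch, with an illustrative table name:

from astro.sql import drop_table
from astro.sql.table import Table

# Delete the table from the database if it exists; used here to clean up a
# persistent table once the downstream tasks have consumed it.
truncate_results = drop_table(
    table=Table(name="imdb_movies", conn_id="sqlite_default")
)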
35 changes: 9 additions & 26 deletions example_dags/example_snowflake_partial_table_with_append.py
@@ -5,15 +5,7 @@
from airflow.decorators import dag

from astro.files import File
from astro.sql import (
append,
cleanup,
dataframe,
drop_table,
load_file,
run_raw_sql,
transform,
)
from astro.sql import append, cleanup, dataframe, load_file, run_raw_sql, transform
from astro.sql.table import Metadata, Table

"""
@@ -66,10 +58,10 @@ def filter_data(homes_long: Table):


@run_raw_sql
def create_table():
def create_table(table: Table):
"""Create the reporting data which will be the target of the append method"""
return """
CREATE TABLE IF NOT EXISTS homes_reporting (
CREATE TABLE IF NOT EXISTS {{table}} (
sell number,
list number,
variable varchar,
@@ -85,7 +77,6 @@ def example_snowflake_partial_table_with_append():
homes_data1 = load_file(
input_file=File(path=FILE_PATH + "homes.csv"),
output_table=Table(
name="homes",
conn_id=SNOWFLAKE_CONN_ID,
metadata=Metadata(
database=os.getenv("SNOWFLAKE_DATABASE"),
@@ -97,7 +88,6 @@ def example_snowflake_partial_table_with_append():
homes_data2 = load_file(
input_file=File(path=FILE_PATH + "homes2.csv"),
output_table=Table(
name="homes2",
conn_id=SNOWFLAKE_CONN_ID,
metadata=Metadata(
database=os.getenv("SNOWFLAKE_DATABASE"),
@@ -119,31 +109,24 @@

filtered_data = filter_data(
homes_long=transformed_data,
output_table=Table(name="expensive_homes_long"),
output_table=Table(),
)
homes_reporting = Table(conn_id=SNOWFLAKE_CONN_ID)
create_results_table = create_table(
table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID
)

create_results_table = create_table(conn_id=SNOWFLAKE_CONN_ID)

# Append transformed & filtered data to reporting table
# Dependency is inferred by passing the previous `filtered_data` task to `append_table` param
# [START append_example_with_columns_list]
record_results = append(
source_table=filtered_data,
target_table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID),
target_table=homes_reporting,
columns=["sell", "list", "variable", "value"],
)
# [END append_example_with_columns_list]
record_results.set_upstream(create_results_table)

# We truncate this table only to avoid wasting Snowflake resources
# Why? Between 2022-03-25 and 2022-04-11 it accumulated 301G (89 million rows) because
# this example DAG used to append rows without deleting them
# [START drop_table_example]
truncate_results = drop_table(
table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID)
)
# [END drop_table_example]
truncate_results.set_upstream(record_results)
cleanup()


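Pulling the hunks above together, this DAG now parameterises the reporting table instead of hard-coding homes_reporting; a rough sketch of the updated pattern (the connection id is illustrative, and the column list is elided as in the diff):

from astro.sql import run_raw_sql
from astro.sql.table import Table

SNOWFLAKE_CONN_ID = "snowflake_default"  # illustrative connection id

@run_raw_sql
def create_table(table: Table):
    """Create the reporting table which will be the target of the append method"""
    # {{table}} is templated by the SDK to the name of the Table passed in.
    return """
    CREATE TABLE IF NOT EXISTS {{table}} (
        sell number,
        list number,
        variable varchar
        -- remaining columns elided in this sketch
    );
    """

# No explicit name: per the review discussion, the SDK generates a unique name
# for it, so parallel runs no longer collide on a shared homes_reporting table.
homes_reporting = Table(conn_id=SNOWFLAKE_CONN_ID)
create_results_table = create_table(table=homes_reporting, conn_id=SNOWFLAKE_CONN_ID)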
17 changes: 14 additions & 3 deletions example_dags/example_sqlite_load_transform.py
@@ -1,16 +1,18 @@
import time
from datetime import datetime

from airflow import DAG

from astro import sql as aql
from astro.files import File
from astro.sql import drop_table
from astro.sql.table import Table

START_DATE = datetime(2000, 1, 1)


@aql.transform()
def top_five_animations(input_table: Table):
def get_top_five_animations(input_table: Table):
return """
SELECT Title, Rating
FROM {{input_table}}
@@ -20,6 +22,8 @@ def top_five_animations(input_table: Table):
"""


imdb_movies_name = "imdb_movies" + str(int(time.time()))

with DAG(
"example_sqlite_load_transform",
schedule_interval=None,
@@ -32,14 +36,21 @@ def top_five_animations(input_table: Table):
path="https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb.csv"
),
task_id="load_csv",
output_table=Table(name="imdb_movies", conn_id="sqlite_default"),
output_table=Table(name=imdb_movies_name, conn_id="sqlite_default"),
)

top_five_animations(
top_five_animations = get_top_five_animations(
input_table=imdb_movies,
output_table=Table(
name="top_animation",
conn_id="sqlite_default",
),
)
# Note - Using persistent table just to showcase drop_table operator.
# [START drop_table_example]
truncate_results = drop_table(
table=Table(name=imdb_movies_name, conn_id="sqlite_default")
)
# [END drop_table_example]
truncate_results.set_upstream(top_five_animations)
aql.cleanup()
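The epoch suffix on the table name above is what keeps parallel test suites (the collision scenario discussed in the review thread below) from creating and dropping the same persistent SQLite table; as a standalone note:

import time

# Per-run table name: two concurrent CI jobs get different names instead of
# fighting over a shared "imdb_movies" table.
imdb_movies_name = "imdb_movies" + str(int(time.time()))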
5 changes: 4 additions & 1 deletion src/astro/databases/base.py
@@ -477,7 +477,10 @@ def get_sqla_table(self, table: Table) -> SqlaTable:
:param table: Astro Table to be converted to SQLAlchemy table instance
"""
return SqlaTable(
table.name, table.sqlalchemy_metadata, autoload_with=self.sqlalchemy_engine
table.name,
table.sqlalchemy_metadata,
autoload_with=self.sqlalchemy_engine,
extend_existing=True,
Comment on lines +482 to +483

@tatiana (Collaborator) commented on Aug 10, 2022:

Without these arguments, the Python SDK would raise an exception if concurrent processes tried to create the same table (which is what was happening in the CI).
By introducing this change, we may be hiding this type of issue, making it harder for users to troubleshoot this anti-pattern (having two processes trying to create a table with the same name).

@utkarsharma2 (Collaborator, Author) replied on Aug 10, 2022:

@tatiana This was my first guess as well. I converted the persistent tables to temp tables, but the issue still persisted: https://github.com/astronomer/astro-sdk/runs/7765749178?check_suite_focus=true

And I don't think busting the cache will hide the issue; if anything, it will surface the issue sooner, since we will now be referring to the database directly rather than the cache. I don't understand how this is an anti-pattern.

However, this comes at the cost of running a query to fetch the table columns every time we create the table object. Since we are not handling all table operations via SQLA, SQLA has no knowledge of updates made to a table by an operator, so in my opinion this should be added to keep such situations in check.

@utkarsharma2 (Collaborator, Author) added:

@tatiana But I do understand your point: having temp tables would be better, and the chances of collision would be reduced. I can make this change on top of the existing changes. WDYT?

@utkarsharma2 (Collaborator, Author) added:

As per the discussion on a call with @tatiana, we concluded the following:

  1. extend_existing is required, and its absence was the root cause of the issue in CI.
  2. Persistent tables in the example DAGs can also lead to conflicts when parallel test suites are running.

Based on the above points, I have updated the PR to reflect those changes.

@tatiana (Collaborator) replied:

Amazing, thank you very much, @utkarsharma2!

)

# ---------------------------------------------------------
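For context on the flag itself: in SQLAlchemy, constructing a Table whose name is already registered on a MetaData instance and passing new column definitions raises InvalidRequestError unless extend_existing=True is set; with the flag, the existing Table object is reused and updated, and autoload_with re-reflects its columns from the database. A minimal standalone sketch (table and column names are illustrative, not taken from this repo):

import sqlalchemy as sa
from sqlalchemy.exc import InvalidRequestError

engine = sa.create_engine("sqlite://")
metadata = sa.MetaData()

with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE homes_reporting (sell INTEGER, list INTEGER)"))

# First definition registers "homes_reporting" on this MetaData instance.
t1 = sa.Table("homes_reporting", metadata, autoload_with=engine)

# Redefining the table with column arguments on the same MetaData is rejected...
try:
    sa.Table("homes_reporting", metadata, sa.Column("variable", sa.String()))
except InvalidRequestError as err:
    print(err)  # Table 'homes_reporting' is already defined for this MetaData instance...

# ...unless extend_existing=True is passed: the cached Table object is reused,
# and autoload_with refreshes its columns from the live database.
t2 = sa.Table("homes_reporting", metadata, autoload_with=engine, extend_existing=True)
assert t1 is t2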