193 append test refactor #217
import pathlib

import pandas as pd
import pytest
from airflow.exceptions import BackfillUnfinished

from astro import sql as aql
from astro.dataframe import dataframe as adf
from tests.operators import utils as test_utils

CWD = pathlib.Path(__file__).parent

@adf
def validate_basic(df: pd.DataFrame):
    assert len(df) == 6
    assert not df["sell"].hasnans
    assert df["rooms"].hasnans


@adf
def validate_append_all(df: pd.DataFrame):
    assert len(df) == 6
    assert not df["sell"].hasnans
    assert not df["rooms"].hasnans


@adf
def validate_caste_only(df: pd.DataFrame):
    assert len(df) == 6
    assert not df["age"].hasnans
    assert df["sell"].hasnans

@pytest.fixture
def append_params(request):
    mode = request.param
    if mode == "basic":
        return {
            "columns": ["sell", "living"],
        }, validate_basic
    if mode == "all_fields":
        return {}, validate_append_all
    if mode == "with_caste":
        return {
            "columns": ["sell", "living"],
            "casted_columns": {"age": "INTEGER"},
        }, validate_basic
    if mode == "caste_only":
        return {"casted_columns": {"age": "INTEGER"}}, validate_caste_only

[Review comment] If this test runs against all the databases, we probably do not need to run "basic" and "cast_only" against all the other databases.
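One way to act on that comment, sketched under assumptions (the helper `should_run`, the test name, and the policy itself are illustrative, not part of this PR): keep the full parametrization grid visible, but skip the cheap permutations at runtime on every backend except one reference database.

```python
import pytest


def should_run(mode: str, database: str) -> bool:
    # Illustrative policy: the full "all_fields" case runs everywhere,
    # while the cheaper permutations run against one reference backend only.
    return mode == "all_fields" or database == "postgres"


@pytest.mark.parametrize("database", ["bigquery", "snowflake", "postgres", "sqlite"])
@pytest.mark.parametrize("mode", ["basic", "all_fields", "caste_only"])
def test_append_grid(database, mode):
    if not should_run(mode, database):
        pytest.skip("cheap permutation; already covered on the reference backend")
    # ... the real test body (load, append, validate) would go here ...
```

Of the 3 x 4 = 12 combinations, this trims the 2 non-"all_fields" modes on the 3 non-reference backends, i.e. 6 combinations, while the grid stays explicit in the test's parametrize decorators.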
@pytest.mark.parametrize(
    "append_params",
    ["basic", "all_fields", "with_caste"],
    indirect=True,
)
@pytest.mark.parametrize(
    "sql_server",
    [
        "bigquery",
        "snowflake",
        "postgres",
        "sqlite",
    ],
    indirect=True,
)
def test_append(sql_server, sample_dag, tmp_table, append_params):
    app_param, validate_append = append_params

    with sample_dag:
        load_main = aql.load_file(
            path=str(CWD) + "/../data/homes_main.csv",
            output_table=tmp_table,
        )
        load_append = aql.load_file(
            path=str(CWD) + "/../data/homes_append.csv",
            output_table=tmp_table,
        )
        appended_table = aql.append(
            **app_param,
            main_table=load_main,
            append_table=load_append,
        )
        validate_append(appended_table)
    test_utils.run_dag(sample_dag)

[Review comment] Can we move loading files into tmp_table? Is there any use case with just tmp_table without loading it into the db?
[Reply] For now, using load_file ensures that the actual function is being passed info from a previous task instead of a table object.

from astro.sql.table import TempTable

@pytest.mark.parametrize(
    "sql_server",
    [
        "postgres",
    ],
    indirect=True,
)
def test_append_on_tables_on_different_db(sample_dag, sql_server):
    tmp_table_1 = TempTable(conn_id="postgres_conn")
    tmp_table_2 = TempTable(conn_id="sqlite_conn")

    with pytest.raises(BackfillUnfinished):
        with sample_dag:
            load_main = aql.load_file(
                path=str(CWD) + "/../data/homes_main.csv",
                output_table=tmp_table_1,
            )
            load_append = aql.load_file(
                path=str(CWD) + "/../data/homes_append.csv",
                output_table=tmp_table_2,
            )
            appended_table = aql.append(
                main_table=load_main,
                append_table=load_append,
            )
        test_utils.run_dag(sample_dag)

[Review comment] Should this test case be renamed, or should we add more DBs?
[Reply] Why would it need to be renamed? It's testing a piece of code that's not DB dependent.

[Review comment, on the TempTable lines] @dimberman Are we tearing down these tables?
[Review comment] I think this should be expanded to testcases instead of fixtures?

[Reply] I think this will make it much easier to add more testcases with less boilerplate. We can see if it becomes a problem, but this ultimately makes it easier to create test grids.

[Reply] @dimberman I agree with @utkarsharma2 on this one. I don't think we should aim to have a single test per operator. We already have a few dimensions for which we use parametrization. I strongly recommend we do not use parametrization for groups of parameters sent to our tasks/operators. For many operators, I don't think we need to test all the possible configurations of parameters with all the databases.
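The testcase-style layout discussed above could look roughly like this. It is a sketch: `APPEND_CASES`, `validate_row_count`, and the test name are illustrative, not code from this PR; the ids mirror the fixture's modes, and each case bundles the append kwargs with its validator so adding a scenario is one `pytest.param` line instead of a new fixture branch.

```python
import pytest


def validate_row_count(df):
    # Illustrative validator; the PR's real validators also assert on
    # per-column NaNs, not just the row count.
    assert len(df) == 6


APPEND_CASES = [
    pytest.param({"columns": ["sell", "living"]}, validate_row_count, id="basic"),
    pytest.param({}, validate_row_count, id="all_fields"),
    pytest.param(
        {"columns": ["sell", "living"], "casted_columns": {"age": "INTEGER"}},
        validate_row_count,
        id="with_caste",
    ),
]


@pytest.mark.parametrize("app_param,validate", APPEND_CASES)
def test_append_cases(app_param, validate):
    # ... load the files, call aql.append(**app_param, ...), then
    # validate(appended_table) would go here ...
    pass
```

A side benefit: `pytest.param(..., marks=...)` lets individual cases carry skip or slow marks, which is one way to avoid running every parameter combination against every database.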