Skip to content

Commit

Permalink
193 append test refactor (#217)
Browse files Browse the repository at this point in the history
* As a step towards "maturing" the astro DAG authoring project, we must rewrite our tests to ensure that every integration test runs against every database.

This step will simultaneously reduce the number of tests we need to maintain, make testing much simpler as we add new databases, and will make a future refactor much simpler as we can ensure proper coverage.

To do this, we will take advantage of two features in pytest: fixtures and parametrize.

For this ticket, we will update the `append` function.

**Acceptance criteria**
Have a single test file validating `append` across all databases.

1. Integration tests should be marked with `pytest.mark.integration`:

* Each test should work across all databases
* Validating both `Table` and `TempTable` as inputs
* Use `test_utils.run_dag`
* Use pytest `fixtures` and `parametrize` to have a single main test that will validate `append` across multiple databases

The tests should validate these scenarios:
1. appending two tables against a single column
2. appending two tables against multiple columns
3. appending against all fields by not specifying fields
4. appending with casting
5. append with some casted fields and some uncasted fields
6. test with two different databases (should fail)

* merged append tests

* fix invalid test

* fix different db test
  • Loading branch information
dimberman authored and utkarsharma2 committed Mar 30, 2022
1 parent 72427d9 commit 2b919f3
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 586 deletions.
116 changes: 116 additions & 0 deletions tests/operators/test_agnostic_append.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import pathlib

import pandas as pd
import pytest
from airflow.exceptions import BackfillUnfinished

from astro import sql as aql
from astro.dataframe import dataframe as adf
from tests.operators import utils as test_utils

# Directory containing this test file; used to resolve the test-data CSVs.
CWD = pathlib.Path(__file__).parent


@adf
def validate_basic(df: pd.DataFrame):
    """Check a column-subset append: 6 rows, 'sell' fully populated, 'rooms' sparse."""
    sell_has_nans = df["sell"].hasnans
    rooms_has_nans = df["rooms"].hasnans
    assert len(df) == 6
    assert not sell_has_nans
    assert rooms_has_nans


@adf
def validate_append_all(df: pd.DataFrame):
    """Check an all-fields append: 6 rows with neither 'sell' nor 'rooms' missing values."""
    assert len(df) == 6
    for column in ("sell", "rooms"):
        assert not df[column].hasnans


@adf
def validate_caste_only(df: pd.DataFrame):
    """Check a cast-only append: 6 rows, 'age' fully populated, 'sell' sparse."""
    age_complete = not df["age"].hasnans
    sell_sparse = df["sell"].hasnans
    assert len(df) == 6
    assert age_complete
    assert sell_sparse


@pytest.fixture
def append_params(request):
    """Indirect fixture mapping a parametrized mode name to append inputs.

    Returns a 2-tuple of (keyword arguments for ``aql.append``, the
    ``@adf``-decorated validator task to run on the appended table).

    Raises:
        ValueError: if ``request.param`` is not a known mode. Previously an
            unknown mode fell off the end of an if-chain and returned ``None``,
            which surfaced later as an opaque unpacking ``TypeError``.
    """
    params_by_mode = {
        "basic": ({"columns": ["sell", "living"]}, validate_basic),
        "all_fields": ({}, validate_append_all),
        "with_caste": (
            {
                "columns": ["sell", "living"],
                "casted_columns": {"age": "INTEGER"},
            },
            validate_basic,
        ),
        "caste_only": ({"casted_columns": {"age": "INTEGER"}}, validate_caste_only),
    }
    mode = request.param
    try:
        return params_by_mode[mode]
    except KeyError:
        raise ValueError(f"Unknown append_params mode: {mode!r}") from None


@pytest.mark.parametrize(
    "append_params",
    ["basic", "all_fields", "with_caste"],
    indirect=True,
)
@pytest.mark.parametrize(
    "sql_server",
    [
        "bigquery",
        "snowflake",
        "postgres",
        "sqlite",
    ],
    indirect=True,
)
def test_append(sql_server, sample_dag, tmp_table, append_params):
    """Load two homes CSVs into the target database, append one onto the
    other with the parametrized options, and run the matching validator."""
    append_kwargs, validator = append_params

    with sample_dag:
        main_table = aql.load_file(
            path=str(CWD) + "/../data/homes_main.csv",
            output_table=tmp_table,
        )
        extra_rows = aql.load_file(
            path=str(CWD) + "/../data/homes_append.csv",
            output_table=tmp_table,
        )
        combined = aql.append(
            **append_kwargs,
            main_table=main_table,
            append_table=extra_rows,
        )
        validator(combined)
    test_utils.run_dag(sample_dag)


# NOTE(review): mid-file import — consider moving to the module's import block.
from astro.sql.table import TempTable


@pytest.mark.parametrize(
    "sql_server",
    [
        "postgres",
    ],
    indirect=True,
)
def test_append_on_tables_on_different_db(sample_dag, sql_server):
    """Appending tables that live in two different databases (postgres vs
    sqlite) must fail: the backfill run is expected to finish unsuccessfully."""
    tmp_table_1 = TempTable(conn_id="postgres_conn")
    tmp_table_2 = TempTable(conn_id="sqlite_conn")
    with pytest.raises(BackfillUnfinished):
        with sample_dag:
            load_main = aql.load_file(
                path=str(CWD) + "/../data/homes_main.csv",
                output_table=tmp_table_1,
            )
            load_append = aql.load_file(
                path=str(CWD) + "/../data/homes_append.csv",
                output_table=tmp_table_2,
            )
            # Return value intentionally unused (previously bound to an unused
            # local): wiring the task is enough, the failure surfaces when the
            # DAG executes below.
            aql.append(
                main_table=load_main,
                append_table=load_append,
            )
        test_utils.run_dag(sample_dag)
Loading

0 comments on commit 2b919f3

Please sign in to comment.