Skip to content

Commit

Permalink
Change test execution method to run_dag() from operator.execute()
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarsharma2 committed Dec 14, 2022
1 parent c5a2214 commit 2009843
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pandas
from airflow import AirflowException
from airflow.decorators.base import get_unique_task_id
from airflow.models.xcom_arg import XComArg
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

from astro.databases import create_database
Expand Down Expand Up @@ -61,17 +60,22 @@ def __init__(
self.kwargs = kwargs
self.df = None

dataset_qualified_name = ""
dataset_conn_id = ""

if isinstance(dataset, BaseTable):
db = create_database(conn_id=self.dataset.conn_id) # type: ignore
self.conn_id = self.dataset.conn_id
super().__init__(
table=db.get_table_qualified_name(table=self.dataset),
column_mapping=self.column_mapping,
partition_clause=self.partition_clause,
conn_id=dataset.conn_id,
database=dataset.metadata.database,
task_id=task_id if task_id is not None else get_unique_task_id("column_check"),
)
dataset_qualified_name = db.get_table_qualified_name(table=self.dataset)
dataset_conn_id = dataset.conn_id

super().__init__(
table=dataset_qualified_name,
column_mapping=self.column_mapping,
partition_clause=self.partition_clause,
conn_id=dataset_conn_id,
task_id=task_id if task_id is not None else get_unique_task_id("column_check"),
)

def execute(self, context: "Context"):
if isinstance(self.dataset, BaseTable):
Expand Down Expand Up @@ -198,7 +202,7 @@ def column_check(
partition_clause: Optional[str] = None,
task_id: Optional[str] = None,
**kwargs,
) -> XComArg:
) -> ColumnCheckOperator:
"""
Performs one or more of the templated checks in the column_checks dictionary.
Checks are performed on a per-column basis specified by the column_mapping.
Expand Down Expand Up @@ -235,4 +239,4 @@ def column_check(
partition_clause=partition_clause,
kwargs=kwargs,
task_id=task_id,
).output
)
2 changes: 1 addition & 1 deletion python-sdk/tests/data/data_validation.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name,age,city,emp_id
Dwight Schrute,30,,10
Dwight Schrute,30.0,,10
Michael Scott,,LA,1
Jim Halpert,,California City,35
Loading

0 comments on commit 2009843

Please sign in to comment.