Commit

Update
alexmassen-hane committed Sep 21, 2023
1 parent f54f2fe commit 1068853
Showing 2 changed files with 403 additions and 162 deletions.
233 changes: 123 additions & 110 deletions academic_observatory_workflows/workflows/data_quality_check_workflow.py
@@ -99,8 +99,24 @@ def full_table_id(self):


class DataQualityCheckWorkflow(Workflow):
# This workflow will wait until all of the below have finished before running the data quality check.
DAG_IDS_TO_CHECK = {
# This workflow will wait until all of the below have finished before running the data quality check workflow.
SENSOR_DAG_IDS = [
"crossref_events",
"crossref_fundref",
"crossref_metadata",
"doi_workflow",
"geonames",
"grid",
"open_citations",
"openalex",
"orcid",
"pubmed",
"ror",
"unpaywall",
]
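For orientation, here is a minimal sketch of the Table helper that the entries below appear to construct. This is an assumption pieced together from the keyword arguments used in DATASETS and the full_table_id property visible in the hunk header above; the real definition sits earlier in this file and may differ.

# Hypothetical reconstruction of the Table helper; the committed class defined
# earlier in this file is the source of truth.
from dataclasses import dataclass
from typing import List, Union

@dataclass
class Table:
    project_id: str  # Google Cloud project that owns the table
    dataset_id: str  # BigQuery dataset the table lives in
    name: str  # base table name, without any shard date suffix
    sharded: bool  # True if the table is date-sharded, e.g. ror20230918
    fields: Union[str, List[str]]  # field(s) assumed to act as the table's unique key in the checks

    @property
    def full_table_id(self) -> str:
        # Fully qualified id; shard date suffixes are resolved elsewhere for sharded tables.
        return f"{self.project_id}.{self.dataset_id}.{self.name}"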

# These are the datasets, from workflows or imported from other sources, that are under the academic-observatory project.
DATASETS = {
"crossref_events": [
Table(
project_id="alex-dev-356105",
@@ -128,7 +144,25 @@ class DataQualityCheckWorkflow(Workflow):
fields="doi",
),
],
"doi_workflow": [
"geonames": [
Table(
project_id="alex-dev-356105",
dataset_id="geonames",
name="geonames",
sharded=True,
fields="oci",
),
],
"grid": [
Table(
project_id="alex-dev-356105",
dataset_id="grid",
name="grid",
sharded=True,
fields="oci",
),
],
"observatory": [
Table(
project_id="alex-dev-356105",
dataset_id="observatory",
@@ -207,31 +241,71 @@ class DataQualityCheckWorkflow(Workflow):
fields="id",
),
],
"geonames": [
"open_citations": [
Table(
project_id="alex-dev-356105",
dataset_id="geonames",
name="geonames",
dataset_id="open_citations",
name="open_citations",
sharded=True,
fields="oci",
),
],
"grid": [
"openaire": [
Table(
project_id="alex-dev-356105",
dataset_id="grid",
name="grid",
dataset_id="openaire",
name="community_infrastructure",
sharded=True,
fields="oci",
fields="id",
),
],
"open_citations": [
Table(
project_id="alex-dev-356105",
dataset_id="open_citations",
name="open_citations",
dataset_id="openaire",
name="dataset",
sharded=True,
fields="oci",
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="datasource",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="organization",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="otherresearchproduct",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="publication",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="relation",
sharded=True,
fields=["source", "target"],
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="software",
sharded=True,
fields="id",
),
],
"openalex": [
@@ -312,89 +386,29 @@ class DataQualityCheckWorkflow(Workflow):
fields="id",
),
],
"unpaywall": [
Table(
project_id="alex-dev-356105",
dataset_id="unpaywall",
name="unpaywall",
sharded=False,
fields="doi",
),
"scihub": [
Table(
project_id="alex-dev-356105",
dataset_id="unpaywall_snapshot",
name="unpaywall_snapshot",
dataset_id="scihub",
name="scihub",
sharded=True,
fields="doi",
),
],
}

# These are datasets under the same project that have been made or collected from other sources.
DATASETS_TO_CHECK = {
"openaire": [
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="community_infrastructure",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="dataset",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="datasource",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="organization",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="otherresearchproduct",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="publication",
sharded=True,
fields="id",
),
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="relation",
sharded=True,
fields=["source", "target"],
),
"unpaywall": [
Table(
project_id="alex-dev-356105",
dataset_id="openaire",
name="software",
sharded=True,
fields="id",
),
dataset_id="unpaywall",
name="unpaywall",
sharded=False,
fields="doi",
)
],
"scihub": [
"unpaywall_snapshot": [
Table(
project_id="alex-dev-356105",
dataset_id="scihub",
name="scihub",
dataset_id="unpaywall_snapshot",
name="unpaywall_snapshot",
sharded=True,
fields="doi",
),
@@ -412,8 +426,8 @@ def __init__(
start_date: Optional[pendulum.DateTime] = pendulum.datetime(2020, 1, 1),
schedule: Optional[str] = "@weekly",
queue: str = "default",
dag_ids_to_check: Dict[str, List[Table]] = None,
datasets_to_check: Dict[str, List[Table]] = None,
sensor_dag_ids: List[str] = None,
datasets: Dict[str, List[Table]] = None,
):
# TODO: Fix the param details for all functions.
"""Create the DataQualityCheck Workflow.
@@ -443,17 +457,17 @@ def __init__(
self.data_location = cloud_workspace.data_location
self.schema_path = schema_path

self.dag_ids_to_check = dag_ids_to_check
if dag_ids_to_check is None:
self.dag_ids_to_check = DataQualityCheckWorkflow.DAG_IDS_TO_CHECK
self.sensor_dag_ids = sensor_dag_ids
if sensor_dag_ids is None:
self.sensor_dag_ids = DataQualityCheckWorkflow.SENSOR_DAG_IDS

self.datasets_to_check = datasets_to_check
if datasets_to_check is None:
self.datasets_to_check = DataQualityCheckWorkflow.DATASETS_TO_CHECK
self.datasets = datasets
if datasets is None:
self.datasets = DataQualityCheckWorkflow.DATASETS

# Add sensors
with self.parallel_tasks():
for ext_dag_id, _ in self.dag_ids_to_check.items():
for ext_dag_id in self.sensor_dag_ids:
sensor = DagRunSensor(
task_id=f"{ext_dag_id}_sensor",
external_dag_id=ext_dag_id,
@@ -469,20 +483,11 @@

# Create parallel tasks for checking the workflows defined above.
with self.parallel_tasks():
for dag_id, tables in self.dag_ids_to_check.items():
self.add_task(
self.perform_data_quality_check,
op_kwargs={"task_id": dag_id, "tables": tables},
task_id=dag_id,
)

# Create parallel tasks for checking the datasets defined above.
with self.parallel_tasks():
for dataset_id, tables in self.datasets_to_check.items():
for task_id, tables in self.datasets.items():
self.add_task(
self.perform_data_quality_check,
op_kwargs={"task_id": dataset_id, "tables": tables},
task_id=dataset_id,
op_kwargs={"task_id": task_id, "tables": tables},
task_id=task_id,
)
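As a usage illustration of the two renamed parameters, a hypothetical wiring sketch follows. It assumes the constructor also accepts dag_id, cloud_workspace and schema_path (the diff references them but truncates the full signature), and every value shown is a placeholder rather than the project's real configuration.

# Hypothetical helper, not part of this commit; cloud_workspace and schema_path
# come from whatever configuration the real DAG factory uses.
def make_data_quality_dag(cloud_workspace, schema_path: str) -> DataQualityCheckWorkflow:
    # Leaving sensor_dag_ids / datasets as None falls back to the class-level
    # SENSOR_DAG_IDS and DATASETS defaults defined above.
    return DataQualityCheckWorkflow(
        dag_id="data_quality_check",
        cloud_workspace=cloud_workspace,
        schema_path=schema_path,
        sensor_dag_ids=["doi_workflow", "openalex"],  # wait only on these upstream DAGs
        datasets={"openalex": DataQualityCheckWorkflow.DATASETS["openalex"]},  # check only this dataset
    )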

def make_release(self, **kwargs) -> DataQualityCheckRelease:
@@ -496,7 +501,7 @@ def make_release(self, **kwargs) -> DataQualityCheckRelease:
return DataQualityCheckRelease(
dag_id=self.dag_id,
run_id=kwargs["run_id"],
workflow_list=self.dag_ids_to_check,
workflow_list=self.datasets,
schema_path=self.schema_path,
)

@@ -534,12 +539,15 @@ def perform_data_quality_check(self, release: DataQualityCheckRelease, **kwargs)
dataset_id=table_to_check.dataset_id, base_name=table_to_check.name
)
else:
print(f"table_to_check.full_table_id: {table_to_check.full_table_id}")
sub_tables: List[BQTable] = [bq_get_table(table_to_check.full_table_id)]

assert (
len(sub_tables) > 0
), f"No table or sharded tables found in Bigquery for: {table_to_check.dataset_id}.{table_to_check.name}"

print(f"Sub tables is {sub_tables}")

records = []
table: BQTable
for table in sub_tables:
@@ -685,7 +693,12 @@ def bq_get_table(full_table_id: str) -> BQTable:
:return: The table object from the BigQuery API."""

bq_client = bigquery.Client()
return bq_client.get_table(full_table_id) if bq_table_exists(full_table_id) else None
try:
table = bq_client.get_table(full_table_id)
return table
except:
print(f"Table is not found! {full_table_id}")
return None
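For comparison with the try/except just above, here is a sketch of a narrower variant. It is not the committed code; it assumes google.api_core.exceptions.NotFound, the exception the BigQuery client raises when a table does not exist, so unrelated failures such as auth or network errors still propagate.

# Hypothetical variant, shown for comparison only.
from typing import Optional

from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from google.cloud.bigquery import Table as BQTable

def bq_get_table_or_none(full_table_id: str) -> Optional[BQTable]:
    """Fetch a table, returning None only when it genuinely does not exist."""
    bq_client = bigquery.Client()
    try:
        return bq_client.get_table(full_table_id)
    except NotFound:
        print(f"Table is not found! {full_table_id}")
        return None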


def bq_list_tables_shards(dataset_id: str, base_name: str) -> List[BQTable]:
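The body of bq_list_tables_shards is cut off by the diff view. For orientation only, here is a hypothetical sketch of what such a shard-listing helper typically does; it assumes the BigQuery convention of date-sharded tables named <base_name>YYYYMMDD and is not the committed implementation.

# Hypothetical sketch of a shard-listing helper; the real body is truncated above.
import re
from typing import List

from google.cloud import bigquery
from google.cloud.bigquery import Table as BQTable

def bq_list_tables_shards_sketch(dataset_id: str, base_name: str) -> List[BQTable]:
    """Return full Table objects for every date shard of base_name in dataset_id."""
    bq_client = bigquery.Client()
    shard_pattern = re.compile(rf"^{re.escape(base_name)}\d{{8}}$")
    return [
        bq_client.get_table(item.reference)
        for item in bq_client.list_tables(dataset_id)
        if shard_pattern.match(item.table_id)
    ]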