From 2da7dc07f47388d0a9673ac4cd29219bbbe7e493 Mon Sep 17 00:00:00 2001
From: Alex Massen-Hane
Date: Fri, 28 Jul 2023 11:09:10 +0800
Subject: [PATCH] Update

---
 .../database/schema/qa_checks/qa_table.json   |  94 +--
 .../workflows/qa_checks.py                    | 552 ++++++++++--------
 2 files changed, 381 insertions(+), 265 deletions(-)

diff --git a/academic_observatory_workflows/database/schema/qa_checks/qa_table.json b/academic_observatory_workflows/database/schema/qa_checks/qa_table.json
index 1517a0c6a..55b240be7 100644
--- a/academic_observatory_workflows/database/schema/qa_checks/qa_table.json
+++ b/academic_observatory_workflows/database/schema/qa_checks/qa_table.json
@@ -1,39 +1,67 @@
 [
+  {
+    "name": "name",
+    "description": "Name of the table.",
+    "type": "STRING"
+  },
   {
     "name": "full_table_id",
+    "description": "The fully qualified table name: project_id.dataset_id.table_id",
     "type": "STRING",
-    "mode": "RECORD",
-    "fields": [
-      {
-        "name": "sample_date",
-        "description": "Date of the table (shard-date/date created)",
-        "type": "DATE",
-        "mode": "REQURIED"
-      },
-      {
-        "name": "primary_key",
-        "description": "Array of primary keys that are used to identify records.",
-        "type": "STRING",
-        "mode": "REPEATED"
-      },
-      {
-        "name": "row_count",
-        "description": "Number of rows in the table.",
-        "type": "INTEGER",
-        "mode": "REQURIED"
-      },
-      {
-        "name": "distinct_records",
-        "description": "Number of distinct records in the table, based off of 'primary_key'.",
-        "type": "INTEGER",
-        "mode": "REQURIED"
-      },
-      {
-        "name": "no_primary_key",
-        "description": "Number of records that do not have an entryunder 'primary_key' (None or nulls).",
-        "type": "INTEGER",
-        "mode": "REQURIED"
-      }
-    ]
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "date_created",
+    "description": "Date when the table was created.",
+    "type": "DATE",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "date_last_updated",
+    "description": "Date when the table was last modified - obtained from the Observatory API DatasetRelease.data_interval_end.",
+    "type": "DATE",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "shard",
+    "description": "Whether the table is sharded or not.",
+    "type": "BOOL",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "date_shard",
+    "description": "Date from the table shard (null if the table is not sharded).",
+    "type": "DATE",
+    "mode": "NULLABLE"
+  },
+  {
+    "name": "date_checked",
+    "description": "Date when this table was checked by the QA workflow.",
+    "type": "DATE",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "primary_key",
+    "description": "Array of primary keys that are used to identify records.",
+    "type": "STRING",
+    "mode": "REPEATED"
+  },
+  {
+    "name": "row_count",
+    "description": "Number of rows in the table.",
+    "type": "INTEGER",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "distinct_records",
+    "description": "Number of distinct records in the table, based off of 'primary_key'.",
+    "type": "INTEGER",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "records_with_no_primary_key",
+    "description": "Number of records that do not have an entry under 'primary_key' (None or nulls).",
+    "type": "INTEGER",
+    "mode": "REQUIRED"
   }
 ]
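For reference, a row that conforms to this schema would look roughly like the sketch below (the values are invented for illustration and are not taken from the patch):

# Hypothetical QA record matching qa_table.json (illustrative values only).
example_record = {
    "name": "pubmed",
    "full_table_id": "my-project.pubmed.pubmed",
    "date_created": "2023-07-01",
    "date_last_updated": "2023-07-23",
    "shard": False,
    "date_shard": None,
    "date_checked": "2023-07-28",
    "primary_key": ["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"],
    "row_count": 35500000,
    "distinct_records": 35450000,
    "records_with_no_primary_key": 0,
}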
diff --git a/academic_observatory_workflows/workflows/qa_checks.py b/academic_observatory_workflows/workflows/qa_checks.py
index 1e9cda79e..9670b8042 100644
--- a/academic_observatory_workflows/workflows/qa_checks.py
+++ b/academic_observatory_workflows/workflows/qa_checks.py
@@ -17,8 +17,7 @@
 
 from __future__ import annotations
 
-import copy
-import json
+import re
 import logging
 import os
 from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -29,6 +28,9 @@
 import pendulum
 from airflow.exceptions import AirflowException
 from google.cloud import bigquery
+from google.cloud.bigquery import Table as BQTable
+
+from observatory.platform.api import get_latest_dataset_release, get_dataset_releases, make_observatory_api
 
 from academic_observatory_workflows.config import schema_folder as default_schema_folder, sql_folder, Tag
 from observatory.api.client.model.dataset_release import DatasetRelease
@@ -55,19 +57,23 @@
 from observatory.platform.workflows.workflow import Workflow, make_snapshot_date, set_task_state, Release
 
 
-class QA_CheckRelease(Release):
+class QACheckRelease(Release):
     """Construct a QACheckRelease for the workflow."""
 
     def __init__(
         self,
         *,
+        dag_id: str,
+        run_id: str,
         workflow_list: List[str],
-        check_operations: List[str],
         schema_folder: str,
     ):
+        super().__init__(
+            dag_id=dag_id,
+            run_id=run_id,
+        )
         self.workflow_list = workflow_list
-        self.check_operations = check_operations
         self.schema_folder = schema_folder
 
 
@@ -107,7 +113,7 @@ def full_table_id(self):
         return f"{self.project_id}.{self.dataset_id}.{self.table_id}"
 
 
-class QA_Check_Workflow(Workflow):
+class QACheckWorkflow(Workflow):
     SENSOR_DAG_IDS = [
         "crossref_metadata",
         "crossref_fundref",
@@ -128,9 +134,10 @@ def __init__(
         *,
         dag_id: str,
         cloud_workspace: CloudWorkspace,
-        production_project: str = "academic-observatory",
+        production_project: str = "alex-dev-356105",
         bq_dataset_id: str = "qa_checks",
-        schema_folder: str = os.path.join(default_schema_folder(), "qa_checks"),
+        bq_table_id: str = "qa_checks_",
+        schema_folder: str = os.path.join(default_schema_folder(), "qa_table.json"),
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: Optional[pendulum.DateTime] = pendulum.datetime(2020, 1, 1),
         schedule_interval: Optional[str] = "@weekly",
@@ -143,8 +150,11 @@ def __init__(
         :param observatory_api_conn_id: Connection ID for the observatory API.
         :param start_date: the start date.
         :param schedule_interval: the schedule interval.
         """
+
+        self.observatory_api_conn_id = observatory_api_conn_id
+
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
@@ -155,8 +165,11 @@ def __init__(
         )
 
         self.production_project = production_project
         self.bq_dataset_id = bq_dataset_id
+        self.bq_table_id = bq_table_id
+
+        self.cloud_workspace = cloud_workspace
+        self.project_id = cloud_workspace.project_id
         self.data_location = cloud_workspace.data_location
         self.schema_folder = schema_folder
 
@@ -165,198 +178,187 @@ def __init__(
 
         self.sensor_dag_ids = sensor_dag_ids
         if sensor_dag_ids is None:
-            self.sensor_dag_ids = QA_Check_Workflow.SENSOR_DAG_IDS
-
-        # Tables to exclude from list of datasets. "Name": exception (that will be included)
-        table_to_exclude = {
-            "upsert": [],
-            "delete": [],
-            "snapshot": ["unpaywall_snapshot"],
-            "backup": [],
-        }
+            self.sensor_dag_ids = QACheckWorkflow.SENSOR_DAG_IDS
 
         # List of all datasets to go through and produce QA tables for
-
-        # main IDs to take note of:
-        # doi, PMID,
-        self.workflows_to_check = {
-            "pubmed": [
+        self.dag_ids_to_check = {
+            "pubmed_telescope": [
                 Table(
-                    table_id="pubmed",
+                    project_id=production_project,
                     dataset_id="pubmed",
+                    table_id="pubmed",
                     sharded=False,
-                    primary_key_location=["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"],
+                    primary_key_id_loc=["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"],
                 )
-            ],
-            "openalex": [
-                Table(
-                    project_id=production_project,
-                    dataset_id="openalex",
-                    table_id="authors",
-                    sharded=False,
-                    primary_key_location="ids.doi",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="openalex",
-                    table_id="concepts",
-                    sharded=False,
-                    primary_key_location="ids.doi",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="openalex",
-                    table_id="funders",
-                    sharded=False,
-                    primary_key_location="ids.doi",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="openalex",
-                    table_id="institutions",
-                    sharded=False,
-                    primary_key_location="ids.doi",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="openalex",
-                    table_id="publishers",
-                    sharded=False,
-                    primary_key_location="ids.doi",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="openalex",
-                    table_id="sources",
-                    sharded=False,
-                    primary_key_location="ids.doi",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="openalex",
-                    table_id="works",
-                    sharded=False,
-                    primary_key_location="ids.doi",
-                ),
-            ],
-            "doi_workflow": [
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="author",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="book",
-                    sharded=True,
-                    primary_key_location="isbn",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="country",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="doi",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="funder",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="group",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="institution",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="journal",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="publisher",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="region",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-                Table(
-                    project_id=production_project,
-                    dataset_id="observatory",
-                    table_id="subregion",
-                    sharded=True,
-                    primary_key_location="id",
-                ),
-            ],
-            "ror": [Table(table_id="ror", sharded=True, primary_key_id_loc="id")],
+            ]
         }
 
-        self.observatory_api_conn_id = observatory_api_conn_id
+        # "openalex": [
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="openalex",
+        #         table_id="authors",
+        #         sharded=False,
+        #         primary_key_id_loc="ids.doi",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="openalex",
+        #         table_id="concepts",
+        #         sharded=False,
+        #         primary_key_id_loc="ids.doi",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="openalex",
+        #         table_id="funders",
+        #         sharded=False,
+        #         primary_key_id_loc="ids.doi",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="openalex",
+        #         table_id="institutions",
+        #         sharded=False,
+        #         primary_key_id_loc="ids.doi",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="openalex",
+        #         table_id="publishers",
+        #         sharded=False,
+        #         primary_key_id_loc="ids.doi",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="openalex",
+        #         table_id="sources",
+        #         sharded=False,
+        #         primary_key_id_loc="ids.doi",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="openalex",
+        #         table_id="works",
+        #         sharded=False,
+        #         primary_key_id_loc="ids.doi",
+        #     ),
+        # ],
+        # "doi_workflow": [
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="author",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="book",
+        #         sharded=True,
+        #         primary_key_id_loc="isbn",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="country",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="doi",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="funder",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="group",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="institution",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="journal",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="publisher",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="region",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        #     Table(
+        #         project_id=production_project,
+        #         dataset_id="observatory",
+        #         table_id="subregion",
+        #         sharded=True,
+        #         primary_key_id_loc="id",
+        #     ),
+        # ],
+        # "ror": [Table(table_id="ror", sharded=True, primary_key_id_loc="id")],
+        # }
+
+        self.input_table_id_tasks = []
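The configuration above builds on a `Table` helper class that is defined outside this patch; only its `full_table_id` property appears in an earlier hunk. Judging from the keyword arguments used here, it presumably looks something like the following sketch (the field names are inferred from usage, not confirmed by the patch):

from dataclasses import dataclass
from typing import List, Union


@dataclass
class Table:
    """Describes a table to be QA-checked (sketch inferred from usage in this patch)."""

    project_id: str
    dataset_id: str
    table_id: str
    sharded: bool
    primary_key_id_loc: Union[str, List[str]]

    @property
    def full_table_id(self):
        return f"{self.project_id}.{self.dataset_id}.{self.table_id}"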
 
-        self.add_task(self.create_qa_check_dataset)
-        self.create_tasks()
-
-    def create_tasks(self):
-        # Add sensors
-        with self.parallel_tasks():
-            for ext_dag_id in self.sensor_dag_ids:
-                sensor = DagRunSensor(
-                    task_id=f"{ext_dag_id}_sensor",
-                    external_dag_id=ext_dag_id,
-                    mode="reschedule",
-                    duration=timedelta(days=7),  # Look back up to 7 days from execution date
-                    poke_interval=int(timedelta(hours=1).total_seconds()),  # Check at this interval if dag run is ready
-                    timeout=int(timedelta(days=3).total_seconds()),  # Sensor will fail after 3 days of waiting
-                )
-                self.add_operator(sensor)
+        # # Add sensors
+        # with self.parallel_tasks():
+        #     for ext_dag_id in self.sensor_dag_ids:
+        #         sensor = DagRunSensor(
+        #             task_id=f"{ext_dag_id}_sensor",
+        #             external_dag_id=ext_dag_id,
+        #             mode="reschedule",
+        #             duration=timedelta(days=7),  # Look back up to 7 days from execution date
+        #             poke_interval=int(timedelta(hours=1).total_seconds()),  # Check at this interval if dag run is ready
+        #             timeout=int(timedelta(days=3).total_seconds()),  # Sensor will fail after 3 days of waiting
+        #         )
+        #         self.add_operator(sensor)
 
         # Setup tasks
         self.add_setup_task(self.check_dependencies)
 
+        self.add_task(self.create_qa_check_dataset)
+
         # Create tasks creating the QA Metadata tables
         self.input_table_task_ids = []
         with self.parallel_tasks():
-            for workflow, table_list in self.workflows_to_check:
-                task_id = f"qa_check_{workflow}"
+            for dag_id, table_list in self.dag_ids_to_check.items():
+                task_id = f"qa_check_{dag_id}"
                 self.add_task(
                     self.qa_check_dataset,
-                    op_kwargs={"qa_check_workflow": workflow, "task_id": task_id, "table_list": table_list},
+                    op_kwargs={"dag_id": dag_id, "task_id": task_id, "table_list": table_list},
                     task_id=task_id,
                 )
                 self.input_table_task_ids.append(task_id)
 
-    def make_release(self, **kwargs) -> Release:
-        """Make a release instance. The release is passed as an argument to the function (TelescopeFunction) that is
+    def make_release(self, **kwargs) -> QACheckRelease:
+        """Make a QA Check release instance. The release is passed as an argument to the function (TelescopeFunction) that is
         called in 'task_callable'.
 
         :param kwargs: the context passed from the PythonOperator. See
@@ -364,22 +366,23 @@ def make_release(self, **kwargs) -> Release:
         to this argument.
         :return: A release instance or list of release instances
         """
-
-        snapshot_date = make_snapshot_date(**kwargs)
-
-        return Release(
+        return QACheckRelease(
             dag_id=self.dag_id,
             run_id=kwargs["run_id"],
-            snapshot_date=snapshot_date,
+            workflow_list=self.dag_ids_to_check,
+            schema_folder=self.schema_folder,
         )
 
-    def create_qa_check_dataset(self, release: QA_CheckRelease, **kwargs):
+    def create_qa_check_dataset(self, release: QACheckRelease, **kwargs):
         """Create dataset for all the QA Check tables."""
 
-        success = bq_create_dataset(project_id=CloudWorkspace.project_id, dataset_id=self.bq_dataset_id)
+        success = bq_create_dataset(
+            project_id=self.project_id, dataset_id=self.bq_dataset_id, location=self.data_location
+        )
 
         set_task_state(success, self.create_qa_check_dataset.__name__, release)
 
-    def qa_check_dataset(self, release: QA_CheckRelease, **kwargs):
+    def qa_check_dataset(self, release: QACheckRelease, **kwargs):
         """
         For each dataset, create a table where the rows of the table hold the qa metadata of each of the tables.
@@ -388,44 +391,85 @@
         and append it onto the last row that exists."""
 
-        # if a sharded table - and if not done beofre - use the observatory api
-
+        dag_id = kwargs["dag_id"]
         table_list: List[Table] = kwargs["table_list"]
 
-        # Get list of tables in the dataset - make sure that the number of tables match whats given?
-        # exclude upsert and delete tables, and snapshots
+        qa_workflow_table_id = f"{self.bq_table_id}_{dag_id}"
 
-        for table in table_list:
-            # if the table is sharded, loop through each of the shards
-
-            # Checl that hte shards havent been done before.
-
-            # if its a new one, do the QA check.
-
-            # assert that the table actually exists, else throw an error.
+        # Create the QA table if it doesn't already exist.
+        # bq_create_empty_table()
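The `bq_create_empty_table()` placeholder above is left unimplemented in this patch. One way to satisfy that TODO, sketched here with the google-cloud-bigquery client directly (the observatory helper's signature is not shown in the patch, so it is not assumed), could be:

# Editor's sketch only, not part of the patch: create the QA results table from
# qa_table.json if it does not exist yet. In this patch schema_folder points at qa_table.json.
client = bigquery.Client()
schema = client.schema_from_json(self.schema_folder)
qa_table_ref = f"{self.project_id}.{self.bq_dataset_id}.{qa_workflow_table_id}"
client.create_table(bigquery.Table(qa_table_ref, schema=schema), exists_ok=True)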
 
-            assert bq_table_exists(
-                table.full_table_id
-            ), f"The table {table.full_table_id} does not exist in dataset: {table.dataset_id}"
-
-            logging.info(f"QA for table - {table.full_table_id}")
-
-            # needs qa_table_id, needs to append to the last row of info, run date and time,
-
-            # self.
-
-            # Compile the sql to make the qa check
-            sql = render_template(
-                self.qa_check_template_path,
-                last_table_created_date=release.snapshot_date,
-            )
-
-            # dictionary from sql run
-
-            # query the table - if last run was the same as the last sharhded date.
-
-            # append onto the table
-            # assert that number of rows has increased for n number of tables in the source dataset.
+        for table_to_check in table_list:
+            if table_to_check.sharded:
+                # Get list of all tables that share the same name.
+                bq_sharded_tables: List[BQTable] = bq_list_tables_with_prefix(
+                    dataset_id=table_to_check.dataset_id, prefix=table_to_check.table_id
+                )
+
+                # Query the QA table to see if each shard's full_table_id is already in there.
+                for bq_sharded_table in bq_sharded_tables:
+                    assert bq_is_table_date_sharded(
+                        bq_sharded_table.full_table_id
+                    ), f"Table is not sharded. {bq_sharded_table.full_table_id}"
+
+                    sql = f"""
+                    SELECT full_table_id, date_shard
+                    FROM `{self.project_id}.{self.bq_dataset_id}.{qa_workflow_table_id}`
+                    """
+
+            else:
+                assert bq_table_exists(
+                    table_to_check.full_table_id
+                ), f"The table {table_to_check.full_table_id} does not exist in dataset: {table_to_check.dataset_id}"
+
+                # Get the latest date modified from the Observatory API
+                dataset_releases = get_dataset_releases(dag_id=dag_id, dataset_id=table_to_check.dataset_id)
+                latest_release: DatasetRelease = get_latest_dataset_release(
+                    dataset_releases, date_key="data_interval_end"
+                )
+                table_last_edited = latest_release.data_interval_end
+
+                # Table on BQ - fetch the table object with its metadata
+                table_on_bq: BQTable = bq_list_tables_with_prefix(
+                    dataset_id=table_to_check.dataset_id, prefix=table_to_check.table_id
+                )[0]
+
+                qa_check_record = [perform_qa_check(table_to_check, table_on_bq)]
+
+                bq_load_from_memory(
+                    table_id=f"{self.project_id}.{self.bq_dataset_id}.{qa_workflow_table_id}", records=qa_check_record
+                )
 
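The truncated query in the sharded branch above appears intended to find which shards have already been recorded in the QA table so they are not checked twice. A fuller version might look like the sketch below; the filter and comparison logic are assumptions, not something this patch defines:

# Editor's sketch only: shard dates already written to the QA table for this table name.
sql = f"""
SELECT DISTINCT date_shard
FROM `{self.project_id}.{self.bq_dataset_id}.{qa_workflow_table_id}`
WHERE name = '{table_to_check.table_id}' AND date_shard IS NOT NULL
"""
already_checked = {row["date_shard"] for row in bq_run_query(sql)}
shards_to_check = [
    t
    for t in bq_sharded_tables
    if pendulum.from_format(re.findall(r"\d{8}$", t.table_id)[-1], "YYYYMMDD").date() not in already_checked
]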
+def perform_qa_check(table_to_check: Table, table_in_bq: BQTable) -> dict:
+    """Perform the QA checks on a single table and build a record for the QA metadata table.
+
+    :param table_to_check: metadata about the table (or shard) to check.
+    :param table_in_bq: the BigQuery table object holding the table's metadata.
+    :return: a dictionary of QA check results, matching the qa_table.json schema."""
+
+    distinct_records = bq_count_distinct_records(
+        table_to_check.full_table_id, primary_key=table_to_check.primary_key_id_loc
+    )
+
+    num_primary_records_null = bq_count_num_nulls_for_field(
+        table_to_check.full_table_id, fields=table_to_check.primary_key_id_loc
+    )
+
+    if table_to_check.sharded:
+        shard_id = re.findall(r"\d{8}$", table_in_bq.full_table_id)[-1]
+        date_shard = pendulum.from_format(shard_id, "YYYYMMDD")
+        full_table_id = f"{table_to_check.project_id}.{table_to_check.dataset_id}.{table_to_check.table_id}{shard_id}"
+    else:
+        date_shard = None
+        full_table_id = table_to_check.full_table_id
+
+    return dict(
+        name=table_to_check.table_id,
+        full_table_id=full_table_id,
+        date_created=pendulum.instance(table_in_bq.created),
+        shard=table_to_check.sharded,
+        date_shard=date_shard,
+        date_checked=pendulum.now(),
+        primary_key=table_to_check.primary_key_id_loc,
+        row_count=table_in_bq.num_rows,
+        distinct_records=distinct_records,
+        records_with_no_primary_key=num_primary_records_null,
+    )
 
 
 def bq_table_exists(table_id: str) -> bool:
@@ -435,16 +479,8 @@ def bq_table_exists(table_id: str) -> bool:
     :return exists: True if the table exists."""
 
     bq_client = bigquery.Client()
-    table_ref = bq_client.get_table(table_id)
 
-    try:
-        bq_client.get_table(table_ref)
-        return True
-    except Exception as e:
-        if "Not found: Table" in str(e):
-            return False
-        else:
-            raise
+    try:
+        bq_client.get_table(table_id)
+        return True
+    except Exception as e:
+        if "Not found" in str(e):
+            return False
+        raise
 
 
 def bq_table_row_count(table_id: str) -> int:
@@ -471,21 +507,73 @@ def bq_count_distinct_records(table_id: str, primary_key: Union[str, List[str]]):
 
     keys = " ,".join(primary_key) if isinstance(primary_key, list) else primary_key
     sql = f""" SELECT COUNT(*) as count
-    FROM (SELECT distinct {keys} FROM {table_id})
+    FROM ( SELECT distinct {keys} FROM {table_id} )
     """
-
+    print(sql)
     return [dict(row) for row in bq_run_query(sql)][0]["count"]
 
 
+def bq_count_num_nulls_for_field(full_table_id: str, fields: Union[str, List[str]]) -> int:
+    """Return the number of records that have a null value in the given field (or fields).
+    Multiple fields are joined with an OR condition, so a record is counted if any of the listed fields is null.
+
+    :param full_table_id: The fully qualified table id.
+    :param fields: A single field name or a list of field names to check for nulls.
+    :return: The number of records with nulls in the given fields."""
+
+    fields_to_check = " IS NULL OR ".join(fields) if isinstance(fields, list) else fields
+    sql = f"""
+    SELECT COUNTIF( {fields_to_check} IS NULL) AS nullCount
+    FROM `{full_table_id}`
+    """
+
+    return [dict(row) for row in bq_run_query(sql)][0]["nullCount"]
+
+
-def bq_update_field(table_id: str, primary_key: Union[str, List[str]], field: str, list_to_append: List[Any]):
-    """
-    Update information under a particular field.
-    """
-
-    keys = " ,".join(primary_key) if isinstance(primary_key, list) else primary_key
-
-    sql = f"""UPDATE {table_id}`
-    SET {field} = ARRAY_CONCAT({field}, {list_to_append})
-    WHERE name = """
-
-    result = bq_run_query(sql)
+# def bq_update_field(table_id: str, primary_key: Union[str, List[str]], field: str, list_to_append: List[Any]):
+#     """
+#     Update information under a particular field.
+#     """
+
+#     keys = " ,".join(primary_key) if isinstance(primary_key, list) else primary_key
+
+#     sql = f"""UPDATE {table_id}`
+#     SET {field} = ARRAY_CONCAT({field}, {list_to_append})
+#     WHERE name = """
+
+#     result = bq_run_query(sql)
 
 
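Used together, the two count helpers above produce the distinct-record and missing-key figures that `perform_qa_check` loads into the QA table; standalone usage looks roughly like this (the table and key names are illustrative only):

# Editor's sketch only - illustrative table and field names.
full_table_id = "my-project.pubmed.pubmed"
primary_key = ["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"]

distinct_records = bq_count_distinct_records(full_table_id, primary_key=primary_key)
null_keys = bq_count_num_nulls_for_field(full_table_id, fields=primary_key)
print(f"{distinct_records} distinct records, {null_keys} records missing a primary key")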
+def bq_list_tables_with_prefix(dataset_id: str, prefix: str) -> List[BQTable]:
+    """Get a list of table objects that share the same prefix in a dataset.
+
+    :param dataset_id: The name of the dataset.
+    :param prefix: Prefix to search for.
+    :return: List of table objects that share the specified prefix."""
+
+    bq_client = bigquery.Client()
+    tables = bq_client.list_tables(dataset_id)
+
+    return [bq_client.get_table(table) for table in tables if table.table_id.startswith(prefix)]
+
+
+def bq_is_table_date_sharded(table_id: str) -> bool:
+    """Determine if the table is in a series of date shards or not.
+
+    :param table_id: Fully qualified table id.
+    :return: True if the table is a date-sharded table.
+    """
+
+    return bool(re.search(r"\d{8}$", table_id))
+
+
+def bq_get_table_created_date(full_table_id: str) -> pendulum.DateTime:
+    """Retrieve the date when a table was created.
+
+    :param full_table_id: Fully qualified table id.
+    :return: Datetime of when the table was created.
+    """
+
+    bq_client = bigquery.Client()
+    table = bq_client.get_table(full_table_id)
+
+    return pendulum.instance(table.created)
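As a quick illustration of how these last helpers fit together, the sketch below (dataset and table names invented) lists the shards of a table and reports when each shard was created:

# Editor's sketch only - dataset and table names are illustrative.
shards = bq_list_tables_with_prefix(dataset_id="observatory", prefix="doi")
for shard in shards:
    if bq_is_table_date_sharded(shard.table_id):
        # shard.created is populated because bq_list_tables_with_prefix calls get_table().
        print(f"{shard.table_id} was created on {pendulum.instance(shard.created).to_date_string()}")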