diff --git a/academic_observatory_workflows/database/schema/qa_checks/qa_table.json b/academic_observatory_workflows/database/schema/qa_checks/qa_table.json
new file mode 100644
index 000000000..1517a0c6a
--- /dev/null
+++ b/academic_observatory_workflows/database/schema/qa_checks/qa_table.json
@@ -0,0 +1,39 @@
+[
+  {
+    "name": "full_table_id",
+    "type": "RECORD",
+    "mode": "NULLABLE",
+    "fields": [
+      {
+        "name": "sample_date",
+        "description": "Date of the table (shard date/date created).",
+        "type": "DATE",
+        "mode": "REQUIRED"
+      },
+      {
+        "name": "primary_key",
+        "description": "Array of primary keys that are used to identify records.",
+        "type": "STRING",
+        "mode": "REPEATED"
+      },
+      {
+        "name": "row_count",
+        "description": "Number of rows in the table.",
+        "type": "INTEGER",
+        "mode": "REQUIRED"
+      },
+      {
+        "name": "distinct_records",
+        "description": "Number of distinct records in the table, based on 'primary_key'.",
+        "type": "INTEGER",
+        "mode": "REQUIRED"
+      },
+      {
+        "name": "no_primary_key",
+        "description": "Number of records that do not have an entry under 'primary_key' (None or nulls).",
+        "type": "INTEGER",
+        "mode": "REQUIRED"
+      }
+    ]
+  }
+]
diff --git a/academic_observatory_workflows/workflows/qa_checks.py b/academic_observatory_workflows/workflows/qa_checks.py
index 1d4c198ef..1e9cda79e 100644
--- a/academic_observatory_workflows/workflows/qa_checks.py
+++ b/academic_observatory_workflows/workflows/qa_checks.py
@@ -24,12 +24,13 @@
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
 from datetime import timedelta
-from typing import Dict, List, Set, Optional, Tuple, Union
+from typing import Any, Dict, List, Set, Optional, Tuple, Union
 
 import pendulum
 from airflow.exceptions import AirflowException
+from google.cloud import bigquery
 
-from academic_observatory_workflows.config import sql_folder, Tag
+from academic_observatory_workflows.config import schema_folder as default_schema_folder, sql_folder, Tag
 from observatory.api.client.model.dataset_release import DatasetRelease
 from observatory.platform.api import make_observatory_api
 from observatory.platform.bigquery import (
@@ -54,36 +55,29 @@
 from observatory.platform.workflows.workflow import Workflow, make_snapshot_date, set_task_state, Release
 
 
-# @dataclass
-# class Dataset:
-#     def __init__(self, *, table_id: str, source_dataset: str, sharded_table: bool, primary_key: str):
-#         """Create a metadata class for each of the tables to be produced.
-
-#         There will be one one table for each dataset made.
-
-#         Each table will hold the QA information from each table, e.g.
-#         self.table_id = table_id
-#         self.source_dataset = source_dataset
-#         self.sharded_table = sharded_table
-#         self.primary_key = primary_key
 class QA_CheckRelease(Release):
-    def __init__(self, *,
-        # list of tables that were processed.
-        # what operations were done for this release.
-
+    """Construct a QA_CheckRelease for the workflow."""
-
+    def __init__(
+        self,
+        *,
+        workflow_list: List[str],
+        check_operations: List[str],
+        schema_folder: str,
     ):
-
-        self.
+        self.workflow_list = workflow_list
+        self.check_operations = check_operations
+        self.schema_folder = schema_folder
+
 
 @dataclass
 class Table:
     def __init__(
         self,
         *,
+        project_id: str,
+        dataset_id: str,
         table_id: str,
         sharded: bool,
         primary_key_id_loc: Union[List[str], str],
@@ -94,16 +88,24 @@ def __init__(
 
         Each table will hold the QA information from each table, e.g.
 
-        :param table_id: The table name (not the full table name)
+        :param project_id: The project where the table is located.
+        :param dataset_id: The dataset that the table belongs to.
+        :param table_id: The name of the table (not the fully qualified table name).
         :param source_dataset: Where the table is from.
         :param sharded: True if the table is shared or not.
         :param primary_key_id_loc: Location of where the primary key is located in the table e.g. MedlineCiation.PMID.value, could be multiple different identifiers.
         """
 
+        self.project_id = project_id
+        self.dataset_id = dataset_id
         self.table_id = table_id
         self.sharded = sharded
         self.primary_key_id_loc = [primary_key_id_loc] if isinstance(primary_key_id_loc, str) else primary_key_id_loc
 
+    @property
+    def full_table_id(self):
+        return f"{self.project_id}.{self.dataset_id}.{self.table_id}"
+
 
 class QA_Check_Workflow(Workflow):
     SENSOR_DAG_IDS = [
@@ -126,7 +128,9 @@ def __init__(
         *,
         dag_id: str,
         cloud_workspace: CloudWorkspace,
+        production_project: str = "academic-observatory",
         bq_dataset_id: str = "qa_checks",
+        schema_folder: str = os.path.join(default_schema_folder(), "qa_checks"),
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: Optional[pendulum.DateTime] = pendulum.datetime(2020, 1, 1),
         schedule_interval: Optional[str] = "@weekly",
@@ -150,15 +154,27 @@ def __init__(
             tags=[Tag.academic_observatory],
         )
 
+        self.production_project = production_project
+
         self.bq_dataset_id = bq_dataset_id
         self.data_location = cloud_workspace.data_location
+        self.schema_folder = schema_folder
+        self.qa_check_template_path = "academic_observatory_workflows/database/sql/create_qa_table.sql.jinja2"
 
         self.sensor_dag_ids = sensor_dag_ids
         if sensor_dag_ids is None:
            self.sensor_dag_ids = QA_Check_Workflow.SENSOR_DAG_IDS
 
+        # Tables to exclude from the list of datasets.
"Name": exception (that will be included) + table_to_exclude = { + "upsert": [], + "delete": [], + "snapshot": ["unpaywall_snapshot"], + "backup": [], + } + # List of all datasets to go through and produce QA tables of # main IDs to take note of: @@ -167,33 +183,140 @@ def __init__( "pubmed": [ Table( table_id="pubmed", + dataset_id="pubmed", sharded=False, primary_key_location=["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"], ) ], "openalex": [ - Table(table_id="authors", source_dataset="openalex", sharded=False, primary_key_location="ids.doi"), - Table(table_id="concepts", source_dataset="openalex", sharded=False, primary_key_location="ids.doi"), - Table(table_id="funders", source_dataset="openalex", sharded=False, primary_key_location="ids.doi"), Table( - table_id="institutions", source_dataset="openalex", sharded=False, primary_key_location="ids.doi" + project_id=production_project, + dataset_id="openalex", + table_id="authors", + sharded=False, + primary_key_location="ids.doi", + ), + Table( + project_id=production_project, + dataset_id="openalex", + table_id="concepts", + sharded=False, + primary_key_location="ids.doi", + ), + Table( + project_id=production_project, + dataset_id="openalex", + table_id="funders", + sharded=False, + primary_key_location="ids.doi", + ), + Table( + project_id=production_project, + dataset_id="openalex", + table_id="institutions", + sharded=False, + primary_key_location="ids.doi", + ), + Table( + project_id=production_project, + dataset_id="openalex", + table_id="publishers", + sharded=False, + primary_key_location="ids.doi", + ), + Table( + project_id=production_project, + dataset_id="openalex", + table_id="sources", + sharded=False, + primary_key_location="ids.doi", + ), + Table( + project_id=production_project, + dataset_id="openalex", + table_id="works", + sharded=False, + primary_key_location="ids.doi", ), - Table(table_id="publishers", source_dataset="openalex", sharded=False, primary_key_location="ids.doi"), - Table(table_id="sources", source_dataset="openalex", sharded=False, primary_key_location="ids.doi"), - Table(table_id="works", source_dataset="openalex", sharded=False, primary_key_location="ids.doi"), ], "doi_workflow": [ - Table(table_id="author", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="book", source_dataset="observatory", sharded=True, primary_key_location="isbn"), - Table(table_id="country", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="doi", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="funder", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="group", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="institution", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="journal", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="publisher", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="region", source_dataset="observatory", sharded=True, primary_key_location="id"), - Table(table_id="subregion", source_dataset="observatory", sharded=True, primary_key_location="id"), + Table( + project_id=production_project, + dataset_id="observatory", + table_id="author", + sharded=True, + primary_key_location="id", + ), + Table( + project_id=production_project, + dataset_id="observatory", + table_id="book", + 
+                    sharded=True,
+                    primary_key_location="isbn",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="country",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="doi",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="funder",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="group",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="institution",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="journal",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="publisher",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="region",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
+                Table(
+                    project_id=production_project,
+                    dataset_id="observatory",
+                    table_id="subregion",
+                    sharded=True,
+                    primary_key_location="id",
+                ),
             ],
             "ror": [Table(table_id="ror", sharded=True, primary_key_id_loc="id")],
         }
@@ -281,15 +404,15 @@ def qa_check_dataset(self, release: QA_CheckRelease, **kwargs):
 
             # assert that the table actually exists, else throw an error.
             assert bq_table_exists(
-                table.table_id
-            ), f"The table {table.table_id} does not exist in dataset: {table.source_dataset}"
-
-            logging.info(f"QA for table - {table.table_id}")
+                table.full_table_id
+            ), f"The table {table.full_table_id} does not exist in dataset: {table.dataset_id}"
-            success = bq_create_empty_table_()
+            logging.info(f"QA for table - {table.full_table_id}")
 
             # needs qa_table_id, needs to append to the last row of info, run date and time,
+            # self.
+
             # Compile the sql to make the qa check
             sql = render_template(
                 self.qa_check_template_path,
@@ -311,6 +434,58 @@ def bq_table_exists(table_id: str) -> bool:
 
     :param table_id: Fully qualified table id.
     :return exists: True if the table exists."""
 
-    project_id, dataset_id, table_name = table_id.split(".")
+    bq_client = bigquery.Client()
+
+    try:
+        bq_client.get_table(table_id)
+        return True
+    except Exception as e:
+        if "Not found: Table" in str(e):
+            return False
+        else:
+            raise
+
+
+def bq_table_row_count(table_id: str) -> int:
+    """Get the number of rows that are present in a table.
+
+    :param table_id: Fully qualified table id.
+    :return row_count: Number of rows in the BigQuery table."""
+
+    bq_client = bigquery.Client()
+    table = bq_client.get_table(table_id)
+    row_count = table.num_rows
+
+    return row_count
+
+
+def bq_count_distinct_records(table_id: str, primary_key: Union[str, List[str]]) -> int:
+    """
+    Find the number of distinct records that share the given primary keys.
+
+    :param table_id: The fully qualified table id.
+    :param primary_key: A single primary key or a list of primary keys for records.
+ """ + + keys = " ,".join(primary_key) if isinstance(primary_key, list) else primary_key + sql = f""" + SELECT COUNT(*) as count + FROM (SELECT distinct {keys} FROM {table_id}) + """ + + return [dict(row) for row in bq_run_query(sql)][0]["count"] + + +def bq_update_field(table_id: str, primary_key: Union[str, List[str]], field: str, list_to_append: List[Any]): + """ + Update information under a particular field. + """ + + keys = " ,".join(primary_key) if isinstance(primary_key, list) else primary_key + + sql = f"""UPDATE {table_id}` + SET {field} = ARRAY_CONCAT({field}, {list_to_append}) + WHERE name = """ - bq_client = bigquery.client() + result = bq_run_query(sql)