Additional Data Quality Checks

In [None]:
# 2. Check for data type consistency
# BigQuery enforces column types on write, so any value stored in an INT64 or
# NUMERIC column is by construction a valid value of that type. Casting a
# column to its own type with SAFE_CAST therefore can never yield NULL, which
# means a check built that way always reports 0. The only "invalid" numbers a
# typed column can hold are the non-finite floating-point values (NaN, +inf,
# -inf), so FLOAT columns are checked for those.
# Schemas fetched through the API may report legacy type names ("FLOAT")
# instead of standard-SQL names ("FLOAT64"), so both spellings are accepted.
# REPEATED fields are skipped because IS_NAN / IS_INF take scalars, not arrays.
FLOAT_TYPES = {"FLOAT", "FLOAT64"}
for table in stage_tables:
    table_ref = f"{client.project}.{dataset_id}.{table}"
    schema = client.get_table(table_ref).schema
    float_fields = [
        field.name
        for field in schema
        if field.field_type in FLOAT_TYPES and field.mode != "REPEATED"
    ]
    if not float_fields:
        continue
    # One scan per table: one COUNTIF per float column. Names are
    # backtick-quoted so columns named after reserved words still parse.
    checks = ",\n        ".join(
        f"COUNTIF(IS_NAN(`{name}`) OR IS_INF(`{name}`)) AS invalid_{i}"
        for i, name in enumerate(float_fields)
    )
    query = f"""
    SELECT
        {checks}
    FROM `{table_ref}`
    """
    # An aggregate without GROUP BY always returns exactly one row.
    row = next(iter(client.query(query).result()))
    for i, name in enumerate(float_fields):
        print(f"Invalid numeric values in {table}.{name}:", row[i])

# 4. Check for referential integrity
# Count transactions whose (non-NULL) customer_id has no matching row in
# stage_customers. Both tables must be present in stage_tables; otherwise the
# query would fail with a "table not found" error.
# Transactions with a NULL customer_id reference no customer at all. As with
# SQL foreign keys, they are not counted as orphans here; NULLs are reported by
# the null-value check instead.
if {"stage_transactions", "stage_customers"} <= set(stage_tables):
    ref_check_query = f"""
    SELECT COUNT(*) as orphaned_records
    FROM `{client.project}.{dataset_id}.stage_transactions` t
    LEFT JOIN `{client.project}.{dataset_id}.stage_customers` c
    ON t.customer_id = c.customer_id
    WHERE c.customer_id IS NULL
    AND t.customer_id IS NOT NULL
    """
    orphaned = client.query(ref_check_query).to_dataframe()
    print("Orphaned transaction records:", orphaned['orphaned_records'].iloc[0])
elif "stage_transactions" in stage_tables:
    # NOTE(review): assumes stage_tables lists the staging tables that exist.
    print("Skipping referential integrity check: stage_customers not found")

# 5. Check for mandatory fields
# Report the number of NULLs in every top-level column of each staging table.
# All columns of a table are counted in a single scan (one COUNTIF per column)
# instead of one query per column. This reduces the number of BigQuery jobs from
# one per column to one per table. Column names are backtick-quoted so names
# that collide with reserved keywords (e.g. `order`, `from`) still parse.
for table in stage_tables:
    table_ref = f"{client.project}.{dataset_id}.{table}"
    column_names = [field.name for field in client.get_table(table_ref).schema]
    if not column_names:
        continue
    null_counts = ",\n        ".join(
        f"COUNTIF(`{name}` IS NULL) AS null_{i}"
        for i, name in enumerate(column_names)
    )
    query = f"""
    SELECT
        {null_counts}
    FROM `{table_ref}`
    """
    # An aggregate without GROUP BY always returns exactly one row
    # (all zeros for an empty table).
    row = next(iter(client.query(query).result()))
    for i, name in enumerate(column_names):
        print(f"Null values in {table}.{name}:", row[i])

# Original duplicate removal code
# Rewrite each staging table, keeping exactly one copy of every fully identical
# row. Rows are compared by the JSON serialization of the whole row
# (TO_JSON_STRING(t)) rather than by PARTITION BY <every column>. BigQuery
# rejects PARTITION BY expressions of floating-point or non-groupable types
# (ARRAY, JSON, GEOGRAPHY, ...), and an unquoted column list breaks on
# reserved-word column names. A STRING key avoids both problems.
# NOTE(review): CREATE OR REPLACE TABLE ... AS SELECT builds a brand-new table.
# Partitioning, clustering, and column descriptions/policy tags of the existing
# table are not carried over unless declared in the statement. Confirm that is
# acceptable for these staging tables.
for table in stage_tables:
    table_ref = f"{client.project}.{dataset_id}.{table}"
    delete_duplicates_query = f"""
    CREATE OR REPLACE TABLE `{table_ref}` AS
    SELECT * EXCEPT(_dedup_row_num) FROM (
        SELECT
            t.*,
            ROW_NUMBER() OVER (PARTITION BY TO_JSON_STRING(t)) AS _dedup_row_num
        FROM `{table_ref}` AS t
    )
    WHERE _dedup_row_num = 1
    """
    client.query(delete_duplicates_query).result()
    print(f"Duplicates removed from {table}")
