In [None]:
#Install Notes 
#Open a terminal and run: conda install -c conda-forge scipy
#then conda install -c conda-forge tqdm

In [1]:
from google.cloud import bigquery
import pandas as pd
from tqdm import tqdm


def get_num_description_column(col_name, table_name):
    """
    Takes a column name and a table name, returning a string with descriptive 
    statistics for the column specified.

    Calculates the mean, median, max, min and IQR for the specified column using 
    BigQuery SQL and returns a string with the results concatenated together.

    Args:
        col_name (str): The name of a the numeric column.
        table_name (str): The name of the table containing the specified column.

    Returns:
        str: A string with the mean, median, max, min and IQR for the specified column.
    """
    sql_query = f"""
        WITH stats AS (
            SELECT
                AVG({col_name}) AS mean,
                MAX({col_name}) AS max,
                MIN({col_name}) AS min,
                APPROX_QUANTILES({col_name}, 2)[OFFSET(1)] AS median,
                APPROX_QUANTILES({col_name}, 4)[OFFSET(1)] AS q1,
                APPROX_QUANTILES({col_name}, 4)[OFFSET(3)] AS q3
            FROM `{table_name}`
        )
        SELECT CONCAT('Mean: ', CAST(mean AS STRING),  
                      ', Median: ', CAST(median AS STRING), 
                      ', Max: ', CAST(max AS STRING), 
                      ', Min: ', CAST(min AS STRING),  
                      ', IQR: ', CAST(q3 - q1 AS STRING))
        FROM stats
    """
    return pd.read_gbq(sql_query).iloc[0,0]


def get_date_description_column(col_name, table_name):
    """
    Takes a column name and a table name, returning a string with the min and max 
    dates.

    Calculates the min and max dates for the specified column using BigQuery SQL 
    and returns a string with the results concatenated together.

    Args:
        col_name (str): The name of a date column.
        table_name (str): The name of the table containing the specified column.

    Returns:
        str: A string with the min and max dates for the specified column.
    """
    sql_query = f"""
        WITH stats AS (
            SELECT 
                MAX({col_name}) AS max_date, 
                MIN({col_name}) AS min_date 
            FROM `{table_name}`
        )
        SELECT
            CONCAT('From: ', CAST(min_date AS STRING), 
                   ' To: ', CAST(max_date AS STRING))
        FROM stats
    """
    return pd.read_gbq(sql_query).iloc[0,0]


def get_bool_description_column(col_name, table_name):
    """
    Takes a column name and a table name, returning a the count of `True` and 
    `False` values.

    Calculates the count of `True` and `False` values for the specified column 
    using BigQuery SQL and returns a string with the results concatenated 
    together.

    Args:
        col_name (str): The name of the boolean column.
        table_name (str): The name of the table containing the specified column.

    Returns:
        str: A string with the count of `True` and `False` values for the 
             specified column.
    """
    sql_query = f"""
        WITH stats AS (
            SELECT
                COUNTIF({col_name} = TRUE) AS true_count,
                COUNTIF({col_name} = FALSE) AS false_count
            FROM `{table_name}`
        )
        SELECT
            CONCAT('False: ', CAST(false_count AS STRING), 
                   ', True: ', CAST(true_count AS STRING))
        FROM stats
    """
    return pd.read_gbq(sql_query).iloc[0,0]


def get_string_description_column(col_name, table_name):
    sql_query = f"""
        WITH top_entries AS (
            SELECT {col_name}, COUNT(*) AS count
            FROM `{table_name}`
            GROUP BY {col_name}
            ORDER BY count DESC
            LIMIT 5 
        ),
        total_entries AS (
            SELECT COUNT(DISTINCT {col_name}) AS total_count
            FROM `{table_name}` 
        )
        SELECT IF((SELECT total_count FROM total_entries) > 5, 
                   CONCAT('Top 5: ', STRING_AGG(CONCAT({col_name}, ': ', 
                          CAST(count AS STRING)), ', ')), 
                   STRING_AGG(CONCAT({col_name}, ': ', 
                              CAST(count AS STRING)), ', '))
        FROM top_entries
    """
    return pd.read_gbq(sql_query).iloc[0,0]


def create_data_dict(dataset_id):
    
    """
    Create a data dictionary table for a BigQuery dataset.
    
    Takes the ID of a BigQuery dataset and creates a data_dict table in the same 
    dataset. `data_dict` contains information about tables in the 
    dataset with names prefixed " tbl_" or "cb_":
    table name, column name, data type, and a summary 
    description of each column. `description` column includes summary statistics 
    for numeric columns (mean, median, IQR, min,  max), the number of unique 
    values and top 5 values for string columns, the  date range for date 
    columns, and the count of True and False values for boolean columns. 
    
    Args:
        dataset_id (str): The ID of the BigQuery dataset.
        
    Output:
        None - `data_dict` table is uploaded to biqquery dataset at "dataset_id"
    """
    
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    tables = list(client.list_tables(dataset_ref))
    rows = []
    table_count = 0
    output_dict = {
        "table_name": [],
        "column_name": [],
        "data_type": [],
        "description": []
    }
    for table in tables:
        if table.table_id.startswith("tbl_") or table.table_id.startswith("cb_"):
            table_count += 1
            print(f"Processing table {table_count} of {len(tables)}: {table.table_id}")
            table_ref = dataset_ref.table(table.table_id)
            table = client.get_table(table_ref)
            for schema_field in tqdm(table.schema):
                output_dict["table_name"].append(table.table_id)
                output_dict["column_name"].append(schema_field.name)
                output_dict["data_type"].append(schema_field.field_type)
                full_table_id = f"{dataset_id}.{table.table_id}"
                if schema_field.field_type == "STRING":
                    output_dict["description"].append(
                        get_string_description_column(schema_field.name, 
                                                      full_table_id) 
                    )
                elif schema_field.field_type in ["INTEGER", "FLOAT", "NUMERIC", "INT64", "FLOAT64"]:
                    output_dict["description"].append(
                        get_num_description_column(schema_field.name, 
                                                      full_table_id) 
                    )
                elif schema_field.field_type in ["DATE", "TIMESTAMP", "DATETIME","TIME"]:
                    output_dict["description"].append(
                        get_date_description_column(schema_field.name, 
                                                      full_table_id) 
                    )
                elif schema_field.field_type in ["BOOL", "BOOLEAN"]:
                    output_dict["description"].append(
                        get_bool_description_column(schema_field.name, 
                                                      full_table_id) 
                    )
    output_df = pd.DataFrame(output_dict)

    
    output_df.to_gbq(f"{dataset_id}.data_dictionary", progress_bar=False)
    print("Finished creating data_dict table")

In [2]:
create_data_dict("CB_FDM_DeathCertificates")



Processing table 1 of 6: tbl_NEC_Deaths_QWO_20230817_RELEASE


  0%|          | 0/90 [00:00<?, ?it/s]


ImportError: Missing optional dependency 'pandas-gbq'. pandas-gbq is required to load data from Google BigQuery. See the docs: https://pandas-gbq.readthedocs.io. Use pip or conda to install pandas-gbq.