In [None]:
#hide
%load_ext autoreload
%autoreload 2

In [None]:
#hide
#export
import os
import pandas as pd
from dotenv import load_dotenv
from google.cloud import bigquery
from am4894bq.schema import get_schema, df_to_bq_schema, schema_diff, update_bq_schema, update_df_schema
from am4894bq.utils import does_table_exist

# load environment variables via python-dotenv before reading the project id below
load_dotenv()

# BigQuery project id read from the BQ_PROJECT_ID environment variable (None if it is not set)
bq_project_id = os.getenv('BQ_PROJECT_ID')

In [None]:
# default_exp pd

# pd

> Pandas-related BigQuery functionality.

In [None]:
#export

def clean_colnames(df: pd.DataFrame) -> pd.DataFrame:
    """
    Prefix every non-string column label of df with an underscore.

    Labels whose type is exactly `str` are kept as they are; any other label
    (for example the integer `0`) is replaced by `f"_{label}"`. If no label
    needs renaming the input frame itself is returned, otherwise the result
    of `df.rename(...)` is returned.
    """
    renames = {label: f"_{label}" for label in df.columns if type(label) is not str}
    return df.rename(columns=renames) if renames else df

In [None]:
#hide

# tests

# the integer label 0 should come back as '_0'; the string label is left alone
df = clean_colnames(pd.DataFrame([[1, 2]], columns=[0, 'col2']))

assert df.columns.tolist() == ['_0', 'col2']

1


In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export


def cols_to_str(df: pd.DataFrame) -> pd.DataFrame:
    """
    Convert all columns in df to string.

    Every column whose dtype is not `object` is cast with `.astype(str)`;
    columns that already have `object` dtype are left untouched.

    Note: the columns are reassigned on `df` itself, so the caller's frame is
    modified in place and that same object is returned.

    Args:
        df: DataFrame whose columns should be converted.

    Returns:
        The same `df` object, with its non-object columns cast to string.
    """
    # `Series.iteritems()` was removed in pandas 2.0; `.items()` is the supported spelling
    for col, dtype in df.dtypes.items():
        if dtype != 'object':
            df[col] = df[col].astype(str)
    return df



In [None]:
#hide

# tests
df = pd.DataFrame([[1, 'b0'],[2, 'b1']], columns=['col_a_int', 'col_b'])
df = cols_to_str(df)

# check the converted values and dtype kinds directly instead of comparing against the
# printed `df.dtypes` repr, which depends on column-name padding and on the name pandas
# gives its string dtype
assert df['col_a_int'].tolist() == ['1', '2']
assert df['col_b'].tolist() == ['b0', 'b1']
assert not any(pd.api.types.is_numeric_dtype(dtype) for dtype in df.dtypes)

In [None]:
#export


def df_to_gbq(
    df: pd.DataFrame, destination_table: str, project_id: str, if_exists: str = 'append',
    print_info: bool = True, mode: str = 'pandas', cols_as_str: bool = False) -> pd.DataFrame:
    """
    Save df to BigQuery enforcing schema consistency between df and destination table if it exists.

    Schema handling only happens when `mode == 'wrangle'`; with any other mode
    (including the default `'pandas'`) the frame is handed straight to `df.to_gbq`.

    Args:
        df: DataFrame to upload.
        destination_table: Target table, combined with `project_id` as
            `f'{project_id}.{destination_table}'` to form the full table id.
        project_id: BigQuery project id, also passed through to `df.to_gbq`.
        if_exists: Passed through to `df.to_gbq`. When it is `'replace'` no
            schema reconciliation is attempted.
        print_info: Forwarded to `update_bq_schema` / `update_df_schema`.
        mode: `'wrangle'` to reconcile the df and table schemas before loading.
        cols_as_str: If True, run `cols_to_str` on the frame before loading.

    Returns:
        The DataFrame that was uploaded (possibly with an updated schema and/or
        columns cast to string).

    Raises:
        Exception: whatever `df.to_gbq` raises if the upload fails on both attempts.
    """

    # only do anything if mode set to wrangle, otherwise just use pandas
    if mode == 'wrangle':

        table_id = f'{project_id}.{destination_table}'
        bq_client = bigquery.Client()

        # only need to handle schemas if table already exists and if_exists != 'replace'
        if does_table_exist(bq_client, table_id) and if_exists != 'replace':

            old_schema = get_schema(table_id)
            new_schema = df_to_bq_schema(df)
            diffs = schema_diff(old_schema, new_schema)

            if len(diffs) > 0:

                # update the table schema in BigQuery
                update_bq_schema(bq_client, table_id, diffs, print_info=print_info)

                # update the df schema to be as expected by BigQuery
                df = update_df_schema(bq_client, table_id, diffs, df, print_info=print_info)

    if cols_as_str:
        df = cols_to_str(df)

    # load to BigQuery with a single retry
    # NOTE(review): `DataFrame.to_gbq` is deprecated in recent pandas in favour of
    # `pandas_gbq.to_gbq` - consider migrating.
    try:
        df.to_gbq(destination_table, project_id=project_id, if_exists=if_exists)
    except Exception:
        # Catch `Exception` rather than using a bare `except:` so KeyboardInterrupt /
        # SystemExit are not swallowed and turned into a second upload attempt.
        # A failure of this second attempt propagates to the caller.
        df.to_gbq(destination_table, project_id=project_id, if_exists=if_exists)

    return df



In [None]:
#hide

# tests

# Integration test: the calls below write to and read from the `tmp.tmp` table in BigQuery.

# make a dummy df with two string columns
df = pd.DataFrame([['a0', 'b0'],['a1', 'b1']], columns=['col_a', 'col_b'])

# send to bq; with if_exists='replace' df_to_gbq skips its schema handling
df = df_to_gbq(df, 'tmp.tmp', project_id=bq_project_id, if_exists='replace', mode='wrangle')

# read back from bq
df_bq = pd.read_gbq("select * from tmp.tmp")

# the round-tripped table should print exactly like the frame that was sent
assert str(df) == str(df_bq)

# add a new col to df
df['col_c'] = ['c0', 'c1']

# drop col_b, so the df schema now differs from the table in both directions
df = df.drop(['col_b'], axis=1)

# save to bq; appending in 'wrangle' mode goes through the schema-diff path of df_to_gbq
df = df_to_gbq(df, 'tmp.tmp', project_id=bq_project_id, if_exists='append', print_info=False, mode='wrangle')

# read back from bq, ordered so the comparison below is deterministic
df_bq = pd.read_gbq("select * from tmp.tmp order by 1,2,3")

# expect four rows: the original two with col_c None and the appended two with col_b None
assert str(df_bq) == '  col_a col_b col_c\n0    a0  None    c0\n1    a0    b0  None\n2    a1  None    c1\n3    a1    b1  None'

1it [00:02,  2.80s/it]
Downloading: 100%|██████████| 2/2 [00:00<00:00, 12.77rows/s]
1it [00:04,  4.91s/it]
Downloading: 100%|██████████| 4/4 [00:00<00:00, 21.42rows/s]
