## SPSS file loader

Loads large SPSS files to BigQuery.

__To Do__
- Update schema with YAML

In [15]:
import re
from datetime import datetime as dt
import pandas as pd
from savReaderWriter import SavReader
from database_connection import databaseConnection
from google.cloud import bigquery



def get_db_client(full_table_id):
    """Build a BigQuery client for the project named in ``full_table_id``.

    Args:
        full_table_id (str): Table ID in ``project.dataset.table`` form.
            Only the project component is used here.

    Returns:
        google.cloud.bigquery.Client: Client bound to that project, using
        the ``credentials`` attribute of a fresh ``databaseConnection()``.
    """
    connection = databaseConnection()
    # Index 0 of the (project, dataset, table) triple is the project ID.
    project = _get_table_id_set(full_table_id)[0]
    return bigquery.Client(
        project=project,
        credentials=connection.credentials,
    )


def delete_table(full_table_id):
    """Delete a BigQuery table and print a confirmation line.

    ``not_found_ok=True`` is passed, so a table that does not exist is
    ignored instead of raising ``google.api_core.exceptions.NotFound``.
    The confirmation is printed in either case.

    Args:
        full_table_id (str): Table ID in ``project.dataset.table`` form.
    """
    bq_client = get_db_client(full_table_id)
    # Issues the delete API request.
    bq_client.delete_table(full_table_id, not_found_ok=True)
    print("Deleted table '{}'.".format(full_table_id))


def _get_table_id_set(full_table_id):
    project_id = full_table_id.split('.')[0]
    dataset_id = full_table_id.split('.')[1]
    table_id = full_table_id.split('.')[2]
    return (project_id, dataset_id, table_id)


def _get_chunck_cutoffs(file_length, interval=50_000):
    """ Create a list of lists containing cutoff levels
    to chunk SPSS files.
    """
    i0=0
    i1=interval
    c_list=[]
    while i1 <= file_length:
        c = [i0, i1]
        c_list += [c]
        i0 = i1 + 1
        i1 = i0 + (interval - 1)
    c_list = c_list + [[i0, file_length]]
    return c_list


def spss_to_csv(filename):
    """Split an SPSS ``.sav`` file into numbered CSV chunk files.

    Each chunk returned by ``_get_chunck_cutoffs`` is read from the
    file, wrapped in a DataFrame and written next to the source as
    ``<name>_<i>.csv``.

    Args:
        filename (str): Path of the SPSS file, normally ending in
            ``.sav``.

    Returns:
        list: Paths of the CSV files written, in chunk order.
    """
    # Strip only a trailing ".sav". If the name has no such suffix the
    # output still gets a distinct "_<i>.csv" name, so the source file
    # can never be overwritten.
    base_name = re.sub(r"\.sav$", "", filename, flags=re.IGNORECASE)
    with SavReader(filename) as reader:
        # Header entries may be bytes; decode them rather than stripping
        # characters from their repr, which also deleted every letter
        # "b" inside the variable names.
        header = [
            h.decode("utf-8", errors="replace") if isinstance(h, bytes) else str(h)
            for h in reader.header
        ]
        file_length = len(reader)
        chunck_cutoffs = _get_chunck_cutoffs(file_length)
        file_list = []
        for i, chunk in enumerate(chunck_cutoffs):
            rows = list(reader[chunk[0] : chunk[1]])
            df_tmp = pd.DataFrame(rows, columns=header)
            filename_out = "{}_{}.csv".format(base_name, i)
            file_list.append(filename_out)
            print("LOADING {} of {}: (lines {} of {}) --> {}".format(
                i+1, len(chunck_cutoffs), chunk[1], file_length, filename_out))
            # NOTE(review): to_csv also writes the DataFrame index as an
            # unnamed first column; kept as-is because the loaded table
            # schema depends on it.
            df_tmp.to_csv(filename_out)
            # Release the chunk before reading the next one.
            del rows, df_tmp
        return file_list


def csv_to_db(source_file, full_table_id, replace=True):
    """ Load a CSV file to BigQuery using the BigQuery Python API.

    The first row is skipped as a header and the schema is autodetected.

    Example:
        ```
        csv_to_db(
            source_file = 'data/my_file_0.csv',
            full_table_id = 'my-project.dope_dataset.terrific_table')
        ```

    Args:
        source_file (str): The file path/name of the CSV file. Include
            the file suffix (i.e. .csv).

        full_table_id (str): BigQuery table ID. Example:
            `my-project.dope_dataset.terrific_table`

        replace (boolean): Whether to delete the table first, if it
            exists. When False the load job targets the existing table.

    To Do:
        * Load table schema from YAML

    """
    project_id, dataset_id, table_id = _get_table_id_set(full_table_id)
    client = get_db_client(full_table_id)
    # Build the reference directly; Client.dataset() is deprecated.
    table_ref = bigquery.DatasetReference(project_id, dataset_id).table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1  # header row
    job_config.autodetect = True
    if replace:
        delete_table(full_table_id)
    with open(source_file, "rb") as sf:
        job = client.load_table_from_file(sf, table_ref, job_config=job_config)
    job.result()  # Waits for table load to complete.
    print("Loaded {} rows into {}:{}.{}.".format(
        job.output_rows, project_id, dataset_id, table_id))


def spss_to_db(filename, full_table_id, replace=True):
    """ Load SPSS files to BigQuery using the BigQuery Python API.

    This function breaks large SPSS files into smaller CSVs and
    then loads them into BigQuery, one CSV at a time and in chunk order.

    Example:
        ```
        spss_to_db(
            filename = 'dopeData.sav',
            full_table_id = 'my-project.dope_data.awesome_table')
        ```

    Args:
        filename (str): The file path/name of the SPSS file. Include
            the file suffix (i.e. .sav).

        full_table_id (str): BigQuery table ID. Example:
            `my-project.dope_dataset.terrific_table`

        replace (boolean): Whether to replace the table, if it exists.
            The table is deleted once, before the first CSV is loaded.
    """
    project_id, dataset_id, table_id = _get_table_id_set(full_table_id)
    print("STEP 1: CONVERT SPSS FILES INTO CSV FILE(S)\n")
    # De-duplicate while keeping chunk order; a set would shuffle the
    # files and load the chunks in arbitrary order.
    file_list = list(dict.fromkeys(spss_to_csv(filename)))
    print("\nSTEP 2: LOAD CSV FILE(S) INTO BIGQUERY TABLE\n")
    if replace:
        delete_table(full_table_id)
    for i, file in enumerate(file_list):
        print("LOADING {} of {}: {} --> {}:{}.{}.".format(
            i+1, len(file_list), file, project_id, dataset_id, table_id))
        # replace=False: the table was already dropped above, and each
        # further chunk must not delete what earlier chunks loaded.
        csv_to_db(file, full_table_id, replace=False)
    print("\nSPSS FILE LOAD COMPLETE!\n\n"
          "Note: you have a number of CSV files in your data folder. "
          "You should delete them before merging to a repository."
         )



<br>

### Run spss_to_db

`spss_to_db()` breaks large SPSS files into smaller CSVs and then loads them into BigQuery.

In [14]:
# Load a CSV file to BigQuery
# csv_file = '/home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_0.csv'
# full_table_id = 'algomosaic-nyc.project_implicit.race_ait_2019'
# csv_to_db(source_file=csv_file, full_table_id=full_table_id)

In [10]:
# Load a SPSS file to BigQuery
# Inputs for spss_to_db(): the path of the source .sav file and the
# destination table ID in `project.dataset.table` form.
spss_file = '/home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019.sav'
full_table_id = 'algomosaic-nyc.project_implicit.race_ait_2019'


In [11]:

# Convert the SPSS file to CSV chunks and load them into the table.
# replace defaults to True, so an existing table is deleted first.
spss_to_db(spss_file, full_table_id)


STEP 1: CONVERT SPSS FILES INTO CSV FILE(S)

LOADING 0 of 18: (lines 50000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_0.csv
LOADING 1 of 18: (lines 100000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_1.csv
LOADING 2 of 18: (lines 150000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_2.csv
LOADING 3 of 18: (lines 200000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_3.csv
LOADING 4 of 18: (lines 250000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_4.csv
LOADING 5 of 18: (lines 300000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_5.csv
LOADING 6 of 18: (lines 350000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_6.csv
LOADING 7 of 18: (lines 400000 of 875209), /home/jovyan/project-implicit/data/race_iat/Race_IAT.public.2019_7.csv
LOADING 8 of 18: (lines 450000 of 875209), /

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object with default arguments.
client = bigquery.Client()

# TODO(developer): Set full_table_id to the ID of the table
#                  that should receive the new empty column.
full_table_id = "your-project.your_dataset.your_table_name"

# Fetch the current table definition (API request).
table = client.get_table(full_table_id)

# Build the new schema as a copy of the existing one plus a "phone" column.
original_schema = table.schema
new_schema = [*original_schema, bigquery.SchemaField("phone", "STRING")]

# Send only the schema field in the update (API request).
table.schema = new_schema
table = client.update_table(table, ["schema"])

# The returned table should have exactly one more column than before.
column_added = len(table.schema) == len(original_schema) + 1 == len(new_schema)
print(
    "A new column has been added."
    if column_added
    else "The column has not been added."
)

