In [1]:
import os
import pandas as pd
import numpy as np
import pandas_gbq
import sqlite3
import zipfile
import csv
import io
import glob

# Do our imports for the code
from google.cloud import bigquery
from google.oauth2 import service_account
from zipfile import ZipFile
from google.cloud.exceptions import NotFound

## 1. Extracting Zip Files

In [2]:
# Set the directory path where your ZIP files are located locally
# Use a raw string for the path
#directory_path = "/Users/biancabostrom/Documents/ADA/Wedge Project/WedgeZipOfZips_Big"
directory_path = r'C:\Users\hills\Documents\Fall2023\ADA\wedge-project\data\WedgeZipOfZips_Big'
output_folder = 'extracted_zips_big'

In [3]:
# Create the output folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)

# Iterate over all the files in the directory
for filename in os.listdir(directory_path):
    if filename.endswith('.zip'):
        # Construct the full file path
        file_path = os.path.join(directory_path, filename)

        # Create a folder for each ZIP file
        folder_name = os.path.splitext(filename)[0]
        extract_path = os.path.join(output_folder)

        # Print the file path for debugging
        print(f"Attempting to extract: {file_path}")

        try:
            # Open the ZIP file
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                # Extract all the contents into the directory
                zip_ref.extractall(extract_path)
                print(f"Extracted {filename} to {extract_path}")
        except Exception as e:
            print(f"Error extracting {filename}: {e}")

print("All files extracted.")


Attempting to extract: C:\Users\hills\Documents\Fall2023\ADA\wedge-project\data\WedgeZipOfZips_Big\transArchive_201001_201003.zip
Extracted transArchive_201001_201003.zip to extracted_zips_big
Attempting to extract: C:\Users\hills\Documents\Fall2023\ADA\wedge-project\data\WedgeZipOfZips_Big\transArchive_201004_201006.zip
Extracted transArchive_201004_201006.zip to extracted_zips_big
Attempting to extract: C:\Users\hills\Documents\Fall2023\ADA\wedge-project\data\WedgeZipOfZips_Big\transArchive_201007_201009.zip
Extracted transArchive_201007_201009.zip to extracted_zips_big
Attempting to extract: C:\Users\hills\Documents\Fall2023\ADA\wedge-project\data\WedgeZipOfZips_Big\transArchive_201010_201012.zip
Extracted transArchive_201010_201012.zip to extracted_zips_big
Attempting to extract: C:\Users\hills\Documents\Fall2023\ADA\wedge-project\data\WedgeZipOfZips_Big\transArchive_201101_201103.zip
Extracted transArchive_201101_201103.zip to extracted_zips_big
Attempting to extract: C:\Users\hil

## 2. Cleaning files: headers, delimeters, nulls and quotes 

In [11]:
correct_headers = [
    "datetime", "register_no", "emp_no", "trans_no", "upc", "description", "trans_type", "trans_subtype",
    "trans_status", "department", "quantity", "Scale", "cost", "unitPrice", "total", "regPrice", "altPrice",
    "tax", "taxexempt", "foodstamp", "wicable", "discount", "memDiscount", "discountable", "discounttype",
    "voided", "percentDiscount", "ItemQtty", "volDiscType", "volume", "VolSpecial", "mixMatch", "matched",
    "memType", "staff", "numflag", "itemstatus", "tenderstatus", "charflag", "varflag", "batchHeaderID", 
    "local", "organic", "display", "receipt", "card_no", "store", "branch", "match_id", "trans_id"
]
# loop though all files in the directory
for root, dirs, files in os.walk(output_folder):
    for file in files:
        full_path = os.path.join(root, file)
        if file.endswith('.csv'):
            with open(full_path,'r') as f:
                first_line = f.readline().strip()

            with open(full_path,'r') as f:
                content = f.read()
            # check if the file likely has headers based on the first line
            if not first_line.startswith('"datetime"') and not first_line.startswith('datetime'):
                content = ','.join(correct_headers) + '\n' + content

            #content = content.replace('\"','inch')

            #with open(full_path,'w') as f:``
                #f.write(content)

            content = '\n'.join(['inch' + line.strip('\"') + 'inch' for line in content.split('\n')])

            with open(full_path, 'w') as f:
                f.write(content)


In [12]:
# Bianca's info
#service_path = "/Users/biancabostrom/Documents/ADA/Wedge\ Project/wedge-404400-cb3a632effa5.json"
#service_file = 'wedge-404400-cb3a632effa5.json' 
#gbq_proj_id = "wedge-404400" 
#gbq_dataset_id = "wedge_data"
#credentials = service_account.Credentials.from_service_account_file("/Users/biancabostrom/Documents/ADA/Wedge Project/wedge-404400-cb3a632effa5.json")

# Spencer's info
service_path = r"C:\Users\hills\Documents\Fall2023\ADA\wedge-project\leafy-sunrise-403222-f51fcd80b921.json"
service_file = 'leafy-sunrise-403222-f51fcd80b921.json' # change this to your authentication information  
gbq_proj_id = "leafy-sunrise-403222" # change this to your project. 
gbq_dataset_id = "wedge_data"
credentials = service_account.Credentials.from_service_account_file(r"C:\Users\hills\Documents\Fall2023\ADA\wedge-project\leafy-sunrise-403222-f51fcd80b921.json")

private_key = service_path + service_file


In [13]:
schema = [
    {"name": "datetime", "type": "TIMESTAMP"},     # 1
    {"name": "register_no", "type": "FLOAT"},      # 2
    {"name": "emp_no", "type": "FLOAT"},           # 3
    {"name": "trans_no", "type": "FLOAT"},         # 4
    {"name": "upc", "type": "STRING"},             # 5
    {"name": "description", "type": "STRING"},     # 6
    {"name": "trans_type", "type": "STRING"},      # 7
    {"name": "trans_subtype", "type": "STRING"},   # 8
    {"name": "trans_status", "type": "STRING"},    # 9
    {"name": "department", "type": "FLOAT"},       # 10
    {"name": "quantity", "type": "FLOAT"},         # 11
    {"name": "Scale", "type": "FLOAT"},            # 12
    {"name": "cost", "type": "FLOAT"},             # 13
    {"name": "unitPrice", "type": "FLOAT"},        # 14
    {"name": "total", "type": "FLOAT"},            # 15
    {"name": "regPrice", "type": "FLOAT"},         # 16
    {"name": "altPrice", "type": "FLOAT"},         # 17
    {"name": "tax", "type": "FLOAT"},              # 18
    {"name": "taxexempt", "type": "FLOAT"},        # 19
    {"name": "foodstamp", "type": "FLOAT"},        # 20
    {"name": "wicable", "type": "FLOAT"},          # 21
    {"name": "discount", "type": "FLOAT"},         # 22
    {"name": "memDiscount", "type": "FLOAT"},      # 23
    {"name": "discountable", "type": "FLOAT"},     # 24
    {"name": "discounttype", "type": "FLOAT"},     # 25
    {"name": "voided", "type": "FLOAT"},           # 26
    {"name": "percentDiscount", "type": "FLOAT"},  # 27
    {"name": "ItemQtty", "type": "FLOAT"},         # 28
    {"name": "volDiscType", "type": "FLOAT"},      # 29
    {"name": "volume", "type": "FLOAT"},           # 30
    {"name": "VolSpecial", "type": "FLOAT"},       # 31
    {"name": "mixMatch", "type": "FLOAT"},         # 32
    {"name": "matched", "type": "FLOAT"},          # 33
    {"name": "memType", "type": "BOOLEAN"},        # 34
    {"name": "staff", "type": "BOOLEAN"},          # 35
    {"name": "numflag", "type": "FLOAT"},          # 36
    {"name": "itemstatus", "type": "FLOAT"},       # 37
    {"name": "tenderstatus", "type": "FLOAT"},     # 38
    {"name": "charflag", "type": "STRING"},        # 39
    {"name": "varflag", "type": "FLOAT"},          # 40
    {"name": "batchHeaderID", "type": "BOOLEAN"},  # 41
    {"name": "local", "type": "FLOAT"},            # 42
    {"name": "organic", "type": "FLOAT"},          # 43
    {"name": "display", "type": "BOOLEAN"},        # 44
    {"name": "receipt", "type": "FLOAT"},          # 45
    {"name": "card_no", "type": "FLOAT"},          # 46
    {"name": "store", "type": "FLOAT"},            # 47
    {"name": "branch", "type": "FLOAT"},           # 48
    {"name": "match_id", "type": "FLOAT"},         # 49
    {"name": "trans_id", "type": "FLOAT"}          # 50
]


In [14]:
def drop_table_if_exists(gbq_dataset_id, table_name, credentials, gbq_proj_id):
    client = bigquery.Client(credentials=credentials, project=gbq_proj_id)
    table_id = f"{gbq_proj_id}.{gbq_dataset_id}.{table_name}"

    try:
        client.delete_table(table_id)
        print(f"deleted table '{table_id}'")
    except NotFound:
        print(f"table '{table_id}' not found, skipping deletion.")

def detect_delimiter(filename):
    with open(filename,'r') as file:
        first_line = file.readline()
        return ";" if ";" in first_line else ","
    
def clean_dataframe(df):
    # Original column names
    original_columns = df.columns.tolist()

    float_columns = [
        'register_no', 'emp_no', 'trans_no', 'department', 'quantity', 'Scale', 'cost', 'unitPrice', 'total', 'regPrice'
        , 'altPrice', 'tax', 'taxexempt', 'foodstamp', 'wicable', 'discount', 'memDiscount', 'discountable', 'discounttype'
        , 'voided', 'percentDiscount', 'ItemQtty', 'volDiscType', 'volume', 'VolSpecial', 'mixMatch', 'matched', 'numflag'
        , 'itemstatus', 'tenderstatus', 'varflag', 'local', 'organic', 'receipt', 'card_no', 'store', 'branch', 'match_id'
        ,'trans_id'
    ]

    boolean_columns = ['memType', 'staff', 'batchHeaderID', 'display']

    string_columns = ['upc', 'description', 'trans_type', 'trans_subtype', 'trans_status', 'charflag']

    # Replace the modified column names with the original ones
    df.columns = original_columns

    for col in string_columns:
        if col in df.columns:
            df[col] = df[col].astype(str)
            df[col] = df[col].str.replace('"', '', regex=False)

    for col in float_columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    # Check if the column exists before filling NaN values
    for col in float_columns:
        if col in df.columns:
            df[col] = df[col].fillna(0)

    for col in boolean_columns:
        if col in df.columns:
            df[col] = df[col].astype(bool)

    df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')

    replace_strings = ["\\n", "\\\\", "nan", "NULL"]
    df.replace(replace_strings, "", inplace=True)

    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].str.strip()

    for col in ['ItemQtty', 'reciept']:
        if col in df.columns:
            df[col] = df[col].astype(str).str.replace('"', '', regex=False)

    # Print the first 20 rows for inspection
    print("First 20 rows after cleaning:")
    print(df.head(20))

    df = df.applymap(lambda x: None if x == '' else x)

    return df


In [15]:
print("Columns in the DataFrame:", df.columns)
df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')

Columns in the DataFrame: Index(['inchdatetimeinch', 'inchregister_noinch', 'inchemp_noinch',
       'inchtrans_noinch', 'inchupcinch', 'inchdescriptioninch',
       'inchtrans_typeinch', 'inchtrans_subtypeinch', 'inchtrans_statusinch',
       'inchdepartmentinch', 'inchquantityinch', 'inchScaleinch',
       'inchcostinch', 'inchunitPriceinch', 'inchtotalinch',
       'inchregPriceinch', 'inchaltPriceinch', 'inchtaxinch',
       'inchtaxexemptinch', 'inchfoodstampinch', 'inchwicableinch',
       'inchdiscountinch', 'inchmemDiscountinch', 'inchdiscountableinch',
       'inchdiscounttypeinch', 'inchvoidedinch', 'inchpercentDiscountinch',
       'inchItemQttyinch', 'inchvolDiscTypeinch', 'inchvolumeinch',
       'inchVolSpecialinch', 'inchmixMatchinch', 'inchmatchedinch',
       'inchmemTypeinch', 'inchstaffinch', 'inchnumflaginch',
       'inchitemstatusinch', 'inchtenderstatusinch', 'inchcharflaginch',
       'inchvarflaginch', 'inchbatchHeaderIDinch', 'inchlocalinch',
       'inchorgan

KeyError: 'datetime'

## 3. Upload to GBQ

In [8]:
# move through all files in the directory
for root, dirs, files in os.walk(output_folder):
    for file in files:
        full_path = os.path.join(root, file)

        if file.endswith('.csv'):
            print(f"Found CSV file: {file}")

            delimiter = detect_delimiter(full_path)
            print(f"Detected delimiter: {delimiter}")

            # Reading CSV with correct handling of quoted fields
            df = pd.read_csv(full_path, delimiter=delimiter, quotechar='"', dtype=str, low_memory=False)

            table_name = file.replace('.csv', '')

            # Drop the table if it exists
            drop_table_if_exists(gbq_dataset_id, table_name, credentials, gbq_proj_id)
            client = bigquery.Client(credentials=credentials, project=gbq_proj_id)

            # Construct the fully-qualified table_id without ".csv" extension
            table_id = f"{gbq_proj_id}.{gbq_dataset_id}.{table_name}"

            try:
                client.delete_table(table_id)
                print(f"Deleted table '{table_id}'")
            except NotFound:
                print(f"Table '{table_id}' not found, skipping deletion.")

            # Clean the DataFrame
            df = clean_dataframe(df)

            # Modify the field names to comply with the gbq rules
            df.columns = [col.lower().replace(';', '') for col in df.columns]

            print(f"Uploading {table_name} to BigQuery...")
            pandas_gbq.to_gbq(df, f"{gbq_dataset_id}.{table_name}", project_id=gbq_proj_id, if_exists='replace', credentials=credentials, table_schema=schema)
            del df  # Clean the DataFrame from memory

Found CSV file: transArchive_201001_201003.csv
Detected delimiter: ,
table 'leafy-sunrise-403222.wedge_data.transArchive_201001_201003' not found, skipping deletion.
Table 'leafy-sunrise-403222.wedge_data.transArchive_201001_201003' not found, skipping deletion.


KeyError: 'datetime'

# Hi John - cells below are for reference 

# Testing code chunk

chunk_size = 50000

def drop_table_if_exists(gbq_dataset_id, table_name, credentials, gbq_proj_id):
    client = bigquery.Client(credentials=credentials, project=gbq_proj_id)
    table_id = f"{gbq_proj_id}.{gbq_dataset_id}.{table_name}"

    try:
        client.delete_table(table_id)
        print(f"deleted table '{table_id}'")
    except NotFound:
        print(f"table '{table_id}' not found, skipping deletion.")

def detect_delimiter(filename):
    with open(filename,'r') as file:
        first_line = file.readline()
        return ";" if ";" in first_line else ","
    
def clean_dataframe(df):
    float_columns = [
        'register_no', 'emp_no', 'trans_no', 'department', 'quantity', 'Scale', 'cost', 'unitPrice', 'total', 'regPrice'
        , 'altPrice', 'tax', 'taxexempt', 'foodstamp', 'wicable', 'discount', 'memDiscount', 'discountable', 'discounttype'
        , 'voided', 'percentDiscount', 'ItemQtty', 'volDiscType', 'volume', 'VolSpecial', 'mixMatch', 'matched', 'numflag'
        , 'itemstatus', 'tenderstatus', 'varflag', 'local', 'organic', 'receipt', 'card_no', 'store', 'branch', 'match_id'
        ,'trans_id'
    ]

    boolean_columns = [ 'memType', 'staff', 'batchHeaderID', 'display']

    string_columns = ['upc', 'description', 'trans_type', 'trans_subtype', 'trans_status', 'charflag']

    for col in string_columns:
        if col in df.columns:
            df[col]  = df[col].astype(str)
            df[col] = df[col].str.replace('"', '', regex=False)

    for col in float_columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    df[float_columns] = df[float_columns].fillna(0)

    for col in boolean_columns:
        if col in df.columns:
            df[col] = df[col].astype(bool)

    df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')

    replace_strings = ["\\n", "\\\\", "nan", "NULL"]
    df.replace(replace_strings, "", inplace=True)

    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].str.strip()

    for col in ['itemQtty', 'reciept']:
        if col in df.columns:
            df[col] = df[col].astype(str).str.replace('"', '', regex=False)

    # Print the first 20 rows for inspection
    print("First 20 rows after cleaning:")
    print(df.head(20))

    df = df.applymap(lambda x: None if x == '' else x)

    return df




# hold on to this while I test another chunk - more than likley delete

chunk_size = 50000

def drop_table_if_exists(gbq_dataset_id, table_name, credentials, gbq_proj_id):
    client = bigquery.Client(credentials=credentials, project=gbq_proj_id)
    table_id = f"{gbq_proj_id}.{gbq_dataset_id}.{table_name}"

    try:
        client.delete_table(table_id)
        print(f"deleted table '{table_id}'")
    except NotFound:
        print(f"table '{table_id}' not found, skipping deletion.")

def detect_delimiter(filename):
    with open(filename,'r') as file:
        first_line = file.readline()
        return ";" if ";" in first_line else ","
    
def clean_dataframe(df):
    float_columns = [
        'register_no', 'emp_no', 'trans_no', 'department', 'quantity', 'Scale', 'cost', 'unitPrice', 'total', 'regPrice'
        , 'altPrice', 'tax', 'taxexempt', 'foodstamp', 'wicable', 'discount', 'memDiscount', 'discountable', 'discounttype'
        , 'voided', 'percentDiscount', 'itemQtty', 'volDiscType', 'volume', 'VolSpecial', 'mixMatch', 'matched', 'numflag'
        , 'itemstatus', 'tenderstatus', 'varflag', 'local', 'organic', 'reciept', 'card_no', 'store', 'branch', 'match_id'
        ,'trans_id'
    ]

    boolean_columns = [ 'memType', 'staff', 'batchHeaderID', 'display']

    string_columns = ['upc', 'description', 'trans_type', 'trans_subtype', 'trans_status', 'charflag']

        # Check if columns exist before applying operations

    # List of columns to check and clean
    #columns_to_clean = ['itemQtty', 'reciept']

   # for col in columns_to_clean:
    #    if col in df.columns:
     #       if col == 'charflag':
      #          df[col] = df[col].str.strip()
       #     elif col in ['itemQtty', 'reciept']:
        #        df[col] = df[col].astype(str).str.replace('"', '', regex=False)


    for col in string_columns:
        if col in df.columns:
            df[col]  = df[col].astype(str)
            df [col] = df [col].str.replace('"', '', regex=False)

    for col in float_columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    df[float_columns] = df[float_columns].fillna(0)

    for col in boolean_columns:
        if col in df.columns:
            df[col] = df[col].astype(bool)

    df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')
    
    replace_strings = ["\\n", "\\\\", "nan", "NULL"]
    df.replace(replace_strings, "", inplace=True)

    for col in df.columns:
        if df[col].dtype == 'object':
            df[col] = df[col].str.strip()
            #df[col] = df[col].str.replace('\\\"', '', regex=False)

    for col in df.columns: # added these three lines trying to problem solve next chuck for gbq file path
        if col == 'charflag' and col in df.columns:
            df[col] = df[col].str.strip()

    df = df.applymap(lambda x: None if x == '' else x)

#duplicate code to play with

# move through all files in the directory
for root, dirs, files in os.walk(output_folder):
    for file in files:
        full_path = os.path.join(root, file)
        
        if file.endswith('.csv'):
            print(f"Found CSV file: {file}")

            delimiter = detect_delimiter(full_path)
            print(f"detected delimiter: {delimiter}")

            # reading csv with correct handling of quoted fields
            chunk_iter = pd.read_csv(full_path, delimiter=delimiter, quotechar='"', chunksize=chunk_size, dtype=str, low_memory=False)

            table_name = file.replace('.csv', '')

            # drop the table if it exists
            drop_table_if_exists(gbq_dataset_id, table_name, credentials, gbq_proj_id)
            client = bigquery.Client(credentials=credentials, project=gbq_proj_id)
    
            # Construct the fully-qualified table_id without ".csv" extension
            table_id = f"{gbq_proj_id}.{gbq_dataset_id}.{table_name}"

            try:
                client.delete_table(table_id)
                print(f"deleted table '{table_id}'")
            except NotFound:
                print(f"table '{table_id}' not found, skipping deletion.")

            print(f"reading csv file in chunks: {file}...")
            for idx, chunk_df in enumerate(chunk_iter):
                # clean the DF
                chunk_df = clean_dataframe(chunk_df)

                # modify the field names to comply with the gbq rules
                chunk_df.columns = [col.lower().replace(';', '') for col in chunk_df.columns]

                print(f"uploading chunk {idx + 1} to {table_name}...")
                if idx == 0:
                    # for the first chunk create the table with the defined schema
                    pandas_gbq.to_gbq(chunk_df, f"{gbq_dataset_id}.{table_name}", project_id=gbq_proj_id, if_exists='replace', credentials=credentials, table_schema=schema)
                else:
                    # for subsequent chunks, append to the table 
                    pandas_gbq.to_gbq(chunk_df, f"{gbq_dataset_id}.{table_name}", project_id=gbq_proj_id, if_exists='append', credentials=credentials)
                del chunk_df  # clean the chunk from memory
        break
    break


# move through all files in the directory
for root, dirs, files in os.walk(output_folder):
    for file in files:
        full_path = os.path.join(root, file)
        
        if file.endswith('.csv'):
            print(f"Found CSV file: {file}")

            delimiter = detect_delimiter(full_path)
            print(f"detected delimiter: {delimiter}")

            #reading csv with correct handeling of quoted fields
            chunk_iter = pd.read_csv(full_path, delimiter=delimiter,quotechar='"', chunksize=chunk_size, dtype=str, low_memory=False)

            table_name = file.replace('data.csv', '')

            #drop the table if it exists
            drop_table_if_exists(gbq_dataset_id, table_name, credentials, gbq_proj_id)
            client = bigquery.Client(credentials=credentials, project=gbq_proj_id)
    
                # Construct the fully-qualified table_id
            table_id = f"{gbq_proj_id}.{gbq_dataset_id}.{table_name}"

            try:
                client.delete_table(table_id)
                print(f"deleted table '{table_id}'")
            except NotFound:
                print(f"table '{table_id}' not found, skipping deletion.")

            print(f"reading csv file in chucks: {file}...")
            for idx, chunk_df in enumerate(chunk_iter):
                #clean the DF
                chunk_df = clean_dataframe(chunk_df)

                #modify the field names to comply with the gbq rules
                chunk_df.columns = [col.lower().replace(';','') for col in chunk_df.columns]

                print(f"uploading chunk {idx + 1} to {table_name}...")
                if idx == 0:
                    # for the first chuck create the table with the defined schema
                    pandas_gbq.to_gbq(chunk_df, f"{gbq_dataset_id}.{table_name}", project_id=gbq_proj_id, if_exists='replace', credentials=credentials, table_schema=schema)
                else:
                    # for subsuquent chunks, append to the table 
                    pandas_gbq.to_gbq(chunk_df, f"{gbq_dataset_id}.{table_name}", project_id=gbq_proj_id, if_exists='append', credentials=credentials)
                del chunk_df # clean the chunk from memory
        break
    break