# Task 1 (Part 1): Process Large Zip Files

#### Import the Required Libraries

In [1]:
from zipfile import ZipFile
import pandas as pd
import os
import io
import csv
from datetime import datetime
import numpy as np
from google.cloud import bigquery

#### Set Up the Data Directories

In [2]:
# Directory that holds the large, zipped transaction archives.
data_directory = "Data/LargeZips/"
# List its entries from the same constant so the two paths cannot drift apart.
zip_files = os.listdir(data_directory)

#### Set Up the Schema

In [10]:
# BigQuery table schema for the processed transaction files.
# Each pair below is (column name, BigQuery type), listed in file order;
# every column is created as NULLABLE.
# NOTE(review): memType, staff, batchHeaderID and display are declared BOOLEAN.
# If their source values are multi-valued codes rather than 0/1 flags, this
# loses information -- confirm against the source data.
schema = [
    bigquery.SchemaField(column, bq_type, mode="NULLABLE")
    for column, bq_type in (
        ("datetime", "TIMESTAMP"),
        ("register_no", "FLOAT"),
        ("emp_no", "FLOAT"),
        ("trans_no", "FLOAT"),
        ("upc", "STRING"),
        ("description", "STRING"),
        ("trans_type", "STRING"),
        ("trans_subtype", "STRING"),
        ("trans_status", "STRING"),
        ("department", "FLOAT"),
        ("quantity", "FLOAT"),
        ("Scale", "FLOAT"),
        ("cost", "FLOAT"),
        ("unitPrice", "FLOAT"),
        ("total", "FLOAT"),
        ("regPrice", "FLOAT"),
        ("altPrice", "FLOAT"),
        ("tax", "FLOAT"),
        ("taxexempt", "FLOAT"),
        ("foodstamp", "FLOAT"),
        ("wicable", "FLOAT"),
        ("discount", "FLOAT"),
        ("memDiscount", "FLOAT"),
        ("discountable", "FLOAT"),
        ("discounttype", "FLOAT"),
        ("voided", "FLOAT"),
        ("percentDiscount", "FLOAT"),
        ("ItemQtty", "FLOAT"),
        ("volDiscType", "FLOAT"),
        ("volume", "FLOAT"),
        ("VolSpecial", "FLOAT"),
        ("mixMatch", "FLOAT"),
        ("matched", "FLOAT"),
        ("memType", "BOOLEAN"),
        ("staff", "BOOLEAN"),
        ("numflag", "FLOAT"),
        ("itemstatus", "FLOAT"),
        ("tenderstatus", "FLOAT"),
        ("charflag", "STRING"),
        ("varflag", "FLOAT"),
        ("batchHeaderID", "BOOLEAN"),
        ("local", "FLOAT"),
        ("organic", "FLOAT"),
        ("display", "BOOLEAN"),
        ("receipt", "FLOAT"),
        ("card_no", "FLOAT"),
        ("store", "FLOAT"),
        ("branch", "FLOAT"),
        ("match_id", "FLOAT"),
        ("trans_id", "FLOAT"),
    )
]

#### Assign the Column Header Names

In [11]:
# Column names, in file order, assigned to any extracted CSV whose header row
# is missing (50 names, whitespace-separated and split into a list).
header_names = """
    datetime register_no emp_no trans_no upc description
    trans_type trans_subtype trans_status department quantity
    Scale cost unitPrice total regPrice altPrice tax
    taxexempt foodstamp wicable discount memDiscount
    discountable discounttype voided percentDiscount ItemQtty
    volDiscType volume VolSpecial mixMatch matched memType
    staff numflag itemstatus tenderstatus charflag varflag
    batchHeaderID local organic display receipt card_no
    store branch match_id trans_id
""".split()

#### Functions

In [12]:
# Function to cast a DataFrame to a specified schema

# Case-insensitive text spellings that _coerce_bool maps to True / False.
_TRUE_STRINGS = frozenset({"true", "t", "yes"})
_FALSE_STRINGS = frozenset({"false", "f", "no"})


def _coerce_bool(value):
    """Convert one cell to True/False, or None when it cannot be read as a boolean.

    Plain ``bool(value)`` is wrong for text: every non-empty string is truthy,
    including "0", "False" and the 'NULL' placeholder used for missing values.
    Strings are therefore matched against known literals or parsed as numbers.
    Any other string becomes None (null).
    """
    if pd.isnull(value):
        return None
    if isinstance(value, str):
        text = value.strip().lower()
        if text in _TRUE_STRINGS:
            return True
        if text in _FALSE_STRINGS:
            return False
        try:
            number = float(text)
        except ValueError:
            return None  # e.g. the 'NULL' placeholder or other free text
        # float("nan") parses, but it is a missing value, not True.
        return None if pd.isnull(number) else bool(number)
    return bool(value)


def cast_dataframe_to_schema(df, schema):
    """Cast the columns of ``df`` to the types declared in a BigQuery schema.

    Each field's ``field_type`` selects the conversion:

    * ``STRING``    -> ``str``; missing values stay missing instead of turning
      into the literal text "nan" / "None".
    * ``FLOAT``     -> numeric; unparseable values become NaN.
    * ``BOOLEAN``   -> True / False / None via ``_coerce_bool``.
    * ``TIMESTAMP`` -> UTC-aware datetimes; unparseable values become NaT.

    Schema fields that are absent from ``df`` are skipped. Any other
    ``field_type`` leaves the column unconverted.

    Args:
        df: DataFrame to convert. Converted columns are assigned back into
            this frame, so the caller's object is modified in place. No copy is
            made, which avoids doubling memory on large files.
        schema: Iterable of objects exposing ``name`` and ``field_type``
            (for example ``google.cloud.bigquery.SchemaField``).

    Returns:
        A DataFrame holding only the schema columns present in ``df``, in
        schema order. Columns not named in the schema are dropped.

    Raises:
        Exception: whatever a conversion raises, after printing it.
    """
    print("Casting DataFrame to BigQuery schema")
    # Materialize once: the schema is walked twice (cast, then column
    # selection), which would silently yield no columns for a generator.
    schema = list(schema)
    try:
        for field in schema:
            column_name = field.name
            if column_name not in df.columns:
                continue

            column = df[column_name]
            field_type = field.field_type
            if field_type == "STRING":
                # astype(str) alone would turn NaN/None into "nan"/"None".
                df[column_name] = column.astype(str).where(column.notna())
            elif field_type == "FLOAT":
                df[column_name] = pd.to_numeric(column, errors='coerce')  # Coerce invalid values to NaN
            elif field_type == "BOOLEAN":
                df[column_name] = column.apply(_coerce_bool)
            elif field_type == "TIMESTAMP":
                df[column_name] = pd.to_datetime(column, errors='coerce', utc=True)  # Convert to datetime and coerce errors

        return df[[field.name for field in schema if field.name in df.columns]]
    except Exception as e:
        print(f"Error casting DataFrame to schema: {e}")
        raise


#### Processing the Zip Files

This section opens each large zip file in turn. For every CSV inside it, it samples the text to detect the delimiter and whether a header row is present; if the header row is missing, the predefined list of header names is assigned. It then parses the `datetime` column into a pandas datetime, normalizes the various missing-value markers to a uniform `NULL`, and casts the columns to the target Google BigQuery schema. Finally, each file is written as a CSV to the processed-files directory, ready to upload to BigQuery.

In [13]:
# Directory to save processed files.
# NOTE(review): the input directory is "Data/..." but this path is lowercase
# "data/..."; on a case-sensitive filesystem these are different trees -- confirm.
processed_files_directory = "data/processed_files/"
# DataFrame.to_csv does not create parent directories, so create it up front.
os.makedirs(processed_files_directory, exist_ok=True)

# Raw spellings of "missing" in the exports, each mapped to a uniform 'NULL'.
# '\\N' is the MySQL-style null marker. The '/N' and '//N' entries presumably
# aimed at it too; they are kept so existing behavior is unchanged.
null_markers = {
    '\\N': 'NULL',
    '/N': 'NULL',
    '//N': 'NULL',
    ' ': 'NULL',
    '': 'NULL',
    'NaN': 'NULL',
    'nan': 'NULL',
    'N': 'NULL',
    np.nan: 'NULL',
}

# Process ZIP files: keep only .zip archives (a stray file such as .DS_Store
# would make ZipFile raise and abort the whole loop), sorted so the run order
# is deterministic.
zip_files = sorted(
    name for name in os.listdir(data_directory) if name.lower().endswith(".zip")
)

# Iterate over each zip file
for current_zf in zip_files:
    zip_file_path = os.path.join(data_directory, current_zf)
    with ZipFile(zip_file_path, 'r') as zf:
        # Skip directory entries and macOS "__MACOSX/" metadata members.
        zipped_files = [
            info.filename
            for info in zf.infolist()
            if not info.is_dir() and not info.filename.startswith('__MACOSX/')
        ]

        # Iterate over each file in the current zip file
        for file_name in zipped_files:
            with zf.open(file_name) as working_file:
                try:
                    # Read a text sample to sniff the delimiter and header row.
                    input_file = io.TextIOWrapper(working_file, encoding="utf-8")
                    sample = input_file.read(1024 * 4)  # Sample for sniffing
                    # Release the wrapper so it no longer owns working_file;
                    # pandas reads the raw stream directly below.
                    input_file.detach()

                    # Find Delimiters and Headers
                    sniffer = csv.Sniffer()
                    dialect = sniffer.sniff(sample, delimiters=[",", ";", "\t"])
                    has_header = sniffer.has_header(sample)
                    print(f"For {file_name}, the delimiter is '{dialect.delimiter}' and has header: {has_header}")

                    # Reset the file pointer to read again from the start
                    working_file.seek(0)

                    # Load the CSV
                    df = pd.read_csv(
                        working_file,
                        delimiter=dialect.delimiter,
                        header=0 if has_header else None,
                        low_memory=False
                    )

                    # If the file doesn't have headers, assign them now
                    if not has_header:
                        df.columns = header_names
                        print(f"Assigned common headers to {file_name}.")

                    # Parse the datetime column, if it exists
                    if 'datetime' in df.columns:
                        print(f"Parsing 'datetime' column for {file_name}...")
                        df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')

                    print(f"Loaded {file_name} successfully!")

                except Exception as e:
                    print(f"Error loading {file_name}: {e}")
                    continue  # Move to the next file if there's an error

                # Ensure df is not None and has data
                if df is not None and not df.empty:
                    try:
                        # Replace various null representations with a uniform 'NULL'
                        df = df.replace(null_markers)
                        print(f"Missing Values converted to Nulls for {file_name}")

                        # Cast the DataFrame to the schema
                        df = cast_dataframe_to_schema(df, schema)

                        # Save the processed DataFrame. Use only the base name
                        # so members stored in archive subfolders do not point
                        # at subdirectories that don't exist.
                        base_name = os.path.splitext(os.path.basename(file_name))[0]
                        processed_filename = os.path.join(
                            processed_files_directory, f"{base_name}_processed.csv"
                        )
                        df.to_csv(processed_filename, index=False)
                        print(f"Processed and saved {file_name}.")

                    except Exception as e:
                        print(f"Error processing {file_name}: {e}")

    print(f"Completed processing ZIP file: {current_zf}")

For transArchive_201001_201003.csv, the delimiter is ',' and has header: True
Parsing 'datetime' column for transArchive_201001_201003.csv...
Loaded transArchive_201001_201003.csv successfully!
Missing Values converted to Nulls for transArchive_201001_201003.csv
Casting DataFrame to BigQuery schema
Processed and saved transArchive_201001_201003.csv.
Completed processing ZIP file: transArchive_201001_201003.zip
For transArchive_201004_201006.csv, the delimiter is ',' and has header: True
Parsing 'datetime' column for transArchive_201004_201006.csv...
Loaded transArchive_201004_201006.csv successfully!
Missing Values converted to Nulls for transArchive_201004_201006.csv
Casting DataFrame to BigQuery schema
Processed and saved transArchive_201004_201006.csv.
Completed processing ZIP file: transArchive_201004_201006.zip
For transArchive_201007_201009.csv, the delimiter is ',' and has header: True
Parsing 'datetime' column for transArchive_201007_201009.csv...
Loaded transArchive_201007_201