### Task 1: Building a Transaction Database in Google BigQuery

#### Overview  
In Task 1, the focus is on loading the raw Wedge Co-Op transaction data into Google BigQuery while addressing various data quality issues. This task involves managing inconsistencies such as mixed delimiters (commas and semicolons), different representations of null values (`NULL`, `\N`, and `\\N`), and mismatched column headers. Python scripts are used to automate the Extract, Transform, and Load (ETL) process. The raw transaction files are first extracted from local directories. In the transformation stage, the script standardizes null values, corrects column headers, and ensures data type consistency across files, including proper handling of numeric fields and date-time formats. Once the data is cleaned and transformed, it is loaded into Google BigQuery with schema enforcement to ensure that the transactional data adheres to the expected structure. This process creates a clean, structured dataset in BigQuery, ready for analysis in the subsequent tasks.

##### Step 1: Extract  

Extract Zipped Raw Files

In [2]:
import zipfile
import os
import shutil

def extract_main_zip(main_zip_file, extract_to_folder):
    """
    Extracts the contents of the main zip file to the specified folder.

    Parameters:
    - main_zip_file (str): Path to the main zip file.
    - extract_to_folder (str): Folder where the extracted contents will be saved.

    The function handles:
    - Skipping files and folders that already exist.
    - Skipping archive members whose path would resolve outside
      extract_to_folder (absolute names or names containing "..").
    - Creating directories as needed.
    - Extracting each file through a temporary ".part" file that is renamed
      into place only after the copy finishes, so an interrupted run never
      leaves a truncated file that a later run would skip as "already exists".

    Raises:
    - Whatever zipfile.ZipFile raises when the archive is missing or invalid
      (e.g. FileNotFoundError, zipfile.BadZipFile); these are not caught here.
    """
    # Ensure the extraction folder exists
    os.makedirs(extract_to_folder, exist_ok=True)

    # Resolved form of the target folder, used to check member containment
    safe_root = os.path.normcase(os.path.realpath(extract_to_folder))
    safe_prefix = safe_root.rstrip(os.sep) + os.sep

    # Open the main zip file
    with zipfile.ZipFile(main_zip_file, 'r') as main_zip:
        # Loop through all files in the main zip file
        for zip_info in main_zip.infolist():
            # Create the full output path
            output_file_path = os.path.join(extract_to_folder, zip_info.filename)

            # Member names come from the archive and are not sanitized when
            # joined by hand, so refuse anything that lands outside the target.
            resolved_path = os.path.normcase(os.path.realpath(output_file_path))
            if resolved_path != safe_root and not resolved_path.startswith(safe_prefix):
                print(f"Skipping {zip_info.filename}, path is outside {extract_to_folder}.")
                continue

            # Skip if the file or folder already exists
            if os.path.exists(output_file_path):
                print(f"Skipping {zip_info.filename}, already exists.")
                continue

            # Create directory if it's a folder in the archive
            if zip_info.is_dir():
                os.makedirs(output_file_path, exist_ok=True)
                print(f"Created directory {output_file_path}")
                continue

            # Create necessary directories for files
            os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

            # Copy into a temporary file first, then rename it into place
            partial_path = output_file_path + '.part'
            try:
                with main_zip.open(zip_info) as source, open(partial_path, 'wb') as target:
                    shutil.copyfileobj(source, target)
                os.replace(partial_path, output_file_path)
            finally:
                # Only present if the copy or rename did not complete
                if os.path.exists(partial_path):
                    os.remove(partial_path)
            print(f"Extracted {zip_info.filename} to {output_file_path}")

# Input definitions.
# Both paths are absolute and specific to the author's machine (D: drive);
# adjust them before running elsewhere.
main_zip = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/WedgeZipOfZips.zip'
extract_folder = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/extracted_main_zip'

# Extract the main zip file.
# extract_main_zip skips members that already exist in extract_folder, so
# re-running this cell only reports "Skipping ..." for earlier extractions.
extract_main_zip(main_zip, extract_folder)


Skipping transArchive_201001_201003.zip, already exists.
Skipping transArchive_201004_201006.zip, already exists.
Skipping transArchive_201007_201009.zip, already exists.
Skipping transArchive_201010_201012.zip, already exists.
Skipping transArchive_201101_201103.zip, already exists.
Skipping transArchive_201104.zip, already exists.
Skipping transArchive_201105.zip, already exists.
Skipping transArchive_201106.zip, already exists.
Skipping transArchive_201107_201109.zip, already exists.
Skipping transArchive_201110_201112.zip, already exists.
Skipping transArchive_201201_201203.zip, already exists.
Skipping transArchive_201201_201203_inactive.zip, already exists.
Skipping transArchive_201204_201206.zip, already exists.
Skipping transArchive_201204_201206_inactive.zip, already exists.
Skipping transArchive_201207_201209.zip, already exists.
Skipping transArchive_201207_201209_inactive.zip, already exists.
Skipping transArchive_201210_201212.zip, already exists.
Skipping transArchive_201... [output truncated]

Extract the Nested Zip Files

In [3]:
import zipfile
import os

def extract_all_csvs_to_one_folder(extract_folder, output_folder):
    """
    Extracts all CSV files from nested zip files in the specified folder and saves them to a single output folder.

    Parameters:
    - extract_folder (str): Folder containing the nested zip files.
    - output_folder (str): Folder where the extracted CSV files will be saved.

    The function handles:
    - Walking through directories to find zip files (extension matched
      case-insensitively).
    - Writing every CSV member directly into output_folder under its base
      name, even when the archive stores it inside sub-directories, so the
      output really is one flat folder.
    - Skipping CSV files that already exist in the output folder. Two members
      with the same base name therefore resolve to the same output file; the
      first one extracted wins.
    - Writing through a temporary ".part" file that is renamed into place only
      after the copy finishes, so an interrupted run cannot leave a truncated
      CSV that later runs would skip.
    - Handling invalid zip files by reporting and skipping them.
    """
    # Imported here because the enclosing cell only imports zipfile and os
    import shutil

    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    # Walk through the extracted folder and look for zip files
    for root, _dirs, files in os.walk(extract_folder):
        for file in files:
            if not file.lower().endswith('.zip'):
                continue
            nested_zip_path = os.path.join(root, file)

            # Opening the archive is what validates it; bad files are reported below
            try:
                with zipfile.ZipFile(nested_zip_path, 'r') as nested_zip:
                    for zip_info in nested_zip.infolist():
                        if zip_info.is_dir() or not zip_info.filename.lower().endswith('.csv'):
                            continue

                        # Drop any directory part stored in the archive
                        csv_name = os.path.basename(zip_info.filename)
                        output_file_path = os.path.join(output_folder, csv_name)

                        # Skip CSV files that already exist
                        if os.path.exists(output_file_path):
                            print(f"Skipping {zip_info.filename}, already exists.")
                            continue

                        # Copy into a temporary file first, then rename it into place
                        partial_path = output_file_path + '.part'
                        try:
                            with nested_zip.open(zip_info) as source, open(partial_path, 'wb') as target:
                                shutil.copyfileobj(source, target)
                            os.replace(partial_path, output_file_path)
                        finally:
                            # Only present if the copy or rename did not complete
                            if os.path.exists(partial_path):
                                os.remove(partial_path)
                        print(f"Extracted {zip_info.filename} to {output_folder}")
            except zipfile.BadZipFile:
                print(f"Skipping {nested_zip_path}, not a valid zip file.")

# Input definitions.
# extract_folder is the same folder the main archive was extracted into above;
# output_folder receives the CSV files pulled out of the nested zips.
# Both are absolute, machine-specific paths.
extract_folder = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/extracted_main_zip'
output_folder = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/extracted_csv_files'  

# Extract all CSVs from nested zip files.
# CSVs already present in output_folder are skipped, so this cell can be re-run.
extract_all_csvs_to_one_folder(extract_folder, output_folder)

Skipping transArchive_201001_201003.csv, already exists.
Skipping transArchive_201004_201006.csv, already exists.
Skipping transArchive_201007_201009.csv, already exists.
Skipping transArchive_201010_201012.csv, already exists.
Skipping transArchive_201101_201103.csv, already exists.
Skipping transArchive_201104.csv, already exists.
Skipping transArchive_201105.csv, already exists.
Skipping transArchive_201106.csv, already exists.
Skipping transArchive_201107_201109.csv, already exists.
Skipping transArchive_201110_201112.csv, already exists.
Skipping transArchive_201201_201203.csv, already exists.
Skipping transArchive_201201_201203_inactive.csv, already exists.
Skipping transArchive_201204_201206.csv, already exists.
Skipping transArchive_201204_201206_inactive.csv, already exists.
Skipping transArchive_201207_201209.csv, already exists.
Skipping transArchive_201207_201209_inactive.csv, already exists.
Skipping transArchive_201210_201212.csv, already exists.
Skipping transArchive_201... [output truncated]

##### Step 2: Transform  

In [75]:
import pandas as pd
import glob
import os

# Path to the reference file for column headers (absolute, machine-specific)
reference_file_path = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/reference_files/transArchive_201001_201003_clean.csv'

# Load the reference file to get the correct column headers.
# reference_columns is read as a module-level global by clean_csv_file, so this
# must run before any file is cleaned.
# NOTE(review): only the header of df_reference is used in the visible code;
# if nothing else needs the data, reading with nrows=0 would avoid loading the
# whole file — confirm df_reference is not used elsewhere first.
df_reference = pd.read_csv(reference_file_path)
reference_columns = df_reference.columns.tolist()

def clean_csv_file(input_file, output_file):
    """
    Cleans and standardizes the input CSV file, ensuring the correct headers and replacing NULL values.
    Saves the cleaned file to the specified output location.

    Parameters:
    - input_file (str): Path of the raw CSV file to clean.
    - output_file (str): Path the cleaned, comma-delimited CSV is written to.

    Relies on the module-level `reference_columns` list for the expected
    column names and their order.

    Header handling when the parsed header differs from `reference_columns`:
    - If the first data row holds exactly the reference names, that row is
      promoted to the header.
    - If none of the parsed header labels matches a reference name
      (case-insensitive), the file is assumed to have no header row at all.
      It is re-read with header=None so the first record is kept as data
      instead of being overwritten by the reference names.
    - Otherwise the reference names are applied by position.

    The result is written to "<output_file>.part" and renamed into place only
    after the write finishes, so an interrupted run never leaves a truncated
    output file that later runs would treat as already processed.

    Errors are reported with print() and swallowed; nothing is raised for
    ordinary failures (empty file, parse error, missing file, column-count
    mismatch).
    """
    def load_frame(**read_options):
        """Read input_file with the delimiter fallbacks; extra options go to pd.read_csv."""
        # Attempt to load the file with automatic delimiter detection
        try:
            frame = pd.read_csv(input_file, sep=None, engine='python', **read_options)
        except pd.errors.ParserError:
            # If automatic detection fails, try with semicolon delimiter
            frame = pd.read_csv(input_file, delimiter=';', **read_options)

        # If column count doesn't match the reference, try loading with a comma delimiter
        if frame.shape[1] != len(reference_columns):
            frame = pd.read_csv(input_file, delimiter=',', **read_options)
        return frame

    temp_file = f"{output_file}.part"
    try:
        df = load_frame()

        # Ensure the correct headers
        if list(df.columns) != reference_columns:
            # Guard the row lookup: a frame with zero rows has no first row
            first_row_as_header = df.iloc[0].tolist() if len(df) else []
            if first_row_as_header and set(first_row_as_header) == set(reference_columns):
                # If the first row matches the reference, use it as headers
                df.columns = first_row_as_header
                df = df.iloc[1:].reset_index(drop=True)
            else:
                known_names = {str(name).strip().lower() for name in reference_columns}
                header_is_data = not any(
                    str(label).strip().lower() in known_names for label in df.columns
                )
                if header_is_data:
                    # The "header" pandas picked up is really the first record;
                    # re-read so that record stays in the data.
                    df = load_frame(header=None)
                # Apply the reference headers by position
                df.columns = reference_columns

        # Replace various NULL representations with None/NaN
        df.replace({"NULL": None, r"\\N": None, r"\N": None}, inplace=True)

        # Save the cleaned CSV file (temporary name first, then rename)
        df.to_csv(temp_file, index=False, sep=",")
        os.replace(temp_file, output_file)
        print(f"File cleaned and saved: {output_file}")

    except pd.errors.EmptyDataError:
        print(f"Error: {input_file} is empty.")
    except pd.errors.ParserError:
        print(f"Error: Could not parse {input_file}.")
    except FileNotFoundError:
        print(f"Error: {input_file} not found.")
    except Exception as e:
        print(f"Error processing {input_file}: {e}")
    finally:
        # Only present if the write or rename did not complete
        if os.path.exists(temp_file):
            os.remove(temp_file)

def process_all_csv_files(input_folder, output_folder):
    """
    Clean every CSV found under input_folder (searched recursively) and write
    the result to output_folder under the same base file name.

    Files whose cleaned counterpart already exists in output_folder are left
    untouched. A summary of the CSV files present in output_folder is printed
    at the end.
    """
    # Make sure there is somewhere to write to
    os.makedirs(output_folder, exist_ok=True)

    # Collect the raw CSV files, including those in sub-folders
    raw_paths = glob.glob(f"{input_folder}/**/*.csv", recursive=True)
    print(f"Found {len(raw_paths)} CSV files to process.")

    for raw_path in raw_paths:
        cleaned_path = os.path.join(output_folder, os.path.basename(raw_path))

        if not os.path.exists(cleaned_path):
            # Not cleaned yet: standardize it and save the result
            clean_csv_file(raw_path, cleaned_path)
        else:
            # A cleaned copy is already there
            print(f"Skipping already processed file: {cleaned_path}")

    # Report what the output folder now contains
    cleaned_paths = glob.glob(f"{output_folder}/*.csv")
    print(f"\nSaved {len(cleaned_paths)} files to {output_folder}:")
    if cleaned_paths:
        print("\n".join(cleaned_paths))

# Input definitions.
# input_folder is where the nested-zip extraction step wrote its CSVs;
# output_folder receives the cleaned copies. Both are absolute,
# machine-specific paths.
input_folder = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/extracted_csv_files'
output_folder = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/final_cleaned_csv_files'

# Process the files.
# Files that already have a cleaned copy in output_folder are skipped.
process_all_csv_files(input_folder, output_folder)


Found 53 CSV files to process.
Skipping already processed file: D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/final_cleaned_csv_files\transArchive_201001_201003.csv
Skipping already processed file: D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/final_cleaned_csv_files\transArchive_201004_201006.csv
Skipping already processed file: D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/final_cleaned_csv_files\transArchive_201007_201009.csv
Skipping already processed file: D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/final_cleaned_csv_files\transArchive_201010_201012.csv
Skipping already processed file: D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/final_cleaned_csv_files\transArchive_201101_201103.csv


KeyboardInterrupt: 

##### Step 3: Load

In [None]:
from google.cloud import bigquery
import os

# Initialize BigQuery client with the correct Project ID.
# `client` is a module-level object that load_csv_to_bigquery reads as a global.
client = bigquery.Client(project='wedgeproject-rileyororke')

# Define the dataset ID ("<project>.<dataset>") and create the dataset if it
# doesn't exist; exists_ok=True keeps this call from failing on re-runs.
dataset_id = 'wedgeproject-rileyororke.transaction_tables'
dataset = bigquery.Dataset(dataset_id)
client.create_dataset(dataset, exists_ok=True)

# Path to the folder containing the CSV files.
# NOTE(review): folder_path is reassigned to the final_cleaned_csv_files folder
# just before load_csv_to_bigquery is called further down, so this "BigClean"
# value is not used by the code visible here — confirm whether it can go.
folder_path = r"D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/BigClean"

# Schema definition for the transaction table, expressed as
# (column name, BigQuery type) pairs and turned into SchemaField objects below.
# The order of the pairs is the order of the fields in `schema`.
_TRANSACTION_COLUMNS = (
    ("datetime", "TIMESTAMP"),
    ("register_no", "FLOAT"),
    ("emp_no", "FLOAT"),
    ("trans_no", "FLOAT"),
    ("upc", "STRING"),
    ("description", "STRING"),
    ("trans_type", "STRING"),
    ("trans_subtype", "STRING"),
    ("trans_status", "STRING"),
    ("department", "FLOAT"),
    ("quantity", "FLOAT"),
    ("Scale", "FLOAT"),
    ("cost", "FLOAT"),
    ("unitPrice", "FLOAT"),
    ("total", "FLOAT"),
    ("regPrice", "FLOAT"),
    ("altPrice", "FLOAT"),
    ("tax", "FLOAT"),
    ("taxexempt", "FLOAT"),
    ("foodstamp", "FLOAT"),
    ("wicable", "FLOAT"),
    ("discount", "FLOAT"),
    ("memDiscount", "FLOAT"),
    ("discountable", "FLOAT"),
    ("discounttype", "FLOAT"),
    ("voided", "FLOAT"),
    ("percentDiscount", "FLOAT"),
    ("ItemQtty", "FLOAT"),
    ("volDiscType", "FLOAT"),
    ("volume", "FLOAT"),
    ("VolSpecial", "FLOAT"),
    ("mixMatch", "FLOAT"),
    ("matched", "FLOAT"),
    ("memType", "STRING"),
    ("staff", "FLOAT"),
    ("numflag", "FLOAT"),
    ("itemstatus", "FLOAT"),
    ("tenderstatus", "FLOAT"),
    ("charflag", "STRING"),
    ("varflag", "FLOAT"),
    ("batchHeaderID", "STRING"),
    ("local", "FLOAT"),
    ("organic", "STRING"),
    ("display", "STRING"),
    ("receipt", "FLOAT"),
    ("card_no", "FLOAT"),
    ("store", "FLOAT"),
    ("branch", "FLOAT"),
    ("match_id", "FLOAT"),
    ("trans_id", "FLOAT"),
)

schema = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in _TRANSACTION_COLUMNS
]

def load_csv_to_bigquery(folder_path, dataset_id):
    """
    Load all CSV files from the specified folder into BigQuery.
    Each file is uploaded to a table named after the file (without extension).
    Skips tables that already exist in BigQuery.

    Parameters:
    - folder_path (str): Folder containing the CSV files to load.
    - dataset_id (str): Destination dataset, e.g. "project.dataset". Tables are
      addressed as "<dataset_id>.<file name without extension>".

    Uses the module-level `client` (BigQuery client) and `schema` (list of
    SchemaField) objects.

    Raises:
    - Any error other than "table not found" from the existence check (for
      example authentication or network failures) is propagated instead of
      being mistaken for a missing table.
    - Errors from the load job itself are propagated by load_job.result().
    """
    # Imported locally so this block stands alone; google.cloud is already
    # required by the enclosing cell's `from google.cloud import bigquery`.
    from google.cloud.exceptions import NotFound

    # The load configuration is identical for every file, so build it once
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        schema=schema,
        skip_leading_rows=1  # Skip header row
    )

    # Iterate over the CSV files in a stable (sorted) order
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith('.csv'):
            continue
        file_path = os.path.join(folder_path, file_name)

        # Use the file name (without extension) as the table ID
        table_id = os.path.splitext(file_name)[0]

        # Fully qualified destination built from the dataset_id argument
        full_table_id = f"{dataset_id}.{table_id}"

        # Check if the table already exists; only "not found" means "go ahead"
        try:
            client.get_table(full_table_id)
        except NotFound:
            pass
        else:
            print(f"Skipping table {table_id}, it already exists.")
            continue

        # Open the file and load it into BigQuery
        with open(file_path, 'rb') as file:
            load_job = client.load_table_from_file(file, full_table_id, job_config=job_config)
            load_job.result()  # Wait for the job to complete

        print(f"Loaded {file_name} into {dataset_id}.{table_id}")

# Input definitions.
# This replaces the folder_path assigned earlier in the cell: the files loaded
# are the cleaned CSVs produced by the transform step.
folder_path = 'D:/WedgeProject/Wedge-Project-ADA-Riley-ORorke/data/final_cleaned_csv_files'

# Run the function to load CSVs from the specified folder to BigQuery.
# Tables that already exist are skipped, so the cell can be re-run.
load_csv_to_bigquery(folder_path, dataset_id)
