## ADA: Wedge Process
This repository builds on our Wedge Exploration exercise. This exercise will help you carry out the Wedge project at an A level.

You'll write code that carries out the following steps:

Create an empty data frame called wedge_summary with the following columns: file_name, num_rows, num_cards, num_dates
Iterate over the zip files that hold the Wedge transaction files
Unzip each file one at a time (so this will be part of a for loop)
Use the CSV sniffer to determine the delimiter and whether or not there is a header row.
Read, or attempt to read, the file into a Pandas dataframe, using the delimiter and handling headers correctly.
For each file, store a row in wedge_summary that holds the values listed above. num_cards should be the unique card numbers in the file and num_dates should be the number of dates.

In [1]:
import os
import zipfile
import pandas as pd
import csv
from google.cloud import bigquery

In [None]:
data_dir = 'data/WedgeZipOfZips_small/'

The follwoing block of code unzips each zipped file and saves the unzipped file in the 'extracted' folder under the parents directory.

In [None]:
for file in os.listdir(data_dir):
    file_path = os.path.join(data_dir, file)  # Construct full path
    if os.path.isfile(file_path) and 'transArchive_' in file_path:  # Check if it's a file, not a directory
        with zipfile.ZipFile(file_path) as my_zip:
            for zipped_file in my_zip.namelist():
                my_zip.extract(zipped_file, path='data/WedgeZipOfZips_small/extracted')

In [None]:
# Function to analyze the CSV file using csv.Sniffer
def sniff_csv(file_path):
    with open(file_path, 'r', newline='') as csvfile:
        # Read a sample of the file to allow sniffing
        sample = csvfile.read(1024 * 4)
        # Create a Sniffer object
        sniffer = csv.Sniffer()
        # Determine if there is a header
        has_header = sniffer.has_header(sample)
        # Sniff the delimiter
        dialect = sniffer.sniff(sample)
        return dialect.delimiter, has_header


In [None]:
# Unify NULL characters
for file in os.listdir('data/WedgeZipOfZips_small/extracted'):
    file_path = os.path.join('data/WedgeZipOfZips_small/extracted/', file)
    if os.path.isfile:
        with open(file_path, 'r') as infile:
            # Read the entire content of the file
            content = infile.read()

        # Replace occurrences of '/N' and '//N' with 'NULL'
        modified_content = content.replace(r'\N', 'NULL').replace(r'\\N', 'NULL')

        # Open the same file in write mode to overwrite it with the modified content
        with open(file_path, 'w') as outfile:
            # Write the modified content back to the file
            outfile.write(modified_content)

In [None]:
# Create an empty summary table
wedge_summary = pd.DataFrame(columns = ['file_name', 'num_rows', 'num_cards', 'num_dates'])

In [None]:
data_dir = 'data/WedgeZipOfZips_small/extracted/'

In [None]:
col_names = ['datetime', 'register_no', 'emp_no', 'trans_no', 'upc', 'description',
       'trans_type', 'trans_subtype', 'trans_status', 'department', 'quantity',
       'Scale', 'cost', 'unitPrice', 'total', 'regPrice', 'altPrice', 'tax',
       'taxexempt', 'foodstamp', 'wicable', 'discount', 'memDiscount',
       'discountable', 'discounttype', 'voided', 'percentDiscount', 'ItemQtty',
       'volDiscType', 'volume', 'VolSpecial', 'mixMatch', 'matched', 'memType',
       'staff', 'numflag', 'itemstatus', 'tenderstatus', 'charflag', 'varflag',
       'batchHeaderID', 'local', 'organic', 'display', 'receipt', 'card_no',
       'store', 'branch', 'match_id', 'trans_id']

This next chunk opens each csv in the extracted folder and adds meta data into the summary df, than exports table as summary.csv

In [None]:
summary_data = []  # List to hold summary data

for file in os.listdir(data_dir):
    file_path = os.path.join(data_dir, file)
    if os.path.isfile(file_path):
        # Sniff the CSV file
        delimiter, has_header = sniff_csv(file_path)
        
        if not has_header:
            df = pd.read_csv(file_path, delimiter=delimiter, header=None, names=col_names)
        else:
            df = pd.read_csv(file_path, delimiter=delimiter, header=0)

        # Clean column names
        df.columns = df.columns.str.strip()  # Remove any leading/trailing spaces

        # Replace null characters with uniform null value
        df.replace({'/N': 'NULL', '//N': 'NULL'}, inplace=True)
        
        # Extract the columns of interest
        num_rows = df.shape[0]
        num_cards = df['card_no'].nunique() if 'card_no' in df.columns else 0
        
        df['datetime'] = pd.to_datetime(df['datetime']) # CHANGE TO DATETIME
        num_dates = df['datetime'].dt.date.nunique() if 'datetime' in df.columns else 0

        # Add the file summary to the summary list
        summary_data.append({'file_name': file, 'num_rows': num_rows, 'num_cards': num_cards, 'num_dates': num_dates})

# Create a DataFrame from the summary list
wedge_summary = pd.DataFrame(summary_data)

# Save the summary to CSV
wedge_summary.to_csv('data/WedgeZipOfZips_small/summary.csv', index=False)



## Upload data to the cloud

In [None]:
# Set the path to your CSV file
csv_file_path = "data/WedgeZipOfZips_small/extracted/transArchive_201001_201003_small.csv"
zip_file_path = "data/WedgeZipOfZips_small/transArchive_201001_201003_small.zip"

# Create a zip file containing the CSV file
with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
    zipf.write(csv_file_path, os.path.basename(csv_file_path))

In [4]:
# Read the CSV file into a Pandas DataFrame
schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("register_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("emp_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("upc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_type", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("department", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Scale", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tax", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("taxexempt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("foodstamp", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("wicable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discountable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discounttype", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("voided", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("percentDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volDiscType", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volume", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("mixMatch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("matched", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memType", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("staff", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("numflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("itemstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tenderstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("charflag", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("varflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("batchHeaderID", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("local", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("organic", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("display", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("receipt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("card_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("store", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("branch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("match_id", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_id", "FLOAT", mode="NULLABLE"),
]

def cast_dataframe_to_schema(df, schema):
    for field in schema:
        column_name = field.name
        field_type = field.field_type

        if column_name in df.columns:
            if field_type == "STRING":
                df[column_name] = df[column_name].astype(str)
            elif field_type == "FLOAT":
                df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
            elif field_type == "BOOLEAN":
                df[column_name] = df[column_name].astype(bool)
            elif field_type == "TIMESTAMP":
                df[column_name] = pd.to_datetime(df[column_name], errors='coerce')
    
    return df

In [5]:
data_dir = 'data/WedgeZipOfZips_small/extracted/'

project_id = 'engellantwedge2024'  # Change this to your project ID
client = bigquery.Client(project=project_id)

for file in os.listdir(data_dir):
    file_path = os.path.join(data_dir, file)
    if os.path.isfile(file_path):
        try:
            dataset_id = 'wedge_transactions_small'
            table_id = os.path.splitext(file)[0] #Create table name
            table_ref = f"{client.project}.{dataset_id}.{table_id}"
            
            df = pd.read_csv(file_path) #Read the CSV file into a DataFrame
            
            df = cast_dataframe_to_schema(df, schema) # Ensure dataframe follows schema
            
            job = client.load_table_from_dataframe(df, table_ref)
            job.result()
            print(f'Loaded {job.output_rows} rows into {table_ref}')
            
        except Exception as e:
            print(f"Error processing {file}: {e}")
            continue
        

Error processing transArchive_201612_small.csv: 400 POST https://bigquery.googleapis.com/upload/bigquery/v2/projects/engellantwedge2024/jobs?uploadType=multipart: Invalid field name " .1". Fields must contain the allowed characters, and be at most 300 characters long. For allowed characters, please refer to https://cloud.google.com/bigquery/docs/schemas#column_names
Error processing transArchive_201401_201403_inactive_small.csv: 400 POST https://bigquery.googleapis.com/upload/bigquery/v2/projects/engellantwedge2024/jobs?uploadType=multipart: Invalid field name "datetime;"register_no";"emp_no";"trans_no";"upc";"description";"trans_type";"trans_subtype";"trans_status";"department";"quantity";"Scale";"cost";"unitPrice";"total";"regPrice";"altPrice";"tax";"taxexempt";"foodstamp";"wicable";"discount";"memDiscount";"discountable";"discounttype";"voided";"percentDiscount";"ItemQtty";"volDiscType";"volume";"VolSpecial";"mixMatch";"matched";"memType";"staff";"numflag";"itemstatus";"tenderstatus

KeyboardInterrupt: 

In [7]:
# Read the CSV file into a Pandas DataFrame
csv_file_path = 'data/WedgeZipOfZips_small/extracted/transArchive_201001_201003_small.csv'  # Change this to your CSV file path
df = pd.read_csv(csv_file_path)

# Initialize a BigQuery client
project_id = 'engellantwedge2024'  # Change this to your project ID
client = bigquery.Client(project=project_id)

# Define the dataset and table where you want to upload the DataFrame
dataset_id = 'wedge_transactions_small'  # Change this to your BigQuery dataset ID
table_id = 'test_table'  # Change this to your BigQuery table ID

# Define the fully qualified table ID
table_ref = f"{client.project}.{dataset_id}.{table_id}"

# Upload the DataFrame to BigQuery
# If the table doesn't exist, it will be created
job = client.load_table_from_dataframe(df, table_ref)

# Wait for the job to complete
job.result()

print(f"Loaded {job.output_rows} rows into {table_ref}.")


Loaded 10000 rows into engellantwedge2024.wedge_transactions_small.test_table.


In [6]:
client.project
pd.da

'engellantwedge2024'

In [5]:
project_id = 'engellantwedge2024'  # Change this to your project ID
client = bigquery.Client(project=project_id)

In [None]:
# Initialize BigQuery client
client = bigquery.Client()

# Set the path to your file
zip_file_path = "data/WedgeZipOfZips_small/transArchive_201001_201003_small.zip"

# Specify project, dataset, and table details
project_id = "engellantwedge2024"
dataset_id = "wedge_transactions_small"
table_id = 'test_table'
table_ref = client.dataset(dataset_id).table(table_id)

# Define the schema explicitly
schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField("register_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("emp_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("upc", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("description", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_type", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("trans_status", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("department", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Scale", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("cost", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("total", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tax", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("taxexempt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("foodstamp", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("wicable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discountable", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("discounttype", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("voided", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("percentDiscount", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volDiscType", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("volume", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("mixMatch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("matched", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("memType", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("staff", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("numflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("itemstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("tenderstatus", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("charflag", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("varflag", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("batchHeaderID", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("local", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("organic", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("display", "BOOLEAN", mode="NULLABLE"),
    bigquery.SchemaField("receipt", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("card_no", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("store", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("branch", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("match_id", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("trans_id", "FLOAT", mode="NULLABLE"),
]

# Specify job configuration with the defined schema
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # Skip the header row
    schema=schema,  # Use the specified schema
)

try:
    # Load the CSV file into BigQuery
    with open(zip_file_path, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

    job.result()  # Wait for the job to complete

    # Print the number of rows loaded
    print(f"Loaded {job.output_rows} rows into {table_ref}")
except Exception as e:
    print(f"An error occurred: {e}")
