# Task 1: Building a Transactional Database in Google BigQuery

In this part of the project, we take the cleaned CSV files produced from the Wedge zip archives and upload each one to its own table in Google BigQuery.

In [21]:
import os
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud.bigquery import LoadJobConfig

In [None]:
# Google BigQuery target: every table created below lives in this project/dataset.
project_id = "niekampbreannawedge"  # GCP project that owns the dataset
dataset_id = "wedge24"  # dataset that receives one table per cleaned file

In [29]:
# Create the BigQuery client bound to `project_id`; the upload cell below uses it
# for every load job. No explicit credentials are passed here, so authentication
# is whatever the client library resolves on its own -- NOTE(review): confirm the
# environment is set up for that before running.
client = bigquery.Client(project=project_id)

In [34]:
# Directory that holds the cleaned Wedge CSV files (relative to the notebook).
data_directory = "data/clean-files"

# List the directory through `data_directory`, the name defined just above.
# The earlier `data_dir` is not defined in any visible cell, so the cell
# raised NameError on a fresh kernel (Restart & Run All).
# os.listdir() returns entries in arbitrary order; sorting keeps the printed
# listing stable from run to run.
files = sorted(os.listdir(data_directory))
print(files)

['transArchive_201001_201003_clean.csv', 'transArchive_201004_201006_clean.csv', 'transArchive_201007_201009_clean.csv', 'transArchive_201010_201012_clean.csv', 'transArchive_201101_201103_clean.csv', 'transArchive_201104_clean.csv', 'transArchive_201105_clean.csv', 'transArchive_201106_clean.csv', 'transArchive_201107_201109_clean.csv', 'transArchive_201110_201112_clean.csv', 'transArchive_201201_201203_clean.csv', 'transArchive_201201_201203_inactive_clean.csv', 'transArchive_201204_201206_clean.csv', 'transArchive_201204_201206_inactive_clean.csv', 'transArchive_201207_201209_clean.csv', 'transArchive_201207_201209_inactive_clean.csv', 'transArchive_201210_201212_clean.csv', 'transArchive_201210_201212_inactive_clean.csv', 'transArchive_201301_201303_clean.csv', 'transArchive_201301_201303_inactive_clean.csv', 'transArchive_201304_201306_clean.csv', 'transArchive_201304_201306_inactive_clean.csv', 'transArchive_201307_201309_clean.csv', 'transArchive_201307_201309_inactive_clean.csv

In [35]:
# BigQuery schema
# Each (column name, BigQuery type) pair below becomes one NULLABLE SchemaField.
# The pairs are listed in the same order as the original field-by-field listing.
schema = [
    bigquery.SchemaField(column_name, column_type, mode="NULLABLE")
    for column_name, column_type in (
        ("datetime", "TIMESTAMP"),
        ("register_no", "FLOAT"),
        ("emp_no", "FLOAT"),
        ("trans_no", "FLOAT"),
        ("upc", "STRING"),
        ("description", "STRING"),
        ("trans_type", "STRING"),
        ("trans_subtype", "STRING"),
        ("trans_status", "STRING"),
        ("department", "FLOAT"),
        ("quantity", "FLOAT"),
        ("Scale", "FLOAT"),
        ("cost", "FLOAT"),
        ("unitPrice", "FLOAT"),
        ("total", "FLOAT"),
        ("regPrice", "FLOAT"),
        ("altPrice", "FLOAT"),
        ("tax", "FLOAT"),
        ("taxexempt", "FLOAT"),
        ("foodstamp", "FLOAT"),
        ("wicable", "FLOAT"),
        ("discount", "FLOAT"),
        ("memDiscount", "FLOAT"),
        ("discountable", "FLOAT"),
        ("discounttype", "FLOAT"),
        ("voided", "FLOAT"),
        ("percentDiscount", "FLOAT"),
        ("ItemQtty", "FLOAT"),
        ("volDiscType", "FLOAT"),
        ("volume", "FLOAT"),
        ("VolSpecial", "FLOAT"),
        ("mixMatch", "FLOAT"),
        ("matched", "FLOAT"),
        ("memType", "BOOLEAN"),
        ("staff", "BOOLEAN"),
        ("numflag", "FLOAT"),
        ("itemstatus", "FLOAT"),
        ("tenderstatus", "FLOAT"),
        ("charflag", "STRING"),
        ("varflag", "FLOAT"),
        ("batchHeaderID", "BOOLEAN"),
        ("local", "FLOAT"),
        ("organic", "FLOAT"),
        ("display", "BOOLEAN"),
        ("receipt", "FLOAT"),
        ("card_no", "FLOAT"),
        ("store", "FLOAT"),
        ("branch", "FLOAT"),
        ("match_id", "FLOAT"),
        ("trans_id", "FLOAT"),
    )
]

In [36]:

# Load-job settings shared by every upload: start from an empty config and
# set each option explicitly.
job_config = bigquery.LoadJobConfig()
job_config.schema = schema  # explicit column names/types defined above
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1  # first row of each CSV is skipped
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # Replace table if it exists

In [37]:
# Load every cleaned CSV in `data_directory` into its own BigQuery table.
files = os.listdir(data_directory)

for file_name in files:
    # Only the cleaned files are uploaded; anything else in the folder is skipped.
    if not file_name.endswith("_clean.csv"):
        continue

    # The table is named after the file with the "_clean.csv" suffix removed,
    # qualified as <project>.<dataset>.<table>.
    table_name = file_name.replace("_clean.csv", "")
    table_id = ".".join((project_id, dataset_id, table_name))
    file_path = os.path.join(data_directory, file_name)

    print(f"Uploading {file_name} to BigQuery table {table_id}...")

    # Stream the file to BigQuery and block until the load job has finished.
    with open(file_path, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)
        job.result()

    print(f"Uploaded {file_name} successfully to {table_id}!")

Uploading transArchive_201001_201003_clean.csv to BigQuery table niekampbreannawedge.wedge24.transArchive_201001_201003...
Uploaded transArchive_201001_201003_clean.csv successfully to niekampbreannawedge.wedge24.transArchive_201001_201003!
Uploading transArchive_201004_201006_clean.csv to BigQuery table niekampbreannawedge.wedge24.transArchive_201004_201006...
Uploaded transArchive_201004_201006_clean.csv successfully to niekampbreannawedge.wedge24.transArchive_201004_201006!
Uploading transArchive_201007_201009_clean.csv to BigQuery table niekampbreannawedge.wedge24.transArchive_201007_201009...
Uploaded transArchive_201007_201009_clean.csv successfully to niekampbreannawedge.wedge24.transArchive_201007_201009!
Uploading transArchive_201010_201012_clean.csv to BigQuery table niekampbreannawedge.wedge24.transArchive_201010_201012...
Uploaded transArchive_201010_201012_clean.csv successfully to niekampbreannawedge.wedge24.transArchive_201010_201012!
Uploading transArchive_201101_201103