Google Big Query is a distributed data warehouse built on a serverless architecture . We’ll discuss this framework in class. In this task you’ll upload all Wedge transaction records to Google Big Query. You’ll want to make sure that the column data types are correctly specified and you’ve properly handled the null values. 
The requirements for this task change depending on the grade you’re going for. 
Note: this assignment can be done manually or programmatically. Naturally I’d prefer it be done programmatically so that you get more practice, but that’s not required to get full credit. 

In [1]:
import pandas as pd
import zipfile
from google.cloud import bigquery
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound
from google.cloud.bigquery import SchemaField
import os



In [None]:

zip_path = 'Data\wedge-clean-files.zip'  # Replace with your zip file path
extract_path = 'Data'   # Replace with your desired extract path

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

In [2]:
#export GOOGLE_APPLICATION_CREDENTIALS='wedge-project-403222-85fe5b35980b.json'

service_path = ""
service_file = 'wedge-project-403222-80aeb3085a6a.json' # change this to your authentication information  

gbq_proj_id = 'wedge-project-403222'  

# And this should stay the same. 
private_key = service_path + service_file

# Now we pass in our credentials so that Python has permission to access our project.
credentials = service_account.Credentials.from_service_account_file(private_key)

# And finally we establish our connection
client = bigquery.Client(credentials = credentials, project=gbq_proj_id)


In [3]:
# Check if the dataset exists
dataset_id = 'Transactions'
dataset_ref = client.dataset(dataset_id)

try:
    client.get_dataset(dataset_ref)
    print(f"Dataset {dataset_id} already exists.")
except NotFound:
    # Create the dataset if it does not exist
    dataset = bigquery.Dataset(dataset_ref)
    dataset = client.create_dataset(dataset)
    print(f"Dataset {dataset_id} created.")



Dataset Transactions already exists.


In [11]:
folder_path = 'Data\clean-files'
data_types = {
    'datetime': 'str',
    'register_no': 'Float64',
    'emp_no': 'Float64',
    'trans_no': 'Float64',
    'upc': 'str',
    'description': 'str',
    'trans_type': 'str',
    'trans_subtype': 'str',
    'trans_status': 'str',
    'department': 'Float64',
    'quantity': 'Float64',
    'Scale': 'Float64',
    'cost': 'Float64',
    'unitPrice': 'Float64',
    'total': 'Float64',
    'regprice': 'Float64',
    'altprice': 'Float64',
    'tax': 'Float64',
    'taxexempt': 'Float64',
    'foodstamp': 'Float64',
    'wicable': 'Float64',
    'discount': 'Float64',
    'memDiscount': 'Float64',
    'discountable': 'Float64',
    'discounttype': 'Float64',
    'voided': 'Float64',
    'percentDiscount': 'Float64',
    'ItemQtty': 'Float64',
    'volDiscType': 'Float64',
    'volume': 'Float64',
    'VolSpecial': 'Float64',
    'mixMatch': 'Float64',
    'matched': 'Float64',
    'memType': 'Float64',
    'staff': 'Float64',
    'numflag': 'Float64',
    'itemstatus': 'Float64',
    'tenderstatus': 'Float64',
    'charflag': 'str',
    'varflag': 'Float64',
    'batchHeaderID': 'Float64',
    'local': 'Float64',
    'organic': 'Float64',
    'display': 'Float64',
    'receipt': 'Float64',
    'card_no': 'Float64',
    'store': 'Float64',
    'branch': 'Float64',
    'match_id': 'Float64',
    'trans_id': 'Float64',

}

dataframes = {}

for filename in os.listdir(folder_path):
    if filename.endswith('.csv'):  # or whatever file type your datasets are
        file_path = os.path.join(folder_path, filename)
        dataframes[filename] = pd.read_csv(file_path, dtype=data_types)

ValueError: Unable to parse string " " at position 12851

In [14]:
schema1 = [
    SchemaField("datetime", "STRING", mode="NULLABLE"),
    SchemaField("register_no", "INTEGER", mode="NULLABLE"),
    SchemaField("emp_no", "INTEGER", mode="NULLABLE"),
    SchemaField("trans_no", "INTEGER", mode="NULLABLE"),
    SchemaField("upc", "STRING", mode="NULLABLE"),
    SchemaField("description", "STRING", mode="NULLABLE"),
    SchemaField("trans_type", "STRING", mode="NULLABLE"),
    SchemaField("trans_subtype", "STRING", mode="NULLABLE"),
    SchemaField("trans_status", "STRING", mode="NULLABLE"),
    SchemaField("department", "INTEGER", mode="NULLABLE"),
    SchemaField("quantity", "FLOAT", mode="NULLABLE"),
    SchemaField("Scale", "INTEGER", mode="NULLABLE"),
    SchemaField("cost", "FLOAT", mode="NULLABLE"),
    SchemaField("unitPrice", "FLOAT", mode="NULLABLE"),
    SchemaField("total", "FLOAT", mode="NULLABLE"),
    SchemaField("regPrice", "FLOAT", mode="NULLABLE"),
    SchemaField("altPrice", "FLOAT", mode="NULLABLE"),
    SchemaField("tax", "INTEGER", mode="NULLABLE"),
    SchemaField("taxexempt", "INTEGER", mode="NULLABLE"),
    SchemaField("foodstamp", "INTEGER", mode="NULLABLE"),
    SchemaField("wicable", "INTEGER", mode="NULLABLE"),
    SchemaField("discount", "FLOAT", mode="NULLABLE"),
    SchemaField("memDiscount", "FLOAT", mode="NULLABLE"),
    SchemaField("discountable", "INTEGER", mode="NULLABLE"),
    SchemaField("discounttype", "INTEGER", mode="NULLABLE"),
    SchemaField("voided", "INTEGER", mode="NULLABLE"),
    SchemaField("percentDiscount", "FLOAT", mode="NULLABLE"),
    SchemaField("ItemQtty", "FLOAT", mode="NULLABLE"),
    SchemaField("volDiscType", "INTEGER", mode="NULLABLE"),
    SchemaField("volume", "INTEGER", mode="NULLABLE"),
    SchemaField("VolSpecial", "FLOAT", mode="NULLABLE"),
    SchemaField("mixMatch", "INTEGER", mode="NULLABLE"),
    SchemaField("matched", "INTEGER", mode="NULLABLE"),
    SchemaField("memType", "FLOAT", mode="NULLABLE"),
    SchemaField("staff", "INTEGER", mode="NULLABLE"),
    SchemaField("numflag", "INTEGER", mode="NULLABLE"),
    SchemaField("itemstatus", "INTEGER", mode="NULLABLE"),
    SchemaField("tenderstatus", "INTEGER", mode="NULLABLE"),
    SchemaField("charflag", "STRING", mode="NULLABLE"),
    SchemaField("varflag", "INTEGER", mode="NULLABLE"),
    SchemaField("batchHeaderID", "FLOAT", mode="NULLABLE"),
    SchemaField("local", "INTEGER", mode="NULLABLE"),
    SchemaField("organic", "FLOAT", mode="NULLABLE"),
    SchemaField("display", "FLOAT", mode="NULLABLE"),
    SchemaField("receipt", "INTEGER", mode="NULLABLE"),
    SchemaField("card_no", "INTEGER", mode="NULLABLE"),
    SchemaField("store", "INTEGER", mode="NULLABLE"),
    SchemaField("branch", "INTEGER", mode="NULLABLE"),
    SchemaField("match_id", "INTEGER", mode="NULLABLE"),
    SchemaField("trans_id", "INTEGER", mode="NULLABLE")
]

In [16]:
# BigQuery client
#client = bigquery.Client()

# Path to the directory where files are extracted
files_path = 'Data\clean-files' # Update this to your path




# Loop through the files and upload each to BigQuery
for filename in os.listdir(files_path):
    if filename.endswith('.csv'):  # Assuming files are in CSV format
 
        file_path = os.path.join(files_path, filename)
        dataframe = pd.read_csv(file_path, low_memory=False)

        float_columns = [
            "quantity", "cost", "unitPrice", "total", "regPrice", "altPrice",
            "discount", "memDiscount", "percentDiscount", "ItemQtty", "VolSpecial",
            "memType", "batchHeaderID", "organic", "display"
        ]
        integer_columns = [
            "register_no", "emp_no", "trans_no", "department", "Scale", "tax", 
            "taxexempt", "foodstamp", "wicable", "discountable", "discounttype", 
            "voided", "volDiscType", "volume", "mixMatch", "matched", "staff", 
            "numflag", "itemstatus", "tenderstatus", "varflag", "local", 
            "receipt", "card_no", "store", "branch", "match_id", "trans_id"
        ]

        # Process float columns
        dataframe[float_columns] = dataframe[float_columns].replace(' ', float('nan'))
        dataframe[float_columns] = dataframe[float_columns].apply(pd.to_numeric, errors='coerce')

        # Process integer columns
        dataframe[integer_columns] = dataframe[integer_columns].replace(' ', -1)
        dataframe[integer_columns] = dataframe[integer_columns].fillna(-1).astype('int')

        project_id = 'wedge-project-403222'
        dataset_id = 'Transactions'
        table_id = os.path.splitext(filename)[0]

        # Define the full table ID
        table_full_id = f"{client.project}.{dataset_id}.{table_id}"

        # If the table does not exist, it will be created. If it exists, data will be appended.
        job = client.load_table_from_dataframe(dataframe, table_full_id, job_config=bigquery.LoadJobConfig(schema=schema1))

        # Wait for the job to complete
        job.result()
        print(f"Uploaded {filename} to {table_full_id}")

Uploaded transArchive_201001_201003_clean.csv to wedge-project-403222.Transactions.transArchive_201001_201003_clean
Uploaded transArchive_201004_201006_clean.csv to wedge-project-403222.Transactions.transArchive_201004_201006_clean
Uploaded transArchive_201007_201009_clean.csv to wedge-project-403222.Transactions.transArchive_201007_201009_clean


BadRequest: 400 POST https://bigquery.googleapis.com/upload/bigquery/v2/projects/wedge-project-403222/jobs?uploadType=resumable: Provided Schema does not match Table wedge-project-403222:Transactions.transArchive_201010_201012_clean. Field taxexempt has changed type from FLOAT to INTEGER