
-Complete



In [2]:
from google.cloud import bigquery
import os
import zipfile
import polars as pl
import pandas as pd
from datetime import datetime
from pandas_gbq import to_gbq

# Fully qualified dataset id. Google Cloud project ids are lowercase, so this
# uses the same lowercase project id as the client and the upload cells.
dataset_id = 'wedge-project-jbangtson.the_wedge_dataset'

# Initialize a BigQuery client
client = bigquery.Client(project='wedge-project-jbangtson')


# Folder holding the unzipped CSV files. The original local path remains the
# default; set the WEDGE_DATA_DIR environment variable to point elsewhere.
data_directory = os.environ.get(
    "WEDGE_DATA_DIR",
    "E:\\College\\Fall 2024\\ADA\\Wedge\\Wedge_Project\\data\\unzipped\\",
)

## Importing the files in as lazy dataframes.

This cell loads every file in the data directory into its own Polars LazyFrame, so nothing is read into memory until a frame is collected. Files whose name has `inactive.csv` as its fourth underscore-separated part are read with `;` as the separator; all other files use the default comma. Parsing errors are tolerated by passing `ignore_errors=True` to `pl.scan_csv` rather than being raised during loading.

In [3]:
# Need to put all csvs into their own dfs
#https://chatgpt.com/share/66e4ad8b-ea5c-8000-9117-d884dd0bbfb3

# List the directory once. Every entry is scanned, so the position of a
# LazyFrame in lazy_df_list matches the position of its name in this listing.
data_file_names = os.listdir(data_directory)

# Initialize an empty list to store LazyFrames
lazy_df_list = []

# Loop through files and load lazily
for file in data_file_names:
    file_path = os.path.join(data_directory, file)

    # Files whose fourth underscore-separated part is "inactive.csv" are
    # semicolon-separated; everything else uses the default comma.
    name_parts = file.split("_")
    if len(name_parts) >= 4 and name_parts[3] == "inactive.csv":
        separator = ";"
    else:
        separator = ","

    # Use LazyFrame for efficient processing
    lazy_df = pl.scan_csv(file_path, has_header=True, null_values=["\\N"], ignore_errors=True, separator=separator)

    # Append LazyFrame to the list
    lazy_df_list.append(lazy_df)

# Fail with a clear message instead of an IndexError on lazy_df_list[0].
if not lazy_df_list:
    raise FileNotFoundError(f"No files found in {data_directory}")

# Materialize (collect) the first LazyFrame; its header supplies the column
# names that later cells apply to every file.
df = lazy_df_list[0].collect()

clean_columns = df.columns


## Creating GBQ Schema

This code defines a schema for a BigQuery table using bigquery.SchemaField objects. The schema specifies the structure of the data, including the column names, data types (e.g., FLOAT, STRING, BOOLEAN, TIMESTAMP), and whether each field is nullable. This schema can be used to load, query, and manage structured transaction data in BigQuery.

In [4]:

# (column name, BigQuery type) pairs in table column order.
_wedge_column_types = [
    ("datetime", "TIMESTAMP"),
    ("register_no", "FLOAT"),
    ("emp_no", "FLOAT"),
    ("trans_no", "FLOAT"),
    ("upc", "STRING"),
    ("description", "STRING"),
    ("trans_type", "STRING"),
    ("trans_subtype", "STRING"),
    ("trans_status", "STRING"),
    ("department", "FLOAT"),
    ("quantity", "FLOAT"),
    ("Scale", "FLOAT"),
    ("cost", "FLOAT"),
    ("unitPrice", "FLOAT"),
    ("total", "FLOAT"),
    ("regPrice", "FLOAT"),
    ("altPrice", "FLOAT"),
    ("tax", "FLOAT"),
    ("taxexempt", "FLOAT"),
    ("foodstamp", "FLOAT"),
    ("wicable", "FLOAT"),
    ("discount", "FLOAT"),
    ("memDiscount", "FLOAT"),
    ("discountable", "FLOAT"),
    ("discounttype", "FLOAT"),
    ("voided", "FLOAT"),
    ("percentDiscount", "FLOAT"),
    ("ItemQtty", "FLOAT"),
    ("volDiscType", "FLOAT"),
    ("volume", "FLOAT"),
    ("VolSpecial", "FLOAT"),
    ("mixMatch", "FLOAT"),
    ("matched", "FLOAT"),
    ("memType", "BOOLEAN"),
    ("staff", "BOOLEAN"),
    ("numflag", "FLOAT"),
    ("itemstatus", "FLOAT"),
    ("tenderstatus", "FLOAT"),
    ("charflag", "STRING"),
    ("varflag", "FLOAT"),
    ("batchHeaderID", "BOOLEAN"),
    ("local", "FLOAT"),
    ("organic", "FLOAT"),
    ("display", "BOOLEAN"),
    ("receipt", "FLOAT"),
    ("card_no", "FLOAT"),
    ("store", "FLOAT"),
    ("branch", "FLOAT"),
    ("match_id", "FLOAT"),
    ("trans_id", "FLOAT"),
]

# One nullable SchemaField per column, in the order listed above.
wedge_schema = [
    bigquery.SchemaField(column_name, column_type, mode="NULLABLE")
    for column_name, column_type in _wedge_column_types
]



----
## Cleaning/Casting Methods

### Wedge Cleaner 
The wedge_cleaner function processes a pandas DataFrame to ensure data consistency and cleanliness by applying specific type-safe casting functions to each column based on its name. It handles columns differently based on their expected data type:

    Datetime Columns: Converts the `datetime` column with pd.to_datetime (the datetime_safe_cast helper defined below is not called by this function).
    Float Columns: Applies float_safe_cast to ensure values are converted to floats.
    String Columns: Applies string_safe_cast for consistent string formatting.
    Boolean Columns: Applies boolean_safe_cast to correctly convert values to boolean.

The function iterates over each column, performs the appropriate casting, and returns the cleaned DataFrame.

In [5]:

def wedge_cleaner(pandas_df):
    """Cast each recognised column of a pandas DataFrame to its expected type.

    The frame is modified in place and is also returned. Columns whose name is
    not recognised are left untouched.

    Args:
        pandas_df: pandas DataFrame to clean; columns are matched by exact name.

    Returns:
        The same DataFrame object with the recognised columns converted.
    """
    # "ItemQtty" and "VolSpecial" are the spellings used in wedge_schema; the
    # previous spellings ("itemQtty", "volSpecial") are kept so frames that
    # use them are still cast.
    float_columns = {
        "register_no", "emp_no", "trans_no", "department", "quantity", "Scale",
        "cost", "unitPrice", "total", "regPrice", "altPrice", "tax",
        "taxexempt", "foodstamp", "wicable", "discount", "memDiscount",
        "discountable", "discounttype", "voided", "percentDiscount",
        "itemQtty", "ItemQtty", "volDiscType", "volume", "volSpecial",
        "VolSpecial", "mixMatch", "matched", "numflag", "itemstatus",
        "tenderstatus", "varflag", "local", "organic", "receipt", "card_no",
        "store", "branch", "match_id", "trans_id",
    }
    string_columns = {"upc", "description", "trans_type", "trans_subtype", "trans_status", "charflag"}
    boolean_columns = {"memType", "staff", "batchHeaderID", "display"}

    for col_name in pandas_df:
        print(f'Column: {col_name}')

        if col_name == "datetime":
            # Datetime
            try:
                pandas_df[col_name] = pd.to_datetime(pandas_df[col_name])
            except Exception as exc:
                # Best effort: report why the conversion failed and leave the
                # column as it was, instead of printing the whole column.
                print(f"Error with {col_name} column: {exc}")
        elif col_name in float_columns:
            # Float
            pandas_df[col_name] = pandas_df[col_name].apply(float_safe_cast)
        elif col_name in string_columns:
            # String
            pandas_df[col_name] = pandas_df[col_name].apply(string_safe_cast)
        elif col_name in boolean_columns:
            # Boolean
            pandas_df[col_name] = pandas_df[col_name].apply(boolean_safe_cast)

    return pandas_df

### Datetime

In [6]:
from datetime import datetime

# Date time

def datetime_safe_cast(val):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a datetime.

    Args:
        val: Value to convert. Values that are already datetime instances are
            returned unchanged.

    Returns:
        The parsed datetime, or datetime.min when val cannot be parsed.
    """
    # strptime only accepts strings; without this guard a value that is
    # already a datetime would be replaced by the fallback.
    if isinstance(val, datetime):
        return val

    try:
        return datetime.strptime(val, '%Y-%m-%d %H:%M:%S')

    except (ValueError, TypeError):
        # The fallback is datetime.min (not None); the message now says so.
        print(f"Returning datetime.min. Cannot convert {val} to datetime")
        return datetime.min

        

### Float

In [7]:
# Float
def float_safe_cast(val):
    """Convert a value to float, returning None when that is not possible.

    Args:
        val: Value to convert; None is passed through as None.

    Returns:
        float(val), or None when val is None or cannot be converted.
    """
    if val is None:
        return None

    try:
        return float(val)
    except (ValueError, TypeError, OverflowError):
        # ValueError: unparsable text; TypeError: unsupported type (e.g. a
        # list); OverflowError: integer too large for a float.
        print("Returning None Float")
        return None




### String

In [8]:
# String
def string_safe_cast(val):
    """Convert a value to str, mapping missing values to the empty string.

    Args:
        val: Value to convert.

    Returns:
        "" when val is None, NaN, or the text "nan"; otherwise str(val).
    """
    if val is None:
        return ""

    text = str(val)

    # Exact comparison: the previous substring test (`in "nan"`) also blanked
    # legitimate values such as "a", "n", "na" and "an".
    if text == "nan":
        return ""

    return text



### Boolean

In [9]:

# Boolean
def boolean_safe_cast(val):
    """Convert a value to bool, returning None for missing values.

    Args:
        val: Value to convert.

    Returns:
        None when val is None, NaN, or cannot be evaluated as a boolean;
        otherwise bool(val). Note that any non-empty string is True.
    """
    if val is None:
        return None

    try:
        # NaN is the only value that differs from itself; bool(NaN) is True,
        # which would turn every missing value into True.
        if val != val:
            return None
        return bool(val)
    except (ValueError, TypeError):
        print("Returning None Boolean")
        return None

## Cleans and Uploads to GBQ (🙏)

This script processes a list of Polars LazyFrame objects by converting them to pandas DataFrames, cleaning them using the wedge_cleaner function, and then uploading the cleaned DataFrames to Google BigQuery (GBQ).
Steps:
Iterate Over LazyFrames:
For each LazyFrame in lazy_df_list, the script prints the file number and name.
Collect and Clean Data:
Convert the LazyFrame to a pandas DataFrame (df) and set column names to clean_columns.
Use wedge_cleaner to clean the DataFrame.
Upload to GBQ:
Print status messages indicating the start and completion of the upload process.
Define the project ID and destination table name for BigQuery.
Upload the cleaned DataFrame to BigQuery using to_gbq() with the if_exists='replace' parameter to replace the table if it already exists.
This code ensures that each file is processed, cleaned, and uploaded to a specified BigQuery table.

In [None]:
# Clean every loaded file and upload it to its own BigQuery table.

# List the directory once. lazy_df_list was built by iterating a listing of
# the same directory, so position idx is assumed to name the same file.
file_names = os.listdir(data_directory)

# pandas-gbq takes the schema through `table_schema` as a list of dicts, so
# the SchemaField objects are converted to their dict form.
table_schema = [field.to_api_repr() for field in wedge_schema]

# Define project_id
project_id = 'wedge-project-jbangtson'

for idx, lazy_df in enumerate(lazy_df_list):
    file_name = file_names[idx]
    print(f"Cleaning file number: {idx}\nFile Name: {file_name}")

    # Materialize the LazyFrame and convert it to pandas: wedge_cleaner and
    # to_gbq both operate on pandas DataFrames, not Polars ones.
    df = lazy_df.collect().to_pandas()
    df.columns = clean_columns

    clean_panda_df = wedge_cleaner(df)

    print(f"Finished cleaning file number: {idx}\nFile Name: {file_name}!\n\n------------------------------------")

    print(f"Uploading file number: {idx}\nFile Name: {file_name} to GBQ")

    # Destination table is named after the file (text before the first dot)
    table_name = str(file_name).split(".")[0]
    destination_table = f'the_wedge_dataset.{table_name}'

    to_gbq(clean_panda_df, destination_table, project_id=project_id, if_exists='replace', table_schema=table_schema)
   

In [None]:
# Troubleshooting: re-read a single file (index 37) with pandas, clean it and
# upload it without an explicit schema.

# List the directory once instead of on every use.
file_names = os.listdir(data_directory)

# Define project_id
project_id = 'wedge-project-jbangtson'

for idx, lazy_df in enumerate(lazy_df_list[37:38], start=37):
    file_name = file_names[idx]
    print(f"Cleaning file number: {idx}\nFile Name: {file_name}")

    # Read from data_directory, the same folder the name was listed from,
    # rather than from a path relative to the current working directory.
    df = pd.read_csv(os.path.join(data_directory, file_name), sep=",")

    clean_panda_df = wedge_cleaner(df)

    print(f"Finished cleaning file number: {idx}\nFile Name: {file_name}!\n\n------------------------------------")

    print(f"Uploading file number: {idx}\nFile Name: {file_name} to GBQ")

    # Destination table is named after the file (text before the first dot)
    table_name = str(file_name).split(".")[0]
    destination_table = f'the_wedge_dataset.{table_name}'

    to_gbq(clean_panda_df, destination_table, project_id=project_id, if_exists='replace')
   

In [None]:
fruit = ["apple", "banana", "cherry"]

# Print every item except the first, numbered by its position in the list
# (positions start at 1 because the first item is skipped).
for idx in range(1, len(fruit)):
    x = fruit[idx]
    print(idx, x)

In [60]:
# Open one archive CSV as a pandas DataFrame for manual troubleshooting.
# The path is built from data_directory so this cell reads the same folder as
# the loader cell instead of a path relative to the working directory.
table_name = "transArchive_201602"
manual_troubleshooting_df = pd.read_csv(os.path.join(data_directory, f"{table_name}.csv"), sep=",")


In [61]:
# Relabel the columns with the header names captured from the first loaded
# file (clean_columns). This only renames columns; dtypes are not changed here.
manual_troubleshooting_df.columns = clean_columns


# Earlier upload attempt through pandas-gbq, kept for reference:
#to_gbq(manual_troubleshooting_df, destination_table, project_id=project_id, if_exists='replace', table_schema=wedge_schema)

In [None]:
# Run the shared cleaning/casting pass over the troubleshooting frame.
manual_troubleshooting_df = wedge_cleaner(manual_troubleshooting_df)

# Target project and "<dataset>.<table>" destination for the upload.
project_id = 'wedge-project-jbangtson'
destination_table = 'the_wedge_dataset.{}'.format(table_name)


In [None]:
# Load-job configuration: enforce the Wedge schema and replace the table's
# contents, so re-running this cell does not append the same rows again.
job_config = bigquery.LoadJobConfig(
    schema=wedge_schema,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Upload the DataFrame to BigQuery and wait for the load job to finish
job = client.load_table_from_dataframe(manual_troubleshooting_df, destination_table, job_config=job_config)
job.result()

In [None]:
# Define project_id
project_id = 'wedge-project-jbangtson'

# Clean and upload the 2016 archive file for each month number in the range.
for x in range(4, 5):

    # Zero-pad the month so two-digit months also give a valid file name
    # (identical to the previous "20160{x}" form for months 1-9).
    table_name = f"transArchive_2016{x:02d}"

    # Open the CSV as a pandas DataFrame, reading from data_directory rather
    # than from a path relative to the current working directory.
    manual_troubleshooting_df = pd.read_csv(os.path.join(data_directory, f"{table_name}.csv"), sep=",")

    manual_troubleshooting_df.columns = clean_columns

    manual_troubleshooting_df = wedge_cleaner(manual_troubleshooting_df)

    # Define destination table
    destination_table = f'the_wedge_dataset.{table_name}'

    # WRITE_TRUNCATE replaces the table contents on every run
    job_config = bigquery.LoadJobConfig(schema=wedge_schema, write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)

    # Upload the DataFrame to BigQuery and wait for the load job to finish
    job = client.load_table_from_dataframe(manual_troubleshooting_df, destination_table, job_config=job_config)
    job.result()