## Task 1

In [7]:
import os
import zipfile
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud.exceptions import NotFound
from google.api_core.exceptions import NotFound
import pandas as pd
import numpy as np

## Clean full data

Read in full data

In [8]:
# Column dtypes used when reading the raw files, keyed by 0-based column position.
# Names for each position are listed in expected_columns.

# Positions that hold text; every other position (0-49) is read as float.
# Float positions: 1-3 (register_no..trans_no), 9-32 (department..matched),
# 35-37 (numflag..tenderstatus), 39 (varflag), 41 (local), 44-49 (receipt..trans_id).
_string_positions = {
    0,   # datetime - TIMESTAMP mapped to string for initial import
    4,   # upc
    5,   # description
    6,   # trans_type
    7,   # trans_subtype
    8,   # trans_status
    33,  # memType
    34,  # staff
    38,  # charflag
    40,  # batchHeaderID
    42,  # organic (changed from FLOAT to STRING)
    43,  # display
}

dtypes = {
    position: 'str' if position in _string_positions else 'float'
    for position in range(50)
}

In [9]:
# Column names in file order (50 entries); the position of a name here is the
# key used for that column in the dtypes mapping.
expected_columns = """
    datetime register_no emp_no trans_no upc description
    trans_type trans_subtype trans_status department quantity
    Scale cost unitPrice total regPrice altPrice tax
    taxexempt foodstamp wicable discount memDiscount
    discountable discounttype voided percentDiscount ItemQtty
    volDiscType volume VolSpecial mixMatch matched memType
    staff numflag itemstatus tenderstatus charflag varflag
    batchHeaderID local organic display receipt card_no
    store branch match_id trans_id
""".split()

In [10]:
# Not currently used: superseded by the reader in the next cell, which also treats \\N as missing.
# The code below is wrapped in a bare string literal, so running this cell defines
# nothing and only echoes the text back as the cell output.
# NOTE(review): the disabled code compares df.shape[1] (an int) with the
# expected_columns list itself, so its mismatch check would always be true if re-enabled.
'''

folder_path = 'data/Wedge_full/'

# Function to read a CSV file and detect the delimiter
def read_csv_with_delimiter(file_path, expected_columns=expected_columns, dtypes=dtypes):
    try:
        # Try reading with a comma as the delimiter, with dtypes applied
        df = pd.read_csv(file_path, dtype=dtypes)
        
        # Check if the number of columns matches the expected number
        if df.shape[1] != expected_columns:
            print(f"Column mismatch with comma delimiter, trying with semicolon...")
            # Try reading with a semicolon as the delimiter
            df = pd.read_csv(file_path, delimiter=';', dtype=dtypes)
        
        return df
    except Exception as e:
        print(f'Error reading {file_path}: {e}')
        return None

'''

'\n\nfolder_path = \'data/Wedge_full/\'\n\n# Function to read a CSV file and detect the delimiter\ndef read_csv_with_delimiter(file_path, expected_columns=expected_columns, dtypes=dtypes):\n    try:\n        # Try reading with a comma as the delimiter, with dtypes applied\n        df = pd.read_csv(file_path, dtype=dtypes)\n        \n        # Check if the number of columns matches the expected number\n        if df.shape[1] != expected_columns:\n            print(f"Column mismatch with comma delimiter, trying with semicolon...")\n            # Try reading with a semicolon as the delimiter\n            df = pd.read_csv(file_path, delimiter=\';\', dtype=dtypes)\n        \n        return df\n    except Exception as e:\n        print(f\'Error reading {file_path}: {e}\')\n        return None\n\n'

In [11]:
## Reader for the raw files; treats \\N as a missing value
folder_path = 'data/Wedge_full/'

def read_csv_with_delimiter(file_path, expected_columns=expected_columns, dtypes=dtypes):
    """Read one raw transaction file into a DataFrame labelled with ``expected_columns``.

    The file is first parsed as comma-delimited; if that does not produce the
    expected number of columns it is parsed again as semicolon-delimited.
    The literal text ``\\N`` is read as a missing value in both cases.

    If the parsed column labels differ from ``expected_columns``:

    * when the first label is the first expected name (ignoring case and
      quotes) the file has a header with variant spelling, and the labels
      are simply replaced;
    * otherwise the file has no header row and pandas consumed the first
      record as labels, so the file is read again with ``header=None`` to
      keep that record as data.

    Parameters
    ----------
    file_path : str
        Path of the CSV file to read.
    expected_columns : list of str
        Column names, in file order.
    dtypes : dict
        Column dtypes passed to ``pd.read_csv``; keys may be 0-based column
        positions or column names.

    Returns
    -------
    pandas.DataFrame or None
        The parsed frame, or ``None`` (after printing the reason) when the
        file cannot be read or does not have the expected number of columns.
    """
    expected = list(expected_columns)
    read_options = {'dtype': dtypes, 'na_values': '\\N'}

    try:
        # Step 1: Try reading with a comma as the delimiter, treating '\\N' as NaN
        delimiter = ','
        df = pd.read_csv(file_path, delimiter=delimiter, **read_options)

        # Step 2: Fall back to a semicolon when the column count is wrong
        if df.shape[1] != len(expected):
            print(f"Column mismatch with comma delimiter in {file_path}, trying with semicolon...")
            delimiter = ';'
            df = pd.read_csv(file_path, delimiter=delimiter, **read_options)

        # Neither delimiter produced the expected layout: report and give up
        if df.shape[1] != len(expected):
            print(f'Error reading {file_path}: expected {len(expected)} columns, found {df.shape[1]}')
            return None

        # Step 3: Make sure the labels are the expected ones
        if list(df.columns) != expected:
            first_label = str(df.columns[0]).strip().strip('"').lower()
            if first_label == str(expected[0]).lower():
                # A real header row with variant spelling: only the labels change
                print(f"Header mismatch detected in {file_path}. Applying expected columns.")
                df.columns = expected
            else:
                print(f"Header mismatch detected in {file_path}. Moving first row to data and applying expected columns.")
                # The labels are really the first record. Rebuilding it from the
                # labels would lose information (pandas renames duplicate labels),
                # so parse the file again without a header row instead.
                if isinstance(dtypes, dict):
                    # Key the dtypes by name so they still apply once names= is used
                    named_dtypes = {
                        (expected[key] if isinstance(key, int) else key): value
                        for key, value in dtypes.items()
                    }
                else:
                    named_dtypes = dtypes
                df = pd.read_csv(
                    file_path,
                    delimiter=delimiter,
                    header=None,
                    names=expected,
                    dtype=named_dtypes,
                    na_values='\\N',
                )

        return df

    except Exception as e:
        # Best effort by design: the caller skips files that return None
        print(f'Error reading {file_path}: {e}')
        return None

In [16]:
# Get a list of all CSV files in the folder.
# os.listdir returns names in arbitrary order, so sort them: otherwise the same
# slice number could select different files from one run to the next.
csv_files = sorted(f for f in os.listdir(folder_path) if f.endswith('.csv'))

slice_length = 5     # 3 took 15 minutes
slices = [csv_files[i:i+slice_length] for i in range(0, len(csv_files), slice_length)]

slice_to_process = 0 # change this to the slice you want to process
if not slices:
    raise FileNotFoundError(f"No .csv files found in {folder_path}")
if slice_to_process >= len(slices):
    raise IndexError(
        f"slice_to_process={slice_to_process} is out of range; "
        f"there are only {len(slices)} slice(s) of up to {slice_length} file(s)"
    )
# From here on csv_files holds only the files of the selected slice
csv_files = slices[slice_to_process]

In [17]:
# Show the file names selected for the current slice
csv_files

['transArchive_201304_201306_inactive.csv']

In [18]:
# Load every file of the current slice with the reader defined above.
# Files the reader could not parse (it returns None) are reported and skipped.
dataframes = []
for file in csv_files:
    file_path = os.path.join(folder_path, file)
    df = read_csv_with_delimiter(file_path, expected_columns, dtypes=dtypes)

    if df is None:
        print(f'Error reading dataframe for file {file_path}')
        continue

    dataframes.append(df)

Column mismatch with comma delimiter in data/Wedge_full/transArchive_201304_201306_inactive.csv, trying with semicolon...


In [19]:
# Number of files that were read successfully
len(dataframes)

1

Look at first few rows

In [20]:
# Print a 1-based title followed by the first rows of each loaded frame
for i, df in enumerate(dataframes):
    print(f"\nDataFrame {i+1}:", df.head(), sep="\n")



DataFrame 1:
              datetime  register_no  emp_no  trans_no            upc  \
0  2013-04-01 09:28:38          2.0    44.0       2.0  0076333201701   
1  2013-04-01 09:28:42          2.0    44.0       2.0  0089348100141   
2  2013-04-01 09:28:42          2.0    44.0       2.0  0089348100141   
3  2013-04-01 09:28:45          2.0    44.0       2.0  0066529000303   
4  2013-04-01 09:29:01          2.0    44.0       2.0              0   

                      description trans_type trans_subtype trans_status  \
0  Sport-Top Steel Bottle 12oz KK          I                              
1    Hand Sanitizer Wipes 10ct CW          I                              
2    Hand Sanitizer Wipes 10ct CW          I                              
3           Blackberries 6oz pkg.          I                              
4                     Credit Card          T            CC                

   department  ...  batchHeaderID  local  organic  display  receipt  card_no  \
0         9.0  ...    

In [21]:
# Show number of columns, whether the columns match the expected columns, and total null values
def check_dataframe(df, df_index, expected=None):
    """Print a short structural summary of one DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to inspect.
    df_index : int
        0-based position of the frame; printed 1-based in the title line.
    expected : list of str, optional
        Column names the frame should have, in order. Defaults to the
        notebook-level ``expected_columns``.

    Returns
    -------
    None
        The summary is printed, not returned.
    """
    if expected is None:
        expected = expected_columns

    # Number of columns
    num_columns = df.shape[1]

    # Names AND order must agree: the exported CSV is loaded by column position,
    # so a set comparison would hide reordered or duplicated columns.
    columns_match = list(df.columns) == list(expected)

    # Count of null values in the dataset
    null_count = int(df.isnull().sum().sum())

    # Print the results
    print(f"DataFrame {df_index + 1}:")
    print(f"  Number of columns: {num_columns}")
    print(f"  Columns match expected columns: {columns_match}")
    print(f"  Total null values: {null_count}")
    print("-" * 40)

# Print the structural summary for every loaded frame, in list order
for i, df in enumerate(dataframes):
    check_dataframe(df=df, df_index=i)

DataFrame 1:
  Number of columns: 50
  Columns match expected columns: True
  Total null values: 633803
----------------------------------------


In [22]:
def clean_dataframe(df, expected_columns):
    try:
        # Remove "" from column headers
        df.columns = df.columns.str.replace('"', '', regex=False)

        # Replace any "\N" values in the headers with None (which will be NULL in SQL)
        df.columns = [col if col != '\\N' else None for col in df.columns]
        
        # Replace problematic values with None (equivalent to NULL)
        df.replace(['\\N', 'NULL', 'null', 'NaN', 'nan'], None, inplace=True)

        # Remove "" from the data
        df = df.replace('"', '', regex=True)
        df = df.replace(';', ',', regex=True)
        
        # Check if column headers match expected_columns
        if list(df.columns) != expected_columns:
            print(f"Columns do not match, replacing with expected columns")
            df.columns = expected_columns

            # Move current column headers (which are actual data) into the first row
            df.columns = range(df.shape[1])  # Temporarily rename columns with numeric indices
            df = pd.concat([pd.DataFrame([df.columns], columns=df.columns), df], ignore_index=True)

        # Replace empty cells and any problematic values (e.g., \N) with None (NULL)
        df.replace(r'^\s*$', None, regex=True, inplace=True)
        df.replace('\\N', None, inplace=True)

        return df
    except Exception as e:
        print(f"Error cleaning dataframe: {e}")
        return None

# Clean every loaded frame; frames that fail to clean are reported and left out.
# NOTE(review): the later cells in this notebook (re-split, dtype conversion,
# CSV export) iterate over `dataframes`, not `cleaned_dataframes` - confirm
# which list is meant to be exported.
cleaned_dataframes = []
for df in dataframes:
    cleaned_df = clean_dataframe(df, expected_columns)
    if cleaned_df is None:
        print(f"Error cleaning dataframe")
        continue
    cleaned_dataframes.append(cleaned_df)


  df.replace(r'^\s*$', None, regex=True, inplace=True)


In [23]:
# For dataframes that were read as a single column, split that column on commas.
# NOTE(review): a plain split on ',' also splits commas inside quoted text
# fields - confirm the affected files contain none.
expected_width = len(expected_columns)
max_rows_to_show = 5  # cap on how many over-long rows are printed per frame

for i, df in enumerate(dataframes):  # index is needed to replace the frame in the list
    if df.shape[1] != 1:
        # Anything other than a single-column frame is left as it is
        print(f"Dataframe at index {i} has more than one column, no action needed.")
        continue

    print(f"Dataframe at index {i} has only one column, attempting to split...")

    # Remove headers if there are any (guarding against an empty frame or a
    # non-string first cell, which has no .startswith)
    first_cell = df.iloc[0, 0] if len(df) else None
    if isinstance(first_cell, str) and first_cell.startswith('datetime'):
        df = df.iloc[1:]

    # Step 1: Split the single column (data) into multiple columns based on commas
    df_split = df.iloc[:, 0].str.split(',', expand=True)

    # Step 2: Report rows that really have more fields than expected. Every row of
    # df_split has the same width, so look for non-null values beyond the expected
    # width instead of measuring len(row).
    if df_split.shape[1] > expected_width:
        overflow = df_split.iloc[:, expected_width:].notna().any(axis=1)
        print(f"{int(overflow.sum())} row(s) in DataFrame {i} have more than {expected_width} columns")
        for idx, row in df_split[overflow].head(max_rows_to_show).iterrows():
            print(f"Row {idx} in DataFrame {i} has {int(row.notna().sum())} columns:\n{row}")

    # Step 3: Ensure the split column count matches the expected columns
    if expected_width != df_split.shape[1]:
        # Raise an error if there is a mismatch
        raise ValueError(
            f"Column mismatch at DataFrame index {i}: Expected {len(expected_columns)} columns, "
            f"but found {df_split.shape[1]} after splitting."
        )

    # Step 4: Assign the expected_columns as the new header
    df_split.columns = expected_columns

    # Replace the original DataFrame in the list with the new split DataFrame
    dataframes[i] = df_split

Dataframe at index 0 has more than one column, no action needed.


In [24]:
# Change datatypes of every loaded frame to the ones declared in `dtypes`
max_reported_rows = 20  # cap on how many failing rows are listed per column


def _to_text_keep_missing(column):
    """Convert non-missing values to str while leaving missing values missing.

    A plain astype(str) turns NaN/None into the literal text 'nan'/'None',
    which would then be written to the exported CSV as real data.
    """
    return column.where(column.isna(), column.astype(str))


for df in dataframes:
    for col, dtype in dtypes.items():
        col_name = expected_columns[col]
        column = df[col_name]

        if dtype in ('str', str):
            df[col_name] = _to_text_keep_missing(column)
            continue

        try:
            df[col_name] = column.astype(dtype)
        except (ValueError, TypeError) as e:
            print(f"Error converting column '{col_name}' to {dtype}: {e}")

            # Locate the offending rows on the ORIGINAL values: a value that is
            # present but cannot be parsed as a number is what broke the cast.
            unparseable = column.notna() & pd.to_numeric(column, errors='coerce').isna()
            bad_rows = column.index[unparseable]
            for idx in bad_rows[:max_reported_rows]:
                print(f"Conversion failed at row {idx}, column '{col_name}'")
            if len(bad_rows) > max_reported_rows:
                print(f"... and {len(bad_rows) - max_reported_rows} more row(s) in column '{col_name}'")

            # Fall back to text so the column is at least uniformly typed
            df[col_name] = _to_text_keep_missing(column)


## Export to CSV

In [25]:
export_directory = "data/full_clean/"

os.makedirs(export_directory, exist_ok=True)

# Frames and filenames are paired by position. If any file failed to load, the
# two lists no longer line up and data would be written under the wrong
# filename, so stop instead of exporting mislabelled files.
if len(dataframes) != len(csv_files):
    raise ValueError(
        f"Cannot export: {len(dataframes)} dataframe(s) for {len(csv_files)} file name(s). "
        "A file probably failed to load; fix or remove it and re-run the read step."
    )

# Iterate through each DataFrame and save it using its original filename
for i, df in enumerate(dataframes):
    # Use the corresponding original filename from `csv_files`
    filename = csv_files[i]  # Fetch the original filename
    file_path = os.path.join(export_directory, filename)

    # Export the DataFrame to CSV (missing values are written as empty fields)
    df.to_csv(file_path, index=False)

## Export to GBQ

Setup GBQ

In [26]:
# BigQuery connection settings. The literal values are the defaults used so far;
# each one can be overridden with an environment variable so the notebook runs on
# another machine without editing machine-specific paths in this cell.
gpq_path = os.environ.get(
    "WEDGE_GBQ_PATH",
    "C:/Documents/Business Analytics Grad/Applied Data Analytics/wedge/",
)
gbq_project_id = os.environ.get("WEDGE_GBQ_PROJECT", "rhamre")
gbq_dataset_id = os.environ.get("WEDGE_GBQ_DATASET", "wedge")

# Path of the service-account key file (the key itself is never stored in the notebook)
private_key = os.environ.get(
    "WEDGE_GBQ_KEY_FILE",
    "C:/Documents/Business Analytics Grad/Applied Data Analytics/wedge/data/rhamre-10ae9aad8c6b.json",
)

gbq_credentials = service_account.Credentials.from_service_account_file(private_key)
client = bigquery.Client(credentials=gbq_credentials, project=gbq_project_id)

In [27]:
# Load-job settings: append to existing tables and allow new fields to be added
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

Schema

In [28]:
# (name, BigQuery type) for every column, in file order; all fields are NULLABLE.
_schema_columns = [
    ("datetime", "TIMESTAMP"),
    ("register_no", "FLOAT"),
    ("emp_no", "FLOAT"),
    ("trans_no", "FLOAT"),
    ("upc", "STRING"),
    ("description", "STRING"),
    ("trans_type", "STRING"),
    ("trans_subtype", "STRING"),
    ("trans_status", "STRING"),
    ("department", "FLOAT"),
    ("quantity", "FLOAT"),
    ("Scale", "FLOAT"),
    ("cost", "FLOAT"),
    ("unitPrice", "FLOAT"),
    ("total", "FLOAT"),
    ("regPrice", "FLOAT"),
    ("altPrice", "FLOAT"),
    ("tax", "FLOAT"),
    ("taxexempt", "FLOAT"),
    ("foodstamp", "FLOAT"),
    ("wicable", "FLOAT"),
    ("discount", "FLOAT"),
    ("memDiscount", "FLOAT"),
    ("discountable", "FLOAT"),
    ("discounttype", "FLOAT"),
    ("voided", "FLOAT"),
    ("percentDiscount", "FLOAT"),
    ("ItemQtty", "FLOAT"),
    ("volDiscType", "FLOAT"),
    ("volume", "FLOAT"),
    ("VolSpecial", "FLOAT"),
    ("mixMatch", "FLOAT"),
    ("matched", "FLOAT"),
    ("memType", "STRING"),        # Changed from BOOLEAN to STRING
    ("staff", "STRING"),          # Changed from BOOLEAN to STRING
    ("numflag", "FLOAT"),
    ("itemstatus", "FLOAT"),
    ("tenderstatus", "FLOAT"),
    ("charflag", "STRING"),
    ("varflag", "FLOAT"),
    ("batchHeaderID", "STRING"),  # Changed from BOOLEAN to STRING
    ("local", "FLOAT"),
    # NOTE(review): `organic` is read as 'str' in the dtypes mapping but loaded
    # as FLOAT here - confirm which one is intended.
    ("organic", "FLOAT"),
    ("display", "STRING"),        # Changed from BOOLEAN to STRING
    ("receipt", "FLOAT"),
    ("card_no", "FLOAT"),
    ("store", "FLOAT"),
    ("branch", "FLOAT"),
    ("match_id", "FLOAT"),
    ("trans_id", "FLOAT"),
]

job_config.schema = [
    bigquery.SchemaField(field_name, field_type, mode="NULLABLE")
    for field_name, field_type in _schema_columns
]

# The exported files are CSVs whose first line is the header row
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1

Iterate over files and upload

In [29]:
# Folder whose files are uploaded to BigQuery (same path the CSV export step writes to)
data_directory = "data/full_clean/"

In [30]:
def table_exists(client, table_ref):
    """Tell whether ``client.get_table(table_ref)`` can find the table.

    Returns True when the lookup succeeds and False when it raises
    ``NotFound``; any other exception propagates to the caller.
    """
    try:
        client.get_table(table_ref)
    except NotFound:
        return False
    else:
        return True

In [31]:
# Only .csv files are uploaded (other directory entries would break the
# table-name derivation in the upload loop); sorted so the upload order is reproducible.
clean_files = sorted(name for name in os.listdir(data_directory) if name.endswith('.csv'))

In [32]:
# Upload every cleaned file into a table named after the file (without ".csv")
for clean_file_name in clean_files:
    # change if file names are different, for example "_clean" or ".csv"
    tab, extension = os.path.splitext(clean_file_name)
    if extension.lower() != ".csv":
        # Not an exported CSV: nothing to derive a table name from
        print(f"Skipping {clean_file_name}: not a .csv file")
        continue
    table_full_name = ".".join([gbq_project_id, gbq_dataset_id, tab])

    if not table_exists(client, table_full_name):
        table_ref = client.create_table(table = table_full_name)
    else:
        table_ref = client.get_table(table_full_name)

    with open(os.path.join(data_directory, clean_file_name), mode = "rb") as source_file:
        job = client.load_table_from_file(
            source_file,
            table_ref,
            location = "US",
            job_config = job_config
            )

    # Wait for THIS file's load job before starting the next one; otherwise only
    # the last job is ever checked and earlier failures go unnoticed.
    # result() raises if the load failed.
    job.result()
    print(f"Loaded {clean_file_name} into {table_full_name}")


In [108]:
# Wait for the load job held in `job`, i.e. the one submitted last by the upload
# loop above; jobs submitted for earlier files are not checked here.
job.result()

LoadJob<project=rhamre, location=US, id=217c9336-bea4-416c-9735-118e56c19c63>

In [109]:
# Disabled helper: prints the shape and column names of every loaded frame.
# It is wrapped in a bare string literal, so running this cell only echoes the text.
'''
for i, df in enumerate(dataframes):
    print(f"DataFrame {i + 1}:")
    print(f"  Shape: {df.shape}")
    print(f"  Columns: {df.columns.tolist()}")
    print("-" * 40)
'''

'\nfor i, df in enumerate(dataframes):\n    print(f"DataFrame {i + 1}:")\n    print(f"  Shape: {df.shape}")\n    print(f"  Columns: {df.columns.tolist()}")\n    print("-" * 40)\n'

In [110]:
# Disabled helper: delete all tables under the wedge dataset.
# It is wrapped in a bare string literal so it cannot run by accident; running
# this cell only echoes the text.
'''
dataset_ref = bigquery.DatasetReference(gbq_project_id, gbq_dataset_id)

# List all tables in the dataset
tables = client.list_tables(dataset_ref)

# Loop through each table and delete it
for table in tables:
    table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"
    print(f"Deleting table: {table_id}")
    client.delete_table(table_id)  # Make an API request to delete the table
    print(f"Deleted table {table_id}")


'''

'\ndataset_ref = bigquery.DatasetReference(gbq_project_id, gbq_dataset_id)\n\n# List all tables in the dataset\ntables = client.list_tables(dataset_ref)\n\n# Loop through each table and delete it\nfor table in tables:\n    table_id = f"{table.project}.{table.dataset_id}.{table.table_id}"\n    print(f"Deleting table: {table_id}")\n    client.delete_table(table_id)  # Make an API request to delete the table\n    print(f"Deleted table {table_id}")\n\n\n'