# Wedge Task 1
___
In this first task, we will work on uploading the Wedge data from around 50 zip files to Google BigQuery (GBQ). To make this process easier, we will initially run our script on a smaller subset of the Wedge data. This smaller subset is structured the same way as the larger dataset, which allows us to test the entire process effectively. Once we have processed the smaller subset and successfully uploaded it to GBQ, we will then change the directory settings in our script to point to the full dataset. This way, we can ensure everything works correctly before handling the entire volume of data.

In [1]:
import os
import zipfile
import pandas as pd
import gc
from collections import Counter
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
from google.cloud import bigquery

## Unpack the Zip Files
___
The first thing we need to do is extract the csvs from the zip files located in our data directory. We will first work with our subset of Wedge zip files. After the full script has run successfully, we will return to this point to change which directory we are processing.
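
For comparison, the extraction step can be done on its own, without pandas. This is a minimal sketch that assumes each archive holds one or more `.csv` members. `extract_all_csvs` is a hypothetical helper and is not part of the notebook. Unlike `unpack_and_save_csv`, it does not assume one CSV per archive, and it never loads the extracted data into memory.

```python
import os
import tempfile
import zipfile


def extract_all_csvs(zip_directory, output_directory):
    """Extract every .csv member of every .zip in zip_directory (hypothetical helper).

    Returns the list of extracted file paths. No data is parsed here.
    """
    os.makedirs(output_directory, exist_ok=True)
    extracted = []
    for name in sorted(os.listdir(zip_directory)):
        if not name.endswith('.zip'):
            continue
        with zipfile.ZipFile(os.path.join(zip_directory, name)) as zf:
            for member in zf.namelist():
                if member.endswith('.csv'):
                    # ZipFile.extract returns the path of the file it created
                    extracted.append(zf.extract(member, path=output_directory))
    return extracted


# Demo on a throwaway directory that stands in for data/WedgeZipOfZips/
with tempfile.TemporaryDirectory() as tmp:
    zip_dir = os.path.join(tmp, 'zips')
    os.makedirs(zip_dir)
    with zipfile.ZipFile(os.path.join(zip_dir, 'demo.zip'), 'w') as zf:
        zf.writestr('transArchive_demo.csv', 'datetime,register_no\n2016-08-01 07:34:16,51\n')
    paths = extract_all_csvs(zip_dir, os.path.join(tmp, 'csvs'))
    names = [os.path.basename(p) for p in paths]
    exists = all(os.path.exists(p) for p in paths)
```

Nothing is parsed at this stage, so memory use stays flat however large the CSVs are. All parsing can then happen in the cleaning steps.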

In [2]:
# Gets a file path, attaching the file name to the data directory
def get_file_path(data_directory, file):
    file_path = os.path.join(data_directory, file)
    return file_path

In [3]:
# Unpack the zips and save the csvs to an output directory
def unpack_and_save_csv(zip_file_path, output_directory):
    # Open the zip file at the specified path
    with zipfile.ZipFile(zip_file_path, 'r') as Z:
        # Get the name of the single CSV file inside the zip
        file = Z.namelist()[0]
        # Create the output directory if it doesn't exist
        os.makedirs(output_directory, exist_ok=True)
        # Extract the file to the output directory
        Z.extract(file, path=output_directory)
        extracted_file_path = get_file_path(output_directory, file)
        # Use chunks to read the CSV
        chunk_list = []  # To hold chunks of data
        for chunk in pd.read_csv(extracted_file_path, chunksize=10000):  # Adjust chunksize as needed
            chunk_list.append(chunk)
        # Concatenate all chunks into a single DataFrame if needed
        contents = pd.concat(chunk_list, ignore_index=True)
        # Memory management
        del chunk_list
        return file, contents

In [4]:
data_directory = "data/WedgeZipOfZips/"
csv_directory = "data/WedgeCSVs/"

# Loop through all zip files in the data directory
for zip_file in os.listdir(data_directory):
    if zip_file.endswith('.zip'):
        zip_file_path = get_file_path(data_directory, zip_file)
        file_name, contents = unpack_and_save_csv(zip_file_path, csv_directory)
        # Memory management
        del contents
        gc.collect()

## Header Validation
___
Now, let's check that every file in our csv directory has the same header.

In [5]:
# Gets the headers (column names) from a directory of csvs
# Will output a dataframe of the counts of different headers
def get_headers(csv_directory):
    headers = []
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the csv file path
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read in the first row of the csv, which is the header
        df = pd.read_csv(csv_file_path, nrows=0) 
        # Make the header a tuple and append it to our list
        headers.append(tuple(df.columns)) 
    # Create the dataframe of header counts
    headers_df = pd.DataFrame(Counter(headers).items(), columns=['Header', 'Count']).sort_values(by='Count', ascending=False)
    # Return the dataframe of header counts
    return headers_df

In [6]:
# Display the headers
headers_df = get_headers(csv_directory)
headers_df

| | Header | Count |
|---|---|---|
| 0 | (datetime, register_no, emp_no, trans_no, upc,... | 26 |
| 1 | (datetime;"register_no";"emp_no";"trans_no";"u... | 12 |
| 2 | (2016-08-01 07:34:16, 51, 94, 7, 0000000000151... | 1 |
| 3 | (2016-09-01 07:13:09, 51, 94, 6, 0, Change, T,... | 1 |
| 4 | (2015-11-01 07:21:50, 51, 94, 4, TAX, Tax, A, ... | 1 |
| 5 | (2015-12-01 07:03:06, 51, 94, 2, TAX, Tax, A, ... | 1 |
| 6 | (2017-01-01 09:00:31, 51, 94, 12, 0, Change, T... | 1 |
| 7 | (2016-04-01 07:34:35, 51, 94, 18, 0, Cash, T, ... | 1 |
| 8 | (2016-10-01 07:04:40, 51, 94, 1, DISCOUNT, Dis... | 1 |
| 9 | (2016-11-01 07:18:44, 51, 94, 11, 000000000101... | 1 |


In [7]:
print(headers_df.iloc[0,0])
print(headers_df.iloc[1,0])

('datetime', 'register_no', 'emp_no', 'trans_no', 'upc', 'description', 'trans_type', 'trans_subtype', 'trans_status', 'department', 'quantity', 'Scale', 'cost', 'unitPrice', 'total', 'regPrice', 'altPrice', 'tax', 'taxexempt', 'foodstamp', 'wicable', 'discount', 'memDiscount', 'discountable', 'discounttype', 'voided', 'percentDiscount', 'ItemQtty', 'volDiscType', 'volume', 'VolSpecial', 'mixMatch', 'matched', 'memType', 'staff', 'numflag', 'itemstatus', 'tenderstatus', 'charflag', 'varflag', 'batchHeaderID', 'local', 'organic', 'display', 'receipt', 'card_no', 'store', 'branch', 'match_id', 'trans_id')
('datetime;"register_no";"emp_no";"trans_no";"upc";"description";"trans_type";"trans_subtype";"trans_status";"department";"quantity";"Scale";"cost";"unitPrice";"total";"regPrice";"altPrice";"tax";"taxexempt";"foodstamp";"wicable";"discount";"memDiscount";"discountable";"discounttype";"voided";"percentDiscount";"ItemQtty";"volDiscType";"volume";"VolSpecial";"mixMatch";"matched";"memType"

It looks like one of our problems is inconsistent delimiters. Some files separate columns with commas, and others use semicolons.
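
Another option is to let the standard library's `csv.Sniffer` guess the delimiter from a sample, restricted to the two candidate characters. This avoids relying on whether a comma appears anywhere in the first line. The sketch below assumes the sample is the first few lines of a file. `detect_delimiter` is a hypothetical helper, and the count-based fallback for when the sniffer gives up is our own choice.

```python
import csv


def detect_delimiter(sample, candidates=',;'):
    """Guess whether a CSV text sample is comma- or semicolon-delimited (hypothetical helper)."""
    try:
        # Only the candidate characters are considered as delimiters
        return csv.Sniffer().sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        # Fallback: pick the candidate that occurs most often in the first line
        lines = sample.splitlines()
        first_line = lines[0] if lines else ''
        return max(candidates, key=first_line.count)


comma_sample = "datetime,register_no,emp_no\n2016-08-01 07:34:16,51,94\n"
semicolon_sample = "datetime;register_no;emp_no\n2016-08-01 07:34:16;51;94\n"

comma_guess = detect_delimiter(comma_sample)
semicolon_guess = detect_delimiter(semicolon_sample)
```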

In [8]:
for csv_file in os.listdir(csv_directory):
    # Get the csv file path
    csv_file_path = get_file_path(csv_directory, csv_file)
    # Determine the current delimiter from the first line of the file. Comparing the
    # two counts is safer than checking for any comma, because a semicolon-delimited
    # line can still contain a comma inside a text value.
    with open(csv_file_path, 'r') as f:
        first_line = f.readline().strip()
        current_delimiter = ';' if first_line.count(';') > first_line.count(',') else ','
    # Read the CSV file with the current delimiter
    df = pd.read_csv(csv_file_path, delimiter=current_delimiter)
    # Save the file back with a semicolon (;) delimiter
    df.to_csv(csv_file_path, sep=';', index=False)
    # Memory management
    del df
    gc.collect()

In [9]:
# Display the headers
headers_df = get_headers(csv_directory)
headers_df

| | Header | Count |
|---|---|---|
| 0 | (datetime;register_no;emp_no;trans_no;upc;desc... | 38 |
| 1 | (2016-08-01 07:34:16;51;94;7;0000000000151;Ban... | 1 |
| 2 | (2016-09-01 07:13:09;51;94;6;0;Change;T;CA;Unn... | 1 |
| 3 | (2015-11-01 07:21:50;51;94;4;TAX;Tax;A;Unnamed... | 1 |
| 4 | (2015-12-01 07:03:06;51;94;2;TAX;Tax;A;Unnamed... | 1 |
| 5 | (2017-01-01 09:00:31;51;94;12;0;Change;T;CA;Un... | 1 |
| 6 | (2016-04-01 07:34:35;51;94;18;0;Cash;T;CA;Unna... | 1 |
| 7 | (2016-10-01 07:04:40;51;94;1;DISCOUNT;Discount... | 1 |
| 8 | (2016-11-01 07:18:44;51;94;11;0000000001014;Gr... | 1 |
| 9 | (2016-05-01 11:23:35;51;94;113;0000000004365;B... | 1 |


Now it looks like we've addressed the different delimiters, but several files still have no header: their first row of data was read as the header. To fix this, we compare each file's header with the most common header. If they differ, we assume the file has no header row. We then reread it so the first line is kept as data, assign the most common header as the column names, and overwrite the original file.
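
The check-then-reread logic can be tried out on in-memory text. This is a minimal sketch with two assumptions: the files are already semicolon-delimited, and `EXPECTED_COLUMNS` (a shortened, hypothetical column list) stands in for the most common header. `read_with_expected_header` is also a hypothetical name. With real files you would pass the file path to `pd.read_csv` instead of wrapping text in `io.StringIO`.

```python
import io

import pandas as pd

# Shortened, hypothetical column list; the real files have many more columns
EXPECTED_COLUMNS = ['datetime', 'register_no', 'emp_no']


def read_with_expected_header(text, expected, sep=';'):
    """Read CSV text, treating the first line as data if it doesn't match the expected header."""
    # nrows=0 parses only the header line, so this check is cheap
    header = pd.read_csv(io.StringIO(text), sep=sep, nrows=0)
    if list(header.columns) == expected:
        return pd.read_csv(io.StringIO(text), sep=sep)
    # No header row: keep the first line as data and supply the column names
    return pd.read_csv(io.StringIO(text), sep=sep, header=None, names=expected)


with_header = "datetime;register_no;emp_no\n2016-08-01 07:34:16;51;94\n"
without_header = "2016-08-01 07:34:16;51;94\n2016-08-01 07:35:02;51;94\n"

df_a = read_with_expected_header(with_header, EXPECTED_COLUMNS)
df_b = read_with_expected_header(without_header, EXPECTED_COLUMNS)
```

Passing `names=` together with `header=None` assigns the column names at read time, so there is no separate step that overwrites `df.columns`.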

In [10]:
# Find the most common header. get_headers reads with the default comma delimiter,
# so each header is a 1-tuple holding the whole semicolon-joined header line.
most_common_header = headers_df.iloc[0,0]
most_common_header = tuple(most_common_header[0].split(';'))
# Loop over our csv files
for csv_file in os.listdir(csv_directory):
    # Get the csv file path
    csv_file_path = get_file_path(csv_directory, csv_file)
    # Read only the header row and check it against the most common header
    header = pd.read_csv(csv_file_path, delimiter=";", nrows=0)
    if tuple(header.columns) != most_common_header:
        # No header row: reread so the first line is kept as data, then name the columns
        df = pd.read_csv(csv_file_path, delimiter=";", header=None)
        df.columns = list(most_common_header)
        # Overwrite the original csv file with the new header
        df.to_csv(csv_file_path, sep=';', index=False)
        del df
    # Memory management
    del header
    gc.collect()

In [11]:
# Display the headers
headers_df = get_headers(csv_directory)
headers_df

| | Header | Count |
|---|---|---|
| 0 | (datetime;register_no;emp_no;trans_no;upc;desc... | 53 |


Awesome, now all of the csv files have the same delimiter and have common headers.
## Double Quotation Handling
___
When we first investigated the files, some lines were not read in correctly. This was mainly caused by values in object (text) columns that contained multiple double quotation marks. To avoid the complication, we remove the double quotes from the entire data set. Doing so, however, causes formatting issues in places where the double quotes were what separated one line from the next. Removing the trailing semicolon from each line happens to fix all of the remaining double quote issues.
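
The same per-line rule can be applied while streaming, so a large file never has to be held in memory as a list of lines. This is a sketch, not the notebook's code. `clean_line` and `clean_file_streaming` are hypothetical names, and the `'259%?;?'` replacement is left out. The cleaned lines are written to a temporary file, which then replaces the original through `os.replace`.

```python
import os
import tempfile


def clean_line(line):
    """Strip double quotes, trailing whitespace, and one trailing semicolon (hypothetical helper)."""
    line = line.replace('"', '').rstrip()
    if line.endswith(';'):
        line = line[:-1]
    return line


def clean_file_streaming(path):
    """Rewrite path line by line through a temp file in the same directory."""
    directory = os.path.dirname(path) or '.'
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    with os.fdopen(fd, 'w', encoding='utf-8') as out, open(path, 'r', encoding='utf-8') as src:
        for line in src:
            out.write(clean_line(line) + '\n')
    # Atomically swap the cleaned file in place of the original
    os.replace(tmp_path, path)


with tempfile.TemporaryDirectory() as tmp:
    demo = os.path.join(tmp, 'demo.csv')
    with open(demo, 'w', encoding='utf-8') as f:
        f.write('datetime;"register_no";"emp_no";\n2016-08-01 07:34:16;51;94;\n')
    clean_file_streaming(demo)
    with open(demo, encoding='utf-8') as f:
        cleaned = f.read()
```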

In [12]:
# Removes double quotes and fixes several data sets with formatting issues
def remove_double_quotes(csv_directory):
    # Loop over our file of csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csv
        csv_file_path = os.path.join(csv_directory, csv_file)
        # Read the csv file as a raw text file to handle quotes more flexibly
        with open(csv_file_path, 'r', encoding='utf-8') as file:
            content = file.readlines()
        # Remove all double quotes and trim trailing semicolons
        cleaned_content = []
        for line in content:
            # Remove double quotes and trailing white space
            line = line.replace('"', '').rstrip()
            # Replace one malformed value found in the raw data with 0
            line = line.replace('259%?;?', '0')
            # Remove last semicolons
            if line.endswith(';'):
                line = line[:-1] 
            # Add the cleaned content to our list.
            cleaned_content.append(line + '\n')
        # Write the cleaned content back to the CSV file
        with open(csv_file_path, 'w', encoding='utf-8') as file:
            file.writelines(cleaned_content) 
        # Memory management
        del cleaned_content
        gc.collect()

In [13]:
# Remove double quotes
remove_double_quotes(csv_directory)

## Investigating Data Types
___
Now that every file uses the same delimiter, has a header, and has no other file-level formatting issues, we shift our focus to combining the csvs. Before we combine them, we want each column to have the same data type in every file.

In [14]:
# Gets the data types from our directory of csvs
# Will output a dataframe of the counts of different datatypes
def get_datatypes(csv_directory):
    datatypes = []
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csvs
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read the file as a pandas df
        df = pd.read_csv(csv_file_path, delimiter=";")  
        # Find the data types and add them to our list
        datatypes.append(tuple(df.dtypes))
    # Create the dataframe of data type counts
    datatypes_df = pd.DataFrame(Counter(datatypes).items(), columns=['Header', 'Count']).sort_values(by='Count', ascending=False)
    # Return the dataframe of data type counts
    return datatypes_df

In [15]:
# Display the data types
datatypes_df = get_datatypes(csv_directory)
datatypes_df

| | Header | Count |
|---|---|---|
| 0 | (object, int64, int64, int64, object, object, ... | 9 |
| 1 | (object, int64, int64, int64, object, object, ... | 8 |
| 5 | (object, int64, int64, int64, object, object, ... | 4 |
| 8 | (object, int64, int64, int64, object, object, ... | 4 |
| 3 | (object, int64, int64, int64, object, object, ... | 3 |
| 25 | (object, int64, int64, int64, object, object, ... | 3 |
| 6 | (object, int64, int64, int64, object, object, ... | 2 |
| 2 | (object, int64, int64, int64, object, object, ... | 2 |
| 7 | (object, int64, int64, int64, object, object, ... | 1 |
| 4 | (object, int64, int64, int64, object, object, ... | 1 |


It seems that not every file assigns the same data types to its columns. So, we need to identify which columns differ and make the appropriate changes.
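
One likely source of these differences is that pandas infers data types separately for each file it reads. A single missing value is enough to turn an integer column into `float64`, because NaN is a float. This may explain why columns such as `card_no` and `trans_id` are `int64` in some files and `float64` in others, although we have not confirmed that for the Wedge files. The sketch below shows the effect on two tiny, made-up inputs.

```python
import io

import pandas as pd

# Two made-up files with the same columns; the second has a blank card_no
complete = "card_no;total\n1001;2.49\n1002;3.99\n"
with_blank = "card_no;total\n1001;2.49\n;3.99\n"

# Each read_csv call infers its own dtypes
dtypes_complete = pd.read_csv(io.StringIO(complete), sep=';').dtypes.astype(str).to_dict()
dtypes_blank = pd.read_csv(io.StringIO(with_blank), sep=';').dtypes.astype(str).to_dict()
```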


In [16]:
# Get the datatypes into a dataframe
def get_datatypes_df(csv_directory):
    datatypes_list = [] 
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csvs
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read the data into a pandas df
        df = pd.read_csv(csv_file_path, delimiter=";")
        # Get the data types
        datatypes_list.append(df.dtypes)
        del df
        gc.collect()
    # Create the data frame of data types
    datatypes_df = pd.DataFrame(datatypes_list)
    # Return the data frame of data types
    return datatypes_df

In [17]:
# Display the data types
datatypes_df = get_datatypes_df(csv_directory)
datatypes_df

| | datetime | register_no | emp_no | trans_no | upc | description | trans_type | trans_subtype | trans_status | department | ... | batchHeaderID | local | organic | display | receipt | card_no | store | branch | match_id | trans_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | int64 | int64 | int64 | float64 | int64 |
| 1 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | int64 | int64 | int64 | float64 | int64 |
| 2 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | int64 | int64 | int64 | float64 | int64 |
| 3 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | int64 | int64 | int64 | float64 | int64 |
| 4 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | int64 | int64 | int64 | float64 | int64 |
| 5 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | float64 | float64 | int64 | int64 | int64 | float64 | int64 |
| 6 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | float64 | int64 | int64 | float64 | float64 |
| 7 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | float64 | float64 | float64 | int64 | int64 | float64 | float64 |
| 8 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | float64 | int64 | int64 | float64 | float64 |
| 9 | object | int64 | int64 | int64 | object | object | object | object | object | int64 | ... | float64 | float64 | float64 | object | float64 | int64 | int64 | int64 | float64 | int64 |


Rather than inspecting each column's data types and deciding which type to cast to, we will assume that the most common data type for each column across all files is the expected one.
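
The "most common data type" rule reduces to taking the mode of each column in a per-file dtype table, then flagging the files that disagree with it. This is a minimal sketch on a hypothetical three-file table, with dtypes stored as strings for readability. It is not the notebook's `datatypes_df`.

```python
import pandas as pd

# Hypothetical per-file dtypes (one row per file)
per_file = pd.DataFrame([
    {'card_no': 'int64', 'display': 'object'},
    {'card_no': 'float64', 'display': 'object'},
    {'card_no': 'int64', 'display': 'float64'},
])

# The modal dtype of each column becomes the expected dtype
expected = {col: per_file[col].mode()[0] for col in per_file.columns}

# Row labels (files) whose dtype differs from the expected one, per column
mismatches = {col: per_file[per_file[col] != expected[col]].index.tolist()
              for col in per_file.columns}
```

`Series.mode` returns every tied value in sorted order, so `mode()[0]` silently breaks ties. If two data types are equally common for a column, it is worth choosing the target type explicitly.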

In [18]:
# Gets the columns and most common data type in a dictionary
def check_common_datatype(csv_directory):
    # Build the data types df (one row per csv)
    datatypes_df = get_datatypes_df(csv_directory)
    common_dtype_dict = {}
    # Get the most common data type and assign it to the dictionary
    for column in datatypes_df.columns:
        common_dtype = datatypes_df[column].mode()[0]
        common_dtype_dict[column] = common_dtype
    # Return the data type dictionary
    return common_dtype_dict

In [19]:
# Display the dictionary
dtype_mapping = check_common_datatype(csv_directory)
dtype_mapping

{'datetime': dtype('O'),
 'register_no': dtype('int64'),
 'emp_no': dtype('int64'),
 'trans_no': dtype('int64'),
 'upc': dtype('O'),
 'description': dtype('O'),
 'trans_type': dtype('O'),
 'trans_subtype': dtype('O'),
 'trans_status': dtype('O'),
 'department': dtype('int64'),
 'quantity': dtype('float64'),
 'Scale': dtype('int64'),
 'cost': dtype('float64'),
 'unitPrice': dtype('float64'),
 'total': dtype('float64'),
 'regPrice': dtype('float64'),
 'altPrice': dtype('float64'),
 'tax': dtype('int64'),
 'taxexempt': dtype('float64'),
 'foodstamp': dtype('int64'),
 'wicable': dtype('int64'),
 'discount': dtype('float64'),
 'memDiscount': dtype('float64'),
 'discountable': dtype('int64'),
 'discounttype': dtype('int64'),
 'voided': dtype('int64'),
 'percentDiscount': dtype('float64'),
 'ItemQtty': dtype('float64'),
 'volDiscType': dtype('int64'),
 'volume': dtype('float64'),
 'VolSpecial': dtype('float64'),
 'mixMatch': dtype('int64'),
 'matched': dtype('int64'),
 'memType': dtype('O'),


In [20]:
# Checks each csv against the expected data types and reports files that can't be read with them
def fix_datatypes(csv_directory, mapping=None):
    # Fall back to the global mapping built by check_common_datatype
    if mapping is None:
        mapping = dtype_mapping
    # Keep track of how many columns need their data type changed
    needing_changed = 0
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csv
        csv_file_path = get_file_path(csv_directory, csv_file)
        try:
            # Read in the data, forcing the expected data types. A file that cannot be
            # cast raises an error here, so it is reported below instead of being counted.
            df = pd.read_csv(csv_file_path, delimiter=";", dtype=mapping)
            print(csv_file)
            print('-------')
            # Print if the column doesn't have the right data type
            for column in df.columns:
                if df[column].dtype != mapping[column]:
                    print(f"{column}: {df[column].dtype} should be converted to {mapping[column]}.")
                    needing_changed += 1
            # Memory handling
            del df
            gc.collect()
        except Exception as e:
            print(f"Error reading {csv_file}: {e}")
    # Return the number of columns needing data type changed
    return f"{needing_changed} columns need their data types changed."
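
Because `fix_datatypes` passes the mapping as `dtype=` to `read_csv`, each file either loads with the target types or raises an error. As a result, `needing_changed` will almost always stay at 0, and the non-conforming files appear as printed errors instead. To get an actual count, one option is to let pandas infer the types and compare them afterwards. `dtype_mismatches` is a hypothetical helper that sketches this approach on made-up data.

```python
import io

import pandas as pd


def dtype_mismatches(df, mapping):
    """Return {column: (actual, expected)} for columns whose inferred dtype differs (hypothetical)."""
    return {
        col: (str(df[col].dtype), str(mapping[col]))
        for col in df.columns
        if col in mapping and str(df[col].dtype) != str(mapping[col])
    }


# Made-up expected types and a file whose card_no has a blank value
expected = {'card_no': 'int64', 'total': 'float64'}
df = pd.read_csv(io.StringIO("card_no;total\n1001;2.49\n;3.99\n"), sep=';')
report = dtype_mismatches(df, expected)
```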

In [21]:
fix_datatypes(csv_directory)

transArchive_201207_201209.csv
-------
transArchive_201404_201406.csv
-------
transArchive_201401_201403.csv
-------
transArchive_201201_201203.csv
-------
transArchive_201204_201206.csv
-------
transArchive_201407_201409.csv
-------
transArchive_201210_201212_inactive.csv
-------
transArchive_201404_201406_inactive.csv
-------
transArchive_201301_201303_inactive.csv
-------
transArchive_201210_201212.csv
-------


Error reading transArchive_201410_201412.csv: cannot safely convert passed user dtype of int64 for float64 dtyped data in column 20
Error reading transArchive_201608.csv: could not convert string to float: '1.1900.1'
Error reading transArchive_201609.csv: cannot safely convert passed user dtype of int64 for float64 dtyped data in column 9
Error reading transArchive_201410_201412_inactive.csv: cannot safely convert passed user dtype of int64 for float64 dtyped data in column 20
transArchive_201204_201206_inactive.csv
-------
transArchive_201310_201312.csv
-------
transArchive_201307_201309_inactive.csv
-------
transArchive_201304_201306.csv
-------
Error reading transArchive_201507_201509.csv: Integer column has NA values in column 20
transArchive_201301_201303.csv
-------
Error reading transArchive_201501_201503.csv: cannot safely convert passed user dtype of int64 for float64 dtyped data in column 9
transArchive_201307_201309.csv
-------
Error reading transArchive_201504_201506.csv: I

'0 columns need their data types changed.'

In [22]:
# Actually change the data types of columns in the csvs
def change_datatypes(csv_directory):
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csv
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read in the data
        df = pd.read_csv(csv_file_path, delimiter=";")  
        # Empty the display column. We have no use for this column.
        df['display'] = ''
        print(csv_file)
        print('-------')
        # Loop over the columns in the csv    
        for column in df.columns:
            # Check if we have the right data type
            if df[column].dtype == dtype_mapping[column]:
                continue
            # Perform the conversion based on target type
            try:
                # Object conversion
                if dtype_mapping[column] == 'object':
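                    # Replace the \N null markers with blanks. The '\N.1', '\N.2', ... variants are
                    # pandas' de-duplicated column names from headerless files, which the header
                    # fix earlier pushed into the first data row.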
                    df[column] = df[column].replace({
                        '\\N': '',
                        '\\N.1': '',
                        '\\N.2': '',
                        '\\N.3': ''
                        })
                    # Object conversion
                    df[column] = df[column].astype('object')
                # Float conversion
                elif dtype_mapping[column] == 'float64':
                    # Cast every value to str first. The .str accessor below returns NaN for
                    # non-string elements, which would silently drop numeric values in
                    # mixed-type object columns.
                    df[column] = df[column].astype('str')
                    # Modifies any places where we have strings that look like 2.4900.1
                    df[column] = df[column].str.replace(r'\.(\d{2}|\d{4})\.\d', r'.\1', regex=True)
                    # Replace nulls with 0s
                    df[column] = df[column].replace({
                        '\\N': 0,
                        '\\N.1': 0,
                        '\\N.2': 0,
                        '\\N.3': 0,
                        '\\N.4': 0,
                        '\\N.5': 0,
                        '\\N.6': 0,
                        ' ': 0})
                    # Replacing "Unnamed" with 0.0 for the float conversion
                    if df[column].dtype == 'object':
                        df[column] = df[column].replace(r'.*Unnamed.*', '0.0', regex=True)
                    # Float conversion                  
                    df[column] = df[column].astype('float64')
                # Integer conversion
                elif dtype_mapping[column] == 'int64':
                    # Ensure the column is treated as string first
                    if df[column].dtype != 'object':
                        df[column] = df[column].astype('str')
                    # Replace the nulls with 0s
                    df[column] = df[column].replace({
                        '\\N': 0,
                        '\\N.1': 0,
                        '\\N.2': 0,
                        '\\N.3': 0,
                        '\\N.4': 0,
                        '\\N.5': 0,
                        '\\N.6': 0,
                        ' ': 0
                        })
                    # Convert to float first, then to integer
                    df[column] = pd.to_numeric(df[column], errors='coerce').fillna(0).astype('float64')
                    # Integer conversion
                    df[column] = df[column].astype('int64')
                else:
                    print(f"{column}: Unsupported target type {dtype_mapping[column]}.")
            except Exception as e:
                print(f"Error converting {column} from {df[column].dtype} to {dtype_mapping[column]}: {e}")
        # Save the file back with a semicolon (;) delimiter
        df.to_csv(csv_file_path, sep=';', index=False)
        # Memory management
        del df
        gc.collect()
    return "Changes complete."
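
The float branch above can be written as a single vectorized function. This is a hypothetical sketch (`clean_float_column` and `NULL_MARKER` are our own names) that applies the same replacements: the `'1.1900.1'` pattern, the `\N` null markers, blanks, and leftover `Unnamed` values. It differs from the notebook in one deliberate way. Values that are still unparseable after cleaning, including genuinely missing ones, become `0.0`, whereas the notebook's float branch leaves missing values as NaN. To keep NaN, drop the `fillna(0.0)`.

```python
import pandas as pd

# Matches the null markers \N, \N.1, \N.2, ... as whole values
NULL_MARKER = r'^\\N(\.\d+)?$'


def clean_float_column(s):
    """Hypothetical vectorized version of the float branch in change_datatypes."""
    # Cast everything to str so the .str methods see every value, not just the strings
    s = s.astype(str).str.strip()
    # Collapse values like '1.1900.1' to '1.1900'
    s = s.str.replace(r'\.(\d{2}|\d{4})\.\d', r'.\1', regex=True)
    # Null markers, blanks and leftover 'Unnamed: N' header names become 0
    s = s.str.replace(NULL_MARKER, '0', regex=True)
    s = s.str.replace(r'.*Unnamed.*', '0', regex=True)
    s = s.replace('', '0')
    # Anything still unparseable (for example 'nan' or 'None') becomes NaN, then 0.0
    return pd.to_numeric(s, errors='coerce').fillna(0.0)


raw = pd.Series(['2.49', '1.1900.1', '\\N', '\\N.3', 'Unnamed: 12', ' ', 3.5, None])
cleaned = clean_float_column(raw)
```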

In [23]:
change_datatypes(csv_directory)

transArchive_201207_201209.csv
-------
transArchive_201404_201406.csv
-------
transArchive_201401_201403.csv
-------
transArchive_201201_201203.csv
-------
transArchive_201204_201206.csv
-------
transArchive_201407_201409.csv
-------
transArchive_201210_201212_inactive.csv
-------
transArchive_201404_201406_inactive.csv
-------
transArchive_201301_201303_inactive.csv
-------
transArchive_201210_201212.csv
-------
transArchive_201410_201412.csv
-------
transArchive_201608.csv
-------
transArchive_201609.csv
-------
transArchive_201410_201412_inactive.csv
-------
transArchive_201204_201206_inactive.csv
-------
transArchive_201310_201312.csv
-------
transArchive_201307_201309_inactive.csv
-------
transArchive_201304_201306.csv
-------
transArchive_201507_201509.csv
-------
transArchive_201301_201303.csv
-------
transArchive_201501_201503.csv
-------
transArchive_201307_201309.csv
-------
transArchive_201504_201506.csv
-------
transArchive_201511.csv
-------
transArchive_201107_201109.csv
-------
transArchive_201510.csv
-------
transArchive_201201_201203_inactive.csv
-------
transArchive_201101_201103.csv
-------
transArchive_201512.csv
-------
transArchive_201407_201409_inactive.csv
-------
transArchive_201110_201112.csv
-------
transArchive_201701.csv
-------
transArchive_201106.csv
-------
transArchive_201104.csv
-------
transArchive_201310_201312_inactive.csv
-------
transArchive_201105.csv
-------
transArchive_201604.csv
-------
transArchive_201610.csv
-------
transArchive_201207_201209_inactive.csv
-------
transArchive_201611.csv
-------
transArchive_201605.csv
-------
transArchive_201607.csv
-------
transArchive_201010_201012.csv
-------
transArchive_201606.csv
-------
transArchive_201612.csv
-------
transArchive_201401_201403_inactive.csv
-------
transArchive_201304_201306_inactive.csv
-------
transArchive_201602.csv
-------
transArchive_201603.csv
-------
transArchive_201601.csv
-------
transArchive_201001_201003.csv
-------
transArchive_201004_201006.csv
-------
transArchive_201007_201009.csv
-------

'Changes complete.'

In [24]:
fix_datatypes(csv_directory, mapping=dtype_mapping)

transArchive_201207_201209.csv
-------
transArchive_201404_201406.csv
-------
transArchive_201401_201403.csv
-------
transArchive_201201_201203.csv
-------
transArchive_201204_201206.csv
-------
transArchive_201407_201409.csv
-------
transArchive_201210_201212_inactive.csv
-------
transArchive_201404_201406_inactive.csv
-------
transArchive_201301_201303_inactive.csv
-------
transArchive_201210_201212.csv
-------
transArchive_201410_201412.csv
-------
transArchive_201608.csv
-------
transArchive_201609.csv
-------
transArchive_201410_201412_inactive.csv
-------
transArchive_201204_201206_inactive.csv
-------
transArchive_201310_201312.csv
-------
transArchive_201307_201309_inactive.csv
-------
transArchive_201304_201306.csv
-------
transArchive_201507_201509.csv
-------
transArchive_201301_201303.csv
-------
transArchive_201501_201503.csv
-------
transArchive_201307_201309.csv
-------
transArchive_201504_201506.csv
-------
transArchive_201511.csv
-------
transArchive_201107_201109.csv


'0 columns need their data types changed.'
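Passing an explicit `dtype` mapping to `pd.read_csv` is what keeps each column's type the same from file to file. A check like the one reported above can be sketched as follows. This is a hypothetical helper, not the notebook's `fix_datatypes`, and the column names and sample rows are made up for illustration. It also shows why the mapping matters for ID-like columns: without it, pandas parses `0000000004011` as the integer 4011 and the leading zeros are lost.

```python
import io

import pandas as pd


def columns_needing_dtype_change(df, mapping):
    """Return the columns whose dtype differs from the target dtype in `mapping`.

    Hypothetical helper: `mapping` maps column names to dtype strings such as
    "object" or "float64"; columns missing from `df` are ignored.
    """
    return [
        column
        for column, target in mapping.items()
        if column in df.columns and df[column].dtype != pd.api.types.pandas_dtype(target)
    ]


# Made-up sample in the semicolon-delimited layout used by the Wedge files
sample = "upc;total\n0000000004011;1.99\n0000000004011;2.49\n"
mapping = {"upc": "object", "total": "float64"}

inferred = pd.read_csv(io.StringIO(sample), delimiter=";")              # upc inferred as int64
typed = pd.read_csv(io.StringIO(sample), delimiter=";", dtype=mapping)  # upc kept as text

print(columns_needing_dtype_change(inferred, mapping))  # ['upc']
print(columns_needing_dtype_change(typed, mapping))     # []
print(typed["upc"].iloc[0])                             # 0000000004011
```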

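## Concatenating the Data Sets and Uploading to the Cloud
___
With consistent data types in place, we combine the CSVs into a single local file and load each one into a GBQ table: the first file replaces the table's contents, and every later file is appended.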

In [31]:
def large_csv(csv_directory, output_file, dtype_mapping):
    # Initialize a BigQuery client
    client = bigquery.Client()
    # Define the destination table (project.dataset.table)
    table_id = 'wedge-to-the-cloud.wedge_to_the_dataset.transactions'
    # Flag to check if the output file needs to be initialized (for the first file)
    is_first_file = True
    # Loop through the files in a fixed (sorted) order
    for filename in sorted(os.listdir(csv_directory)):
        csv_file_path = get_file_path(csv_directory, filename)
        # Only process CSV files, and skip the combined output file if it lives in the same folder
        if not filename.endswith('.csv') or os.path.abspath(csv_file_path) == os.path.abspath(output_file):
            continue
        # Read the current file with the agreed-upon data types
        df = pd.read_csv(csv_file_path, delimiter=";", dtype=dtype_mapping)
        if is_first_file:
            # First file: write the header and replace any existing table contents
            df.to_csv(output_file, sep=';', index=False, mode='w')
            job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
            is_first_file = False
        else:
            # Later files: append to the local file without the header, and append to the table
            df.to_csv(output_file, sep=';', index=False, mode='a', header=False)
            job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
        # Start the load job and wait for it to finish, so the truncate always
        # completes before any append and load errors surface immediately
        job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()


In [32]:
# Output file path
file_name = 'data/transactions.csv'
# Combine the CSVs into one local file and load each one into BigQuery
large_csv(csv_directory, file_name, dtype_mapping)
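The local half of `large_csv` follows a common pattern: write the header with the first file, then append every later file with `header=False`. Below is a minimal, self-contained sketch of just that part, using a temporary directory and two made-up files. `concat_csvs` is a hypothetical name, and the BigQuery load is left out so the sketch runs without credentials.

```python
import os
import tempfile

import pandas as pd


def concat_csvs(csv_directory, output_file, sep=";"):
    """Append every CSV in csv_directory to output_file, writing the header only once."""
    is_first_file = True
    for filename in sorted(os.listdir(csv_directory)):
        if not filename.endswith(".csv"):
            continue  # ignore anything that is not a CSV
        df = pd.read_csv(os.path.join(csv_directory, filename), sep=sep)
        if is_first_file:
            # First file: create/overwrite the output and include the header
            df.to_csv(output_file, sep=sep, index=False, mode="w")
            is_first_file = False
        else:
            # Later files: append rows only
            df.to_csv(output_file, sep=sep, index=False, mode="a", header=False)


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "parts")
    os.makedirs(src)
    # Two hypothetical files with identical columns
    pd.DataFrame({"upc": [1, 2], "total": [1.5, 2.0]}).to_csv(os.path.join(src, "a.csv"), sep=";", index=False)
    pd.DataFrame({"upc": [3], "total": [4.25]}).to_csv(os.path.join(src, "b.csv"), sep=";", index=False)
    out = os.path.join(tmp, "combined.csv")  # written outside the input folder
    concat_csvs(src, out)
    combined = pd.read_csv(out, sep=";")

print(combined)
```

Writing the combined file outside the input folder keeps a rerun from reading the previous output back in as one of its inputs.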



In [55]:
# Alternative in-memory approach, kept for reference (impractical for the full data set)
# Initialize an empty list to hold data frames
#df_list = []

# Loop through all the files in the folder
#for filename in os.listdir(csv_directory):
#    if filename.endswith('.csv'):  # Only process csv files
        # Get the file path of the csv
#        csv_file_path = get_file_path(csv_directory, filename)
        # Read in the data set as a pandas data frame
#        df = pd.read_csv(csv_file_path, delimiter=";", dtype=dtype_mapping) 
        # Append the dataframe to the list
#        df_list.append(df)

# Concatenate all DataFrames into one
#big_df = pd.concat(df_list, ignore_index=True)
#big_df['display'] = big_df['display'].fillna('')
#file_name = 'data/transactions.csv'
#big_df.to_csv(file_name, sep=';', index=False)

## To the Cloud (Only)
___
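Finally, this last chunk of code would load the single concatenated DataFrame (`big_df`) into GBQ in one job. It stays commented out because our full dataset is too large to handle this way; `large_csv` uploads the data file by file instead.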

In [None]:
# Initialize a BigQuery client
#client = bigquery.Client()

# Define the dataset and table
#dataset_id = 'wedge-to-the-cloud.wedge_to_the_dataset'
#table_id = 'wedge-to-the-cloud.wedge_to_the_dataset.transactions'

# Load data into BigQuery
#job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

#job = client.load_table_from_dataframe(
#    big_df, table_id, job_config=job_config)  # Make an API request.

#job.result()  # Wait for the job to complete.
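If the combined file ever has to go to GBQ in one pass, reading it in chunks keeps memory bounded. A minimal sketch, assuming a hypothetical `upload_chunk` callback that would wrap a call such as `client.load_table_from_dataframe(chunk, table_id, job_config=...)` followed by `job.result()`; here a fake uploader just records what it receives so the sketch runs offline.

```python
import io

import pandas as pd


def upload_in_chunks(csv_source, upload_chunk, chunksize=100_000, sep=";"):
    """Read a large CSV in chunks and hand each chunk to `upload_chunk`.

    `upload_chunk(chunk, write_disposition)` is a hypothetical callback: the first
    chunk gets "WRITE_TRUNCATE" and every later chunk "WRITE_APPEND".
    Returns the total number of rows passed along.
    """
    total_rows = 0
    for i, chunk in enumerate(pd.read_csv(csv_source, sep=sep, chunksize=chunksize)):
        disposition = "WRITE_TRUNCATE" if i == 0 else "WRITE_APPEND"
        upload_chunk(chunk, disposition)
        total_rows += len(chunk)
    return total_rows


# Usage with a fake uploader that records (rows in chunk, write disposition)
calls = []
sample = "upc;total\n" + "\n".join(f"{n};{n * 0.5}" for n in range(5)) + "\n"
rows = upload_in_chunks(
    io.StringIO(sample),
    lambda chunk, disposition: calls.append((len(chunk), disposition)),
    chunksize=2,
)
print(rows, calls)
```

The first chunk truncates and the rest append, mirroring the write dispositions `large_csv` uses per file.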

After verifying that the data set was uploaded to GBQ, we know the script works for both the subset and the full set of zip files. We built in a few error checks along the way, and because our functions raise errors when something goes wrong, the script can be rerun to reprocess the CSVs or to add more data later.
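One way to validate the upload is to compare row counts. A minimal sketch that counts the data rows across the local CSVs with the standard `csv` module, which treats a quoted field containing a newline as part of a single record. The resulting number could then be compared with `client.get_table(table_id).num_rows` once the load jobs have finished. `count_csv_rows` is a hypothetical helper and the files below are made up.

```python
import csv
import os
import tempfile


def count_csv_rows(csv_directory, delimiter=";", has_header=True):
    """Count data rows across every .csv file in csv_directory.

    Hypothetical helper: files are streamed with csv.reader, so nothing is
    loaded into pandas and quoted fields containing newlines count once.
    """
    total = 0
    for filename in os.listdir(csv_directory):
        if not filename.endswith(".csv"):
            continue  # skip non-CSV files
        with open(os.path.join(csv_directory, filename), newline="", encoding="utf-8") as f:
            rows = sum(1 for _ in csv.reader(f, delimiter=delimiter))
        # Subtract the header row from each non-empty file
        total += (rows - 1) if (has_header and rows > 0) else rows
    return total


# Usage with made-up files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    files = {
        "a.csv": 'upc;description\n1;"two\nlines"\n2;plain\n',
        "b.csv": "upc;description\n3;other\n",
        "notes.txt": "not a csv\n",
    }
    for name, body in files.items():
        with open(os.path.join(tmp, name), "w", newline="", encoding="utf-8") as f:
            f.write(body)
    local_rows = count_csv_rows(tmp)

print(local_rows)  # compare against client.get_table(table_id).num_rows
```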