# Wedge Task 1
___
In this first task, we will work on uploading the Wedge data from around 50 zip files to Google BigQuery (GBQ). To make this process easier, we will initially run our script on a smaller subset of the Wedge data. This smaller subset is structured the same way as the larger dataset, which allows us to test the entire process effectively. Once we have processed the smaller subset and successfully uploaded it to GBQ, we will then change the directory settings in our script to point to the full dataset. This way, we can ensure everything works correctly before handling the entire volume of data.

In [1]:
import os
import zipfile
import pandas as pd
from collections import Counter
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
from google.cloud import bigquery

## Unpack the Zip Files
___
The first thing we need to do is extract the csvs from the zip files located in our data directory. We will first work with our subset of Wedge zip files. After the full script has run successfully, we will return to this point to change which directory we are processing.
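The unpacking function in this section takes only the first member of each zip (`Z.namelist()[0]`), so any additional CSV in a zip would be silently skipped. A minimal sketch of a variant that extracts every CSV member, assuming CSV members end in `.csv`; `extract_all_csvs` is a hypothetical helper, not part of the original script.

```python
import os
import zipfile


def extract_all_csvs(zip_file_path, output_directory):
    """Extract every .csv member of a zip file and return the extracted paths.

    Unlike taking namelist()[0], this does not assume the CSV is the first
    (or only) member, and it does not read any data into memory.
    """
    os.makedirs(output_directory, exist_ok=True)
    extracted = []
    with zipfile.ZipFile(zip_file_path, "r") as zf:
        for member in zf.namelist():
            # Skip directory entries and anything that is not a CSV
            if member.endswith("/") or not member.lower().endswith(".csv"):
                continue
            # extract() returns the path of the file it wrote
            extracted.append(zf.extract(member, path=output_directory))
    return extracted
```

In the loop over `data_directory`, this would replace the call to `unpack_and_save_csv(zip_file_path, csv_directory)` one for one.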

In [2]:
# Gets a file path, attaching the file name to the data directory
def get_file_path(data_directory, file):
    file_path = os.path.join(data_directory, file)
    return file_path

In [3]:
# Unpack the zips and save the csvs to an output directory
def unpack_and_save_csv(zip_file_path, output_directory):
    # Create the output directory if it doesn't exist
    os.makedirs(output_directory, exist_ok=True)
    # Open the zip file at the specified path
    with zipfile.ZipFile(zip_file_path, 'r') as Z:
        # Get the name of the single CSV file inside the zip
        file = Z.namelist()[0]
        # Extract the file to the output directory
        Z.extract(file, path=output_directory)
    extracted_file_path = get_file_path(output_directory, file)
    # Return the file name and the path of the extracted csv. The csv is not
    # read into pandas here: the contents were never used, and reading every
    # file only to discard it wastes time and memory.
    return file, extracted_file_path

In [4]:
data_directory = "data/WedgeZipOfZips/"
csv_directory = "data/WedgeCSVs/"

# Loop through all zip files in the data directory
for zip_file in os.listdir(data_directory):
    if zip_file.endswith('.zip'):
        # Get the full path of the zip file
        zip_file_path = get_file_path(data_directory, zip_file)
        # Unpack the zips and save the csvs to an output directory
        unpack_and_save_csv(zip_file_path, csv_directory)


OSError: [Errno 28] No space left on device
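The `OSError` above ("No space left on device") most likely means the disk filled up while the csvs were being written out. A minimal sketch of a pre-flight check, assuming all zips are extracted to one target folder: it sums `ZipInfo.file_size` (the uncompressed size) over every member and compares the total against `shutil.disk_usage(...).free`. The function names and the 10% margin are hypothetical choices, not part of the original script.

```python
import shutil
import zipfile


def uncompressed_size(zip_paths):
    """Total uncompressed size, in bytes, of every member of the given zip files."""
    total = 0
    for path in zip_paths:
        with zipfile.ZipFile(path, "r") as zf:
            # ZipInfo.file_size is the size of the member once extracted
            total += sum(info.file_size for info in zf.infolist())
    return total


def has_room_to_extract(zip_paths, target_directory, margin=1.10):
    """Return (fits, needed, free) for extracting zip_paths into target_directory.

    margin adds headroom (10% by default). This is an assumed safety factor,
    since the later cleaning steps rewrite files in place.
    """
    needed = uncompressed_size(zip_paths)
    free = shutil.disk_usage(target_directory).free
    return needed * margin <= free, needed, free
```

Running this on the full set of zips before the extraction loop would turn a mid-run `OSError` into an upfront warning.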

## Header Validation
___
Now, let's check that every file in our csv directory has the same header.

In [20]:
# Gets the headers (column names) from a directory of csvs
# Will output a dataframe of the counts of different headers
def get_headers(csv_directory):
    headers = []
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the csv file path
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read in the first row of the csv, which is the header
        df = pd.read_csv(csv_file_path, nrows=0) 
        # Make the header a tuple and append it to our list
        headers.append(tuple(df.columns)) 
    # Create the dataframe of header counts
    headers_df = pd.DataFrame(Counter(headers).items(), columns=['Header', 'Count']).sort_values(by='Count', ascending=False)
    # Return the dataframe of header counts
    return headers_df

In [None]:
# Display the headers
headers_df = get_headers(csv_directory)
headers_df

In [None]:
print(headers_df.iloc[0,0])
print(headers_df.iloc[1,0])

It looks like one of our problems is differently delimited files: some have columns separated by commas, others by semicolons.
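The delimiter check in the next cell treats a line as comma-delimited if it contains any comma. A minimal sketch of an alternative using the standard library's `csv.Sniffer`, restricted to the two candidate delimiters; `detect_delimiter` is a hypothetical helper, and it falls back to the notebook's own rule when the sniffer cannot decide.

```python
import csv


def detect_delimiter(first_line, candidates=",;"):
    """Guess the delimiter of a header line, restricted to the given candidates."""
    try:
        return csv.Sniffer().sniff(first_line, delimiters=candidates).delimiter
    except csv.Error:
        # Sniffer could not decide (e.g. a single-column line); fall back to the
        # same rule used in the notebook: comma if present, otherwise semicolon.
        return "," if "," in first_line else ";"
```

The sniffer counts how consistently each candidate appears, so it is somewhat less sensitive to a stray comma than a plain `',' in first_line` test, though on a single header line the two usually agree.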

In [25]:
for csv_file in os.listdir(csv_directory):
    # Get the csv file path
    csv_file_path = get_file_path(csv_directory, csv_file)
    # Determine the current delimiter by checking the first line of the file
    with open(csv_file_path, 'r') as f:
        first_line = f.readline().strip()
        current_delimiter = ',' if ',' in first_line else ';'  
    # Read the CSV file with the current delimiter
    df = pd.read_csv(csv_file_path, delimiter=current_delimiter)    
    # Save the file back with a semicolon (;) delimiter
    df.to_csv(csv_file_path, sep=';', index=False)

In [None]:
# Display the headers
headers_df = get_headers(csv_directory)
headers_df

Now it looks like we've addressed the different delimiters, but several files still don't have a header. To fix this, we programmatically add headers to the csvs that lack them. For each file, we compare its first row to the most common header. If they differ, we re-read the file treating every row as data, assign the most common header as the column names, and overwrite the original file.
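The same idea can be sketched as a single read, assuming the files are already `;`-delimited: read every line as data, and drop the first row only if it equals the expected header. `read_with_expected_header` is a hypothetical helper. Reading everything as strings (`dtype=str`) keeps a header row from affecting type inference.

```python
import io

import pandas as pd


def read_with_expected_header(source, expected_header, sep=";"):
    """Read a delimited file, supplying expected_header if the file lacks one.

    The first line is treated as a header only if it matches expected_header
    exactly; otherwise every line is treated as data.
    """
    df = pd.read_csv(source, sep=sep, header=None, dtype=str, keep_default_na=False)
    first_row = tuple(df.iloc[0]) if len(df) else ()
    if first_row == tuple(expected_header):
        # Drop the header row that was read in as data
        df = df.iloc[1:].reset_index(drop=True)
    df.columns = list(expected_header)
    return df
```

For example, with a hypothetical three-column header, a file with a header and a file without one end up with identical columns:

```python
header = ("datetime", "register_no", "emp_no")
with_header = read_with_expected_header(io.StringIO("datetime;register_no;emp_no\n2015-01-01;5;12\n"), header)
without_header = read_with_expected_header(io.StringIO("2015-01-01;5;12\n"), header)
```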

In [27]:
# Find the most common header
most_common_header = headers_df.iloc[0,0]
# Loop over our csv files
for csv_file in os.listdir(csv_directory):
    # Get the csv file path
    csv_file_path = get_file_path(csv_directory, csv_file)
    # Read the csv file to a pandas df with the default delimiter,
    # the same way get_headers built most_common_header
    df = pd.read_csv(csv_file_path)
    # Check if the first row matches the most common header
    if tuple(df.columns) != most_common_header:
        # No header: re-read every row as data and attach the common header
        df = pd.read_csv(csv_file_path, header=None)
        df.columns = list(most_common_header)
        # Overwrite the original csv file with the new header
        df.to_csv(csv_file_path, index=False)

In [None]:
# Display the headers
headers_df = get_headers(csv_directory)
headers_df

Awesome, now all of the csv files have the same delimiter and a common header.

## Double Quotation Handling
___
When we first investigated the files, some lines were not read in correctly. This was mainly because some values in object (string) columns contained multiple double quotation marks. To avoid complications, we remove the double quotes from the entire data set. Doing this, however, leaves formatting issues in places where the double quotes were used to distinguish between lines. Removing trailing semicolons happens to fix all of the remaining double-quote issues.
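The cleaning rule described above (drop every `"`, strip trailing whitespace, drop one trailing `;`) can be applied one line at a time instead of loading the whole file with `readlines()`. A minimal sketch, assuming UTF-8 files; `clean_line` and `remove_double_quotes_streaming` are hypothetical names. Output goes to a temporary file in the same folder, which `os.replace` then swaps over the original.

```python
import os
import tempfile


def clean_line(line):
    """Remove all double quotes, trailing whitespace, and one trailing semicolon."""
    line = line.replace('"', "").rstrip()
    if line.endswith(";"):
        line = line[:-1]
    return line


def remove_double_quotes_streaming(csv_file_path):
    """Clean a file line by line, holding only one line in memory at a time."""
    directory = os.path.dirname(os.path.abspath(csv_file_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with open(csv_file_path, "r", encoding="utf-8") as src, \
                os.fdopen(fd, "w", encoding="utf-8") as dst:
            for line in src:
                dst.write(clean_line(line) + "\n")
        # Swap the cleaned file into place only after it is fully written
        os.replace(tmp_path, csv_file_path)
    except BaseException:
        # Leave the original untouched and discard the partial output
        os.remove(tmp_path)
        raise
```

If the process is killed mid-write, a stray `.tmp` file could remain in the csv directory, where the other `os.listdir` loops would pick it up.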

In [29]:
# Removes double quotes and fixes several data sets with formatting issues
def remove_double_quotes(csv_directory):
    # Loop over our file of csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csv
        csv_file_path = os.path.join(csv_directory, csv_file)
        # Read the csv file as a raw text file to handle quotes more flexibly
        with open(csv_file_path, 'r', encoding='utf-8') as file:
            content = file.readlines()
        # Remove all double quotes and trim trailing semicolons
        cleaned_content = []
        for line in content:
            # Remove double quotes and trailing white space
            line = line.replace('"', '').rstrip()
            # Remove last semicolon
            if line.endswith(';'):
                line = line[:-1] 
            # Add the cleaned content to our list.
            cleaned_content.append(line + '\n')
        # Write the cleaned content back to the CSV file
        with open(csv_file_path, 'w', encoding='utf-8') as file:
            file.writelines(cleaned_content) 

In [30]:
# Remove double quotes
remove_double_quotes(csv_directory)

## Investigating Data Types
___
Now that all of our files share a common delimiter, have headers, and have no other pertinent file-level issues, we will shift our focus to combining the csvs. To do this, we want to ensure that every file has the same data type in each column.

In [32]:
# Gets the data types from our directory of csvs
# Will output a dataframe of the counts of different datatypes
def get_datatypes(csv_directory):
    datatypes = []
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csvs
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read the file as a pandas df
        df = pd.read_csv(csv_file_path, delimiter=";", low_memory=False)  
        # Find the data types and add them to our list
        datatypes.append(tuple(df.dtypes))
    # Create the dataframe of data type counts
    datatypes_df = pd.DataFrame(Counter(datatypes).items(), columns=['Datatypes', 'Count']).sort_values(by='Count', ascending=False)
    # Return the dataframe of data type counts
    return datatypes_df

In [None]:
# Display the data types
datatypes_df = get_datatypes(csv_directory)
datatypes_df

Not every file has the same data types, so we need to identify where the differences are and make the appropriate changes.
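One plausible source of these mismatches is the `\N` values that `change_datatypes` later replaces: a single non-numeric token is enough to make pandas read an otherwise numeric column as text. A minimal sketch with two made-up files (the column names are hypothetical) showing the effect, and how `read_csv`'s `na_values` parameter turns the marker into `NaN` instead. `\N` resembles the NULL marker written by MySQL-style exports, but where the Wedge files came from is an assumption here.

```python
import io

import pandas as pd

FILE_A = "trans_id;total\n1;2.49\n2;3.10\n"
FILE_B = "trans_id;total\n3;\\N\n4;1.25\n"   # "\N" stands in for a missing value


def read_sample(text, **kwargs):
    """Read a small semicolon-delimited sample from a string."""
    return pd.read_csv(io.StringIO(text), sep=";", **kwargs)


plain_a = read_sample(FILE_A)
plain_b = read_sample(FILE_B)
# Declaring "\N" as a missing-value marker lets pandas parse it as NaN
marked_b = read_sample(FILE_B, na_values=["\\N"])

print(plain_a["total"].dtype, plain_b["total"].dtype, marked_b["total"].dtype)
```

The suffixed variants handled later (`\N.1`, `\N.2`, ...) would need to be listed in `na_values` as well.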


In [34]:
# Get the datatypes into a dataframe
def get_datatypes_df(csv_directory):
    datatypes_list = [] 
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csvs
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read the data into a pandas df
        df = pd.read_csv(csv_file_path, delimiter=";", low_memory=False)
        # Get the data types
        datatypes_list.append(df.dtypes)
    # Create the data frame of data types
    datatypes_df = pd.DataFrame(datatypes_list)
    # Return the data frame of data types
    return datatypes_df

In [None]:
# Display the data types
datatypes_df = get_datatypes_df(csv_directory)
datatypes_df

Rather than inspect each file by hand, we will assume that the most common data type in each column is the expected one.
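Taking the mode per column gives the expected types, but it also helps to know which files deviate from it. A minimal sketch, assuming the per-file `df.dtypes` Series are collected into a dict keyed by file name (`get_datatypes_df` keeps them in a list without file names); `common_dtypes_and_outliers` is a hypothetical helper.

```python
from collections import Counter

import pandas as pd


def common_dtypes_and_outliers(dtypes_by_file):
    """Given {file_name: Series of dtypes}, return the modal dtype per column
    and, for each column with a mismatch, the files whose dtype differs."""
    # Rows = files, columns = csv columns, values = dtype names as strings
    table = pd.DataFrame(dtypes_by_file).T.astype(str)
    common = {col: Counter(table[col]).most_common(1)[0][0] for col in table.columns}
    outliers = {}
    for col, dtype in common.items():
        mismatched = table.loc[table[col] != dtype].index.tolist()
        if mismatched:
            outliers[col] = sorted(mismatched)
    return common, outliers
```

Comparing dtype names as strings sidesteps any ambiguity in how dtype objects compare, and the outlier list points straight at the files worth opening by hand.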

In [40]:
# Gets the columns and most common data type in a dictionary
def check_common_datatype(csv_directory):
    # Build the data types df (one row per file)
    datatypes_df = get_datatypes_df(csv_directory)
    common_dtype_dict = {}
    # Get the most common data type and assign it to the dictionary
    for column in datatypes_df.columns:
        common_dtype = datatypes_df[column].mode()[0]
        common_dtype_dict[column] = common_dtype
    # Return the data type dictionary
    return common_dtype_dict

In [None]:
# Display the dictionary
dtype_mapping = check_common_datatype(csv_directory)
dtype_mapping

In [42]:
# Reports (without changing anything) the columns whose data types differ from dtype_mapping
def fix_datatypes(csv_directory, mapping=None):
    # Keep track of how many columns need their data type changed
    needing_changed = 0
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csv
        csv_file_path = get_file_path(csv_directory, csv_file)
        try:
            # Read in the data
            df = pd.read_csv(csv_file_path, delimiter=";", dtype=mapping)  
            print(csv_file)
            print('-------')
            # Print if the column doesn't have the right data type
            for column in df.columns:
                if df[column].dtype != dtype_mapping[column]:
                    print(f"{column}: {df[column].dtype} should be converted to {dtype_mapping[column]}.")
                    needing_changed += 1
        except Exception as e:
            print(f"Error reading {csv_file}: {e}")
    # Return the number of columns needing data type changed
    return f"{needing_changed} columns need their data types changed."

In [None]:
fix_datatypes(csv_directory)

In [45]:
# Actually change the data types of columns in the csvs
def change_datatypes(csv_directory):
    # Loop over the csvs
    for csv_file in os.listdir(csv_directory):
        # Get the file path of the csv
        csv_file_path = get_file_path(csv_directory, csv_file)
        # Read in the data
        df = pd.read_csv(csv_file_path, delimiter=";")  
        # Empty the display column. We have no use for this column.
        df['display'] = ''
        print(csv_file)
        print('-------')
        # Loop over the columns in the csv    
        for column in df.columns:
            # Check if we have the right data type
            if df[column].dtype == dtype_mapping[column]:
                continue
            # Perform the conversion based on target type
            try:
                # Object conversion
                if dtype_mapping[column] == 'object':
                    # Replace nulls with blanks
                    df[column] = df[column].replace({
                        '\\N': '',
                        '\\N.1': '',
                        '\\N.2': '',
                        '\\N.3': ''
                        })
                    # Object conversion
                    df[column] = df[column].astype('object')
                # Float conversion
                elif dtype_mapping[column] == 'float64':
                    # Ensure the column is treated as string first
                    if df[column].dtype != 'object':
                        df[column] = df[column].astype('str')
                    # Modifies any places where we have strings that look like 2.4900.1
                    df[column] = df[column].str.replace(r'\.(\d{2}|\d{4})\.\d', r'.\1', regex=True)
                    # Replace nulls with 0s
                    df[column] = df[column].replace({
                        '\\N': 0,
                        '\\N.1': 0,
                        '\\N.2': 0,
                        '\\N.3': 0,
                        '\\N.4': 0,
                        '\\N.5': 0,
                        '\\N.6': 0,
                        ' ': 0})
                    # Replacing "Unnamed" with 0.0 for the float conversion
                    if df[column].dtype == 'object':
                        df[column] = df[column].replace(r'.*Unnamed.*', '0.0', regex=True)
                    # Float conversion                  
                    df[column] = df[column].astype('float64')
                # Integer conversion
                elif dtype_mapping[column] == 'int64':
                    # Ensure the column is treated as string first
                    if df[column].dtype != 'object':
                        df[column] = df[column].astype('str')
                    # Replace the nulls with 0s
                    df[column] = df[column].replace({
                        '\\N': 0,
                        '\\N.1': 0,
                        '\\N.2': 0,
                        '\\N.3': 0,
                        '\\N.4': 0,
                        '\\N.5': 0,
                        '\\N.6': 0,
                        ' ': 0
                        })
                    # Convert to float first, then to integer
                    df[column] = pd.to_numeric(df[column], errors='coerce').fillna(0).astype('float64')
                    # Integer conversion
                    df[column] = df[column].astype('int64')
                else:
                    print(f"{column}: Unsupported target type {dtype_mapping[column]}.")
                # Trying to handle the display column
                if df['display'].dtype == 'float64':
                    df[column] = df[column].astype('object')
            except Exception as e:
                print(f"Error converting {column} from {df[column].dtype} to {dtype_mapping[column]}: {e}")
        # Save the file back with a semicolon (;) delimiter
        df.to_csv(csv_file_path, sep=';', index=False)
    return "Changes complete."
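The float branch of `change_datatypes` chains several replacements. A condensed sketch of the same steps for a single column, assuming the null markers always look like `\N` optionally followed by `.<digits>`; `to_clean_float` is a hypothetical helper, and it reuses the notebook's regex for values like `2.4900.1`.

```python
import pandas as pd

# Null markers seen in the notebook: "\N" plus suffixed variants like "\N.1"
NULL_MARKER = r"^\\N(\.\d+)?$"
# Repairs values like "2.4900.1" -> "2.4900" (same pattern as change_datatypes)
TRAILING_SUFFIX = r"\.(\d{2}|\d{4})\.\d"


def to_clean_float(series):
    """Convert a messy string column to float64, mapping null markers and blanks to 0."""
    s = series.astype(str)
    s = s.str.replace(TRAILING_SUFFIX, r".\1", regex=True)
    s = s.str.replace(NULL_MARKER, "0", regex=True)
    s = s.str.replace(r"^\s*$", "0", regex=True)
    # Anything still unparseable (e.g. "Unnamed: 5") becomes NaN, then 0
    return pd.to_numeric(s, errors="coerce").fillna(0).astype("float64")
```

One trade-off: `errors="coerce"` silently zeroes any unexpected value, whereas the original `try`/`except` prints an error, so the original is better at surfacing new kinds of bad data.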

In [None]:
change_datatypes(csv_directory)

In [None]:
fix_datatypes(csv_directory, mapping=dtype_mapping)

In [None]:
datatypes_df = get_datatypes_df(csv_directory)

# Initialize flag to track if any column has differing values
all_same = True

# Check if all values in each column are the same
for column in datatypes_df.columns:
    if datatypes_df[column].nunique() != 1:
        print(f"Values in column '{column}' are not the same.")
        all_same = False

# If no column with differing values was found, print all values are the same
if all_same:
    print("All values in all columns are the same.")

## Concatenating the Data Sets
___

In [49]:
# Initialize an empty list to hold data frames
df_list = []

# Loop through all the files in the folder
for filename in os.listdir(csv_directory):
    if filename.endswith('.csv'):  # Only process csv files
        # Get the file path of the csv
        csv_file_path = get_file_path(csv_directory, filename)
        # Read in the data set as a pandas data frame
        df = pd.read_csv(csv_file_path, delimiter=";", low_memory=False, dtype=dtype_mapping) 
        # Append the dataframe to the list
        df_list.append(df)

# Concatenate all DataFrames into one
big_df = pd.concat(df_list, ignore_index=True)
big_df['display'] = big_df['display'].fillna('')
file_name = 'data/transactions.csv'
big_df.to_csv(file_name, sep=';', index=False)
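`pd.concat` holds every file in memory at once before `data/transactions.csv` is written, which may become a problem on the full data set. A minimal sketch of a streaming alternative, assuming the same `;` delimiter and that the output file lives outside the csv directory; `concat_csvs_to_file` is a hypothetical helper. It appends each file in chunks and writes the header once. It does not reproduce the `display` `fillna('')` step.

```python
import os

import pandas as pd


def concat_csvs_to_file(csv_directory, output_path, dtype=None, sep=";", chunksize=100_000):
    """Append every .csv in csv_directory to output_path in chunks.

    Only one chunk is in memory at a time, and the header is written once.
    Returns the total number of data rows written.
    """
    rows = 0
    first = True
    for filename in sorted(os.listdir(csv_directory)):
        if not filename.endswith(".csv"):
            continue
        path = os.path.join(csv_directory, filename)
        with pd.read_csv(path, sep=sep, dtype=dtype, chunksize=chunksize) as reader:
            for chunk in reader:
                # Overwrite on the first chunk, append (without header) afterwards
                chunk.to_csv(output_path, sep=sep, index=False,
                             mode="w" if first else "a", header=first)
                first = False
                rows += len(chunk)
    return rows
```

The returned row count is a convenient number to compare against the table after the upload.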

In [50]:
# Check and notify about datatype mismatches
for column, expected_dtype in dtype_mapping.items():
    if column in big_df.columns and str(big_df[column].dtype) != expected_dtype:
        print(f"Column '{column}': Actual type = {big_df[column].dtype}, Expected type = {expected_dtype}")

## To the Cloud
___
Finally, we'd like to upload our data set to Google BigQuery.

In [None]:
# Initialize a BigQuery client
client = bigquery.Client()

# Define the dataset and table
dataset_id = 'wedge-to-the-cloud.wedge_to_the_dataset'
table_id = 'wedge-to-the-cloud.wedge_to_the_dataset.transactions'

# Load data into BigQuery
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")

job = client.load_table_from_dataframe(
    big_df, table_id, job_config=job_config)  # Make an API request.

job.result()  # Wait for the job to complete.
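`load_table_from_dataframe` needs `big_df` in memory (and the `pyarrow` package). Since the notebook already writes `data/transactions.csv`, a minimal sketch of loading that file directly with `load_table_from_file`, assuming the header row and `;` delimiter written above. `autodetect=True` lets BigQuery infer the schema, which may not match `dtype_mapping` exactly; an explicit schema would be stricter. `load_csv_to_bigquery` is a hypothetical helper, and running it requires `google-cloud-bigquery` and valid credentials.

```python
def load_csv_to_bigquery(csv_path, table_id, client=None):
    """Load a semicolon-delimited CSV with a header row into a BigQuery table.

    Returns the table's row count after the load, for validation.
    """
    # Imported here so the sketch can be defined without GCP set up
    from google.cloud import bigquery

    client = client or bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter=";",
        skip_leading_rows=1,  # the header row written by to_csv
        autodetect=True,      # assumption: let BigQuery infer the schema
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    with open(csv_path, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_id, job_config=job_config)
    job.result()  # wait for the load job to finish
    return client.get_table(table_id).num_rows
```

Comparing the returned count with `len(big_df)` (or the number of rows written to the csv) is one quick way to validate the upload.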

After validating that the data set has been uploaded to GBQ, we know the script works for both the subset and the full set of zip files. Because several of our functions catch errors and print what went wrong instead of failing silently, the script can also be reused to reprocess the csvs or to add more data later.