Task 1: Cleaning the Wedge transaction data files and uploading them to GBQ.

In [1]:
import os
import re
import datetime 
from zipfile import ZipFile
import pandas as pd
import numpy as np
import pandas_gbq
import janitor
import shutil
import glob
from google.cloud import bigquery
from google.oauth2 import service_account

This next section of cells is necessary to generate lists of delimiters and headers for use later on in the code in this notebook.

In [2]:
# Creates list of zip files in this directory.
zip_files = os.listdir("WedgeZipOfZips/")

In [3]:
# Checking for delimiters
import csv
import io

delimiters = dict() 
# r-read,w-write,a-append
# Start by reading in all the files again.
for this_zf in zip_files :
    with ZipFile("WedgeZipOfZips/" + this_zf,'r') as zf :
        zipped_files = zf.namelist()
        
        for file_name in zipped_files :
            input_file = zf.open(file_name,'r')
            input_file = io.TextIOWrapper(input_file,encoding="utf-8")
            
            dialect = csv.Sniffer().sniff(sample=input_file.readline(),
                                      delimiters=[",",";","\t"])
            
            delimiters[file_name] = dialect.delimiter
            
            print(" ".join(["It looks like",
                           file_name,
                           "has delimiter",
                           dialect.delimiter,
                           "."]))

            input_file.close() # Tidy up.

It looks like transArchive_201001_201003.csv has delimiter , .
It looks like transArchive_201004_201006.csv has delimiter , .
It looks like transArchive_201007_201009.csv has delimiter , .
It looks like transArchive_201010_201012.csv has delimiter , .
It looks like transArchive_201101_201103.csv has delimiter , .
It looks like transArchive_201104.csv has delimiter , .
It looks like transArchive_201105.csv has delimiter , .
It looks like transArchive_201106.csv has delimiter , .
It looks like transArchive_201107_201109.csv has delimiter , .
It looks like transArchive_201110_201112.csv has delimiter , .
It looks like transArchive_201201_201203.csv has delimiter , .
It looks like transArchive_201201_201203_inactive.csv has delimiter ; .
It looks like transArchive_201204_201206.csv has delimiter , .
It looks like transArchive_201204_201206_inactive.csv has delimiter ; .
It looks like transArchive_201207_201209.csv has delimiter , .
It looks like transArchive_201207_201209_inactive.csv has 

In [4]:
# Checking for headers
headers = dict()

for this_zf in zip_files :
    with ZipFile("WedgeZipOfZips/" + this_zf,'r') as zf :
        zipped_files = zf.namelist()

        for file_name in zipped_files :
            input_file = zf.open(file_name,'r')
            input_file = io.TextIOWrapper(input_file,encoding="utf-8")
            
            this_delimiter = delimiters[file_name]
            
            for line in input_file :
                #print(line.strip().split(this_delimiter))
                break
            #print(line)
            headers[file_name] = "datetime" in line
                        
            input_file.close() # Tidy up.

Next, the 'Wedge_Unzipped' directory is deleted if it exists. Then we need an extraction of the original zipped data into a new directory 'Wedge_Unzipped'

In [5]:
folderPath = 'Wedge_Unzipped';
    
# Check if folder exists or not.
if os.path.exists(folderPath):
      
    # Delete Folder.
    shutil.rmtree(folderPath)
  
    print("The folder has been deleted successfully!")
else:
    print("Cannot delete the folder as it doesn't exists")

The folder has been deleted successfully!


In [6]:
# Extracts all zips in 'WedgeZipOfZips' to 'Wedge_Unzipped'.
for zipf in zip_files :
    with ZipFile("WedgeZipOfZips/" + zipf,'r') as zf :  
        print(zf.namelist())
        
        print('Extracting all the files now...')
        zf.extractall('Wedge_Unzipped')
        print('Done!')
        #break

['transArchive_201001_201003.csv']
Extracting all the files now...
Done!
['transArchive_201004_201006.csv']
Extracting all the files now...
Done!
['transArchive_201007_201009.csv']
Extracting all the files now...
Done!
['transArchive_201010_201012.csv']
Extracting all the files now...
Done!
['transArchive_201101_201103.csv']
Extracting all the files now...
Done!
['transArchive_201104.csv']
Extracting all the files now...
Done!
['transArchive_201105.csv']
Extracting all the files now...
Done!
['transArchive_201106.csv']
Extracting all the files now...
Done!
['transArchive_201107_201109.csv']
Extracting all the files now...
Done!
['transArchive_201110_201112.csv']
Extracting all the files now...
Done!
['transArchive_201201_201203.csv']
Extracting all the files now...
Done!
['transArchive_201201_201203_inactive.csv']
Extracting all the files now...
Done!
['transArchive_201204_201206.csv']
Extracting all the files now...
Done!
['transArchive_201204_201206_inactive.csv']
Extracting all the 

Now, the files in 'Wedge_Unzipped' that do not have headers need to be moved to a new directory called 'O_Headers' after deleting this directory if it exists. The code then converts the headerless csv files to files with headers and moves them back to 'Wedge_Unzipped'. After, we clean up 'O_Headers' as it is no longer needed.

In [7]:
folderPath = 'O_Headers';
    
# Check if folder exists or not.
if os.path.exists(folderPath):
      
    # Delete Folder.
    shutil.rmtree(folderPath)
  
    print("The folder has been deleted successfully!")
else:
    print("Cannot delete the folder as it doesn't exists")

Cannot delete the folder as it doesn't exists


In [8]:
# This cell converts the headers dictionary built earlier on into a dataframe.
keys = []
values = []

for value in headers.values():
    values.append(value)
for key in headers.keys():
    keys.append(key)

# Column name list. 
col_names =  ['file', 'headers']
  
# Create an empty dataframe.
# Add columns.
headers_df  = pd.DataFrame(columns = col_names)
headers_df.file = keys
headers_df.headers = values

# Show the dataframe.
# headers_df

In [9]:
# Converts boolean to float.
headers_df['headers'] = headers_df['headers'].astype(float)
# headers_df['headers']

In [10]:
# Creates list of files with no headers.
headless = headers_df[(headers_df.headers == 0)]
headless = headless.file
file_names_headless = []

for file_name in headless :
    file_names_headless.append(file_name)

In [11]:
# Creates 'O_Headers' directory.
path = "O_Headers"
# Check whether the specified path exists or not
isExist = os.path.exists(path)
if not isExist:

   # Create a new directory because it does not exist.
   os.makedirs(path)
   print("The new directory is created!")

The new directory is created!


In [12]:
# Moves files with no headers to 'O_Headers'.
source_folder = r"Wedge_Unzipped\\"
destination_folder = r"O_Headers\\"
files_to_move = file_names_headless

for file in files_to_move:
    # Construct full file path.
    source = source_folder + file
    destination = destination_folder + file
    # Move files.
    shutil.move(source, destination)
    print('Moved:', file)

Moved: transArchive_201511.csv
Moved: transArchive_201512.csv
Moved: transArchive_201601.csv
Moved: transArchive_201602.csv
Moved: transArchive_201603.csv
Moved: transArchive_201604.csv
Moved: transArchive_201605.csv
Moved: transArchive_201606.csv
Moved: transArchive_201607.csv
Moved: transArchive_201608.csv
Moved: transArchive_201609.csv
Moved: transArchive_201610.csv
Moved: transArchive_201611.csv
Moved: transArchive_201612.csv
Moved: transArchive_201701.csv


In [13]:
# Reads headless list to df and then back to csv with specified headers to 'Wedge_Unzipped'.
headless_list = os.listdir("O_Headers")

# Read csv file to a DataFrame.
for headless in headless_list :
    big_heads = pd.read_csv('O_headers\\'+headless, header = None)
    
    # Write DataFrame to csv file, but with header.
for headless in headless_list :
    path = 'Wedge_Unzipped\\'
    big_heads.to_csv(
        path + headless,
        header=["datetime","register_no","emp_no","trans_no","upc","description","trans_type","trans_subtype","trans_status","department","quantity","Scale","cost","unitPrice","total","regPrice","altPrice","tax","taxexempt","foodstamp","wicable","discount","memDiscount","discountable","discounttype","voided","percentDiscount","ItemQtty","volDiscType","volume","VolSpecial","mixMatch","matched","memType","staff","numflag","itemstatus","tenderstatus","charflag","varflag","batchHeaderID","local","organic","display","receipt","card_no","store","branch","match_id","trans_id"],
        index=False)

  big_heads = pd.read_csv('O_headers\\'+headless, header = None)
  big_heads = pd.read_csv('O_headers\\'+headless, header = None)
  big_heads = pd.read_csv('O_headers\\'+headless, header = None)
  big_heads = pd.read_csv('O_headers\\'+headless, header = None)
  big_heads = pd.read_csv('O_headers\\'+headless, header = None)
  big_heads = pd.read_csv('O_headers\\'+headless, header = None)
  big_heads = pd.read_csv('O_headers\\'+headless, header = None)
  big_heads = pd.read_csv('O_headers\\'+headless, header = None)


In [14]:
# Deletes now useless 'O_Headers' directory.
folderPath = 'O_Headers';
    
# Check if folder exists or not.
if os.path.exists(folderPath):
      
    # Deletes folder.
    shutil.rmtree(folderPath)
  
    print("The folder has been deleted successfully!")
else:
    print("Cannot delete the folder as it doesn't exists")

The folder has been deleted successfully!


Now we need to upload the data to GBQ after establishing a connection.

In [15]:
# Building the private key.
service_path = "C:\\Users\\rsmcd\\OneDrive\\Desktop\\MSBA Fall 2022\\" # Path to json file.
service_file = 'reese-msba-9558fdd20984.json' # Name of json file.
gbq_proj_id = 'reese-msba' # Name of project.

# Creates single variable that leads to json file.
private_key =service_path + service_file  

In [16]:
# Now we pass in our credentials so that Python has permission to access our project.
credentials = service_account.Credentials.from_service_account_file(service_path + service_file)

In [17]:
# And finally we establish our connection.
client = bigquery.Client(credentials = credentials, project=gbq_proj_id)

In [18]:
# Look at list of data sets in client.
for item in client.list_datasets() : 
    print(item.full_dataset_id)

reese-msba:dram_shop
reese-msba:wedge_transactions


This body of code cleans up the files and uploads to GBQ.

In [19]:
gbq_proj_id = 'reese-msba'
dataset_id = 'wedge_transactions'
unzipped_files = os.listdir("Wedge_Unzipped")

for uz_file in unzipped_files :
    
    big_wedge = pd.read_csv('Wedge_Unzipped\\'+uz_file,sep=delimiters[uz_file], encoding = "utf-8") # Reads in files to big_wedge using file specific delimiters.
    big_wedge = big_wedge.replace(r'\N', np.nan).replace(r' ', np.nan) # Replaces all \N and ' ' with gbq ready null values.
    for idx, column in enumerate(big_wedge) : 
        if big_wedge[column].dtypes == object : # Converts all object columns to strings and replaces 'nan' with gbq ready null values.
            big_wedge = big_wedge.astype({column :'str'}).replace('nan', np.nan)
        if big_wedge[column].dtypes == "int64" : # Converts all integer columns to floats.
            big_wedge = big_wedge.astype({column :'float'})
    big_wedge['datetime'] = pd.to_datetime(big_wedge['datetime']) # Converts datetime columns to timestamp.
    cols = ['wicable','taxexempt','percentDiscount','receipt','match_id','local','organic','itemstatus','tenderstatus'] # List of columns to convert to float.
    for idx, col in enumerate(cols) :
        big_wedge[col] = pd.to_numeric(big_wedge[col]) # Converts cols to float.
    cols2 = ['memType','staff','batchHeaderID','display'] # List of columns to convert to boolean.
    for idx, col in enumerate(cols2) :
        big_wedge[col] = big_wedge[col].map({1:True, 0:False}).astype('boolean') # Maps 1 and 0 to True and False, then converts to boolean while preserving null values.
#     break

# Uploads all to GBQ.
    table_name, _ = uz_file.split(".")
    table_id = ".".join([gbq_proj_id,dataset_id,table_name])
    pandas_gbq.to_gbq(big_wedge, table_id, project_id=gbq_proj_id,if_exists="replace")
#     break    

## Should wicable, taxexempt, & local be boolean?

100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
  big_wedge = pd.read_csv('Wedge_Unzipped\\'+uz_file,sep=delimiters[uz_file], encoding = "utf-8") # Reads in files to big_wedge using file specific delimiters.
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]


100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
  big_wedge = pd.read_csv('Wedge_Unzipped\\'+uz_file,sep=delimiters[uz_file], encoding = "utf-8") # Reads in files to big_wedge using file specific delimiters.
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
  big_wedge = pd.read_csv('Wedge_Unzipped\\'+uz_file,sep=delimiters[uz_file], encoding = "utf-8") # Reads in files to big_wedge using file specific delimiters.
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
100%|███████████████████████████████████████████████████████████████████████████

100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
  big_wedge = pd.read_csv('Wedge_Unzipped\\'+uz_file,sep=delimiters[uz_file], encoding = "utf-8") # Reads in files to big_wedge using file specific delimiters.
100%|████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<?, ?it/s]
