# Python Pipeline...


In [45]:
# Imports
import pandas as pd                                                                                 # Loads Pandas package
import os                                                                                           # For OS related paths 
import json                                                                                         # Handles jSON

#### 1. ABSOLUTE PATH FOR RAW DATA  (THIS IS NEEDED FOR LOCAL FILES ONLY)

Takes: `raw_data_file`, the name of one of the CSV files in the `raw_data` folder <br />
Returns: the absolute path to that file (the absolute path of the `raw_data` folder joined with `raw_data_file`); the test cell stores it in a variable named `csv`

In [46]:
# 1. ABSOLUTE PATH FOR RAW DATA   

def absolute_path_for_raw_data(raw_data_file):
    """Build the absolute path of a file inside the ``../raw_data`` folder.

    Args:
        raw_data_file: Name of the file (for example
            ``"leeds_01-01-2020_09-00-00.csv"``) located in ``../raw_data``,
            resolved relative to the current working directory.

    Returns:
        str: Absolute path of the ``raw_data`` folder joined with
        ``raw_data_file``. The file is not checked for existence.
    """
    # os.path.join inserts the separator of the running OS; the previous
    # hard-coded "\\" only produced a usable path on Windows.
    return os.path.join(os.path.abspath("../raw_data"), raw_data_file)

In [47]:
# 1. [TEST RUN] ABSOLUTE PATH FOR RAW DATA

# Pick the raw file to process: Chesterfield or Leeds (leave exactly one line uncommented)
# raw_data_file = "chesterfield_25-08-2021_09-00-00.csv"
raw_data_file = "leeds_01-01-2020_09-00-00.csv"

# `csv` holds the absolute path as a string; the sanitise test cell further down reads it.
csv = absolute_path_for_raw_data(raw_data_file)
print(csv) # Prints the newly formed absolute path with the raw_data_file name given.


c:\Users\jzzz\Python\final-project\brewed-awakening-final-project\raw_data\leeds_01-01-2020_09-00-00.csv


### 2. LOAD AND SANITISE CSV (Remove given Columns) 

Takes: `csv` (absolute path to the file) and `sanitise_these_columns`, a list of the column names to drop <br />
Returns: the sanitised DataFrame (the test cell stores it as `df`)


In [48]:
# 2. SANITISE CSV (Remove given Columns) 

def sanitise_csv(csv,sanitise_these_columns):
    """Load a header-less orders CSV and drop the given columns.

    Args:
        csv: Path to the CSV file. The file is read without a header row and
            the seven order column names are applied to it.
        sanitise_these_columns: List of column names to remove (for example
            ``['full_name', 'card_number']``).

    Returns:
        pandas.DataFrame: The loaded data without the dropped columns.

    Raises:
        FileNotFoundError: If ``csv`` does not exist. The message is printed
            and the error re-raised; previously the function fell through to
            the ``return`` and failed with an unrelated UnboundLocalError.
        KeyError: If a name in ``sanitise_these_columns`` is not a column.
    """
    columns = ['date_time', 'location', 'full_name', 'order', 'transaction_total', 'payment_type', 'card_number']  # Headers for the orders csv files

    try:
        df = pd.read_csv(csv, header=None, names=columns)
    except FileNotFoundError as fnfe:
        print(f'File not found: {fnfe}')
        raise

    return df.drop(columns=sanitise_these_columns)

In [49]:
# 2. [TEST RUN] SANITISE CSV 

# Headers for the orders csv files. sanitise_csv applies its own copy of this
# list; this one is kept as notebook-level reference.
columns = ['date_time', 'location', 'full_name', 'order', 'transaction_total', 'payment_type', 'card_number']

# Store the returned path. A bare call discards it and the cell would then
# depend on whatever `csv` an earlier cell happened to leave behind.
csv = absolute_path_for_raw_data(raw_data_file)

# Personally identifiable columns that must not reach the database
sanitise_these_columns = ['full_name', 'card_number']

df = sanitise_csv(csv,sanitise_these_columns)
df

Unnamed: 0,date_time,location,order,transaction_total,payment_type
0,01/01/2020 09:00,Leeds,"Regular Chai latte - 2.30, Regular Speciality ...",3.60,CASH
1,01/01/2020 09:01,Leeds,"Large Chai latte - 2.60, Regular Filter coffee...",4.10,CARD
2,01/01/2020 09:03,Leeds,Large Speciality Tea - English breakfast - 1.6...,2.90,CARD
3,01/01/2020 09:04,Leeds,"Large Chai latte - 2.60, Large Iced americano ...",7.60,CARD
4,01/01/2020 09:06,Leeds,Regular Hot Chocolate - 1.40,1.40,CARD
...,...,...,...,...,...
377,01/01/2020 16:51,Leeds,"Large Filter coffee - 1.80, Regular Iced ameri...",5.55,CARD
378,01/01/2020 16:53,Leeds,"Regular Chai latte - 2.30, Regular Iced americ...",4.45,CARD
379,01/01/2020 16:55,Leeds,Large Speciality Tea - English breakfast - 1.6...,6.65,CARD
380,01/01/2020 16:56,Leeds,"Regular Filter coffee - 1.50, Large Filter cof...",3.30,CARD


### SORTING THE DATE FOR POSTGRE'S [YYYY-MM-DD H:MM:SS] FORMAT
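Converts the `date_time` column from day-first text (`DD/MM/YYYY HH:MM`) to pandas datetimes, which display as `YYYY-MM-DD HH:MM:SS`.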

In [50]:

# SORTING THE DATE FOR POSTGRE'S YYYY-MM-DD H:MM:SS FORMAT

def sort_time_to_postgre_format(data_frame=None):
    """Convert the ``date_time`` column to pandas datetimes, parsed day-first.

    Args:
        data_frame: DataFrame with a ``date_time`` column of day-first date
            strings (e.g. ``"25/08/2021 09:00"``). When omitted, the
            notebook-level ``df`` is used, which keeps the existing
            zero-argument calls working.

    Returns:
        pandas.DataFrame: The same frame object; its ``date_time`` column is
        replaced in place.
    """
    if data_frame is None:
        # Backward-compatible fallback: operate on the global notebook frame.
        data_frame = df

    data_frame['date_time'] = pd.to_datetime(data_frame['date_time'], dayfirst=True)

    return data_frame



In [51]:
# [TEST RUN] SORTING THE DATE FOR POSTGRE'S YYYY-MM-DD H:MM:SS FORMAT

# Current Files are Chesterfield or Leeds (comment one out)
# raw_data_file = "chesterfield_25-08-2021_09-00-00.csv"
raw_data_file = "leeds_01-01-2020_09-00-00.csv"

# Keep the returned path instead of discarding it, so `csv` always matches raw_data_file.
csv = absolute_path_for_raw_data(raw_data_file)
columns = ['date_time', 'location', 'full_name', 'order', 'transaction_total', 'payment_type', 'card_number']  # Headers for the DF

# Converts df['date_time'] in place; `df` is the sanitised frame built by the sanitise test cell.
sort_time_to_postgre_format()
df

Unnamed: 0,date_time,location,order,transaction_total,payment_type
0,2020-01-01 09:00:00,Leeds,"Regular Chai latte - 2.30, Regular Speciality ...",3.60,CASH
1,2020-01-01 09:01:00,Leeds,"Large Chai latte - 2.60, Regular Filter coffee...",4.10,CARD
2,2020-01-01 09:03:00,Leeds,Large Speciality Tea - English breakfast - 1.6...,2.90,CARD
3,2020-01-01 09:04:00,Leeds,"Large Chai latte - 2.60, Large Iced americano ...",7.60,CARD
4,2020-01-01 09:06:00,Leeds,Regular Hot Chocolate - 1.40,1.40,CARD
...,...,...,...,...,...
377,2020-01-01 16:51:00,Leeds,"Large Filter coffee - 1.80, Regular Iced ameri...",5.55,CARD
378,2020-01-01 16:53:00,Leeds,"Regular Chai latte - 2.30, Regular Iced americ...",4.45,CARD
379,2020-01-01 16:55:00,Leeds,Large Speciality Tea - English breakfast - 1.6...,6.65,CARD
380,2020-01-01 16:56:00,Leeds,"Regular Filter coffee - 1.50, Large Filter cof...",3.30,CARD


### NORMALISE LOCATION
Takes the data frame (`df`) and a dictionary named `location_dict` <br />
Returns: `df` with each location replaced by its number, and `location_dict` updated with any new locations

In [52]:
# Normalise Location

# Replaces every location name in df['location'] with its integer id.
# A location that is not in the dictionary yet is added with the next available id.

# Known locations (to be loaded from a JSON file later on). An empty dict works too:
# the first location found then gets id 1.
location_dict = {'Chesterfield':1}

for location in df['location'].unique():
    if location not in location_dict:
        # Register the location being iterated. Reading it from the first row
        # of df breaks as soon as a file holds more than one unseen location.
        # default=0 keeps max() from raising on an empty dictionary.
        location_dict[location] = max(location_dict.values(), default=0) + 1

# Translate the column once, after the dictionary is complete.
df['location'] = df['location'].map(location_dict)


print(location_dict)

df

{'Chesterfield': 1, 'Leeds': 2}


Unnamed: 0,date_time,location,order,transaction_total,payment_type
0,2020-01-01 09:00:00,2,"Regular Chai latte - 2.30, Regular Speciality ...",3.60,CASH
1,2020-01-01 09:01:00,2,"Large Chai latte - 2.60, Regular Filter coffee...",4.10,CARD
2,2020-01-01 09:03:00,2,Large Speciality Tea - English breakfast - 1.6...,2.90,CARD
3,2020-01-01 09:04:00,2,"Large Chai latte - 2.60, Large Iced americano ...",7.60,CARD
4,2020-01-01 09:06:00,2,Regular Hot Chocolate - 1.40,1.40,CARD
...,...,...,...,...,...
377,2020-01-01 16:51:00,2,"Large Filter coffee - 1.80, Regular Iced ameri...",5.55,CARD
378,2020-01-01 16:53:00,2,"Regular Chai latte - 2.30, Regular Iced americ...",4.45,CARD
379,2020-01-01 16:55:00,2,Large Speciality Tea - English breakfast - 1.6...,6.65,CARD
380,2020-01-01 16:56:00,2,"Regular Filter coffee - 1.50, Large Filter cof...",3.30,CARD


### NORMALISE PAYMENT TYPE
Takes the data frame (`df`) and a dictionary named `payment_type_dict` <br />
Returns: `df` with each payment type replaced by its number, and `payment_type_dict` updated with any new payment types

In [53]:
# Normalise Payment_type

# Replaces every payment type in df['payment_type'] with its integer id.
# A payment type that is not in the dictionary yet is added with the next available id.

# Known payment types (to be loaded from a JSON file later on). An empty dict works too:
# the first payment type found then gets id 1.
payment_type_dict = {'CASH':1,'CARD':2}

for pays_with in df['payment_type'].unique():
    if pays_with not in payment_type_dict:
        # Register the payment type being iterated. The earlier version read
        # df.iloc[0].iat[1], which is the first row's *location* value.
        # default=0 keeps max() from raising on an empty dictionary.
        payment_type_dict[pays_with] = max(payment_type_dict.values(), default=0) + 1

# Translate the column once, after the dictionary is complete.
df['payment_type'] = df['payment_type'].map(payment_type_dict)


print(payment_type_dict)

df

{'CASH': 1, 'CARD': 2}


Unnamed: 0,date_time,location,order,transaction_total,payment_type
0,2020-01-01 09:00:00,2,"Regular Chai latte - 2.30, Regular Speciality ...",3.60,1
1,2020-01-01 09:01:00,2,"Large Chai latte - 2.60, Regular Filter coffee...",4.10,2
2,2020-01-01 09:03:00,2,Large Speciality Tea - English breakfast - 1.6...,2.90,2
3,2020-01-01 09:04:00,2,"Large Chai latte - 2.60, Large Iced americano ...",7.60,2
4,2020-01-01 09:06:00,2,Regular Hot Chocolate - 1.40,1.40,2
...,...,...,...,...,...
377,2020-01-01 16:51:00,2,"Large Filter coffee - 1.80, Regular Iced ameri...",5.55,2
378,2020-01-01 16:53:00,2,"Regular Chai latte - 2.30, Regular Iced americ...",4.45,2
379,2020-01-01 16:55:00,2,Large Speciality Tea - English breakfast - 1.6...,6.65,2
380,2020-01-01 16:56:00,2,"Regular Filter coffee - 1.50, Large Filter cof...",3.30,2


### PRODUCTS PER ORDER NORMALISATION STEP 1 OF 2
Builds a JSON string (`order_index_json`) holding the ordered products per order, and replaces the `order` column with an `order_id` column (the 1-based position of the row) <br />
PLEASE NOTE THAT ONCE NORMALISED, THE [ORDER] COLUMN BECOMES [ORDER_ID]


In [54]:
#! PLEASE NOTE THAT ONCE NORMALISED, THE [ORDER] COLUMN, BECOMES [ORDER_ID]

# One {product: price} dictionary per row of df, in row order
order_list = []

for order in df['order']:

    order_dict = {}

    # Items are separated by ', ' and each item looks like
    # "<product> - <price>" or "<product> - <flavour> - <price>".
    for item in order.split(', '):
        item_split = item.split(' - ')

        # The first segment is the product name and the last one the price.
        # NOTE: a middle flavour segment is discarded, and a product repeated
        # within one order collapses into a single entry (dict keys are unique).
        product = item_split[0].strip()
        price = item_split[-1]

        order_dict[product] = price

    order_list.append(order_dict)

# order_id is the 1-based row position. A single vectorised assignment gives an
# integer column directly and does not rely on the index labels being 0..n-1
# (df.at[i, ...] with a missing label would append new rows).
df['order_id'] = range(1, len(df) + 1)

# drop the order column, it now lives in the JSON below
df = df.drop(columns=['order'])

# plain list of orders as a JSON string
json_string = json.dumps(order_list)

# orders keyed by their order_id (same numbering as df['order_id'])
order_index_dict = dict(enumerate(order_list, start=1))

# json.dumps turns the integer order ids into string keys
order_index_json = json.dumps(order_index_dict)

# print json string
print(order_index_json)

# print updated DataFrame
print(df)

{"1": {"Regular Chai latte": "2.30", "Regular Speciality Tea": "1.30"}, "2": {"Large Chai latte": "2.60", "Regular Filter coffee": "1.50"}, "3": {"Large Speciality Tea": "1.60", "Regular Speciality Tea": "1.30"}, "4": {"Large Chai latte": "2.60", "Large Iced americano": "2.50"}, "5": {"Regular Hot Chocolate": "1.40"}, "6": {"Regular Chai latte": "2.30", "Regular Filter coffee": "1.50"}, "7": {"Regular Iced americano": "2.15", "Regular Hot Chocolate": "1.40", "Large Chai latte": "2.60", "Regular Chai latte": "2.30"}, "8": {"Regular Filter coffee": "1.50", "Regular Hot Chocolate": "1.40", "Regular Chai latte": "2.30", "Large Hot Chocolate": "1.70"}, "9": {"Large Iced americano": "2.50", "Regular Chai latte": "2.30"}, "10": {"Regular Hot Chocolate": "1.40", "Regular Speciality Tea": "1.30", "Regular Chai latte": "2.30"}, "11": {"Large Speciality Tea": "1.60", "Regular Filter coffee": "1.50", "Large Chai latte": "2.60", "Regular Iced americano": "2.15"}, "12": {"Regular Chai latte": "2.30"

### PRODUCTS PER ORDER NORMALISATION STEP 2 OF 2
Normalises the JSON file "ordered products per order" to replace Product Name for Id on each row, <br />
updates to add product to the products Dictionary if the product is not found 

In [55]:
# Next, the product names inside the order JSON created above are replaced by product ids.
# products_dict is the seed mapping of product name -> product id
# (this will be loaded from / updated in a file later on).

products_dict = {'Regular Flavoured iced latte':1}

def update_product_ids(products_dict, order_index_json):
    """Replace product names by product ids in the orders JSON.

    Args:
        products_dict: Mapping of product name -> integer id. Products found
            in the orders but missing here are added with the next id
            (highest existing id + 1, or 1 when the mapping is empty).
            The dictionary is updated in place.
        order_index_json: JSON string shaped like
            ``{"<order_id>": {"<product name>": "<price>", ...}, ...}``.

    Returns:
        tuple: ``(json_string, products_dict)`` where ``json_string`` has the
        same shape as the input with product names replaced by ids (as string
        keys, since JSON keys are strings), and ``products_dict`` is the same,
        now updated, dictionary object.
    """
    orders = json.loads(order_index_json)

    # A running counter replaces calling max() for every new product;
    # default=0 lets an empty products_dict start numbering at 1.
    next_id = max(products_dict.values(), default=0) + 1

    normalised_orders = {}
    for order_id, order_items in orders.items():
        normalised_items = {}
        for item_name, item_price in order_items.items():
            if item_name not in products_dict:
                products_dict[item_name] = next_id
                next_id += 1
            normalised_items[products_dict[item_name]] = item_price
        normalised_orders[order_id] = normalised_items

    return json.dumps(normalised_orders), products_dict


# Normalise the order JSON; products_dict is rebound to the returned (updated) mapping.
new_order_index_json, products_dict = update_product_ids(products_dict, order_index_json)
print(new_order_index_json)
print(products_dict)


{"1": {"2": "2.30", "3": "1.30"}, "2": {"4": "2.60", "5": "1.50"}, "3": {"6": "1.60", "3": "1.30"}, "4": {"4": "2.60", "7": "2.50"}, "5": {"8": "1.40"}, "6": {"2": "2.30", "5": "1.50"}, "7": {"9": "2.15", "8": "1.40", "4": "2.60", "2": "2.30"}, "8": {"5": "1.50", "8": "1.40", "2": "2.30", "10": "1.70"}, "9": {"7": "2.50", "2": "2.30"}, "10": {"8": "1.40", "3": "1.30", "2": "2.30"}, "11": {"6": "1.60", "5": "1.50", "4": "2.60", "9": "2.15"}, "12": {"2": "2.30"}, "13": {"8": "1.40", "11": "1.80", "5": "1.50", "9": "2.15"}, "14": {"6": "1.60", "7": "2.50"}, "15": {"11": "1.80", "5": "1.50", "7": "2.50"}, "16": {"7": "2.50", "8": "1.40", "4": "2.60", "11": "1.80", "10": "1.70"}, "17": {"11": "1.80", "10": "1.70", "2": "2.30"}, "18": {"11": "1.80", "10": "1.70"}, "19": {"7": "2.50", "9": "2.15"}, "20": {"11": "1.80", "4": "2.60"}, "21": {"7": "2.50"}, "22": {"4": "2.60", "11": "1.80"}, "23": {"5": "1.50", "6": "1.60"}, "24": {"5": "1.50", "6": "1.60"}, "25": {"5": "1.50", "4": "2.60", "2": 

## And with that... the normalisation is completed

Next steps

Edit the Functions so they read the JSON's from the clean bucket <br />
Save each JSON with their updated values to files in the clean bucket after each run <br />
Add each JSON to their respective tables (avoiding duplicates in all tables but the products_orders) <br />


In [56]:

df  # Normalised DataFrame

Unnamed: 0,date_time,location,transaction_total,payment_type,order_id
0,2020-01-01 09:00:00,2,3.60,1,1
1,2020-01-01 09:01:00,2,4.10,2,2
2,2020-01-01 09:03:00,2,2.90,2,3
3,2020-01-01 09:04:00,2,7.60,2,4
4,2020-01-01 09:06:00,2,1.40,2,5
...,...,...,...,...,...
377,2020-01-01 16:51:00,2,5.55,2,378
378,2020-01-01 16:53:00,2,4.45,2,379
379,2020-01-01 16:55:00,2,6.65,2,380
380,2020-01-01 16:56:00,2,3.30,2,381


In [57]:
# Lookup dictionaries and normalised order JSON produced by the cells above

print(payment_type_dict) # Payment type -> id (Python dict, not yet serialised to JSON)
print(location_dict)     # Location -> id (Python dict, not yet serialised to JSON)

# print(order_index_json) <-- This is the JSON BEFORE product normalisation, not needed.
print(new_order_index_json) # Products per order, as a JSON string keyed by order_id

print(products_dict) # Product name -> id (Python dict, not yet serialised to JSON)

{'CASH': 1, 'CARD': 2}
{'Chesterfield': 1, 'Leeds': 2}
{"1": {"2": "2.30", "3": "1.30"}, "2": {"4": "2.60", "5": "1.50"}, "3": {"6": "1.60", "3": "1.30"}, "4": {"4": "2.60", "7": "2.50"}, "5": {"8": "1.40"}, "6": {"2": "2.30", "5": "1.50"}, "7": {"9": "2.15", "8": "1.40", "4": "2.60", "2": "2.30"}, "8": {"5": "1.50", "8": "1.40", "2": "2.30", "10": "1.70"}, "9": {"7": "2.50", "2": "2.30"}, "10": {"8": "1.40", "3": "1.30", "2": "2.30"}, "11": {"6": "1.60", "5": "1.50", "4": "2.60", "9": "2.15"}, "12": {"2": "2.30"}, "13": {"8": "1.40", "11": "1.80", "5": "1.50", "9": "2.15"}, "14": {"6": "1.60", "7": "2.50"}, "15": {"11": "1.80", "5": "1.50", "7": "2.50"}, "16": {"7": "2.50", "8": "1.40", "4": "2.60", "11": "1.80", "10": "1.70"}, "17": {"11": "1.80", "10": "1.70", "2": "2.30"}, "18": {"11": "1.80", "10": "1.70"}, "19": {"7": "2.50", "9": "2.15"}, "20": {"11": "1.80", "4": "2.60"}, "21": {"7": "2.50"}, "22": {"4": "2.60", "11": "1.80"}, "23": {"5": "1.50", "6": "1.60"}, "24": {"5": "1.50