In [9]:
import pandas as pd
import re
import apache_beam as beam

In [7]:
# Load the raw orders CSV into a DataFrame for exploration and preview its first rows.
df = pd.read_csv('food_daily.csv')
df.head()

Unnamed: 0,Customer_id,date,time,order_id,items,amount,mode,restaurnt,Status,ratings,feedback
0,JXJY167254JK,11/10/2023,8.31.21,654S654,PiZza:Marga?ritA:WATERZOOI:Crispy Onion Rings,21,Wallet,Brussels Mussels,Delivered,2,Late delivery
1,JXJY167254JK,11/10/2023,9.31.21,2444454,Noodles:Pizza:BREAD,97,Card,Saint German,Delivered,3,Stale food
2,XVTR474839TP,11/10/2023,4.31.31,397T397,Fried Rice:salaD,46,Card,Brussels Mussels,Delivered,3,Complicated procedure
3,UFDF355524DM,11/10/2023,5.31.21,428K428,noo%dles:,71,Card,Gaspar's,Delivered,1,Food not good
4,FRBT691245BA,11/10/2023,6.31.21,437M437,Soup of the day:,29,Online,Sushi Masters,Delivered,1,Stale food


In [10]:
# Column labels of the loaded frame (the source data spells one of them 'restaurnt').
df.columns

Index(['Customer_id', 'date', 'time', 'order_id', 'items', 'amount', 'mode',
       'restaurnt', 'Status', 'ratings', 'feedback'],
      dtype='object')

In [11]:
# Preview the last rows of the frame.
df.tail()

Unnamed: 0,Customer_id,date,time,order_id,items,amount,mode,restaurnt,Status,ratings,feedback
886,OXJY167254JK,11/10/2023,8.31.21,564J564,salad:,63,Wallet,Demoloftas,Delivered,5,Cheap and best
887,SAMV824387MW,11/10/2023,8.31.21,149F149,Bread:,31,Cash,Saint German,Delivered,4,Worth
888,XVTR474839TP,11/10/2023,8.31.21,539I539,meatballs:,51,Online,Demoloftas,Delivered,1,High price
889,LJBO9511000BL,11/10/2023,8.31.21,215J215,Benedict:,39,Cash,Brussels Mussels,Delivered,1,Delivery boy didnt come at doorstep
890,SAMV824387MW,11/10/2023,8.31.21,363Z363,meatballs:,52,Cash,Telegrafas,Delivered,5,Worth


In [12]:
# (row count, column count) of the loaded frame.
df.shape

(891, 11)

In [13]:
# Summary statistics for the numeric columns.
df.describe()

Unnamed: 0,amount,ratings
count,891.0,891.0
mean,60.91807,2.909091
std,31.96587,1.349838
min,12.0,1.0
25%,39.0,2.0
50%,59.0,3.0
75%,81.0,4.0
max,678.0,5.0


In [14]:
# Per-column dtype and non-null counts.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 11 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   Customer_id  891 non-null    object
 1   date         891 non-null    object
 2   time         891 non-null    object
 3   order_id     891 non-null    object
 4   items        891 non-null    object
 5   amount       891 non-null    int64 
 6   mode         891 non-null    object
 7   restaurnt    891 non-null    object
 8   Status       891 non-null    object
 9   ratings      891 non-null    int64 
 10  feedback     891 non-null    object
dtypes: int64(2), object(9)
memory usage: 76.7+ KB


In [15]:
def remove_last_column(item):
    """Return ``item`` without its final character when that character is ':'.

    Despite the name, this drops a trailing *colon*, not a column. At most one
    colon is removed; a string that does not end with ':' comes back unchanged.
    """
    return item[:-1] if item.endswith(':') else item

def process_row(row):
    """Drop one trailing colon from the field at index 4 of a comma-separated row.

    The row is split on every comma (CSV quoting is not interpreted). Rows with
    four or fewer fields are returned as-is; otherwise the field at index 4
    (the items column in this dataset) is passed through
    ``remove_last_column`` and the fields are joined back with commas.
    """
    fields = row.split(',')
    if len(fields) < 5:
        # Nothing at index 4 to clean; splitting and re-joining would be a no-op.
        return row
    cleaned_items = remove_last_column(fields[4])
    return ','.join(fields[:4] + [cleaned_items] + fields[5:])

def remove_special_characters(row, keep=' :/.'):
    """Strip junk characters from every field of a comma-separated row.

    Each comma-delimited field keeps only ASCII letters, digits and the
    characters listed in ``keep``; everything else (for example the stray
    ``?`` and ``%`` that appear inside item names) is deleted. Commas survive
    because the row is split on them and re-joined afterwards.

    The default ``keep`` retains spaces and the ``:``, ``/`` and ``.``
    separators. Deleting those made distinct values collide -- the dates
    ``1/11/2023`` and ``11/1/2023`` both collapsed to ``1112023`` -- and fused
    multi-item orders and multi-word text into a single token.

    Args:
        row: One line of comma-separated text.
        keep: Extra characters to retain besides ``a-z``, ``A-Z`` and ``0-9``.
            Pass ``''`` to strip every non-alphanumeric character.

    Returns:
        The row with the same number of comma-separated fields, each cleaned.
    """
    # re.escape keeps characters such as ']' or '-' literal inside the class.
    junk = '[^a-zA-Z0-9' + re.escape(keep) + ']'
    return ','.join(re.sub(junk, '', field) for field in row.split(','))

def print_row(row):
    """Print ``row`` and implicitly return ``None``.

    Debugging aid; it is not referenced by the cells visible in this notebook.
    """
    print(row)


In [16]:
# Distinct values of the Status column; the Beam pipeline below separates
# rows whose status is 'delivered' from rows with any other status.
df['Status'].unique()

array(['Delivered', 'Not delivered', 'On Hold', 'Cancelled'], dtype=object)

In [18]:
# Source CSV and the directory prefix under which both result sets are written.
input_file = 'food_daily.csv'
output_path = 'outputs/processed'


def _order_status(row):
    """Return the lower-cased field at index 8 (the Status column) of a row."""
    return row.split(',')[8].lower()


with beam.Pipeline() as p:
    # Shared cleaning stage: read every line after the header, tidy the items
    # field, lower-case the whole line, then strip unwanted characters.
    cleaned_data = (
        p
        | 'Read Input file' >> beam.io.ReadFromText(input_file, skip_header_lines=1)
        | 'Process Items Column' >> beam.Map(process_row)
        | 'Convert to lowercase' >> beam.Map(lambda line: line.lower())
        | 'Remove Special Characters' >> beam.Map(remove_special_characters)
    )

    # Empty rows are dropped from both branches; every other row lands in
    # exactly one of them depending on its status field.
    delivered_orders = (
        cleaned_data
        | 'Filter delivered data' >> beam.Filter(
            lambda line: bool(line) and _order_status(line) == 'delivered')
        | 'WriteDelivered File' >> beam.io.WriteToText(output_path + '/delivered')
    )
    undelivered_orders = (
        cleaned_data
        | 'Filter undelivered data' >> beam.Filter(
            lambda line: bool(line) and _order_status(line) != 'delivered')
        | 'WriteUndelivered File' >> beam.io.WriteToText(output_path + '/undelivered')
    )


