In [None]:
pip install pyspark



In [69]:
from pyspark.sql import SparkSession

In [70]:
from pyspark.sql.functions import col, date_format, lower, when, substring, split, hour

In [71]:
spark = SparkSession.builder.getOrCreate()

In [72]:
spark

In [73]:
df = spark.read.format('csv') \
    .option('header', 'true') \
    .option('sep', ',') \
    .option('escape', '"') \
    .option('quote', '"') \
    .load('/content/sample_data/data.csv')  # sep, escape and quote to consider data with commas in it as a whole and not split data at comma

df.show(truncate=False, n=200)


+-----------------------+--------+--------------------------+-------------+------------+-------------------------------------------+----------------+----------+----------+--------+--------+
|order_date             |order_id|product                   |product_id   |category    |purchase_address                           |quantity_ordered|price_each|cost_price|turnover|margin  |
+-----------------------+--------+--------------------------+-------------+------------+-------------------------------------------+----------------+----------+----------+--------+--------+
|2023-01-22T21:25:00.000|141234  |iPhone                    |5638008983335|Vêtements   |"944 Walnut St, Boston, MA 02215"          |1               |700       |231       |700     |469     |
|2023-01-28T14:15:00.000|141235  |Lightning Charging Cable  |5563319511488|Alimentation|"185 Maple St, Portland, OR 97035"         |1               |14.95     |7.475     |14.95   |7.475   |
|2023-01-17T13:33:00.000|141236  |Wired Headphones

In [74]:
df = df.filter(~(date_format(col("order_date"), "H").cast("int").between(0, 5)))

In [75]:
df.show(truncate=False)

+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+
|order_date             |order_id|product                   |product_id   |category    |purchase_address                         |quantity_ordered|price_each|cost_price|turnover|margin  |
+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+
|2023-01-22T21:25:00.000|141234  |iPhone                    |5638008983335|Vêtements   |"944 Walnut St, Boston, MA 02215"        |1               |700       |231       |700     |469     |
|2023-01-28T14:15:00.000|141235  |Lightning Charging Cable  |5563319511488|Alimentation|"185 Maple St, Portland, OR 97035"       |1               |14.95     |7.475     |14.95   |7.475   |
|2023-01-17T13:33:00.000|141236  |Wired Headphones          

In [76]:
df = df.withColumn(
    "time_of_day",
    when((hour(col("order_date")) >= 5) & (hour(col("order_date")) < 12), "morning")
    .when((hour(col("order_date")) >= 12) & (hour(col("order_date")) < 18), "afternoon")
    .when((hour(col("order_date")) >= 18) & (hour(col("order_date")) < 24), "evening")
)

# Reorder columns to place 'time_of_day' at index 1
columns = df.columns
desired_index = 1
# Remove 'time_of_day' from columns list to avoid duplication
columns.remove('time_of_day')
# Insert 'time_of_day' at the desired index
new_columns = columns[:desired_index] + ['time_of_day'] + columns[desired_index:]

# Reorder the DataFrame columns
orders_data = df.select(new_columns)

# Show the updated DataFrame
df.show(truncate=False)

+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+-----------+
|order_date             |order_id|product                   |product_id   |category    |purchase_address                         |quantity_ordered|price_each|cost_price|turnover|margin  |time_of_day|
+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+-----------+
|2023-01-22T21:25:00.000|141234  |iPhone                    |5638008983335|Vêtements   |"944 Walnut St, Boston, MA 02215"        |1               |700       |231       |700     |469     |evening    |
|2023-01-28T14:15:00.000|141235  |Lightning Charging Cable  |5563319511488|Alimentation|"185 Maple St, Portland, OR 97035"       |1               |14.95     |7.475     |14.95   |7.475   |afternoon  |


In [77]:
df = df.filter(~lower(col("product")).contains("tv"))
df.show(truncate=False)

+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+-----------+
|order_date             |order_id|product                   |product_id   |category    |purchase_address                         |quantity_ordered|price_each|cost_price|turnover|margin  |time_of_day|
+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+-----------+
|2023-01-22T21:25:00.000|141234  |iPhone                    |5638008983335|Vêtements   |"944 Walnut St, Boston, MA 02215"        |1               |700       |231       |700     |469     |evening    |
|2023-01-28T14:15:00.000|141235  |Lightning Charging Cable  |5563319511488|Alimentation|"185 Maple St, Portland, OR 97035"       |1               |14.95     |7.475     |14.95   |7.475   |afternoon  |


In [78]:
df = df.withColumn("product", lower(col("product")))
df = df.withColumn("category", lower(col("category")))
df = df.withColumn("purchase_state", split(col("purchase_address"), ",")[2])
df = df.withColumn("purchase_state", split(col("purchase_state"), " ")[1])
df.show(truncate=False)

+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+-----------+--------------+
|order_date             |order_id|product                   |product_id   |category    |purchase_address                         |quantity_ordered|price_each|cost_price|turnover|margin  |time_of_day|purchase_state|
+-----------------------+--------+--------------------------+-------------+------------+-----------------------------------------+----------------+----------+----------+--------+--------+-----------+--------------+
|2023-01-22T21:25:00.000|141234  |iphone                    |5638008983335|vêtements   |"944 Walnut St, Boston, MA 02215"        |1               |700       |231       |700     |469     |evening    |MA            |
|2023-01-28T14:15:00.000|141235  |lightning charging cable  |5563319511488|alimentation|"185 Maple St, Portland, OR 97035"       |1         

In [79]:
output_path = "path_to_save_cleaned_data/cleaned_data.csv"
df.write.csv(output_path, header=True)