In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, when, to_date, lower, regexp_extract, to_timestamp, udf
from pyspark.sql.types import StringType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Data Cleaning for Voltmart") \
    .getOrCreate()

# Load the data from CSV into a Spark DataFrame
df = spark.read.csv('/content/data.csv', header=True, inferSchema=True)

print("Initial Data:")
df.show(5)

# DATA Cleaning operations
# 1. Remove orders placed between 12am and 5am and convert timestamp to date
df = df.withColumn("order_date", to_timestamp(col("order_date")))

# Apply the hour filter to exclude times between 0 and 5 AM
df = df.filter(~hour(col("order_date")).between(0, 5))

# 2. Adding new column time_of_day
def get_time_of_day(hour):
    if 5 <= hour < 12:
        return 'morning'
    elif 12 <= hour < 18:
        return 'afternoon'
    elif 18 <= hour < 24:
        return 'evening'
    else:
        return 'night'

time_of_day_udf = udf(get_time_of_day, StringType())
df = df.withColumn('time_of_day', time_of_day_udf(hour(col('order_date'))))

# 3. Remove rows containing "TV" in the product column
df = df.filter(~df.product.contains("TV"))

# 4. Ensure all product categories are in lowercase
df = df.withColumn("category", lower(col("category")))

# 5. Adding new column purchase_state
def extract_state_and_zip(address):
    parts = address.split(',')
    if len(parts) >= 3:
        return parts[-1].strip()
    else:
        return None

extract_state_and_zip_udf = udf(extract_state_and_zip, StringType())
df = df.withColumn('purchase_state', extract_state_and_zip_udf(col('purchase_address')))

# Step 6: Save the cleaned DataFrame as a Parquet file
df.write.parquet('/content/drive/FYP/cleaneddata1.parquet')

# Stop the Spark session
spark.stop()



Initial Data:
+-------------------+--------+--------------------+----------+------------+----------------+----------------+------------+----------+--------+-------+
|         order_date|order_id|             product|product_id|    category|purchase_address|quantity_ordered|  price_each|cost_price|turnover| margin|
+-------------------+--------+--------------------+----------+------------+----------------+----------------+------------+----------+--------+-------+
|2023-01-22 21:25:00|  141234|              iPhone|5.63801E12|   Vêtements|"""944 Walnut St|          Boston| MA 02215"""|         1|   700.0|  231.0|
|2023-01-28 14:15:00|  141235|Lightning Chargin...|5.56332E12|Alimentation| """185 Maple St|        Portland| OR 97035"""|         1|   14.95|  7.475|
|2023-01-17 13:33:00|  141236|    Wired Headphones|2.11397E12|   Vêtements| """538 Adams St|   San Francisco| CA 94016"""|         2|   11.99|  5.995|
|2023-01-05 20:33:00|  141237|    27in FHD Monitor|3.06916E12|      Sports|  """

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=e6c893d65ac9ddac2dfef6d4c23d706720e77796e282713e025ac3cb324b361a
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2
