Sales Analysis with PySpark

In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, substring, isnan, when, count

# Start (or reuse, if one is already running) the Spark session named
# "SalesAnalysis"; every DataFrame in this notebook is created through it.
spark = (
    SparkSession.builder
    .appName("SalesAnalysis")
    .getOrCreate()
)


In [2]:

import os

# Directory that holds the per-month sales CSV files
data_dir = './Sales_Data/'

# Collect the data files to load.
# - os.listdir returns *every* entry (e.g. .DS_Store, .ipynb_checkpoints),
#   so keep only names ending in .csv.
# - os.listdir returns names in arbitrary order, so sort them to make the
#   load order (and therefore the row order of the union) reproducible.
files = sorted(
    name for name in os.listdir(data_dir)
    if name.lower().endswith('.csv')
)

# Fail with a clear message instead of an IndexError on files[0] below
if not files:
    raise FileNotFoundError(f"No CSV files found in {data_dir}")

# Read the first file to initialize the aggregated DataFrame
all_months_df = spark.read.csv(os.path.join(data_dir, files[0]), header=True, inferSchema=True)

# Read each remaining file and append its rows to the aggregated DataFrame.
# union() matches columns by position, so every file is expected to share
# the same column layout.
for file in files[1:]:
    df = spark.read.csv(os.path.join(data_dir, file), header=True, inferSchema=True)
    all_months_df = all_months_df.union(df)

# Display the first rows of the aggregated data
all_months_df.show()


+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|    NULL|                NULL|            NULL|      NULL|          NULL|                NULL|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|
|  176562|USB-C Charging Cable|               1|     11.95|04/29/19 13:03|381 Wilson St, Sa...|
|  176563|Bose SoundSport H...|         

In [3]:

# Build the cleaned dataset in two steps:
#   1. drop every row that has a missing (null) value in any column;
#   2. drop rows whose 'Order Date' starts with 'Or' (likely header lines
#      repeated inside the data files).
all_data = (
    all_months_df
    .na.drop(how="any")
    .where(~col("Order Date").startswith("Or"))
)

# Preview the cleaned rows
all_data.show()


+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|
|  176562|USB-C Charging Cable|               1|     11.95|04/29/19 13:03|381 Wilson St, Sa...|
|  176563|Bose SoundSport H...|               1|     99.99|04/02/19 07:46|668 Center St, Se...|
|  176564|USB-C Charging Cable|         

In [4]:

# Add the derived columns in a single chained expression. Order matters:
# 'Sales' is computed last so that it multiplies the already-cast
# 'Quantity Ordered' and 'Price Each' columns.
all_data = (
    all_data
    # 'Month': first two characters of 'Order Date', as an integer
    .withColumn("Month", substring(col("Order Date"), 1, 2).cast("int"))
    # 'Quantity Ordered' as an integer
    .withColumn("Quantity Ordered", col("Quantity Ordered").cast("int"))
    # 'Price Each' as a double
    .withColumn("Price Each", col("Price Each").cast("double"))
    # 'Sales' = quantity * unit price
    .withColumn("Sales", col("Quantity Ordered") * col("Price Each"))
)

# Preview the augmented rows
all_data.show()


+--------+--------------------+----------------+----------+--------------+--------------------+-----+------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|Month| Sales|
+--------+--------------------+----------------+----------+--------------+--------------------+-----+------+
|  176558|USB-C Charging Cable|               2|     11.95|04/19/19 08:46|917 1st St, Dalla...|    4|  23.9|
|  176559|Bose SoundSport H...|               1|     99.99|04/07/19 22:30|682 Chestnut St, ...|    4| 99.99|
|  176560|        Google Phone|               1|     600.0|04/12/19 14:38|669 Spruce St, Lo...|    4| 600.0|
|  176560|    Wired Headphones|               1|     11.99|04/12/19 14:38|669 Spruce St, Lo...|    4| 11.99|
|  176561|    Wired Headphones|               1|     11.99|04/30/19 09:27|333 8th St, Los A...|    4| 11.99|
|  176562|USB-C Charging Cable|               1|     11.95|04/29/19 13:03|381 Wilson St, Sa...|    4| 11.95|
|  176563|Bose Soun

In [None]:

# Export the processed data as CSV with a header row, replacing any previous
# export at the same path. Note: Spark writes a *directory* named
# 'all_data_pyspark.csv' containing part files, not a single CSV file.
(
    all_data.write
    .mode('overwrite')
    .option('header', True)
    .csv('all_data_pyspark.csv')
)
