In [1]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("AutoSalesAnalysis") \
    .getOrCreate()


In [6]:
# Load the CSV file
data = spark.read.csv(r"C:\BigData\Auto Sales data.csv", header=True, inferSchema=True)

# Show the first few rows to verify the data is loaded correctly
data.show(5)


+-----------+---------------+---------+---------------+-------+----------+--------------------+-------+-----------+----+-----------+--------------------+----------------+--------------------+----------+----------+-------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES| ORDERDATE|DAYS_SINCE_LASTORDER| STATUS|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|      CITY|POSTALCODE|COUNTRY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+----------+--------------------+-------+-----------+----+-----------+--------------------+----------------+--------------------+----------+----------+-------+---------------+----------------+--------+
|      10107|             30|     95.7|              2| 2871.0|24/02/2018|                 828|Shipped|Motorcycles|  95|   S10_1678|   Land of Toys Inc.|      2125557818|897 Long Airport ...|       NYC|     1

In [7]:
data.printSchema()


root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- SALES: double (nullable = true)
 |-- ORDERDATE: string (nullable = true)
 |-- DAYS_SINCE_LASTORDER: integer (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable = true)
 |-- DEALSIZE: string (nullable = true)



In [8]:
from pyspark.sql.functions import to_date

data = data.withColumn("ORDERDATE", to_date(data["ORDERDATE"], "dd/MM/yyyy"))
data.show(5)


+-----------+---------------+---------+---------------+-------+----------+--------------------+-------+-----------+----+-----------+--------------------+----------------+--------------------+----------+----------+-------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES| ORDERDATE|DAYS_SINCE_LASTORDER| STATUS|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|      CITY|POSTALCODE|COUNTRY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+----------+--------------------+-------+-----------+----+-----------+--------------------+----------------+--------------------+----------+----------+-------+---------------+----------------+--------+
|      10107|             30|     95.7|              2| 2871.0|2018-02-24|                 828|Shipped|Motorcycles|  95|   S10_1678|   Land of Toys Inc.|      2125557818|897 Long Airport ...|       NYC|     1

In [9]:
data.select([data.columns[i] for i in range(len(data.columns))]).describe().show()


+-------+------------------+-----------------+------------------+------------------+-----------------+--------------------+---------+------------+------------------+-----------+-----------------+--------------------+--------------------+------------+------------------+---------+---------------+----------------+--------+
|summary|       ORDERNUMBER|  QUANTITYORDERED|         PRICEEACH|   ORDERLINENUMBER|            SALES|DAYS_SINCE_LASTORDER|   STATUS| PRODUCTLINE|              MSRP|PRODUCTCODE|     CUSTOMERNAME|               PHONE|        ADDRESSLINE1|        CITY|        POSTALCODE|  COUNTRY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-------+------------------+-----------------+------------------+------------------+-----------------+--------------------+---------+------------+------------------+-----------+-----------------+--------------------+--------------------+------------+------------------+---------+---------------+----------------+--------+
|  count|              2747|      

In [11]:
from pyspark.sql.functions import month, year, sum

# Extract month and year from ORDERDATE
data = data.withColumn("Month", month("ORDERDATE"))
data = data.withColumn("Year", year("ORDERDATE"))

# Calculate monthly sales totals
monthly_sales = data.groupBy("Year", "Month").agg(sum("SALES").alias("TotalSales"))
monthly_sales = monthly_sales.orderBy("Year", "Month")

# Display results
monthly_sales.show()

# Save for visualization
monthly_sales.write.csv(r"C:\BigData\monthly_sales.csv", header=True)


+----+-----+------------------+
|Year|Month|        TotalSales|
+----+-----+------------------+
|2018|    1|          129753.6|
|2018|    2|140836.19000000003|
|2018|    3|155809.31999999998|
|2018|    4|201609.55000000002|
|2018|    5|         192673.11|
|2018|    6|168082.55999999997|
|2018|    7|187731.87999999998|
|2018|    8|          197809.3|
|2018|    9|         263973.36|
|2018|   10| 448452.9500000002|
|2018|   11|1029837.6600000001|
|2018|   12|236444.58000000002|
|2019|    1|          292688.1|
|2019|    2| 311419.5299999999|
|2019|    3|205733.72999999992|
|2019|    4|206148.12000000008|
|2019|    5| 273438.3900000001|
|2019|    6|         286674.22|
|2019|    7| 327144.0899999998|
|2019|    8| 461501.2700000001|
+----+-----+------------------+
only showing top 20 rows



In [12]:
# Aggregate sales by PRODUCTLINE
product_line_sales = data.groupBy("PRODUCTLINE").agg(sum("SALES").alias("TotalSales"))

# Order by total sales in descending order
product_line_sales = product_line_sales.orderBy("TotalSales", ascending=False)

# Display results
product_line_sales.show()

# Save for visualization
product_line_sales.write.csv(r"C:\BigData/product_line_sales.csv", header=True)


+----------------+------------------+
|     PRODUCTLINE|        TotalSales|
+----------------+------------------+
|    Classic Cars|3842868.5399999963|
|    Vintage Cars|1806675.6799999995|
|Trucks and Buses|1111559.1899999997|
|     Motorcycles|1103512.1900000004|
|          Planes| 969323.4200000002|
|           Ships|         700039.22|
|          Trains|226243.46999999997|
+----------------+------------------+



In [13]:
# Aggregate sales by DEALSIZE
deal_size_sales = data.groupBy("DEALSIZE").agg(sum("SALES").alias("TotalSales"))

# Calculate percentage contribution
from pyspark.sql.functions import col
total_sales = deal_size_sales.select(sum("TotalSales")).collect()[0][0]
deal_size_sales = deal_size_sales.withColumn("Percentage", (col("TotalSales") / total_sales) * 100)

# Display results
deal_size_sales.show()

# Save for visualization
deal_size_sales.write.csv(r"C:\BigData/deal_size_sales.csv", header=True)


+--------+------------------+------------------+
|DEALSIZE|        TotalSales|        Percentage|
+--------+------------------+------------------+
|  Medium|5931231.4700000025| 60.76943379188876|
|   Small|2570033.8399999957|26.331715778206394|
|   Large| 1258956.400000001|12.898850429904845|
+--------+------------------+------------------+



In [16]:
# Group by CUSTOMERNAME and calculate metrics
from pyspark.sql.functions import count, sum, col, when
customer_segments = data.groupBy("CUSTOMERNAME").agg(
    sum("SALES").alias("TotalSales"),
    sum("QUANTITYORDERED").alias("TotalQuantity"),
    count("ORDERNUMBER").alias("OrderCount")
)

# Classify customers into segments based on TotalSales

customer_segments = customer_segments.withColumn(
    "Segment",
    when(col("TotalSales") > 10000, "High Value")
    .when(col("TotalSales") > 5000, "Medium Value")
    .otherwise("Low Value")
)

# Display results
customer_segments.show()

# Save for further use
customer_segments.write.csv(r"C:\BigData/customer_segments.csv", header=True)


+--------------------+------------------+-------------+----------+----------+
|        CUSTOMERNAME|        TotalSales|TotalQuantity|OrderCount|   Segment|
+--------------------+------------------+-------------+----------+----------+
| Suominen Souveniers|113961.14999999997|         1031|        30|High Value|
|  Amica Models & Co.| 94117.26000000002|          843|        26|High Value|
|Collectables For ...|          81577.98|          795|        24|High Value|
|         CAF Imports|          49642.05|          468|        13|High Value|
|   giftsbymail.co.uk| 78240.83999999998|          895|        26|High Value|
|       Rovelli Gifts|137955.72000000003|         1650|        48|High Value|
|     Lyon Souveniers| 78570.34000000001|          684|        20|High Value|
|   La Rochelle Gifts|          180124.9|         1832|        53|High Value|
| L'ordine Souveniers|142601.33000000002|         1280|        39|High Value|
|Signal Collectibl...| 50218.51000000001|          514|        1

In [18]:
# Aggregate sales by CITY and COUNTRY
geo_sales = data.groupBy("CITY", "COUNTRY").agg(sum("SALES").alias("TotalSales"))

# Order by sales in descending order
geo_sales = geo_sales.orderBy("TotalSales", ascending=False)

# Display results
geo_sales.show()

# Save for visualization
geo_sales.write.csv(r"C:\BigData/geo_sales.csv", header=True)


+-------------+---------+------------------+
|         CITY|  COUNTRY|        TotalSales|
+-------------+---------+------------------+
|       Madrid|    Spain|1082551.4400000002|
|   San Rafael|      USA|         654858.06|
|          NYC|      USA| 560787.7699999998|
|    Singapore|Singapore|288488.41000000003|
|        Paris|   France|         268944.68|
|  New Bedford|      USA|         207874.86|
|       Nantes|   France|         204304.86|
|    Melbourne|Australia|200995.40999999997|
|   Brickhaven|      USA|165255.20000000004|
|     San Jose|      USA|160010.26999999996|
|   Manchester|       UK|157807.80999999997|
|       Boston|      USA|154069.65999999997|
| North Sydney|Australia|153996.13000000003|
|    Chatswood|Australia|151570.98000000004|
| Philadelphia|      USA|151189.12999999998|
|     Salzburg|  Austria|         149798.63|
|    Kobenhavn|  Denmark|          145041.6|
|         Lyon|   France|142874.25000000003|
|Reggio Emilia|    Italy|142601.33000000002|
|    Cambr

In [20]:
# Aggregate sales and order counts by salesperson (CONTACTLASTNAME, CONTACTFIRSTNAME)
salesperson_performance = data.groupBy("CONTACTLASTNAME", "CONTACTFIRSTNAME").agg(
    sum("SALES").alias("TotalSales"),
    count("ORDERNUMBER").alias("DealsClosed")
)

# Order by total sales in descending order
salesperson_performance = salesperson_performance.orderBy("TotalSales", ascending=False)

# Display results
salesperson_performance.show()

# Save for further use
salesperson_performance.write.csv(r"C:\BigData/salesperson_performance.csv", header=True)


+---------------+----------------+------------------+-----------+
|CONTACTLASTNAME|CONTACTFIRSTNAME|        TotalSales|DealsClosed|
+---------------+----------------+------------------+-----------+
|         Freyre|           Diego| 912294.1100000002|        259|
|         Nelson|         Valarie|         654858.06|        180|
|       Ferguson|           Peter|200995.40999999997|         55|
|          Young|            Jeff|197736.93999999997|         48|
|        Labrune|          Janine|          180124.9|         53|
|      Natividad|            Eric|172989.68000000008|         43|
|             Yu|            Kwai|164069.44000000003|         49|
|          Frick|             Sue|160010.26999999996|         40|
|       Ashworth|        Victoria|157807.80999999997|         51|
|         O'Hara|            Anna|153996.13000000003|         46|
|         Huxley|          Adrian|151570.98000000004|         46|
|          Pipps|           Georg|         149798.63|         40|
|       Pe