<a href="https://colab.research.google.com/github/RajuKGosala-45/PySpark-Practice-Journey/blob/main/BMWSales.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **PySpark Setup in Google Colab (JAVA_HOME)**

In [1]:
# PySpark Setup (for Colab)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["JAVA_HOME"], "bin")



## **Create SparkSession and Read Data**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName('BMW dataSet').getOrCreate()
df=spark.read.csv('/content/BMW sales data (2010-2024).csv', header=True, inferSchema=True)
df.show(20)

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America| Black|   Diesel|      Manual|          2.1|    122131|    49898|   

## **Explore Dataset**

In [None]:
print("Schema Overview:")
df.printSchema()

print("\nSample Records:")
df.show(5)

print("\nSummary Statistics:")
df.describe().show()

Schema Overview:
root
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Fuel_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Engine_Size_L: double (nullable = true)
 |-- Mileage_KM: integer (nullable = true)
 |-- Price_USD: integer (nullable = true)
 |-- Sales_Volume: integer (nullable = true)
 |-- Sales_Classification: string (nullable = true)


Sample Records:
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|  Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|         

## **2. Data Cleaning in PySpark (BMW Sales Dataset)**

In [None]:
# The SparkSession, the imports and the df DataFrame from the previous cells are reused here,
# so nothing needs to be imported or loaded again.

# Check for NULLs or missing values
# Note: this prints a True/False null flag per cell for the first 20 rows;
# a per-column count is computed in section 12.1.
print("Count of NULLs in each column:")
df.select([column(c).isNull().alias(c) for c in df.columns]).show()

# Remove rows with missing values (if any)
df_clean=df.dropna()

# Remove duplicate records
df_clean=df_clean.dropDuplicates()

# Standardize text columns (e.g., Region names to UPPERCASE)
# Chain from df_clean so the null and duplicate removal above is kept
df_clean=df_clean.withColumn("Region",upper(col("Region")))

# Validate cleaned data

print("Cleaned dataset preview:")
df_clean.show(5)

print("Rows Before:", df.count(), "| After Cleaning:", df_clean.count())

Count of NULLs in each column:
+-----+-----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|Model| Year|Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+-----+-----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| fal

## **3. Filtering and Sorting BMW Sales Data Using PySpark**

In [None]:
# spark, df and the pyspark.sql.functions imports are reused from the cells above.
# Keep only rows with Sales_Volume above 7000
high_Sales=df.filter(col("Sales_Volume") > 7000)
high_Sales.select("Model","Year","Region","Sales_Volume").show(10)

+--------+----+-------------+------------+
|   Model|Year|       Region|Sales_Volume|
+--------+----+-------------+------------+
|5 Series|2016|         Asia|        8300|
|      i8|2022|       Europe|        7949|
|      X3|2016|South America|        8944|
|      i8|2016|North America|        8252|
|7 Series|2020|North America|        8111|
|      X1|2017|         Asia|        7291|
|      M3|2014|North America|        7765|
|      M5|2017|North America|        9755|
|      X5|2012|       Africa|        7104|
|      X3|2015|North America|        8635|
+--------+----+-------------+------------+
only showing top 10 rows



Sort by Highest Price

In [None]:
top_revenue=df.orderBy(desc("Price_USD"))
print("Top 5 BMW cars by price:")
top_revenue.select("Model","Year","Region","Price_USD").show(5)

Top 5 BMW cars by price:
+--------+----+-----------+---------+
|   Model|Year|     Region|Price_USD|
+--------+----+-----------+---------+
|      i8|2010|Middle East|   119998|
|      i8|2024|     Africa|   119997|
|      X6|2019|       Asia|   119997|
|      X1|2016|     Africa|   119996|
|3 Series|2019|Middle East|   119994|
+--------+----+-----------+---------+
only showing top 5 rows



Filter by Specific Criteria

In [None]:
recent_petrol=df.filter((col("Fuel_Type") == "Petrol") & (col("Year") > 2018))
recent_petrol.select("Model","Year","Fuel_Type","Region","Sales_Volume").show(5)

+--------+----+---------+-------------+------------+
|   Model|Year|Fuel_Type|       Region|Sales_Volume|
+--------+----+---------+-------------+------------+
|5 Series|2022|   Petrol|North America|        6994|
|      X3|2024|   Petrol|  Middle East|        4047|
|      M5|2021|   Petrol|South America|        4561|
|      M3|2020|   Petrol|       Africa|        3929|
|      i3|2024|   Petrol|  Middle East|         173|
+--------+----+---------+-------------+------------+
only showing top 5 rows



## **4. Aggregation and Grouping - BMW Sales Insights**

### Total Sales By **Region**

In [None]:
# spark, df and the pyspark.sql.functions imports are reused from the cells above.

region_sale=df.groupBy("Region").agg(
    sum("Sales_Volume").alias("Total_Sales"),
    avg("Price_USD").alias("Average_Price"),
    count("*").alias("Record_Count")
)
print("Total BMW Sales by Region:")
region_sale.show()

Total BMW Sales by Region:
+-------------+-----------+-----------------+------------+
|       Region|Total_Sales|    Average_Price|Record_Count|
+-------------+-----------+-----------------+------------+
|       Europe|   42555138|74988.35685145188|        8334|
|       Africa|   41565252|74885.77159820672|        8253|
|North America|   42402629|75070.05470905818|        8335|
|South America|   41551818|74973.59883650467|        8251|
|  Middle East|   42326620|74726.78848680282|        8373|
|         Asia|   42974277|75554.92500591437|        8454|
+-------------+-----------+-----------------+------------+



### Top Performing Models by Total Revenue

In [None]:
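# Note: Total_Revenue here is the sum of Price_USD per model;
# it is not weighted by Sales_Volume.
model_revenue=df.groupBy("Model").agg(
    sum("Price_USD").alias("Total_Revenue")
).orderBy("Total_Revenue", ascending=False)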

print("Top Performing Models by Total Revenue:")
model_revenue.show(5)

Top Performing Models by Total Revenue:
+--------+-------------+
|   Model|Total_Revenue|
+--------+-------------+
|7 Series|    352610538|
|3 Series|    347226845|
|      i8|    347137044|
|5 Series|    345721780|
|      i3|    345427638|
+--------+-------------+
only showing top 5 rows
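
The cell above sums `Price_USD` per model. If revenue is instead taken to mean price times units sold, each row contributes `Price_USD * Sales_Volume`. The plain-Python sketch below shows that calculation on the first four rows printed by `df.show(20)`. It assumes `Price_USD` is a per-unit price and `Sales_Volume` a unit count, which the dataset does not state; `revenue_by_model` is a hypothetical helper name. In PySpark the same idea would be `sum(col("Price_USD") * col("Sales_Volume"))` inside `agg`.

```python
from collections import defaultdict

# First four rows of the df.show() output: (Model, Price_USD, Sales_Volume)
rows = [
    ("5 Series", 98740, 8300),
    ("i8", 79219, 3428),
    ("5 Series", 113265, 6994),
    ("X3", 60971, 4047),
]

def revenue_by_model(rows):
    """Sum Price_USD * Sales_Volume per model (assumed revenue definition)."""
    totals = defaultdict(int)
    for model, price, volume in rows:
        totals[model] += price * volume
    return dict(totals)

revenue = revenue_by_model(rows)

# Print models from highest to lowest revenue
for model, total in sorted(revenue.items(), key=lambda kv: kv[1], reverse=True):
    print(model, total)
```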



### **Yearly Trend**

In [None]:
Yearly_sales=df.groupBy("Year").agg(sum("Sales_Volume").alias("Total_Sales"))
Yearly_sales.orderBy("Year").show()

+----+-----------+
|Year|Total_Sales|
+----+-----------+
|2010|   16933445|
|2011|   16758941|
|2012|   16751895|
|2013|   16866733|
|2014|   16958960|
|2015|   17010207|
|2016|   16957550|
|2017|   16620811|
|2018|   16412273|
|2019|   17191956|
|2020|   16310843|
|2021|   16884666|
|2022|   17920946|
|2023|   16268654|
|2024|   17527854|
+----+-----------+



# **5. Window Functions - Year-over-Year (YoY) Growth Analysis**

In [None]:
from pyspark.sql.window import Window
# spark, df and the pyspark.sql.functions imports are reused from the cells above.

# Define a window spec (partition by Region, order by Year)
windowSpec=Window.partitionBy("Region").orderBy("Year")
# Use lag() to get the previous row's Sales_Volume for comparison.
# Note: df holds many rows per Region and Year, so this compares consecutive rows
# (often within the same year, in no guaranteed order), not yearly totals.
# Aggregate by Region and Year first for a true YoY figure, as in section 13.
df_growth=df.withColumn("Prev_Year_Sales", lag("Sales_Volume").over(windowSpec))

# Calculate the growth % relative to the previous row
df_growth = df_growth.withColumn(
    "YoY_Growth_Percent",
    round(((col("Sales_Volume") - col("Prev_Year_Sales")) / col("Prev_Year_Sales")) *100, 2)
)

# Display growth trends
print("Year-over-Year growth by Region:")
df_growth.select("Region","Year","Sales_Volume","Prev_year_Sales","YoY_Growth_Percent").show(15)

Year-over-Year growth by Region:
+------+----+------------+---------------+------------------+
|Region|Year|Sales_Volume|Prev_year_Sales|YoY_Growth_Percent|
+------+----+------------+---------------+------------------+
|Africa|2010|        5935|           NULL|              NULL|
|Africa|2010|        7607|           5935|             28.17|
|Africa|2010|         968|           7607|            -87.27|
|Africa|2010|        5236|            968|            440.91|
|Africa|2010|        1252|           5236|            -76.09|
|Africa|2010|        8358|           1252|            567.57|
|Africa|2010|        2849|           8358|            -65.91|
|Africa|2010|        9841|           2849|            245.42|
|Africa|2010|        7521|           9841|            -23.57|
|Africa|2010|        7103|           7521|             -5.56|
|Africa|2010|        5754|           7103|            -18.99|
|Africa|2010|        5113|           5754|            -11.14|
|Africa|2010|         184|           
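
Every row in the output above belongs to Africa 2010, so the percentages compare neighbouring rows rather than years. A year-over-year figure needs one total per Region and Year before `lag()` is applied. The plain-Python sketch below shows the calculation on the Africa totals for 2010 to 2013 as printed in section 13; `yoy_growth` is a hypothetical helper name, not a Spark function.

```python
# Africa yearly totals, copied from the section 13 output: {year: Total_Sales}
africa_totals = {2010: 2855044, 2011: 2760743, 2012: 2670105, 2013: 2848852}

def yoy_growth(totals):
    """Return {year: growth % vs. the previous year in the data}; first year maps to None."""
    growth = {}
    prev = None
    for year in sorted(totals):
        current = totals[year]
        # Same formula as the notebook: (current - previous) / previous * 100
        growth[year] = None if prev is None else round((current - prev) / prev * 100, 2)
        prev = current
    return growth

growth = yoy_growth(africa_totals)
print(growth)
```

The first year has no predecessor, so it maps to `None`, just as `lag()` yields `NULL` for the first row of each partition.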

# **6. Ranking BMW Models with PySpark Window Functions**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024) (1).csv", header=True, inferSchema=True)

windowSpec=Window.partitionBy("Year").orderBy("Sales_Volume")

df_ranked=df.withColumn("Rank",rank().over(windowSpec))\
            .withColumn("Dense_Rank",dense_rank().over(windowSpec))\
            .withColumn("Row_Number",row_number().over(windowSpec))

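# Display the rows ranked 1-5 per year. The window orders Sales_Volume ascending,
# so Rank 1 is the lowest volume; order by desc("Sales_Volume") to rank the highest first.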
df_ranked.filter(df_ranked.Rank <=5).select("Year","Model","Sales_Volume","Rank").show(10)

+----+--------+------------+----+
|Year|   Model|Sales_Volume|Rank|
+----+--------+------------+----+
|2010|      X1|         100|   1|
|2010|      X5|         103|   2|
|2010|      i3|         105|   3|
|2010|      X3|         105|   3|
|2010|      X5|         110|   5|
|2011|3 Series|         105|   1|
|2011|      X6|         109|   2|
|2011|      X3|         111|   3|
|2011|5 Series|         116|   4|
|2011|      i8|         117|   5|
+----+--------+------------+----+
only showing top 10 rows
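
The cell computes `rank`, `dense_rank` and `row_number`, but only `Rank` is displayed. The three differ only in how they treat ties. The plain-Python sketch below reproduces their semantics for an ascending order on the five 2010 values shown above; `rank_functions` is a hypothetical helper written for illustration, not part of PySpark.

```python
def rank_functions(values):
    """Return (rank, dense_rank, row_number) lists for values sorted ascending."""
    ordered = sorted(values)
    ranks, dense, rows = [], [], []
    for i, v in enumerate(ordered):
        rows.append(i + 1)                  # row_number: always 1, 2, 3, ...
        if i > 0 and v == ordered[i - 1]:
            ranks.append(ranks[-1])         # rank: tied rows share a rank
            dense.append(dense[-1])         # dense_rank: tied rows share a rank
        else:
            ranks.append(i + 1)             # rank: leaves a gap after ties
            dense.append(dense[-1] + 1 if dense else 1)  # dense_rank: no gaps
    return ranks, dense, rows

# 2010 Sales_Volume values from the output above
ranks, dense, rows = rank_functions([100, 103, 105, 105, 110])
print(ranks)  # [1, 2, 3, 3, 5]
print(dense)  # [1, 2, 3, 3, 4]
print(rows)   # [1, 2, 3, 4, 5]
```

The `ranks` list matches the `Rank` column printed for 2010, including the jump from 3 to 5 after the tie at 105.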



# **7. Cumulative Sales Trend (Running Totals) in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv('/content/BMW sales data (2010-2024) (1).csv', header=True, inferSchema=True)

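# Running-total frame: from the first row of the partition up to the current row
# (Window.currentRow is the named constant for offset 0)
windowSpec= Window.partitionBy("Region").orderBy("Year")\
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
# Calculate Cumulative Sales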
df_cumulative=df.withColumn(
    "Cumulative_Sales",
    sum(col("Sales_Volume")).over(windowSpec)
)

# Display Cumulative Sales Trends by Region
df_cumulative.select("Region","Year","Sales_Volume","Cumulative_Sales").show(15)

+------+----+------------+----------------+
|Region|Year|Sales_Volume|Cumulative_Sales|
+------+----+------------+----------------+
|Africa|2010|        5935|            5935|
|Africa|2010|        7607|           13542|
|Africa|2010|         968|           14510|
|Africa|2010|        5236|           19746|
|Africa|2010|        1252|           20998|
|Africa|2010|        8358|           29356|
|Africa|2010|        2849|           32205|
|Africa|2010|        9841|           42046|
|Africa|2010|        7521|           49567|
|Africa|2010|        7103|           56670|
|Africa|2010|        5754|           62424|
|Africa|2010|        5113|           67537|
|Africa|2010|         184|           67721|
|Africa|2010|        1697|           69418|
|Africa|2010|        8248|           77666|
+------+----+------------+----------------+
only showing top 15 rows
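
A frame that runs from the start of the partition to the current row turns `sum` into a running total. As a minimal plain-Python illustration of the same idea, `itertools.accumulate` applied to the first five Africa values from the output above gives the same cumulative figures.

```python
from itertools import accumulate

# First five Sales_Volume values of the Africa partition, from the output above
sales = [5935, 7607, 968, 5236, 1252]

# Each element is the sum of all values up to and including the current row
cumulative = list(accumulate(sales))
print(cumulative)  # [5935, 13542, 14510, 19746, 20998]
```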



# **8. Percentile and Ranking Distribution in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024) (1).csv", header=True, inferSchema=True)
# Define the window (partition by Year, highest Sales_Volume first)
windowSpec=Window.partitionBy("Year").orderBy(desc("Sales_Volume"))

# Calculate percent rank and cumulative distribution
df_percentile=df.withColumn("Percent_Rank", percent_rank().over(windowSpec))\
                .withColumn("Cume_Dist", cume_dist().over(windowSpec))
df_percentile.select("Model","Year","Region","Sales_Volume","Percent_Rank","Cume_Dist").show(10)


+--------+----+-------------+------------+--------------------+--------------------+
|   Model|Year|       Region|Sales_Volume|        Percent_Rank|           Cume_Dist|
+--------+----+-------------+------------+--------------------+--------------------+
|      i8|2010|  Middle East|        9998|                 0.0|3.003003003003003E-4|
|      M3|2010|South America|        9996|3.003905076599579...|9.009009009009009E-4|
|      X6|2010|       Africa|        9996|3.003905076599579...|9.009009009009009E-4|
|      i3|2010|       Africa|        9992|9.011715229798738E-4|0.001201201201201...|
|5 Series|2010|       Europe|        9990|0.001201562030639...|0.001501501501501...|
|5 Series|2010|South America|        9988|0.001501952538299...|0.002102102102102102|
|      M3|2010|  Middle East|        9988|0.001501952538299...|0.002102102102102102|
|      X1|2010|       Europe|        9986|0.002102733553619...|0.002702702702702703|
|      i8|2010|South America|        9986|0.002102733553619...|0.
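
The two columns follow simple formulas within each partition of `n` rows: `percent_rank` is `(rank - 1) / (n - 1)`, and `cume_dist` is the number of rows at or ahead of the current one, divided by `n`. The plain-Python sketch below applies both to four of the 2010 values; `percent_rank_and_cume_dist` is a hypothetical helper for illustration only.

```python
def percent_rank_and_cume_dist(values):
    """Return [(value, percent_rank, cume_dist)] for values ordered descending."""
    ordered = sorted(values, reverse=True)
    n = len(ordered)
    result = []
    for v in ordered:
        rank = 1 + sum(1 for x in ordered if x > v)      # rows strictly ahead, plus one
        at_or_ahead = sum(1 for x in ordered if x >= v)  # rows ahead plus ties
        pct = (rank - 1) / (n - 1) if n > 1 else 0.0
        result.append((v, pct, at_or_ahead / n))
    return result

stats = percent_rank_and_cume_dist([9998, 9996, 9996, 9992])
for row in stats:
    print(row)
```

Tied rows share both values, as the two 9996 rows do in the output above. The first `cume_dist` printed there, about 3.003E-4, is consistent with 1/3330, which would put 3330 rows in the 2010 partition.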

# **9. Joins in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark=SparkSession.builder.appName('BMW_Sales').getOrCreate()
df=spark.read.csv('/content/BMW sales data (2010-2024).csv', header=True, inferSchema=True)

# Create Reference Dataframe for BMW Models

data=[
    ("X1","Compact SUV"),
    ("X3","Mid SUV"),
    ("X5","Luxury SUV"),
    ("3 Series","Sedan"),
    ("5 Series","Sedan"),
    ("7 Series","Luxury Sedan"),
    ("Z4","Sports Car")
]

columns = ["Model","Category"]
model_df= spark.createDataFrame(data,columns)

# Display the initial DataFrames

print("BMW Sales Data:")
df.show(5)

print("BMW Model info Data:")
model_df.show()

BMW Sales Data:
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|  Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|  Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America| Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East| Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America|Black|   Diesel|      Manual|          2.1|    122131|    4

Perform Different Joins

In [None]:
# INNER JOIN - Only matching records

inner_join=df.join(model_df, on="Model", how="inner")
print("Inner Join Model to Sales Data:")
inner_join.select("Model","Region","Sales_Volume","Category").show(5)

Inner Join Model to Sales Data:
+-----+-------------+------------+-----------+
|Model|       Region|Sales_Volume|   Category|
+-----+-------------+------------+-----------+
|   X1|North America|        1764|Compact SUV|
|   X1|         Asia|        8973|Compact SUV|
|   X1|  Middle East|        8545|Compact SUV|
|   X1|South America|        8713|Compact SUV|
|   X1|       Europe|        1406|Compact SUV|
+-----+-------------+------------+-----------+
only showing top 5 rows



In [None]:
# LEFT JOIN - All rows from Sales, add Category where matched

Left_join=df.join(model_df, on="Model",how="left")
print("Left Join Result:")
Left_join.select("Model","Region","Sales_Volume","Category").show(5)


Left Join Result:
+--------+-------------+------------+------------+
|   Model|       Region|Sales_Volume|    Category|
+--------+-------------+------------+------------+
|7 Series|South America|        3080|Luxury Sedan|
|      X3|  Middle East|        4047|     Mid SUV|
|5 Series|         Asia|        8300|       Sedan|
|5 Series|North America|        6994|       Sedan|
|5 Series|  Middle East|        1232|       Sedan|
+--------+-------------+------------+------------+
only showing top 5 rows



In [None]:
# RIGHT JOIN - All rows from Model Info, add Sales where matched

Right_join=df.join(model_df, on="Model", how="right")
print("Right Join Result")
Right_join.select("Model","Region","Sales_Volume","Category").show(5)

Right Join Result
+-----+-------------+------------+-----------+
|Model|       Region|Sales_Volume|   Category|
+-----+-------------+------------+-----------+
|   X1|North America|        1764|Compact SUV|
|   X1|         Asia|        8973|Compact SUV|
|   X1|  Middle East|        8545|Compact SUV|
|   X1|South America|        8713|Compact SUV|
|   X1|       Europe|        1406|Compact SUV|
+-----+-------------+------------+-----------+
only showing top 5 rows



In [None]:
# FULL JOIN - Combine all records

Full_join=df.join(model_df, on="Model", how="outer")
print("Full Join Result")
Full_join.select("Model","Region","Sales_Volume","Category").show(5)

Full Join Result
+--------+-------------+------------+--------+
|   Model|       Region|Sales_Volume|Category|
+--------+-------------+------------+--------+
|3 Series|  Middle East|         500|   Sedan|
|3 Series|       Africa|        3293|   Sedan|
|3 Series|North America|         660|   Sedan|
|3 Series|       Europe|        4266|   Sedan|
|3 Series|South America|        1703|   Sedan|
+--------+-------------+------------+--------+
only showing top 5 rows



In [None]:
#Compare Row Counts (Quick Summary)

summary_df = spark.createDataFrame([
    ("INNER JOIN", inner_join.count()),
    ("LEFT JOIN", Left_join.count()),
    ("RIGHT JOIN", Right_join.count()),
    ("FULL OUTER JOIN", Full_join.count())
], ["Join_Type", "Row_Count"])

print("Join Type Comparison Summary:")
summary_df.show()

# Save for visualization (optional)
summary_df.coalesce(1).write.mode("overwrite").csv("output/Day9_Join_Summary", header=True)

# Stop the Spark Session
spark.stop()

Join Type Comparison Summary:
+---------------+---------+
|      Join_Type|Row_Count|
+---------------+---------+
|     INNER JOIN|    27407|
|      LEFT JOIN|    50000|
|     RIGHT JOIN|    27408|
|FULL OUTER JOIN|    50001|
+---------------+---------+
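
The four counts are related. The left join keeps every sales row (50000), the inner join keeps only the 27407 rows whose model appears in `model_df`, and the right and full joins each add one row, which is consistent with exactly one reference model (Z4, which never appears in the sales outputs) having no sales rows. The plain-Python sketch below reproduces that arithmetic on a toy example; the data and the `join_counts` helper are hypothetical and are not the real dataset.

```python
# Toy stand-ins for the two DataFrames (hypothetical data for illustration)
sales = [("X1", 1764), ("X1", 8973), ("i8", 3428), ("M5", 9755)]  # (Model, Sales_Volume)
models = {"X1": "Compact SUV", "Z4": "Sports Car"}                 # Model -> Category

def join_counts(sales, models):
    """Row counts that inner/left/right/full joins on Model would produce."""
    matched = sum(1 for model, _ in sales if model in models)
    left_only = len(sales) - matched                      # sales rows with no category
    sold = {model for model, _ in sales}
    right_only = sum(1 for m in models if m not in sold)  # categories with no sales
    return {
        "inner": matched,
        "left": matched + left_only,
        "right": matched + right_only,
        "full": matched + left_only + right_only,
    }

counts = join_counts(sales, models)
print(counts)  # {'inner': 2, 'left': 4, 'right': 3, 'full': 5}
```

The sketch relies on each model appearing once in the reference table, as it does in `model_df`; duplicate keys on that side would multiply the matched rows.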



# **10. PySpark Aggregations — Group, Summarize, and Analyze Data**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

Region_Summary=(
    df.groupBy("Region")
    .agg(
        round(sum("Sales_Volume"), 2).alias("Total_Sales"),
        round(avg("Price_USD"), 2).alias("Average_Price"),
        count("Model").alias("Total_Transactions"),
        max("Sales_Volume").alias("Max_Sales"),
        min("Sales_Volume").alias("Min_Sales")
    )
    .orderBy("Total_Sales", ascending=False)
)

print(" Regional Sales Summary :")
Region_Summary.show(truncate=False)

 Regional Sales Summary :
+-------------+-----------+-------------+------------------+---------+---------+
|Region       |Total_Sales|Average_Price|Total_Transactions|Max_Sales|Min_Sales|
+-------------+-----------+-------------+------------------+---------+---------+
|Asia         |42974277   |75554.93     |8454              |9998     |105      |
|Europe       |42555138   |74988.36     |8334              |9999     |100      |
|North America|42402629   |75070.05     |8335              |9999     |100      |
|Middle East  |42326620   |74726.79     |8373              |9998     |102      |
|Africa       |41565252   |74885.77     |8253              |9999     |101      |
|South America|41551818   |74973.6      |8251              |9999     |100      |
+-------------+-----------+-------------+------------------+---------+---------+



# **11. Multi-Level Aggregations – Region + Model Analysis**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)
sales_df=df
region_model_summary=(
    sales_df.groupBy("Region","Model")
    .agg(
        round(sum("Sales_Volume"),2).alias("Total_Sales"),
        round(avg("Sales_Volume"),2).alias("Average_Sales")
    )
    .orderBy("Region", "Total_Sales" ,ascending=[False,True])
)

print("Regional and Model-Wise Sales Summary:")
region_model_summary.show(15, truncate= False)


region_model_summary.coalesce(1).write.mode("overwrite").csv("output/Day11_Region_Model_Summary", header=True)

Regional and Model-Wise Sales Summary:
+-------------+--------+-----------+-------------+
|Region       |Model   |Total_Sales|Average_Sales|
+-------------+--------+-----------+-------------+
|South America|X5      |3609906    |5020.73      |
|South America|X1      |3630361    |4879.52      |
|South America|M5      |3632944    |5189.92      |
|South America|M3      |3690509    |5125.71      |
|South America|3 Series|3697895    |4771.48      |
|South America|X3      |3699923    |5040.77      |
|South America|i3      |3760040    |5060.62      |
|South America|5 Series|3885018    |4993.6       |
|South America|i8      |3920908    |4925.76      |
|South America|7 Series|4000510    |5128.86      |
|South America|X6      |4023804    |5280.58      |
|North America|M5      |3602546    |4921.51      |
|North America|5 Series|3691777    |4982.16      |
|North America|X6      |3702034    |5023.11      |
|North America|X5      |3744219    |4985.64      |
+-------------+--------+-----------+-------
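
The `ascending=[False, True]` argument sorts `Region` in descending order and, within each region, `Total_Sales` in ascending order, which is why South America is listed first with its lowest total on top. As a plain-Python sketch of the same mixed-direction ordering, two stable sorts can be applied in reverse key order; the rows are taken from the output above.

```python
# (Region, Model, Total_Sales) rows taken from the output above
rows = [
    ("North America", "5 Series", 3691777),
    ("South America", "X1", 3630361),
    ("North America", "M5", 3602546),
    ("South America", "X5", 3609906),
]

# Python's sort is stable, so sort by the secondary key first, then by the primary key.
rows.sort(key=lambda r: r[2])                # Total_Sales ascending
rows.sort(key=lambda r: r[0], reverse=True)  # Region descending

for row in rows:
    print(row)
```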

# **12. Handling Missing Data & Data Cleaning in PySpark**

### **12.1 Detect Missing Values**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMWSales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)
# Detect Missing Values

null_Summary=df.select([
    sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
])
null_Summary.show()

+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|Model|Year|Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|    0|   0|     0|    0|        0|           0|            0|         0|        0|           0|                   0|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+



### **12.2 Replacing Missing Values with Default Data**

In [None]:
# Fill Missing numeric and text fields:
df_filled=df.na.fill({
    "Sales_Volume":0,
    "Model":"Unknown",
    "Year":0
})
df_filled.describe().show()
df_filled.printSchema()

+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|summary|   Model|             Year|       Region|Color|Fuel_Type|Transmission|     Engine_Size_L|        Mileage_KM|         Price_USD|      Sales_Volume|Sales_Classification|
+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|  count|   50000|            50000|        50000|50000|    50000|       50000|             50000|             50000|             50000|             50000|               50000|
|   mean|    NULL|        2017.0157|         NULL| NULL|     NULL|        NULL| 3.247179999999999|      100307.20314|        75034.6009|        5067.51468|                NULL|
| stddev|    NULL|4.324459218093149|         NULL| NULL|     NULL|        NULL|1.0090783975411621|57941.50934352461

### **12.3 Conditional Cleaning**

In [None]:
# Replace nulls in Region with a default label using when/otherwise:
df_clean=df.withColumn("Region", when(col("Region").isNull(), "Not Specified").otherwise(col("Region"))
)
df_clean.describe().show()
df_clean.printSchema()

+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|summary|   Model|             Year|       Region|Color|Fuel_Type|Transmission|     Engine_Size_L|        Mileage_KM|         Price_USD|      Sales_Volume|Sales_Classification|
+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|  count|   50000|            50000|        50000|50000|    50000|       50000|             50000|             50000|             50000|             50000|               50000|
|   mean|    NULL|        2017.0157|         NULL| NULL|     NULL|        NULL| 3.247179999999999|      100307.20314|        75034.6009|        5067.51468|                NULL|
| stddev|    NULL|4.324459218093149|         NULL| NULL|     NULL|        NULL|1.0090783975411621|57941.50934352461

### **12.4 Drop Rows with Too Many Nulls**

In [None]:
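# thresh=3 keeps rows that have at least 3 non-null values
df_drop_rows=df.na.drop(thresh=3)
# how="all" with subset drops rows (not columns) where both Model and Region are null
df_drop_columns=df.dropna(how="all", subset=["Model","Region"])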
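
The `thresh` argument is a minimum count of non-null values, not a maximum count of nulls. The plain-Python sketch below mimics that rule on hypothetical rows (section 12.1 shows the BMW dataset itself has no nulls); `drop_rows` is an illustrative helper, not a PySpark function.

```python
def drop_rows(rows, thresh):
    """Keep rows (dicts) that have at least `thresh` non-None values."""
    return [r for r in rows if sum(v is not None for v in r.values()) >= thresh]

# Hypothetical rows for illustration only
rows = [
    {"Model": "X1", "Year": 2016, "Region": "Asia", "Sales_Volume": 7291},  # 4 non-null
    {"Model": "X3", "Year": 2024, "Region": None, "Sales_Volume": 4047},    # 3 non-null
    {"Model": None, "Year": None, "Region": None, "Sales_Volume": 173},     # 1 non-null
]

kept = drop_rows(rows, thresh=3)
print(len(kept))  # 2
```

The row with exactly three non-null values is kept, which shows that the threshold is inclusive.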



### **12.5 Replace Inconsistent Values**

In [None]:
df_replaced=df.replace(["N/A", "-",""], None)

**Real-World Insight Example**

In [None]:
df_clean.groupBy("Region").agg({"Sales_Volume": "sum"}).orderBy("sum(Sales_Volume)", ascending=False).show()

+-------------+-----------------+
|       Region|sum(Sales_Volume)|
+-------------+-----------------+
|         Asia|         42974277|
|       Europe|         42555138|
|North America|         42402629|
|  Middle East|         42326620|
|       Africa|         41565252|
|South America|         41551818|
+-------------+-----------------+



# **13. Trend Analysis by Year & Region in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMW Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

# 1. Total Sales per Year
Yearly_sales=df.groupBy("Year").agg({"Sales_Volume": "sum"}).withColumnRenamed("sum(Sales_Volume)", "Total_Sales")
Yearly_sales.orderBy("Year").show()


+----+-----------+
|Year|Total_Sales|
+----+-----------+
|2010|   16933445|
|2011|   16758941|
|2012|   16751895|
|2013|   16866733|
|2014|   16958960|
|2015|   17010207|
|2016|   16957550|
|2017|   16620811|
|2018|   16412273|
|2019|   17191956|
|2020|   16310843|
|2021|   16884666|
|2022|   17920946|
|2023|   16268654|
|2024|   17527854|
+----+-----------+



In [None]:
# 2. Regional Sales by Year
region_trend= df.groupBy("Region","Year").agg({"Sales_Volume":"sum"}).withColumnRenamed("sum(Sales_Volume)", "Total_Sales")
region_trend.orderBy("Region","Year").show()

+------+----+-----------+
|Region|Year|Total_Sales|
+------+----+-----------+
|Africa|2010|    2855044|
|Africa|2011|    2760743|
|Africa|2012|    2670105|
|Africa|2013|    2848852|
|Africa|2014|    2743718|
|Africa|2015|    2801490|
|Africa|2016|    2842833|
|Africa|2017|    2512233|
|Africa|2018|    2771761|
|Africa|2019|    2750683|
|Africa|2020|    2750323|
|Africa|2021|    2867259|
|Africa|2022|    2902488|
|Africa|2023|    2682214|
|Africa|2024|    2805506|
|  Asia|2010|    2907671|
|  Asia|2011|    2790615|
|  Asia|2012|    2941414|
|  Asia|2013|    2887600|
|  Asia|2014|    3109421|
+------+----+-----------+
only showing top 20 rows



In [None]:
# 3. Rank Regions by Yearly Sales
from pyspark.sql.window import Window

windowSpec=Window.partitionBy("Year").orderBy(col("Total_Sales").desc())
region_rank= region_trend.withColumn("Rank", rank().over(windowSpec))
region_rank.show()

+-------------+----+-----------+----+
|       Region|Year|Total_Sales|Rank|
+-------------+----+-----------+----+
|         Asia|2010|    2907671|   1|
|South America|2010|    2896353|   2|
|North America|2010|    2876099|   3|
|       Africa|2010|    2855044|   4|
|       Europe|2010|    2775123|   5|
|  Middle East|2010|    2623155|   6|
|North America|2011|    2945731|   1|
|       Europe|2011|    2862580|   2|
|         Asia|2011|    2790615|   3|
|       Africa|2011|    2760743|   4|
|  Middle East|2011|    2727987|   5|
|South America|2011|    2671285|   6|
|  Middle East|2012|    3015084|   1|
|         Asia|2012|    2941414|   2|
|       Europe|2012|    2896866|   3|
|South America|2012|    2743446|   4|
|       Africa|2012|    2670105|   5|
|North America|2012|    2484980|   6|
|         Asia|2013|    2887600|   1|
|North America|2013|    2879917|   2|
+-------------+----+-----------+----+
only showing top 20 rows



In [None]:
# 4. Year-Over-Year (YoY) Growth

windowSpec= Window.partitionBy("Region").orderBy("Year")

trend_growth= region_trend.withColumn("Prev_Year_Sales", lag("Total_Sales").over(windowSpec))\
.withColumn("YoY_Growth_Percent", round(((col("Total_Sales") - col("Prev_Year_Sales")) / col("Prev_Year_Sales")) *100, 2))

trend_growth.orderBy("Region","Year").show()

+------+----+-----------+---------------+------------------+
|Region|Year|Total_Sales|Prev_Year_Sales|YoY_Growth_Percent|
+------+----+-----------+---------------+------------------+
|Africa|2010|    2855044|           NULL|              NULL|
|Africa|2011|    2760743|        2855044|              -3.3|
|Africa|2012|    2670105|        2760743|             -3.28|
|Africa|2013|    2848852|        2670105|              6.69|
|Africa|2014|    2743718|        2848852|             -3.69|
|Africa|2015|    2801490|        2743718|              2.11|
|Africa|2016|    2842833|        2801490|              1.48|
|Africa|2017|    2512233|        2842833|            -11.63|
|Africa|2018|    2771761|        2512233|             10.33|
|Africa|2019|    2750683|        2771761|             -0.76|
|Africa|2020|    2750323|        2750683|             -0.01|
|Africa|2021|    2867259|        2750323|              4.25|
|Africa|2022|    2902488|        2867259|              1.23|
|Africa|2023|    2682214
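
The YoY figures above can be checked by hand. The sketch below is plain Python (no Spark session needed) and mirrors the `lag` plus percentage formula on the first four Africa totals from the output. The helper name `yoy_growth` is made up for illustration. Python's built-in `round` and Spark's `round` can disagree on exact .5 ties, which do not occur in these values.

```python
# Plain-Python check of the YoY formula (illustrative helper, not part of the notebook).
def yoy_growth(totals):
    """Return YoY growth in percent for yearly totals that are already ordered by year.

    The first year has no predecessor, so its growth is None (Spark shows NULL).
    """
    growth = [None]
    for prev, curr in zip(totals, totals[1:]):
        growth.append(round((curr - prev) / prev * 100, 2))
    return growth

# Africa totals for 2010-2013, copied from the regional output above.
africa = [2855044, 2760743, 2670105, 2848852]
print(yoy_growth(africa))  # [None, -3.3, -3.28, 6.69]
```

The printed values match the `YoY_Growth_Percent` column for Africa in 2010 through 2013.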

In [None]:
# 5. Identify Top-3 Growth Regions
top_growth = trend_growth.groupBy("Region") \
    .agg({"YoY_Growth_Percent": "avg"}) \
    .withColumnRenamed("avg(YoY_Growth_Percent)", "Avg_YoY_Growth") \
    .orderBy(col("Avg_YoY_Growth").desc())

top_growth.show(3)

+-----------+------------------+
|     Region|    Avg_YoY_Growth|
+-----------+------------------+
|Middle East|0.9657142857142856|
|     Europe|0.8471428571428572|
|       Asia|0.8035714285714283|
+-----------+------------------+
only showing top 3 rows
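
Each region's first year has a NULL growth value, and Spark's `avg` skips NULLs, so every average above is taken only over the years that have a predecessor. A plain-Python sketch of that behaviour, using the Africa growth values for 2010 through 2014 from the earlier output (the helper `avg_ignoring_none` is hypothetical):

```python
def avg_ignoring_none(values):
    """Average the non-None entries, mirroring how Spark's avg skips NULLs."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else None

africa_yoy = [None, -3.3, -3.28, 6.69, -3.69]  # Africa, 2010-2014
print(avg_ignoring_none(africa_yoy))  # averaged over 4 values, not 5
```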



# **14. Creating Tables for PySpark Joins**

### df_region

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)
sales_df=df
# Create df_region Schema
region_schema= StructType([
    StructField("Region", StringType(),True),
    StructField("Manager",StringType(),True),
    StructField("Country",StringType(),True),
    StructField("Zone",StringType(),True)
])
# Sample Data
region_data=[
    ("North", "Michael", "Germany", "Europe"),
    ("South", "Sophia", "India", "Asia"),
    ("East", "Daniel", "Japan", "Asia"),
    ("West", "Emma", "USA", "North America"),
    ("Central", "Oliver", "UK", "Europe")
]

df_region=spark.createDataFrame(region_data, schema=region_schema)
df_region.show()

+-------+-------+-------+-------------+
| Region|Manager|Country|         Zone|
+-------+-------+-------+-------------+
|  North|Michael|Germany|       Europe|
|  South| Sophia|  India|         Asia|
|   East| Daniel|  Japan|         Asia|
|   West|   Emma|    USA|North America|
|Central| Oliver|     UK|       Europe|
+-------+-------+-------+-------------+



In [None]:
# Create df_model Schema

model_schema= StructType([
    StructField("Model", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Launch_Year", IntegerType(), True),
    StructField("Fuel_Type", StringType(), True)
])

model_data=[
    ("BMW X1", "SUV", 2012, "Diesel"),
    ("BMW X3", "SUV", 2014, "Petrol"),
    ("BMW X5", "Luxury SUV", 2018, "Diesel"),
    ("BMW 3 Series", "Sedan", 2010, "Petrol"),
    ("BMW 5 Series", "Executive Sedan", 2016, "Diesel"),
    ("BMW i4", "Electric", 2021, "Electric"),
    ("BMW iX", "Electric SUV", 2022, "Electric")
]

df_model=spark.createDataFrame(model_data, schema=model_schema)
df_model.show()

+------------+---------------+-----------+---------+
|       Model|       Category|Launch_Year|Fuel_Type|
+------------+---------------+-----------+---------+
|      BMW X1|            SUV|       2012|   Diesel|
|      BMW X3|            SUV|       2014|   Petrol|
|      BMW X5|     Luxury SUV|       2018|   Diesel|
|BMW 3 Series|          Sedan|       2010|   Petrol|
|BMW 5 Series|Executive Sedan|       2016|   Diesel|
|      BMW i4|       Electric|       2021| Electric|
|      BMW iX|   Electric SUV|       2022| Electric|
+------------+---------------+-----------+---------+



# **15. JOINS Day — one of the most important skills for Data Analysts & PySpark users.**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark= SparkSession.builder.appName("BMW Sales Analysis").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header= True, inferSchema=True)
sales_df=df

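# Derive the lookup tables from the sales data itself. The sample df_region and
# df_model tables from section 14 use different key values (e.g. "North", "BMW X1"),
# so they would not match the Region and Model values in the sales data.

# One row of attributes per model. dropDuplicates(["Model"]) keeps an arbitrary
# row for each model, so these attributes are not guaranteed to be representative.
model_df = sales_df.select(
    "Model", "Engine_Size_L", "Fuel_Type", "Transmission"
).dropDuplicates(["Model"])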

model_df.show()


+--------+-------------+---------+------------+
|   Model|Engine_Size_L|Fuel_Type|Transmission|
+--------+-------------+---------+------------+
|3 Series|          2.4| Electric|      Manual|
|5 Series|          3.5|   Petrol|      Manual|
|7 Series|          2.1|   Diesel|      Manual|
|      M3|          3.0|   Hybrid|      Manual|
|      M5|          1.6|   Diesel|   Automatic|
|      X1|          1.6| Electric|   Automatic|
|      X3|          1.7|   Petrol|   Automatic|
|      X5|          2.6| Electric|      Manual|
|      X6|          2.8|   Hybrid|   Automatic|
|      i3|          3.5|   Petrol|   Automatic|
|      i8|          1.6|   Hybrid|   Automatic|
+--------+-------------+---------+------------+



In [None]:
region_df = sales_df.select(
    "Region", "Sales_Classification"
).dropDuplicates(["Region"])


region_df.show()


+-------------+--------------------+
|       Region|Sales_Classification|
+-------------+--------------------+
|       Africa|                 Low|
|         Asia|                High|
|       Europe|                High|
|  Middle East|                 Low|
|North America|                 Low|
|South America|                 Low|
+-------------+--------------------+



In [None]:
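# Inner-join the sales data to both lookup tables on their shared key columns.
# The lookup tables repeat columns that sales_df already has, so the result
# contains Engine_Size_L, Fuel_Type, Transmission and Sales_Classification twice.
final_join_df = sales_df \
    .join(model_df, on="Model", how="inner") \
    .join(region_df, on="Region", how="inner")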

final_join_df.show(truncate=False)


+-------------+--------+----+------+---------+------------+-------------+----------+---------+------------+--------------------+-------------+---------+------------+--------------------+
|Region       |Model   |Year|Color |Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|Engine_Size_L|Fuel_Type|Transmission|Sales_Classification|
+-------------+--------+----+------+---------+------------+-------------+----------+---------+------------+--------------------+-------------+---------+------------+--------------------+
|Asia         |5 Series|2016|Red   |Petrol   |Manual      |3.5          |151748    |98740    |8300        |High                |3.5          |Petrol   |Manual      |High                |
|North America|i8      |2013|Red   |Hybrid   |Automatic   |1.6          |121671    |79219    |3428        |Low                 |1.6          |Hybrid   |Automatic   |Low                 |
|North America|5 Series|2022|Blue  |Petrol   |Automatic   |4.5   

# **16. GroupBy & Aggregations in PySpark**

In [None]:
# Total Sales Volume By Model

Total_Sales_by_Model= final_join_df.groupBy("Model")\
                                   .agg(sum("Sales_Volume").alias("Total_Sales"))
Total_Sales_by_Model.orderBy("Total_Sales", ascending=False).show()

+--------+-----------+
|   Model|Total_Sales|
+--------+-----------+
|7 Series|   23786466|
|      i8|   23423891|
|      X1|   23406060|
|3 Series|   23281303|
|      i3|   23133849|
|5 Series|   23097519|
|      M5|   22779688|
|      X3|   22745529|
|      X5|   22709749|
|      X6|   22661986|
|      M3|   22349694|
+--------+-----------+



In [None]:
# Total Sales Volume By Region

Total_Sales_by_Region= final_join_df.groupBy("Region")\
                                    .agg(sum("Sales_Volume").alias("Total_Sales"))
Total_Sales_by_Region.orderBy("Total_Sales", ascending=False).show()

+-------------+-----------+
|       Region|Total_Sales|
+-------------+-----------+
|         Asia|   42974277|
|       Europe|   42555138|
|North America|   42402629|
|  Middle East|   42326620|
|       Africa|   41565252|
|South America|   41551818|
+-------------+-----------+



In [None]:
# Average price by Fuel Type
Average_Price_by_Fuel_type= sales_df.groupBy("Fuel_Type")\
                                        .agg(avg("Price_USD").alias("Average_Price"))
Average_Price_by_Fuel_type.orderBy("Average_Price", ascending=False).show()

+---------+-----------------+
|Fuel_Type|    Average_Price|
+---------+-----------------+
| Electric| 75276.3132066394|
|   Diesel|75079.80967136916|
|   Petrol|74990.41984063745|
|   Hybrid|74797.55174583202|
+---------+-----------------+



In [None]:
# Count of sales records by transmission type (count("Model") counts rows, not distinct models)
Model_Count_by_Transmission= sales_df.groupBy("Transmission")\
                                          .agg(count("Model").alias("Model_Count"))
Model_Count_by_Transmission.orderBy("Model_Count", ascending=False).show()

+------------+-----------+
|Transmission|Model_Count|
+------------+-----------+
|      Manual|      25154|
|   Automatic|      24846|
+------------+-----------+



In [None]:
# Maximum Mileage by Model
Max_Mileage_by_Model=sales_df.groupBy("Model")\
                            .agg(max("Mileage_KM").alias("Max_Mileage"))
Max_Mileage_by_Model.orderBy("Max_Mileage", ascending=False).show()

+--------+-----------+
|   Model|Max_Mileage|
+--------+-----------+
|      X3|     199996|
|      X5|     199995|
|7 Series|     199991|
|      i3|     199987|
|      M3|     199971|
|5 Series|     199956|
|      i8|     199955|
|      X1|     199954|
|      X6|     199935|
|3 Series|     199900|
|      M5|     199822|
+--------+-----------+



# **17. Window Functions (Real Business Case)**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

windowSpec=Window.partitionBy("Model").orderBy("Year")

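# Running total of Sales_Volume per model, ordered by year (the data has no month column).
# With orderBy and no explicit frame, all rows of the same year share one running total.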

Sales_trend = df.withColumn("Running_Sales", sum("Sales_Volume").over(windowSpec))

Sales_trend.select("Model","Year","Sales_Volume","Running_Sales").show(15)

+--------+----+------------+-------------+
|   Model|Year|Sales_Volume|Running_Sales|
+--------+----+------------+-------------+
|3 Series|2010|        8650|      1647769|
|3 Series|2010|        8248|      1647769|
|3 Series|2010|        6364|      1647769|
|3 Series|2010|        7885|      1647769|
|3 Series|2010|        6039|      1647769|
|3 Series|2010|        4167|      1647769|
|3 Series|2010|        9869|      1647769|
|3 Series|2010|        7751|      1647769|
|3 Series|2010|         280|      1647769|
|3 Series|2010|        9482|      1647769|
|3 Series|2010|        2746|      1647769|
|3 Series|2010|        2996|      1647769|
|3 Series|2010|        3734|      1647769|
|3 Series|2010|        7828|      1647769|
|3 Series|2010|         287|      1647769|
+--------+----+------------+-------------+
only showing top 15 rows
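
Every 2010 row shows the same `Running_Sales` because a window with `orderBy` and no explicit frame defaults to a RANGE frame running from the start of the partition to the current row, and all rows with the same `Year` count as peers of the current row. The plain-Python sketch below imitates both frame types on made-up numbers. A row-by-row running total in Spark would need an explicit `rowsBetween(Window.unboundedPreceding, Window.currentRow)` frame, ideally with an extra sort column to break ties.

```python
from itertools import accumulate

# (year, sales_volume) rows for one model; values are made up for illustration.
rows = [(2010, 100), (2010, 50), (2011, 30), (2011, 20), (2012, 10)]

# RANGE-style frame: each row sees the total of all rows with year <= its own year.
range_running = [sum(v for y, v in rows if y <= year) for year, _ in rows]

# ROWS-style frame: each row sees the total of the rows up to and including itself.
rows_running = list(accumulate(v for _, v in rows))

print(range_running)  # [150, 150, 200, 200, 210]
print(rows_running)   # [100, 150, 180, 200, 210]
```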



In [None]:
# Rank Models by Total Sales
Total_Sales= df.groupBy("Model")\
            .agg(sum("Sales_Volume").alias("Total_Sale"))

Window_Rank = Window.orderBy(Total_Sales["Total_Sale"].desc())

ranked_model= Total_Sales.withColumn("Rank", rank().over(Window_Rank))
ranked_model.show()

+--------+----------+----+
|   Model|Total_Sale|Rank|
+--------+----------+----+
|7 Series|  23786466|   1|
|      i8|  23423891|   2|
|      X1|  23406060|   3|
|3 Series|  23281303|   4|
|      i3|  23133849|   5|
|5 Series|  23097519|   6|
|      M5|  22779688|   7|
|      X3|  22745529|   8|
|      X5|  22709749|   9|
|      X6|  22661986|  10|
|      M3|  22349694|  11|
+--------+----------+----+



In [None]:
# Regional Market Share (% contribution of each sales record to its region's total)

window_region = Window.partitionBy("Region")

market_share = final_join_df.withColumn(
    "Region_Total", sum("Sales_Volume").over(window_region)
).withColumn(
    "Market_Share_Percent",
    (col("Sales_Volume") / col("Region_Total")) * 100  # reuse the total computed above
)

market_share.select("Region", "Model", "Sales_Volume", "Market_Share_Percent").show()


+------+--------+------------+--------------------+
|Region|   Model|Sales_Volume|Market_Share_Percent|
+------+--------+------------+--------------------+
|Africa|5 Series|        4668| 0.01123053458210719|
|Africa|      X5|        7104|0.017091199158373923|
|Africa|      X1|        1113|0.002677717435708077|
|Africa|      M3|        3929|0.009452607192180622|
|Africa|      X5|        2913|0.007008257762998766|
|Africa|      X6|        7933|0.019085653564665024|
|Africa|      M3|        3153|0.007585663139970859|
|Africa|3 Series|        3293|0.007922482943204578|
|Africa|      i3|         455|0.001094664360509591|
|Africa|      X5|        2775|0.006676249671239813|
|Africa|      i3|        6301|0.015159297001254797|
|Africa|7 Series|        8472|0.020382409807114848|
|Africa|7 Series|        8663| 0.02084192825295514|
|Africa|      X3|        8293| 0.01995176163012316|
|Africa|3 Series|        4548|0.010941831893621143|
|Africa|      X6|        5935|0.014278753801372357|
|Africa|    
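
Because the window has no `orderBy`, `Region_Total` is the full regional total on every row, so each percentage is a single sales record's share of its region. That is why the values are so small. The plain-Python check below reproduces the first Africa row, using the Africa total from the regional output in section 16. The per-model breakdown at the end uses made-up figures to show how summing per model first would give model-level shares instead.

```python
# Share of one sales record in its region's total (figures from the outputs above).
sales_volume = 4668          # first Africa row, model "5 Series"
africa_total = 41565252      # Total_Sales for Africa
share = sales_volume / africa_total * 100
print(share)  # roughly 0.0112 percent

# Model-level share: sum per model first, then divide by the regional total.
# These records are made up for illustration.
records = [("X1", 400), ("X3", 100), ("X1", 300), ("X3", 200)]
per_model = {}
for model, volume in records:
    per_model[model] = per_model.get(model, 0) + volume
region_total = sum(per_model.values())
model_share = {m: v * 100 / region_total for m, v in per_model.items()}
print(model_share)  # {'X1': 70.0, 'X3': 30.0}
```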

# **18. Aggregations and Grouping**

In [None]:
# 1. Region-Level Summary
region_summary=final_join_df.groupBy("Region").agg(
    sum("Sales_Volume").alias("Total_Sales"),
    avg("Price_USD").alias("Average_Price"),
    count("Model").alias("Model_Count")  # number of sales records, not distinct models
)
region_summary.show()

+-------------+-----------+-----------------+-----------+
|       Region|Total_Sales|    Average_Price|Model_Count|
+-------------+-----------+-----------------+-----------+
|       Europe|   42555138|74988.35685145188|       8334|
|       Africa|   41565252|74885.77159820672|       8253|
|North America|   42402629|75070.05470905818|       8335|
|South America|   41551818|74973.59883650467|       8251|
|  Middle East|   42326620|74726.78848680282|       8373|
|         Asia|   42974277|75554.92500591437|       8454|
+-------------+-----------+-----------------+-----------+



In [None]:
# 2. Model-Level Summary
Model_Summary=final_join_df.groupBy("Model").agg(
    sum("Sales_Volume").alias("Total_Amount"),  # sum of Sales_Volume, not revenue
    avg("Price_USD").alias("Average_Price"),
    min("Price_USD").alias("Minimum_Price"),
    max("Price_USD").alias("Maximum_Price"),
    count("Region").alias("Region_Count")  # number of sales records, not distinct regions
)
Model_Summary.show()


+--------+------------+-----------------+-------------+-------------+------------+
|   Model|Total_Amount|    Average_Price|Minimum_Price|Maximum_Price|Region_Count|
+--------+------------+-----------------+-------------+-------------+------------+
|      i3|    23133849|74800.26808142052|        30020|       119965|        4618|
|3 Series|    23281303| 75566.2339499456|        30008|       119994|        4595|
|      X6|    22661986|74434.60049129075|        30043|       119997|        4478|
|      X1|    23406060|75262.21903719913|        30039|       119996|        4570|
|7 Series|    23786466|75570.19674239177|        30021|       119978|        4666|
|      X3|    22745529|75016.61685568157|        30020|       119954|        4497|
|5 Series|    23097519|75287.84407665505|        30002|       119988|        4592|
|      M5|    22779688|74474.93099598035|        30005|       119946|        4478|
|      M3|    22349694|74841.58871515976|        30037|       119960|        4413|
|   

In [None]:
# 3. Yearly Sales Trend
Yearly_Trend= final_join_df.groupBy("Year").agg(
    sum("Sales_Volume").alias("Total_Sales"),
    avg("Price_USD").alias("Average_Price"),
    count("Model").alias("Model_Count")
).orderBy("Year")

Yearly_Trend.show()

+----+-----------+-----------------+-----------+
|Year|Total_Sales|    Average_Price|Model_Count|
+----+-----------+-----------------+-----------+
|2010|   16933445|75072.21591591592|       3330|
|2011|   16758941|75293.27638804149|       3278|
|2012|   16751895|75019.56722689075|       3332|
|2013|   16866733|74535.90829825617|       3326|
|2014|   16958960| 74556.5352238806|       3350|
|2015|   17010207| 74049.5527099464|       3358|
|2016|   16957550|75543.54086181278|       3365|
|2017|   16620811|75173.14741641337|       3290|
|2018|   16412273|  75455.480780964|       3278|
|2019|   17191956| 75194.4260023585|       3392|
|2020|   16310843|75057.66708268331|       3205|
|2021|   16884666|75399.27758007117|       3372|
|2022|   17920946|74967.87550316274|       3478|
|2023|   16268654|75194.00465983225|       3219|
|2024|   17527854| 75025.8529325941|       3427|
+----+-----------+-----------------+-----------+



In [None]:
# 4. Top-Performing Models by Region
window_region=Window.partitionBy("Region").orderBy(final_join_df["Sales_Volume"].desc())

Top_Models=final_join_df.withColumn("Rank", rank().over(window_region)).filter("Rank <= 3")

Top_Models.select("Region","Model","Sales_Volume","Rank").show()

+-------------+--------+------------+----+
|       Region|   Model|Sales_Volume|Rank|
+-------------+--------+------------+----+
|       Africa|      X3|        9999|   1|
|       Africa|      M3|        9998|   2|
|       Africa|      X1|        9996|   3|
|       Africa|      X6|        9996|   3|
|         Asia|      X5|        9998|   1|
|         Asia|      i8|        9997|   2|
|         Asia|      X5|        9994|   3|
|       Europe|7 Series|        9999|   1|
|       Europe|      M3|        9999|   1|
|       Europe|      i8|        9997|   3|
|  Middle East|      i8|        9998|   1|
|  Middle East|      X1|        9998|   1|
|  Middle East|      i8|        9996|   3|
|North America|3 Series|        9999|   1|
|North America|3 Series|        9992|   2|
|North America|3 Series|        9988|   3|
|South America|5 Series|        9999|   1|
|South America|      M5|        9998|   2|
|South America|      M3|        9996|   3|
|South America|      i3|        9996|   3|
+----------
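
`rank()` gives tied rows the same rank and then skips the next position, so `Rank <= 3` can return more than three rows per region, as Africa does above with two rows at 9996. The window also ranks individual sales records rather than models, which is why one model can appear several times within a region. The plain-Python sketch below imitates `rank`, `dense_rank`, and `row_number` on the Africa values (the 9990 entry is added for illustration). The function names are stand-ins, not Spark calls.

```python
def sql_rank(values):
    """rank(): ties share a rank and the next rank skips ahead."""
    ordered = sorted(values, reverse=True)
    return [ordered.index(v) + 1 for v in ordered]

def sql_dense_rank(values):
    """dense_rank(): ties share a rank and no gaps follow."""
    ordered = sorted(values, reverse=True)
    distinct = sorted(set(values), reverse=True)
    return [distinct.index(v) + 1 for v in ordered]

def sql_row_number(values):
    """row_number(): every row gets its own position; ties are broken arbitrarily."""
    return list(range(1, len(values) + 1))

volumes = [9999, 9998, 9996, 9996, 9990]
print(sql_rank(volumes))        # [1, 2, 3, 3, 5]
print(sql_dense_rank(volumes))  # [1, 2, 3, 3, 4]
print(sql_row_number(volumes))  # [1, 2, 3, 4, 5]
```

If exactly three rows per region are wanted, `row_number()` is the usual choice. If the goal is the top three models, the data would first need to be aggregated per region and model.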

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

sales_df= df.withColumn("Model", trim(upper(col("Model"))))\
                  .withColumn("Region", trim(upper(col("Region"))))\
                  .withColumn("Color", trim(upper(col("Color"))))\
                  .withColumn("Fuel_Type", trim(upper(col("Fuel_Type"))))\
                  .withColumn("Transmission", trim(upper(col("Transmission"))))

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 SERIES|2016|         ASIA|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      I8|2013|NORTH AMERICA|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 SERIES|2022|NORTH AMERICA|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  MIDDLE EAST|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 SERIES|2020|SOUTH AMERICA| Black|   Diesel|      Manual|          2.1|    122131|    49898|   

# **19. Data Cleaning & Transformation in PySpark**

 **Step-1: Load Dataset**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)
sales_df=df
sales_df.show()

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America| Black|   Diesel|      Manual|          2.1|    122131|    49898|   

**Step-2: Handle Missing Values**

In [None]:
# Fill numeric columns with their mean value
numeric_cols = ["Sales_Volume", "Price_USD", "Mileage_KM", "Engine_Size_L"]

for column in numeric_cols:
    avg_val = sales_df.select(mean(col(column))).collect()[0][0]
    sales_df = sales_df.na.fill({column: avg_val})

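# Fill text columns with "Unknown". Keep show() on its own line: it returns None,
# so assigning its result would overwrite sales_df.
sales_df = sales_df.na.fill("Unknown")
sales_df.show()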


+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America| Black|   Diesel|      Manual|          2.1|    122131|    49898|   
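
The loop in this step replaces NULLs in each numeric column with that column's mean, which is computed over the non-NULL values only. The `describe()` output in section 20 reports 50,000 non-null values in every column, so on this dataset the fill changes nothing. A plain-Python sketch of the same idea on made-up values (the helper `fill_with_mean` is hypothetical):

```python
def fill_with_mean(values):
    """Replace None entries with the mean of the non-None entries."""
    present = [v for v in values if v is not None]
    if not present:
        return list(values)  # nothing to average; leave the column unchanged
    mean_value = sum(present) / len(present)
    return [mean_value if v is None else v for v in values]

prices = [30000, None, 50000, None, 70000]  # made-up values
print(fill_with_mean(prices))  # [30000, 50000.0, 50000, 50000.0, 70000]
```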

**Step 3: Standardize String Columns**

In [None]:
from pyspark.sql.functions import trim, upper

sales_df = sales_df.withColumn("Model", trim(upper(col("Model")))) \
                   .withColumn("Region", trim(upper(col("Region"))))
sales_df.show()

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 SERIES|2016|         ASIA|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      I8|2013|NORTH AMERICA|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 SERIES|2022|NORTH AMERICA|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  MIDDLE EAST|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 SERIES|2020|SOUTH AMERICA| Black|   Diesel|      Manual|          2.1|    122131|    49898|   

**Step 4: Create New Derived Columns**

In [None]:
from pyspark.sql.functions import when, round

# Price category based on price range
sales_df = sales_df.withColumn(
    "Price_Category",
    when(col("Price_USD") < 40000, "Budget")
    .when((col("Price_USD") >= 40000) & (col("Price_USD") < 70000), "Mid-Range")
    .otherwise("Luxury")
)

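# Mileage category. Mileage_KM holds values up to roughly 200,000 in this dataset,
# so a threshold of 15 labels virtually every row "High Efficiency"; choose a
# threshold that matches the column's scale before relying on this category.
sales_df = sales_df.withColumn(
    "Mileage_Category",
    when(col("Mileage_KM") > 15, "High Efficiency").otherwise("Standard")
)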
sales_df.show()

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+--------------+----------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|Price_Category|Mileage_Category|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+--------------+----------------+
|5 SERIES|2016|         ASIA|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|        Luxury| High Efficiency|
|      I8|2013|NORTH AMERICA|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|        Luxury| High Efficiency|
|5 SERIES|2022|NORTH AMERICA|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|        Luxury| High Efficiency|
|      X3|2024|  MIDDLE EAST|  Blu
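
The chained `when` calls are evaluated top to bottom and the first matching condition wins, so the second branch could equally be written as just `Price_USD < 70000`. A plain-Python equivalent makes the boundaries easy to test. The function name `price_category` is illustrative.

```python
def price_category(price_usd):
    """Mirror the when/otherwise chain: the first matching branch wins."""
    if price_usd < 40000:
        return "Budget"
    if price_usd < 70000:  # reached only when price_usd >= 40000
        return "Mid-Range"
    return "Luxury"

# 66606 and 98740 are prices that appear in the notebook output; the rest are boundary cases.
for price in (39999, 40000, 66606, 70000, 98740):
    print(price, price_category(price))
```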

 **Step 5: Remove Duplicates**

In [None]:
before = sales_df.count()
sales_df = sales_df.dropDuplicates()
after = sales_df.count()

print(f" Removed {before - after} duplicate rows")
sales_df.show()


 Removed 0 duplicate rows
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+--------------+----------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|Price_Category|Mileage_Category|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+--------------+----------------+
|5 SERIES|2019|SOUTH AMERICA|   Red|   Hybrid|      Manual|          4.2|    132625|    88464|        6353|                 Low|        Luxury| High Efficiency|
|      M5|2019|       EUROPE|   Red|   Hybrid|   Automatic|          2.1|       252|    66606|        1684|                 Low|     Mid-Range| High Efficiency|
|      X3|2019|       AFRICA| Black|   Hybrid|   Automatic|          2.5|    114886|    70265|        9879|                High|        Luxury| High Efficiency|
|      I

**Step 6: Final Clean Data Preview**

In [None]:
sales_df.show(10, truncate=False)
sales_df.printSchema()


+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+--------------+----------------+
|Model   |Year|Region       |Color |Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|Price_Category|Mileage_Category|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+--------------+----------------+
|5 SERIES|2019|SOUTH AMERICA|Red   |Hybrid   |Manual      |4.2          |132625    |88464    |6353        |Low                 |Luxury        |High Efficiency |
|M5      |2019|EUROPE       |Red   |Hybrid   |Automatic   |2.1          |252       |66606    |1684        |Low                 |Mid-Range     |High Efficiency |
|X3      |2019|AFRICA       |Black |Hybrid   |Automatic   |2.5          |114886    |70265    |9879        |High                |Luxury        |High Efficiency |
|I8      |2024|MIDDLE EAST  |Black

# **20. Exploratory Data Analysis(EDA) in PySpark**

Step-1: Load the Dataset

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)
sales_df.show(5)

+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|  Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|  Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America| Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East| Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America|Black|   Diesel|      Manual|          2.1|    122131|    49898|        308

Step-2: Basic Data Overview

In [None]:
print("Total Records:", sales_df.count())
print("Total_Columns:", len(sales_df.columns))
sales_df.describe().show()

Total Records: 50000
Total_Columns: 11
+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|summary|   Model|             Year|       Region|Color|Fuel_Type|Transmission|     Engine_Size_L|        Mileage_KM|         Price_USD|      Sales_Volume|Sales_Classification|
+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|  count|   50000|            50000|        50000|50000|    50000|       50000|             50000|             50000|             50000|             50000|               50000|
|   mean|    NULL|        2017.0157|         NULL| NULL|     NULL|        NULL| 3.247179999999999|      100307.20314|        75034.6009|        5067.51468|                NULL|
| stddev|    NULL|4.324459218093149|         NULL| NULL|     NULL|        NU

Step-3: Unique Values and Counts

In [None]:
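# Print both counts; a notebook cell only displays its last expression
print("Distinct models:", sales_df.select("Model").distinct().count())
print("Distinct regions:", sales_df.select("Region").distinct().count())

Distinct models: 11
Distinct regions: 6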

Step-4: Most Popular BMW Model

In [None]:
Popular_Model=sales_df.groupBy("Model").agg(
    sum("Sales_Volume").alias("Total_Sales")
).orderBy(desc("Total_Sales"))

Popular_Model.show(10, truncate=False)

+--------+-----------+
|Model   |Total_Sales|
+--------+-----------+
|7 Series|23786466   |
|i8      |23423891   |
|X1      |23406060   |
|3 Series|23281303   |
|i3      |23133849   |
|5 Series|23097519   |
|M5      |22779688   |
|X3      |22745529   |
|X5      |22709749   |
|X6      |22661986   |
+--------+-----------+
only showing top 10 rows



Step-5: Region-Wise Sales Performance

In [None]:
Region_sales=sales_df.groupBy("Region").agg(
    sum("Sales_Volume").alias("Total_Sales")
).orderBy(desc("Total_Sales"))
Region_sales.show()

+-------------+-----------+
|       Region|Total_Sales|
+-------------+-----------+
|         Asia|   42974277|
|       Europe|   42555138|
|North America|   42402629|
|  Middle East|   42326620|
|       Africa|   41565252|
|South America|   41551818|
+-------------+-----------+



Step-6: Yearly Sales Trend

In [None]:
Yearly_Trend=sales_df.groupBy("Year").agg(
    sum("Sales_Volume").alias("Total_Sales")
).orderBy(desc("Total_Sales"))

Yearly_Trend.show()

+----+-----------+
|Year|Total_Sales|
+----+-----------+
|2022|   17920946|
|2024|   17527854|
|2019|   17191956|
|2015|   17010207|
|2014|   16958960|
|2016|   16957550|
|2010|   16933445|
|2021|   16884666|
|2013|   16866733|
|2011|   16758941|
|2012|   16751895|
|2017|   16620811|
|2018|   16412273|
|2020|   16310843|
|2023|   16268654|
+----+-----------+
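
Because the table above is sorted by total sales, the chronological trend is hard to read. A minimal plain-Python sketch, using three of the totals printed above and a hypothetical helper `yoy_change`, shows the year-over-year percentage change once the rows are back in year order:

```python
# Yearly totals copied from the output above (subset: 2022 to 2024).
totals = {2022: 17920946, 2024: 17527854, 2023: 16268654}

def yoy_change(totals_by_year):
    """Return {year: percent change vs. the previous year}, in year order."""
    years = sorted(totals_by_year)
    changes = {}
    for prev, curr in zip(years, years[1:]):
        before, after = totals_by_year[prev], totals_by_year[curr]
        changes[curr] = (after - before) / before * 100
    return changes

changes = yoy_change(totals)
for year, pct in changes.items():
    print(f"{year}: {pct:+.2f}%")
```

In Spark, the same chronological ordering comes from `.orderBy("Year")` in place of `.orderBy(desc("Total_Sales"))`.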



Step-7: Average Price & Mileage by Model

In [None]:
# Average price and mileage per model, highest average price first
Price_Mileage_Stats = sales_df.groupBy("Model").agg(
    avg("Price_USD").alias("Average_Price"),
    avg("Mileage_KM").alias("Average_Mileage")
).orderBy(desc("Average_Price"))
Price_Mileage_Stats.show(10, truncate=False)

+--------+-----------------+------------------+
|Model   |Average_Price    |Average_Mileage   |
+--------+-----------------+------------------+
|7 Series|75570.19674239177|100792.16588084013|
|3 Series|75566.2339499456 |100159.89836779107|
|i8      |75366.27095093357|99447.38493269648 |
|5 Series|75287.84407665505|101356.63458188153|
|X1      |75262.21903719913|100375.68205689278|
|X3      |75016.61685568157|100716.86902379364|
|M3      |74841.58871515976|99729.7704509404  |
|i3      |74800.26808142052|98734.7661325249  |
|X5      |74708.11678181413|100259.7399153109 |
|M5      |74474.93099598035|102342.99575703438|
+--------+-----------------+------------------+
only showing top 10 rows



# **21. Correlation Analysis in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

# Correlation between Price_USD and Engine_Size
corr_engine_price=sales_df.corr("Price_USD","Engine_Size_L")
print(f"Correlation between Price_USD and Engine_Size: {corr_engine_price}")

Correlation between Price_USD and Engine_Size: 0.00014617254604737103


In [None]:
# Correlation between Price_USD and Mileage
corr_price_mileage=sales_df.corr("Price_USD","Mileage_KM")
print(f"Correlation between Price_USD and Mileage: {corr_price_mileage}")

Correlation between Price_USD and Mileage: -0.004238194574623344


In [None]:
# Correlation between Engine_Size and Mileage
corr_engine_mileage=sales_df.corr("Engine_Size_L","Mileage_KM")
print(f"Correlation between Engine_Size and Mileage: {corr_engine_mileage}")

Correlation between Engine_Size and Mileage: -0.004905652062614584
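
`DataFrame.corr` computes the Pearson correlation coefficient by default. As a rough illustration of what that number measures, here is a plain-Python sketch (the helper `pearson` and the toy lists are illustrative and not part of the notebook):

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Toy data: a perfectly linear pair and a perfectly inverse pair.
r_up = pearson([1, 2, 3, 4], [10, 20, 30, 40])
r_down = pearson([1, 2, 3, 4], [8, 6, 4, 2])
print(r_up, r_down)
```

The three coefficients printed above all sit very close to 0, which points to almost no linear relationship between those columns in this dataset.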


# **22. Data Standardization and Feature Scaling in PySpark**

In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["JAVA_HOME"], "bin")


In [None]:
# Create SparkSession and Load dataset

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import StandardScaler, MinMaxScaler, VectorAssembler


spark=SparkSession.builder.appName("BMW_Feature_Scaling").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

# Select Numeric Features.
numeric_cols=["Price_USD","Mileage_KM","Engine_Size_L"]

# Assemble features into a single vector
assembler=VectorAssembler(inputCols=numeric_cols, outputCol="Features")
assembler_df=assembler.transform(sales_df)

# Standard Scaling (Z-Score)
standard_Scaler=StandardScaler(inputCol="Features", outputCol="scaled_features", withStd=True, withMean=True)
standard_Scaler_df=standard_Scaler.fit(assembler_df).transform(assembler_df)

# MinMax Scaling
minmax_Scaler=MinMaxScaler(inputCol="Features", outputCol="minmax_scaled_features")
minmax_Scaler_df=minmax_Scaler.fit(assembler_df).transform(assembler_df)

# Show scaled results
print("✅ Standard Scaled Data:")
standard_Scaler_df.select("features", "scaled_features").show(5, truncate=False)

print("✅ MinMax Scaled Data:")
minmax_Scaler_df.select("features", "minmax_scaled_features").show(5, truncate=False)

✅ Standard Scaled Data:
+----------------------+------------------------------------------------------------+
|features              |scaled_features                                             |
+----------------------+------------------------------------------------------------+
|[98740.0,151748.0,3.5]|[0.9118075301089186,0.8878056067717602,0.25054544881353763] |
|[79219.0,121671.0,1.6]|[0.1609492669693736,0.368713157493662,-1.6323607799093853]  |
|[113265.0,10991.0,4.5]|[1.4704990045263098,-1.5414890663351741,1.2415487270887602] |
|[60971.0,27255.0,1.7] |[-0.5409441598924889,-1.260792201785552,-1.5332604520818631]|
|[49898.0,122131.0,2.1]|[-0.9668574608373269,0.3766521981781693,-1.1368591407717739]|
+----------------------+------------------------------------------------------------+
only showing top 5 rows

✅ MinMax Scaled Data:
+----------------------+-------------------------------------------------------------+
|features              |minmax_scaled_features                      
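
The two scalers apply simple per-column formulas. `StandardScaler` with `withMean=True, withStd=True` computes `(x - mean) / std` using the sample standard deviation, and `MinMaxScaler` with its default range computes `(x - min) / (max - min)`. A plain-Python sketch on the first five `Price_USD` values of the dataset (so the statistics differ from the full-data output above; the helper names are illustrative):

```python
import statistics

prices = [98740.0, 79219.0, 113265.0, 60971.0, 49898.0]

def z_scores(values):
    """Standardize to zero mean and unit sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample (n - 1) standard deviation
    return [(v - mean) / std for v in values]

def min_max(values):
    """Rescale linearly so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(values), max(values)
    return [(v - lo) / (hi - lo) for v in values]

standardized = z_scores(prices)
rescaled = min_max(prices)
print(standardized)
print(rescaled)
```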

# **23. Handling Categorical Data with StringIndexer & OneHotEncoder in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

spark=SparkSession.builder.appName("BMW Categorical Encoding").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

# Define categorical columns
categorical_cols = ["Fuel_Type","Transmission","Region"]

# 1. StringIndexer: map each category label to a numeric index
indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c in categorical_cols]

# 2. OneHotEncoder: turn each index into a sparse binary vector
encoder = OneHotEncoder(
    inputCols=[c+"_index" for c in categorical_cols],
    outputCols=[c+"_Encoded" for c in categorical_cols]
)

# 3. Pipeline: run the indexers first, then the encoder
pipeline = Pipeline(stages=indexers+[encoder])
encoded_df = pipeline.fit(sales_df).transform(sales_df)

# Display Final Result
encoded_df.select("Fuel_Type","Fuel_Type_Encoded","Transmission","Transmission_Encoded","Region","Region_Encoded").show(5, truncate=False)

+---------+-----------------+------------+--------------------+-------------+--------------+
|Fuel_Type|Fuel_Type_Encoded|Transmission|Transmission_Encoded|Region       |Region_Encoded|
+---------+-----------------+------------+--------------------+-------------+--------------+
|Petrol   |(3,[1],[1.0])    |Manual      |(1,[0],[1.0])       |Asia         |(5,[0],[1.0]) |
|Hybrid   |(3,[0],[1.0])    |Automatic   |(1,[],[])           |North America|(5,[2],[1.0]) |
|Petrol   |(3,[1],[1.0])    |Automatic   |(1,[],[])           |North America|(5,[2],[1.0]) |
|Petrol   |(3,[1],[1.0])    |Automatic   |(1,[],[])           |Middle East  |(5,[1],[1.0]) |
|Diesel   |(3,[],[])        |Manual      |(1,[0],[1.0])       |South America|(5,[],[])     |
+---------+-----------------+------------+--------------------+-------------+--------------+
only showing top 5 rows
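
Each encoded value above is a sparse vector printed as `(size, [indices], [values])`. With the defaults, `StringIndexer` assigns index 0 to the most frequent label and `OneHotEncoder` drops the last category (`dropLast=True`), so that category shows up as an all-zero vector such as `(3,[],[])`. The hypothetical helper below expands that notation into a dense list:

```python
def to_dense(size, indices, values):
    """Expand Spark's (size, [indices], [values]) sparse notation to a list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

petrol = to_dense(3, [1], [1.0])   # Fuel_Type = Petrol in the output above
diesel = to_dense(3, [], [])       # Fuel_Type = Diesel (the dropped category)
print(petrol, diesel)
```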



# **24. Data Binning (Bucketing) in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark=SparkSession.builder.appName("BMW_Data_Binning").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv",header=True, inferSchema=True)

# Create bins for Price_USD (the numeric column is replaced by its band label)
sales_df=sales_df.withColumn("Price_USD", when(col("Price_USD") < 20000, "Low")
                    .when((col("Price_USD") >= 20000) & (col("Price_USD") < 50000), "Mid")
                    .when((col("Price_USD") >= 50000) & (col("Price_USD") < 80000), "High")
                    .otherwise("Luxury")
)

# Create bins for Mileage_KM
sales_df = sales_df.withColumn("Mileage_KM", when(col("Mileage_KM") < 15000, "Low Mileage")
    .when((col("Mileage_KM") >= 15000) & (col("Mileage_KM") < 40000), "Moderate Mileage")
    .otherwise("High Mileage")
)
sales_df.select("Model", "Price_USD", "Mileage_KM").show(5, truncate=False)

+--------+---------+----------------+
|Model   |Price_USD|Mileage_KM      |
+--------+---------+----------------+
|5 Series|Luxury   |High Mileage    |
|i8      |High     |High Mileage    |
|5 Series|Luxury   |Low Mileage     |
|X3      |High     |Moderate Mileage|
|7 Series|Mid      |High Mileage    |
+--------+---------+----------------+
only showing top 5 rows
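
The price bands the `when` chain aims for (below 20000, 20000 to 49999, 50000 to 79999, and 80000 or more) can be sketched in plain Python with `bisect`. The helper name `price_band` is illustrative:

```python
from bisect import bisect_right

# Upper-exclusive edges between the four bands.
EDGES = [20000, 50000, 80000]
LABELS = ["Low", "Mid", "High", "Luxury"]

def price_band(price):
    """Return the band label for a price in USD."""
    return LABELS[bisect_right(EDGES, price)]

# Prices taken from the first rows of the dataset.
bands = [price_band(p) for p in (98740, 79219, 49898)]
print(bands)
```

Inside Spark, `pyspark.ml.feature.Bucketizer` offers a similar split-based approach, though it emits numeric bucket indices rather than labels.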



# **25. Outlier Detection in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.appName("BMW_Sales_Outlier_Detection").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

# Select numeric column for outlier detection
col_name="Price_USD"

# Calculate Q1, Q3, and IQR
quantiles= sales_df.approxQuantile(col_name, [0.25, 0.75], 0.05)
Q1, Q3 = quantiles
IQR = Q3-Q1

# Define Bounds
lower_bound= Q1 - 1.5 * IQR
upper_bound= Q3 + 1.5 * IQR

print(f"Q1:{Q1}, Q3: {Q3}, IQR: {IQR}")
print(f"Lower Bound: {lower_bound}, Upper Bound: {upper_bound}")

# Filter outliers
outliers_df = sales_df.filter((col(col_name) < lower_bound) | (col(col_name) > upper_bound))
non_outliers_df = sales_df.filter((col(col_name) >= lower_bound) & (col(col_name) <= upper_bound))

# Show outliers
print("Outliers in Price_USD:")
outliers_df.select("Model", "Price_USD").show(10, truncate=False)

Q1:53396.0, Q3: 94421.0, IQR: 41025.0
Lower Bound: -8141.5, Upper Bound: 155958.5
Outliers in Price_USD:
+-----+---------+
|Model|Price_USD|
+-----+---------+
+-----+---------+
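
The bounds printed above follow directly from the quartiles. A quick plain-Python check, using the Q1 and Q3 values from the output and the overall minimum and maximum `Price_USD` that appear in the per-year min/max table in section 27 (30000 and 119998), suggests why the outlier table is empty:

```python
q1, q3 = 53396.0, 94421.0          # approximate quartiles from the output above
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

# Smallest and largest Price_USD reported in the per-year min/max table.
min_price, max_price = 30000, 119998

has_outliers = min_price < lower_bound or max_price > upper_bound
print(lower_bound, upper_bound, has_outliers)
```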



# **26. Advanced Missing Data Handling in PySpark**

**Load Dataset and show**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.appName("BMW Sales Missing Data Handling").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

# Check nulls in each column
sales_df.select([count(when(col(c).isNull(), c)).alias(c) for c in sales_df.columns]).show()

+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|Model|Year|Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|    0|   0|     0|    0|        0|           0|            0|         0|        0|           0|                   0|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+



In [None]:
# Replace nulls in a categorical column with a default value
sales_df=sales_df.fillna({'Fuel_Type': 'Unknown'})

# Replace nulls in a numeric column with the column mean
avg_price=sales_df.select(mean(col("Price_USD"))).collect()[0][0]
sales_df=sales_df.fillna({'Price_USD': avg_price})

# Drop rows where 'Model' or 'Region' is null
sales_df = sales_df.dropna(subset=["Model", "Region"])

# Conditional imputation: set missing mileage to 0
sales_df = sales_df.withColumn("Mileage_KM",
              when(col("Mileage_KM").isNull(), 0).otherwise(col("Mileage_KM")))

# Verify the cleaned dataset
sales_df.select([count(when(col(c).isNull(), c)).alias(c) for c in sales_df.columns]).show()

sales_df.show(5)

+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|Model|Year|Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|    0|   0|     0|    0|        0|           0|            0|         0|        0|           0|                   0|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+

+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         A

# **27. Data Aggregations and Grouping in PySpark -2**

In [None]:
# Loading DataSet
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark=SparkSession.builder.appName("BMWSales_Aggregations").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True,inferSchema=True)

sales_df.show()

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America| Black|   Diesel|      Manual|          2.1|    122131|    49898|   

In [None]:
# 1.Total Sales Volume per Model

sales_df.groupby("Model").agg(
    sum("Sales_Volume").alias("Total_Sales")
).orderBy(desc("Total_Sales")).show()

+--------+-----------+
|   Model|Total_Sales|
+--------+-----------+
|7 Series|   23786466|
|      i8|   23423891|
|      X1|   23406060|
|3 Series|   23281303|
|      i3|   23133849|
|5 Series|   23097519|
|      M5|   22779688|
|      X3|   22745529|
|      X5|   22709749|
|      X6|   22661986|
|      M3|   22349694|
+--------+-----------+



In [None]:
# 2. Average price per fuel type
sales_df.groupby("Fuel_Type").agg(
    round(avg("Price_USD"), 2).alias("Average_Price")

).orderBy(desc("Average_Price")).show()

+---------+-------------+
|Fuel_Type|Average_Price|
+---------+-------------+
| Electric|     75276.31|
|   Diesel|     75079.81|
|   Petrol|     74990.42|
|   Hybrid|     74797.55|
+---------+-------------+



In [None]:
# 3. Number of records per region

sales_df.groupby("Region").agg(
    count("*").alias("Total_Records"))\
    .orderBy(col("Total_Records"))\
    .show()

+-------------+-------------+
|       Region|Total_Records|
+-------------+-------------+
|South America|         8251|
|       Africa|         8253|
|       Europe|         8334|
|North America|         8335|
|  Middle East|         8373|
|         Asia|         8454|
+-------------+-------------+



In [None]:
# 4.Min and Max Price per Year

sales_df.groupby("Year").agg(
    min("Price_USD").alias("Minimum_Price"),
    max("Price_USD").alias("Maximum_Price")
    ).orderBy("Year").show()

+----+-------------+-------------+
|Year|Minimum_Price|Maximum_Price|
+----+-------------+-------------+
|2010|        30075|       119998|
|2011|        30069|       119978|
|2012|        30055|       119973|
|2013|        30000|       119954|
|2014|        30061|       119988|
|2015|        30002|       119970|
|2016|        30021|       119996|
|2017|        30025|       119986|
|2018|        30005|       119954|
|2019|        30037|       119997|
|2020|        30010|       119960|
|2021|        30008|       119981|
|2022|        30001|       119960|
|2023|        30020|       119988|
|2024|        30054|       119997|
+----+-------------+-------------+



In [None]:
# 5. Multi-Column Grouping - Total Sales per Region & Fuel

sales_df.groupby("Region","Fuel_Type").agg(
    sum("Sales_Volume").alias("Total_Sales"), round(avg("Price_USD"), 2).alias("Avg_Price")
).orderBy(desc("Total_Sales")).show()

+-------------+---------+-----------+---------+
|       Region|Fuel_Type|Total_Sales|Avg_Price|
+-------------+---------+-----------+---------+
|         Asia|   Hybrid|   11422396| 75911.42|
|North America| Electric|   10861198| 75709.99|
|       Europe|   Hybrid|   10825662|  75105.7|
|  Middle East|   Petrol|   10812148| 74992.14|
|North America|   Hybrid|   10808682| 74510.79|
|       Europe|   Petrol|   10733279| 74569.23|
|  Middle East|   Hybrid|   10627320| 74442.66|
|         Asia| Electric|   10596850|  75257.2|
|       Europe| Electric|   10590064| 75158.83|
|       Africa|   Petrol|   10538463| 74650.21|
|South America|   Diesel|   10500121| 74613.35|
|         Asia|   Diesel|   10492033| 75851.33|
|  Middle East|   Diesel|   10491957| 74610.19|
|       Africa|   Hybrid|   10486034| 74072.18|
|North America|   Petrol|   10472845| 74970.21|
|         Asia|   Petrol|   10462998| 75173.94|
|       Europe|   Diesel|   10406133| 75113.03|
|  Middle East| Electric|   10395195| 74

In [None]:
# 6. Conditional aggregation: total sales volume of electric cars per region
sales_df.groupby("Region")\
.agg(sum(when(col("Fuel_Type") == "Electric", col("Sales_Volume")).otherwise(0)).alias("Electric_Car_Sales"))\
.orderBy(col("Electric_Car_Sales").desc())\
.show()

+-------------+------------------+
|       Region|Electric_Car_Sales|
+-------------+------------------+
|North America|          10861198|
|         Asia|          10596850|
|       Europe|          10590064|
|  Middle East|          10395195|
|South America|          10385273|
|       Africa|          10329085|
+-------------+------------------+



In [None]:
# 7. Expression-based aggregation: average profit per model, taking profit as 15% of price
sales_df.withColumn("Profit", expr("Price_USD * 0.15")) \
    .groupBy("Model") \
    .agg(round(avg("Profit"), 2).alias("Avg_Profit")) \
    .orderBy(col("Avg_Profit").desc()) \
    .show()

+--------+----------+
|   Model|Avg_Profit|
+--------+----------+
|7 Series|  11335.53|
|3 Series|  11334.94|
|      i8|  11304.94|
|5 Series|  11293.18|
|      X1|  11289.33|
|      X3|  11252.49|
|      M3|  11226.24|
|      i3|  11220.04|
|      X5|  11206.22|
|      M5|  11171.24|
|      X6|  11165.19|
+--------+----------+



In [None]:
# 8.Aggregation with Aliases & Multiple Functions
sales_df.groupBy("Model") \
    .agg(
        count("*").alias("Records"),
        round(avg("Price_USD"), 2).alias("Avg_Price"),
        min("Mileage_KM").alias("Min_Mileage"),
        max("Mileage_KM").alias("Max_Mileage")
    ).orderBy(col("Avg_Price").desc()).show()

+--------+-------+---------+-----------+-----------+
|   Model|Records|Avg_Price|Min_Mileage|Max_Mileage|
+--------+-------+---------+-----------+-----------+
|7 Series|   4666|  75570.2|         23|     199991|
|3 Series|   4595| 75566.23|         36|     199900|
|      i8|   4606| 75366.27|         57|     199955|
|5 Series|   4592| 75287.84|         21|     199956|
|      X1|   4570| 75262.22|         43|     199954|
|      X3|   4497| 75016.62|         58|     199996|
|      M3|   4413| 74841.59|         69|     199971|
|      i3|   4618| 74800.27|          3|     199987|
|      X5|   4487| 74708.12|         29|     199995|
|      M5|   4478| 74474.93|         42|     199822|
|      X6|   4478|  74434.6|         55|     199935|
+--------+-------+---------+-----------+-----------+



# **28. Advanced Transformations & Complex Aggregations in PySpark**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark=SparkSession.builder.appName("BMWSales_Advanced_Transformations").getOrCreate()
sales_df=spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

# 1. Create derived columns: Revenue and Profit_Margin (18% of price)
sales_df = sales_df.withColumn("Revenue", col("Price_USD") * col("Sales_Volume"))\
                   .withColumn("Profit_Margin", round(col("Price_USD") * 0.18, 2))

# 2. Multi-level aggregation: total revenue, average price and total volume by Model and Year
agg_df = sales_df.groupBy("Model", "Year") \
                 .agg(sum("Revenue").alias("Total_Revenue"),
                      round(avg("Price_USD"), 2).alias("Avg_Price"),
                      sum("Sales_Volume").alias("Total_Volume")) \
                 .orderBy(col("Total_Revenue").desc())

agg_df.show()

+--------+----+-------------+---------+------------+
|   Model|Year|Total_Revenue|Avg_Price|Total_Volume|
+--------+----+-------------+---------+------------+
|      X6|2024| 142416283875|  76078.5|     1836396|
|      X5|2022| 132767425394| 73899.47|     1796846|
|7 Series|2017| 132758608891| 75140.42|     1763339|
|5 Series|2016| 131081665600| 77096.18|     1677865|
|      i3|2012| 130936886917| 74164.23|     1752148|
|3 Series|2016| 129698953207| 75357.56|     1712098|
|      X6|2022| 129556338996| 76172.61|     1701699|
|5 Series|2024| 129229857243| 75527.85|     1711580|
|7 Series|2024| 128480227468| 75333.68|     1686209|
|      X1|2014| 127543477611| 77117.71|     1624913|
|      X1|2012| 127285905581| 75434.53|     1671189|
|7 Series|2022| 127227613709| 74480.64|     1739216|
|3 Series|2014| 126664028300| 75244.91|     1682545|
|      M3|2022| 126465321771| 74534.21|     1690554|
|      i3|2019| 126195179741| 75496.47|     1662586|
|      X3|2012| 126050677489| 76204.07|     16

In [None]:

# 3. Window Functions — Rank models by yearly revenue
windowSpec = Window.partitionBy("Year").orderBy(col("Total_Revenue").desc())

ranked_df = agg_df.withColumn("Revenue_Rank", rank().over(windowSpec)) \
                  .filter(col("Revenue_Rank") <= 3)

ranked_df.show()


+--------+----+-------------+---------+------------+------------+
|   Model|Year|Total_Revenue|Avg_Price|Total_Volume|Revenue_Rank|
+--------+----+-------------+---------+------------+------------+
|3 Series|2010| 123206927162| 75495.61|     1647769|           1|
|      i3|2010| 122138849923| 76159.23|     1619401|           2|
|      i8|2010| 120202406432| 75148.13|     1626565|           3|
|      X1|2011| 125185650431| 74963.36|     1694495|           1|
|7 Series|2011| 121581782347| 74527.92|     1638828|           2|
|      i8|2011| 120647382291| 77512.64|     1548054|           3|
|      i3|2012| 130936886917| 74164.23|     1752148|           1|
|      X1|2012| 127285905581| 75434.53|     1671189|           2|
|      X3|2012| 126050677489| 76204.07|     1643501|           3|
|      X1|2013| 124753447920| 73559.69|     1730904|           1|
|      i8|2013| 121922829104| 75932.69|     1613761|           2|
|      M5|2013| 117671842978| 72518.37|     1617984|           3|
|      X1|
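
`rank()` gives tied rows the same rank and then skips ahead, unlike `dense_rank()` (no gaps) or `row_number()` (no ties). A plain-Python sketch of the `rank()` semantics over a single partition, with an illustrative helper and toy revenue values:

```python
def sql_rank_desc(values):
    """Mimic SQL RANK() over values ordered descending (ties share a rank)."""
    ordered = sorted(values, reverse=True)
    # A value's rank is the first position at which it appears in the ordering.
    first_position = {}
    for position, value in enumerate(ordered, start=1):
        first_position.setdefault(value, position)
    return [first_position[v] for v in values]

ranks = sql_rank_desc([300, 500, 300, 100])
print(ranks)
```

With `filter(col("Revenue_Rank") <= 3)`, a tie at rank 3 would therefore return more than three rows for that year.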

In [None]:

# 4. Conditional aggregation: revenue per fuel type (Electric and Petrol columns)
sales_df.groupBy("Fuel_Type") \
        .agg(
            round(sum(when(col("Fuel_Type") == "Electric", col("Revenue")).otherwise(0)), 2).alias("Electric_Revenue"),
            round(sum(when(col("Fuel_Type") == "Petrol", col("Revenue")).otherwise(0)), 2).alias("Petrol_Revenue")
        ).show()

+---------+----------------+--------------+
|Fuel_Type|Electric_Revenue|Petrol_Revenue|
+---------+----------------+--------------+
|   Diesel|               0|             0|
|   Hybrid|               0|             0|
| Electric|   4750735161530|             0|
|   Petrol|               0| 4756136615908|
+---------+----------------+--------------+



In [None]:
# 5. Performance Insight — Avg Price vs Mileage Correlation-like view
sales_df.groupBy("Region") \
        .agg(
            round(avg("Price_USD"), 2).alias("Avg_Price"),
            round(avg("Mileage_KM"), 2).alias("Avg_Mileage")
        ).orderBy(col("Avg_Price").desc()) \
        .show()

+-------------+---------+-----------+
|       Region|Avg_Price|Avg_Mileage|
+-------------+---------+-----------+
|         Asia| 75554.93|    99971.9|
|North America| 75070.05|  100879.17|
|       Europe| 74988.36|  100648.99|
|South America|  74973.6|   100004.6|
|       Africa| 74885.77|  100426.94|
|  Middle East| 74726.79|   99916.36|
+-------------+---------+-----------+



# **29. Complex Joins & Data Enrichment in PySpark**

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Initialize Spark session
spark = SparkSession.builder.appName("Day29_AdvancedJoins").getOrCreate()

# Load main dataset
sales_df = spark.read.csv("/content/BMW sales data (2010-2024).csv", header=True, inferSchema=True)

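# Create supporting DataFrames (Region & Model reference tables).
# Note: these Region keys (North, South, East, West) do not match the dataset's
# Region values (Asia, Europe, ...), so the left join below yields NULL Climate_Type.
# model_df covers only some models, and "i4" does not occur in the sales data.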
region_df = spark.createDataFrame([
    ("North", "Cold"),
    ("South", "Hot"),
    ("East", "Humid"),
    ("West", "Dry")
], ["Region", "Climate_Type"])

model_df = spark.createDataFrame([
    ("X5", "SUV", "Luxury"),
    ("M3", "Sedan", "Sport"),
    ("i4", "Electric", "Premium"),
    ("X1", "SUV", "Compact"),
], ["Model", "Body_Type", "Segment"])

In [37]:
# Join Sales with Region info
joined_df=sales_df.join(region_df, on="Region", how="left")
joined_df.show()


+-------------+--------+----+------+---------+------------+-------------+----------+---------+------------+--------------------+------------+
|       Region|   Model|Year| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|Climate_Type|
+-------------+--------+----+------+---------+------------+-------------+----------+---------+------------+--------------------+------------+
|       Europe|      i8|2022| White|   Diesel|      Manual|          1.8|    196741|    55064|        7949|                High|        NULL|
|       Europe|      i8|2019| White| Electric|      Manual|          3.0|     35700|    96257|        4411|                 Low|        NULL|
|       Africa|5 Series|2020| White| Electric|      Manual|          2.3|    163444|   119486|        4668|                 Low|        NULL|
|       Africa|      X5|2012|  Blue| Electric|      Manual|          3.8|    142243|    82677|        7104|                High|        NULL|
|North
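
Every `Climate_Type` above is NULL because a left join keeps all rows from `sales_df` and only fills the right-hand columns where the keys match exactly. The reference table uses `North`, `South`, `East` and `West`, while the dataset's regions are `Asia`, `Europe`, and so on. A plain-Python sketch of the same lookup (the dictionary mirrors `region_df`):

```python
# Same key/value pairs as region_df in the cell above.
climate_by_region = {"North": "Cold", "South": "Hot", "East": "Humid", "West": "Dry"}

# Region values that appear in the sales data.
sales_regions = ["Asia", "Europe", "North America", "Middle East", "Africa", "South America"]

# dict.get returns None when there is no exact key match, like NULL in a left join.
joined = {region: climate_by_region.get(region) for region in sales_regions}
print(joined)
```

Keying the reference table on the dataset's actual region names would let the join populate `Climate_Type`.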

In [38]:
# join model details
joined_df = joined_df.join(model_df, on="Model", how="left")
joined_df.show()

+--------+-------------+----+------+---------+------------+-------------+----------+---------+------------+--------------------+------------+---------+-------+
|   Model|       Region|Year| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|Climate_Type|Body_Type|Segment|
+--------+-------------+----+------+---------+------------+-------------+----------+---------+------------+--------------------+------------+---------+-------+
|3 Series|  Middle East|2012| White| Electric|      Manual|          2.4|    101595|   117995|         500|                 Low|        NULL|     NULL|   NULL|
|      X1|         Asia|2017| Black| Electric|   Automatic|          1.6|    107918|    34078|        7291|                High|        NULL|      SUV|Compact|
|7 Series|North America|2020|Silver|   Diesel|   Automatic|          3.8|     27403|   100015|        8111|                High|        NULL|     NULL|   NULL|
|7 Series|South America|2020| Black|   D

In [39]:
# 3. Derived metrics: calculate revenue and classify sales volume
joined_df = joined_df.withColumn("Revenue", col("Sales_Volume") * col("Price_USD")) \
                     .withColumn("Sales_Category",
                                 when(col("Sales_Volume") > 800, "High Performer")
                                 .when(col("Sales_Volume").between(400, 800), "Mid Performer")
                                 .otherwise("Low Performer"))
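
The `when` chain is evaluated top to bottom, and `between(400, 800)` includes both endpoints. A plain-Python equivalent (the function name is illustrative) makes the boundaries explicit. The `Sales_Volume` values visible in earlier outputs, such as 8300 and 3428, exceed 800, so those rows would be labelled High Performer:

```python
def sales_category(volume):
    """Mirror the when/otherwise chain used for Sales_Category."""
    if volume > 800:
        return "High Performer"
    if 400 <= volume <= 800:      # between() is inclusive on both ends
        return "Mid Performer"
    return "Low Performer"

labels = [sales_category(v) for v in (8300, 800, 500, 399)]
print(labels)
```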

In [40]:
# 4. Aggregated view: summary by Region, Climate_Type and Segment
summary_df = joined_df.groupBy("Region", "Climate_Type", "Segment") \
                      .agg(
                          round(avg("Price_USD"), 2).alias("Avg_Price"),
                          sum("Sales_Volume").alias("Total_Sales"),
                          round(sum("Revenue"), 2).alias("Total_Revenue")
                      ).orderBy(col("Total_Revenue").desc())

summary_df.show()

+-------------+------------+-------+---------+-----------+-------------+
|       Region|Climate_Type|Segment|Avg_Price|Total_Sales|Total_Revenue|
+-------------+------------+-------+---------+-----------+-------------+
|         Asia|        NULL|   NULL| 75666.88|   31097673|2355972817358|
|       Europe|        NULL|   NULL| 74994.72|   31367270|2352334777146|
|North America|        NULL|   NULL| 75159.91|   30878596|2324164478544|
|South America|        NULL|   NULL| 75018.57|   30621042|2295429925715|
|  Middle East|        NULL|   NULL| 74642.46|   30729784|2292861515660|
|       Africa|        NULL|   NULL| 74933.19|   30215866|2262335964281|
|         Asia|        NULL|Compact| 74646.86|    4192289| 310980489459|
|         Asia|        NULL|  Sport|  76571.9|    3935579| 304012053648|
|North America|        NULL|Compact|  76020.8|    3987029| 297675825616|
|  Middle East|        NULL| Luxury| 74099.91|    3946209| 295680139025|
|       Africa|        NULL| Luxury|  73532.0|    3

In [27]:
# 5. Insight: top two segments in each region by revenue
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

windowSpec = Window.partitionBy("Region").orderBy(col("Total_Revenue").desc())

top_segment_df = summary_df.withColumn("Rank", rank().over(windowSpec)).filter(col("Rank") <= 2)
top_segment_df.show()

+-------------+------------+-------+---------+-----------+-------------+----+
|       Region|Climate_Type|Segment|Avg_Price|Total_Sales|Total_Revenue|Rank|
+-------------+------------+-------+---------+-----------+-------------+----+
|       Africa|        NULL|   NULL| 74933.19|   30215866|2262335964281|   1|
|       Africa|        NULL| Luxury|  73532.0|    3972541| 294225469155|   2|
|         Asia|        NULL|   NULL| 75666.88|   31097673|2355972817358|   1|
|         Asia|        NULL|Compact| 74646.86|    4192289| 310980489459|   2|
|       Europe|        NULL|   NULL| 74994.72|   31367270|2352334777146|   1|
|       Europe|        NULL| Luxury| 76685.03|    3688138| 283787379840|   2|
|  Middle East|        NULL|   NULL| 74642.46|   30729784|2292861515660|   1|
|  Middle East|        NULL| Luxury| 74099.91|    3946209| 295680139025|   2|
|North America|        NULL|   NULL| 75159.91|   30878596|2324164478544|   1|
|North America|        NULL|Compact|  76020.8|    3987029| 29767