<a href="https://colab.research.google.com/github/RajuKGosala-45/microsoft-fabric-wwi-project/blob/main/BMWSales.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Create SparkSession and Read Data**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark=SparkSession.builder.appName('BMW dataSet').getOrCreate()
df=spark.read.csv('/content/BMW sales data (2010-2024) (1).csv', header=True, inferSchema=True)
df.show(5)

+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|  Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|  Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America| Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East| Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America|Black|   Diesel|      Manual|          2.1|    122131|    49898|        308

## **Explore Dataset**

In [None]:
print("Schema Overview:")
df.printSchema()

print("\nSample Records:")
df.show(5)

print("\nSummary Statistics:")
df.describe().show()

Schema Overview:
root
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Fuel_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Engine_Size_L: double (nullable = true)
 |-- Mileage_KM: integer (nullable = true)
 |-- Price_USD: integer (nullable = true)
 |-- Sales_Volume: integer (nullable = true)
 |-- Sales_Classification: string (nullable = true)


Sample Records:
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|  Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|         

## **2. Data Cleaning in PySpark (BMW Sales Dataset)**

In [None]:
#from pyspark.sql import SparkSession
#from pyspark.sql.functions import *
#from pyspark.sql.types import *

#spark=SparkSession.builder.appName('BMW Dataset').getOrCreate()
# The data is already loaded into the df DataFrame from the previous cell.
# df=spark.read.csv('/content/BMW sales data (2010-2024) (1).csv', header=True, inferSchema=True)

# Check for NULLs or missing values
# (this displays a True/False flag per row and column, not an aggregated count)
print("Count of NULLs in each column:")
df.select([col(c).isNull().alias(c) for c in df.columns]).show()

# Remove rows with missing values (if any)
df_clean=df.dropna()

# Remove duplicate records
df_clean=df_clean.dropDuplicates()

# Standardize text columns (e.g., Region names to UPPERCASE)
# Build on df_clean, not df, so the dropna/dropDuplicates steps are kept
df_clean=df_clean.withColumn("Region",upper(col("Region")))

# Validate cleaned data

print("Cleaned dataset preview:")
df_clean.show(5)

print("Rows Before:", df.count(), "| After Cleaning:", df_clean.count())

Count of NULLs in each column:
+-----+-----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|Model| Year|Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+-----+-----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| false|false|    false|       false|        false|     false|    false|       false|               false|
|false|false| fal
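
The `isNull()` selection above prints one True/False flag per row, so it does not total the missing values per column. In PySpark a per-column count is commonly written as `df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])`. The plain-Python sketch below shows the same counting logic on a few rows; the `rows` list and its values are hypothetical and only illustrate the idea.

```python
# Hypothetical sample rows; None stands in for a NULL cell.
rows = [
    {"Model": "X3", "Region": "Asia", "Price_USD": 60971},
    {"Model": None, "Region": "Europe", "Price_USD": 49898},
    {"Model": "i8", "Region": None, "Price_USD": None},
    {"Model": "M5", "Region": None, "Price_USD": 98740},
]

columns = ["Model", "Region", "Price_USD"]

# Count the None values column by column.
null_counts = {c: sum(1 for row in rows if row[c] is None) for c in columns}

for name, n in null_counts.items():
    print(f"{name}: {n}")
```

A result of all zeros would confirm that `dropna()` has nothing to remove.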

## **3. Filtering and Sorting BMW Sales Data Using PySpark**

In [None]:
#from pyspark.sql import SparkSession
#from pyspark.sql.functions import *
#from pyspark.sql.types import *

# spark=SparkSession.builder.appName('BMW Sales').getOrCreate()
# df=spark.read.csv('/content/BMW sales data (2010-2024) (1).csv', header=True, inferSchema=True)
#df.show()
high_Sales=df.filter(col("Sales_Volume") > 7000)
high_Sales.select("Model","Year","Region","Sales_Volume").show(10)

+--------+----+-------------+------------+
|   Model|Year|       Region|Sales_Volume|
+--------+----+-------------+------------+
|5 Series|2016|         Asia|        8300|
|      i8|2022|       Europe|        7949|
|      X3|2016|South America|        8944|
|      i8|2016|North America|        8252|
|7 Series|2020|North America|        8111|
|      X1|2017|         Asia|        7291|
|      M3|2014|North America|        7765|
|      M5|2017|North America|        9755|
|      X5|2012|       Africa|        7104|
|      X3|2015|North America|        8635|
+--------+----+-------------+------------+
only showing top 10 rows



Sort by Highest Price

In [None]:
top_revenue=df.orderBy(desc("Price_USD"))
print("Top 5 BMW cars by price:")
top_revenue.select("Model","Year","Region","Price_USD").show(5)

Top 5 BMW cars by price:
+--------+----+-----------+---------+
|   Model|Year|     Region|Price_USD|
+--------+----+-----------+---------+
|      i8|2010|Middle East|   119998|
|      i8|2024|     Africa|   119997|
|      X6|2019|       Asia|   119997|
|      X1|2016|     Africa|   119996|
|3 Series|2019|Middle East|   119994|
+--------+----+-----------+---------+
only showing top 5 rows



Filter by Specific Criteria

In [None]:
recent_petrol=df.filter((col("Fuel_Type") == "Petrol") & (col("Year") > 2018))
recent_petrol.select("Model","Year","Fuel_Type","Region","Sales_Volume").show(5)

+--------+----+---------+-------------+------------+
|   Model|Year|Fuel_Type|       Region|Sales_Volume|
+--------+----+---------+-------------+------------+
|5 Series|2022|   Petrol|North America|        6994|
|      X3|2024|   Petrol|  Middle East|        4047|
|      M5|2021|   Petrol|South America|        4561|
|      M3|2020|   Petrol|       Africa|        3929|
|      i3|2024|   Petrol|  Middle East|         173|
+--------+----+---------+-------------+------------+
only showing top 5 rows



## **4. Aggregation and Grouping - BMW Sales Insights**

### Total Sales By **Region**

In [None]:
#from pyspark.sql import SparkSession
#from pyspark.sql.functions import *

#spark=SparkSession.builder.appName("BMW Sales").getOrCreate()
#df=spark.read.csv('/content/BMW sales data (2010-2024) (1).csv', header=True, inferSchema=True)

region_sale=df.groupBy("Region").agg(
    sum("Sales_Volume").alias("Total_Sales"),
    avg("Price_USD").alias("Average_Price"),
    count("*").alias("Record_Count")
)
print("Total BMW Sales by Region:")
region_sale.show()

Total BMW Sales by Region:
+-------------+-----------+-----------------+------------+
|       Region|Total_Sales|    Average_Price|Record_Count|
+-------------+-----------+-----------------+------------+
|       Europe|   42555138|74988.35685145188|        8334|
|       Africa|   41565252|74885.77159820672|        8253|
|North America|   42402629|75070.05470905818|        8335|
|South America|   41551818|74973.59883650467|        8251|
|  Middle East|   42326620|74726.78848680282|        8373|
|         Asia|   42974277|75554.92500591437|        8454|
+-------------+-----------+-----------------+------------+



### Top Performing Models by Total Revenue

In [None]:
model_revenue=df.groupBy("Model").agg(
    sum("Price_USD").alias("Total_Revenue")
).orderBy("Total_Revenue", ascending=False)

print("Top Performing Models by Total Revenue:")
model_revenue.show(5)

Top Performing Models by Total Revenue:
+--------+-------------+
|   Model|Total_Revenue|
+--------+-------------+
|7 Series|    352610538|
|3 Series|    347226845|
|      i8|    347137044|
|5 Series|    345721780|
|      i3|    345427638|
+--------+-------------+
only showing top 5 rows
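
Summing `Price_USD` adds up the listed price of each record without weighting it by units sold. If "revenue" is meant as price times volume (an assumption about intent; the notebook does not say), each record would contribute `Price_USD * Sales_Volume` instead. The plain-Python sketch below contrasts the two totals, using the first four rows shown by `df.show(5)` as input.

```python
from collections import defaultdict

# (Model, Price_USD, Sales_Volume) taken from the first rows shown by df.show(5).
records = [
    ("5 Series", 98740, 8300),
    ("i8", 79219, 3428),
    ("5 Series", 113265, 6994),
    ("X3", 60971, 4047),
]

price_sum = defaultdict(int)   # what sum("Price_USD") computes per model
revenue = defaultdict(int)     # price weighted by units sold

for model, price, volume in records:
    price_sum[model] += price
    revenue[model] += price * volume

# List models from highest to lowest volume-weighted revenue.
for model in sorted(revenue, key=revenue.get, reverse=True):
    print(model, price_sum[model], revenue[model])
```

In PySpark the weighted version would be `sum(col("Price_USD") * col("Sales_Volume"))` inside the same `agg` call.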



### **Yearly Trend**

In [None]:
Yearly_sales=df.groupBy("Year").agg(sum("Sales_Volume").alias("Total_Sales"))
Yearly_sales.orderBy("Year").show()

+----+-----------+
|Year|Total_Sales|
+----+-----------+
|2010|   16933445|
|2011|   16758941|
|2012|   16751895|
|2013|   16866733|
|2014|   16958960|
|2015|   17010207|
|2016|   16957550|
|2017|   16620811|
|2018|   16412273|
|2019|   17191956|
|2020|   16310843|
|2021|   16884666|
|2022|   17920946|
|2023|   16268654|
|2024|   17527854|
+----+-----------+



## **5. Window Functions - Year-over-Year (YoY) Growth Analysis**

In [None]:
#from pyspark.sql import SparkSession
#from pyspark.sql.functions import *
from pyspark.sql.window import Window
#from pyspark.sql.types import *
#spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
#df=spark.read.csv("/content/BMW sales data (2010-2024) (1).csv", header=True, inferSchema=True)

# Define a window spec (partition by Region, order by Year)
windowSpec=Window.partitionBy("Region").orderBy("Year")
# Use lag() to get the previous row's sales for comparison.
# Note: df has many rows per Region and Year, so this is the previous record
# within the region, not the previous year's total.
df_growth=df.withColumn("Prev_Year_Sales", lag("Sales_Volume").over(windowSpec))

# Calculate growth % relative to the previous row
df_growth = df_growth.withColumn(
    "YoY_Growth_Percent",
    round(((col("Sales_Volume") - col("Prev_Year_Sales")) / col("Prev_Year_Sales")) *100, 2)
)

# Display growth trends
print("Year-over-Year growth by Region:")
df_growth.select("Region","Year","Sales_Volume","Prev_year_Sales","YoY_Growth_Percent").show(15)

Year-over-Year growth by Region:
+------+----+------------+---------------+------------------+
|Region|Year|Sales_Volume|Prev_year_Sales|YoY_Growth_Percent|
+------+----+------------+---------------+------------------+
|Africa|2010|        5935|           NULL|              NULL|
|Africa|2010|        7607|           5935|             28.17|
|Africa|2010|         968|           7607|            -87.27|
|Africa|2010|        5236|            968|            440.91|
|Africa|2010|        1252|           5236|            -76.09|
|Africa|2010|        8358|           1252|            567.57|
|Africa|2010|        2849|           8358|            -65.91|
|Africa|2010|        9841|           2849|            245.42|
|Africa|2010|        7521|           9841|            -23.57|
|Africa|2010|        7103|           7521|             -5.56|
|Africa|2010|        5754|           7103|            -18.99|
|Africa|2010|        5113|           5754|            -11.14|
|Africa|2010|         184|           
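
The output above repeats the same Region and Year on many rows, so `lag()` is comparing consecutive records, not consecutive years. One way to get a true year-over-year figure is to aggregate to a single total per year first (in PySpark, for example, a `groupBy("Region", "Year").agg(sum("Sales_Volume"))` before applying `lag`) and then use the same formula. The plain-Python sketch below applies that formula to the overall yearly totals from the Yearly Trend output; ignoring regions is a simplification made here for brevity.

```python
# Yearly totals copied from the "Yearly Trend" output (first five years).
yearly_sales = {
    2010: 16933445,
    2011: 16758941,
    2012: 16751895,
    2013: 16866733,
    2014: 16958960,
}

def yoy_growth(totals):
    """Return {year: percent change vs. the previous year}; None for the first year."""
    growth = {}
    previous = None
    for year in sorted(totals):
        current = totals[year]
        if previous is None:
            growth[year] = None  # no earlier year to compare against
        else:
            growth[year] = round((current - previous) / previous * 100, 2)
        previous = current
    return growth

for year, pct in yoy_growth(yearly_sales).items():
    print(year, pct)
```

With one row per year, the first year has no predecessor and stays empty, which mirrors the single `NULL` that `lag()` would produce per partition.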

## **6. Ranking BMW Models with PySpark Window Functions**

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark=SparkSession.builder.appName("BMW_Sales").getOrCreate()
df=spark.read.csv("/content/BMW sales data (2010-2024) (1).csv", header=True, inferSchema=True)

windowSpec=Window.partitionBy("Year").orderBy("Sales_Volume")

df_ranked=df.withColumn("Rank",rank().over(windowSpec))\
            .withColumn("Dense_Rank",dense_rank().over(windowSpec))\
            .withColumn("Row_Number",row_number().over(windowSpec))

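# Display the 5 lowest-selling rows per year: the window sorts Sales_Volume ascending,
# so Rank 1 is the smallest value (order by desc("Sales_Volume") to rank top sellers first)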
df_ranked.filter(df_ranked.Rank <=5).select("Year","Model","Sales_Volume","Rank").show(10)

+----+--------+------------+----+
|Year|   Model|Sales_Volume|Rank|
+----+--------+------------+----+
|2010|      X1|         100|   1|
|2010|      X5|         103|   2|
|2010|      i3|         105|   3|
|2010|      X3|         105|   3|
|2010|      X5|         110|   5|
|2011|3 Series|         105|   1|
|2011|      X6|         109|   2|
|2011|      X3|         111|   3|
|2011|5 Series|         116|   4|
|2011|      i8|         117|   5|
+----+--------+------------+----+
only showing top 10 rows
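
The three ranking functions differ only in how they treat ties, which the 2010 rows above illustrate (two records share a `Sales_Volume` of 105). The plain-Python sketch below reproduces that behaviour for a single partition. `rank_all` is a hypothetical helper written for this illustration and is not part of PySpark.

```python
# Sales_Volume values for 2010 taken from the output above (note the tie at 105).
sales = [100, 103, 105, 105, 110]

def rank_all(values, descending=False):
    """Return (value, rank, dense_rank, row_number) tuples in sorted order."""
    ordered = sorted(values, reverse=descending)
    result = []
    rank = dense = 0
    previous = None
    for row_number, value in enumerate(ordered, start=1):
        if value != previous:
            rank = row_number  # rank jumps to the row position after a tie
            dense += 1         # dense_rank increases by one, leaving no gaps
        result.append((value, rank, dense, row_number))
        previous = value
    return result

for row in rank_all(sales):
    print(row)
```

Calling `rank_all(sales, descending=True)` puts the highest volume at rank 1, which corresponds to `Window.partitionBy("Year").orderBy(desc("Sales_Volume"))` in the notebook's window spec.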

