<a href="https://colab.research.google.com/github/Muhozgu/etl-and-analytics-with-spark-and-powerbi/blob/main/n1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Apache Spark Environment Setup**

In [2]:
!sudo apt update

[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.81)] [[0m                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
[33m0% [2 InRelease 12.7 kB/128 kB 10%] [Connecting to security.ubuntu.com (185.125[0m                                                                               Get:3 https://cli.github.com/packages stable InRelease [3,917 B]
[33m0% [2 InRelease 24.3 kB/128 kB 19%] [Waiting for headers] [Waiting for headers][0m                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
[33m0% [2 InRelease 41.7 kB/128 kB 33%] [Waiting for headers] [Waiting for headers][0m[33m0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcont[0

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

In [6]:
!wget -q https://dlcdn.apache.org/spark/spark-4.1.1/spark-4.1.1-bin-hadoop3.tgz

In [7]:
!tar -xzf spark-4.1.1-bin-hadoop3.tgz

In [12]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [13]:
import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-4.1.1-bin-hadoop3"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, regexp_replace, regexp_extract, monotonically_increasing_id

# **Data Reading and Loading**

In [17]:
df = spark.read.csv(
    "/content/dataset.csv",
    header=True,
    inferSchema=True)

In [18]:
df.show(20)

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America| Black|   Diesel|      Manual|          2.1|    122131|    49898|   

In [19]:
df.take(1)

[Row(Model='5 Series', Year=2016, Region='Asia', Color='Red', Fuel_Type='Petrol', Transmission='Manual', Engine_Size_L=3.5, Mileage_KM=151748, Price_USD=98740, Sales_Volume=8300, Sales_Classification='High')]

# **Data Description**

In [20]:
df.printSchema()

root
 |-- Model: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Region: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Fuel_Type: string (nullable = true)
 |-- Transmission: string (nullable = true)
 |-- Engine_Size_L: double (nullable = true)
 |-- Mileage_KM: integer (nullable = true)
 |-- Price_USD: integer (nullable = true)
 |-- Sales_Volume: integer (nullable = true)
 |-- Sales_Classification: string (nullable = true)



In [21]:
df.schema.names

['Model',
 'Year',
 'Region',
 'Color',
 'Fuel_Type',
 'Transmission',
 'Engine_Size_L',
 'Mileage_KM',
 'Price_USD',
 'Sales_Volume',
 'Sales_Classification']

In [22]:
df.describe().show()

+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|summary|   Model|             Year|       Region|Color|Fuel_Type|Transmission|     Engine_Size_L|        Mileage_KM|         Price_USD|      Sales_Volume|Sales_Classification|
+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|  count|   50000|            50000|        50000|50000|    50000|       50000|             50000|             50000|             50000|             50000|               50000|
|   mean|    NULL|        2017.0157|         NULL| NULL|     NULL|        NULL| 3.247179999999999|      100307.20314|        75034.6009|        5067.51468|                NULL|
| stddev|    NULL|4.324459218093149|         NULL| NULL|     NULL|        NULL|1.0090783975411621|57941.50934352461

In [23]:
df.summary().show()

+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|summary|   Model|             Year|       Region|Color|Fuel_Type|Transmission|     Engine_Size_L|        Mileage_KM|         Price_USD|      Sales_Volume|Sales_Classification|
+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+
|  count|   50000|            50000|        50000|50000|    50000|       50000|             50000|             50000|             50000|             50000|               50000|
|   mean|    NULL|        2017.0157|         NULL| NULL|     NULL|        NULL| 3.247179999999999|      100307.20314|        75034.6009|        5067.51468|                NULL|
| stddev|    NULL|4.324459218093149|         NULL| NULL|     NULL|        NULL|1.0090783975411621|57941.50934352461

In [24]:
df.count()

50000

In [25]:
len(df.columns)

11

In [26]:
df = df.fillna("deleting")
df.show(10)

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|
|      i8|2013|North America|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|
|5 Series|2022|North America|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|
|      X3|2024|  Middle East|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|
|7 Series|2020|South America| Black|   Diesel|      Manual|          2.1|    122131|    49898|   

# **Data Filtering and Cleaning**

In [27]:
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|Model|Year|Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+
|    0|   0|     0|    0|        0|           0|            0|         0|        0|           0|                   0|
+-----+----+------+-----+---------+------------+-------------+----------+---------+------------+--------------------+



# **Data Manipulation**

In [28]:
df = df.withColumn("index", monotonically_increasing_id())
df.show(3)

+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+-----+
|   Model|Year|       Region|Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|index|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+-----+
|5 Series|2016|         Asia|  Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|    0|
|      i8|2013|North America|  Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|    1|
|5 Series|2022|North America| Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|    2|
+--------+----+-------------+-----+---------+------------+-------------+----------+---------+------------+--------------------+-----+
only showing top 3 rows


# **Data Type Conversions**

In [29]:
df = df.withColumn("year", col("year").cast("int"))

In [30]:
df.printSchema()

root
 |-- Model: string (nullable = false)
 |-- year: integer (nullable = true)
 |-- Region: string (nullable = false)
 |-- Color: string (nullable = false)
 |-- Fuel_Type: string (nullable = false)
 |-- Transmission: string (nullable = false)
 |-- Engine_Size_L: double (nullable = true)
 |-- Mileage_KM: integer (nullable = true)
 |-- Price_USD: integer (nullable = true)
 |-- Sales_Volume: integer (nullable = true)
 |-- Sales_Classification: string (nullable = false)
 |-- index: long (nullable = false)



In [31]:
df.summary().show()

+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+-----------------+
|summary|   Model|             year|       Region|Color|Fuel_Type|Transmission|     Engine_Size_L|        Mileage_KM|         Price_USD|      Sales_Volume|Sales_Classification|            index|
+-------+--------+-----------------+-------------+-----+---------+------------+------------------+------------------+------------------+------------------+--------------------+-----------------+
|  count|   50000|            50000|        50000|50000|    50000|       50000|             50000|             50000|             50000|             50000|               50000|            50000|
|   mean|    NULL|        2017.0157|         NULL| NULL|     NULL|        NULL| 3.247179999999999|      100307.20314|        75034.6009|        5067.51468|                NULL|          24999.5|
| stddev|    NULL|4.32445

In [32]:
df.show(30)

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+-----+
|   Model|year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|index|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+-----+
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    98740|        8300|                High|    0|
|      i8|2013|North America|   Red|   Hybrid|   Automatic|          1.6|    121671|    79219|        3428|                 Low|    1|
|5 Series|2022|North America|  Blue|   Petrol|   Automatic|          4.5|     10991|   113265|        6994|                 Low|    2|
|      X3|2024|  Middle East|  Blue|   Petrol|   Automatic|          1.7|     27255|    60971|        4047|                 Low|    3|
|7 Series|2020|South America| Black|   Diesel|      Man

# **Analytical Operations with SparkSQL**

In [33]:
sales = spark.read.csv("/content/dataset.csv")

sales.printSchema()
sales.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)

+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|     _c0| _c1|          _c2|   _c3|      _c4|         _c5|          _c6|       _c7|      _c8|         _c9|                _c10|
+--------+----+-------------+------+---------+------------+-------------+----------+---------+------------+--------------------+
|   Model|Year|       Region| Color|Fuel_Type|Transmission|Engine_Size_L|Mileage_KM|Price_USD|Sales_Volume|Sales_Classification|
|5 Series|2016|         Asia|   Red|   Petrol|      Manual|          3.5|    151748|    9874

In [34]:
sales.createOrReplaceGlobalTempView("car_sales")
df.write.mode("overwrite").saveAsTable("car_sales")

### **1. Which models generated the highest total revenue per year?**

###### ***YEAR 2010***

In [35]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2010
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------+--------------------+
|Year|   Model|Total_sales_per_year|
+----+--------+--------------------+
|2010|      i3|            24370952|
|2010|3 Series|            23932107|
|2010|      i8|            23746810|
|2010|      X3|            23403586|
|2010|      X5|            22718514|
+----+--------+--------------------+



##### ***YEAR 2011***

In [36]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2011
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------+--------------------+
|Year|   Model|Total_sales_per_year|
+----+--------+--------------------+
|2011|      M5|            24417272|
|2011|7 Series|            23774406|
|2011|      i8|            23563843|
|2011|      X1|            23538495|
|2011|5 Series|            23388935|
+----+--------+--------------------+



##### ***YEAR 2012***

In [37]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2012
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+-----+--------------------+
|Year|Model|Total_sales_per_year|
+----+-----+--------------------+
|2012|   i3|            24770853|
|2012|   X1|            24591656|
|2012|   i8|            24503062|
|2012|   X3|            24309099|
|2012|   X5|            23639321|
+----+-----+--------------------+



##### ***YEAR 2013***

In [38]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2013
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------+--------------------+
|Year|   Model|Total_sales_per_year|
+----+--------+--------------------+
|2013|      X1|            24568937|
|2013|7 Series|            23719096|
|2013|      i8|            23539135|
|2013|      X3|            22916323|
|2013|      M5|            22480694|
+----+--------+--------------------+



##### ***YEAR 2014***

In [39]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2014
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------+--------------------+
|Year|   Model|Total_sales_per_year|
+----+--------+--------------------+
|2014|7 Series|            24903601|
|2014|      X1|            24677668|
|2014|3 Series|            24228860|
|2014|      X6|            23192116|
|2014|      X5|            23109632|
+----+--------+--------------------+



##### ***YEAR 2015***

In [40]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2015
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------+--------------------+
|Year|   Model|Total_sales_per_year|
+----+--------+--------------------+
|2015|      i3|            25204559|
|2015|5 Series|            24541994|
|2015|7 Series|            24330409|
|2015|3 Series|            23923178|
|2015|      i8|            23817034|
+----+--------+--------------------+



#####***YEAR 2016***

In [41]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2016
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------+--------------------+
|Year|   Model|Total_sales_per_year|
+----+--------+--------------------+
|2016|5 Series|            26443989|
|2016|      X3|            24786437|
|2016|3 Series|            24641923|
|2016|      i3|            24231775|
|2016|      X5|            23331907|
+----+--------+--------------------+



##### ***YEAR 2024***

In [42]:
df2 = spark.sql("""SELECT Year, Model, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 WHERE Year = 2024
                 GROUP BY Model, Year
                 ORDER BY 3 DESC
                 LIMIT 5"""
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------+--------------------+
|Year|   Model|Total_sales_per_year|
+----+--------+--------------------+
|2024|      X6|            26247081|
|2024|5 Series|            24773136|
|2024|7 Series|            24709447|
|2024|      X3|            24694167|
|2024|      i3|            24140197|
+----+--------+--------------------+



### **2. Total Sales per Year Over the past 14 Years.**

In [43]:
df2 = spark.sql("""SELECT Year, SUM(Price_USD) AS Total_sales_per_year
                 FROM car_sales
                 GROUP BY 1
                 ORDER BY 1
                 """
                 )
df2.printSchema()
df2.show()

root
 |-- Year: integer (nullable = true)
 |-- Total_sales_per_year: long (nullable = true)

+----+--------------------+
|Year|Total_sales_per_year|
+----+--------------------+
|2010|           249990479|
|2011|           246811360|
|2012|           249965198|
|2013|           247906431|
|2014|           249764393|
|2015|           248658398|
|2016|           254204015|
|2017|           247319655|
|2018|           247343066|
|2019|           255059493|
|2020|           240559823|
|2021|           254246364|
|2022|           260738271|
|2023|           242049501|
|2024|           257113598|
+----+--------------------+

