<a href="https://colab.research.google.com/github/duhajarrar/SparkApp/blob/main/Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Spark

In [31]:
!pip install pyspark



# Import libraries

In [32]:
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
import functools
import pyspark
from pyspark import SparkContext

# Read The Dataset

In [33]:
# TODO: Read about Spark local vs. cluster mode
# TODO: Read about SparkSession vs. SparkContext
# local[1] runs Spark locally with a single worker thread
spark = SparkSession.builder.master("local[1]").appName("SparkApp").getOrCreate()

# TODO: Read about RDD vs. DataFrame vs. Dataset
# Without inferSchema, every CSV column is read as a string
dfCar = spark.read.option("header", True).csv("/content/drive/MyDrive/Spark-Harri/cars.csv")
dfCar.printSchema()
print(type(spark),type(dfCar))
dfCar.show(5)
print(dfCar.columns)

root
 |-- Car Brand: string (nullable = true)
 |-- Country of Origin: string (nullable = true)

<class 'pyspark.sql.session.SparkSession'> <class 'pyspark.sql.dataframe.DataFrame'>
+------------+-----------------+
|   Car Brand|Country of Origin|
+------------+-----------------+
|      Abarth|            Italy|
|  Alfa Romeo|            Italy|
|Aston Martin|          England|
|        Audi|          Germany|
|     Bentley|          England|
+------------+-----------------+
only showing top 5 rows

['Car Brand', 'Country of Origin']


# Task1: Extract a file containing each car model and its country of origin

In [34]:
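# TODO: Read about shuffling
# TODO: Read about transformations vs. actions
# count() is an action; repartition(rows) then spreads the data over `rows`
# partitions, so the write produces roughly one small part file per record
rows = dfCar.count()
dfCar.repartition(rows).write.csv('/content/drive/MyDrive/Spark-Harri/Cars')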

# Task2: Extract one file per country

In [35]:
dfCar.write.partitionBy('Country Of Origin').mode("overwrite").csv('/content/drive/MyDrive/Spark-Harri/Country Of Origin')
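
To make the effect of `partitionBy` more concrete, the plain-Python sketch below mimics the layout it produces: one sub-directory per distinct value of the partition column, named `column=value`, with the partition column left out of the rows written inside. This only illustrates the idea and is not how Spark implements it. The helper `write_partitioned` and the file name `part-0.csv` are hypothetical, and the sample rows are the five shown in the preview above.

```python
import csv
import tempfile
from collections import defaultdict
from pathlib import Path


def write_partitioned(rows, partition_col, out_dir):
    """Write one header-less CSV per distinct value of partition_col."""
    groups = defaultdict(list)
    for row in rows:
        # The partition value lives in the directory name, not in the file.
        rest = {k: v for k, v in row.items() if k != partition_col}
        groups[row[partition_col]].append(rest)

    written = {}
    for value, group in groups.items():
        part_dir = Path(out_dir) / f"{partition_col}={value}"
        part_dir.mkdir(parents=True, exist_ok=True)
        path = part_dir / "part-0.csv"  # hypothetical file name
        with path.open("w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(group[0].keys()))
            writer.writerows(group)
        written[value] = path
    return written


# Sample rows copied from the dfCar preview above
sample = [
    {"Car Brand": "Abarth", "Country of Origin": "Italy"},
    {"Car Brand": "Alfa Romeo", "Country of Origin": "Italy"},
    {"Car Brand": "Aston Martin", "Country of Origin": "England"},
    {"Car Brand": "Audi", "Country of Origin": "Germany"},
    {"Car Brand": "Bentley", "Country of Origin": "England"},
]

out_dir = tempfile.mkdtemp()
files = write_partitioned(sample, "Country of Origin", out_dir)
for country, path in files.items():
    print(country, "->", path.parent.name)
```

Each country ends up in its own directory, which is what makes "one file per country" possible when every country's rows fit in a single partition.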

In [36]:
# from pyspark import SparkContext
# sc = SparkContext("local", "First App")
# sc.parallelize(dfCar)

In [37]:
# rddCar=dfCar.rdd
# print(rddCar.collect())

In [38]:
# def toCSVLine(data):
#   return ','.join(str(d) for d in data)

# lines = rddCar.map(toCSVLine)
# lines.saveAsTextFile('/content/drive/MyDrive/Spark-Harri/Part2')

# Task3: Use caching properly to optimize performance

In [39]:
# TODO: Compare cache() vs. persist()
# cache() is lazy: the data is materialized by the first action that follows
dfCar = dfCar.cache()

# Task4: Read a file of updated records and merge the updates into the original dataset

## Read 2015_State_Top10Report_wTotalThefts file

In [40]:
dfReport = spark.read.option("header", True).csv("/content/drive/MyDrive/Spark-Harri/2015_State_Top10Report_wTotalThefts.csv")
# Thefts is read as a string; cast it to long so it can be aggregated numerically
dfReport = dfReport.withColumn("Thefts", dfReport.Thefts.cast('long'))
dfReport.printSchema()
dfReport.show()

root
 |-- State: string (nullable = true)
 |-- Rank: string (nullable = true)
 |-- Make/Model: string (nullable = true)
 |-- Model Year: string (nullable = true)
 |-- Thefts: long (nullable = true)

+-------+----+--------------------+----------+------+
|  State|Rank|          Make/Model|Model Year|Thefts|
+-------+----+--------------------+----------+------+
|Alabama|   1|Chevrolet Pickup ...|      2005|   499|
|Alabama|   2|Ford Pickup (Full...|      2006|   357|
|Alabama|   3|        Toyota Camry|      2014|   205|
|Alabama|   4|       Nissan Altima|      2014|   191|
|Alabama|   4|    Chevrolet Impala|      2004|   191|
|Alabama|   5|        Honda Accord|      1998|   180|
|Alabama|   6|GMC Pickup (Full ...|      1999|   152|
|Alabama|   7|Dodge Pickup (Ful...|      1998|   138|
|Alabama|   8|        Ford Mustang|      2002|   122|
|Alabama|   9|       Ford Explorer|      2002|   119|
| Alaska|   1|Chevrolet Pickup ...|      2003|   147|
| Alaska|   2|Ford Pickup (Full...|      2004

Rename the columns whose names contain a slash or a space so they are easier to reference.

In [41]:
dfReport=dfReport.withColumnRenamed('Make/Model','MakeModel').withColumnRenamed('Model Year','ModelYear')

## Read Updated - Sheet1 file

In [42]:
dfUpdate=spark.read.option("header",True).csv("/content/drive/MyDrive/Spark-Harri/Updated - Sheet1.csv")
dfUpdate=dfUpdate.dropna()
dfUpdate=dfUpdate.withColumn("Thefts",dfUpdate.Thefts.cast('long'))
dfUpdate.printSchema()
dfUpdate.show()

root
 |-- State: string (nullable = true)
 |-- Rank: string (nullable = true)
 |-- Make/Model: string (nullable = true)
 |-- Model Year: string (nullable = true)
 |-- Thefts: long (nullable = true)

+------------+----+--------------------+----------+------+
|       State|Rank|          Make/Model|Model Year|Thefts|
+------------+----+--------------------+----------+------+
|    Arkansas|   6|       Nissan Altima|      2015|  3000|
|       Idaho|   8|Jeep Cherokee/Gra...|      1997|    19|
|   Minnesota|   1|         Honda Civic|      1998|    50|
|   Minnesota|   2|        Honda Accord|      1997|    20|
|    Virginia|   7|      Toyota Corolla|      2013|   900|
|    Virginia|   8|       Ford Explorer|      2002|   543|
|North Dakota|   9|    Pontiac Grand Am|      2000|  2100|
|    New York|   5|           Seat Leon|      2019|    11|
|       Maine|   2|             VW Golf|      2021|     6|
+------------+----+--------------------+----------+------+



Rename the same columns in the update dataset so both datasets share identical column names.

In [43]:
dfUpdate=dfUpdate.withColumnRenamed('Make/Model','MakeModel').withColumnRenamed('Model Year','ModelYear')
print(dfUpdate.columns)

['State', 'Rank', 'MakeModel', 'ModelYear', 'Thefts']


## Update the Report dataset using the updated dataset 

In [44]:
# TODO: Explain inner vs. outer joins
# TODO: The merge should act as an upsert: new records are inserted,
#       changed records are updated, and unchanged records are kept
# Note: an inner join on all four columns keeps only rows found in both datasets
dfUpdatedRank = (
    dfReport.alias('a')
    .join(dfUpdate.alias('b'), ['State', 'MakeModel', 'ModelYear', 'Thefts'], how='inner')
    .select('State', 'MakeModel', 'ModelYear', 'Thefts', f.coalesce('b.Rank', 'a.Rank').alias('Rank'))
)
dfUpdatedRank.show(5)

+-----+--------------------+---------+------+----+
|State|           MakeModel|ModelYear|Thefts|Rank|
+-----+--------------------+---------+------+----+
|Idaho|Jeep Cherokee/Gra...|     1997|    19|   8|
+-----+--------------------+---------+------+----+
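
The upsert behaviour described in the TODO (insert new records, update changed ones, keep the rest) can be sketched in plain Python as a dictionary keyed on the identifying columns. This is only an illustration of the logic, not the Spark code used in this notebook. Choosing `State`, `MakeModel` and `ModelYear` as the key is an assumption, the helper `upsert` is hypothetical, and the changed Toyota Camry record is invented for the example; the other rows come from the previews above.

```python
def upsert(original, updates, key_cols):
    """Apply updates to original: matching keys are overwritten,
    new keys are appended, and untouched rows are kept."""
    merged = {tuple(r[c] for c in key_cols): dict(r) for r in original}
    for r in updates:
        merged[tuple(r[c] for c in key_cols)] = dict(r)
    return list(merged.values())


KEY = ("State", "MakeModel", "ModelYear")  # assumed identifying columns

report = [
    {"State": "Alabama", "Rank": "3", "MakeModel": "Toyota Camry", "ModelYear": "2014", "Thefts": 205},
    {"State": "Alabama", "Rank": "5", "MakeModel": "Honda Accord", "ModelYear": "1998", "Thefts": 180},
]
updates = [
    # Hypothetical changed record, invented to show an update
    {"State": "Alabama", "Rank": "3", "MakeModel": "Toyota Camry", "ModelYear": "2014", "Thefts": 250},
    # Row from the update file preview; its key is not in `report`, so it is inserted
    {"State": "Maine", "Rank": "2", "MakeModel": "VW Golf", "ModelYear": "2021", "Thefts": 6},
]

merged = upsert(report, updates, KEY)
for row in merged:
    print(row)
```

With DataFrames, a comparable result is usually obtained with a full outer join on the key columns followed by `coalesce` that prefers the update side, which is the direction the commented-out cell below takes.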



In [45]:
# dfUpdatedThefts=dfReport.alias('a').join(dfUpdate.alias('b'), ['State','MakeModel','ModelYear','Rank'], how='outer').select('State','MakeModel','ModelYear','Rank',f.coalesce('b.Thefts', 'a.Thefts').alias('Thefts'))
# dfUpdatedThefts.show(5)

# Create Cars table 

In [46]:
dfUpdatedRank=dfUpdatedRank.withColumn("Thefts",dfUpdatedRank.Thefts.cast('long'))

In [47]:
dfUpdatedRank.createOrReplaceTempView("Cars")

# Task5: List the 5 most stolen models in the U.S.

In [48]:
# TODO: Use the sum of thefts instead of the max
spark.sql("select MakeModel,MAX(Thefts) from Cars GROUP BY MakeModel ORDER BY MAX(Thefts) desc").show(5)

+--------------------+-----------+
|           MakeModel|max(Thefts)|
+--------------------+-----------+
|Jeep Cherokee/Gra...|         19|
+--------------------+-----------+
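
The TODO matters because `MAX` and `SUM` can rank models differently: `MAX` looks only at a model's worst single state, while `SUM` adds up thefts across all states. The plain-Python sketch below shows the difference on hypothetical data; the model names, the numbers and the helper `top_models` are all invented for illustration.

```python
from collections import defaultdict


def top_models(rows, agg, n=5):
    """Group (model, thefts) pairs by model, aggregate, and return the top n."""
    groups = defaultdict(list)
    for model, thefts in rows:
        groups[model].append(thefts)
    totals = {model: agg(values) for model, values in groups.items()}
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]


# Hypothetical per-state theft counts
rows = [
    ("Model A", 500),
    ("Model B", 300),
    ("Model B", 300),
    ("Model C", 100),
]

by_max = top_models(rows, max)  # Model A ranks first
by_sum = top_models(rows, sum)  # Model B ranks first
print(by_max)
print(by_sum)
```

In the SQL query, the corresponding change would be to replace `MAX(Thefts)` with `SUM(Thefts)` in both the select list and the `ORDER BY` clause.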



# Task6: List the top 5 states by number of stolen cars

In [49]:
# TODO: Use the sum of thefts instead of the max
spark.sql("select State,MAX(Thefts) from Cars GROUP BY State ORDER BY MAX(Thefts) desc").show(5)

+-----+-----------+
|State|max(Thefts)|
+-----+-----------+
|Idaho|         19|
+-----+-----------+



# Task7: Based on the models, which country do Americans buy most of their cars from?

## Extract the brand name

We need to extract the brand name (the first word of MakeModel) and then join it with its country of origin, using the cars.csv file.

In [50]:
# Keep only the first word of MakeModel, i.e. the brand
split_col = f.split(dfUpdatedRank['MakeModel'], ' ')
dfUpdatedRank = dfUpdatedRank.withColumn('MakeModel', split_col.getItem(0))
dfUpdatedRank.show(5)

+-----+---------+---------+------+----+
|State|MakeModel|ModelYear|Thefts|Rank|
+-----+---------+---------+------+----+
|Idaho|     Jeep|     1997|    19|   8|
+-----+---------+---------+------+----+
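
Taking the first word works for one-word brands such as Jeep, but cars.csv also lists two-word brands (Alfa Romeo, Aston Martin), which a first-word split would cut to "Alfa" or "Aston". The plain-Python sketch below contrasts the two approaches. The helper `match_brand` and the input string "Alfa Romeo Giulia" are hypothetical and are not part of the report data.

```python
def first_word(make_model):
    """Same idea as split(col, ' ').getItem(0): keep the first token."""
    return make_model.split(" ")[0]


def match_brand(make_model, brands):
    """Return the longest known brand that prefixes make_model, or None."""
    candidates = [
        b for b in brands
        if make_model == b or make_model.startswith(b + " ")
    ]
    return max(candidates, key=len) if candidates else None


# Brand names taken from the cars.csv preview above
brands = ["Abarth", "Alfa Romeo", "Aston Martin", "Audi", "Bentley"]

print(first_word("Toyota Camry"))
print(first_word("Alfa Romeo Giulia"))           # hypothetical input
print(match_brand("Alfa Romeo Giulia", brands))  # hypothetical input
```

Matching against the known brand list is one possible way to handle multi-word brands; whether it is needed depends on which brands actually occur in the report.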



In [51]:
numOfModelsBefore=dfUpdatedRank.select('MakeModel').distinct().count()

In [52]:
#dfUpdatedRank.select('MakeModel').distinct().show()

Rename the Car Brand column to MakeModel so it can serve as the join key, and remove the spaces from Country of Origin.

In [53]:
dfCar=dfCar.withColumnRenamed('Car Brand','MakeModel').withColumnRenamed('Country of Origin','CountryOfOrigin')
dfCar.show(5)

+------------+---------------+
|   MakeModel|CountryOfOrigin|
+------------+---------------+
|      Abarth|          Italy|
|  Alfa Romeo|          Italy|
|Aston Martin|        England|
|        Audi|        Germany|
|     Bentley|        England|
+------------+---------------+
only showing top 5 rows



## Join cars dataset with report dataset

In [54]:
dfUpdatedRank=dfUpdatedRank.join(dfCar, ['MakeModel'], 'inner')
dfUpdatedRank.show(5)

+---------+-----+---------+------+----+---------------+
|MakeModel|State|ModelYear|Thefts|Rank|CountryOfOrigin|
+---------+-----+---------+------+----+---------------+
|     Jeep|Idaho|     1997|    19|   8|        America|
+---------+-----+---------+------+----+---------------+



In [55]:
numOfModelsAfter=dfUpdatedRank.select('MakeModel').distinct().count()

In [56]:
#dfUpdatedRank.select('MakeModel').distinct().show()

In [57]:
#dfCar.select('MakeModel').distinct().show()

**Important**

In [58]:
print("Number of models in cars.csv file = ",dfCar.select('MakeModel').distinct().count())

Number of models in cars.csv file =  58


In [59]:
print(" Number Of Models Before join  = ",numOfModelsBefore," Number Of Models After join  = ",numOfModelsAfter)

 Number Of Models Before join  =  1  Number Of Models After join  =  1


**Note:** VW, GMC, Seat, Pontiac, and Acura are not in cars.csv, so only 10 of the 15 brands in the report match a brand in cars.csv.
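
Brands that are dropped by the inner join can be listed with an anti-join, that is, "rows on the left with no match on the right". The plain-Python sketch below expresses the same idea as a set difference. The helper `unmatched` is hypothetical, and the two brand lists are small samples assembled from the note and previews above, not the full files.

```python
def unmatched(report_brands, known_brands):
    """Return the report brands that have no entry in known_brands."""
    known = set(known_brands)
    return sorted(b for b in set(report_brands) if b not in known)


# Sample brands only; the real lists come from the report and cars.csv
report_brands = ["Jeep", "VW", "GMC", "Seat", "Pontiac", "Acura"]
known_brands = ["Abarth", "Alfa Romeo", "Aston Martin", "Audi", "Bentley", "Jeep"]

missing = unmatched(report_brands, known_brands)
print(missing)
```

On DataFrames, `join(..., how='left_anti')` returns the left-side rows without a match, which would give the same list without collecting the data to the driver.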

## Find the country that appears most often in the cars report, based on the model

In [60]:
dfUpdatedRank.createOrReplaceTempView("Cars")
spark.sql("select CountryOfOrigin,count(*) from Cars GROUP BY CountryOfOrigin ORDER BY count(*) desc").show(1)

+---------------+--------+
|CountryOfOrigin|count(1)|
+---------------+--------+
|        America|       1|
+---------------+--------+

