<a href="https://colab.research.google.com/github/MohamedMostafaSal/StructuredData/blob/main/Spark/MLLibGuideCheatSheet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Task 1 - SQL

### Build SparkSession:

In [4]:
!pwd
! wget https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xvzf spark-3.0.1-bin-hadoop3.2.tgz
!pip install findspark

/content
--2022-07-18 18:26:24--  https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 224062525 (214M) [application/x-gzip]
Saving to: ‘spark-3.0.1-bin-hadoop3.2.tgz’


2022-07-18 18:26:32 (27.6 MB/s) - ‘spark-3.0.1-bin-hadoop3.2.tgz’ saved [224062525/224062525]

spark-3.0.1-bin-hadoop3.2/
spark-3.0.1-bin-hadoop3.2/RELEASE
spark-3.0.1-bin-hadoop3.2/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/
spark-3.0.1-bin-hadoop3.2/examples/src/main/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/a

In [5]:
import os
import findspark

os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"
findspark.init()

In [6]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

In [7]:
spark = SparkSession.builder.appName('hw2').getOrCreate()

In [8]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

### Read the json file:

In [9]:
df = spark.read.json('/content/DataFrames_sample.json')

In [10]:
df.show(5)

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
|7.74|0.52|256GB SSD|  2|    MacBook| 8GB|       12"|11.04|  2.03|2016|
|8.94|0.68|128GB SSD|  3|MacBook Air| 8GB|     13.3"| 12.8|  2.96|2016|
| 8.0|20.3|  1TB SSD|  4|       iMac|64GB|       27"| 25.6|  20.8|2017|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Display the schema:


In [11]:
df.printSchema()

root
 |-- D: double (nullable = true)
 |-- H: double (nullable = true)
 |-- HDD: string (nullable = true)
 |-- Id: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- ScreenSize: string (nullable = true)
 |-- W: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Year: long (nullable = true)



### Get all rows where "Model" equals "MacBook Pro":

In [12]:
df.where(df['Model'] == "MacBook Pro").show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Create a temporary view:

In [13]:
df.createOrReplaceTempView('dfTable')

### Display the "RAM" column and the number of rows for each "RAM" value:

In [14]:
spark.sql('select RAM, count(RAM) from dfTable group by RAM').show()

+----+----------+
| RAM|count(RAM)|
+----+----------+
|64GB|         1|
|16GB|         1|
| 8GB|         2|
+----+----------+



### Get all columns where "Year" equals 2015:

In [15]:
spark.sql('select * from dfTable where Year = 2015').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all rows where "Model" starts with "M":

In [16]:
spark.sql("SELECT * from dfTable where Model  like 'M%' ").show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
|7.74|0.52|256GB SSD|  2|    MacBook| 8GB|       12"|11.04|  2.03|2016|
|8.94|0.68|128GB SSD|  3|MacBook Air| 8GB|     13.3"| 12.8|  2.96|2016|
+----+----+---------+---+-----------+----+----------+-----+------+----+
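
To make the `LIKE 'M%'` pattern concrete, here is a minimal plain-Python sketch of how SQL `LIKE` wildcards behave: `%` matches any run of characters and `_` matches exactly one. The helper name `sql_like` is hypothetical, it ignores escape characters, and it only mimics the matching rule on the four model names shown in the table; it is not how Spark evaluates the query.

```python
import re

def sql_like(value: str, pattern: str) -> bool:
    """Simplified SQL LIKE: '%' = any run of characters, '_' = one character."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # fullmatch: the pattern must cover the whole string, as LIKE does.
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None

# Model values taken from the df.show() output earlier in the notebook.
models = ["MacBook Pro", "MacBook", "MacBook Air", "iMac"]

starts_with_m = [m for m in models if sql_like(m, "M%")]
print(starts_with_m)
```

A pattern with no wildcards only matches that exact string, so for an exact match `=` states the same condition more directly.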



### Get all rows where "Model" equals "MacBook Pro":

In [17]:
spark.sql("SELECT * from dfTable where Model = 'MacBook Pro'").show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all rows matching multiple conditions: "RAM" equals "8GB" and "Model" equals "MacBook":

In [18]:
spark.sql("SELECT * from dfTable where Model='MacBook' and RAM='8GB'").show()

+----+----+---------+---+-------+---+----------+-----+------+----+
|   D|   H|      HDD| Id|  Model|RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-------+---+----------+-----+------+----+
|7.74|0.52|256GB SSD|  2|MacBook|8GB|       12"|11.04|  2.03|2016|
+----+----+---------+---+-------+---+----------+-----+------+----+



### Get all rows matching multiple conditions: "D" is greater than or equal to 8 and "Model" equals "iMac":

In [19]:
spark.sql("SELECT * from dfTable where Model='iMac' and D>=8").show()

+---+----+-------+---+-----+----+----------+----+------+----+
|  D|   H|    HDD| Id|Model| RAM|ScreenSize|   W|Weight|Year|
+---+----+-------+---+-----+----+----------+----+------+----+
|8.0|20.3|1TB SSD|  4| iMac|64GB|       27"|25.6|  20.8|2017|
+---+----+-------+---+-----+----+----------+----+------+----+



## Task 2


### Read "test1" dataset:

In [20]:
dfT2 = spark.read.csv('test1.csv', inferSchema= True, header=True)

In [21]:
dfT2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



### Display salaries less than or equal to 20000:

In [22]:
dfT2.select('Salary').where(dfT2['Salary']<=20000).show()

+------+
|Salary|
+------+
| 20000|
| 20000|
| 15000|
| 18000|
+------+



### Display salaries between 15000 and 20000 (inclusive):

In [23]:
dfT2.createOrReplaceTempView('dfT2Table')
spark.sql("select Salary from dfT2Table where Salary between 15000 and 20000").show()


+------+
|Salary|
+------+
| 20000|
| 20000|
| 15000|
| 18000|
+------+



## Task 3 

### Read "test3" dataset:

In [24]:
dfT3 = spark.read.csv('test3.csv', inferSchema= True, header=True)

### Display dataset

In [25]:
dfT3.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



### Display schema

In [26]:
dfT3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



### Group by the "Name" column and sum the "salary" column:

In [27]:
dfT3.groupBy(dfT3['Name']).agg(fn.sum(dfT3.salary).alias('Sum')).show()

+---------+-----+
|     Name|  Sum|
+---------+-----+
|Sudhanshu|35000|
|    Sunny|12000|
|    Krish|19000|
|   Mahesh| 7000|
+---------+-----+



### Group by the "Name" column and average the "salary" column:

In [28]:
dfT3.groupBy(dfT3['Name']).agg(fn.avg(dfT3.salary).alias('avgSalary')).show()

+---------+------------------+
|     Name|         avgSalary|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



### Group by the "Departments" column and sum the "salary" column:

In [29]:
dfT3.groupBy(dfT3['Departments']).agg(fn.sum(dfT3.salary).alias('Sum')).show()

+------------+-----+
| Departments|  Sum|
+------------+-----+
|         IOT|15000|
|    Big Data|15000|
|Data Science|43000|
+------------+-----+



### Group by the "Departments" column and average the "salary" column:

In [30]:
dfT3.groupBy(dfT3['Departments']).agg(fn.avg(dfT3.salary).alias('avgSalary')).show()

+------------+---------+
| Departments|avgSalary|
+------------+---------+
|         IOT|   7500.0|
|    Big Data|   3750.0|
|Data Science|  10750.0|
+------------+---------+



### Group by the "Departments" column and count the rows in each department:

In [31]:
dfT3.groupBy(dfT3['Departments']).agg(fn.count(dfT3.Departments).alias('count')).show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



### Use agg with the sum function to get the total of all salaries:

In [32]:
dfT3.agg(fn.sum(dfT3.salary).alias('sum')).show()

+-----+
|  sum|
+-----+
|73000|
+-----+
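
As a sanity check on the Task 3 aggregations, the same grouping can be reproduced in plain Python from the ten rows that `dfT3.show()` printed. This is only a sketch to show what `groupBy(...).agg(...)` computes per group; the variable names are illustrative, and the approach is not meant for data that does not fit in memory.

```python
from collections import defaultdict

# The ten rows printed by dfT3.show() above: (Name, Departments, salary).
rows = [
    ("Krish", "Data Science", 10000),
    ("Krish", "IOT", 5000),
    ("Mahesh", "Big Data", 4000),
    ("Krish", "Big Data", 4000),
    ("Mahesh", "Data Science", 3000),
    ("Sudhanshu", "Data Science", 20000),
    ("Sudhanshu", "IOT", 10000),
    ("Sudhanshu", "Big Data", 5000),
    ("Sunny", "Data Science", 10000),
    ("Sunny", "Big Data", 2000),
]

# Collect the salaries of each department, like groupBy('Departments').
by_department = defaultdict(list)
for _name, department, salary in rows:
    by_department[department].append(salary)

# One (sum, avg, count) triple per group, like agg with fn.sum, fn.avg, fn.count.
summary = {
    dept: (sum(vals), sum(vals) / len(vals), len(vals))
    for dept, vals in by_department.items()
}
# Aggregating without grouping, like dfT3.agg(fn.sum(...)).
total = sum(salary for _, _, salary in rows)

for dept, (dept_sum, dept_avg, dept_count) in summary.items():
    print(f"{dept}: sum={dept_sum}, avg={dept_avg}, count={dept_count}")
print("total:", total)
```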



## Task 4

You've been flown to the client's headquarters in Ulsan, South Korea, to help them accurately estimate the number of crew members a ship will need.


They're currently building new ships for certain customers, and they'd like you to build a model and use it to estimate how many crew members those ships will require.


The dataset contains measurements of ship size, capacity, crew, and age for 158 cruise ships.

It is saved in a CSV file called "ITI_data.csv". Your task is to develop a regression model that predicts the number of crew members required for future ships. The client has also found that acceptable crew counts differ between cruise lines, so the cruise line is most likely an important factor to consider in your analysis.

In [33]:
import pyspark.ml.feature as ml
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

In [34]:
dfT3 = spark.read.csv('ITI_data.csv', inferSchema=True, header = True)
dfT3.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [35]:
train_df, test_df = dfT3.randomSplit([.8,.2],seed=42)
print(f"There are {train_df.count()} rows in the training set, and {test_df.count()} in the test set")

There are 133 rows in the training set, and 25 in the test set
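
The split yields 133 and 25 rows rather than an exact 80/20 division of the 158 ships. That is expected: `randomSplit` assigns rows at random, so the weights describe proportions in expectation, not exact counts. The plain-Python sketch below illustrates the idea with an independent random draw per row. The function name `random_split` and the use of Python's `random` module are assumptions for illustration, and the counts it prints will not match Spark's.

```python
import random

def random_split(rows, train_weight, seed):
    """Send each row to train with probability train_weight, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(158))  # stand-ins for the 158 ships
train, test = random_split(rows, train_weight=0.8, seed=42)

# The sizes are only approximately 80/20, but every row lands in exactly one split.
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which is why the notebook passes `seed=42`.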


In [36]:
train_df.write.parquet("train.parquet",  mode = 'overwrite')
test_df.write.parquet("test.parquet",  mode = 'overwrite')

In [37]:
train_df = spark.read.parquet('train.parquet')
test_df = spark.read.parquet('test.parquet')

In [38]:
train_df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [39]:
test_df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [40]:
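# Split the columns by Spark dtype: string columns are categorical; every other
# column except the label 'crew' is treated as a numerical feature.
categorical_cols = [field for (field, dataType) in dfT3.dtypes
                    if dataType == 'string']
numerical_cols = [field for (field, dataType) in dfT3.dtypes
                  if dataType != 'string' and field != 'crew']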
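
The selection logic can be checked without Spark by running it over the `(name, dtype)` pairs from the schema printed earlier. The sketch assumes `DataFrame.dtypes` reports the short type names `'string'`, `'int'`, and `'double'` for this schema; `label_col` is an illustrative variable name.

```python
# (name, dtype) pairs as DataFrame.dtypes is assumed to report them for this schema.
dtypes = [
    ("Ship_name", "string"),
    ("Cruise_line", "string"),
    ("Age", "int"),
    ("Tonnage", "double"),
    ("passengers", "double"),
    ("length", "double"),
    ("cabins", "double"),
    ("passenger_density", "double"),
    ("crew", "double"),
]

label_col = "crew"  # the value the model should predict, so it is not a feature

categorical_cols = [name for name, dtype in dtypes if dtype == "string"]
numerical_cols = [
    name for name, dtype in dtypes
    if dtype != "string" and name != label_col
]

print(categorical_cols)
print(numerical_cols)
```

Note that `Ship_name` lands among the categorical columns even though it behaves much like a per-ship identifier; whether to keep it as a feature is a modelling choice.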

### StringIndexer and OneHotEncoder


In [41]:
index_output_cols = [x + "_Index" for x in categorical_cols]
string_indexer = ml.StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols)

In [42]:
ohe_output_cols = [x + "_OHE" for x in categorical_cols]
one_encoder = ml.OneHotEncoder(inputCols=index_output_cols, outputCols=ohe_output_cols)

In [43]:
mixed_features = ohe_output_cols + numerical_cols
mixed_features

['Ship_name_OHE',
 'Cruise_line_OHE',
 'Age',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density']
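
As a rough illustration of what the indexing and encoding stages produce, the sketch below indexes a small made-up list of cruise lines by descending frequency and then one-hot encodes the indices while dropping the last category. It assumes the default `StringIndexer` ordering (`frequencyDesc`) and the default `OneHotEncoder` setting `dropLast=True`. The helper names `fit_string_index` and `one_hot` and the sample labels are hypothetical, and the vectors here are dense lists, whereas Spark prints sparse vectors such as `(18,[1],[1.0])` (size 18, with 1.0 at position 1).

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector

# Hypothetical sample, not the real ITI_data.csv contents.
cruise_lines = ["Costa", "Royal_Caribbean", "Costa", "Carnival", "Costa", "Carnival"]

index = fit_string_index(cruise_lines)
encoded = {label: one_hot(i, len(index)) for label, i in index.items()}
print(index)
print(encoded)
```

Under these assumptions, a column with k distinct values becomes a vector of length k - 1, with the least frequent category represented by all zeros.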

### Use VectorAssembler to merge all feature columns into one vector column:

In [44]:
vector_assembler = ml.VectorAssembler(inputCols=mixed_features, outputCol='features')

### Create a Linear Regression Model 

In [45]:
lr = LinearRegression(labelCol='crew')

### Creating a Pipeline

In [46]:
pipeline = Pipeline(stages=[string_indexer, one_encoder, vector_assembler, lr])
model=pipeline.fit(train_df)
model

PipelineModel_65fc0e5a3366

In [47]:
model.transform(train_df).show()

+-------------+---------------+---+-----------------+----------+------+------+-----------------+-----+---------------+-----------------+----------------+---------------+--------------------+------------------+
|    Ship_name|    Cruise_line|Age|          Tonnage|passengers|length|cabins|passenger_density| crew|Ship_name_Index|Cruise_line_Index|   Ship_name_OHE|Cruise_line_OHE|            features|        prediction|
+-------------+---------------+---+-----------------+----------+------+------+-----------------+-----+---------------+-----------------+----------------+---------------+--------------------+------------------+
|    Adventure|Royal_Caribbean| 12|            138.0|     31.14|  10.2| 15.57|            44.32|11.85|           12.0|              1.0|(117,[12],[1.0])| (18,[1],[1.0])|(141,[12,118,135,...| 11.85577295274202|
|      Allegra|          Costa| 21|            28.43|      8.08|  6.16|   4.1|            35.19|  4.0|           13.0|              5.0|(117,[13],[1.0])| (18,[5

In [53]:
pred = model.transform(test_df)

In [54]:
pred.count()

133

In [56]:
from pyspark.ml.evaluation import RegressionEvaluator

# evaluate() returns R2 as a fraction (e.g. 0.72); the :.2% format prints it as a percentage.
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="crew", metricName="r2")
print(f"R Squared (R2) on test data = {lr_evaluator.evaluate(pred):.2%}")

R Squared (R2) on test data = 72.34%
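
For reference, R2 compares the model's squared error with the error of always predicting the mean label: R2 = 1 - SS_res / SS_tot. The sketch below computes it in plain Python on made-up numbers; the function name `r_squared` and the sample values are hypothetical and unrelated to the cruise-ship result above.

```python
def r_squared(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_label = sum(labels) / len(labels)
    # Residual sum of squares: error of the model's predictions.
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    # Total sum of squares: error of always predicting the mean label.
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot

# Made-up crew sizes and predictions, for illustration only.
labels = [4.0, 6.0, 8.0, 10.0]
predictions = [5.0, 6.0, 7.0, 10.0]

score = r_squared(labels, predictions)
print(f"R2 = {score:.2%}")
```

A score of 1.0 means the predictions match the labels exactly, and a score near 0 means the model does no better than predicting the mean.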


By Eng. Mostafa Nabieh 
If you have questions, please feel free to ask.

My Email : nabieh.mostafa@yahoo.com

My Whatsapp : +201015197566