## Task 1 - SQL

### Build SparkSession:

In [7]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession 
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *

### Read the json file:

In [8]:
df = spark.read.json('DataFrames_sample.json')

                                                                                

### Display the schema:


In [9]:
df.printSchema()


root
 |-- D: double (nullable = true)
 |-- H: double (nullable = true)
 |-- HDD: string (nullable = true)
 |-- Id: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- ScreenSize: string (nullable = true)
 |-- W: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Year: long (nullable = true)



In [10]:
df.show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
|7.74|0.52|256GB SSD|  2|    MacBook| 8GB|       12"|11.04|  2.03|2016|
|8.94|0.68|128GB SSD|  3|MacBook Air| 8GB|     13.3"| 12.8|  2.96|2016|
| 8.0|20.3|  1TB SSD|  4|       iMac|64GB|       27"| 25.6|  20.8|2017|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all the data when "Model" equal "MacBook Pro":




In [16]:
df.select('*').where('Model == "MacBook Pro"').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Create TempView:

In [18]:
df.createOrReplaceTempView('TempView')

### Display "RAM"column and count "RAM" column:

In [23]:
spark.sql('select count(RAM) from TempView ').show()

+----------+
|count(RAM)|
+----------+
|         4|
+----------+



### Get all columns when "Year" column equal "2015"  

In [24]:
spark.sql('select * from TempView  where Year = 2015').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all when "Model" start with "M":

In [26]:
spark.sql('select * from TempView  where Model LIKE "M%"').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
|7.74|0.52|256GB SSD|  2|    MacBook| 8GB|       12"|11.04|  2.03|2016|
|8.94|0.68|128GB SSD|  3|MacBook Air| 8GB|     13.3"| 12.8|  2.96|2016|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all data when "Model" column equal "MacBook Pro"

In [27]:
spark.sql('select * from TempView  where Model = "MacBook Pro"').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all data with Multiple Conditions when "RAM" column equal "8GB" and "Model" column is "Macbook".

In [35]:
spark.sql('select * from TempView  where Model = "MacBook" and RAM == "8GB"').show()

+----+----+---------+---+-------+---+----------+-----+------+----+
|   D|   H|      HDD| Id|  Model|RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-------+---+----------+-----+------+----+
|7.74|0.52|256GB SSD|  2|MacBook|8GB|       12"|11.04|  2.03|2016|
+----+----+---------+---+-------+---+----------+-----+------+----+



### Get all data with Multiple Conditions when "D" greater than or equal "8" and "Model" column is "iMac".

In [37]:
spark.sql('select * from TempView  where D  >=8 and Model == "iMac" ').show()

+---+----+-------+---+-----+----+----------+----+------+----+
|  D|   H|    HDD| Id|Model| RAM|ScreenSize|   W|Weight|Year|
+---+----+-------+---+-----+----+----------+----+------+----+
|8.0|20.3|1TB SSD|  4| iMac|64GB|       27"|25.6|  20.8|2017|
+---+----+-------+---+-----+----+----------+----+------+----+



## Task 2


### Read "test1" dataset:

In [41]:
df2 = spark.read.csv('test1.csv' , header=True)
df2.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Display Salary of the people less than or equal to 20000

In [48]:
df2.select("*").where('Salary <= 20000').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Display Salary of the people less than or equal to 20000 and greater than or equal 15000

In [52]:
df2.where('(Salary <= 20000) and (Salary >= 15000)').show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



## Task 3 

### Read "test3" dataset:

In [55]:
df3 = spark.read.csv('test3.csv', header=True)

### Display dataset

In [56]:
df3.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



### Display schema

In [73]:
from pyspark.sql.types import IntegerType
df3 = df3.withColumn("salary", df3["salary"].cast(IntegerType()))

df3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



### Group by "Name" column and using sum function on "Name" column

In [74]:
df3.select('Name' , 'salary').groupBy('Name').sum().show()

                                                                                

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



                                                                                

### Group by "Name" column and using avg function on "Name" column

In [77]:
df3.select('Name' , 'salary').groupBy('Name').avg().show()

                                                                                

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



                                                                                

### Group by "Departments" column and using sum function on "Departments" column

In [79]:
df3.groupBy('Departments').sum().show()

                                                                                

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



                                                                                

### Group by "Departments" column and using mean function on "Departments" column:

In [80]:
df3.groupBy('Departments').mean().show()

                                                                                

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



Group by "Departments" column and using count function on "Departments" column:

### Apply agg to using sum function get the total of salaries

In [94]:
df3.agg(sum(df3.salary)).show()

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



In [None]:
df3.agg({'salary':'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



In [95]:
df3.agg(sum('salary')).show()

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



## Task 4

You've been flown to their headquarters in Ulsan, South Korea, to assist them in accurately estimating the number of crew members a ship will need.


They're currently building new ships for certain customers, and they'd like you to create a model and utilize it to estimate how many crew members the ships will require.


Metadata:
1. Measurements of ship size 
2. capacity 
3. crew 
4. age for 158 cruise ships.

It is saved in a csv file for you called "ITI_data.csv". our task is to develop a regression model that will assist in predicting the number of crew members required for future ships. The client also indicated that they have found that particular cruise lines will differ in acceptable crew counts, thus this is most likely an important factor to consider when conducting your investigation.

In [149]:
df5 = spark.read.csv("ITI_data.csv", header = True, inferSchema=True)
df5.show()

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
|    Destiny|   Carnival| 17|           101.353|     26.42|  8.92| 13.21|            38.36|10.0|
|    Ecstasy|   Carnival| 22|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Elation|   Carnival| 15|            70.367|     20.52|  8.55|  10.2|            34.29| 9.2|
|    Fantasy|   Carnival| 23| 

### OneHotEncoder 


In [150]:
from pyspark.ml.feature import StringIndexer , OneHotEncoder, VectorAssembler
df5 = df5.drop('Ship_name')

###Use VectorAssembler to merge all columns into one column:

In [151]:
trainDF , testDF = df5.randomSplit([0.8,0.2] , seed  =42)
trainDF.show()

+-----------+---+------------------+----------+------+------+-----------------+-----+
|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density| crew|
+-----------+---+------------------+----------+------+------+-----------------+-----+
|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64| 3.55|
|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64| 3.55|
|   Carnival|  8|             110.0|     29.74|  9.51| 14.87|            36.99| 11.6|
|   Carnival|  9|              88.5|     21.24|  9.63| 10.62|            41.67| 10.3|
|   Carnival|  9|             110.0|     29.74|  9.52| 14.87|            36.99| 11.6|
|   Carnival| 11|              86.0|     21.24|  9.63| 10.62|            40.49|  9.3|
|   Carnival| 12|              88.5|     21.24|  9.63| 10.56|            41.67|10.29|
|   Carnival| 12|              88.5|     21.24|  9.63| 11.62|            41.67|  9.3|
|   Carnival| 13|           101.509|     27.58|  8.93|

In [152]:
#In the following section:
strIndexer = StringIndexer(
    inputCol='Cruise_line',
    outputCol='Cruise_line_indexed',handleInvalid='skip') 


#Replace the StringIndexer with OneHotEncoder as follows:
ohtEncoder = OneHotEncoder(dropLast=False,inputCol='Cruise_line_indexed',
            outputCol='Cruise_line_encoded')

In [153]:
numColumns = [f for (f,dt) in trainDF.dtypes if ((dt=='double')&(f != 'crew'))]
numColumns

['Tonnage', 'passengers', 'length', 'cabins', 'passenger_density']

In [159]:
assemblerInputs = ['Cruise_line_encoded'] + numColumns
assemblerInputs

['Cruise_line_encoded',
 'Tonnage',
 'passengers',
 'length',
 'cabins',
 'passenger_density']

In [160]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')

### Create a Linear Regression Model 

In [161]:
from pyspark.ml.regression import LinearRegression
lr =  LinearRegression(labelCol='crew' , featuresCol='features')

### Creating a Pipeline

In [162]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[strIndexer, ohtEncoder,  vecAssembler , lr ])

In [163]:
pipelineModel = pipeline.fit(trainDF)

21/10/28 17:22:42 WARN Instrumentation: [dc41cfbb] regParam is zero, which might cause numerical instability and overfitting.
21/10/28 17:22:42 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/10/28 17:22:42 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
21/10/28 17:22:42 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
21/10/28 17:22:42 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
21/10/28 17:22:42 WARN Instrumentation: [dc41cfbb] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.


In [165]:
predDF = pipelineModel.transform(testDF)

### Model Evaluation

In [171]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(predictionCol='prediction', labelCol='crew', metricName='rmse')

In [172]:
rmse = regressionEvaluator.evaluate(predDF)
rmse

1.581945799710334

In [173]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(predictionCol='prediction', labelCol='crew', metricName='r2')
r2 = regressionEvaluator.evaluate(predDF)
r2

0.8507705383879058

By Eng. Mostafa Nabieh 
If you have questions, please feel free to ask.

My Email : nabieh.mostafa@yahoo.com

My Whatsapp : +201015197566