## Task 1 - SQL

### Build SparkSession:

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.appName('sql_HW').master('local[*]').getOrCreate()

### Read the json file:

In [3]:
my_schema = 'D DOUBLE, H DOUBLE, HDD STRING, Id INT, Model STRING, RAM STRING, ScreenSize STRING, W DOUBLE, Weight DOUBLE, Year String'
path_json = 'DataFrames_sample.json'


df = spark.read.json(path=path_json, schema=my_schema)

In [4]:
df.show(10, truncate=False)

+----+----+---------+---+-----------+----+----------+-----+------+----+
|D   |H   |HDD      |Id |Model      |RAM |ScreenSize|W    |Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|1  |MacBook Pro|16GB|15"       |13.75|4.02  |2015|
|7.74|0.52|256GB SSD|2  |MacBook    |8GB |12"       |11.04|2.03  |2016|
|8.94|0.68|128GB SSD|3  |MacBook Air|8GB |13.3"     |12.8 |2.96  |2016|
|8.0 |20.3|1TB SSD  |4  |iMac       |64GB|27"       |25.6 |20.8  |2017|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Display the schema:


In [5]:
df.printSchema()

root
 |-- D: double (nullable = true)
 |-- H: double (nullable = true)
 |-- HDD: string (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Model: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- ScreenSize: string (nullable = true)
 |-- W: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- Year: string (nullable = true)



### Get all the data when "Model" equal "MacBook Pro":

In [6]:
df.select('Year').where(df['Model']=='MacBook Pro').show()

+----+
|Year|
+----+
|2015|
+----+



### Create TempView:

In [7]:
df.createOrReplaceTempView('json_view')

### Display "RAM"column and count "RAM" column:

In [8]:
spark.sql('SELECT RAM, count(RAM) AS Count_RAM from json_view GROUP BY RAM ORDER BY Count_RAM DESC').show()

+----+---------+
| RAM|Count_RAM|
+----+---------+
| 8GB|        2|
|64GB|        1|
|16GB|        1|
+----+---------+



### Get all columns when "Year" column equal "2015"  

In [9]:
spark.sql('SELECT * FROM json_view WHERE Year="2015"').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all when "Model" start with "M":

In [10]:
from pyspark.sql.functions import substring

In [11]:
spark.sql("SELECT Model from json_view where SUBSTRING(Model, 0, 1)='M'").show()

+-----------+
|      Model|
+-----------+
|MacBook Pro|
|    MacBook|
|MacBook Air|
+-----------+



`OR`

In [12]:
df.select('Model').where(substring(df['Model'], 0, 1)=='M').show()

+-----------+
|      Model|
+-----------+
|MacBook Pro|
|    MacBook|
|MacBook Air|
+-----------+



### Get all data when "Model" column equal "MacBook Pro"

In [13]:
spark.sql('SELECT * FROM json_view WHERE Model="MacBook Pro"').show()

+----+----+---------+---+-----------+----+----------+-----+------+----+
|   D|   H|      HDD| Id|      Model| RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-----------+----+----------+-----+------+----+
|9.48|0.61|512GB SSD|  1|MacBook Pro|16GB|       15"|13.75|  4.02|2015|
+----+----+---------+---+-----------+----+----------+-----+------+----+



### Get all data with Multiple Conditions when "RAM" column equal "8GB" and "Model" column is "Macbook".

In [14]:
spark.sql('SELECT * FROM json_view WHERE RAM="8GB" AND Model="MacBook"').show()

+----+----+---------+---+-------+---+----------+-----+------+----+
|   D|   H|      HDD| Id|  Model|RAM|ScreenSize|    W|Weight|Year|
+----+----+---------+---+-------+---+----------+-----+------+----+
|7.74|0.52|256GB SSD|  2|MacBook|8GB|       12"|11.04|  2.03|2016|
+----+----+---------+---+-------+---+----------+-----+------+----+



### Get all data with Multiple Conditions when "D" greater than or equal "8" and "Model" column is "iMac".

In [15]:
spark.sql('SELECT * from json_view WHERE D>=8 AND Model="iMac"').show()

+---+----+-------+---+-----+----+----------+----+------+----+
|  D|   H|    HDD| Id|Model| RAM|ScreenSize|   W|Weight|Year|
+---+----+-------+---+-----+----+----------+----+------+----+
|8.0|20.3|1TB SSD|  4| iMac|64GB|       27"|25.6|  20.8|2017|
+---+----+-------+---+-----+----+----------+----+------+----+



## Task 2


### Read "test1" dataset:

In [16]:
df = spark.read.csv(path='test1.csv', header=True, inferSchema=True)

In [17]:
df.show(20)

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [18]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



### Display Salary of the people less than or equal to 20000

In [19]:
df.select('*').where(df['Salary']<=20000).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



### Display Salary of the people less than or equal to 20000 and greater than or equal 15000

In [20]:
df.select('*').where((df['Salary']<=20000) & (df['Salary']>=15000)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



## Task 3 

### Read "test3" dataset:

In [21]:
df = spark.read.csv(path='test3.csv', header=True, inferSchema=True)

### Display dataset

In [22]:
df.show(20, truncate=False)

+---------+------------+------+
|Name     |Departments |salary|
+---------+------------+------+
|Krish    |Data Science|10000 |
|Krish    |IOT         |5000  |
|Mahesh   |Big Data    |4000  |
|Krish    |Big Data    |4000  |
|Mahesh   |Data Science|3000  |
|Sudhanshu|Data Science|20000 |
|Sudhanshu|IOT         |10000 |
|Sudhanshu|Big Data    |5000  |
|Sunny    |Data Science|10000 |
|Sunny    |Big Data    |2000  |
+---------+------------+------+



### Display schema

In [23]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



### Group by "Name" column and using sum function on "Name" column

In [24]:
df.select('*').groupBy('Name').count().orderBy('count', ascending=False).show()

+---------+-----+
|     Name|count|
+---------+-----+
|    Krish|    3|
|Sudhanshu|    3|
|    Sunny|    2|
|   Mahesh|    2|
+---------+-----+



### Group by "Name" column and using avg function on "Name" column

In [25]:
df.createOrReplaceTempView('test3_view')

In [26]:
spark.sql('SELECT Name, count(Name)/(select count(*) from test3_view) AS name_avg from test3_view GROUP BY Name ORDER BY name_avg DESC').show()

+---------+--------+
|     Name|name_avg|
+---------+--------+
|Sudhanshu|     0.3|
|    Krish|     0.3|
|    Sunny|     0.2|
|   Mahesh|     0.2|
+---------+--------+



### Group by "Departments" column and using sum function on "Departments" column

In [27]:
spark.sql('SELECT Departments, count(Departments) as count_dept from test3_view GROUP BY Departments ORDER BY count_dept DESC').show()

+------------+----------+
| Departments|count_dept|
+------------+----------+
|    Big Data|         4|
|Data Science|         4|
|         IOT|         2|
+------------+----------+



### Group by "Departments" column and using mean function on "Departments" column:

In [28]:
spark.sql('SELECT Departments, count(Departments)/(select count(*) from test3_view) as avg_dept from test3_view GROUP BY Departments ORDER BY avg_dept DESC').show()

+------------+--------+
| Departments|avg_dept|
+------------+--------+
|    Big Data|     0.4|
|Data Science|     0.4|
|         IOT|     0.2|
+------------+--------+



### Apply agg to using sum function get the total of salaries

In [29]:
spark.sql('SELECT SUM(Salary) as sum_salary from test3_view').show()

+----------+
|sum_salary|
+----------+
|     73000|
+----------+



`OR`

In [30]:
df.agg(sum('salary')).show()

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+



## Task 4

You've been flown to their headquarters in Ulsan, South Korea, to assist them in accurately estimating the number of crew members a ship will need.


They're currently building new ships for certain customers, and they'd like you to create a model and utilize it to estimate how many crew members the ships will require.


Metadata:
1. Measurements of ship size 
2. capacity 
3. crew 
4. age for 158 cruise ships.

It is saved in a csv file for you called "ITI_data.csv". our task is to develop a regression model that will assist in predicting the number of crew members required for future ships. The client also indicated that they have found that particular cruise lines will differ in acceptable crew counts, thus this is most likely an important factor to consider when conducting your investigation.

In [31]:
df = spark.read.csv('ITI_data.csv', header=True, inferSchema=True)
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [32]:
df.dtypes

[('Ship_name', 'string'),
 ('Cruise_line', 'string'),
 ('Age', 'int'),
 ('Tonnage', 'double'),
 ('passengers', 'double'),
 ('length', 'double'),
 ('cabins', 'double'),
 ('passenger_density', 'double'),
 ('crew', 'double')]

In [33]:
categ_cols = [f for (f, dt) in df.dtypes if (dt=='string') & (f!='Ship_name')]
num_cols = [f for (f, dt) in df.dtypes if ((dt=='double') | (dt=='int')) & (f!='crew')]

In [34]:
## repare cols for string_indexer and hot_encoder

indexed_cols = [c + '_index' for c in categ_cols]
encoded_cols = [c + '_OHE' for c in categ_cols]

### OneHotEncoder 


In [35]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [36]:
str_indexer = StringIndexer(inputCols=categ_cols, outputCols=indexed_cols, 
                            handleInvalid='skip')
hot_encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols)

###Use VectorAssembler to merge all columns into one column:

In [37]:
total_cols = num_cols + encoded_cols

In [38]:
vect_assembler = VectorAssembler(inputCols=total_cols, outputCol='features')

### Create a Linear Regression Model 

In [39]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

In [40]:
lr = LinearRegression(featuresCol='features', labelCol='crew')

### Creating a Pipeline

In [41]:
pipeline = Pipeline(stages=[str_indexer, hot_encoder, vect_assembler, lr])

In [42]:
model = pipeline.fit(df)  ## for all dataset as it is very small

### Model Evaluation

In [43]:
preds_vals = model.transform(df)

In [44]:
preds_vals.select('prediction', 'crew').show()

+------------------+----+
|        prediction|crew|
+------------------+----+
|3.5500000000000007|3.55|
|3.5500000000000007|3.55|
| 6.700648095399965| 6.7|
|12.573736167893333|19.1|
|11.310990686634746|10.0|
| 9.109031795050672| 9.2|
| 9.058135555614097| 9.2|
| 9.126838852468705| 9.2|
| 9.087219121006427| 9.2|
| 11.80771764116933|11.5|
|12.549515113518257|11.6|
| 6.631811043386066| 6.6|
| 9.079948229658346| 9.2|
| 9.072677338310264| 9.2|
|10.019355646122154| 9.3|
|12.534973330822094|11.6|
|10.042130722991768|10.3|
| 9.058135555614097| 9.2|
| 10.78880348773949| 9.3|
| 9.094490012354509| 9.2|
+------------------+----+
only showing top 20 rows



In [45]:
evaluate_rmse = RegressionEvaluator(predictionCol='prediction', labelCol='crew', 
                                    metricName='rmse')
evaluate_r2 = RegressionEvaluator(predictionCol='prediction', labelCol='crew', 
                                    metricName='r2')

rmse = evaluate_rmse.evaluate(preds_vals)
r2 = evaluate_r2.evaluate(preds_vals)

print('rmse =>', rmse)
print('r2 =>', r2)

rmse => 0.8007646555929624
r2 => 0.947426569841828


By Eng. Mostafa Nabieh 
If you have questions, please feel free to ask.

My Email : nabieh.mostafa@yahoo.com

My Whatsapp : +201015197566