## Welcome to our PySpark tutorial

### 1. Installing PySpark and reading the dataset

In [1]:
# Install PySpark first if it is not available yet: pip install pyspark
# pandas is only used here to take a quick look at the raw CSV file
import pandas as pd
import pyspark

pd.read_csv("name.csv")

Unnamed: 0,Name,Age,Profession,Income
0,Alex,25.0,Data Scientist,120000.0
1,Iannis,22.0,Pro Tennis,9345000.0
2,William,26.0,Entrepreneur,1400000.0
3,Donald,30.0,IT,85000.0
4,Cedric,31.0,Marketing Manager,90000.0
5,Mathew,27.0,Accountant,78000.0
6,Aboubakar,34.0,,
7,,,Designer,65000.0
8,Dan,29.0,Data Scientist,98000.0
9,Chris,28.0,IT,83000.0


In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create the SparkSession entry point (getOrCreate reuses an existing session if there is one)
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)

In [4]:
# Display the SparkSession created in the previous cell
spark

In [5]:
# First attempt: no header option, so the first row is not used as column names
# (the next cell re-reads the file with the header enabled)
df_pyspark = spark.read.csv('name.csv')

In [6]:
# Re-read the CSV, this time taking the column names from the first row
df_pyspark = spark.read.csv('name.csv', header=True)

In [7]:
# Check the data types: without schema inference every column is read as a string
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Profession: string (nullable = true)
 |-- Income: string (nullable = true)



### 2. PySpark DataFrames

In [8]:
# Re-read the dataset letting Spark infer a data type for each column
# (Age and Income become integers instead of strings)
df_pyspark = spark.read.options(header=True, inferSchema=True).csv('name.csv')
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Profession: string (nullable = true)
 |-- Income: integer (nullable = true)



In [9]:
# Checking the columns: returns the column names as a plain Python list
df_pyspark.columns

['Name', 'Age', 'Profession', 'Income']

In [10]:
# select() picks one or more columns by name (a list of names works too)
df_pyspark.select('Name', 'Age').show()

+---------+----+
|     Name| Age|
+---------+----+
|     Alex|  25|
|   Iannis|  22|
|  William|  26|
|   Donald|  30|
|   Cedric|  31|
|   Mathew|  27|
|Aboubakar|  34|
|     NULL|NULL|
|      Dan|  29|
|    Chris|  28|
|   Steven|  31|
|      Rob|  38|
+---------+----+



In [11]:
# Check the column data types, then show summary statistics.
# `.dtypes` is wrapped in display(): only a cell's LAST expression is echoed automatically,
# so a bare `df_pyspark.dtypes` here would be evaluated and silently discarded.
display(df_pyspark.dtypes)
# Note: `count` in the summary counts non-null values only (11 of the 12 rows per column)
df_pyspark.describe().show()

+-------+---------+------------------+----------+------------------+
|summary|     Name|               Age|Profession|            Income|
+-------+---------+------------------+----------+------------------+
|  count|       11|                11|        11|                11|
|   mean|     NULL|29.181818181818183|      NULL|1049909.0909090908|
| stddev|     NULL| 4.400413203738526|      NULL|2779160.0333390464|
|    min|Aboubakar|                22|Accountant|             65000|
|    max|  William|                38|Pro Tennis|           9345000|
+-------+---------+------------------+----------+------------------+



In [12]:
### Adding a new column to the dataframe, derived here from the existing "Name" column
from pyspark.sql.functions import when, lit
# Names that match none of the conditions below (including a NULL Name) get NULL
# in "Experience", because the chain has no .otherwise(...) fallback.
new_df = df_pyspark.withColumn(
    "Experience",
    when(df_pyspark.Name.isin("Alex", "Cedric"), lit(4))
    .when(df_pyspark.Name == "Iannis", lit(5))
    .when(df_pyspark.Name == "William", lit(2))
    .when(df_pyspark.Name.isin("Donald", "Mathew"), lit(3)),
)
new_df.show()

+---------+----+-----------------+-------+----------+
|     Name| Age|       Profession| Income|Experience|
+---------+----+-----------------+-------+----------+
|     Alex|  25|   Data Scientist| 120000|         4|
|   Iannis|  22|       Pro Tennis|9345000|         5|
|  William|  26|     Entrepreneur|1400000|         2|
|   Donald|  30|               IT|  85000|         3|
|   Cedric|  31|Marketing Manager|  90000|         4|
|   Mathew|  27|       Accountant|  78000|         3|
|Aboubakar|  34|             NULL|   NULL|      NULL|
|     NULL|NULL|         Designer|  65000|      NULL|
|      Dan|  29|   Data Scientist|  98000|      NULL|
|    Chris|  28|               IT|  83000|      NULL|
|   Steven|  31|       Accountant|  88000|      NULL|
|      Rob|  38|     Entrepreneur|  97000|      NULL|
+---------+----+-----------------+-------+----------+



### 3. Handling Missing values

In [13]:
# Dropping a column: drop() returns a NEW DataFrame, df_pyspark keeps its Name column
without_name = df_pyspark.drop('Name')
without_name.show()

+----+-----------------+-------+
| Age|       Profession| Income|
+----+-----------------+-------+
|  25|   Data Scientist| 120000|
|  22|       Pro Tennis|9345000|
|  26|     Entrepreneur|1400000|
|  30|               IT|  85000|
|  31|Marketing Manager|  90000|
|  27|       Accountant|  78000|
|  34|             NULL|   NULL|
|NULL|         Designer|  65000|
|  29|   Data Scientist|  98000|
|  28|               IT|  83000|
|  31|       Accountant|  88000|
|  38|     Entrepreneur|  97000|
+----+-----------------+-------+



In [14]:
# Drop every row that contains at least one null value
# (DataFrame.dropna() is the same operation as DataFrame.na.drop())
df_pyspark.dropna().show()

+-------+---+-----------------+-------+
|   Name|Age|       Profession| Income|
+-------+---+-----------------+-------+
|   Alex| 25|   Data Scientist| 120000|
| Iannis| 22|       Pro Tennis|9345000|
|William| 26|     Entrepreneur|1400000|
| Donald| 30|               IT|  85000|
| Cedric| 31|Marketing Manager|  90000|
| Mathew| 27|       Accountant|  78000|
|    Dan| 29|   Data Scientist|  98000|
|  Chris| 28|               IT|  83000|
| Steven| 31|       Accountant|  88000|
|    Rob| 38|     Entrepreneur|  97000|
+-------+---+-----------------+-------+



In [15]:
## how="any" (the default, used in the previous cell) drops a row if ANY of its values is null;
## how="all" drops a row only if ALL of its values are null
df_pyspark.na.drop(how="all").show()  # Nothing is dropped: no row is entirely null

+---------+----+-----------------+-------+
|     Name| Age|       Profession| Income|
+---------+----+-----------------+-------+
|     Alex|  25|   Data Scientist| 120000|
|   Iannis|  22|       Pro Tennis|9345000|
|  William|  26|     Entrepreneur|1400000|
|   Donald|  30|               IT|  85000|
|   Cedric|  31|Marketing Manager|  90000|
|   Mathew|  27|       Accountant|  78000|
|Aboubakar|  34|             NULL|   NULL|
|     NULL|NULL|         Designer|  65000|
|      Dan|  29|   Data Scientist|  98000|
|    Chris|  28|               IT|  83000|
|   Steven|  31|       Accountant|  88000|
|      Rob|  38|     Entrepreneur|  97000|
+---------+----+-----------------+-------+



In [16]:
## thresh=N keeps only the rows that have at least N NON-null values (it overrides `how`).
## Every row here has at least 2 non-null values, so nothing is dropped.
df_pyspark.na.drop(how="any", thresh=2).show()

+---------+----+-----------------+-------+
|     Name| Age|       Profession| Income|
+---------+----+-----------------+-------+
|     Alex|  25|   Data Scientist| 120000|
|   Iannis|  22|       Pro Tennis|9345000|
|  William|  26|     Entrepreneur|1400000|
|   Donald|  30|               IT|  85000|
|   Cedric|  31|Marketing Manager|  90000|
|   Mathew|  27|       Accountant|  78000|
|Aboubakar|  34|             NULL|   NULL|
|     NULL|NULL|         Designer|  65000|
|      Dan|  29|   Data Scientist|  98000|
|    Chris|  28|               IT|  83000|
|   Steven|  31|       Accountant|  88000|
|      Rob|  38|     Entrepreneur|  97000|
+---------+----+-----------------+-------+



In [17]:
## Only look for nulls in the listed columns: rows with a null Income are dropped,
## nulls in other columns are kept
df_pyspark.na.drop(subset=['Income']).show()

+-------+----+-----------------+-------+
|   Name| Age|       Profession| Income|
+-------+----+-----------------+-------+
|   Alex|  25|   Data Scientist| 120000|
| Iannis|  22|       Pro Tennis|9345000|
|William|  26|     Entrepreneur|1400000|
| Donald|  30|               IT|  85000|
| Cedric|  31|Marketing Manager|  90000|
| Mathew|  27|       Accountant|  78000|
|   NULL|NULL|         Designer|  65000|
|    Dan|  29|   Data Scientist|  98000|
|  Chris|  28|               IT|  83000|
| Steven|  31|       Accountant|  88000|
|    Rob|  38|     Entrepreneur|  97000|
+-------+----+-----------------+-------+



In [18]:
# Fill the missing values of Age and Income with the column MEDIAN using Imputer
# (Imputer's default strategy is "mean"; it is switched to "median" here).
# The imputed values go into new "<col>_imputed" columns; the originals are kept.
from pyspark.ml.feature import Imputer
impute_cols = ['Age', 'Income']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=["{}_imputed".format(c) for c in impute_cols],
    strategy="median",
)

In [19]:
# Fit the imputer (computes the fill value per column), then apply it to the DataFrame
imputer_model = imputer.fit(df_pyspark)
imputer_model.transform(df_pyspark).show()

+---------+----+-----------------+-------+-----------+--------------+
|     Name| Age|       Profession| Income|Age_imputed|Income_imputed|
+---------+----+-----------------+-------+-----------+--------------+
|     Alex|  25|   Data Scientist| 120000|         25|        120000|
|   Iannis|  22|       Pro Tennis|9345000|         22|       9345000|
|  William|  26|     Entrepreneur|1400000|         26|       1400000|
|   Donald|  30|               IT|  85000|         30|         85000|
|   Cedric|  31|Marketing Manager|  90000|         31|         90000|
|   Mathew|  27|       Accountant|  78000|         27|         78000|
|Aboubakar|  34|             NULL|   NULL|         34|         90000|
|     NULL|NULL|         Designer|  65000|         29|         65000|
|      Dan|  29|   Data Scientist|  98000|         29|         98000|
|    Chris|  28|               IT|  83000|         28|         83000|
|   Steven|  31|       Accountant|  88000|         31|         88000|
|      Rob|  38|    

### 4. Filter Operations

In [20]:
# Keep the rows with Income <= 90000 and show only a few columns
df_pyspark.filter(df_pyspark['Income'] <= 90000).select('Name', 'Age', 'Income').show()

+------+----+------+
|  Name| Age|Income|
+------+----+------+
|Donald|  30| 85000|
|Cedric|  31| 90000|
|Mathew|  27| 78000|
|  NULL|NULL| 65000|
| Chris|  28| 83000|
|Steven|  31| 88000|
+------+----+------+



In [21]:
# Combine conditions with & (and) or | (or); each condition must be its own Column expression
low_income = df_pyspark['Income'] <= 90000
age_30_or_less = df_pyspark['Age'] <= 30
df_pyspark.filter(low_income & age_30_or_less).show()

+------+---+----------+------+
|  Name|Age|Profession|Income|
+------+---+----------+------+
|Donald| 30|        IT| 85000|
|Mathew| 27|Accountant| 78000|
| Chris| 28|        IT| 83000|
+------+---+----------+------+



In [22]:
# Negate a condition with ~ (not). A row with a NULL Income matches neither the
# condition nor its negation, so Aboubakar appears in neither result.
income_at_most_90k = df_pyspark['Income'] <= 90000
df_pyspark.filter(~income_at_most_90k).show()

+-------+---+--------------+-------+
|   Name|Age|    Profession| Income|
+-------+---+--------------+-------+
|   Alex| 25|Data Scientist| 120000|
| Iannis| 22|    Pro Tennis|9345000|
|William| 26|  Entrepreneur|1400000|
|    Dan| 29|Data Scientist|  98000|
|    Rob| 38|  Entrepreneur|  97000|
+-------+---+--------------+-------+



### 5. GroupBy and Aggregate functions

In [23]:
## groupBy is used together with aggregate functions. sum() without arguments
## aggregates every numeric column (here both Age and Income)
df_pyspark.groupby('Profession').sum().show()

+-----------------+--------+-----------+
|       Profession|sum(Age)|sum(Income)|
+-----------------+--------+-----------+
|Marketing Manager|      31|      90000|
|             NULL|      34|       NULL|
|         Designer|    NULL|      65000|
|   Data Scientist|      54|     218000|
|               IT|      58|     168000|
|     Entrepreneur|      64|    1497000|
|       Accountant|      58|     166000|
|       Pro Tennis|      22|    9345000|
+-----------------+--------+-----------+



In [24]:
# Maximum income per Profession
# (.mean('Income') / .avg('Income') work the same way for the average)
df_pyspark.groupBy('Profession').max('Income').show()

+-----------------+-----------+
|       Profession|max(Income)|
+-----------------+-----------+
|Marketing Manager|      90000|
|             NULL|       NULL|
|         Designer|      65000|
|   Data Scientist|     120000|
|               IT|      85000|
|     Entrepreneur|    1400000|
|       Accountant|      88000|
|       Pro Tennis|    9345000|
+-----------------+-----------+



In [25]:
# Count the number of rows per Profession (rows with a NULL Profession form their own group)
df_pyspark.groupBy('Profession').count().show()

+-----------------+-----+
|       Profession|count|
+-----------------+-----+
|Marketing Manager|    1|
|             NULL|    1|
|         Designer|    1|
|   Data Scientist|    2|
|               IT|    2|
|     Entrepreneur|    2|
|       Accountant|    2|
|       Pro Tennis|    1|
+-----------------+-----+



### 6. Simple ML Regression with PySpark

In [26]:
# Read the second dataset (it has an Experience column) into training_df
training_df = spark.read.csv('name_and_exp.csv', inferSchema=True, header=True)
training_df.show()

+-------+---+----------+------+
|   Name|Age|Experience|Income|
+-------+---+----------+------+
|   Alex| 25|         5|120000|
| Iannis| 22|         4| 93500|
|William| 26|         7|890000|
| Donald| 30|        10| 85000|
| Cedric| 31|         8| 90000|
| Mathew| 27|         2| 78000|
|  Tyler|  6|         6| 95000|
+-------+---+----------+------+



In [27]:
# Check the inferred column types of the training dataset
training_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Income: integer (nullable = true)



In [28]:
# Combine Age and Experience into a single vector column: the model's independent features
from pyspark.ml.feature import VectorAssembler

featureassembler = VectorAssembler(
    inputCols=["Age", "Experience"],
    outputCol="Indep Features",
)

In [29]:
# Apply the assembler: returns a new DataFrame with the extra "Indep Features" vector column
output = featureassembler.transform(training_df)

In [30]:
# Inspect the assembled feature vectors next to the original columns
output.show()

+-------+---+----------+------+--------------+
|   Name|Age|Experience|Income|Indep Features|
+-------+---+----------+------+--------------+
|   Alex| 25|         5|120000|    [25.0,5.0]|
| Iannis| 22|         4| 93500|    [22.0,4.0]|
|William| 26|         7|890000|    [26.0,7.0]|
| Donald| 30|        10| 85000|   [30.0,10.0]|
| Cedric| 31|         8| 90000|    [31.0,8.0]|
| Mathew| 27|         2| 78000|    [27.0,2.0]|
|  Tyler|  6|         6| 95000|     [6.0,6.0]|
+-------+---+----------+------+--------------+



In [31]:
# Keep only the feature vector and the label (Income) for training
finalized_data = output.select(["Indep Features", "Income"])

In [32]:
# Inspect the final (features, label) training table
finalized_data.show()

+--------------+------+
|Indep Features|Income|
+--------------+------+
|    [25.0,5.0]|120000|
|    [22.0,4.0]| 93500|
|    [26.0,7.0]|890000|
|   [30.0,10.0]| 85000|
|    [31.0,8.0]| 90000|
|    [27.0,2.0]| 78000|
|     [6.0,6.0]| 95000|
+--------------+------+



In [33]:
from pyspark.ml.regression import LinearRegression

# Fix the seed so the random 75/25 split (and therefore the fitted model) is
# repeatable across re-runs of the notebook on the same data
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)
# Keep the estimator and the fitted model under separate names;
# `regressor` holds the FITTED model used by the following cells
lr_estimator = LinearRegression(featuresCol='Indep Features', labelCol='Income')
regressor = lr_estimator.fit(train_data)

In [34]:
### Coefficients of our regression model, in the assembler's input order: [Age, Experience]
regressor.coefficients

DenseVector([-80274.5592, 115479.4291])

In [35]:
## Prediction: evaluate the fitted model on the held-out test split;
## the returned summary exposes the predictions as a DataFrame (`pred_res.predictions`)
pred_res = regressor.evaluate(test_data)

In [36]:
# NOTE: with only a handful of training rows (one of them with an unusual Age of 6),
# the predictions can be far off the true Income, even negative
pred_res.predictions.show()

+--------------+------+------------------+
|Indep Features|Income|        prediction|
+--------------+------+------------------+
|     [6.0,6.0]| 95000|1833027.2879932723|
|    [27.0,2.0]| 78000|-314656.1712846339|
+--------------+------+------------------+



This is the end of the tutorial. The following steps were completed on the Databricks platform:
[Here is the link of the notebook on Databricks](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5640477560811446/712977470269093/3978417781751019/latest.html)


&copy; This notebook was inspired by the [freeCodeCamp.org PySpark tutorial on YouTube](https://www.youtube.com/watch?v=_C8kWso4ne4). The original dataset was dropped; the one used here was created by the owner of the notebook, and some lines of code were intentionally changed for a better hands-on experience of the tool.