# PySpark Documentation

## Table of contents

* [Basic packages](#Basic-packages)
* [Create spark session](#Create-spark-session)
* [Read CSV file](#Read-CSV-file)
* [Checking data info](#Checking-data-info)
* [Checking data type of column](#Checking-data-type-of-column)
* [Get data by row or column](#Get-data-by-row-or-column)
* [Data description](#Data-description)
* [Add columns in data frame](#Add-columns-in-data-frame)
* [Drop the columns](#Drop-the-columns)
* [Rename the column](#Rename-the-column)
* [Drop na by row](#Drop-na-by-row)
* [Fill the missing values](#Fill-the-missing-values)
* [Imputer](#Imputer)
* [Filter Operation](#Filter-Operation)
* [GroupBy and Aggregate Function](#GroupBy-and-Aggregate-Function)
* [Pyspark ML](#Pyspark-ML)
* [Handling categorical feature](#Handling-categorical-feature)

## Basic packages
[back to top](#PySpark-Documentation)

In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

## Create spark session
[back to top](#PySpark-Documentation)

In [2]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [3]:
spark

## Read CSV file
[back to top](#PySpark-Documentation)

In [4]:
df_pyspark = spark.read \
                  .option('header', 'true') \
                  .csv('../Data/test.csv', inferSchema=True)  # read the CSV and infer column types
# df_pyspark = spark.read.csv('../Data/test.csv', inferSchema=True, header=True)


df_pyspark.show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
|     null|36.0|      null|   null|
+---------+----+----------+-------+



In [5]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

## Checking data info
[back to top](#PySpark-Documentation)

In [6]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- experience: double (nullable = true)
 |-- salary: double (nullable = true)



## Checking data type of column
[back to top](#PySpark-Documentation)

In [7]:
df_pyspark.dtypes

[('name', 'string'),
 ('age', 'double'),
 ('experience', 'double'),
 ('salary', 'double')]

## Get data by row or column
[back to top](#PySpark-Documentation)

In [8]:
df_pyspark.head(3)

[Row(name='Krish', age=31.0, experience=10.0, salary=30000.0),
 Row(name='Sudhanshu', age=30.0, experience=8.0, salary=25000.0),
 Row(name='Sunny', age=29.0, experience=4.0, salary=20000.0)]

In [9]:
df_pyspark.select(['name', 'experience']).show()

+---------+----------+
|     name|experience|
+---------+----------+
|    Krish|      10.0|
|Sudhanshu|       8.0|
|    Sunny|       4.0|
|     Paul|       3.0|
|   Harsha|       1.0|
|  Shubham|       2.0|
|  Maheesh|      null|
|     null|      10.0|
|     null|      null|
+---------+----------+



## Data description
[back to top](#PySpark-Documentation)

In [10]:
df_pyspark.describe().show()

+-------+------+------------------+------------------+-----------------+
|summary|  name|               age|        experience|           salary|
+-------+------+------------------+------------------+-----------------+
|  count|     7|                 8|                 7|                8|
|   mean|  null|              28.5| 5.428571428571429|          25750.0|
| stddev|  null|5.3718844791323335|3.8234863173611093|9361.776388210581|
|    min|Harsha|              21.0|               1.0|          15000.0|
|    max| Sunny|              36.0|              10.0|          40000.0|
+-------+------+------------------+------------------+-----------------+
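
The `age` column of the summary can be reproduced by hand, which makes two details visible: nulls are skipped (the count is 8, not 9), and `stddev` is the sample standard deviation. The snippet below is a plain-Python sketch rather than Spark code; the list is copied from the table above and `None` stands in for null.

```python
import statistics

# Values of the `age` column from test.csv; None stands in for Spark's null.
age = [31.0, 30.0, 29.0, 24.0, 21.0, 23.0, None, 34.0, 36.0]

# Summary statistics skip nulls, so work on the non-null values only.
present = [v for v in age if v is not None]

count = len(present)
mean = statistics.mean(present)
stddev = statistics.stdev(present)  # sample standard deviation (divides by n - 1)

print(count, mean, stddev, min(present), max(present))
```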



## Add columns in data frame
[back to top](#PySpark-Documentation)

In [11]:
df_pyspark = df_pyspark.withColumn('experience after 2 year', df_pyspark['experience']+2)
df_pyspark.show()

+---------+----+----------+-------+-----------------------+
|     name| age|experience| salary|experience after 2 year|
+---------+----+----------+-------+-----------------------+
|    Krish|31.0|      10.0|30000.0|                   12.0|
|Sudhanshu|30.0|       8.0|25000.0|                   10.0|
|    Sunny|29.0|       4.0|20000.0|                    6.0|
|     Paul|24.0|       3.0|20000.0|                    5.0|
|   Harsha|21.0|       1.0|15000.0|                    3.0|
|  Shubham|23.0|       2.0|18000.0|                    4.0|
|  Maheesh|null|      null|40000.0|                   null|
|     null|34.0|      10.0|38000.0|                   12.0|
|     null|36.0|      null|   null|                   null|
+---------+----+----------+-------+-----------------------+



## Drop the columns
[back to top](#PySpark-Documentation)

In [12]:
df_pyspark = df_pyspark.drop('experience after 2 year')
df_pyspark.show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
|     null|36.0|      null|   null|
+---------+----+----------+-------+



## Rename the column
[back to top](#PySpark-Documentation)

In [13]:
df_pyspark.withColumnRenamed('name', 'new name').show()

+---------+----+----------+-------+
| new name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
|     null|36.0|      null|   null|
+---------+----+----------+-------+



## Drop na by row
[back to top](#PySpark-Documentation)

In [14]:
df_pyspark.na.drop(how='all').show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
|     null|36.0|      null|   null|
+---------+----+----------+-------+



In [15]:
# thresh: keep only rows with at least this many non-null values (overrides `how`)
df_pyspark.na.drop(how='any', thresh=2).show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
+---------+----+----------+-------+



In [16]:
df_pyspark.na.drop(how='any', thresh=1, subset=['age']).show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|     null|34.0|      10.0|38000.0|
|     null|36.0|      null|   null|
+---------+----+----------+-------+
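
The three `na.drop` calls above differ only in the rule that decides whether a row survives. The following is a plain-Python sketch of that rule, not Spark's implementation; `drop_na` and `COLUMNS` are hypothetical names introduced for illustration, and the rows are copied from `test.csv` with `None` standing in for null.

```python
# Rows of test.csv as (name, age, experience, salary); None stands in for null.
COLUMNS = ["name", "age", "experience", "salary"]
rows = [
    ("Krish", 31.0, 10.0, 30000.0),
    ("Sudhanshu", 30.0, 8.0, 25000.0),
    ("Sunny", 29.0, 4.0, 20000.0),
    ("Paul", 24.0, 3.0, 20000.0),
    ("Harsha", 21.0, 1.0, 15000.0),
    ("Shubham", 23.0, 2.0, 18000.0),
    ("Maheesh", None, None, 40000.0),
    (None, 34.0, 10.0, 38000.0),
    (None, 36.0, None, None),
]


def drop_na(rows, how="any", thresh=None, subset=None):
    """Hypothetical helper sketching the row filter applied by na.drop."""
    idx = [COLUMNS.index(c) for c in (subset or COLUMNS)]
    kept = []
    for row in rows:
        non_null = sum(row[i] is not None for i in idx)
        if thresh is not None:
            keep = non_null >= thresh    # thresh takes precedence over how
        elif how == "any":
            keep = non_null == len(idx)  # drop if any considered value is null
        else:                            # how == "all"
            keep = non_null > 0          # drop only if every considered value is null
        if keep:
            kept.append(row)
    return kept


print(len(drop_na(rows, how="all")))                            # no row is entirely null
print(len(drop_na(rows, how="any", thresh=2)))                  # last row has one non-null value
print(len(drop_na(rows, how="any", thresh=1, subset=["age"])))  # Maheesh has no age
```

With `subset=['age']` and `thresh=1`, only the `age` column is inspected, so the row for Maheesh is the one that disappears.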



## Fill the missing values
[back to top](#PySpark-Documentation)

Spark fills missing values with the value you pass, but only in columns whose type matches that value: a string fills only string columns, and a number fills only numeric columns.

In [17]:
df_pyspark.na.fill('missing value').show()

+-------------+----+----------+-------+
|         name| age|experience| salary|
+-------------+----+----------+-------+
|        Krish|31.0|      10.0|30000.0|
|    Sudhanshu|30.0|       8.0|25000.0|
|        Sunny|29.0|       4.0|20000.0|
|         Paul|24.0|       3.0|20000.0|
|       Harsha|21.0|       1.0|15000.0|
|      Shubham|23.0|       2.0|18000.0|
|      Maheesh|null|      null|40000.0|
|missing value|34.0|      10.0|38000.0|
|missing value|36.0|      null|   null|
+-------------+----+----------+-------+



In [18]:
df_pyspark.na.fill(100).show()

+---------+-----+----------+-------+
|     name|  age|experience| salary|
+---------+-----+----------+-------+
|    Krish| 31.0|      10.0|30000.0|
|Sudhanshu| 30.0|       8.0|25000.0|
|    Sunny| 29.0|       4.0|20000.0|
|     Paul| 24.0|       3.0|20000.0|
|   Harsha| 21.0|       1.0|15000.0|
|  Shubham| 23.0|       2.0|18000.0|
|  Maheesh|100.0|     100.0|40000.0|
|     null| 34.0|      10.0|38000.0|
|     null| 36.0|     100.0|  100.0|
+---------+-----+----------+-------+
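
The type matching seen in the two outputs above can be sketched in plain Python. This is an illustration under simplifying assumptions, not Spark's code: `fill_na` and `DTYPES` are hypothetical names, only the three rows containing nulls are included, and every non-string column is treated as numeric.

```python
# Column types as reported by df_pyspark.dtypes.
DTYPES = [("name", "string"), ("age", "double"),
          ("experience", "double"), ("salary", "double")]

# The three rows of test.csv that contain nulls (None stands in for null).
rows = [
    ("Maheesh", None, None, 40000.0),
    (None, 34.0, 10.0, 38000.0),
    (None, 36.0, None, None),
]


def fill_na(rows, value):
    """Hypothetical helper: fill nulls only where the column type matches the value."""
    value_kind = "string" if isinstance(value, str) else "numeric"
    filled = []
    for row in rows:
        new_row = []
        for cell, (_, dtype) in zip(row, DTYPES):
            col_kind = "string" if dtype == "string" else "numeric"
            if cell is None and col_kind == value_kind:
                # A numeric fill value is converted to the column's type (double here).
                cell = float(value) if col_kind == "numeric" else value
            new_row.append(cell)
        filled.append(tuple(new_row))
    return filled


print(fill_na(rows, "missing value"))  # only `name` is filled
print(fill_na(rows, 100))              # only the numeric columns are filled
```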



## Imputer
[back to top](#PySpark-Documentation)

In [19]:
from pyspark.ml.feature import Imputer

In [20]:
imputer = Imputer(
    inputCols = ['age', 'experience', 'salary'],
    outputCols = [f'{c}_imputed' for c in ['age', 'experience', 'salary']]
).setStrategy('mean')

In [21]:
# add imputation cols to df
imputer.fit(df_pyspark).transform(df_pyspark).show()

+---------+----+----------+-------+-----------+------------------+--------------+
|     name| age|experience| salary|age_imputed|experience_imputed|salary_imputed|
+---------+----+----------+-------+-----------+------------------+--------------+
|    Krish|31.0|      10.0|30000.0|       31.0|              10.0|       30000.0|
|Sudhanshu|30.0|       8.0|25000.0|       30.0|               8.0|       25000.0|
|    Sunny|29.0|       4.0|20000.0|       29.0|               4.0|       20000.0|
|     Paul|24.0|       3.0|20000.0|       24.0|               3.0|       20000.0|
|   Harsha|21.0|       1.0|15000.0|       21.0|               1.0|       15000.0|
|  Shubham|23.0|       2.0|18000.0|       23.0|               2.0|       18000.0|
|  Maheesh|null|      null|40000.0|       28.5| 5.428571428571429|       40000.0|
|     null|34.0|      10.0|38000.0|       34.0|              10.0|       38000.0|
|     null|36.0|      null|   null|       36.0| 5.428571428571429|       25750.0|
+---------+----+----------+-------+-----------+------------------+--------------+
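
With the `mean` strategy, each null is replaced by the mean of the non-null values in the same column. A minimal plain-Python sketch of that arithmetic follows; `impute_mean` is a hypothetical helper, and the lists are copied from the `experience` and `salary` columns above.

```python
def impute_mean(values):
    """Hypothetical helper: replace None with the mean of the non-null values."""
    present = [v for v in values if v is not None]
    mean = sum(present) / len(present)
    return [mean if v is None else v for v in values]


experience = [10.0, 8.0, 4.0, 3.0, 1.0, 2.0, None, 10.0, None]
salary = [30000.0, 25000.0, 20000.0, 20000.0, 15000.0, 18000.0, 40000.0, 38000.0, None]

print(impute_mean(experience))  # nulls become 38 / 7
print(impute_mean(salary))      # the null becomes 206000 / 8
```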

## Filter Operation
[back to top](#PySpark-Documentation)

In [22]:
df_pyspark.filter('salary<=20000').show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|  Sunny|29.0|       4.0|20000.0|
|   Paul|24.0|       3.0|20000.0|
| Harsha|21.0|       1.0|15000.0|
|Shubham|23.0|       2.0|18000.0|
+-------+----+----------+-------+



In [23]:
df_pyspark.filter(
    (df_pyspark['salary'] <= 19000) &
    (df_pyspark['salary'] >= 16000)
).show()

+-------+----+----------+-------+
|   name| age|experience| salary|
+-------+----+----------+-------+
|Shubham|23.0|       2.0|18000.0|
+-------+----+----------+-------+



In [24]:
df_pyspark.filter(
    ~(df_pyspark['salary'] <= 20000) 
).show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
+---------+----+----------+-------+
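
Note that the row with a null salary appears in neither the `salary<=20000` result nor its negation. A comparison against null is neither true nor false, and `filter` keeps only rows where the condition is true. The sketch below mimics that three-valued logic in plain Python; `le` and `sql_not` are hypothetical helpers, and `None` stands in for null.

```python
def le(value, limit):
    """SQL-style `value <= limit`: the result is unknown (None) when value is null."""
    return None if value is None else value <= limit


def sql_not(flag):
    """SQL-style NOT: negating an unknown result is still unknown."""
    return None if flag is None else not flag


# (name, salary) pairs from test.csv; None stands in for null.
rows = [
    ("Krish", 30000.0), ("Sudhanshu", 25000.0), ("Sunny", 20000.0),
    ("Paul", 20000.0), ("Harsha", 15000.0), ("Shubham", 18000.0),
    ("Maheesh", 40000.0), (None, 38000.0), (None, None),
]

# A filter keeps a row only when the condition is exactly True.
low = [r for r in rows if le(r[1], 20000) is True]
high = [r for r in rows if sql_not(le(r[1], 20000)) is True]

print(len(low), len(high), len(rows))
```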



## GroupBy and Aggregate Function
[back to top](#PySpark-Documentation)

In [25]:
df_pyspark = spark.read \
                  .option('header', 'true') \
                  .csv('../Data/test2.csv', inferSchema=True) 

In [26]:
df_pyspark.show()

+---------+------------+------+
|     name|  department|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|         IOT|  4000|
|    Krish|         IOT|  4000|
|   Mahesh|         IOT|  3000|
|Sudhanshu|         IOT| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|         IOT|  5000|
|    Sunny|         IOT| 10000|
|    Sunny|         IOT|  2000|
+---------+------------+------+



In [27]:
df_pyspark.printSchema()

root
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)



In [28]:
df_pyspark.groupBy('Name').sum().show()
df_pyspark.groupBy('Name').count().show()
df_pyspark.groupBy('Name').max().show()
df_pyspark.agg({'salary': 'sum'}).show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+

+---------+-----+
|     Name|count|
+---------+-----+
|Sudhanshu|    3|
|    Sunny|    2|
|    Krish|    3|
|   Mahesh|    2|
+---------+-----+

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+
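
To check the aggregates above by hand, the same grouping can be written in plain Python. This is only a sketch of what the grouped `sum`, `count`, and `max` compute, with the records copied from `test2.csv`; the names `records`, `groups`, and `summary` are introduced for illustration.

```python
from collections import defaultdict

# (name, department, salary) records from test2.csv.
records = [
    ("Krish", "Data Science", 10000), ("Krish", "IOT", 5000),
    ("Mahesh", "IOT", 4000), ("Krish", "IOT", 4000),
    ("Mahesh", "IOT", 3000), ("Sudhanshu", "IOT", 20000),
    ("Sudhanshu", "IOT", 10000), ("Sudhanshu", "IOT", 5000),
    ("Sunny", "IOT", 10000), ("Sunny", "IOT", 2000),
]

# Collect the salaries that belong to each name.
groups = defaultdict(list)
for name, _department, salary in records:
    groups[name].append(salary)

# One aggregate row per group, mirroring sum(), count() and max().
summary = {
    name: {"sum": sum(vals), "count": len(vals), "max": max(vals)}
    for name, vals in groups.items()
}

# Aggregating without grouping treats the whole table as a single group.
total = sum(salary for _, _, salary in records)

print(summary)
print(total)
```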



## Pyspark ML
[back to top](#PySpark-Documentation)

In [29]:
from pyspark.ml.feature import VectorAssembler

In [30]:
sdf = spark.read \
           .option('header', 'true') \
           .csv('../Data/test.csv', inferSchema=True) 

In [31]:
sdf.show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
|     null|36.0|      null|   null|
+---------+----+----------+-------+



In [32]:
sdf = sdf.na.drop(how='any')

In [33]:
sdf.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- experience: double (nullable = true)
 |-- salary: double (nullable = true)



In [34]:
featureassembler = VectorAssembler(inputCols=['age', 'experience'], outputCol='input feature')
sdf = featureassembler.transform(sdf)

In [35]:
sdf.show()

+---------+----+----------+-------+-------------+
|     name| age|experience| salary|input feature|
+---------+----+----------+-------+-------------+
|    Krish|31.0|      10.0|30000.0|  [31.0,10.0]|
|Sudhanshu|30.0|       8.0|25000.0|   [30.0,8.0]|
|    Sunny|29.0|       4.0|20000.0|   [29.0,4.0]|
|     Paul|24.0|       3.0|20000.0|   [24.0,3.0]|
|   Harsha|21.0|       1.0|15000.0|   [21.0,1.0]|
|  Shubham|23.0|       2.0|18000.0|   [23.0,2.0]|
+---------+----+----------+-------+-------------+



In [36]:
finalized_data = sdf.select('input feature', 'salary')
finalized_data.show()

+-------------+-------+
|input feature| salary|
+-------------+-------+
|  [31.0,10.0]|30000.0|
|   [30.0,8.0]|25000.0|
|   [29.0,4.0]|20000.0|
|   [24.0,3.0]|20000.0|
|   [21.0,1.0]|15000.0|
|   [23.0,2.0]|18000.0|
+-------------+-------+



In [37]:
from pyspark.ml.regression import LinearRegression

In [38]:
# train test split
train_data, test_data = finalized_data.randomSplit([0.5, 0.5])
regressor = LinearRegression(featuresCol='input feature', labelCol='salary')
regressor = regressor.fit(train_data)

In [39]:
regressor.coefficients

DenseVector([833.3333, 1250.0])

In [40]:
regressor.intercept

-3749.9999999996494

In [41]:
test_data.show()

+-------------+-------+
|input feature| salary|
+-------------+-------+
|   [23.0,2.0]|18000.0|
|   [29.0,4.0]|20000.0|
|   [30.0,8.0]|25000.0|
|  [31.0,10.0]|30000.0|
+-------------+-------+



In [42]:
pred_results = regressor.evaluate(test_data)

In [43]:
pred_results.predictions.show()



+-------------+-------+------------------+
|input feature| salary|        prediction|
+-------------+-------+------------------+
|   [23.0,2.0]|18000.0| 17916.66666666666|
|   [29.0,4.0]|20000.0|25416.666666666562|
|   [30.0,8.0]|25000.0| 31249.99999999987|
|  [31.0,10.0]|30000.0|34583.333333333176|
+-------------+-------+------------------+



In [44]:
pred_results.meanAbsoluteError, pred_results.meanSquaredError

(4083.3333333332366, 22354166.666665614)
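
The predictions and error metrics above follow directly from the fitted coefficients and intercept. The sketch below redoes the arithmetic in plain Python. It uses the values as displayed, so the first coefficient is the rounded `833.3333` and the results agree with Spark's output only to a few decimal places; `predict` is a hypothetical helper.

```python
# Values as displayed above; the first coefficient is rounded in the DenseVector output.
coefficients = [833.3333, 1250.0]
intercept = -3749.9999999996494

# (features, label) pairs from test_data: features are [age, experience].
test_rows = [
    ([23.0, 2.0], 18000.0),
    ([29.0, 4.0], 20000.0),
    ([30.0, 8.0], 25000.0),
    ([31.0, 10.0], 30000.0),
]


def predict(features):
    """Linear model: dot product of coefficients and features, plus the intercept."""
    return sum(w * x for w, x in zip(coefficients, features)) + intercept


errors = [predict(x) - y for x, y in test_rows]
mae = sum(abs(e) for e in errors) / len(errors)  # mean absolute error
mse = sum(e * e for e in errors) / len(errors)   # mean squared error

print([round(predict(x), 2) for x, _ in test_rows])
print(round(mae, 2), round(mse, 2))
```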

## Handling categorical feature
[back to top](#PySpark-Documentation)

In [45]:
from pyspark.ml.feature import StringIndexer

In [46]:
sdf = spark.read \
           .option('header', 'true') \
           .csv('../Data/test.csv', inferSchema=True) 

In [47]:
sdf.show()

+---------+----+----------+-------+
|     name| age|experience| salary|
+---------+----+----------+-------+
|    Krish|31.0|      10.0|30000.0|
|Sudhanshu|30.0|       8.0|25000.0|
|    Sunny|29.0|       4.0|20000.0|
|     Paul|24.0|       3.0|20000.0|
|   Harsha|21.0|       1.0|15000.0|
|  Shubham|23.0|       2.0|18000.0|
|  Maheesh|null|      null|40000.0|
|     null|34.0|      10.0|38000.0|
|     null|36.0|      null|   null|
+---------+----+----------+-------+



In [48]:
sdf = sdf.na.drop(how='any')

In [49]:
indexer = StringIndexer(inputCol='name', outputCol='name_indexed')

In [50]:
sdf = indexer.fit(sdf).transform(sdf)
sdf.show()

+---------+----+----------+-------+------------+
|     name| age|experience| salary|name_indexed|
+---------+----+----------+-------+------------+
|    Krish|31.0|      10.0|30000.0|         1.0|
|Sudhanshu|30.0|       8.0|25000.0|         4.0|
|    Sunny|29.0|       4.0|20000.0|         5.0|
|     Paul|24.0|       3.0|20000.0|         2.0|
|   Harsha|21.0|       1.0|15000.0|         0.0|
|  Shubham|23.0|       2.0|18000.0|         3.0|
+---------+----+----------+-------+------------+
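
The indices above look alphabetical because every name occurs exactly once. Assuming the default `stringOrderType` of `frequencyDesc`, with ties broken alphabetically (the documented behavior since Spark 3.0), the mapping can be sketched in plain Python as follows. This is an illustration, not Spark's implementation, and `ordered` and `index` are names introduced here.

```python
from collections import Counter

# The `name` column after dropping rows that contain nulls.
names = ["Krish", "Sudhanshu", "Sunny", "Paul", "Harsha", "Shubham"]

counts = Counter(names)

# Most frequent label first; equal frequencies fall back to alphabetical order.
ordered = sorted(counts, key=lambda label: (-counts[label], label))

# Indices are emitted as doubles, matching the name_indexed column.
index = {label: float(i) for i, label in enumerate(ordered)}

print(index)
```

If one name appeared more often than the others, it would receive index `0.0` regardless of its spelling.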

