### PySpark Intro

In [None]:
#!pip3 install pyspark

In [1]:
import pyspark

In [75]:
import pandas as pd
pd.read_csv('data.csv')

Unnamed: 0,Name,Age,Experience
0,Shri,29,2
1,Kris,22,3
2,Adarsh,22,4


In [76]:
from pyspark.sql import SparkSession

In [77]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [78]:
spark

In [79]:
df_pySpark = spark.read.csv('data.csv')

In [80]:
df_pySpark

DataFrame[_c0: string, _c1: string, _c2: string]

In [81]:
df_pySpark.show()

+------+---+----------+
|   _c0|_c1|       _c2|
+------+---+----------+
|  Name|Age|Experience|
|  Shri| 29|         2|
|  Kris| 22|         3|
|Adarsh| 22|         4|
+------+---+----------+



In [82]:
df_pySpark = spark.read.option('header', 'true').csv('data.csv')

In [83]:
df_pySpark.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
|  Shri| 29|         2|
|  Kris| 22|         3|
|Adarsh| 22|         4|
+------+---+----------+



In [84]:
type(df_pySpark)

pyspark.sql.dataframe.DataFrame

In [85]:
df_pySpark.head()

Row(Name='Shri', Age='29', Experience='2')

In [86]:
# Check Schema
df_pySpark.printSchema

<bound method DataFrame.printSchema of DataFrame[Name: string, Age: string, Experience: string]>

### Pyspark DataFrames Part I

In [87]:
# Here infer schema is used for the understanding of Data types becuase as you can observe above that 
# by default for all columns it was taking as String as datatype 
df_pySpark = spark.read.option('header', 'true').csv('data.csv', inferSchema = True)

In [88]:
df_pySpark = spark.read.csv('data.csv', header = True, inferSchema = True) # The other way of specifying the same

In [89]:
df_pySpark.printSchema # As you can see now the age and experience as the type int

<bound method DataFrame.printSchema of DataFrame[Name: string, Age: int, Experience: int]>

In [90]:
type(df_pySpark)

pyspark.sql.dataframe.DataFrame

In [91]:
df_pySpark.columns

['Name', 'Age', 'Experience']

In [92]:
#Selecting a particular column

df_pySpark.select('Name')

DataFrame[Name: string]

In [93]:
df_pySpark.select('Name').show()

+------+
|  Name|
+------+
|  Shri|
|  Kris|
|Adarsh|
+------+



In [94]:
# Selecting multiple columns

df_pySpark.select(['Name', 'Age'])

DataFrame[Name: string, Age: int]

In [95]:
df_pySpark.select(['Name','Age']).show()

+------+---+
|  Name|Age|
+------+---+
|  Shri| 29|
|  Kris| 22|
|Adarsh| 22|
+------+---+



In [96]:
df_pySpark.dtypes

[('Name', 'string'), ('Age', 'int'), ('Experience', 'int')]

In [97]:
df_pySpark.describe()

DataFrame[summary: string, Name: string, Age: string, Experience: string]

In [98]:
df_pySpark.describe().show()

+-------+------+------------------+----------+
|summary|  Name|               Age|Experience|
+-------+------+------------------+----------+
|  count|     3|                 3|         3|
|   mean|  NULL|24.333333333333332|       3.0|
| stddev|  NULL| 4.041451884327381|       1.0|
|    min|Adarsh|                22|         2|
|    max|  Shri|                29|         4|
+-------+------+------------------+----------+



In [99]:
df_pySpark = df_pySpark.withColumn('Experience After 2 Years', df_pySpark['Experience']+2)

In [100]:
df_pySpark.show()

+------+---+----------+------------------------+
|  Name|Age|Experience|Experience After 2 Years|
+------+---+----------+------------------------+
|  Shri| 29|         2|                       4|
|  Kris| 22|         3|                       5|
|Adarsh| 22|         4|                       6|
+------+---+----------+------------------------+



In [101]:
# post reassigning the variable 
df_pySpark.show()

+------+---+----------+------------------------+
|  Name|Age|Experience|Experience After 2 Years|
+------+---+----------+------------------------+
|  Shri| 29|         2|                       4|
|  Kris| 22|         3|                       5|
|Adarsh| 22|         4|                       6|
+------+---+----------+------------------------+



In [103]:
df_pySpark = df_pySpark.drop('Experience After 2 Years') # Deleting a column

In [104]:
df_pySpark.show()

+------+---+----------+
|  Name|Age|Experience|
+------+---+----------+
|  Shri| 29|         2|
|  Kris| 22|         3|
|Adarsh| 22|         4|
+------+---+----------+



In [105]:
df_pySpark.withColumnRenamed('Name','Username').show()

+--------+---+----------+
|Username|Age|Experience|
+--------+---+----------+
|    Shri| 29|         2|
|    Kris| 22|         3|
|  Adarsh| 22|         4|
+--------+---+----------+



### Handling Missing Values

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [4]:
df = spark.read.csv('data.csv', header = True, inferSchema = True)

In [5]:
df.show()

+------+----+----------+------+
|  Name| Age|Experience|Salary|
+------+----+----------+------+
|  Shri|  29|         2|  NULL|
|  Kris|  22|         3| 12000|
|Adarsh|  22|         4| 81000|
| Anand|NULL|        12| 10101|
|  NULL|  43|      NULL| 12243|
|  NULL|  21|         4|  8425|
+------+----+----------+------+



In [6]:
df.drop('Name')

DataFrame[Age: int, Experience: int, Salary: int]

In [7]:
df

DataFrame[Name: string, Age: int, Experience: int, Salary: int]

In [8]:
# Dropping NAN values

In [9]:
df.na.drop().show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|  Kris| 22|         3| 12000|
|Adarsh| 22|         4| 81000|
+------+---+----------+------+



In [10]:
df

DataFrame[Name: string, Age: int, Experience: int, Salary: int]

In [11]:
df.show()

+------+----+----------+------+
|  Name| Age|Experience|Salary|
+------+----+----------+------+
|  Shri|  29|         2|  NULL|
|  Kris|  22|         3| 12000|
|Adarsh|  22|         4| 81000|
| Anand|NULL|        12| 10101|
|  NULL|  43|      NULL| 12243|
|  NULL|  21|         4|  8425|
+------+----+----------+------+



In [12]:
df.na.drop(how = "any").show() # Here it deletes a row if even one value is null

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|  Kris| 22|         3| 12000|
|Adarsh| 22|         4| 81000|
+------+---+----------+------+



In [13]:
df.na.drop(how = "all").show() # Here it wont deletes a row even if a value is non null

+------+----+----------+------+
|  Name| Age|Experience|Salary|
+------+----+----------+------+
|  Shri|  29|         2|  NULL|
|  Kris|  22|         3| 12000|
|Adarsh|  22|         4| 81000|
| Anand|NULL|        12| 10101|
|  NULL|  43|      NULL| 12243|
|  NULL|  21|         4|  8425|
+------+----+----------+------+



In [16]:
## Threshold values 
# Means setting a value to be checked for non value like
# Atleast 'n' number of values should be non null

df.na.drop(how = "any", thresh = 3).show()

+------+----+----------+------+
|  Name| Age|Experience|Salary|
+------+----+----------+------+
|  Shri|  29|         2|  NULL|
|  Kris|  22|         3| 12000|
|Adarsh|  22|         4| 81000|
| Anand|NULL|        12| 10101|
|  NULL|  21|         4|  8425|
+------+----+----------+------+



In [19]:
## Subsetting
# dropping the rows which are null in the specified column of subset
df.na.drop(how = "any", subset = ['Age', 'Experience']).show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|  Shri| 29|         2|  NULL|
|  Kris| 22|         3| 12000|
|Adarsh| 22|         4| 81000|
|  NULL| 21|         4|  8425|
+------+---+----------+------+



In [20]:
### Filling the Missing Values
# you can fill the values of na by following method like it takes
# 2 paramets i.e.,(value, subset (optional - of type list))

df.na.fill("Missing values", [ 'Experience', 'Age' ]).show()

+------+----+----------+------+
|  Name| Age|Experience|Salary|
+------+----+----------+------+
|  Shri|  29|         2|  NULL|
|  Kris|  22|         3| 12000|
|Adarsh|  22|         4| 81000|
| Anand|NULL|        12| 10101|
|  NULL|  43|      NULL| 12243|
|  NULL|  21|         4|  8425|
+------+----+----------+------+



In [32]:
from pyspark.ml.feature import Imputer


imputer = Imputer(
    inputCols = ['Age', 'Experience', 'Salary'],
    outputCols = ["{}_imputed".format(c) for c in ['Age', 'Experience', 'Salary']]
    ).setStrategy("mode")


# Similarly you can try it for mode and median

In [33]:
# Add imputation cols to df

imputer.fit(df).transform(df).show()

+------+----+----------+------+-----------+------------------+--------------+
|  Name| Age|Experience|Salary|Age_imputed|Experience_imputed|Salary_imputed|
+------+----+----------+------+-----------+------------------+--------------+
|  Shri|  29|         2|  NULL|         29|                 2|          8425|
|  Kris|  22|         3| 12000|         22|                 3|         12000|
|Adarsh|  22|         4| 81000|         22|                 4|         81000|
| Anand|NULL|        12| 10101|         22|                12|         10101|
|  NULL|  43|      NULL| 12243|         43|                 4|         12243|
|  NULL|  21|         4|  8425|         21|                 4|          8425|
+------+----+----------+------+-----------+------------------+--------------+



### Filter Operations

In [34]:
df = spark.read.csv('data.csv', header = True, inferSchema = True)

In [35]:
df.show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|  Shri| 29|         2| 12411|
|  Kris| 22|         3| 12000|
|Adarsh| 22|         4| 81000|
| Anand| 44|        12| 10101|
| Karan| 43|         2| 12243|
| Kunal| 21|         4|  8425|
+------+---+----------+------+



In [37]:
### selecting/filtering the people who has less or equal to 10K salary

df.filter("Salary <= 10000").show()

+-----+---+----------+------+
| Name|Age|Experience|Salary|
+-----+---+----------+------+
|Kunal| 21|         4|  8425|
+-----+---+----------+------+



In [38]:
df.filter("Salary <= 10000").select(['Name','Age']).show()

+-----+---+
| Name|Age|
+-----+---+
|Kunal| 21|
+-----+---+



In [39]:
df.filter(df["Salary"] <= 10000).show()

+-----+---+----------+------+
| Name|Age|Experience|Salary|
+-----+---+----------+------+
|Kunal| 21|         4|  8425|
+-----+---+----------+------+



In [40]:
df.filter((df["Salary"] >= 10000) & 
        (df["Salary"] <= 30000) ).show()

+-----+---+----------+------+
| Name|Age|Experience|Salary|
+-----+---+----------+------+
| Shri| 29|         2| 12411|
| Kris| 22|         3| 12000|
|Anand| 44|        12| 10101|
|Karan| 43|         2| 12243|
+-----+---+----------+------+



In [42]:
df.filter(~(df["Salary"] <= 20000)).show()

+------+---+----------+------+
|  Name|Age|Experience|Salary|
+------+---+----------+------+
|Adarsh| 22|         4| 81000|
+------+---+----------+------+



### GroupBy and Aggregate Functions

In [43]:
df = spark.read.csv('data2.csv', header = True, inferSchema = True)

In [44]:
df

DataFrame[Name: string, Department: string, Salary: int]

In [45]:
df.show()

+---------+-----------+------+
|     Name| Department|Salary|
+---------+-----------+------+
|   Adarsh|       Java| 12000|
|   Adarsh|DataScience|250000|
|   Adarsh| SpringBoot| 50000|
|  Krishna|     Devops|125000|
|  Krishna|     Python| 60000|
|Shrinidhi|       Java|  8000|
|Shrinidhi|     Devops| 95000|
|     Maya|     Kotlin| 34000|
|    Sunil|DataScience| 12200|
|    Sunil|     Python| 23000|
+---------+-----------+------+



In [46]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[Name: string, Department: string, Salary: int]>

In [47]:
df.groupBy('Name').sum().show() # Grouped to see the total salary of each person

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|  Krishna|     185000|
|    Sunil|      35200|
|   Adarsh|     312000|
|Shrinidhi|     103000|
|     Maya|      34000|
+---------+-----------+



In [48]:
df.groupBy('Department').mean().show()

+-----------+-----------+
| Department|avg(Salary)|
+-----------+-----------+
| SpringBoot|    50000.0|
|     Kotlin|    34000.0|
|     Devops|   110000.0|
|     Python|    41500.0|
|       Java|    10000.0|
|DataScience|   131100.0|
+-----------+-----------+



In [49]:
df.groupBy('Department').count().show()

+-----------+-----+
| Department|count|
+-----------+-----+
| SpringBoot|    1|
|     Kotlin|    1|
|     Devops|    2|
|     Python|    2|
|       Java|    2|
|DataScience|    2|
+-----------+-----+



In [52]:
df.groupBy('Name').max().show() # Each person's maximum salary according to names

+---------+-----------+
|     Name|max(Salary)|
+---------+-----------+
|  Krishna|     125000|
|    Sunil|      23000|
|   Adarsh|     250000|
|Shrinidhi|      95000|
|     Maya|      34000|
+---------+-----------+



In [53]:
df.agg({'Salary':'Sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|     669200|
+-----------+



### Introduction to Pyspark MLib

In [73]:
spark = SparkSession.builder.appName('Missing').getOrCreate()

In [74]:
spark

In [75]:
training = spark.read.csv('data.csv', header = True, inferSchema = True)
training.show()

+------+---+----------+-------+
|  Name|Age|Experience| Salary|
+------+---+----------+-------+
|  Shri| 29|         2|  12411|
|  Kris| 22|         3|  12000|
|Adarsh| 22|         4|  81000|
| Anand| 44|        12|  10101|
| Karan| 43|         2|  12243|
| Kunal| 21|         4|   8425|
|  Shri| 29|         2|   1211|
|  Kris| 22|         3|  12000|
|Adarsh| 22|         4| 231000|
| Anand| 44|        12|  10101|
| Karan| 43|         2|  12243|
| Kunal| 21|         4|   8425|
|  Shri| 29|         2|1241211|
|  Kris| 22|         3|  12000|
|Adarsh| 22|         4|   8100|
| Anand| 44|        12|  10101|
| Karan| 43|         2|1224123|
| Kunal| 21|         4|   8425|
|  Shri| 29|         2| 124114|
|  Kris| 22|         3|  12000|
+------+---+----------+-------+
only showing top 20 rows



In [76]:
training.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [77]:
training.columns

['Name', 'Age', 'Experience', 'Salary']

Here we will be making the Age and Expereince column as one independent feature by grouping them with the help
of VectorAssembler

In [78]:
from pyspark.ml.feature import VectorAssembler
featureAssembler = VectorAssembler(
                        inputCols = ["Age", "Experience"],
                        outputCol = "Independent Features" )

In [79]:
output = featureAssembler.transform(training)

In [80]:
output.show()

+------+---+----------+-------+--------------------+
|  Name|Age|Experience| Salary|Independent Features|
+------+---+----------+-------+--------------------+
|  Shri| 29|         2|  12411|          [29.0,2.0]|
|  Kris| 22|         3|  12000|          [22.0,3.0]|
|Adarsh| 22|         4|  81000|          [22.0,4.0]|
| Anand| 44|        12|  10101|         [44.0,12.0]|
| Karan| 43|         2|  12243|          [43.0,2.0]|
| Kunal| 21|         4|   8425|          [21.0,4.0]|
|  Shri| 29|         2|   1211|          [29.0,2.0]|
|  Kris| 22|         3|  12000|          [22.0,3.0]|
|Adarsh| 22|         4| 231000|          [22.0,4.0]|
| Anand| 44|        12|  10101|         [44.0,12.0]|
| Karan| 43|         2|  12243|          [43.0,2.0]|
| Kunal| 21|         4|   8425|          [21.0,4.0]|
|  Shri| 29|         2|1241211|          [29.0,2.0]|
|  Kris| 22|         3|  12000|          [22.0,3.0]|
|Adarsh| 22|         4|   8100|          [22.0,4.0]|
| Anand| 44|        12|  10101|         [44.0,

In [81]:
output.columns

['Name', 'Age', 'Experience', 'Salary', 'Independent Features']

In [82]:
finalised_data = output.select("Independent Features", "Salary")
finalised_data.show()

+--------------------+-------+
|Independent Features| Salary|
+--------------------+-------+
|          [29.0,2.0]|  12411|
|          [22.0,3.0]|  12000|
|          [22.0,4.0]|  81000|
|         [44.0,12.0]|  10101|
|          [43.0,2.0]|  12243|
|          [21.0,4.0]|   8425|
|          [29.0,2.0]|   1211|
|          [22.0,3.0]|  12000|
|          [22.0,4.0]| 231000|
|         [44.0,12.0]|  10101|
|          [43.0,2.0]|  12243|
|          [21.0,4.0]|   8425|
|          [29.0,2.0]|1241211|
|          [22.0,3.0]|  12000|
|          [22.0,4.0]|   8100|
|         [44.0,12.0]|  10101|
|          [43.0,2.0]|1224123|
|          [21.0,4.0]|   8425|
|          [29.0,2.0]| 124114|
|          [22.0,3.0]|  12000|
+--------------------+-------+
only showing top 20 rows



In [83]:
from pyspark.ml.regression import LinearRegression
train_set, test_set = finalised_data.randomSplit([0.75,0.25])

In [84]:
regressor = LinearRegression(featuresCol = "Independent Features", labelCol = 'Salary')
regressor = regressor.fit(train_set)
regressor

LinearRegressionModel: uid=LinearRegression_a3aaa5a62ef5, numFeatures=2

In [85]:
regressor.coefficients

DenseVector([-73310.8865, 74707.2725])

In [86]:
regressor.intercept

2595041.121001745

In [87]:
Pred_results = regressor.evaluate(test_set)

In [90]:
Pred_results.predictions.show()

+--------------------+-------+-------------------+
|Independent Features| Salary|         prediction|
+--------------------+-------+-------------------+
|          [21.0,4.0]|   8425|  1354341.593984179|
|          [21.0,4.0]|   8425|  1354341.593984179|
|          [21.0,4.0]|   8425|  1354341.593984179|
|          [22.0,3.0]|  12000| 1206323.4349775787|
|          [22.0,4.0]|   8100| 1281030.7074624202|
|          [22.0,4.0]| 231000| 1281030.7074624202|
|          [43.0,2.0]|  12243|-407912.45446419436|
|          [43.0,2.0]|1224123|-407912.45446419436|
|         [44.0,12.0]|  10101| 265849.38386246096|
|         [44.0,12.0]|  10101| 265849.38386246096|
+--------------------+-------+-------------------+



In [91]:
Pred_results.meanAbsoluteError, Pred_results.meanSquaredError

(1011872.230850827, 1255468477607.8054)