<a href="https://colab.research.google.com/github/Vivek-afk81/pyspark-learning-notes/blob/main/pySpark1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
import os
import pyspark
import pandas as pd

In [8]:
# Change to a specific directory
os.chdir('/content/drive/My Drive/pyspark')

# Verify current directory
print(os.getcwd())

/content/drive/My Drive/pyspark


In [9]:
pd.read_excel("sample_pyspark_data.xlsx")

Unnamed: 0,id,name,age,department,salary
0,1,Alice,24,IT,50000
1,2,Bob,28,HR,45000
2,3,Charlie,32,Finance,60000
3,4,David,26,IT,52000
4,5,Eva,30,Marketing,48000


In [10]:
type(pd.read_csv("student_practice_data.csv"))

##Working with Spark

1. Create a sprark session

In [11]:
from pyspark.sql import SparkSession

In [12]:
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [13]:
spark

In [14]:
df_pyspark=spark.read.csv("student_practice_data.csv")
df_pyspark.show()

+----------+-----+---+--------------+-----+---------+
|       _c0|  _c1|_c2|           _c3|  _c4|      _c5|
+----------+-----+---+--------------+-----+---------+
|student_id| name|age|        course|score|     city|
|       101|Aarav| 20|            AI|   85|    Delhi|
|       102| Diya| 21|  Data Science|   90|   Mumbai|
|       103|Karan| 19|Cyber Security|   78|     Pune|
|       104|Meera| 22|            AI|   88|Bangalore|
|       105|Rohit| 20|       Web Dev|   82|  Chennai|
+----------+-----+---+--------------+-----+---------+



### Reading the data

In [15]:
# to remove these headers(_c0,_c1,....)
df_pyspark=spark.read.option('header','true').csv('student_practice_data.csv')

In [16]:
type(df_pyspark)

In [17]:
df_pyspark.head(3)

[Row(student_id='101', name='Aarav', age='20', course='AI', score='85', city='Delhi'),
 Row(student_id='102', name='Diya', age='21', course='Data Science', score='90', city='Mumbai'),
 Row(student_id='103', name='Karan', age='19', course='Cyber Security', score='78', city='Pune')]

In [18]:
#basically this works as info
df_pyspark.printSchema()

root
 |-- student_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- course: string (nullable = true)
 |-- score: string (nullable = true)
 |-- city: string (nullable = true)



In [19]:
# df_pyspark=spark.read.option('header','true').csv('student_practice_data.csv',inferSchema=True)
#or we can use
df_pyspark=spark.read.csv('student_practice_data.csv',header=True,inferSchema=True)
df_pyspark.show()


+----------+-----+---+--------------+-----+---------+
|student_id| name|age|        course|score|     city|
+----------+-----+---+--------------+-----+---------+
|       101|Aarav| 20|            AI|   85|    Delhi|
|       102| Diya| 21|  Data Science|   90|   Mumbai|
|       103|Karan| 19|Cyber Security|   78|     Pune|
|       104|Meera| 22|            AI|   88|Bangalore|
|       105|Rohit| 20|       Web Dev|   82|  Chennai|
+----------+-----+---+--------------+-----+---------+



### Checking the Datatypes of the Column(Schema)

In [20]:
df_pyspark.printSchema()

root
 |-- student_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- course: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- city: string (nullable = true)



###Selecting Columns and Indexing

In [21]:
df_pyspark.columns

['student_id', 'name', 'age', 'course', 'score', 'city']

In [22]:
df_pyspark.show()

+----------+-----+---+--------------+-----+---------+
|student_id| name|age|        course|score|     city|
+----------+-----+---+--------------+-----+---------+
|       101|Aarav| 20|            AI|   85|    Delhi|
|       102| Diya| 21|  Data Science|   90|   Mumbai|
|       103|Karan| 19|Cyber Security|   78|     Pune|
|       104|Meera| 22|            AI|   88|Bangalore|
|       105|Rohit| 20|       Web Dev|   82|  Chennai|
+----------+-----+---+--------------+-----+---------+



In [23]:
df_pyspark.select('name').show()

#selecting multiple columns
df_pyspark.select('student_id','name','city').show()

+-----+
| name|
+-----+
|Aarav|
| Diya|
|Karan|
|Meera|
|Rohit|
+-----+

+----------+-----+---------+
|student_id| name|     city|
+----------+-----+---------+
|       101|Aarav|    Delhi|
|       102| Diya|   Mumbai|
|       103|Karan|     Pune|
|       104|Meera|Bangalore|
|       105|Rohit|  Chennai|
+----------+-----+---------+



In [24]:
#Notice that the .select is returning a dataframe object
type(df_pyspark.select('name'))

In [25]:
#Checking the datatypes
df_pyspark.dtypes

[('student_id', 'int'),
 ('name', 'string'),
 ('age', 'int'),
 ('course', 'string'),
 ('score', 'int'),
 ('city', 'string')]

In [26]:
df_pyspark.describe().show()

+-------+------------------+-----+-----------------+-------+----------------+---------+
|summary|        student_id| name|              age| course|           score|     city|
+-------+------------------+-----+-----------------+-------+----------------+---------+
|  count|                 5|    5|                5|      5|               5|        5|
|   mean|             103.0| NULL|             20.4|   NULL|            84.6|     NULL|
| stddev|1.5811388300841898| NULL|1.140175425099138|   NULL|4.77493455452533|     NULL|
|    min|               101|Aarav|               19|     AI|              78|Bangalore|
|    max|               105|Rohit|               22|Web Dev|              90|     Pune|
+-------+------------------+-----+-----------------+-------+----------------+---------+



### Adding And Dropping Columns

In [27]:
'''Returns a new DataFrame by adding a column or replacing the
existing column that has the same name.

The column expression must be an expression over this DataFrame; attempting to add
a column from some other DataFrame will raise an error.'''
from pyspark.sql.functions import rand, floor

df_pyspark = df_pyspark.withColumn(
    "Score2",
    df_pyspark["score"] + floor(rand() * 6)
)
df_pyspark.show()

+----------+-----+---+--------------+-----+---------+------+
|student_id| name|age|        course|score|     city|Score2|
+----------+-----+---+--------------+-----+---------+------+
|       101|Aarav| 20|            AI|   85|    Delhi|    85|
|       102| Diya| 21|  Data Science|   90|   Mumbai|    93|
|       103|Karan| 19|Cyber Security|   78|     Pune|    78|
|       104|Meera| 22|            AI|   88|Bangalore|    88|
|       105|Rohit| 20|       Web Dev|   82|  Chennai|    82|
+----------+-----+---+--------------+-----+---------+------+



In [28]:
# Dropping a columns
df_pyspark=df_pyspark.drop('Score2')
df_pyspark.show()

+----------+-----+---+--------------+-----+---------+
|student_id| name|age|        course|score|     city|
+----------+-----+---+--------------+-----+---------+
|       101|Aarav| 20|            AI|   85|    Delhi|
|       102| Diya| 21|  Data Science|   90|   Mumbai|
|       103|Karan| 19|Cyber Security|   78|     Pune|
|       104|Meera| 22|            AI|   88|Bangalore|
|       105|Rohit| 20|       Web Dev|   82|  Chennai|
+----------+-----+---+--------------+-----+---------+



In [29]:
# Rename a columns

df_pyspark.withColumnRenamed('name','Name').show()

+----------+-----+---+--------------+-----+---------+
|student_id| Name|age|        course|score|     city|
+----------+-----+---+--------------+-----+---------+
|       101|Aarav| 20|            AI|   85|    Delhi|
|       102| Diya| 21|  Data Science|   90|   Mumbai|
|       103|Karan| 19|Cyber Security|   78|     Pune|
|       104|Meera| 22|            AI|   88|Bangalore|
|       105|Rohit| 20|       Web Dev|   82|  Chennai|
+----------+-----+---+--------------+-----+---------+



###Handling missing values

1. Dropping Columns
2. Dropping Rows
3. Various Parameter in Dropping Functionalities

4. Handeling Missing values by Mean,Median and Mode

In [30]:
DF=spark.read.csv("missing_values_practice.csv",header=True,inferSchema=True)
DF.show()

+-----------+-----+----+----------+----------------+------+
|employee_id| name| age|department|experience_years|rating|
+-----------+-----+----+----------+----------------+------+
|        201| Amit|25.0|        IT|             2.0|   4.5|
|        202| Neha|NULL|        HR|             3.0|  NULL|
|        203| Ravi|30.0|      NULL|            NULL|   3.8|
|        204|Priya|28.0|   Finance|             5.0|  NULL|
|        205| NULL|35.0|        IT|             7.0|   4.2|
|        206|Suman|NULL|        HR|            NULL|   3.9|
+-----------+-----+----+----------+----------------+------+



In [31]:
#Let's add a column Salary also
from pyspark.sql.functions import when, col,rand


DF = DF.withColumn(
    "Salary",
    floor(rand() * 50000 + 30000)
)

#now introducing null values
DF = DF.withColumn(
    "Salary",
    when(rand() < 0.2, None)
    .otherwise(col("Salary"))
)

DF.show()

+-----------+-----+----+----------+----------------+------+------+
|employee_id| name| age|department|experience_years|rating|Salary|
+-----------+-----+----+----------+----------------+------+------+
|        201| Amit|25.0|        IT|             2.0|   4.5| 63221|
|        202| Neha|NULL|        HR|             3.0|  NULL| 50978|
|        203| Ravi|30.0|      NULL|            NULL|   3.8|  NULL|
|        204|Priya|28.0|   Finance|             5.0|  NULL| 79370|
|        205| NULL|35.0|        IT|             7.0|   4.2| 34429|
|        206|Suman|NULL|        HR|            NULL|   3.9|  NULL|
+-----------+-----+----+----------+----------------+------+------+



In [32]:
#This will remove all those rows which ahve null values present in it
DF.na.drop().show()

+-----------+----+----+----------+----------------+------+------+
|employee_id|name| age|department|experience_years|rating|Salary|
+-----------+----+----+----------+----------------+------+------+
|        201|Amit|25.0|        IT|             2.0|   4.5| 63221|
+-----------+----+----+----------+----------------+------+------+



In [33]:
# any=how
DF.na.drop(how='all').show() # it will drop those rows which have all null values in this case there are'nt any
DF.na.drop(how='any').show()

+-----------+-----+----+----------+----------------+------+------+
|employee_id| name| age|department|experience_years|rating|Salary|
+-----------+-----+----+----------+----------------+------+------+
|        201| Amit|25.0|        IT|             2.0|   4.5| 63221|
|        202| Neha|NULL|        HR|             3.0|  NULL| 50978|
|        203| Ravi|30.0|      NULL|            NULL|   3.8|  NULL|
|        204|Priya|28.0|   Finance|             5.0|  NULL| 79370|
|        205| NULL|35.0|        IT|             7.0|   4.2| 34429|
|        206|Suman|NULL|        HR|            NULL|   3.9|  NULL|
+-----------+-----+----+----------+----------------+------+------+

+-----------+----+----+----------+----------------+------+------+
|employee_id|name| age|department|experience_years|rating|Salary|
+-----------+----+----+----------+----------------+------+------+
|        201|Amit|25.0|        IT|             2.0|   4.5| 63221|
+-----------+----+----+----------+----------------+------+------+

In [34]:
## Threshold
DF.na.drop(how="any",thresh=5).show() #it says atleast 5 non null values must be there, in this case only 2nd row has 4 non null values hence it got dropped

+-----------+-----+----+----------+----------------+------+------+
|employee_id| name| age|department|experience_years|rating|Salary|
+-----------+-----+----+----------+----------------+------+------+
|        201| Amit|25.0|        IT|             2.0|   4.5| 63221|
|        202| Neha|NULL|        HR|             3.0|  NULL| 50978|
|        204|Priya|28.0|   Finance|             5.0|  NULL| 79370|
|        205| NULL|35.0|        IT|             7.0|   4.2| 34429|
+-----------+-----+----+----------+----------------+------+------+



In [35]:
#Subset when you want to drop nan values with respect to a specific columns
DF.na.drop(how='any',subset=['experience_years']).show()

+-----------+-----+----+----------+----------------+------+------+
|employee_id| name| age|department|experience_years|rating|Salary|
+-----------+-----+----+----------+----------------+------+------+
|        201| Amit|25.0|        IT|             2.0|   4.5| 63221|
|        202| Neha|NULL|        HR|             3.0|  NULL| 50978|
|        204|Priya|28.0|   Finance|             5.0|  NULL| 79370|
|        205| NULL|35.0|        IT|             7.0|   4.2| 34429|
+-----------+-----+----+----------+----------------+------+------+



In [36]:
# Filling the Missing Values
DF.na.fill('Missing_values').show() #Spark interprets this as:“Fill STRING columns only with 'Missing_values'”
DF.na.fill('Missing_values',['experience_years','age']).show()#Spark will NOT insert a string into numeric columns

+-----------+--------------+----+--------------+----------------+------+------+
|employee_id|          name| age|    department|experience_years|rating|Salary|
+-----------+--------------+----+--------------+----------------+------+------+
|        201|          Amit|25.0|            IT|             2.0|   4.5| 63221|
|        202|          Neha|NULL|            HR|             3.0|  NULL| 50978|
|        203|          Ravi|30.0|Missing_values|            NULL|   3.8|  NULL|
|        204|         Priya|28.0|       Finance|             5.0|  NULL| 79370|
|        205|Missing_values|35.0|            IT|             7.0|   4.2| 34429|
|        206|         Suman|NULL|            HR|            NULL|   3.9|  NULL|
+-----------+--------------+----+--------------+----------------+------+------+

+-----------+-----+----+----------+----------------+------+------+
|employee_id| name| age|department|experience_years|rating|Salary|
+-----------+-----+----+----------+----------------+------+------

In [37]:
DF.show()

+-----------+-----+----+----------+----------------+------+------+
|employee_id| name| age|department|experience_years|rating|Salary|
+-----------+-----+----+----------+----------------+------+------+
|        201| Amit|25.0|        IT|             2.0|   4.5| 63221|
|        202| Neha|NULL|        HR|             3.0|  NULL| 50978|
|        203| Ravi|30.0|      NULL|            NULL|   3.8|  NULL|
|        204|Priya|28.0|   Finance|             5.0|  NULL| 79370|
|        205| NULL|35.0|        IT|             7.0|   4.2| 34429|
|        206|Suman|NULL|        HR|            NULL|   3.9|  NULL|
+-----------+-----+----+----------+----------------+------+------+



In [41]:
from pyspark.ml.feature import Imputer

imputer=Imputer(
    inputCols=['age','experience_years','Salary'],
    outputCols=["{}_imputed".format(c) for c in ['age','experience_years','Salary']]
).setStrategy("mean")

# changing he strategu to median
imputer_2=Imputer(
    inputCols=['age','experience_years','Salary'],
    outputCols=["{}_imputed".format(c) for c in ['age','experience_years','Salary']]
).setStrategy("median")

In [43]:
#Adding imputation cols to th DF

imputer.fit(DF).transform(DF).show()
imputer_2.fit(DF).transform(DF).show()

+-----------+-----+----+----------+----------------+------+------+-----------+------------------------+--------------+
|employee_id| name| age|department|experience_years|rating|Salary|age_imputed|experience_years_imputed|Salary_imputed|
+-----------+-----+----+----------+----------------+------+------+-----------+------------------------+--------------+
|        201| Amit|25.0|        IT|             2.0|   4.5| 63221|       25.0|                     2.0|         63221|
|        202| Neha|NULL|        HR|             3.0|  NULL| 50978|       29.5|                     3.0|         50978|
|        203| Ravi|30.0|      NULL|            NULL|   3.8|  NULL|       30.0|                    4.25|         56999|
|        204|Priya|28.0|   Finance|             5.0|  NULL| 79370|       28.0|                     5.0|         79370|
|        205| NULL|35.0|        IT|             7.0|   4.2| 34429|       35.0|                     7.0|         34429|
|        206|Suman|NULL|        HR|            N

## Pyspark DataFrames

In [44]:
# lets use the excel file from the beginning
#PySpark CANNOT natively read Excel (.xlsx)
#Spark works best with CSV / Parquet / JSON / ORC

df_pd = pd.read_excel("sample_pyspark_data.xlsx")

In [45]:
df_pd.to_csv("test1.csv", index=False)


In [46]:
df_spark = spark.read.csv(
    "test1.csv",
    header=True,
    inferSchema=True
)

df_spark.show()


+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 24|        IT| 50000|
|  2|    Bob| 28|        HR| 45000|
|  3|Charlie| 32|   Finance| 60000|
|  4|  David| 26|        IT| 52000|
|  5|    Eva| 30| Marketing| 48000|
+---+-------+---+----------+------+



###Filter operations

In [52]:
# Salary of people less than 50,000
df_spark.filter("salary<=50000").show()
#or
df_spark.filter(df_spark['salary']<49000).show()

+---+-----+---+----------+------+
| id| name|age|department|salary|
+---+-----+---+----------+------+
|  1|Alice| 24|        IT| 50000|
|  2|  Bob| 28|        HR| 45000|
|  5|  Eva| 30| Marketing| 48000|
+---+-----+---+----------+------+

+---+----+---+----------+------+
| id|name|age|department|salary|
+---+----+---+----------+------+
|  2| Bob| 28|        HR| 45000|
|  5| Eva| 30| Marketing| 48000|
+---+----+---+----------+------+



In [51]:
df_spark.filter("salary<=50000").select(['name','age']).show()

+-----+---+
| name|age|
+-----+---+
|Alice| 24|
|  Bob| 28|
|  Eva| 30|
+-----+---+



In [56]:
# Adding multiple condittions

df_spark.filter((df_spark['salary']<49000) | (df_spark['age']>35)).show()

+---+----+---+----------+------+
| id|name|age|department|salary|
+---+----+---+----------+------+
|  2| Bob| 28|        HR| 45000|
|  5| Eva| 30| Marketing| 48000|
+---+----+---+----------+------+



In [57]:
# Using the not condition
df_spark.filter(~(df_spark['salary']<49000)).show()

+---+-------+---+----------+------+
| id|   name|age|department|salary|
+---+-------+---+----------+------+
|  1|  Alice| 24|        IT| 50000|
|  3|Charlie| 32|   Finance| 60000|
|  4|  David| 26|        IT| 52000|
+---+-------+---+----------+------+



#### Pyspark GroupBy And Aggregate Functions

In [59]:
df_spark.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = true)



In [60]:
##GroupBy
# Group by and aggregate functions go hand in hand
df_spark.groupBy('name').sum()

DataFrame[name: string, sum(id): bigint, sum(age): bigint, sum(salary): bigint]

In [61]:
df_spark.groupBy('name').sum().show()

+-------+-------+--------+-----------+
|   name|sum(id)|sum(age)|sum(salary)|
+-------+-------+--------+-----------+
|    Eva|      5|      30|      48000|
|Charlie|      3|      32|      60000|
|    Bob|      2|      28|      45000|
|  Alice|      1|      24|      50000|
|  David|      4|      26|      52000|
+-------+-------+--------+-----------+



In [69]:
# group by departments to see which department gives max salary
df_spark.groupBy('department').sum('salary').show()
df_spark.groupBy('department').mean('salary').show()
df_spark.groupBy('department').count().show() # to check no of employees working in each department

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|        HR|      45000|
|   Finance|      60000|
| Marketing|      48000|
|        IT|     102000|
+----------+-----------+

+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|        HR|    45000.0|
|   Finance|    60000.0|
| Marketing|    48000.0|
|        IT|    51000.0|
+----------+-----------+

+----------+-----+
|department|count|
+----------+-----+
|        HR|    1|
|   Finance|    1|
| Marketing|    1|
|        IT|    2|
+----------+-----+



In [70]:
from pyspark.sql.functions import desc

df_spark.groupBy("name") \
    .max("salary") \
    .orderBy(desc("max(salary)")) \
    .show()


+-------+-----------+
|   name|max(salary)|
+-------+-----------+
|Charlie|      60000|
|  David|      52000|
|  Alice|      50000|
|    Eva|      48000|
|    Bob|      45000|
+-------+-----------+

