### Creating the Spark Environment

In [1]:
import pyspark

In [2]:
# first creating spark session
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse, if one already exists) a SparkSession named "Practice".
# master("local") runs Spark in-process with a single worker thread, which is
# enough for the tiny sample files used here; "local[*]" would use every core.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Practice")
    .getOrCreate()
)

In [4]:
# Display the SparkSession object to confirm the session was created.
spark

### Loading Data from a CSV File into a DataFrame

In [7]:
# Read the CSV with default options: no header row and no type inference, so Spark
# names the columns _c0.._c3 and reads every value as a string.
dfspark = spark.read.format('csv').load(r'C:\Users\SHEHA\Downloads\GitHubRepository\SampleDatasets\test1.csv')
dfspark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [8]:
# Without header=True, the file's header line (Name, age, ...) is read as an ordinary data row.
dfspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [9]:
# Confirm the loaded object is a PySpark DataFrame.
type(dfspark)

pyspark.sql.dataframe.DataFrame

In [10]:
# header='True' makes Spark take the column names from the first line of the file;
# without inferSchema every column is still read as a string.
dfspark1 = spark.read.options(header='True').csv(r'C:\Users\SHEHA\Downloads\GitHubRepository\SampleDatasets\test1.csv')
dfspark1

DataFrame[Name: string, age: string, Experience: string, Salary: string]

In [11]:
# Print the object's class, then its rows; the columns now carry the names from the header line.
print(type(dfspark1))
dfspark1.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [13]:
# The first three rows as a list of Row objects (note that the numbers are strings).
dfspark1.head(n=3)

[Row(Name='Krish', age='31', Experience='10', Salary='30000'),
 Row(Name='Sudhanshu', age='30', Experience='8', Salary='25000'),
 Row(Name='Sunny', age='29', Experience='4', Salary='20000')]

In [14]:
# Print each column's name and type. Without inferSchema every column is read as a string,
# even the numeric ones (age, Experience, Salary).
dfspark1.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [15]:
# inferSchema=True makes Spark scan the data and choose column types, so the
# numeric columns are read as int instead of string.
dfspark2 = (
    spark.read
    .option('header', 'True')
    .option('inferSchema', True)
    .csv(r'C:\Users\SHEHA\Downloads\GitHubRepository\SampleDatasets\test1.csv')
)
dfspark2

DataFrame[Name: string, age: int, Experience: int, Salary: int]

In [16]:
# With inferSchema=True the numeric columns are now typed as integer.
dfspark2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [17]:
# Most compact form: pass header and inferSchema as keyword arguments to csv().
dfspark3 = spark.read.csv(
    r'C:\Users\SHEHA\Downloads\GitHubRepository\SampleDatasets\test1.csv',
    header=True,
    inferSchema=True,
)
dfspark3

DataFrame[Name: string, age: int, Experience: int, Salary: int]

In [19]:
# Same schema as dfspark2: Name is a string, the other columns are integers.
dfspark3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [25]:
# Column names and types as a list of (name, type) tuples.
dfspark3.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [26]:
# describe() returns a new DataFrame of summary statistics; evaluating it only displays
# its schema. Call .show() on it (next cell) to see the values.
dfspark3.describe()

DataFrame[summary: string, Name: string, age: string, Experience: string, Salary: string]

In [27]:
# Summary statistics (count, mean, stddev, min, max) for every column.
dfspark3.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



### Selecting, Adding, Deleting, Renaming and Indexing Columns

In [20]:
# List of the DataFrame's column names.
dfspark3.columns

['Name', 'age', 'Experience', 'Salary']

In [21]:
# Select a single column, passing it as a Column object instead of by name.
dfspark3.select(dfspark3.Name).show()

+---------+
|     Name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
|     Paul|
|   Harsha|
|  Shubham|
+---------+



In [24]:
# Several columns can be selected at once; a list of names works the same as varargs.
dfspark3.select(['Name', 'age']).show()

+---------+---+
|     Name|age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
|     Paul| 24|
|   Harsha| 21|
|  Shubham| 23|
+---------+---+



In [30]:
# withColumn returns a NEW DataFrame with the extra column 'Experience after 2 years'
# (Experience + 2). The result is only displayed, not assigned, so dfspark3 is unchanged.
dfspark3.withColumn('Experience after 2 years', dfspark3.Experience + 2).show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



In [31]:
# Keep the new column by re-assigning the result to dfspark3. This is NOT an in-place
# operation: withColumn returns a new DataFrame (as the previous cell shows, dfspark3
# itself is untouched), and the name dfspark3 is simply rebound to that new DataFrame.
dfspark3 = dfspark3.withColumn('Experience after 2 years',dfspark3['Experience']+2)
dfspark3.show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



In [33]:
# drop() also returns a new DataFrame. dfspark3.drop('Experience after 2 years').show()
# would only display the result; re-assigning it to dfspark3 (below) keeps the column removed.
dfspark3 = dfspark3.drop('Experience after 2 years')
dfspark3.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [34]:
# Rename column 'age' to 'Age'. Like the other column operations, this returns a new
# DataFrame, which is re-assigned to dfspark3.
dfspark3 = dfspark3.withColumnRenamed(existing='age', new='Age')
dfspark3.show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Handling Missing Values

In [53]:
# test2.csv contains the same people plus extra rows with missing (null) values.
dfspark4 = spark.read.options(header=True, inferSchema=True).csv(r'C:\Users\SHEHA\Downloads\GitHubRepository\SampleDatasets\test2.csv')
dfspark4.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [90]:
# The last rows contain nulls in various columns; the following cells drop or fill them.
dfspark4.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|null|      null|  null|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [55]:
# dropna() (the same as na.drop()) removes every row that contains at least one null.
# The result is only displayed, so dfspark4 keeps all of its rows.
dfspark4.dropna().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [44]:
# how="any" (the default) drops a row if ANY of its values is null;
# how="all" would drop only the rows in which EVERY value is null.
dfspark4.dropna(how="any").show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [63]:
# thresh=2 keeps only the rows that have at least 2 non-null values. When thresh is
# given it overrides how, so additionally passing how="any" would make no difference.
dfspark4.dropna(thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [64]:
# subset=['Experience'] only checks the Experience column: a row is dropped when its
# Experience value is null, regardless of nulls in the other columns.
dfspark4.dropna(how="any", subset=['Experience']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



### Filling Missing Values

In [79]:
# A string fill value only applies to string columns (numeric nulls stay null).
# subset narrows it further, here to the Name column only.
dfspark4.fillna('missing', subset=['Name']).show()


+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|  missing|  34|        10| 38000|
|  missing|null|      null|  null|
|  missing|  36|      null|  null|
+---------+----+----------+------+



In [84]:
# A numeric fill value only applies to numeric columns. With subset, only age and
# Salary are filled, so the nulls in Experience remain.
dfspark4.fillna(0, subset=['age', 'Salary']).show()


+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|   Mahesh|  0|      null| 40000|
|     null| 34|        10| 38000|
|     null|  0|      null|     0|
|     null| 36|      null|     0|
+---------+---+----------+------+



In [88]:
# Imputer fills missing values in numeric columns with a per-column statistic
# (here the median) and writes the results to new "<column>_imputed" columns.
from pyspark.ml.feature import Imputer

impute = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=[name + "_imputed" for name in ['age', 'Experience', 'Salary']],
    strategy="median",
)

In [89]:
# Fit the median Imputer on dfspark4, then display the transformed rows. The *_imputed
# columns exist only in the displayed result: it is not assigned, so dfspark4 is unchanged.
impute.fit(dfspark4).transform(dfspark4).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         29|                 4|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|null|      null|  null|         29|                 4|         20000|
|     null|  36|      null| 

In [91]:
# Same imputation, but each null becomes the column mean. The imputed values keep the
# integer column type (age's mean of 28.5 is shown as 28 below).
impute1 = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=["{}_imputed".format(name) for name in ['age', 'Experience', 'Salary']],
    strategy="mean",
)
impute1.fit(dfspark4).transform(dfspark4).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|null|      null|  null|         28|                 5|         25750|
|     null|  36|      null| 

### Filter Operation

In [92]:
# Filter with a SQL expression string; where() is an alias of filter().
dfspark3.where("Salary<=20000").show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [93]:
# Keep only some columns of the filtered rows. 'Age' matches the column's name after the
# earlier rename; the old spelling 'age' only resolved because Spark column names are
# case-insensitive by default (spark.sql.caseSensitive=false).
dfspark3.filter("Salary<=20000").select(['Name','Age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [98]:
# Column-expression filters. Combine conditions with & (AND) or | (OR), wrapping each
# condition in parentheses because & and | bind tighter than comparisons in Python.
#   single condition: dfspark3.filter(dfspark3['Salary'] <= 20000)
#   AND:              dfspark3.filter((dfspark3['Salary'] <= 20000) & (dfspark3['Age'] >= 24))
# OR is used below. 'Age' is the column's name after the earlier rename.
dfspark3.filter((dfspark3['Salary'] <= 20000) | (dfspark3['Age'] <= 22)).show()

+-------+---+----------+------+
|   Name|Age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [99]:
# ~ negates a Column condition: keep the rows for which Salary <= 20000 is NOT true.
dfspark3.where(~(dfspark3.Salary <= 20000)).show()

+---------+---+----------+------+
|     Name|Age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+



### GroupBy and Aggregate Functions

In [100]:
# test3.csv provides Name, Departments and salary columns for the groupBy examples.
dfspark5 = spark.read.options(header=True, inferSchema=True).csv(r'C:\Users\SHEHA\Downloads\GitHubRepository\SampleDatasets\test3.csv')
dfspark5.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [101]:
# Preview the Name / Departments / salary rows used by the grouping examples below.
dfspark5.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [102]:
# groupBy() is followed by an aggregate function. Here rows are grouped by Name and
# sum() totals every numeric column per group (only salary here), i.e. the TOTAL
# salary of each person.
dfspark5.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [103]:
# Average of every numeric column (only salary here) per person.
dfspark5.groupBy('Name').avg().show()

+---------+------------------+
|     Name|       avg(salary)|
+---------+------------------+
|Sudhanshu|11666.666666666666|
|    Sunny|            6000.0|
|    Krish| 6333.333333333333|
|   Mahesh|            3500.0|
+---------+------------------+



In [104]:
# Maximum of every numeric column (only salary here) per person.
dfspark5.groupBy('Name').max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



In [105]:
# Number of rows in each department.
dfspark5.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [106]:
# agg() without groupBy() aggregates over the whole DataFrame: the total salary of all rows.
# The dict key uses the schema's lowercase 'salary' column name instead of relying on
# case-insensitive column resolution.
dfspark5.agg({'salary': 'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+

