In [None]:
import pyspark

In [None]:
# Start (or reuse) the Spark session that every later cell relies on.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("filter operations")
    .getOrCreate()   # returns the already-running session if one exists
)
# Leave the session as the last expression so the notebook renders its summary.
spark

In [None]:
# Load the CSV, using its first row as the column names.
# NOTE(review): no explicit schema is supplied here, so `enforceSchema` has
# nothing to enforce; the numeric columns are cast in a later cell instead.
csv_reader = spark.read.options(header=True, enforceSchema=True)
spark_df = csv_reader.csv('test_file.csv')
spark_df.show()

+----+----------+----+----------+------+
|Name|Department| Age|experience|salary|
+----+----------+----+----------+------+
|  AA|        D1|  30|         2|   100|
|  BB|        D1|  35|         1|   125|
|  CC|        D3|  25|         3|   100|
|  DD|        D2|  20|      NULL|    60|
|NULL|      NULL|NULL|      NULL|  NULL|
|  EE|      NULL|  29|         4|    95|
|  FF|        D2|  22|         4|    90|
|  GG|        D7|NULL|         1|    82|
|  HH|        D2|  38|         6|   115|
|  FF|        D3|  30|         6|   115|
+----+----------+----+----------+------+



In [None]:
# Cast the numeric columns to integers.

from pyspark.sql.types import IntegerType, DoubleType

cols=["Age","experience","salary"]

# Apply every cast in ONE projection rather than calling withColumn in a loop:
# each withColumn adds another projection to the query plan. The comprehension
# also avoids leaking a loop variable called `col` into the notebook namespace,
# where it would be easy to confuse with pyspark.sql.functions.col.
# Columns not listed in `cols` pass through untouched, and the column order
# stays the same.
spark_df = spark_df.select(
    *[
        spark_df[name].cast(IntegerType()).alias(name) if name in cols else spark_df[name]
        for name in spark_df.columns
    ]
)
spark_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- experience: integer (nullable = true)
 |-- salary: integer (nullable = true)



In [None]:
# Preview the frame after casting (20 rows, long cells truncated - the defaults of show()).
spark_df.show(n=20, truncate=True)

+----+----------+----+----------+------+
|Name|Department| Age|experience|salary|
+----+----------+----+----------+------+
|  AA|        D1|  30|         2|   100|
|  BB|        D1|  35|         1|   125|
|  CC|        D3|  25|         3|   100|
|  DD|        D2|  20|      NULL|    60|
|NULL|      NULL|NULL|      NULL|  NULL|
|  EE|      NULL|  29|         4|    95|
|  FF|        D2|  22|         4|    90|
|  GG|        D7|NULL|         1|    82|
|  HH|        D2|  38|         6|   115|
|  FF|        D3|  30|         6|   115|
+----+----------+----+----------+------+



In [None]:
# Filling missing values with a column statistic using Imputer
from pyspark.ml.feature import Imputer

# Age is overwritten in place (input and output column are the same).
# Other available strategies: 'median', 'mode'.
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['Age'])
df_imputed = imputer.fit(spark_df).transform(spark_df)
df_imputed.show()

+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  AA|        D1| 30|         2|   100|
|  BB|        D1| 35|         1|   125|
|  CC|        D3| 25|         3|   100|
|  DD|        D2| 20|      NULL|    60|
|NULL|      NULL| 28|      NULL|  NULL|
|  EE|      NULL| 29|         4|    95|
|  FF|        D2| 22|         4|    90|
|  GG|        D7| 28|         1|    82|
|  HH|        D2| 38|         6|   115|
|  FF|        D3| 30|         6|   115|
+----+----------+---+----------+------+



In [None]:
# Replace missing salaries with the column mean, writing back into `salary`.
# Other available strategies: 'median', 'mode'.
imputer = Imputer(strategy='mean', inputCols=['salary'], outputCols=['salary'])
df_imputed = imputer.fit(df_imputed).transform(df_imputed)
df_imputed.show()

+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  AA|        D1| 30|         2|   100|
|  BB|        D1| 35|         1|   125|
|  CC|        D3| 25|         3|   100|
|  DD|        D2| 20|      NULL|    60|
|NULL|      NULL| 28|      NULL|    98|
|  EE|      NULL| 29|         4|    95|
|  FF|        D2| 22|         4|    90|
|  GG|        D7| 28|         1|    82|
|  HH|        D2| 38|         6|   115|
|  FF|        D3| 30|         6|   115|
+----+----------+---+----------+------+



In [None]:
# Replace missing experience values with the most frequent value (mode),
# writing back into `experience`.
# Other available strategies: 'mean', 'median'.
imputer = Imputer(strategy='mode', inputCols=['experience'], outputCols=['experience'])
df_imputed = imputer.fit(df_imputed).transform(df_imputed)
df_imputed.show()

+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  AA|        D1| 30|         2|   100|
|  BB|        D1| 35|         1|   125|
|  CC|        D3| 25|         3|   100|
|  DD|        D2| 20|         1|    60|
|NULL|      NULL| 28|         1|    98|
|  EE|      NULL| 29|         4|    95|
|  FF|        D2| 22|         4|    90|
|  GG|        D7| 28|         1|    82|
|  HH|        D2| 38|         6|   115|
|  FF|        D3| 30|         6|   115|
+----+----------+---+----------+------+



In [None]:
# Fill missing Department values with the most frequent *non-null* department.
department_counts = (
    df_imputed
    # groupBy keeps NULL as its own group; drop it so NULL can never be
    # selected as the "mode" (fillna cannot fill with None).
    .filter(df_imputed['Department'].isNotNull())
    .groupBy("Department")
    .count()
    # Secondary sort on the name makes the choice deterministic when two
    # departments share the highest count.
    .orderBy("count", "Department", ascending=[False, True])
)
top_row = department_counts.first()   # None when every Department is NULL
mode_value = top_row[0] if top_row is not None else None

# Only fill when a mode actually exists; otherwise leave the frame unchanged.
if mode_value is not None:
    df_imputed = df_imputed.fillna({'Department': mode_value})
df_imputed.show()

+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  AA|        D1| 30|         2|   100|
|  BB|        D1| 35|         1|   125|
|  CC|        D3| 25|         3|   100|
|  DD|        D2| 20|         1|    60|
|NULL|        D2| 28|         1|    98|
|  EE|        D2| 29|         4|    95|
|  FF|        D2| 22|         4|    90|
|  GG|        D7| 28|         1|    82|
|  HH|        D2| 38|         6|   115|
|  FF|        D3| 30|         6|   115|
+----+----------+---+----------+------+



**Filtering based on operators**

In [None]:
# Row filtering with comparison operators (==, <, >=, ...); conditions can be
# combined with | (or), & (and) and negated with ~ (not).
df_imputed.filter(df_imputed['Age'] >= 30).show()
# Equivalent spellings of the same filter:
# df_imputed[df_imputed['Age']>=30].show()
# df_imputed.filter(df_imputed.Age>=30).show()


+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  AA|        D1| 30|         2|   100|
|  BB|        D1| 35|         1|   125|
|  HH|        D2| 38|         6|   115|
|  FF|        D3| 30|         6|   115|
+----+----------+---+----------+------+



In [None]:
# Keep rows that satisfy BOTH conditions; each comparison needs its own
# parentheses because & binds tighter than >= and ==.
df_imputed.where(
    (df_imputed['Age'] >= 30)
    & (df_imputed['experience'] == 6)
).show()


+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  HH|        D2| 38|         6|   115|
|  FF|        D3| 30|         6|   115|
+----+----------+---+----------+------+



**GroupBy and aggregate**

In [None]:
# Preview the imputed frame before grouping.
# NOTE(review): the output saved under this cell shows different rows
# (e.g. names ZZ/KK, department D4) from the df_imputed built earlier in this
# notebook - re-run the notebook from a fresh kernel to refresh it.
df_imputed.show(n=20, truncate=True)

+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  AA|        D1| 25|         1|    70|
|  BB|        D1| 30|         3|   100|
|  CC|        D2| 35|         4|   110|
|  DD|        D3| 22|         1|    60|
|  EE|        D2| 29|         3|    75|
|  FF|        D1| 44|         3|    73|
|  ZZ|        D3| 23|         2|    46|
|  KK|        D4| 29|         2|    80|
+----+----------+---+----------+------+



In [None]:
# Preview the imputed frame that the aggregations below operate on.
df_imputed.show(n=20, truncate=True)

+----+----------+---+----------+------+
|Name|Department|Age|experience|salary|
+----+----------+---+----------+------+
|  AA|        D1| 30|         2|   100|
|  BB|        D1| 35|         1|   125|
|  CC|        D3| 25|         3|   100|
|  DD|        D2| 20|         1|    60|
|NULL|        D2| 28|         1|    98|
|  EE|        D2| 29|         4|    95|
|  FF|        D2| 22|         4|    90|
|  GG|        D7| 28|         1|    82|
|  HH|        D2| 38|         6|   115|
|  FF|        D3| 30|         6|   115|
+----+----------+---+----------+------+



In [None]:
# groupBy: average salary per department.
# The column is referenced as `salary`, exactly as it appears in the schema and
# in the other cells, so the lookup does not depend on Spark's case-insensitive
# column resolution being enabled.
df_imputed.groupBy('Department').mean('salary').show()   # alternatives: sum, max, min, count

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|        D7|       82.0|
|        D1|      112.5|
|        D3|      107.5|
|        D2|       91.6|
+----------+-----------+



In [None]:
# Youngest age per department.
# Alternatives to 'min': 'sum', 'max', 'mean', 'count'.
df_imputed.groupBy('Department').agg({'Age': 'min'}).show()

+----------+--------+
|Department|min(Age)|
+----------+--------+
|        D7|      28|
|        D1|      30|
|        D3|      25|
|        D2|      20|
+----------+--------+



In [None]:
# Aggregate: several statistics per department in one pass, given as a
# {column: aggregate-function-name} mapping.
# NOTE(review): the output saved under this cell lists department D4, which
# does not occur in the df_imputed shown above - re-run to refresh it.
(
    df_imputed
    .groupBy('Department')
    .agg({'Age': 'mean', 'salary': 'sum', 'experience': 'max'})
    .show()
)

+----------+-----------+---------------+--------+
|Department|sum(salary)|max(experience)|avg(Age)|
+----------+-----------+---------------+--------+
|        D1|        243|              3|    33.0|
|        D3|        106|              2|    22.5|
|        D2|        185|              4|    32.0|
|        D4|         80|              2|    29.0|
+----------+-----------+---------------+--------+



In [None]:
# Total salary over the whole frame (an empty groupBy means a single global group).
# NOTE(review): the saved output (614) does not equal the sum of the salary
# column shown for df_imputed above - re-run to refresh it.

df_imputed.groupBy().agg({'salary': 'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|        614|
+-----------+

