# PySpark DataFrame subsetting and cleaning

- After inspecting the data, you usually need to clean it, which mainly involves subsetting, renaming columns, and removing duplicate rows. The PySpark DataFrame API provides several operations for this. In this exercise, your job is to subset the `'name'`, `'sex'` and `'date of birth'` columns from the `people_df` DataFrame, remove any duplicate rows from that dataset, and count the number of rows before and after the duplicate-removal step.

- Remember, you already have a `SparkSession` named `spark` and the `people_df` DataFrame available in your workspace.

## Instructions

- Select `'name'`, `'sex'` and `'date of birth'` columns from `people_df` and create `people_df_sub` DataFrame.
- Print the first 10 observations in the `people_df_sub` DataFrame.
- Remove duplicate entries from `people_df_sub` DataFrame and create `people_df_sub_nodup` DataFrame.
- How many rows are there before and after duplicates are removed?

In [2]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List whichever packages you need here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [3]:
# Entry point (Spark 2.x)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# To run on YARN, add .master("yarn"):
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

sc = spark.sparkContext

In [4]:
file_path = "file:///home/talentum/test-jupyter/P2/M3/SM2/2_OperatingonDataFramesinPySpark/Dataset/people.csv"

# Create a DataFrame from file_path (the first line is the header; column types are inferred)
people_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Select name, sex and date of birth columns
people_df_sub = people_df.select('name', 'sex', 'date of birth')

# Print the first 10 observations from people_df_sub
people_df_sub.show(10)

# Remove duplicate entries from people_df_sub
people_df_sub_nodup = people_df_sub.dropDuplicates()

# Count the number of rows
print("There were {} rows before removing duplicates, and {} rows after removing duplicates".format(people_df_sub.count(), people_df_sub_nodup.count()))

+----------------+------+-------------+
|            name|   sex|date of birth|
+----------------+------+-------------+
|  Penelope Lewis|female|   1990-08-31|
|   David Anthony|  male|   1971-10-14|
|       Ida Shipp|female|   1962-05-24|
|    Joanna Moore|female|   2017-03-10|
|  Lisandra Ortiz|female|   2020-08-05|
|   David Simmons|  male|   1999-12-30|
|   Edward Hudson|  male|   1983-05-09|
|    Albert Jones|  male|   1990-09-13|
|Leonard Cavender|  male|   1958-08-08|
|  Everett Vadala|  male|   2005-05-24|
+----------------+------+-------------+
only showing top 10 rows

There were 100000 rows before removing duplicates, and 99998 rows after removing duplicates


In [32]:
# Show the duplicated records
# (equivalent: people_df_sub.groupBy(people_df_sub.columns).count().filter("count > 1").show())
people_df_sub.groupBy('name', 'sex', 'date of birth').count().filter("count > 1").show()
# groupBy() returns a GroupedData object, which needs an aggregation such as count()
print(type(people_df_sub.groupBy('name', 'sex', 'date of birth')))
# Here the grouping columns are passed as column-name strings

+-------------+------+-------------+-----+
|         name|   sex|date of birth|count|
+-------------+------+-------------+-----+
|Kathryn Davis|female|  20175-02-28|    2|
| Robert Smith|  male|  20175-02-28|    2|
+-------------+------+-------------+-----+

<class 'pyspark.sql.group.GroupedData'>
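Conceptually, `groupBy(...).count().filter("count > 1")` counts how often each (name, sex, date of birth) combination occurs and keeps the combinations seen more than once. The plain-Python model below involves no Spark; its rows are a tiny made-up sample that reuses the two duplicated keys from the output above. It also shows why the row count dropped by exactly 2: each duplicated key contributes `count - 1` extra rows, and 100000 - (1 + 1) = 99998.

```python
from collections import Counter

# Hypothetical rows as (name, sex, date of birth) tuples
rows = [
    ("Kathryn Davis", "female", "20175-02-28"),
    ("Robert Smith", "male", "20175-02-28"),
    ("Ida Shipp", "female", "1962-05-24"),
    ("Kathryn Davis", "female", "20175-02-28"),
    ("Robert Smith", "male", "20175-02-28"),
]

# Analogue of groupBy(all columns).count()
counts = Counter(rows)

# Analogue of .filter("count > 1")
duplicates = {key: n for key, n in counts.items() if n > 1}
print(duplicates)

# Rows that dropDuplicates() would remove: every copy beyond the first
removed = sum(n - 1 for n in duplicates.values())
print(removed)  # 2
```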


In [26]:
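# where() is an alias for filter(); strings and Column objects can be mixed in groupBy()
people_df_sub.groupBy(people_df_sub.name, people_df_sub.sex, 'date of birth').count().where('count > 1').show()
print(type(people_df_sub.name))  # here name and sex are passed as Column objects (attribute access)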

+-------------+------+-------------+-----+
|         name|   sex|date of birth|count|
+-------------+------+-------------+-----+
|Kathryn Davis|female|  20175-02-28|    2|
| Robert Smith|  male|  20175-02-28|    2|
+-------------+------+-------------+-----+

<class 'pyspark.sql.column.Column'>


In [30]:
# people_df_sub.groupBy(people_df_sub.name, people_df_sub.sex, people_df_sub.date of birth).count().where('count > 1').show()
# The line above is a SyntaxError: 'date of birth' contains spaces, so it cannot be reached with attribute (dot) syntax
print(type(people_df_sub['date of birth']))  # bracket access also returns a Column object
people_df_sub.groupBy(people_df_sub.name, people_df_sub.sex, people_df_sub['date of birth']).count().where('count > 1').show()

<class 'pyspark.sql.column.Column'>
+-------------+------+-------------+-----+
|         name|   sex|date of birth|count|
+-------------+------+-------------+-----+
|Kathryn Davis|female|  20175-02-28|    2|
| Robert Smith|  male|  20175-02-28|    2|
+-------------+------+-------------+-----+



In [13]:
print(type(people_df_sub.groupBy('name','sex','date of birth').count()))

<class 'pyspark.sql.dataframe.DataFrame'>


In [14]:
people_df_sub.printSchema()

root
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- date of birth: string (nullable = true)



In [19]:
people_df_sub.groupBy('name','sex','date of birth').count().printSchema()

root
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- date of birth: string (nullable = true)
 |-- count: long (nullable = false)



In [33]:
people_df_sub \
.groupBy(people_df_sub.name,people_df_sub.sex,people_df_sub['date of birth']) \
.count().where('count > 1').show()

+-------------+------+-------------+-----+
|         name|   sex|date of birth|count|
+-------------+------+-------------+-----+
|Kathryn Davis|female|  20175-02-28|    2|
| Robert Smith|  male|  20175-02-28|    2|
+-------------+------+-------------+-----+



#### Object-oriented and functional programming example

In [43]:
# people_df_sub \
# .groupBy(people_df_sub.name,people_df_sub.sex,people_df_sub['date of birth']) \
# .count().where(people_df_sub.count > 1).show()
# # TypeError: '>' not supported between instances of 'method' and 'int'
# # people_df_sub.count is the DataFrame.count() method, not a column

#--------------------------------
# people_df_sub \
# .groupBy(people_df_sub.name,people_df_sub.sex,people_df_sub['date of birth']) \
# .count().where(people_df_sub['count'] > 1).show()
# # AnalysisException: 'Cannot resolve column name "count" among (name, sex, date of birth);'
# # people_df_sub['count'] looks for a "count" column in people_df_sub, which has none;
# # the count column exists only in the aggregated DataFrame returned by count()

#-------------------------------

# To refer to the count column of the aggregated DataFrame, use col('count'):
# it is resolved against the DataFrame that where() is called on

from pyspark.sql.functions import col

#using column objects
people_df_sub \
.groupBy(people_df_sub.name,people_df_sub.sex,people_df_sub['date of birth']) \
.count().where(col('count')>1).show() 

print(type(col('count') > 1))  # the comparison also returns a Column object

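# In pandas, each column is a Series, and a single row selected with .loc/.iloc is also returned as a Series
# In a Spark DataFrame, columns are Column objects
# In a Spark DataFrame, rows are Row objects
# A Spark DataFrame is a collection of Row objects organized into named columns
# A pandas DataFrame is mutable; a Spark DataFrame is immutable (transformations return a new DataFrame)
# A pandas DataFrame is not a distributed data structure
# A Spark DataFrame is a distributed data structure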

print(people_df_sub \
.groupBy(people_df_sub.name,people_df_sub.sex,people_df_sub['date of birth']) \
.count().where(col('count')>1).collect())

+-------------+------+-------------+-----+
|         name|   sex|date of birth|count|
+-------------+------+-------------+-----+
|Kathryn Davis|female|  20175-02-28|    2|
| Robert Smith|  male|  20175-02-28|    2|
+-------------+------+-------------+-----+

<class 'pyspark.sql.column.Column'>
[Row(name='Kathryn Davis', sex='female', date of birth='20175-02-28', count=2), Row(name='Robert Smith', sex='male', date of birth='20175-02-28', count=2)]
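The first `TypeError` in the cell above comes from plain Python rather than Spark: `people_df_sub.count` is the bound method `DataFrame.count`, not a column, and Python refuses to compare a method object with an int. A Spark-free sketch with a hypothetical `Frame` class reproduces the same message:

```python
class Frame:
    """Hypothetical stand-in for a DataFrame that has a count() method."""

    def count(self):
        return 3


f = Frame()

try:
    f.count > 1  # compares the method object itself, not its return value
except TypeError as err:
    message = str(err)

print(message)        # '>' not supported between instances of 'method' and 'int'
print(f.count() > 1)  # calling the method first gives an ordinary comparison: True
```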
