In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col

#Create SparkSession 
spark = SparkSession.builder \
     .master("local[1]") \
     .appName("SparkByExamples.com") \
     .getOrCreate()

In [2]:
import findspark
findspark.init()

In [3]:
import pyspark

In [4]:
df = spark.read.csv("Absenteeism_new_data.csv",inferSchema=True, header=True)

# inferSchema, Header
inferschema=True
header=True

In [5]:
df.show()

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|
| 17|                25|08/06/2018|                   179|              22| 40|                237.656|             22|        2|       

In [6]:
df.columns

['ID',
 'Reason for Absence',
 'Date',
 'Transportation Expense',
 'Distance to Work',
 'Age',
 'Daily Work Load Average',
 'Body Mass Index',
 'Education',
 'Children',
 'Pets']

In [7]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Reason for Absence: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Transportation Expense: integer (nullable = true)
 |-- Distance to Work: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Daily Work Load Average: double (nullable = true)
 |-- Body Mass Index: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- Children: integer (nullable = true)
 |-- Pets: integer (nullable = true)



In [8]:
df.toPandas()

Unnamed: 0,ID,Reason for Absence,Date,Transportation Expense,Distance to Work,Age,Daily Work Load Average,Body Mass Index,Education,Children,Pets
0,22,27,01/06/2018,179,26,30,237.656,19,3,0,0
1,10,7,04/06/2018,361,52,28,237.656,27,1,1,4
2,14,23,06/06/2018,155,12,34,237.656,25,1,2,0
3,17,25,08/06/2018,179,22,40,237.656,22,2,2,0
4,14,10,08/06/2018,155,12,34,237.656,25,1,2,0
5,28,11,11/06/2018,225,26,28,237.656,24,1,1,2
6,16,7,13/06/2018,118,15,46,275.089,25,1,2,0
7,22,27,13/06/2018,179,26,30,275.089,19,3,0,0
8,34,26,15/06/2018,118,10,37,275.089,28,1,0,0
9,34,10,20/06/2018,118,10,37,275.089,28,1,0,0


PySpark DataFrame show()
is used to display the contents of the DataFrame--------- in a Table Row and Column Format. 

By default, 

it shows only 20 Rows

column values are truncated at 20 characters

In [9]:
df.show()

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|
| 17|                25|08/06/2018|                   179|              22| 40|                237.656|             22|        2|       

In [10]:
# Default - displays 20 rows and 
# 20 charactes from column value 
df.show()

#Display full column contents
df.show(truncate=False)

# Display 2 rows and full column contents
df.show(2,truncate=False) 

# Display 2 rows & column values 25 characters
df.show(2,truncate=25) 

# Display DataFrame rows & columns vertically
df.show(n=3,truncate=25,vertical=True)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|
| 17|                25|08/06/2018|                   179|              22| 40|                237.656|             22|        2|       

pyspark.sql.Column class 
provides several functions to work with DataFrame to 
1. manipulate the Column values, 
2. evaluate the boolean expression to filter row
3. retrieve a value or part of a value from a DataFrame column, and to work with list, map & struct columns.

In [11]:
df.select(df.Pets).show()
df.select(df["Pets"]).show()
#Accessing column name with dot (with backticks)
#df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("Pets")).show()
#Accessing column name with dot (with backticks)
#df.select(col("`name.fname`")).show()

+----+
|Pets|
+----+
|   0|
|   4|
|   0|
|   0|
|   0|
|   2|
|   0|
|   0|
|   0|
|   0|
|   4|
|   0|
|   8|
|   0|
|   5|
|   0|
|   0|
|   0|
|   1|
|   0|
+----+
only showing top 20 rows

+----+
|Pets|
+----+
|   0|
|   4|
|   0|
|   0|
|   0|
|   2|
|   0|
|   0|
|   0|
|   0|
|   4|
|   0|
|   8|
|   0|
|   5|
|   0|
|   0|
|   0|
|   1|
|   0|
+----+
only showing top 20 rows

+----+
|Pets|
+----+
|   0|
|   4|
|   0|
|   0|
|   0|
|   2|
|   0|
|   0|
|   0|
|   0|
|   4|
|   0|
|   8|
|   0|
|   5|
|   0|
|   0|
|   0|
|   1|
|   0|
+----+
only showing top 20 rows



# alias() – Set’s name to Column


In [12]:
from pyspark.sql.functions import *
df.select(df.Pets.alias("pet")).show()

+---+
|pet|
+---+
|  0|
|  4|
|  0|
|  0|
|  0|
|  2|
|  0|
|  0|
|  0|
|  0|
|  4|
|  0|
|  8|
|  0|
|  5|
|  0|
|  0|
|  0|
|  1|
|  0|
+---+
only showing top 20 rows



asc() & desc() – Sort the DataFrame columns by Ascending or Descending order.

In [13]:
df.orderBy(df.Pets.asc()).show(5)
df.sort(df.Pets.asc()).show(5)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 16|                 7|13/06/2018|                   118|              15| 46|                275.089|             25|        1|       2|   0|
| 22|                27|22/06/2018|                   179|              26| 30|                275.089|             19|        3|       0|   0|
| 22|                27|13/06/2018|                   179|              26| 30|                275.089|             19|        3|       0|   0|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       

4.3 cast() & astype() – Used to convert the data Type.

In [14]:
df.select(df.Pets.cast("float")).show(2)
df.select(df.Pets.astype("float")).show(2)

+----+
|Pets|
+----+
| 0.0|
| 4.0|
+----+
only showing top 2 rows

+----+
|Pets|
+----+
| 0.0|
| 4.0|
+----+
only showing top 2 rows



between() –

Returns a Boolean expression -----when a column values in between lower and upper bound.


In [15]:
df.select(df.Pets.between(2,4)).show(3)
df.filter(df.Pets.between(2,4)).show(2)

+-----------------------------+
|((Pets >= 2) AND (Pets <= 4))|
+-----------------------------+
|                        false|
|                         true|
|                        false|
+-----------------------------+
only showing top 3 rows

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
| 28|                11|11/06/2018|                   225|              26| 28|                237.656|             24|        1|       1|   2|
+---+------------------+-------

 contains() – 
 
 Checks if a DataFrame column value contains a a value specified in this function.


In [16]:
df.select(df.Pets.contains(4)).show(3)
df.filter(df.Pets.contains(4)).show(3)

+-----------------+
|contains(Pets, 4)|
+-----------------+
|            false|
|             true|
|            false|
+-----------------+
only showing top 3 rows

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
| 23|                22|21/06/2018|                   378|              49| 36|                275.089|             21|        1|       2|   4|
| 10|                22|18/07/2018|                   361|              52| 28|                264.604|            

# startswith() & endswith() –
Checks if the value of the DataFrame Column starts and ends with a String respectively.

In [17]:
df.select(df['Distance to Work'].startswith("5")).show(2)
df.filter(df['Distance to Work'].startswith("5")).show(2)

+-------------------------------+
|startswith(Distance to Work, 5)|
+-------------------------------+
|                          false|
|                           true|
+-------------------------------+
only showing top 2 rows

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
| 12|                19|22/06/2018|                   233|              51| 31|                275.089|             21|        2|       1|   8|
+---+------------------+----------+----------------

# isNull & isNotNull() – Checks if the DataFrame column has NULL or non NULL values.

In [18]:
df.filter(df.Pets.isNull()).show(4)
df.filter(df.Pets.isNotNull()).show(2)

+---+------------------+----+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
+---+------------------+----+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 22|          

#  like() & rlike() – Similar to SQL LIKE expression

In [19]:
df.filter(df.Pets.like("%5")).show(2)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|  2|                 0|25/06/2018|                   235|              29| 48|                275.089|             33|        1|       1|   5|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+



# substr() – Returns a Column after getting sub string from the Column

df.select(df.fname.substr(1,2).alias("substr")).show()

df.withColumn("new column", df.name.substr(1,2)).show()

# when() & otherwise()

In [20]:
df.select(df.Pets,df.Education,
          when(df.Pets==0,"nopets") \
         .when(df.Pets<=3,"lessPets") \
         .when(df.Pets>3, "morePets") \
         .otherwise(df.Pets).alias("new_Pets") \
    ).show(5)

df.withColumn("new_pets",
          when(df.Pets==0,"nopets") \
         .when(df.Pets<=3,"lessPets") \
         .when(df.Pets>3, "morePets") \
         .otherwise(df.Pets)
    ).show(5)

+----+---------+--------+
|Pets|Education|new_Pets|
+----+---------+--------+
|   0|        3|  nopets|
|   4|        1|morePets|
|   0|        1|  nopets|
|   0|        2|  nopets|
|   0|        1|  nopets|
+----+---------+--------+
only showing top 5 rows

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+--------+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|new_pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+--------+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|  nopets|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|

In [21]:
li=[105,155,25]
df.filter(df["Transportation Expense"].isin(li)).show(3, truncate=25)

df.select(df["Pets"],df["Transportation Expense"]).filter(df["Transportation Expense"].isin(li)).show(3, truncate=25)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|
| 14|                10|08/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+

+----+----------------------+
|Pets|Transportation Expense|
+----+----------------------+
|   0|                   155|
|   0|         

# Select

# select- to select number of columns
df2.select("name").show(truncate=False)

df2.select("name.firstname","name.lastname").show(truncate=False)

df2.select("name.*").show(truncate=False)


# collect

In [22]:
dataCollect = df.collect()
dataCollect

print("*"*100)
for row in df.collect():
    print(row)
    
print("*"*100)
    
df.collect()[0][0]

****************************************************************************************************
Row(ID=22, Reason for Absence=27, Date='01/06/2018', Transportation Expense=179, Distance to Work=26, Age=30, Daily Work Load Average=237.656, Body Mass Index=19, Education=3, Children=0, Pets=0)
Row(ID=10, Reason for Absence=7, Date='04/06/2018', Transportation Expense=361, Distance to Work=52, Age=28, Daily Work Load Average=237.656, Body Mass Index=27, Education=1, Children=1, Pets=4)
Row(ID=14, Reason for Absence=23, Date='06/06/2018', Transportation Expense=155, Distance to Work=12, Age=34, Daily Work Load Average=237.656, Body Mass Index=25, Education=1, Children=2, Pets=0)
Row(ID=17, Reason for Absence=25, Date='08/06/2018', Transportation Expense=179, Distance to Work=22, Age=40, Daily Work Load Average=237.656, Body Mass Index=22, Education=2, Children=2, Pets=0)
Row(ID=14, Reason for Absence=10, Date='08/06/2018', Transportation Expense=155, Distance to Work=12, Age=34, Daily 

22

# withColumn()

PySpark withColumn() -transformation function of DataFrame 

1. which is used to change 

2. the value

3. convert the datatype of an existing column
4. create a new column, and many more. 

# Change DataType- withColumn()

In [23]:
df.withColumn("new_pet_name",df["Pets"].astype("float")).show(4)
df.withColumn("new_pet_name",col("Pets").astype("float")).show(4)
df.withColumn("new_pet_name",df["Pets"].astype("float")).select(["Pets","new_pet_name"]).show(4)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+------------+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|new_pet_name|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+------------+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|         0.0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|         4.0|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|         0.0|
| 17|                25|08/06/2018|                   179|

# Update The Value of an Existing Column-withColumn()

In [24]:
df.withColumn("new_pet", df["Pets"]*10).select(["Pets","new_pet"]).show(10)

+----+-------+
|Pets|new_pet|
+----+-------+
|   0|      0|
|   4|     40|
|   0|      0|
|   0|      0|
|   0|      0|
|   2|     20|
|   0|      0|
|   0|      0|
|   0|      0|
|   0|      0|
+----+-------+
only showing top 10 rows



# New Column using withColumn()-withColumn
to add a new column which is not there 

In [25]:

df.withColumn("Country", lit("USA")).show(5)
df.withColumn("Country", lit("USA")) \
  .withColumn("anotherColumn",lit("anotherValue")) \
  .select("Country").show(5)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+-------+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|Country|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+-------+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|    USA|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|    USA|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|    USA|
| 17|                25|08/06/2018|                   179|              22| 40|         

# Rename Column Name, WithColumnRenamed()


In [26]:
df.withColumnRenamed("Pets","Pet").show(4,truncate=True)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+---+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pet|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+---+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|  0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|  4|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|  0|
| 17|                25|08/06/2018|                   179|              22| 40|                237.656|             22|        2|       2|  0|

# Drop Column From PySpark DataFrame

In [27]:
df.drop("Pets").show(3)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+
only showing top 3 rows



# Where Filter Function | Multiple Conditions

In [28]:
df.filter(df.Pets == 5).show(5)
df.filter(~(df.Pets ==5)).show(5)


#Using SQL Expression
df.filter("Pets == 5").show(3)
#For not equal
df.filter("Pets != 5").show(3)
df.filter("Pets <> 5").show(3)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|  2|                 0|25/06/2018|                   235|              29| 48|                275.089|             33|        1|       1|   5|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Childr

In [29]:
 li=["Pets","Education"]
df.filter(df.Pets == 5).select(li).show(5)
df.filter(~(df.Pets ==5)).select(li).show(5)

print("*"*100)

#Using SQL Expression
df.filter("Pets == 5").select(li).show(3)
#For not equal
df.filter("Pets != 5").select(li).show(3)
df.filter("Pets <> 5").select(li).show(3)

print("*"*100)


+----+---------+
|Pets|Education|
+----+---------+
|   5|        1|
+----+---------+

+----+---------+
|Pets|Education|
+----+---------+
|   0|        3|
|   4|        1|
|   0|        1|
|   0|        2|
|   0|        1|
+----+---------+
only showing top 5 rows

****************************************************************************************************
+----+---------+
|Pets|Education|
+----+---------+
|   5|        1|
+----+---------+

+----+---------+
|Pets|Education|
+----+---------+
|   0|        3|
|   4|        1|
|   0|        1|
+----+---------+
only showing top 3 rows

+----+---------+
|Pets|Education|
+----+---------+
|   0|        3|
|   4|        1|
|   0|        1|
+----+---------+
only showing top 3 rows

****************************************************************************************************


# filter with multiple conditions

In [30]:
 li=["Pets","Education"]
df.filter((df.Pets == 5)|(df.Pets == 1)).select(li).show(5)

print("*"*100)

df.filter((df.Pets == 5) & (df.Education == 2)).select(li).show(5)

+----+---------+
|Pets|Education|
+----+---------+
|   5|        1|
|   1|        1|
|   1|        1|
|   1|        1|
|   1|        1|
+----+---------+
only showing top 5 rows

****************************************************************************************************
+----+---------+
|Pets|Education|
+----+---------+
+----+---------+



# List based Filteration

In [31]:
li=[1,2,3]

df.filter(df.Pets.isin(li)).show(3)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 28|                11|11/06/2018|                   225|              26| 28|                237.656|             24|        1|       1|   2|
| 15|                28|28/06/2018|                   291|              31| 40|                275.089|             25|        1|       1|   1|
|  9|                 6|16/07/2018|                   228|              14| 58|                264.604|             22|        1|       2|   1|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+-------

# Other filtering
## like, alike
## startswith,endswith,contains 

# withColumnRenamed

In [32]:
#to change single columnname

df.withColumnRenamed("pet","Pets").show(2)

#to change multiple columns

#newColumns = ["newCol1","newCol2","newCol3","newCol4"]
#df.toDF(*newColumns).printSchema()

#to change 2 or more columns

df.withColumn("Educate",col("Education")) \
  .withColumn("mname",col("Pets")).show(2)


+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
only showing top 2 rows

+---+------------------+----------+----------------------+----------------+---+-----------------------+--------

# handling duplicates


In [33]:
#Distinct
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show(truncate=False)


#Drop duplicates
df2 = df.dropDuplicates()
print("Distinct count: "+str(df2.count()))
df2.show(truncate=False)



#Drop duplicates on selected columns
dropDisDF = df.dropDuplicates(["Pets","Education"])
print("Distinct count of department salary : "+str(dropDisDF.count()))
dropDisDF.show(truncate=False)

Distinct count: 40
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|ID |Reason for Absence|Date      |Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|28 |22                |18/07/2018|225                   |26              |28 |264.604                |24             |1        |1       |2   |
|10 |22                |18/07/2018|361                   |52              |28 |264.604                |27             |1        |1       |4   |
|22 |27                |01/06/2018|179                   |26              |30 |237.656                |19             |3        |0       |0   |
|10 |7                 |04/06/2018|361                   |52              |28 |237.656                |27            

#  orderBy() and sort() 

In [34]:
df.sort("Pets","Education").show(5)
df.sort(col("Pets"),col("Education")).show(truncate=False)

df.orderBy("Pets","Education").show(truncate=False)
df.orderBy(col("Pets"),col("Education")).show(truncate=False)

df.sort(df.Pets.asc(),df.Education.asc()).show(truncate=False)
df.sort(col("Pets").asc(),col("Education").asc()).show(truncate=False)
df.orderBy(col("Pets").asc(),col("Education").asc()).show(truncate=False)

df.sort(col("Pets").asc(),col("Education").desc()).show(truncate=False)
df.orderBy(col("Pets").asc(),col("Education").desc()).show(truncate=False)

df.createOrReplaceTempView("EMP")
spark.sql("select Pets,Education from EMP ORDER BY Pets asc").show(truncate=False)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 16|                 7|13/06/2018|                   118|              15| 46|                275.089|             25|        1|       2|   0|
| 34|                26|15/06/2018|                   118|              10| 37|                275.089|             28|        1|       0|   0|
| 36|                19|21/06/2018|                   118|              13| 50|                275.089|             31|        1|       1|   0|
| 34|                10|20/06/2018|                   118|              10| 37|                275.089|             28|        1|       

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|ID |Reason for Absence|Date      |Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|14 |23                |06/06/2018|155                   |12              |34 |237.656                |25             |1        |2       |0   |
|36 |19                |21/06/2018|118                   |13              |50 |275.089                |31             |1        |1       |0   |
|16 |7                 |13/06/2018|118                   |15              |46 |275.089                |25             |1        |2       |0   |
|14 |10                |08/06/2018|155                   |12              |34 |237.656                |25             |1        |2      

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|ID |Reason for Absence|Date      |Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
|22 |27                |22/06/2018|179                   |26              |30 |275.089                |19             |3        |0       |0   |
|22 |27                |01/06/2018|179                   |26              |30 |237.656                |19             |3        |0       |0   |
|22 |13                |28/06/2018|179                   |26              |30 |275.089                |19             |3        |0       |0   |
|22 |27                |13/06/2018|179                   |26              |30 |275.089                |19             |3        |0      

In [35]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Reason for Absence: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Transportation Expense: integer (nullable = true)
 |-- Distance to Work: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Daily Work Load Average: double (nullable = true)
 |-- Body Mass Index: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- Children: integer (nullable = true)
 |-- Pets: integer (nullable = true)



# GroupBy

In [36]:

df.groupBy("Pets").sum("Transportation Expense").show(5)

df.groupBy("Pets").count().show(5)


df.groupBy("Pets","Education") \
    .sum("Transportation Expense") \
   .show(5)

df.groupBy("Pets") \
    .agg(sum("Transportation Expense").alias("sum_salary"), \
         avg("Transportation Expense").alias("avg_salary"), \
         sum("Transportation Expense").alias("sum_bonus"), \
         max("Age").alias("max_bonus") \
     ) \
    .show(5)
    
df.groupBy("Pets") \
    .agg(sum("Transportation Expense").alias("sum_salary"), \
      avg("Transportation Expense").alias("avg_salary"), \
      sum("Transportation Expense").alias("sum_bonus"), \
      max("Transportation Expense").alias("max_bonus")) \
    .where(col("sum_bonus") >= 5000) \
    .show(5)

+----+---------------------------+
|Pets|sum(Transportation Expense)|
+----+---------------------------+
|   1|                       1450|
|   5|                        235|
|   4|                       1100|
|   8|                       1050|
|   2|                        870|
+----+---------------------------+
only showing top 5 rows

+----+-----+
|Pets|count|
+----+-----+
|   1|    6|
|   5|    1|
|   4|    3|
|   8|    5|
|   2|    4|
+----+-----+
only showing top 5 rows

+----+---------+---------------------------+
|Pets|Education|sum(Transportation Expense)|
+----+---------+---------------------------+
|   1|        1|                       1215|
|   1|        3|                        235|
|   8|        1|                        118|
|   2|        1|                        870|
|   8|        2|                        932|
+----+---------+---------------------------+
only showing top 5 rows

+----+----------+------------------+---------+---------+
|Pets|sum_salary|        avg_sa

# [Join,union,union](https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/)

# userDefined 
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
.show(truncate=False)

def upperCase(str):
    return str.upper()

upperCaseUDF = udf(lambda z:upperCase(z),StringType())    

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))) \
.show(truncate=False)

""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
     
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
          "where Name is not null and convertUDF(Name) like '%John%'") \
     .show(truncate=False)  
     
""" null check """

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")
    
spark.udf.register("_nullsafeUDF", lambda str: convertCase(str) if not str is None else "" , StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%John%'") \
     .show(truncate=False)  

# PySpark map() Transformation

PySpark map 
RDD transformation that is used to apply the transformation function ----(lambda) on every element of RDD/DataFrame and returns a new RDD.

RDD map() transformation - apply any complex operations like 
adding a column
updating a column
transforming the data e.t.c, 
the output of map transformations would always have the same number of records as input.

#Referring Column Names
rdd2=df.rdd.map(lambda x: 

    (x["firstname"]+","+x["lastname"],x["gender"],x["salary"]*2)
    
    ) 
    
    

#Referring Column Names
rdd2=df.rdd.map(lambda x:

    (x.firstname+","+x.lastname,x.gender,x.salary*2)
    
    ) 
    
    

def func1(x):

    firstName=x.firstname
    
    lastName=x.lastname
    
    name=firstName+","+lastName
    
    gender=x.gender.lower()
    
    salary=x.salary*2
    
    return (name,gender,salary)

rdd2=df.rdd.map(lambda x: func1(x))

# PySpark flatMap() Transformation


#Flatmap    
rdd2=df.rdd.flatMap(lambda x: x.split(" "))

for element in rdd2.collect():

    print(element)

# PySpark fillna() & fill() – Replace NULL/None Values


 DataFrame.fillna()
 
 or
 
 DataFrameNaFunctions.fill()                
 
 --is used to replace NULL/None values on all or selected multiple DataFrame columns
 
 with either zero(0), empty string, space, or any constant literal values.
 
 fillna(value, subset=None)
 
fill(value, subset=None)

In [41]:
#Replace 0 for null for all integer columns
df.na.fill(value=0).show(3)

#Replace 0 for null on only population column 
df.na.fill(value=0,subset=["Pets"]).show(3)

#replace Null with empty string 
df.na.fill("").show(2)
# two columns at a time
df.na.fill("unknown",["Pets"]) \
    .na.fill("",["Education"]).show(2)

# or
df.na.fill({"Pets": "unknown", "Education": ""}) \
    .show(2)

+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| ID|Reason for Absence|      Date|Transportation Expense|Distance to Work|Age|Daily Work Load Average|Body Mass Index|Education|Children|Pets|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+--------+----+
| 22|                27|01/06/2018|                   179|              26| 30|                237.656|             19|        3|       0|   0|
| 10|                 7|04/06/2018|                   361|              52| 28|                237.656|             27|        1|       1|   4|
| 14|                23|06/06/2018|                   155|              12| 34|                237.656|             25|        1|       2|   0|
+---+------------------+----------+----------------------+----------------+---+-----------------------+---------------+---------+-------

# PySpark Pivot and Unpivot DataFrame

PySpark pivot() function is used to rotate/transpose the data from ---one column into ----multiple Dataframe columns 

and back using unpivot(). 

Pivot() -- aggregation where one of the grouping columns values is transposed into individual columns with distinct data.

In [42]:
df.groupBy("Pets").pivot("Education").sum("Transportation Expense").show(3)

+----+----+----+----+
|Pets|   1|   2|   3|
+----+----+----+----+
|   1|1215|null| 235|
|   5| 235|null|null|
|   4|1100|null|null|
+----+----+----+----+
only showing top 3 rows



# [Unpivote](https://sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe/)

unpivot reverse it

unpivotExpr = "stack(3, 'Canada', Canada, 'China', China, 'Mexico', Mexico) as (Country,Total)"

unPivotDF = pivotDF.select("Product", expr(unpivotExpr)) \

    .where("Total is not null")
    
unPivotDF.show(truncate=False) 

# PySpark partitionBy() – Write to Disk Example

# PySpark ArrayType Column With Examples- expload