In [23]:
# Imports 
import pandas as pd
from pyspark.sql import SparkSession # type: ignore

# Create (or reuse) the SparkSession that the rest of the notebook works through.
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)

Purpose:
1. Dropping Columns
2. Dropping Rows
3. Various Parameters of the Dropping Functions
4. Handling Missing Values by Mean, Median and Mode
5. Updating a dataframe

In [30]:
# Load test2.csv into a Spark DataFrame. The first line of the file is used as
# the header, and column types are inferred from the data instead of being
# read as plain strings.
df_pyspark = spark.read.csv(
    'test2.csv',
    header=True,
    inferSchema=True,
)

# Show the inferred schema first, then the rows themselves.
df_pyspark.printSchema()
df_pyspark.show()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|NULL|      NULL| 40000|
|     NULL|  34|        10| 38000|
|     NULL|  36|      NULL|  NULL|
+---------+----+----------+------+



In [31]:
# Drop the Name column. drop() returns a new DataFrame, so df_pyspark itself
# still contains the column afterwards.
without_name = df_pyspark.drop('Name')
without_name.show()

+----+----------+------+
| age|Experience|Salary|
+----+----------+------+
|  31|        10| 30000|
|  30|         8| 25000|
|  29|         4| 20000|
|  24|         3| 20000|
|  21|         1| 15000|
|  23|         2| 18000|
|NULL|      NULL| 40000|
|  34|        10| 38000|
|  36|      NULL|  NULL|
+----+----------+------+



In [32]:
# Drop ROWS that contain missing values (na.drop removes rows, never columns).
# Every call returns a new DataFrame; df_pyspark itself is left unchanged.
df_pyspark.na.drop().show()  # default how="any": a row is dropped if any of its columns is null
df_pyspark.na.drop(how="any").show()  # same as the default, spelled out explicitly
df_pyspark.na.drop(thresh=3).show()  # keep rows with at least 3 non-null values; thresh overrides `how`, so `how` is omitted
df_pyspark.na.drop(how="any", subset=['age']).show()  # only the age column is checked for Null/NaN (name matches the schema exactly)

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     NULL| 

In [33]:
from pyspark.sql.functions import col # type: ignore

# na.fill() with a string value only applies to string-typed columns; columns of
# any other type in the subset are skipped. The integer columns are therefore
# cast to string first so the placeholder can be written into them.
for column_name in ("Experience", "age"):
    df_pyspark = df_pyspark.withColumn(column_name, col(column_name).cast("string"))

# Replace nulls in Name, Experience and age with the placeholder text.
# Salary is not in the subset, so its null is left as-is.
filled = df_pyspark.na.fill('Missing Values', subset=['Name', 'Experience', 'age'])
filled.show()

# na.fill() returned a new DataFrame: df_pyspark still holds the nulls.
df_pyspark.show()

+--------------+--------------+--------------+------+
|          Name|           age|    Experience|Salary|
+--------------+--------------+--------------+------+
|         Krish|            31|            10| 30000|
|     Sudhanshu|            30|             8| 25000|
|         Sunny|            29|             4| 20000|
|          Paul|            24|             3| 20000|
|        Harsha|            21|             1| 15000|
|       Shubham|            23|             2| 18000|
|        Mahesh|Missing Values|Missing Values| 40000|
|Missing Values|            34|            10| 38000|
|Missing Values|            36|Missing Values|  NULL|
+--------------+--------------+--------------+------+

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23| 

TODO: What's going on here? I thought we removed those rows. Explain here:

The na.drop() method returns a new DataFrame with the rows containing missing values removed; it doesn't modify the original DataFrame in place.

To keep the result, you'd have to do this...

df_pyspark = df_pyspark.na.drop() to re-assign the variable 'df_pyspark' to the new DataFrame that was returned after dropping the rows.

In [34]:
# Print the current column names and types of df_pyspark.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [38]:
# Cast every column that will be imputed to double, one column at a time.
# withColumn() with an existing name replaces that column in place.
for numeric_col in ("age", "Experience", "Salary"):
    df_pyspark = df_pyspark.withColumn(numeric_col, col(numeric_col).cast("double"))

# Confirm the new types.
df_pyspark.printSchema()


root
 |-- Name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- Experience: double (nullable = true)
 |-- Salary: double (nullable = true)



In [39]:
from pyspark.ml.feature import Imputer

# Build an Imputer that replaces Null/NaN values in age, Experience and Salary
# with the column MEDIAN (Imputer's default strategy would be the mean).
# Each input column gets a matching "<name>_imputed" output column, so the
# original columns are kept alongside the imputed ones.
impute_cols = ['age', 'Experience', 'Salary']  # single source for input and output names
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=[f"{c}_imputed" for c in impute_cols],
).setStrategy("median")

In [40]:
# Fit the imputer on df_pyspark, then use the fitted model to append the
# *_imputed columns and display the result.
imputer_model = imputer.fit(df_pyspark)
imputed = imputer_model.transform(df_pyspark)
imputed.show()


+---------+----+----------+-------+-----------+------------------+--------------+
|     Name| age|Experience| Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+-------+-----------+------------------+--------------+
|    Krish|31.0|      10.0|30000.0|       31.0|              10.0|       30000.0|
|Sudhanshu|30.0|       8.0|25000.0|       30.0|               8.0|       25000.0|
|    Sunny|29.0|       4.0|20000.0|       29.0|               4.0|       20000.0|
|     Paul|24.0|       3.0|20000.0|       24.0|               3.0|       20000.0|
|   Harsha|21.0|       1.0|15000.0|       21.0|               1.0|       15000.0|
|  Shubham|23.0|       2.0|18000.0|       23.0|               2.0|       18000.0|
|   Mahesh|NULL|      NULL|40000.0|       29.0|               4.0|       40000.0|
|     NULL|34.0|      10.0|38000.0|       34.0|              10.0|       38000.0|
|     NULL|36.0|      NULL|   NULL|       36.0|               4.0|       20000.0|
+---------+----+

In [41]:
from pyspark.sql import SparkSession

# Build a small in-memory DataFrame for the column-update examples that follow.
# The SparkSession created at the top of the notebook is reused here: calling
# SparkSession.builder.appName(...).getOrCreate() a second time only returns
# that same session (Spark logs "Using an existing Spark session"), so the
# extra appName never took effect.
data = [
    ('James', 'Smith', 'M', 3000),
    ('Anna', 'Rose', 'F', 4100),
    ('Robert', 'Williams', 'M', 6200),
]
columns = ["firstname", "lastname", "gender", "salary"]

# Column names come from `columns`; column types are inferred from the tuples.
df = spark.createDataFrame(data=data, schema=columns)
df.show()

24/05/02 03:48:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  3000|
|     Anna|    Rose|     F|  4100|
|   Robert|Williams|     M|  6200|
+---------+--------+------+------+



                                                                                

The PySpark code below updates the salary column of the DataFrame by multiplying each salary by 3.

Note that withColumn() is used to update or add a column on the DataFrame: when the first argument is the name of an existing column, that column is updated; if the name is new, a new column is created.

In [42]:
# Overwrite the salary column with three times its value. The result is a new
# DataFrame (df2); df keeps the original salaries.
tripled_salary = df["salary"] * 3
df2 = df.withColumn("salary", tripled_salary)
df2.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|  9000|
|     Anna|    Rose|     F| 12300|
|   Robert|Williams|     M| 18600|
+---------+--------+------+------+



Update Column Based on Condition

Update a column value based on a condition by using when() and otherwise(). The example below updates the gender column with the value Male for M and Female for F, and keeps the original value for anything else.

In [43]:
from pyspark.sql.functions import when

# Map the gender codes to full words; any value other than "M" or "F" is kept
# unchanged by the otherwise() branch.
gender_label = (
    when(df.gender == "M", "Male")
    .when(df.gender == "F", "Female")
    .otherwise(df.gender)
)
df3 = df.withColumn("gender", gender_label)
df3.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|  Male|  3000|
|     Anna|    Rose|Female|  4100|
|   Robert|Williams|  Male|  6200|
+---------+--------+------+------+



Update DataFrame Column Data Type

You can also update the data type of a column using withColumn() together with the cast() function of the PySpark Column class. For example, df.withColumn("salary", col("salary").cast("string")) converts the salary column to string type.

PySpark SQL Update

In [44]:
# Register df as the temporary view "PER" so it can be queried with Spark SQL.
df.createOrReplaceTempView("PER")

# Select a subset of columns and triple the salary in SQL.
tripled_salary_query = "select firstname,gender,salary*3 as salary from PER"
df5 = spark.sql(tripled_salary_query)
df5.show()

+---------+------+------+
|firstname|gender|salary|
+---------+------+------+
|    James|     M|  9000|
|     Anna|     F| 12300|
|   Robert|     M| 18600|
+---------+------+------+



1. TODO: Use spark.sql to order the original dataframe by salary descending
2. TODO: Use spark.sql to determine the average salary for each gender, order results by average salary descending

In [46]:
# Register df as the temporary view "people" for the two SQL queries below.
df.createOrReplaceTempView("people")

# TODO 1: the original rows, highest salary first.
order_by_salary_query = "SELECT * FROM people ORDER BY salary DESC"
ordered_df = spark.sql(order_by_salary_query)
ordered_df.show()

# TODO 2: average salary per gender, highest average first.
average_by_gender_query = """
SELECT gender, AVG(salary) AS average_salary
FROM people
GROUP BY gender
ORDER BY average_salary DESC
"""
average_salary_by_gender = spark.sql(average_by_gender_query)
average_salary_by_gender.show()


+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|   Robert|Williams|     M|  6200|
|     Anna|    Rose|     F|  4100|
|    James|   Smith|     M|  3000|
+---------+--------+------+------+

+------+--------------+
|gender|average_salary|
+------+--------------+
|     M|        4600.0|
|     F|        4100.0|
+------+--------------+

