
# PySpark Comprehensive EDA and Data Preprocessing Tutorial

This notebook provides a comprehensive guide to data preprocessing and filtering using PySpark. It also compares PySpark functions with equivalent Python code using Pandas.

**Steps covered:**
1. Initial data checks
2. Data cleaning and transformation
3. Filtering data and Running SQL
4. Comparison of PySpark with Python (Pandas)


In [21]:

# Importing Required Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnull, when, count, mean, stddev, min, max, skewness, kurtosis
from pyspark.sql.types import IntegerType, DoubleType
import pandas as pd
import numpy as np

In [6]:

# Initializing Spark Session
spark = SparkSession.builder.appName("PySpark Comprehensive EDA").getOrCreate()


24/08/14 05:14:13 WARN Utils: Your hostname, codespaces-78980c resolves to a loopback address: 127.0.0.1; using 10.0.0.173 instead (on interface eth0)
24/08/14 05:14:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/14 05:14:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/08/14 05:14:26 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [50]:

# Loading the Data (PySpark and Pandas)
file_path = "sample_data.csv"
df_spark = spark.read.csv(file_path, header=True, inferSchema=True)
df_pandas = pd.read_csv(file_path)
df_spark.show()
df_pandas


+--------+---+-----------+------+
|    name|age|       city|salary|
+--------+---+-----------+------+
|John Doe| 28|   New York| 70000|
|Jane Doe| 34|Los Angeles| 80000|
|   Alice| 23|    Chicago| 65000|
|     Bob| 45|   New York| 90000|
| Charlie| 35|    Chicago| 75000|
|   David| 38|Los Angeles| 85000|
|  Edward| 42|   New York| 95000|
|   Fiona| 30|    Chicago| 72000|
+--------+---+-----------+------+



Unnamed: 0,name,age,city,salary
0,John Doe,28,New York,70000
1,Jane Doe,34,Los Angeles,80000
2,Alice,23,Chicago,65000
3,Bob,45,New York,90000
4,Charlie,35,Chicago,75000
5,David,38,Los Angeles,85000
6,Edward,42,New York,95000
7,Fiona,30,Chicago,72000



# 1. Initial Data Checks

In [10]:


# Display Schema (PySpark)
df_spark.printSchema()

# Checking the schema (Pandas)
print(df_pandas.dtypes)



root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: integer (nullable = true)

name      object
age        int64
city      object
salary     int64
dtype: object


In [11]:
# Count Rows and Columns (PySpark)
row_count_spark = df_spark.count()
column_count_spark = len(df_spark.columns)
print(f"Number of rows (PySpark): {row_count_spark}, Number of columns (PySpark): {column_count_spark}")

# Count Rows and Columns (Pandas)
row_count_pandas = df_pandas.shape[0]
column_count_pandas = df_pandas.shape[1]
print(f"Number of rows (Pandas): {row_count_pandas}, Number of columns (Pandas): {column_count_pandas}")


Number of rows (PySpark): 8, Number of columns (PySpark): 4
Number of rows (Pandas): 8, Number of columns (Pandas): 4


In [49]:
# Summary Statistics (PySpark)
df_spark.describe().show()

# Summary Statistics (Pandas)
display(df_pandas.describe())


+-------+--------+------------------+--------+------------------+
|summary|    name|               age|    city|            salary|
+-------+--------+------------------+--------+------------------+
|  count|       8|                 8|       8|                 8|
|   mean|    NULL|            34.375|    NULL|           79000.0|
| stddev|    NULL|7.3082634247620195|    NULL|10392.304845413262|
|    min|   Alice|                23| Chicago|             65000|
|    max|John Doe|                45|New York|             95000|
+-------+--------+------------------+--------+------------------+



Unnamed: 0,age,salary
count,8.0,8.0
mean,34.375,79000.0
std,7.308263,10392.304845
min,23.0,65000.0
25%,29.5,71500.0
50%,34.5,77500.0
75%,39.0,86250.0
max,45.0,95000.0



# 2. Data Cleaning


In [17]:

# Dropping rows with excessive missing data (PySpark)
df_cleaned_spark = df_spark.dropna(thresh=int(0.5 * len(df_spark.columns)))

# Dropping rows with excessive missing data (Pandas)
df_cleaned_pandas = df_pandas.dropna(thresh=int(0.5 * len(df_pandas.columns)))

print("Pyspark Cleaned Data : ",df_cleaned_spark.show())
print("Pyspark Cleaned Data : ",display(df_cleaned_pandas))

+--------+---+-----------+------+
|    name|age|       city|salary|
+--------+---+-----------+------+
|John Doe| 28|   New York| 70000|
|Jane Doe| 34|Los Angeles| 80000|
|   Alice| 23|    Chicago| 65000|
|     Bob| 45|   New York| 90000|
| Charlie| 35|    Chicago| 75000|
|   David| 38|Los Angeles| 85000|
|  Edward| 42|   New York| 95000|
|   Fiona| 30|    Chicago| 72000|
+--------+---+-----------+------+

Pyspark Cleaned Data :  None


Unnamed: 0,name,age,city,salary
0,John Doe,28,New York,70000
1,Jane Doe,34,Los Angeles,80000
2,Alice,23,Chicago,65000
3,Bob,45,New York,90000
4,Charlie,35,Chicago,75000
5,David,38,Los Angeles,85000
6,Edward,42,New York,95000
7,Fiona,30,Chicago,72000


Pyspark Cleaned Data :  None


In [33]:
from pyspark.sql.functions import mean

# Imputing missing values in numeric columns (PySpark)
for col_name in df_cleaned_spark.columns:
    # Check if the column is numeric
    if dict(df_cleaned_spark.dtypes)[col_name] in ['int', 'double']:
        # Calculate the mean of the column
        mean_value = df_cleaned_spark.select(mean(col_name)).collect()[0][0]
        # Fill missing values with the calculated mean
        df_cleaned_spark = df_cleaned_spark.fillna({col_name: mean_value})
print("Pyspark: ",df_cleaned_spark.show())

# Imputing missing values in numeric columns (Pandas)

# Select only numeric columns
numeric_columns = df_cleaned_pandas.select_dtypes(include=[np.number]).columns

# Impute missing values in numeric columns with the mean
df_cleaned_pandas[numeric_columns] = df_cleaned_pandas[numeric_columns].fillna(df_cleaned_pandas[numeric_columns].mean())

# For non-numeric columns, fill missing values with the mode
non_numeric_columns = df_cleaned_pandas.select_dtypes(exclude=[np.number]).columns

# Fill non-numeric columns with the most frequent value (mode)
for column in non_numeric_columns:
    df_cleaned_pandas[column].fillna(df_cleaned_pandas[column].mode()[0], inplace=True)

print("Pandas: ",display(df_cleaned_pandas))


+--------+---+-----------+------+
|    name|age|       city|salary|
+--------+---+-----------+------+
|John Doe| 28|   New York| 70000|
|Jane Doe| 34|Los Angeles| 80000|
|   Alice| 23|    Chicago| 65000|
|     Bob| 45|   New York| 90000|
| Charlie| 35|    Chicago| 75000|
|   David| 38|Los Angeles| 85000|
|  Edward| 42|   New York| 95000|
|   Fiona| 30|    Chicago| 72000|
+--------+---+-----------+------+

Pyspark:  None


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_cleaned_pandas[column].fillna(df_cleaned_pandas[column].mode()[0], inplace=True)


Unnamed: 0,name,age,city,salary
0,John Doe,28,New York,70000
1,Jane Doe,34,Los Angeles,80000
2,Alice,23,Chicago,65000
3,Bob,45,New York,90000
4,Charlie,35,Chicago,75000
5,David,38,Los Angeles,85000
6,Edward,42,New York,95000
7,Fiona,30,Chicago,72000


Pandas:  None


In [34]:
# Removing duplicates (PySpark)
df_no_duplicates_spark = df_cleaned_spark.dropDuplicates()

# Removing duplicates (Pandas)
df_no_duplicates_pandas = df_cleaned_pandas.drop_duplicates()

print("Pyspark Shape of the ")

In [35]:
# Renaming Columns (PySpark)
df_renamed_spark = df_no_duplicates_spark.withColumnRenamed("city", "location")
print(f"Number of rows before removing duplicates (PySpark): {df_cleaned_spark.count()}")
# Renaming Columns (Pandas)
df_renamed_pandas = df_no_duplicates_pandas.rename(columns={"city": "location"})
print(f"Number of rows after removing duplicates (PySpark): {df_no_duplicates_spark.count()}")

Number of rows before removing duplicates (PySpark): 8
Number of rows after removing duplicates (PySpark): 8



# 3. Data Transformation

In [38]:
# Converting columns to appropriate data types (PySpark)
df_transformed_spark = df_renamed_spark.withColumn("age", col("age").cast(IntegerType()))
print("Pyspark:",df_transformed_spark)
# Converting columns to appropriate data types (Pandas)
df_transformed_pandas = df_renamed_pandas.astype({"age": int})
print("Python:",df_transformed_pandas.dtypes)


Pyspark: DataFrame[name: string, age: int, location: string, salary: int]
Python: name        object
age          int64
location    object
salary       int64
dtype: object


In [41]:
# Creating new columns (PySpark)
df_transformed_spark = df_transformed_spark.withColumn("age_group", 
                                                       when(col("age") < 30, "Young")
                                                       .when(col("age") < 40, "Middle-aged")
                                                       .otherwise("Senior"))
display(df_transformed_spark.show())
# Creating new columns (Pandas)
df_transformed_pandas['age_group'] = pd.cut(df_transformed_pandas['age'], 
                                            bins=[0, 29, 39, 100], 
                                            labels=["Young", "Middle-aged", "Senior"])
display(df_transformed_pandas)

+--------+---+-----------+------+-----------+
|    name|age|   location|salary|  age_group|
+--------+---+-----------+------+-----------+
|   Fiona| 30|    Chicago| 72000|Middle-aged|
|   David| 38|Los Angeles| 85000|Middle-aged|
|Jane Doe| 34|Los Angeles| 80000|Middle-aged|
|John Doe| 28|   New York| 70000|      Young|
| Charlie| 35|    Chicago| 75000|Middle-aged|
|   Alice| 23|    Chicago| 65000|      Young|
|     Bob| 45|   New York| 90000|     Senior|
|  Edward| 42|   New York| 95000|     Senior|
+--------+---+-----------+------+-----------+



None

Unnamed: 0,name,age,location,salary,age_group
0,John Doe,28,New York,70000,Young
1,Jane Doe,34,Los Angeles,80000,Middle-aged
2,Alice,23,Chicago,65000,Young
3,Bob,45,New York,90000,Senior
4,Charlie,35,Chicago,75000,Middle-aged
5,David,38,Los Angeles,85000,Middle-aged
6,Edward,42,New York,95000,Senior
7,Fiona,30,Chicago,72000,Middle-aged


# 4. Filtering Data

In [45]:
# Exact Match Filtering (PySpark)
filtered_spark_exact = df_transformed_spark.filter(df_transformed_spark["name"] == "John Doe")
filtered_spark_exact.show()

# Exact Match Filtering (Pandas)
filtered_pandas_exact = df_transformed_pandas[df_transformed_pandas["name"] == "John Doe"]
display(filtered_pandas_exact)



+--------+---+--------+------+---------+
|    name|age|location|salary|age_group|
+--------+---+--------+------+---------+
|John Doe| 28|New York| 70000|    Young|
+--------+---+--------+------+---------+



Unnamed: 0,name,age,location,salary,age_group
0,John Doe,28,New York,70000,Young


In [44]:
# Partial Match Filtering (PySpark)
filtered_spark_partial = df_transformed_spark.filter(df_transformed_spark["name"].like("%John%"))
filtered_spark_partial.show()

# Partial Match Filtering (Pandas)
filtered_pandas_partial = df_transformed_pandas[df_transformed_pandas["name"].str.contains("John")]
display(filtered_pandas_partial)



+--------+---+--------+------+---------+
|    name|age|location|salary|age_group|
+--------+---+--------+------+---------+
|John Doe| 28|New York| 70000|    Young|
+--------+---+--------+------+---------+



Unnamed: 0,name,age,location,salary,age_group
0,John Doe,28,New York,70000,Young


In [46]:
# Filtering with Multiple Conditions (PySpark)
filtered_spark_multiple = df_transformed_spark.filter((df_transformed_spark["age"] > 30) & (df_transformed_spark["location"] == "New York"))
filtered_spark_multiple.show()

# Filtering with Multiple Conditions (Pandas)
filtered_pandas_multiple = df_transformed_pandas[(df_transformed_pandas["age"] > 30) & (df_transformed_pandas["location"] == "New York")]
display(filtered_pandas_multiple)


+------+---+--------+------+---------+
|  name|age|location|salary|age_group|
+------+---+--------+------+---------+
|   Bob| 45|New York| 90000|   Senior|
|Edward| 42|New York| 95000|   Senior|
+------+---+--------+------+---------+



Unnamed: 0,name,age,location,salary,age_group
3,Bob,45,New York,90000,Senior
6,Edward,42,New York,95000,Senior


# 5. Running SQL Statements in PySpark

In [47]:

# Creating a temporary view for SQL queries
df_transformed_spark.createOrReplaceTempView("people")

# Example SQL Query: Selecting specific columns
sql_df = spark.sql("SELECT name, age, location FROM people WHERE age > 30")
sql_df.show()

# Example SQL Query: Aggregation
sql_aggregation_df = spark.sql("SELECT location, COUNT(*) as count FROM people GROUP BY location")
sql_aggregation_df.show()


+--------+---+-----------+
|    name|age|   location|
+--------+---+-----------+
|   David| 38|Los Angeles|
|Jane Doe| 34|Los Angeles|
| Charlie| 35|    Chicago|
|     Bob| 45|   New York|
|  Edward| 42|   New York|
+--------+---+-----------+

+-----------+-----+
|   location|count|
+-----------+-----+
|Los Angeles|    2|
|    Chicago|    3|
|   New York|    3|
+-----------+-----+

