In [1]:
import re
from pyspark.sql.functions import col, regexp_replace, lower, trim, ceil, split, when, date_format, to_date, to_timestamp
from pyspark.sql.types import StringType, DoubleType, TimestampType, DateType
from pyspark import StorageLevel
import json
from pyspark.sql.functions import round as spark_round


def preprocess_data(data):

    # Check for missing values
    data = data.dropna()
    
    # Remove duplicates
    data = data.dropDuplicates(df.columns)


    # Define cache level
    cache_level = StorageLevel.MEMORY_AND_DISK

    # Iterate through each column in the data
    for col_name in data.columns:

        col_dtype = str(data.schema[col_name].dataType)
        print(col_name, ":", col_dtype)

        # Check if column is Date type
        if col_dtype == "DateType()":

            # Check if column contains date or timestamp data
            if "date" in col_name.lower():

                # Convert to standard date format
                data = data.withColumn(col_name, to_date(col(col_name), "yyyy-MM-dd"))

            elif "time" in col_name.lower() or "timestamp" in col_name.lower():

                # Convert to standard timestamp format
                data = data.withColumn(col_name, to_timestamp(col(col_name), "yyyy-MM-dd HH:mm:ss"))


        # Check if column is float type
        elif col_dtype == "FloatType()":

            # Ceil values to two decimal places
            data = data.withColumn(col_name, ceil(col(col_name) * 100) / 100)

        # Check if column is numeric type
        elif col_dtype == "DoubleType()":

            # Remove non-numeric characters from string
            data = data.withColumn(col_name, spark_round(col(col_name), 2))
            data = data.withColumn(col_name, regexp_replace(col(col_name), "[^0-9.]", ""))
            data = data.withColumn(col_name, when(col(col_name) == "", None).otherwise(col(col_name)))
            data = data.withColumn(col_name, col(col_name).cast(DoubleType()))


        # Check if column is string type
        elif col_dtype == "StringType()":

            # Convert to lowercase
            data = data.withColumn(col_name, lower(col(col_name)))

            # Remove special characters
            special_chars = r'[^a-zA-Z0-9\s]'
            
            data = data.withColumn(col_name, regexp_replace(col(col_name), special_chars, ' '))

            # Remove extra spaces
            data = data.withColumn(col_name, trim(col(col_name)))


    # Return processed data
    return data


**<h1>Implementing the module on a dataset</h1>**

<h3>Dataset-1</h3>

This dataset contains details about the global green house gas emmisions by various livestock perkg per edible weight of livestock per year

In [2]:
import pandas as pd
from pyspark.sql import SparkSession
#from data_processing import preprocess_data

# create SparkSession
spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()

# read CSV file into DataFrame
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:/Users/rohit/OneDrive/Desktop/BDT/ghg.csv")

# call preprocess_data function
processed_df = preprocess_data(df)
#Raw file
df.show()
# show processed data
processed_df.show()

Entity : StringType()
Year : IntegerType()
Greenhouse gas emissions (kg / kg edible weight) : DoubleType()
+--------------------+----+------------------------------------------------+
|              Entity|Year|Greenhouse gas emissions (kg / kg edible weight)|
+--------------------+----+------------------------------------------------+
|   Bivalves (farmed)|2021|                                      1.39912623|
|     Bivalves (wild)|2021|                                     11.40004939|
|       Carp (farmed)|2021|                                     6.946922702|
|    Catfish (farmed)|2021|                                     7.774491103|
|             Chicken|2021|                                           8.335|
| Cod, haddock (wild)|2021|                                     5.125038766|
|     Flounder (wild)|2021|                                     20.31331444|
|Herring, sardines...|2021|                                     3.877940448|
|    Jack fish (wild)|2021|                   

In [3]:
print("total number of records before preprocessing :",df.count())
print("total number of records after preprocessing :", processed_df.count())

total number of records before preprocessing : 25
total number of records after preprocessing : 24


**<h3>Dataset-2</h3>**

This is a movie dataset containing details of movies released in the years listed.

In [4]:
import pandas as pd
from pyspark.sql import SparkSession
#from data_processing import preprocess_data

# create SparkSession
spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()

# read CSV file into DataFrame
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:/Users/rohit/OneDrive/Desktop/BDT/netflix.csv")

# call preprocess_data function
processed_df = preprocess_data(df)

#Raw Data
#df1.show()

# show processed data
processed_df.show()
#df.count()


show_id : StringType()
type : StringType()
title : StringType()
director : StringType()
country : StringType()
date_added : StringType()
release_year : StringType()
rating : StringType()
duration : StringType()
listed_in : StringType()
+-------+-------+--------------------+--------------------+-------------+----------+------------+------+--------+--------------------+
|show_id|   type|               title|            director|      country|date_added|release_year|rating|duration|           listed_in|
+-------+-------+--------------------+--------------------+-------------+----------+------------+------+--------+--------------------+
|    s57|  movie|naruto shippuden ...|     masahiko murata|        japan| 9 15 2021|        2011| tv 14| 102 min|action   adventur...|
|  s5789|  movie|mariusz ka amaga ...|           not given|     pakistan| 9 19 2016|        2016| tv ma|  69 min|     stand up comedy|
|  s6774|tv show|food  delicious s...|           not given|     pakistan| 10 1 2017|     

In [5]:
print("total number of records before preprocessing :",df.count())
print("total number of records after preprocessing :", processed_df.count())

total number of records before preprocessing : 8791
total number of records after preprocessing : 8789


**<h3>Dataset-3</h3>**

This dataset contains details about vaccinations by country by date.

In [6]:
import pandas as pd
from pyspark.sql import SparkSession
#from data_processing import preprocess_data

# create SparkSession
spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()

# read CSV file into DataFrame
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:/Users/rohit/OneDrive/Desktop/BDT/country_vaccinations.csv")

# call preprocess_data function
processed_df = preprocess_data(df)

#Raw Data
#df.show()

# show processed data
processed_df.show()
#df.count()


country : StringType()
iso_code : StringType()
date : DateType()
total_vaccinations : DoubleType()
people_vaccinated : DoubleType()
people_fully_vaccinated : DoubleType()
daily_vaccinations_raw : DoubleType()
daily_vaccinations : DoubleType()
total_vaccinations_per_hundred : DoubleType()
people_vaccinated_per_hundred : DoubleType()
people_fully_vaccinated_per_hundred : DoubleType()
daily_vaccinations_per_million : DoubleType()
vaccines : StringType()
source_name : StringType()
source_website : StringType()
+----------+--------+----------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|   country|iso_code|      date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hun

In [7]:
print("total number of records before preprocessing :",df.count())
print("total number of records after preprocessing :", processed_df.count())

total number of records before preprocessing : 86512
total number of records after preprocessing : 30847


**<h3>Dataset-4</h3>**

This is a subset of titanic dataset which contains details of the passengers on board the ship.<br>


In [8]:
import pandas as pd
from pyspark.sql import SparkSession
#from data_processing import preprocess_data

# create SparkSession
spark = SparkSession.builder.appName("DataPreprocessing").getOrCreate()

# read CSV file into DataFrame
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:/Users/rohit/OneDrive/Desktop/BDT/test.csv")

# call preprocess_data function
processed_df = preprocess_data(df)

#Raw Data
#df.show()

# show processed data
processed_df.show()
#df.count()


PassengerId : IntegerType()
Pclass : IntegerType()
Name : StringType()
Sex : StringType()
Age : DoubleType()
SibSp : IntegerType()
Parch : IntegerType()
Ticket : StringType()
Fare : DoubleType()
Cabin : StringType()
Embarked : StringType()
+-----------+------+--------------------+------+----+-----+-----+-----------+------+-------+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|  Fare|  Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+-----------+------+-------+--------+
|       1144|     1|clark  mr  walter...|  male|27.0|    1|    0|      13508|136.78|    c89|       c|
|       1287|     1|smith  mrs  lucie...|female|18.0|    1|    0|      13695|  60.0|    c31|       s|
|       1069|     1|stengel  mr  char...|  male|54.0|    1|    0|      11778| 55.44|   c116|       c|
|        969|     1|cornell  mrs  rob...|female|55.0|    2|    0|      11770|  25.7|   c101|       s|
|       1185|     1|dodge  dr  washin...|  mal

In [9]:
print("total number of records before preprocessing :",df.count())
print("total number of records after preprocessing :", processed_df.count())

total number of records before preprocessing : 418
total number of records after preprocessing : 87


<h3>It should also be noted that this module removes records with null values  but sometimes we might need the details of these records too so in that case we can modify the code to fill the cells containing null values with either of mean or median or mode depending on our use case</h3>