In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, col, when, count, isnan, lit

In [2]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("DataCleaning")
    .getOrCreate()
)

# Load the merged dataset: the first row holds the column names and the
# column types are inferred from the data.
merged_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Merged_Dataset.csv')
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/07 05:28:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
def count_missing(df):
    """Return a one-row DataFrame holding, per column of ``df``, the number of null or NaN values."""
    return df.select(
        [count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
    )


missing_values = count_missing(merged_df)
print("Missing values before imputation:")
missing_values.show()

# Mean imputation only makes sense for numeric columns; other columns are
# passed through untouched instead of costing one Spark job each.
NUMERIC_DTYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")
numeric_columns = [
    name
    for name, dtype in merged_df.dtypes
    if dtype in NUMERIC_DTYPES or dtype.startswith("decimal")
]

# Compute every column mean in a single aggregation. NaN values are excluded
# from the mean because a single NaN would otherwise make the mean NaN.
column_means = {}
if numeric_columns:
    means_row = merged_df.agg(
        *[mean(when(~isnan(c), col(c))).alias(c) for c in numeric_columns]
    ).first()
    # A mean of None means the column has no usable value; leave it as-is.
    column_means = {
        name: value
        for name, value in zip(numeric_columns, means_row)
        if value is not None
    }

# Fill both null and NaN (the same definition of "missing" that the report
# above uses) in one projection, preserving the original column order.
merged_df = merged_df.select(
    [
        when(col(c).isNull() | isnan(c), lit(column_means[c])).otherwise(col(c)).alias(c)
        if c in column_means
        else col(c)
        for c in merged_df.columns
    ]
)

missing_values_after = count_missing(merged_df)
print("Missing values after imputation:")
missing_values_after.show()

Missing values before imputation:


                                                                                

+--------+--------+----+-----------+------------+----------------+----------+--------------+-----------------------+----------+-----------+-----------+-----------+-----------+----+----+---+---+---+---+------+
|iso_code|location|date|total_cases|total_deaths|stringency_index|population|gdp_per_capita|human_development_index|Unnamed: 9|Unnamed: 10|Unnamed: 11|Unnamed: 12|Unnamed: 13|CODE| HDI| TC| TD|STI|POP|GDPCAP|
+--------+--------+----+-----------+------------+----------------+----------+--------------+-----------------------+----------+-----------+-----------+-----------+-----------+----+----+---+---+---+---+------+
|       0|       0|   0|       3094|       11190|            7126|         0|          5712|                   6202|      3594|      12298|      10042|          0|       5712|   0|6202|  0|  0|  0|  0|     0|
+--------+--------+----+-----------+------------+----------------+----------+--------------+-----------------------+----------+-----------+-----------+-----------+-----------+----+----+---+---+---+---+------+

                                                                                

Missing values after imputation:




+--------+--------+----+-----------+------------+----------------+----------+--------------+-----------------------+----------+-----------+-----------+-----------+-----------+----+---+---+---+---+---+------+
|iso_code|location|date|total_cases|total_deaths|stringency_index|population|gdp_per_capita|human_development_index|Unnamed: 9|Unnamed: 10|Unnamed: 11|Unnamed: 12|Unnamed: 13|CODE|HDI| TC| TD|STI|POP|GDPCAP|
+--------+--------+----+-----------+------------+----------------+----------+--------------+-----------------------+----------+-----------+-----------+-----------+-----------+----+---+---+---+---+---+------+
|       0|       0|   0|          0|           0|               0|         0|             0|                      0|         0|          0|          0|          0|          0|   0|  0|  0|  0|  0|  0|     0|
+--------+--------+----+-----------+------------+----------------+----------+--------------+-----------------------+----------+-----------+-----------+-----------+-----------+----+---+---+---+---+---+------+

                                                                                

In [4]:
# Replace IQR outliers in every numeric column with that column's (approximate) median.
# All Spark numeric type names are accepted so that e.g. bigint/float columns
# are not silently skipped.
NUMERIC_DTYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")
numeric_columns = [col_name for col_name, dtype in merged_df.dtypes if dtype in NUMERIC_DTYPES]

for column in numeric_columns:
    # One pass over the data yields Q1, the median and Q3 together.
    quantiles = merged_df.approxQuantile(column, [0.25, 0.5, 0.75], 0.01)
    if len(quantiles) < 3:
        # approxQuantile returns an empty list when the column has no non-null values.
        print(f"Column {column} has no non-null values; skipping outlier processing.")
        continue

    Q1, median, Q3 = quantiles
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # Values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are treated as outliers.
    is_outlier = (col(column) < lower_bound) | (col(column) > upper_bound)

    outliers = merged_df.filter(is_outlier)
    print(f"Column {column} has {outliers.count()} outliers before processing.")

    merged_df = merged_df.withColumn(column, when(is_outlier, median).otherwise(col(column)))

    # Re-check against the same bounds to confirm the replacement took effect.
    outliers_after = merged_df.filter(is_outlier)
    print(f"Column {column} has {outliers_after.count()} outliers after processing.")

Column total_cases has 9264 outliers before processing.
Column total_cases has 0 outliers after processing.
Column total_deaths has 2830 outliers before processing.
Column total_deaths has 0 outliers after processing.
Column stringency_index has 0 outliers before processing.
Column stringency_index has 0 outliers after processing.
Column population has 5597 outliers before processing.
Column population has 0 outliers after processing.
Column gdp_per_capita has 2281 outliers before processing.
Column gdp_per_capita has 0 outliers after processing.
Column human_development_index has 651 outliers before processing.
Column human_development_index has 0 outliers after processing.
Column Unnamed: 9 has 0 outliers before processing.
Column Unnamed: 9 has 0 outliers after processing.
Column Unnamed: 10 has 2288 outliers before processing.
Column Unnamed: 10 has 0 outliers after processing.
Column Unnamed: 11 has 4149 outliers before processing.
Column Unnamed: 11 has 0 outliers after processing.

In [5]:
from pyspark.sql import functions as F

# Detect column names that occur more than once. This inspects only the
# schema, so no data is grouped or collected to the driver.
all_names = merged_df.columns
column_freq = [name for name in dict.fromkeys(all_names) if all_names.count(name) > 1]

if column_freq:
    print(f"Columns that appeared more than once: {column_freq}")
    # Remember the first position of each name (dicts keep insertion order,
    # so the original column order is preserved).
    first_position = {}
    for position, name in enumerate(all_names):
        first_position.setdefault(name, position)
    # Duplicate names cannot be selected by name (ambiguous reference), so
    # rename every column positionally first, then keep the first occurrence
    # of each original name.
    positional_names = [f"_c{position}" for position in range(len(all_names))]
    merged_df = merged_df.toDF(*positional_names).select(
        *[F.col(f"_c{position}").alias(name) for name, position in first_position.items()]
    )

print("Columns in the processed dataframe:")
print(merged_df.columns)

                                                                                

Columns in the processed dataframe:
['iso_code', 'location', 'date', 'total_cases', 'total_deaths', 'stringency_index', 'population', 'gdp_per_capita', 'human_development_index', 'Unnamed: 9', 'Unnamed: 10', 'Unnamed: 11', 'Unnamed: 12', 'Unnamed: 13', 'CODE', 'HDI', 'TC', 'TD', 'STI', 'POP', 'GDPCAP']


In [6]:
import os
import shutil

# Spark writes a directory of part files; write to a temporary folder, then
# promote the single part file to the final CSV path.
output_folder = "TempOutputFolder"
final_output_path = "Processed_Merged_Dataset.csv"

# coalesce(1) forces a single partition so exactly one part file is produced.
merged_df.coalesce(1).write.csv(output_folder, mode="overwrite", header=True)

part_files = sorted(
    filename for filename in os.listdir(output_folder) if filename.startswith("part-00000")
)
if not part_files:
    # Fail before cleanup so the temporary folder is left in place for
    # inspection instead of being deleted with no output produced.
    raise FileNotFoundError(
        f"No part-00000 file found in {output_folder!r}; nothing to export."
    )

shutil.move(os.path.join(output_folder, part_files[0]), final_output_path)

# Remove the temporary folder (including Spark's _SUCCESS and .crc files).
shutil.rmtree(output_folder)

                                                                                