In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_replace, trim, lower, upper, mean, count, lit, split, concat_ws
import pandas as pd
import numpy as np
import findspark
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

In [2]:
findspark.init()

In [3]:
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

In [4]:
spark

In [18]:
data = [
    ("  John   Doe  ", "  25", "  New York  ", None, "Male", "2024-01-15", "yes", "USD 50,000"),
    ("Jane,,  Smith", "thirty", "Los Angeles  ", "50000 ", "Female", None, "no", "€ 60.000"),
    ("  ", "  40 ", "San   Francisco", "70000", None, "2022-08-10", "no", "70,000 INR"),
    ("  Michael!!  ", None, " Chicago ", "80000", "Male", "2020-05-20", None, "SGD 80000"),
    ("Sara   Connor", "50", None, "90000", "Female", None, "yes", "JPY 9000000"),
    ("   JOHN DOE  ", "  25", "New York", None, "Male", "2019-12-25", "yes", "$50000"),
    ("JoHn dOe", "25", "NEW YORK", None, "Male", "2018-07-04", None, "CAD 55,000"),
    ("", None, None, None, None, None, None, None),
]

columns = ["name", "age", "city", "salary", "gender", "date", "response", "currency_salary"]

df = spark.createDataFrame(data, columns)

In [19]:
df.printSchema()
df.show()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date: string (nullable = true)
 |-- response: string (nullable = true)
 |-- currency_salary: string (nullable = true)

+--------------+------+---------------+------+------+----------+--------+---------------+
|          name|   age|           city|salary|gender|      date|response|currency_salary|
+--------------+------+---------------+------+------+----------+--------+---------------+
|  John   Doe  |    25|     New York  |  null|  Male|2024-01-15|     yes|     USD 50,000|
| Jane,,  Smith|thirty|  Los Angeles  |50000 |Female|      null|      no|       € 60.000|
|              |   40 |San   Francisco| 70000|  null|2022-08-10|      no|     70,000 INR|
|   Michael!!  |  null|       Chicago | 80000|  Male|2020-05-20|    null|      SGD 80000|
| Sara   Connor|    50|           null| 90000|Female|     

In [20]:
df = df.withColumn("name", trim(regexp_replace(col("name"), " +", " ")))  
df = df.withColumn("city", trim(regexp_replace(col("city"), " +", " ")))  
df = df.withColumn("name", lower(col("name")))  
df = df.withColumn("city", upper(col("city")))  
df = df.withColumn("name", regexp_replace(col("name"), "[^a-zA-Z ]", ""))  
df = df.withColumn("name", when(col("name") == "", None).otherwise(col("name")))  
df = df.withColumn("city", when(col("city") == "", None).otherwise(col("city")))

In [21]:
df = df.fillna({"name": "Unknown", "city": "Unknown", "gender": "Other", "response": "Unknown"})

In [22]:
df = df.withColumn("age", when(col("age").rlike("^[0-9]+$"), col("age")).otherwise(None).cast("float"))
df = df.withColumn("salary", col("salary").cast("float"))

In [25]:
pandas_df = df.collect()

In [27]:
columns = ["name", "age", "city", "salary", "gender", "date", "response", "currency_salary"]
df_pandas = pd.DataFrame(pandas_df, columns=columns)

In [28]:
df_pandas.head()

Unnamed: 0,name,age,city,salary,gender,date,response,currency_salary
0,john doe,,NEW YORK,,Male,2024-01-15,yes,"USD 50,000"
1,jane smith,,LOS ANGELES,50000.0,Female,,no,€ 60.000
2,Unknown,,SAN FRANCISCO,70000.0,Other,2022-08-10,no,"70,000 INR"
3,michael,,CHICAGO,80000.0,Male,2020-05-20,Unknown,SGD 80000
4,sara connor,50.0,Unknown,90000.0,Female,,yes,JPY 9000000


In [29]:
df_pandas["age"].fillna(df_pandas["age"].median(), inplace=True)
df_pandas["salary"].fillna(df_pandas["salary"].mean(), inplace=True)

In [50]:
df = spark.createDataFrame(df_pandas) 

In [51]:
df.show()

+-----------+----+-------------+-------+------+----------+--------+---------------+
|       name| age|         city| salary|gender|      date|response|currency_salary|
+-----------+----+-------------+-------+------+----------+--------+---------------+
|   john doe|37.5|     NEW YORK|72500.0|  Male|2024-01-15|     yes|     USD 50,000|
| jane smith|37.5|  LOS ANGELES|50000.0|Female|      null|      no|       € 60.000|
|    Unknown|37.5|SAN FRANCISCO|70000.0| Other|2022-08-10|      no|     70,000 INR|
|    michael|37.5|      CHICAGO|80000.0|  Male|2020-05-20| Unknown|      SGD 80000|
|sara connor|50.0|      Unknown|90000.0|Female|      null|     yes|    JPY 9000000|
|   john doe|37.5|     NEW YORK|72500.0|  Male|2019-12-25|     yes|         $50000|
|   john doe|25.0|     NEW YORK|72500.0|  Male|2018-07-04| Unknown|     CAD 55,000|
|    Unknown|37.5|      Unknown|72500.0| Other|      null| Unknown|           null|
+-----------+----+-------------+-------+------+----------+--------+---------

In [52]:
df = df.fillna({"date": "2000-01-01"})
df = df.withColumn("date", col("date").cast("date"))

In [53]:
df = df.withColumn("currency_salary", regexp_replace(col("currency_salary"), "[^0-9]", ""))
df = df.withColumn("currency_salary", col("currency_salary").cast("float"))

In [54]:
indexer_gender = StringIndexer(inputCol="gender", outputCol="gender_index")
indexer_response = StringIndexer(inputCol="response", outputCol="response_index")

In [56]:
df = indexer_gender.fit(df).transform(df)
df = indexer_response.fit(df).transform(df)

In [57]:
df.show()

+-----------+----+-------------+-------+------+----------+--------+---------------+------------+--------------+
|       name| age|         city| salary|gender|      date|response|currency_salary|gender_index|response_index|
+-----------+----+-------------+-------+------+----------+--------+---------------+------------+--------------+
|   john doe|37.5|     NEW YORK|72500.0|  Male|2024-01-15|     yes|        50000.0|         0.0|           1.0|
| jane smith|37.5|  LOS ANGELES|50000.0|Female|2000-01-01|      no|        60000.0|         1.0|           2.0|
|    Unknown|37.5|SAN FRANCISCO|70000.0| Other|2022-08-10|      no|        70000.0|         2.0|           2.0|
|    michael|37.5|      CHICAGO|80000.0|  Male|2020-05-20| Unknown|        80000.0|         0.0|           0.0|
|sara connor|50.0|      Unknown|90000.0|Female|2000-01-01|     yes|      9000000.0|         1.0|           1.0|
|   john doe|37.5|     NEW YORK|72500.0|  Male|2019-12-25|     yes|        50000.0|         0.0|        

In [58]:
df = df.drop("gender", "response")

In [59]:
df.show()

+-----------+----+-------------+-------+----------+---------------+------------+--------------+
|       name| age|         city| salary|      date|currency_salary|gender_index|response_index|
+-----------+----+-------------+-------+----------+---------------+------------+--------------+
|   john doe|37.5|     NEW YORK|72500.0|2024-01-15|        50000.0|         0.0|           1.0|
| jane smith|37.5|  LOS ANGELES|50000.0|2000-01-01|        60000.0|         1.0|           2.0|
|    Unknown|37.5|SAN FRANCISCO|70000.0|2022-08-10|        70000.0|         2.0|           2.0|
|    michael|37.5|      CHICAGO|80000.0|2020-05-20|        80000.0|         0.0|           0.0|
|sara connor|50.0|      Unknown|90000.0|2000-01-01|      9000000.0|         1.0|           1.0|
|   john doe|37.5|     NEW YORK|72500.0|2019-12-25|        50000.0|         0.0|           1.0|
|   john doe|25.0|     NEW YORK|72500.0|2018-07-04|        55000.0|         0.0|           0.0|
|    Unknown|37.5|      Unknown|72500.0|

In [60]:
pandas_df = df.collect()
columns = ["name", "age", "city", "salary", "date", "currency_salary","gender_index","response_index"]
df_pandas = pd.DataFrame(pandas_df, columns=columns)

In [61]:
assembler = VectorAssembler(inputCols=["salary"], outputCol="salary_vector")
df = assembler.transform(df)

In [62]:
scaler = MinMaxScaler(inputCol="salary_vector", outputCol="salary_scaled")
df = scaler.fit(df).transform(df)

In [63]:
df = df.dropDuplicates()

In [67]:
df = df.dropna(how="any")

In [69]:
df.select(
    count(when(col("name").isNull(), True)).alias("null"),
    count(when(col("name").isNotNull(), True)).alias("not_null")
).show()

+----+--------+
|null|not_null|
+----+--------+
|   0|       7|
+----+--------+



In [70]:
df.groupBy(col("gender_index").isNull().alias("is_null")).count().show()

+-------+-----+
|is_null|count|
+-------+-----+
|  false|    7|
+-------+-----+



In [68]:
print("Data Setelah Dibersihkan:")
df.show(truncate=False)
df.printSchema()

Data Setelah Dibersihkan:
+-----------+----+-------------+-------+----------+---------------+------------+--------------+-------------+-------------+
|name       |age |city         |salary |date      |currency_salary|gender_index|response_index|salary_vector|salary_scaled|
+-----------+----+-------------+-------+----------+---------------+------------+--------------+-------------+-------------+
|john doe   |37.5|NEW YORK     |72500.0|2024-01-15|50000.0        |0.0         |1.0           |[72500.0]    |[0.5625]     |
|jane smith |37.5|LOS ANGELES  |50000.0|2000-01-01|60000.0        |1.0         |2.0           |[50000.0]    |[0.0]        |
|Unknown    |37.5|SAN FRANCISCO|70000.0|2022-08-10|70000.0        |2.0         |2.0           |[70000.0]    |[0.5]        |
|michael    |37.5|CHICAGO      |80000.0|2020-05-20|80000.0        |0.0         |0.0           |[80000.0]    |[0.75]       |
|sara connor|50.0|Unknown      |90000.0|2000-01-01|9000000.0      |1.0         |1.0           |[90000.0]  