# Data Cleaning with PySpark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, regexp_replace, regexp_extract, monotonically_increasing_id


# Reuse the active Spark session if there is one; otherwise start a new one
# under the application name "Data_cleaning".
spark = (
    SparkSession.builder
    .appName("Data_cleaning")
    .getOrCreate()
)

## 1. Data Reading and Loading

In [None]:
# Load the semicolon-delimited population file, taking the column names from
# its first line. No schema is supplied or inferred here.
df = spark.read.options(header=True, sep=";").csv("population.csv")

In [None]:
# Return the first three rows to the driver as a list of Row objects.
df.take(3)

In [None]:
# Print the first three rows as a formatted table.
df.show(3)

In [None]:
# Return every row to the driver as a list of Row objects.
# NOTE(review): this loads the whole dataset into driver memory, so it is only
# appropriate while the input file stays small — confirm before reusing.
df.collect()

## 2. Data Description

In [None]:
# Print each column's name, data type and nullability as a tree.
df.printSchema()

In [None]:
# Column names as a Python list, in schema order.
df.columns

In [None]:
# The same column names, read through the schema object instead.
df.schema.names

In [None]:
# Returns the summary statistics as a new DataFrame; its rows are not printed
# unless .show() is called on the result.
df.describe()

In [None]:
# Print the basic summary statistics (count, mean, stddev, min, max) per column.
df.describe().show()

In [None]:
# Print the extended summary: the describe() statistics plus the 25%, 50% and
# 75% percentiles.
df.summary().show()

In [None]:
# Number of rows in the DataFrame.
df.count()

In [None]:
# Number of columns in the DataFrame.
len(df.columns)

## 3. Handling Missing Data

In [None]:
# Count the rows that have a value in "with_migration_background".
df.where(col("with_migration_background").isNotNull()).count()

In [None]:
# Count the rows where "with_migration_background" is missing.
df.where(col("with_migration_background").isNull()).count()

In [None]:
# Column names again, as input for the per-column null check.
df.columns

In [None]:
# Print the number of nulls in each column as "<column> : <count>".
# All counts come from a single aggregation (one Spark job) instead of running
# a separate filter-and-count job for every column.
null_counts = df.select(
    [count(when(col(name).isNull(), name)).alias(name) for name in df.columns]
).first()
for name in df.columns:
    print(f"{name} : {null_counts[name]}")

In [None]:
# Replace nulls with the placeholder string "deleting" so the affected rows can
# be located by value, then preview the first three rows.
df = df.na.fill("deleting")
df.show(n=3)

In [None]:
# Row count after filling the nulls.
df.count()

## 4. Data Filtering and Cleaning

In [None]:
# Display the rows whose "total" holds the "deleting" placeholder.
df.where(col("total") == "deleting").show()

In [None]:
# DataFrame.drop() removes columns, not rows, so it cannot discard the rows
# that carry the "deleting" placeholder. Preview the row filter instead; the
# result is not assigned, so `df` itself is left unchanged by this cell.
df.filter(col("total") != "deleting").show(3)

In [None]:
# How many rows carry the "deleting" placeholder in "total".
df.where(col("total") == "deleting").count()

In [None]:
# How many rows carry the "deleting" placeholder in "with_migration_background".
df.where(col("with_migration_background") == "deleting").count()

In [None]:
# Total row count, for comparison with the placeholder counts.
df.count()

In [None]:
# Keep only the rows whose "total" is not the "deleting" placeholder.
df = df.filter(col("total") != "deleting")

In [None]:
# Display any rows that still hold the "deleting" placeholder in "total".
df.where(col("total") == "deleting").show()

In [None]:
# One aggregation pass: for every column, count the rows where it is null and
# show the counts as a single-row table.
df.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in df.columns)
).show()

In [None]:
# Append an "index" column holding a generated row id, then preview three rows.
# NOTE(review): per the Spark docs, monotonically_increasing_id() yields unique,
# increasing ids that are not guaranteed to be consecutive; code that compares
# "index" against small literals presumably relies on the data sitting in a
# single partition — confirm.
df = df.withColumn("index", monotonically_increasing_id())
df.show(3)

In [None]:
# Take the first 20 column names, leaving out the helper "index" column.
# If the frame has fewer than 20 data columns the slice would otherwise pick up
# "index" as well, and selecting "index" next to this list would then produce
# two columns with the same name.
column_list = [name for name in df.columns[:20] if name != "index"]

In [None]:
# Move "index" to the front, followed by the columns named in column_list,
# and preview the result.
df = df.select(["index", *column_list])
df.show(n=3)

In [None]:
# Display the rows whose "index" is 1 or 2.
df.where(col("index").isin(1, 2)).show()

In [None]:
# Preview the first three rows before the postcode clean-up.
df.show(3)

## 5. Data Manipulation

In [None]:
# Rows with index <= 4: strip everything from "postcode" except ASCII letters
# and dots, then display the result.
df_postcode = (
    df.where(col("index") <= 4)
    .withColumn(
        "postcode",
        regexp_replace(col("postcode"), '[^a-zA-Z.]+', ""),
    )
)
df_postcode.show()

In [None]:
# Number of rows in the index <= 4 subset.
df_postcode.count()

In [None]:
# Rows with index > 4: strip everything from "postcode" except digits and
# dots, then preview three rows.
df_postcode_last = (
    df.where(col("index") > 4)
    .withColumn(
        "postcode",
        regexp_replace(col("postcode"), '[^0-9.]+', ""),
    )
)
df_postcode_last.show(n=3)

In [None]:
# Number of rows in the index > 4 subset.
df_postcode_last.count()

In [None]:
# Stack the two cleaned subsets back into a single DataFrame. union() matches
# columns by position, which works here because both subsets derive from the
# same frame with the same column order.
df = df_postcode.union(df_postcode_last)

In [None]:
# Preview the first ten rows of the recombined data.
df.show(10)

In [None]:
# Drop the helper "index" column and strip every non-digit character from all
# remaining columns except "postcode", which is passed through unchanged.
# The work is expressed as one select(): calling withColumn() once per column
# adds a projection per call and can bloat the query plan on wide frames.
digits_only_columns = [
    col(name) if name == "postcode"
    else regexp_replace(col(name), '[^0-9]+', '').alias(name)
    for name in df.columns
    if name != "index"
]
df = df.select(*digits_only_columns)

df.show(3)

## 6. Data Type Conversions

In [None]:
# Convert "year" to an integer column.
df = df.withColumn("year", df["year"].astype("int"))

In [None]:
# Cast every column except "year" and "postcode" to integer, then print the
# resulting schema. A single select() replaces the per-column withColumn()
# loop, which would add one projection to the query plan per column.
# NOTE(review): strings that are not valid integers become null only when ANSI
# mode is off; with ANSI mode enabled the cast raises — confirm the session
# setting.
columns_left_as_is = {"year", "postcode"}
df = df.select(
    *[
        col(name) if name in columns_left_as_is
        else col(name).cast("int").alias(name)
        for name in df.columns
    ]
)

df.printSchema()

In [None]:
# Print the summary statistics again, after the type conversion.
df.summary().show()

In [None]:
# Print the first rows of the converted data (show() defaults to 20 rows).
df.show()

## 7. Data Merging

In [None]:
# Load the second file, taking column names from its first line (default comma
# separator), and preview three rows.
df_nl = spark.read.options(header=True).csv("Nederland.csv")
df_nl.show(n=3)

In [None]:
# Remove the "Huisnummer" column, which the following steps do not use.
df_nl = df_nl.drop("Huisnummer")
df_nl.show(n=3)

In [None]:
# Reduce "PC6" to its first four characters; values shorter than four
# characters do not match the pattern and become an empty string.
first_four_chars = regexp_extract(col("PC6"), r'^(.{4})', 1)
df_nl = df_nl.withColumn("PC6", first_four_chars)
df_nl.show(n=3)

In [None]:
# Keep a single row per distinct "PC6" value and display the result.
df_nl = df_nl.drop_duplicates(subset=['PC6'])
df_nl.show()

In [None]:
# Print the column names and types of the lookup table.
df_nl.printSchema()

In [None]:
# Inner-join the population data with the lookup table where "postcode" equals
# "PC6", then drop the now-redundant "PC6" column. Rows without a match on
# either side are discarded.
df = df.join(
    df_nl,
    on=df["postcode"] == df_nl["PC6"],
    how="inner",
).drop("PC6")

In [None]:
# Preview the first three rows of the joined result.
df.show(3)