## Install PySpark

In [None]:
# Use the %pip magic so the package is installed into the environment of the running kernel,
# and pin the version (3.5.2 is the release the recorded output of this cell installed).
%pip install pyspark==3.5.2

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=6bd80e2df67a29c669daab45ed1eb501ce4a7f36d432bceb1d510132c890bcd0
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


## 1. Start a PySpark Session

In [None]:
from pyspark.sql import SparkSession

# Build the notebook's Spark session under the application name "DataCleaning"
spark = (
    SparkSession.builder
    .appName("DataCleaning")
    .getOrCreate()
)


## 2. Generate a Sample Dataset

In [None]:
import random
import pandas as pd

# Function to generate random data with some missing values and duplicates
def generate_data(n, n_duplicates=10):
    """Generate random transaction rows with missing values and duplicates.

    Each row is a tuple
    ``(customer_id, transaction_id, transaction_date, amount, product_category)``.

    * ``customer_id`` is ``None`` for every 10th generated row (i = 0, 10, 20, ...),
      otherwise one of ``C00001`` .. ``C00100``.
    * ``transaction_id`` is ``'T'`` followed by a random 5-digit number; IDs are
      drawn independently, so they are not guaranteed to be unique.
    * ``transaction_date`` is a ``pd.Timestamp`` between 2023-01-01 and 180 days later.
    * ``amount`` is a float in [5, 500], rounded to 2 decimals.

    Args:
        n: Number of distinct rows to generate.
        n_duplicates: How many of the first rows are appended again at the end
            to act as exact duplicates (at most ``n`` rows can be duplicated).

    Returns:
        A list of ``n + min(n, n_duplicates)`` tuples.
    """
    customer_ids = [f'C{i:05d}' for i in range(1, 101)]
    product_categories = ['Electronics', 'Books', 'Clothing', 'Groceries', 'Furniture']

    data = []
    for i in range(n):
        # Introduce some missing values: every 10th row has no customer
        customer_id = random.choice(customer_ids) if i % 10 != 0 else None
        transaction_id = f'T{random.randint(10000, 99999)}'
        transaction_date = pd.Timestamp('2023-01-01') + pd.to_timedelta(random.randint(0, 180), unit='d')
        amount = round(random.uniform(5, 500), 2)
        product_category = random.choice(product_categories)
        data.append((customer_id, transaction_id, transaction_date, amount, product_category))

    # Introduce duplicates ONCE, after all rows exist. Doing this inside the loop
    # re-appends the first rows on every iteration and floods the dataset with copies.
    data.extend(data[:n_duplicates])

    return data

In [None]:
# Seed Python's RNG so the synthetic dataset is the same on every Restart & Run All
SEED = 42
random.seed(SEED)

# Ask the generator for 10,000 random transactions
N_ROWS = 10_000
data = generate_data(N_ROWS)

# Convert to a Pandas DataFrame and then to PySpark DataFrame
columns = ['CustomerID', 'TransactionID', 'TransactionDate', 'Amount', 'ProductCategory']
df = pd.DataFrame(data, columns=columns)
spark_df = spark.createDataFrame(df)

# Preview the first rows
spark_df.show(5)


+----------+-------------+-------------------+------+---------------+
|CustomerID|TransactionID|    TransactionDate|Amount|ProductCategory|
+----------+-------------+-------------------+------+---------------+
|      NULL|       T17203|2023-03-20 00:00:00|221.92|          Books|
|      NULL|       T17203|2023-03-20 00:00:00|221.92|          Books|
|    C00058|       T63296|2023-02-11 00:00:00|157.92|      Groceries|
|      NULL|       T17203|2023-03-20 00:00:00|221.92|          Books|
|      NULL|       T17203|2023-03-20 00:00:00|221.92|          Books|
+----------+-------------+-------------------+------+---------------+
only showing top 5 rows



In [None]:
spark_df.dtypes

[('CustomerID', 'string'),
 ('TransactionID', 'string'),
 ('TransactionDate', 'date'),
 ('Amount', 'double'),
 ('ProductCategory', 'string')]

## 3. Handle Missing Values

In [None]:
# Replace null CustomerID values with the placeholder "Unknown";
# the mapping is column name -> replacement value, so other columns are untouched
customer_id_default = {"CustomerID": "Unknown"}
spark_df = spark_df.fillna(customer_id_default)


## 4. Remove Duplicates

In [None]:
from pyspark.sql.functions import col, min, max

# Drop rows that are exact duplicates across all columns.
# Whole-row comparison is used rather than TransactionID alone because the
# generated transaction IDs are random and may collide between distinct rows.
spark_df = spark_df.dropDuplicates()


## 5. Transform Columns

In [None]:
from pyspark.sql.functions import col, min, max

# Normalize the 'Amount' column to the [0, 1] range (min-max scaling).
# NOTE: `min` / `max` here are the Spark aggregate functions imported at the top of
# this cell, not the Python builtins they shadow.

# Fetch both bounds in a single aggregation instead of one Spark job per bound
amount_bounds = spark_df.agg(min(col("Amount")), max(col("Amount"))).collect()[0]
min_amount, max_amount = amount_bounds[0], amount_bounds[1]

# Both bounds are None when there are no non-null amounts; nothing to scale then
if min_amount is not None:
    amount_range = max_amount - min_amount
    if amount_range != 0:
        normalized_amount = (col("Amount") - min_amount) / amount_range
    else:
        # Constant column: dividing by a zero range would turn every value into NULL,
        # so map the single value to 0.0 instead
        normalized_amount = col("Amount") - min_amount
    spark_df = spark_df.withColumn("Amount", normalized_amount)

## 6. Handle Outliers

In [None]:
from pyspark.sql.functions import col, expr

# Approximate the first and third quartiles of 'Amount'
# (0.05 is the relative-error argument passed to approxQuantile)
quantiles = spark_df.approxQuantile("Amount", [0.25, 0.75], 0.05)
Q1, Q3 = quantiles[0], quantiles[1]

# Interquartile range and the usual 1.5 * IQR fences around it
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Keep only the rows whose Amount lies within the fences (both ends inclusive)
not_too_small = col("Amount") >= lower_bound
not_too_large = col("Amount") <= upper_bound
spark_df = spark_df.filter(not_too_small & not_too_large)


## 7. Convert Data Types

In [None]:
# Import `col` here as well so this cell does not depend on an earlier cell's import
from pyspark.sql.functions import col, to_date

# Convert 'TransactionDate' to date format
# (the original note says this is not quite needed for this dataset)
# NOTE(review): the preview printed after loading shows values like
# `2023-03-20 00:00:00`, which suggests the column arrives as a timestamp and
# this step drops the time-of-day part -- confirm with `spark_df.dtypes` on a fresh run.
spark_df = spark_df.withColumn("TransactionDate", to_date(col("TransactionDate")))
