In [7]:
from pyspark.sql import SparkSession

In [8]:
#import sys
#!{sys.executable} -m pip install pyspark

In [10]:
# Initialize a Spark session
spark = SparkSession.builder \
	.appName("DataCleaning") \
	.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/20 10:27:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
import random
import pandas as pd

# Function to generate random data with some missing values and duplicates
def generate_data(n):
    
    customer_ids = [f'C{str(i).zfill(5)}' for i in range(1, 101)]
    product_categories = ['Electronics', 'Books', 'Clothing', 'Groceries', 'Furniture']

    data = []
    for i in range(n):
        customer_id = random.choice(customer_ids) if i % 10 != 0 else None  # Introduce some missing values
        transaction_id = f'T{str(random.randint(10000, 99999))}'
        transaction_date = pd.Timestamp('2023-01-01') + pd.to_timedelta(random.randint(0, 180), unit='d')
        amount = round(random.uniform(5, 500), 2)
        product_category = random.choice(product_categories)
        data.append((customer_id, transaction_id, transaction_date, amount, product_category))

        # Introduce duplicates
        data.extend(data[:10])

    return data

In [15]:
# Generate 10,000 rows of data
data = generate_data(10_000)

columns = ['CustomerID', 'TransactionID', 'TransactionDate', 'Amount', 'ProductCategory']
df = pd.DataFrame(data, columns=columns)
spark_df = spark.createDataFrame(df)

spark_df.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+----------+-------------+-------------------+------+---------------+
|CustomerID|TransactionID|    TransactionDate|Amount|ProductCategory|
+----------+-------------+-------------------+------+---------------+
|      NULL|       T19714|2023-06-19 00:00:00|175.03|          Books|
|      NULL|       T19714|2023-06-19 00:00:00|175.03|          Books|
|    C00047|       T93197|2023-05-08 00:00:00| 62.47|          Books|
|      NULL|       T19714|2023-06-19 00:00:00|175.03|          Books|
|      NULL|       T19714|2023-06-19 00:00:00|175.03|          Books|
+----------+-------------+-------------------+------+---------------+
only showing top 5 rows


Exception ignored in: <_io.BufferedWriter name=5>
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/jupyterlab/4.4.5/libexec/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 200, in manager
BrokenPipeError: [Errno 32] Broken pipe
                                                                                

In [16]:
# Hanlde Missing values
# Drop rows with missing CustomerID
spark_df = spark_df.dropna(subset=["CustomerID"])

In [17]:
# Fill missing CustomerID with a default value
spark_df = spark_df.fillna({"CustomerID": "Unknown"})

In [18]:
# Drop duplicate rows based on 'TransactionID'
spark_df = spark_df.dropDuplicates(subset=["TransactionID"])

In [19]:
#normalize the ‘Amount’ column by scaling it between 0 and 1.
from pyspark.sql.functions import col, min, max

# Normalize the 'Amount' column
min_amount = spark_df.agg(min(col("Amount"))).collect()[0][0]
max_amount = spark_df.agg(max(col("Amount"))).collect()[0][0]

spark_df = spark_df.withColumn("Amount", (col("Amount") - min_amount) / (max_amount - min_amount))

In [20]:
# Handle Outliers

from pyspark.sql.functions import col, expr

# Calculate Q1, Q3, and IQR
quantiles = spark_df.approxQuantile("Amount", [0.25, 0.75], 0.05)
Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1

# Define the upper and lower bounds
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

# Filter out the outliers
spark_df = spark_df.filter((col("Amount") >= lower_bound) & (col("Amount") <= upper_bound))

In [21]:
spark_df.dtypes

[('CustomerID', 'string'),
 ('TransactionID', 'string'),
 ('TransactionDate', 'timestamp'),
 ('Amount', 'double'),
 ('ProductCategory', 'string')]

In [23]:
from pyspark.sql.functions import to_date

# Convert 'TransactionDate' to date format
spark_df = spark_df.withColumn("TransactionDate", to_date(col("TransactionDate")))

In [24]:
spark_df.head(5)

[Row(CustomerID='C00041', TransactionID='T10009', TransactionDate=datetime.date(2023, 6, 17), Amount=0.6086306517435047, ProductCategory='Groceries'),
 Row(CustomerID='C00065', TransactionID='T10013', TransactionDate=datetime.date(2023, 1, 2), Amount=0.8360337791425916, ProductCategory='Groceries'),
 Row(CustomerID='C00081', TransactionID='T10025', TransactionDate=datetime.date(2023, 3, 8), Amount=0.9513717725968727, ProductCategory='Furniture'),
 Row(CustomerID='C00031', TransactionID='T10027', TransactionDate=datetime.date(2023, 6, 23), Amount=0.5048082750818215, ProductCategory='Electronics'),
 Row(CustomerID='C00020', TransactionID='T10029', TransactionDate=datetime.date(2023, 4, 12), Amount=0.8559941815830943, ProductCategory='Books')]

In [31]:
spark_df

DataFrame[CustomerID: string, TransactionID: string, TransactionDate: date, Amount: double, ProductCategory: string]

## Additional Resources
https://spark.apache.org/docs/latest/api/python/getting_started/index.html

## Why to Use PySpark
Pandas has limitations such as
1. Pandas loads entire datasets into memory so datasets larger than 2-3 GB memory RAM can cause memory errors or slowdowns.
2. Single threaded run on a single CPU
3. Slow on large data

PySpark
1. Perform scalable parallele data processing
2. Integration with big data tools like Hadoop or cloud storage