## **How to Remove Duplicate Records in PySpark – Real-World Transactions Example**

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
from datetime import datetime

# Schema for the sample transactions: every column is declared non-nullable.
transaction_schema = StructType([
    StructField(field_name, field_type, nullable=False)
    for field_name, field_type in [
        ("order_id", IntegerType()),
        ("customer_id", StringType()),
        ("amount", DoubleType()),
        ("transaction_time", TimestampType()),
        ("payment_method", StringType()),
        ("status", StringType()),
    ]
])

# Sample rows. Orders 1001 and 1003 each appear twice as exact copies, while
# order 1004 appears twice with a different transaction_time and status.
transaction_data = [
    (1001, "CUST001", 299.99, datetime(2024, 1, 15, 10, 30, 0), "credit_card", "processed"),
    (1001, "CUST001", 299.99, datetime(2024, 1, 15, 10, 30, 0), "credit_card", "processed"),
    (1002, "CUST002", 150.50, datetime(2024, 1, 15, 11, 15, 0), "paypal", "processed"),
    (1003, "CUST003", 89.99, datetime(2024, 1, 15, 12, 0, 0), "credit_card", "pending"),
    (1003, "CUST003", 89.99, datetime(2024, 1, 15, 12, 0, 0), "credit_card", "pending"),
    (1004, "CUST004", 199.99, datetime(2024, 1, 15, 13, 45, 0), "credit_card", "failed"),
    (1004, "CUST004", 199.99, datetime(2024, 1, 15, 13, 50, 0), "credit_card", "success"),
]

# Build the DataFrame against the explicit schema and render it.
transaction_df = spark.createDataFrame(transaction_data, schema=transaction_schema)

display(transaction_df)

## Method 1: Remove Exact Duplicates (Beginner Level)

In [0]:
# Called without arguments, dropDuplicates() compares every column, so only
# rows that match in all columns are collapsed into one.
exact_deduped_df = transaction_df.dropDuplicates()
exact_deduped_df.show()

## Method 2: Remove Duplicates Using a Key Column

In [0]:
# Keep a single row per order_id. This call specifies no ordering, so for
# order 1004 (two differing rows) it does not say which one is retained.
deduped_by_order_df = transaction_df.dropDuplicates(['order_id'])
deduped_by_order_df.display()

**Based on Multiple Columns**

In [0]:
# Treat rows as duplicates when both order_id and customer_id match.
dedup_key_columns = ['order_id', 'customer_id']
deduped_by_order_customer_df = transaction_df.dropDuplicates(dedup_key_columns)
deduped_by_order_customer_df.show()

## Method 3: Window Function (Most Common in Real Projects)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc, col

# Number the rows inside each order_id group, newest transaction_time first.
window_spec = (
    Window
    .partitionBy("order_id")
    .orderBy(desc("transaction_time"))
)

# Attach the per-order row number so the ranking can be inspected.
transactions_with_rank = transaction_df.withColumn(
    "row_num",
    row_number().over(window_spec),
)

display(transactions_with_rank)

# Row number 1 is the most recent row of each order; the helper column is
# dropped once it has done its job.
is_latest_row = col("row_num") == 1
latest_transactions = transactions_with_rank.where(is_latest_row).drop("row_num")

latest_transactions.display()

## Method 4: Business Logic Deduplication (Advanced Level)

In [0]:
from pyspark.sql.functions import when

# Status -> priority mapping; a lower number is preferred when an order has
# several candidate rows. This dict is the single source of truth for the
# priority column built below.
status_priority = {"success": 1, "processed": 2, "pending": 3, "failed": 4}

# Any status that is not in the mapping is ranked alongside "failed".
default_priority = status_priority["failed"]

# Build the CASE WHEN expression from the mapping instead of hard-coding the
# same values a second time, so the two can never drift apart.
priority_expr = None
for status_name, priority in status_priority.items():
    is_status = col("status") == status_name
    priority_expr = (
        when(is_status, priority)
        if priority_expr is None
        else priority_expr.when(is_status, priority)
    )
priority_expr = priority_expr.otherwise(default_priority)

# Create priority column
transactions_priority = transaction_df.withColumn("status_priority", priority_expr)
transactions_priority.display()

# Within each order, the best status comes first; rows with equal status are
# ordered by ascending transaction_time, so the earliest of them wins.
window_priority = Window.partitionBy("order_id").orderBy("status_priority", "transaction_time")

# Keep the top-ranked row per order and remove the helper columns.
best_transactions = (
    transactions_priority
    .withColumn("rank", row_number().over(window_priority))
    .filter(col("rank") == 1)
    .drop("status_priority", "rank")
)
best_transactions.display()