In [0]:
from pyspark.sql.functions import col

# Sample DataFrame 1: Customers
customers = spark.createDataFrame(
    [
        (1, "Alice"),
        (2, "Bob"),
        (3, "Charlie"),
    ],
    ["customer_id", "name"],
)

# Sample DataFrame 2: Orders
# Customer 1 has two orders, customer 3 has none, and the order for
# customer 4 has no matching row in `customers`.
orders = spark.createDataFrame(
    [
        (1, "Laptop"),
        (2, "Tablet"),
        (1, "Phone"),
        (4, "Monitor"),
    ],
    ["customer_id", "product"],
)

# Perform a LEFT join on 'customer_id': every customer row is kept, so
# Charlie appears with a null product, while the unmatched order for
# customer 4 is dropped.
# Change `how` to compare the other join types: inner, left, right, outer.
joined_df = customers.join(orders, on="customer_id", how="left")

# Show the result
joined_df.show()


In [0]:
from pyspark.sql.functions import col

# Column names for the sample rows
schema = ["CustomerID", "Name", "Email"]

# Sample rows: CustomerIDs 1/3 and 2/5 repeat the same Name and Email
data = [
    (1, "John Doe", "john.doe@example.com"),
    (2, "Jane Smith", "jane.smith@example.com"),
    (3, "John Doe", "john.doe@example.com"),
    (4, "Alice Johnson", "alice.johnson@example.com"),
    (5, "Jane Smith", "jane.smith@example.com"),
]

# Build the DataFrame from the rows above
df = spark.createDataFrame(data, schema)

# Display the input before looking for duplicates
print("Original DataFrame:")
df.show()

# Count the rows per (Name, Email) pair and keep the pairs seen more than once
duplicates = (
    df.groupBy("Name", "Email")
    .count()
    .where(col("count") > 1)
)

# Display each duplicated pair together with its row count
print("Duplicate Records:")
duplicates.show()



In [0]:
from pyspark.sql import SparkSession


# Column names shared by both sample DataFrames
schema = ["CustomerID", "Name", "Email"]

# First customer list
data1 = [
    (1, "John Doe", "john.doe@example.com"),
    (2, "Jane Smith", "jane.smith@example.com"),
    (3, "Alice Johnson", "alice.johnson@example.com"),
]

# Second customer list: the first two people reappear under new CustomerIDs
data2 = [
    (4, "John Doe", "john.doe@example.com"),
    (5, "Jane Smith", "jane.smith@example.com"),
    (6, "Bob Brown", "bob.brown@example.com"),
]

# Build one DataFrame per list
df1, df2 = (spark.createDataFrame(rows, schema) for rows in (data1, data2))

# Exact match on both Name and Email keeps only customers present in both
# lists. The join keys appear once in the result; CustomerID is not a join
# key, so the result carries one CustomerID column from each side.
joined_df = df1.join(
    df2,
    on=["Name", "Email"],
    how="inner",
)

# Display the matched customers
joined_df.show()



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Sample data for DataFrame 1
data1 = [
    (1, "John Doe", "john.doe@example.com"),
    (2, "Jane Smith", "jane.smith@example.com"),
    (3, "Alice Johnson", "alice.johnson@example.com")
]

# Sample data for DataFrame 2: the first two names are misspelled variants
# of names in data1 ("Jon Doe", "Jane Smyth")
data2 = [
    (4, "Jon Doe", "john.doe@example.com"),
    (5, "Jane Smyth", "jane.smith@example.com"),
    (6, "Bob Brown", "bob.brown@example.com")
]

# Define schema
schema = ["CustomerID", "Name", "Email"]

# Largest Levenshtein distance (single-character insertions, deletions or
# substitutions) at which two names are still treated as the same person.
# The comparison below is inclusive: a distance of exactly 2 still matches.
MAX_EDIT_DISTANCE = 2

# Create DataFrames, suffixing every column with 1 / 2 so the names stay
# unambiguous once both sides are combined by the cross join
df1 = spark.createDataFrame(data1, schema).toDF("CustomerID1", "Name1", "Email1")
df2 = spark.createDataFrame(data2, schema).toDF("CustomerID2", "Name2", "Email2")

# Perform fuzzy join using Levenshtein distance on 'Name': pair every row of
# df1 with every row of df2, then keep the pairs whose names are at most
# MAX_EDIT_DISTANCE edits apart (e.g. "John Doe" -> "Jon Doe" is one deletion)
fuzzy_joined_df = df1.crossJoin(df2).filter(
    expr("levenshtein(Name1, Name2) <= {}".format(MAX_EDIT_DISTANCE))
)

# Show the result
fuzzy_joined_df.show()



### Jaro-Winkler demonstration

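Jaro-Winkler measures the similarity between two strings based on:
- The number of matching characters.
- The number of transpositions (characters that match but are out of order).
- The length of the common prefix: the Winkler adjustment raises the score of strings that agree on their first few characters.

It produces a score between 0 (no similarity) and 1 (exact match).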


In [0]:
%pip install textdistance

In [0]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
import textdistance

# Column names shared by both sample DataFrames
schema = ["CustomerID", "Name", "Email"]

# First customer list
data1 = [
    (1, "John Doe", "john.doe@example.com"),
    (2, "Jane Smith", "jane.smith@example.com"),
    (3, "Alice Johnson", "alice.johnson@example.com"),
]

# Second customer list; two of the names are spelled slightly differently
data2 = [
    (4, "Jon Doe", "john.doe@example.com"),
    (5, "Jane Smyth", "jane.smith@example.com"),
    (6, "Bob Brown", "bob.brown@example.com"),
]

# Build each DataFrame and give its columns a 1 / 2 suffix in one step, so
# that no column name is shared between the two sides
df1 = spark.createDataFrame(data1, schema).toDF("CustomerID1", "Name1", "Email1")
df2 = spark.createDataFrame(data2, schema).toDF("CustomerID2", "Name2", "Email2")

# Define a UDF for Jaro-Winkler similarity
def jaro_winkler_similarity(s1, s2):
    """Return the Jaro-Winkler similarity of two strings as a float.

    Args:
        s1: First string, or None.
        s2: Second string, or None.

    Returns:
        The value of ``textdistance.jaro_winkler.normalized_similarity``
        converted to ``float``, or ``0.0`` when either argument is None.
    """
    if s1 is not None and s2 is not None:
        score = textdistance.jaro_winkler.normalized_similarity(s1, s2)
        return float(score)
    # A missing value is scored as no similarity at all.
    return 0.0

# Wrap the Python function as a Spark UDF that returns a double
jaro_winkler_udf = udf(jaro_winkler_similarity, returnType=DoubleType())

# Pair every row of df1 with every row of df2, score each pair of names,
# and keep only the pairs whose similarity is at least 0.85
fuzzy_joined_df = (
    df1.crossJoin(df2)
    .withColumn("similarity", jaro_winkler_udf(col("Name1"), col("Name2")))
    .where(col("similarity") >= 0.85)
)

# Display the matched pairs with their similarity score
fuzzy_joined_df.show()

