In [0]:
# ################################################## Module Information ###################################################
#   Module Name         :   Fuzzy Matching of two datasets
#   Description         :   This module is designed to perform fuzzy matching between two data tables based on specified
#                           columns. The process includes:
#                               a. Loading data from the input tables.
#                               b. Taking distinct values from the targeted columns.
#                               c. Data normalization (conversion to lowercase, trimming, removal of special characters).
#                               d. Data reference optimization for parallel processing efficiency.
#                               e. Fuzzy matching using Jaro-Winkler similarity for first comparison.
#                               f. Selection of top-ranked matches based on given priority parameter.
#                               g. Removal of block words and further data cleaning to improve accuracy.
#                               h. Fuzzy matching using Jaccard similarity to enhance matching results.
#                               i. Filtering result based on similarity thresholds to refine results.
#                               j. Compilation of final output including match scores and insights.
#   
#   Inputs              :   Two data tables for comparison, Fuzzy target columns, 
#                           Priority table for coverage (default: data-1), Threshold values (defaults provided),
#                           Block words(optional)
#   Outputs             :   A DataFrame containing matched product names
#   Requirements        :   Python environment with necessary libraries.
#   Last Modified       :   25 Sept, 2024
#   Author              :   Rehan Mansoori
#   Last modified By    :   Rehan Mansoori
#   Change Log          :   Added Parallel Processing Enhancement
# ######################################################################################################################

In [0]:
# Install necessary Python packages for fuzzy matching and text processing
!pip install python-Levenshtein
!pip install fuzzywuzzy scikit-learn
!pip install openpyxl

# Import necessary libraries and functions
from pyspark.sql.functions import *
from pyspark.sql.functions import col, regexp_replace, udf, lower, split, explode, count, collect_list
from collections import Counter
import concurrent.futures
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
from pyspark.sql.functions import explode, split, lower, col, count, regexp_replace
from fuzzywuzzy import fuzz
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import jaccard_score
import pandas as pd
import openpyxl

In [0]:
try:
    # Every notebook input as (widget name, default value, label shown in the UI).
    # The first four are the mandatory inputs; the remaining ones are optional
    # tuning parameters that ship with defaults.
    widget_specs = [
        ("User_table1_query", "", "1. Data 1 (Path/Query)"),
        ("User_table2_query", "", "2. Data 2 (Path/Query)"),
        ("Col_t1", "", "3. Match Column for Data 1"),
        ("Col_t2", "", "4. Match Column for Data 2"),
        ("Rank_col", "1", "Rank selection (Data: 1 or 2)"),
        ("block_words", "", "Comma-separated block words (optional)"),
        ("num_partitions", "28", "Number of partitions for Parallel processing"),
        ("jaccard_similarity_threshold", "0.25", "Jaccard similarity threshold"),
        ("jaro_winkler_similarity_threshold", "0.85", "Jaro-Winkler similarity threshold"),
    ]

    # Register the text widgets in the order listed above
    for widget_name, default_value, widget_label in widget_specs:
        dbutils.widgets.text(widget_name, default_value, widget_label)
except Exception as e:
    print(f"Error creating widgets: {str(e)}")

In [0]:
## Loader that accepts a CSV/TXT, Parquet or Excel path, or otherwise a SQL query
def load_data(query_or_path):
    """Load a Spark DataFrame from a file path or a SQL query.

    The reader is chosen from the (case-insensitive) suffix of ``query_or_path``:
    ``.csv``/``.txt`` use the CSV reader, ``.parquet`` the Parquet reader and
    ``.xlsx``/``.xls`` the ``com.crealytics.spark.excel`` format. Anything else
    is passed to ``spark.sql``.

    Returns:
        The loaded DataFrame, or ``None`` when any exception is raised (the
        error message is printed, not re-raised).
    """
    try:
        source = query_or_path.lower()

        if source.endswith((".csv", ".txt")):
            csv_reader = spark.read.options(header=True, quote='"', multiLine=True, escape='"')
            return csv_reader.csv(query_or_path)

        if source.endswith(".parquet"):
            return spark.read.parquet(query_or_path)

        if source.endswith((".xlsx", ".xls")):
            excel_reader = spark.read.format("com.crealytics.spark.excel")
            excel_options = (
                ("useHeader", "true"),
                ("treatEmptyValuesAsNulls", "true"),
                ("inferSchema", "true"),
            )
            for option_name, option_value in excel_options:
                excel_reader = excel_reader.option(option_name, option_value)
            return excel_reader.load(query_or_path)

        # No recognised file suffix: treat the input as a SQL query
        return spark.sql(query_or_path)
    except Exception as e:
        print(f"Error loading data: {str(e)}")
        return None

# Retrieving widget values
try:
    user_table1_query = dbutils.widgets.get("User_table1_query")
    user_table2_query = dbutils.widgets.get("User_table2_query")

    # load_data prints the error and returns None when a source cannot be read
    user_table1 = load_data(user_table1_query)
    user_table2 = load_data(user_table2_query)
    col_t1 = dbutils.widgets.get("Col_t1")
    col_t2 = dbutils.widgets.get("Col_t2")

    # "1" ranks by the Data 1 column; any other value ranks by the Data 2 column.
    # Surrounding whitespace is ignored so that " 1" still selects Data 1.
    Rank_col = dbutils.widgets.get("Rank_col").strip()
    if Rank_col == "1":
        rank_col = col_t1
    else:
        rank_col = col_t2

    # Handling optional block words input.
    # Each entry is trimmed and lowercased because the block words are later
    # matched against lowercased text; blank entries (e.g. from "a,,b" or a
    # trailing comma) are dropped. An empty widget yields an empty list.
    block_words_input = dbutils.widgets.get("block_words")
    block_words = [
        word.strip().lower()
        for word in (block_words_input or "").split(",")
        if word.strip()
    ]

    # Retrieving number of partitions, Jaccard and Jaro-Winkler similarity thresholds
    num_partitions = int(dbutils.widgets.get("num_partitions"))
    jaccard_similarity_threshold = float(dbutils.widgets.get("jaccard_similarity_threshold"))
    jaro_winkler_similarity_threshold = float(dbutils.widgets.get("jaro_winkler_similarity_threshold"))
except Exception as e:
    print(f"Error retrieving widget values or processing data: {str(e)}")

In [0]:
try:
    # Select distinct values from the specified column in tables
    table1 = user_table1.select(col_t1).distinct()
    table2 = user_table2.select(col_t2).distinct()

    # Raw string: "\s" must reach the regex engine untouched (in a normal string
    # literal it is an invalid Python escape sequence).
    non_alnum_pattern = r"[^a-zA-Z0-9\s]"

    col_t1_prsd = f"{col_t1}_prsd"
    col_t2_prsd = f"{col_t2}_prsd"

    # Normalise both match columns: drop everything except letters, digits and
    # whitespace, trim, and convert to lowercase
    table1 = table1.withColumn(col_t1_prsd, lower(trim(regexp_replace(col_t1, non_alnum_pattern, ""))))
    table2 = table2.withColumn(col_t2_prsd, lower(trim(regexp_replace(col_t2, non_alnum_pattern, ""))))

    # Indicate preprocessing completion
    print('Preprocessing done')

    # Display counts of distinct values in both DataFrames
    table1_count = table1.count()
    table2_count = table2.count()
    print("table1 count : "+ str(table1_count))
    print("table2 count : "+ str(table2_count))

    # Make table1 the larger table, since table1 is the one that gets split
    # into partitions for the parallel scoring step.
    if table2_count > table1_count:
        print("Exchanging data table references for Parallel Processing enhancement")
        table1, table2 = table2, table1
        col_t1, col_t2 = col_t2, col_t1
        col_t1_prsd, col_t2_prsd = col_t2_prsd, col_t1_prsd
        # Keep the source tables aligned with the swapped column names so this
        # cell can be re-run without selecting a column from the wrong table.
        user_table1, user_table2 = user_table2, user_table1
        # rank_col is deliberately left untouched: it holds the *name* of the
        # column the user chose to rank by, and that column is still present
        # under the same name after the swap. Re-pointing it here would invert
        # the user's priority selection.
except Exception as e:
    print(f"Error in preprocessing: {str(e)}")

In [0]:
# Small comparison helpers used instead of the builtin max/min.
# NOTE(review): presumably needed because `from pyspark.sql.functions import *`
# rebinds `max`/`min` to Spark column functions - confirm before replacing.
def max_value(a, b):
    """Return ``a`` when it is strictly greater than ``b``, otherwise ``b``."""
    if a > b:
        return a
    return b

def min_value(a, b):
    """Return ``a`` when it is strictly smaller than ``b``, otherwise ``b``."""
    if a < b:
        return a
    return b

# Define the Jaro-Winkler similarity function
def jaro_winkler_similarity(s1, s2):
    """Calculate the Jaro-Winkler similarity between two strings.

    Both inputs are stripped and lowercased before comparison. The score follows
    the standard definition:

    * two characters match only if they are equal and no further apart than
      ``max(len(s1), len(s2)) // 2 - 1`` positions;
    * transpositions are half the number of matched characters that appear in a
      different order in the two strings;
    * the Winkler bonus uses a common prefix of at most 4 characters and a
      scaling factor of 0.1.

    Args:
        s1: First string (may be None).
        s2: Second string (may be None).

    Returns:
        A float in [0.0, 1.0]. 0.0 when either normalised string is empty or no
        characters match. None when either input is None, or when an unexpected
        error occurs (the error is printed).
    """
    # Null values cannot be scored; return None quietly instead of raising and
    # printing one message per compared pair.
    if s1 is None or s2 is None:
        return None

    try:
        # Preprocess strings by stripping and converting to lowercase
        s1 = s1.strip().lower()
        s2 = s2.strip().lower()
        len1 = len(s1)
        len2 = len(s2)

        if len1 == 0 or len2 == 0:
            return 0.0

        # Plain comparisons are used instead of the builtin min()/max() so the
        # function does not depend on those names being the Python builtins.
        longer_len = len1 if len1 > len2 else len2
        shorter_len = len1 if len1 < len2 else len2

        # Characters only count as matching inside this sliding window
        match_window = longer_len // 2 - 1
        if match_window < 0:
            match_window = 0

        # Find matching characters between s1 and s2
        match_count = 0
        s1_matches = [False] * len1
        s2_matches = [False] * len2
        for i in range(len1):
            window_start = i - match_window if i > match_window else 0
            window_end = i + match_window + 1
            if window_end > len2:
                window_end = len2
            for j in range(window_start, window_end):
                if not s2_matches[j] and s1[i] == s2[j]:
                    s1_matches[i] = s2_matches[j] = True
                    match_count += 1
                    break

        if match_count == 0:
            return 0.0

        # Count transpositions: walk the matched characters of both strings in
        # order and count the positions where they disagree; each transposition
        # is seen twice, hence the halving.
        out_of_order = 0
        k = 0
        for i in range(len1):
            if not s1_matches[i]:
                continue
            while not s2_matches[k]:
                k += 1
            if s1[i] != s2[k]:
                out_of_order += 1
            k += 1
        trans_count = out_of_order // 2

        # Calculate Jaro similarity
        jaro_similarity = (
            match_count / len1
            + match_count / len2
            + (match_count - trans_count) / match_count
        ) / 3.0

        # Calculate common prefix length (capped at 4) for the Winkler adjustment
        prefix_limit = shorter_len if shorter_len < 4 else 4
        prefix_len = 0
        for i in range(prefix_limit):
            if s1[i] != s2[i]:
                break
            prefix_len += 1

        # Calculate Jaro-Winkler similarity
        return jaro_similarity + 0.1 * prefix_len * (1 - jaro_similarity)
    except Exception as e:
        print(f"Error calculating Jaro-Winkler similarity: {e}")
        return None

# Score every row of one table1 partition against every table2 row
def process_partition(partition, table2_data):
    """Compute Jaro-Winkler scores for all (partition row, table2 row) pairs.

    Rows are indexed with the module-level column names ``col_t1``, ``col_t2``,
    ``col_t1_prsd`` and ``col_t2_prsd``.

    Args:
        partition: Iterable of table1 rows.
        table2_data: Iterable of table2 rows, re-iterated for every table1 row.

    Returns:
        A list of tuples ``(table1 value, table2 value, table1 cleaned value,
        table2 cleaned value, similarity score)``. If an exception is raised
        while scoring, it is printed and the pairs collected so far are returned.
    """
    scored_pairs = []
    try:
        for left_row in partition:
            for right_row in table2_data:
                score = jaro_winkler_similarity(left_row[col_t1_prsd], right_row[col_t2_prsd])
                pair = (
                    left_row[col_t1],
                    right_row[col_t2],
                    left_row[col_t1_prsd],
                    right_row[col_t2_prsd],
                    score,
                )
                scored_pairs.append(pair)
    except Exception as e:
        print(f"Error processing partition: {e}")
    return scored_pairs

# Split table1 into partitions for parallel processing.
# The partition/thread count comes from the "num_partitions" widget value read
# earlier in the notebook; it is no longer overwritten with a hard-coded 28.
# Values below 1 are raised to 1 because both the split weights and the thread
# pool need at least one partition.
num_partitions = int(num_partitions)
if num_partitions < 1:
    num_partitions = 1

table1_partitions = table1.randomSplit([1.0 / num_partitions] * num_partitions)
table2_data = table2.collect()  # Collect table2 cleaned data to use in all threads

# Process each partition in parallel using ThreadPoolExecutor.
# Each partition is collected to the driver when its task is submitted.
# NOTE(review): the scoring itself is pure-Python CPU work, so the threads are
# likely serialised by the GIL - confirm whether this gives a real speed-up.
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=num_partitions) as executor:
    future_to_partition = {
        executor.submit(process_partition, partition.collect(), table2_data): partition
        for partition in table1_partitions
    }

    for future in concurrent.futures.as_completed(future_to_partition):
        try:
            results.extend(future.result())
        except Exception as e:
            print(f"Error retrieving results from future: {e}")

# Create a DataFrame from the results list
result_df = spark.createDataFrame(results, [f'{col_t1}', f'{col_t2}', f'{col_t1_prsd}', f'{col_t2_prsd}', 'jaro_winkler_similarity'])

In [0]:
try:
    # Group the scored pairs by the selected ranking column and order each group
    # by 'jaro_winkler_similarity', best score first
    window_spec = Window.partitionBy(rank_col).orderBy(col("jaro_winkler_similarity").desc())

    # Number the rows inside each group (1 = highest score)
    ranked_df = result_df.withColumn(f"ranked_{rank_col}", row_number().over(window_spec))

    # Keep only the top-ranked pair for each value of the ranking column
    rank1_rcds = ranked_df.filter(col(f"ranked_{rank_col}") == 1)

    # Report how many pairs survive the ranking step.
    # (A second count() whose result was discarded has been removed; it only
    # triggered an additional Spark job.)
    print("Count of df after Ranking: "+ str(rank1_rcds.count()))
except Exception as e:
    print(f"Error in ranking: {str(e)}")

In [0]:
try:
    # Step 3: Remove block words from both cleaned match columns

    # Normalise the block words the same way the match columns were normalised
    # (lowercase, only ASCII letters/digits/whitespace, trimmed). This keeps them
    # comparable with the cleaned text and guarantees that no regex
    # metacharacter ends up inside the pattern. Blank entries are dropped.
    usable_block_words = []
    for raw_word in block_words:
        normalised_word = "".join(
            ch for ch in str(raw_word).lower()
            if (ch.isascii() and ch.isalnum()) or ch.isspace()
        ).strip()
        if normalised_word:
            usable_block_words.append(normalised_word)

    cln_col_t1 = f"{col_t1_prsd}_cln"
    cln_col_t2 = f"{col_t2_prsd}_cln"

    if usable_block_words:
        # Whole-word match on any of the block words
        block_words_expr = r'\b(' + '|'.join(usable_block_words) + r')\b'
        cleaned_t1 = regexp_replace(lower(col(col_t1_prsd)), block_words_expr, "")
        cleaned_t2 = regexp_replace(lower(col(col_t2_prsd)), block_words_expr, "")
    else:
        # Nothing to remove: carry the lowercased values over unchanged instead
        # of running a regex with an empty alternation
        block_words_expr = None
        cleaned_t1 = lower(col(col_t1_prsd))
        cleaned_t2 = lower(col(col_t2_prsd))

    rank1_rcds_cleaned = rank1_rcds.withColumn(cln_col_t1, cleaned_t1)
    rank1_rcds_cleaned = rank1_rcds_cleaned.withColumn(cln_col_t2, cleaned_t2)

    print("Count of df after Cleaning: "+ str(rank1_rcds_cleaned.count()))
except Exception as e:
    print(f"Error removing block words: {e}")
    rank1_rcds_cleaned = None
    cln_col_t1 = None
    cln_col_t2 = None

In [0]:
# Function to calculate Jaccard Similarity (Token-Based) with empty/null input handling
def jaccard_similarity(s1, s2):
    """Return the token-based Jaccard similarity of two strings.

    Each string is split on whitespace into a set of tokens; the score is
    ``len(intersection) / len(union)`` of the two token sets.

    Args:
        s1: First string. Non-string values (None, NaN, ...) are treated as
            having no tokens.
        s2: Second string, handled like ``s1``.

    Returns:
        A float in (0, 1] when the token sets overlap, otherwise 0. Two inputs
        without any token also score 0.
    """
    # Null values arrive here as None/NaN; treating them as "no tokens" keeps a
    # single missing value from raising and aborting the whole scoring pass.
    s1_tokens = set(s1.split()) if isinstance(s1, str) else set()
    s2_tokens = set(s2.split()) if isinstance(s2, str) else set()

    union = s1_tokens | s2_tokens
    if not union:
        return 0  # nothing to compare; also avoids dividing by zero

    return len(s1_tokens & s2_tokens) / len(union)

# Second-pass scoring: adds a Jaccard score to the output of the first algorithm
def apply_second_algorithm(rank1_rcds_cleaned):
    """Convert the ranked pairs to pandas and add a 'jaccard_similarity' column.

    The score is computed row by row from the two cleaned columns named by the
    module-level ``cln_col_t1`` and ``cln_col_t2``.

    Args:
        rank1_rcds_cleaned: Object exposing ``toPandas()``; the call site passes
            the Spark DataFrame produced by the block-word cleaning step.

    Returns:
        The pandas DataFrame with the extra 'jaccard_similarity' column.
    """
    pairs_pdf = rank1_rcds_cleaned.toPandas()

    def _row_score(row):
        # Compare the two cleaned values of a single pair
        return jaccard_similarity(row[cln_col_t1], row[cln_col_t2])

    pairs_pdf['jaccard_similarity'] = pairs_pdf.apply(_row_score, axis=1)
    return pairs_pdf

# Run the token-based (Jaccard) scoring on the top-ranked pairs
final_rank1_rcds_cleaned = apply_second_algorithm(rank1_rcds_cleaned)

# Move the pandas result back into Spark
final_result_spark_df = spark.createDataFrame(final_rank1_rcds_cleaned)

# Keep the original values, their cleaned counterparts and both scores
selected_columns = [
    f"{col_t1}",
    f"{cln_col_t1}",
    f"{col_t2}",
    f"{cln_col_t2}",
    "jaro_winkler_similarity",
    "jaccard_similarity",
]
final_result_spark_df = final_result_spark_df.select(*selected_columns)

# Cast both score columns to float so later numerical comparisons use one type
final_result_unfiltered_score = final_result_spark_df
for score_column in ("jaccard_similarity", "jaro_winkler_similarity"):
    final_result_unfiltered_score = final_result_unfiltered_score.withColumn(
        score_column, col(score_column).cast("float")
    )

In [0]:
try:
    # First pass: keep only pairs with a Jaccard score above 0
    has_token_overlap = col("jaccard_similarity") > 0
    final_result_semifiltered_score = final_result_unfiltered_score.filter(has_token_overlap)

    # Second pass: a pair survives when it reaches the configured Jaccard
    # threshold OR the configured Jaro-Winkler threshold
    passes_jaccard = col("jaccard_similarity") >= jaccard_similarity_threshold
    passes_jaro_winkler = col("jaro_winkler_similarity") >= jaro_winkler_similarity_threshold
    final_result_filtered_score = final_result_semifiltered_score.filter(passes_jaccard | passes_jaro_winkler)
except Exception as e:
    print("An error occurred:", str(e))

In [0]:
try:
    # Output layout: both original values first, then their cleaned forms,
    # then the two string-similarity scores
    output_columns = [
        f"{col_t1}",
        f"{col_t2}",
        f"{cln_col_t1}",
        f"{cln_col_t2}",
        "jaro_winkler_similarity",
        "jaccard_similarity",
    ]
    final_result_filtered_score = final_result_filtered_score.select(*output_columns)
except Exception as e:
    print("An error occurred:", str(e))

In [0]:
try:
    # Final deliverable: only the two matched original columns, without scores
    matched_pair_columns = [f"{col_t1}", f"{col_t2}"]
    final_match = final_result_filtered_score.select(*matched_pair_columns)
except Exception as e:
    print("An error occurred:", str(e))

In [0]:
# Show every scored pair, before the similarity filters are applied
final_result_unfiltered_score.display()

In [0]:
# Show the pairs that passed the similarity filters, together with their scores
final_result_filtered_score.display()

In [0]:
# Show the final matches: only the two matched columns
final_match.display()