In [7]:
#vv2289  - Vamshi

import csv
import pandas as pd
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from csv import reader

# --- Spark bootstrap ---------------------------------------------------------
# Ask for client deploy mode and reuse an already-running context if there is one.
cf = SparkConf().set("spark.submit.deployMode", "client")
sc = SparkContext.getOrCreate(cf)

spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# --- Load the listings -------------------------------------------------------
csv_path = "/shared/dataWildlife/filtered_copy.csv"

# The double quote is used both as the quote and as the escape character, and
# multiline parsing is enabled; the header row supplies column names and the
# column types are inferred.
df = (
    spark.read
    .option("escape", "\"")
    .option('quote', "\"")
    .option("multiline", True)
    .csv(csv_path, inferSchema=True, header=True)
)

# Keep only rows where both animal_name and seller differ from the empty string.
df_filtered = df.filter(df["animal_name"] != "").filter(df["seller"] != "")

df_filtered.show()





                                                                                

+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------+--------------------+--------------------+------------+---------------+-----------------+--------------------+------------+
|               title|         description|              seller|            location|             country| price|currency|                  id|                 url|      domain|predicted_label|            match|         animal_name|product_type|
+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------+--------------------+--------------------+------------+---------------+-----------------+--------------------+------------+
|10 OSTRICH FEATHE...|10 OSTRICH Feathe...|       thefeatherguy|Flushing, New Yor...|       United States| 70.99|     USD|4585214d-b4d7-4c0...|http://picclick.c...|picclick.com|            1.0|          Ostrich|             Ostrich|     feather|
|1 PCS REAL Ostr

In [10]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def simple_fuzzy_match(str1, str2):
    """Score the position-wise similarity of two strings as a percentage.

    Both strings are lower-cased, then compared character by character at
    the same index. The score is the number of matching positions divided
    by the length of the longer string, expressed as a whole percentage
    (truncated towards zero).

    Args:
        str1: First string, or None.
        str2: Second string, or None.

    Returns:
        int: A value in [0, 100]. Returns 0 when either argument is None
        or when both strings are empty.
    """
    # A None argument (e.g. a missing title when this runs as a UDF) cannot
    # be compared; report "no similarity" instead of raising AttributeError.
    if str1 is None or str2 is None:
        return 0

    str1, str2 = str1.lower(), str2.lower()
    max_length = max(len(str1), len(str2))
    if max_length == 0:
        return 0

    # zip stops at the shorter string, so trailing characters of the longer
    # one count as mismatches through max_length.
    matches = sum(1 for a, b in zip(str1, str2) if a == b)

    # Pure integer arithmetic: the float form int(matches / max_length * 100)
    # can land just below the true value and truncate one point too low.
    return matches * 100 // max_length

# Wrap the pure-Python scorer as a Spark UDF whose result column is an integer.
fuzzy_match_udf = udf(simple_fuzzy_match, IntegerType())

def find_duplicates(df, limiter=5000, threshold=90):
    """Find listings from different sellers that look like the same item.

    The DataFrame is self-joined on equal ``animal_name`` and
    ``product_type`` with differing ``seller``. Each candidate pair is
    scored by applying ``fuzzy_match_udf`` to the two titles, and only
    pairs scoring at least ``threshold`` are kept.

    Because the join is symmetric, a matching pair appears once in each
    direction (A, B) and (B, A).

    Args:
        df: Spark DataFrame with at least the columns animal_name,
            product_type, seller, title, location, country, price,
            currency and url.
        limiter: Maximum number of rows to return. ``None`` returns every
            matching pair. No ordering is applied before the limit, so
            which rows survive is up to Spark.
        threshold: Minimum similarity score (0-100) for a pair to be kept.

    Returns:
        Spark DataFrame with, for each paired field, the left-side value
        under its own name and the right-side value under
        ``duplicate_<name>``, followed by a ``similarity`` column.
    """
    left = df.alias('df1')
    right = df.alias('df2')

    # Reference columns through their alias qualifier. Both aliases derive
    # from the same DataFrame, so attribute access (df1.col == df2.col) has
    # to be disambiguated by Spark; qualified names state the side explicitly
    # and match how the columns are addressed in the rest of this function.
    condition = (
        (col("df1.animal_name") == col("df2.animal_name"))
        & (col("df1.product_type") == col("df2.product_type"))
        & (col("df1.seller") != col("df2.seller"))
    )

    scored = (
        left.join(right, condition)
        .withColumn("similarity", fuzzy_match_udf(col("df1.title"), col("df2.title")))
        .filter(col("similarity") >= threshold)
    )

    # Emit each field as an adjacent (name, duplicate_name) pair.
    paired_fields = ("seller", "title", "location", "country", "price", "currency", "url")
    selected = []
    for name in paired_fields:
        selected.append(col("df1." + name).alias(name))
        selected.append(col("df2." + name).alias("duplicate_" + name))
    selected.append(col("similarity"))

    final_df = scored.select(*selected)

    if limiter is None:
        return final_df
    return final_df.limit(limiter)

# The result is an un-ordered limit over a self-join scored by a Python UDF.
# Without caching, every later action on duplicates_df (show, write, groupBy)
# may re-run that pipeline and is free to pick a different set of rows.
# Cache it so later actions can reuse the rows materialized here.
duplicates_df = find_duplicates(df_filtered, limiter=5000).cache()
duplicates_df.show()

[Stage 10:>                                                         (0 + 1) / 1]

+----------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+---------------+--------+------------------+--------------------+--------------------+----------+
|          seller|duplicate_seller|               title|     duplicate_title|            location|  duplicate_location|             country|   duplicate_country|price|duplicate_price|currency|duplicate_currency|                 url|       duplicate_url|similarity|
+----------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+---------------+--------+------------------+--------------------+--------------------+----------+
|      benel_7507|      jewelry888|1 PCS REAL Ostric...|3 PCS REAL Ostric...|        shenzhen, CN| YiwuïŒZhejiang, CN|People’s Republic...|    Italian Republic| 53.0|          86.99|     USD|             


                                                                                

In [11]:
import csv
from csv import reader
# Persist the duplicate pairs as CSV with a header row, replacing any previous output.
# NOTE(review): Spark's CSV writer normally creates a directory of part files at
# this path rather than a single file — confirm what downstream readers expect.
duplicates_df.write.csv('duplicate_seller_details.csv', header=True, mode="overwrite")


                                                                                

In [12]:
from pyspark.sql.functions import col

# Count how many matched listings each ordered (seller, duplicate_seller) pair has.
seller_pairs_count = duplicates_df.groupBy(["seller", "duplicate_seller"]).count()

# Largest counts first.
sorted_seller_pairs_count = seller_pairs_count.sort("count", ascending=False)

sorted_seller_pairs_count.show()


[Stage 17:>                                                         (0 + 1) / 1]

+--------------------+--------------------+-----+
|              seller|    duplicate_seller|count|
+--------------------+--------------------+-----+
|bigbangbigbangbig...|              yumeny|  287|
|            vewof-26|              xbx-my|  189|
|              xbx-my|            vewof-26|  176|
|           dreamroom|            vewof-26|  167|
|              yumeny|bigbangbigbangbig...|  157|
|      selltotheworld|    allaboutlearning|  130|
|            vewof-26|           dreamroom|  119|
|    allaboutlearning|      selltotheworld|  112|
|       luoqipeng5816|            vewof-26|  111|
|       luoqipeng5816|           dreamroom|   96|
|           sunnyrain|            redpeach|   92|
| goldengoosejewelers|    jewelry4less_atl|   90|
|     la-na-time-shop|          shendgjgfu|   81|
|            gaofudev|    allaboutlearning|   73|
|    jewelry4less_atl| goldengoosejewelers|   72|
|            redpeach|           sunnyrain|   71|
|   nastiaahstrinkets|      aqhatrailrider|   70|



                                                                                

In [14]:
from pyspark.sql.functions import concat_ws, col, lit, split, max as spark_max
import pyspark.sql.functions as F

# Direction-independent key for a pair: the two seller names in sorted order,
# so (A, B) and (B, A) map to the same two-element array.
pair_members = F.sort_array(F.array("seller", "duplicate_seller"))

# Keep the comma-joined form as a display column.
sorted_seller_pairs_count = sorted_seller_pairs_count.withColumn(
    "normalized_pair",
    concat_ws(",", pair_members)
)

# Group on the array elements themselves rather than splitting the joined
# string back apart: a seller name containing "," would otherwise be cut in
# the wrong place, or two distinct pairs could collapse into one key.
max_pairs_df = (
    sorted_seller_pairs_count
    .select(
        pair_members.getItem(0).alias("seller"),
        pair_members.getItem(1).alias("duplicate_seller"),
        col("count"),
    )
    .groupBy("seller", "duplicate_seller")
    # The two directions of a pair can carry different counts; keep the larger.
    .agg(spark_max("count").alias("max_count"))
    .orderBy(col("max_count").desc())
)

max_pairs_df.show()

max_pairs_df.write.csv('seller_match_count.csv', header=True, mode="overwrite")


                                                                                

+--------------------+--------------------+---------+
|              seller|    duplicate_seller|max_count|
+--------------------+--------------------+---------+
|bigbangbigbangbig...|              yumeny|      287|
|            vewof-26|              xbx-my|      189|
|           dreamroom|            vewof-26|      167|
|    allaboutlearning|      selltotheworld|      130|
|       luoqipeng5816|            vewof-26|      111|
|           dreamroom|       luoqipeng5816|       96|
|            redpeach|           sunnyrain|       92|
| goldengoosejewelers|    jewelry4less_atl|       90|
|     la-na-time-shop|          shendgjgfu|       81|
|    allaboutlearning|            gaofudev|       73|
|      aqhatrailrider|   nastiaahstrinkets|       70|
|        blinkfan_169|          maycocomay|       61|
|bigbangbigbangbig...|       miniyellowcat|       55|
|     aquariumzonetoo|        yebenstore03|       54|
|         baymax_2015|bigbangbigbangbig...|       52|
|             lus8968|      

                                                                                