In [0]:
from pyspark.sql import functions as f
from pyspark.sql import Window as w
from typing import Dict, List
import logging
import re
import time
import unicodedata
import pandas as pd
import numpy as np
import nltk
from nltk.corpus import stopwords
import itertools
import collections
from sklearn.utils import resample
from scipy.spatial.distance import cosine, jaccard
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix  
from sklearn.metrics import accuracy_score
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import recall_score

How many true duplicates 2422

In [0]:
df_master = spark.table('data_user_hien.master_data_lsh').select('shop','title','modelID')


Which columns we select to generate the input text -> select title

In [0]:
def generate_text(df, merge_columns):
    return df[merge_columns].apply(lambda row: ' '.join(row.values.astype(str)).lower(), axis=1)

df_input         = df_master.orderBy(f.rand()).toPandas()
df_input['text'] = generate_text(df_input,  ["title"])


In [0]:
df_input

Unnamed: 0,shop,title,modelID,text
0,bestbuy.com,"""Toshiba Refurbished 49"""" Class 4918"""" Diag. L...",TOS-50L5200UBx,"""toshiba refurbished 49"""" class 4918"""" diag. l..."
1,amazon.com,Sharp LC60LE835U Quattron 60-inch 1080p 240 Hz...,LC60LE835U,sharp lc60le835u quattron 60-inch 1080p 240 hz...
2,newegg.com,"""Toshiba 19"""" Class (18.5"""" Diag.) 720p 60Hz L...",19SL410U,"""toshiba 19"""" class (18.5"""" diag.) 720p 60hz l..."
3,newegg.com,"""Newegg.com - Refurbished: Sanyo 55"""" 1080p 12...",DP55441,"""newegg.com - refurbished: sanyo 55"""" 1080p 12..."
4,newegg.com,"""Newegg.com - Refurbished: LG 37"""" (36.6"""" Dia...",37LT770H,"""newegg.com - refurbished: lg 37"""" (36.6"""" dia..."
...,...,...,...,...
1619,newegg.com,"""Newegg.com - LG 60"""" Class (59.8"""" diagonal) ...",60PH6700,"""newegg.com - lg 60"""" class (59.8"""" diagonal) ..."
1620,bestbuy.com,Samsung Refurbished 40in Black LCD Monitor wTV...,400CXn,samsung refurbished 40in black lcd monitor wtv...
1621,newegg.com,"""Newegg.com - LG INFINIA 50"""" 1080p 600Hz Plas...",50PZ750,"""newegg.com - lg infinia 50"""" 1080p 600hz plas..."
1622,newegg.com,"""Panasonic Viera 42"""" Class (41.6"""" Diag.) 108...",TC-P42S30,"""panasonic viera 42"""" class (41.6"""" diag.) 108..."


Support Cleaning Text Function

In [0]:
def clean_text(sentence):
    SUBSTITUTE_REGEX = re.compile(r' +')
    KEEP_REGEX = re.compile(r'[a-zA-Z0-9\s]')
    MAX_CHARACTERS_ALLOWED_IN_THE_TITLE = 1000

    text = unicodedata.normalize('NFD', sentence)
    text = text.encode('ascii', 'ignore').decode('utf-8').lower().replace('-',
                                                                          ' ')  # Remove accents and convert to lower case
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    text = remove_stopwords(text)
    text = ''.join(KEEP_REGEX.findall(text))  # Extract only alphanumeric characters
    text = SUBSTITUTE_REGEX.sub(' ', text).strip()  # Replace multiple spaces with single space
    text = text[: MAX_CHARACTERS_ALLOWED_IN_THE_TITLE].strip()
    text = text.lower()

    return text

    
def remove_stopwords(text):
    text = text.replace('neweggcom',"")
    text = text.replace('best buy',"")
    text = text.replace('thenerdsnet',"")
    stop_words = set(stopwords.words('english'))
    for stop_word in stop_words:
      if stop_word in text.lower():
         text.replace(stop_word, "")
    return text

def correctDuplicates(id_1,id_2):
  if id_1 == id_2:
    isDup = 1
  else:
    isDup = 0
  return isDup

In [0]:
df_input['text'] = df_input.text.apply(lambda x: clean_text(x))


In [0]:
df_input

Unnamed: 0,shop,title,modelID,text
0,bestbuy.com,"""Toshiba Refurbished 49"""" Class 4918"""" Diag. L...",TOS-50L5200UBx,toshiba refurbished 49 class 4918 diag led 108...
1,amazon.com,Sharp LC60LE835U Quattron 60-inch 1080p 240 Hz...,LC60LE835U,sharp lc60le835u quattron 60 inch 1080p 240 hz...
2,newegg.com,"""Toshiba 19"""" Class (18.5"""" Diag.) 720p 60Hz L...",19SL410U,toshiba 19 class 185 diag 720p 60hz led lcd hd...
3,newegg.com,"""Newegg.com - Refurbished: Sanyo 55"""" 1080p 12...",DP55441,refurbished sanyo 55 1080p 120hz led lcd hdtv ...
4,newegg.com,"""Newegg.com - Refurbished: LG 37"""" (36.6"""" Dia...",37LT770H,refurbished lg 37 366 diagonal commercial hosp...
...,...,...,...,...
1619,newegg.com,"""Newegg.com - LG 60"""" Class (59.8"""" diagonal) ...",60PH6700,lg 60 class 598 diagonal 1080p 600hz plasma hd...
1620,bestbuy.com,Samsung Refurbished 40in Black LCD Monitor wTV...,400CXn,samsung refurbished 40in black lcd monitor wtv...
1621,newegg.com,"""Newegg.com - LG INFINIA 50"""" 1080p 600Hz Plas...",50PZ750,lg infinia 50 1080p 600hz plasma hdtv 50pz750
1622,newegg.com,"""Panasonic Viera 42"""" Class (41.6"""" Diag.) 108...",TC-P42S30,panasonic viera 42 class 416 diag 1080p 600hz ...


Collect Shingles List

In [0]:
config ={
w         'shingle': True,
        'strip': True,
        'k': 5,
    }

In [0]:
def preprocess(text,params):
    shingle = params.get("shingle", False)
    skip_cleaning =  params.get("skip_cleaning", False) 
    text = clean_text(text)
    if not shingle:
        tokens = text.split()
    else:
        k = params['k']
        tokens = to_shingle(text, k, params['strip'])
    return tokens

def to_shingle(text: str, k: int = 3, strip=True):
  if strip:
      text = re.compile(r' +').sub('', text)
  shingles = []
  limit = len(text)-k+1
  for i in range(limit):
      shingles += [text[i:i+k]]
  return shingles

def shingling(data:List,params):
    shingles = []
    for text in data:
        tokens = preprocess(text,params)
        shingles.append(tokens)
    return shingles

shingles = shingling(df_input['text'],config)

Construct model words

In [0]:
def extract_model_words(shingle_sets):
  full_set = {item for set_ in shingle_sets for item in set_}
  model_words = {}
  for i, shingle in enumerate(list(full_set)):
      model_words[shingle] = i
  return model_words

In [0]:
model_words = extract_model_words(shingles)
len(model_words)

after generating model words, use model words and shingles list to build Binary Vectors

In [0]:
def one_hot_encoder(shingles: set, model_words: dict):
    vec = np.zeros(len(model_words))
    for shingle in shingles:
        index = model_words[shingle]
        vec[index] = 1
    return vec

In [0]:
binary_vectors = []
for shingle in shingles:
    binary_vectors.append(one_hot_encoder(shingle, model_words))

binary_vectors = np.stack(binary_vectors)
binary_vectors.shape

Binary Vectors to Signature Matrix

In [0]:
def minhash_permutation_rand(model_words: dict,perms):
    length = len(model_words.keys())
    row_hash = np.zeros((perms, length))
    for i in range(perms):
        permutation = np.random.permutation(len(model_words)) + 1
        row_hash[i,:] = permutation.copy()
    return row_hash.astype(int)

row_hash = minhash_permutation_rand(model_words, perms= 6000)


In [0]:
def get_signature(minhash, vector):
    # get index of 1 value in vector
    index = np.nonzero(vector)[0].tolist()
    shingles = minhash[:, index]
    # find minimum value in each hash vector
    signature = np.min(shingles, axis=1)
    return signature

In [0]:
signatures = []
for binary_vector in binary_vectors:
     signatures.append(get_signature(row_hash, binary_vector))

# merge signatures into single array
signature_matrix = np.transpose(np.stack(signatures))
signature_matrix.shape

In [0]:
def candidate_pairs(signature_matrix, b, r):
    #n: number of perms (hash functions)
    n,d = signature_matrix.shape
    assert(n==b*r)
    hashbuckets = collections.defaultdict(set)
    bands = np.array_split(signature_matrix, b, axis=0)
    
    for i,band in enumerate(bands):
        for j in range(d):
            band_id = tuple(list(band[:,j])+[str(i)])
            hashbuckets[band_id].add(j)
            
    candidate_pairs = set()
    for bucket in hashbuckets.values():
        if len(bucket) > 1:
            for pair in itertools.combinations(bucket, 2):
                candidate_pairs.add(pair)
    return candidate_pairs
                
def lsh_pairs(signature_matrix,candidate_pairs,t):
    lsh_pairs = set()
    for (i, j) in candidate_pairs:
      set_i = set(signature_matrix[:,i].flatten())
      set_j = set(signature_matrix[:,j].flatten())
      if jaccard(set_i,set_j) > t:
           lsh_pairs.add((i, j))
    return lsh_pairs

Fraction of comparisons

In [0]:
count_possible_comparisons = candidate_pairs(signature_matrix,1000,6)
len(count_possible_comparisons)

In [0]:
count_comparisons_made     = lsh_pairs(signature_matrix,count_possible_comparisons,0.4)
len(count_comparisons_made)

In [0]:
fraction_comparisons = len(count_comparisons_made)/len(count_possible_comparisons)
print(fraction_comparisons)

output of LSH

In [0]:
def create_output_dataframe(df_input,count_comparisons_made):
    df_input['row_index'] = np.arange(len(df_input))
    df_a = pd.DataFrame()
    df_b = pd.DataFrame()
    for a,b in count_comparisons_made:
        df_temp_a = df_input[df_input['row_index'] == a]
        df_temp_b = df_input[df_input['row_index'] == b]
        df_a = df_a.append(df_temp_a)
        df_b = df_b.append(df_temp_b)

    df_a = df_a.add_prefix('left_')
    df_b = df_b.add_prefix('right_')

    df_a.reset_index(drop=True, inplace=True)
    df_b.reset_index(drop=True, inplace=True)
    df_output = pd.concat([df_a, df_b], axis=1)
    df_output["isDup"] = df_output.apply(lambda x: correctDuplicates(x["left_modelID"], x["right_modelID"]), axis = 1)
    
    return df_output

In [0]:
df_output = create_output_dataframe(df_input,count_comparisons_made)

In [0]:
df_output.groupby(['isDup']).count()

Unnamed: 0_level_0,left_shop,left_title,left_modelID,left_text,left_row_index,right_shop,right_title,right_modelID,right_text,right_row_index
isDup,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
0,23604,23604,23604,23604,23604,23604,23604,23604,23604,23604
1,112,112,112,112,112,112,112,112,112,112


In [0]:
, def minhash_jaccard_similarity(text1: str, text2: str):
    list1 = text1.split()
    list2 = text2.split()
    intersection = len(list(set(list1).intersection(list2)))
    union = (len(list1) + len(list2)) - intersection    
    return float(intersection) / union                                                                                      

In [0]:
def xgboost(df_output):
  df_output = df_output[['left_text','right_text','isDup']]
  df_output['len_left_text'] = df_output.left_text.apply(lambda x: len(str(x)))
  df_output['len_right_text'] = df_output.right_text.apply(lambda x: len(str(x)))
  df_output['diff_len'] = df_output.len_left_text - df_output.len_right_text
  df_output['len_char_left_text'] = df_output.left_text.apply(lambda x: len(''.join(set(str(x).replace(' ', '')))))
  df_output['len_char_right_text'] = df_output.right_text.apply(lambda x: len(''.join(set(str(x).replace(' ', '')))))
  df_output['len_word_left_text'] = df_output.left_text.apply(lambda x: len(str(x).split()))
  df_output['len_word_right_text'] = df_output.right_text.apply(lambda x: len(str(x).split()))
  df_output['common_words'] = df_output.apply(lambda x: len(set(str(x['left_text']).lower().split()).intersection(set(str(x['right_text']).lower().split()))), axis=1)
  df_output = df_output.reset_index()
  df_output = df_output[~df_output.isin([np.inf, -np.inf])]
  df_output['minhash_jaccard'] = df_output.apply(lambda x: minhash_jaccard_similarity(x["left_text"],x["right_text"]),axis = 1)
  df_output.drop(["left_text", "right_text"], axis=1, inplace=True)
  X = df_output.loc[:, df_output.columns != 'isDup']
  y = df_output.loc[:, df_output.columns == 'isDup']
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1, stratify=y)
  model = xgb.XGBClassifier(max_depth=50, n_estimators=80, learning_rate=0.1, colsample_bytree=.7, gamma=0, reg_alpha=4, objective='binary:logistic', eta=0.3, silent=1, subsample=0.8).fit(X_train, y_train.values.ravel()) 
  model.fit(X_train,y_train)
  prediction = model.predict(X_test)
  cm=confusion_matrix(y_test, prediction)
  duplicates = cm.sum(axis=1) - np.diag(cm)
  duplicates_found = np.diag(cm)
  return duplicates, duplicates_found

In [0]:
a,b,c = bootstrap(df_input)

In [0]:
print(a)