In [1]:
# libraries
import os
from datetime import datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from PIL import Image
import urllib
from random import choices
from itertools import chain
# Levenshtein Distance in Python
import textdistance
import re
import unicodedata
from rank_bm25 import BM25Okapi, BM25L
# https://github.com/seatgeek/thefuzz
from thefuzz import fuzz, process

# Matplotlib configuration
font = { 'family': 'DejaVu Sans', 'weight': 'bold', 'size': 16 }
plt.rc('font', **font)

# Pandas config
pd.options.mode.chained_assignment = None  # default='warn'

In [2]:
# set random seed
np.random.seed(seed=42)

### Offers training

In [3]:
offers_training_df = pd.read_parquet('offers_training.parquet')
offers_test_df = pd.read_parquet('offers_test.parquet')

## Brand analysis

### Brand text processing
- Lower case text
- accented vowels removal `è, é, ... -> e`

### Class for brand management

In [4]:
brands_training = offers_training_df['brand'].unique()
brands_test = offers_test_df['brand'].unique()

In [5]:
def similarity(str_1, str_2):
    return textdistance.levenshtein.normalized_similarity(str_1, str_2)

In [6]:
class Brand:
    def __init__(self, name, parent=None, child=list()):
        self.name = name.lower().title()
        self.parent = parent
        self.child = child
    
    def simplify(self, text):
        try:
            text = unicode(text, 'utf-8')
        except NameError:
            pass
        text = unicodedata.normalize('NFD', text).encode('ascii', 'ignore').decode("utf-8")
        return str(text)
    
    def processed_text(self):
        #lower
        processed = self.name.lower()
        #remove accents
        processed = self.simplify(processed)
        #remove special characters
        processed = ''.join(c if c.isalnum() or c in [' ', '-', '.'] else ' ' for c in processed)
        #remove unnecessary double spaces
        processed = re.sub(' +', ' ', processed)
        return processed

In [7]:
class BrandCollection:
    def __init__(self, brand_list):
        self.brands = {}
        self.brand_family = {}
        brand_list.sort()
        for el in brand_list:
            self.process_brand(Brand(el))
    
    def process_brand(self, brand):
        if brand.processed_text() not in self.brands:
            parent_likelyhood = 0
            likely_parent = None
            for k in self.brands.keys():
                comp = self.brands[k]
                l_comp = self.listify(comp, brand)
                l_brand = self.listify(brand, comp)
                calc_likelyhood = self.parent_likelyhood(l_comp, l_brand)
                if calc_likelyhood > parent_likelyhood:
                    parent_likelyhood = calc_likelyhood
                    brand.parent = comp
                    likely_parent = self.brands[comp.processed_text()]
            self.brands[brand.processed_text()] = brand
            self.brand_family[brand] = []
            if likely_parent is not None:
                self.brand_family[likely_parent].append(brand)
    
    def listify(self, brand, to_compare):
        l_brand = brand.processed_text().split()
        l_comp = to_compare.processed_text().split()
        max_len = len(l_brand) if len(l_brand) > len(l_comp) else len(l_comp)
        if len(l_brand) == max_len:
            return l_brand
        for i in range(max_len - len(l_brand)):
            l_brand += ['']
        return l_brand
    
    def parent_likelyhood(self, l_comp, l_brand):
        likelyhood = 0
        #check longest matching n-gram, does not check for combinations
        for i in range(len(l_comp)):
            if l_comp[i] == l_brand[i]:
                likelyhood += 1
            else:
                return likelyhood
        return likelyhood
    
    def similarity(self, str_1, str_2):
        return textdistance.levenshtein.normalized_similarity(str_1, str_2)
    
    def get_match(self, brand_query):
        brand_to_search = Brand(brand_query)
        if brand_to_search.processed_text() in self.brands:
            brands = self.get_brand_family(brand_to_search.processed_text())
            return brands, 1
        else:
            relevance = 0
            most_relevant = '-'
            for key in self.brands.keys():
                sim = self.similarity(brand_to_search.processed_text(), self.brands[key].processed_text())
                if sim > relevance:
                    relevance = sim
                    most_relevant = self.brands[key].processed_text()
            brands = self.get_brand_family(most_relevant)
            return brands, relevance
            
    def get_brand_family(self, brand_name):
        fam_list = [brand_name]
        family = []
        while len(fam_list) != 0:
            current = fam_list[0]
            fam_list += [b.processed_text() for b in self.brand_family[self.brands[current]]]
            fam_list.remove(current)
            if current not in family:
                family += [current]
        return family             

In [8]:
bc = BrandCollection(list(brands_test))

In [9]:
for el in list(list(brands_test)):
    bcr, r = bc.get_match(el)
    #print(el, bcr, r)
bc.get_match('michael michael kors')

(['michael michael kors', 'michael kors', 'michael kors plus'], 1)

### Product name matching VSM - BM25

In [10]:
class BM25Matcher:
    def __init__(self, titles_list):
        self.corpus = self.process_list(titles_list)
        self.matcher = BM25L(self.corpus) #list form
        
    def process_list(self, titles_list):
        return [self.processed_text(text).split() for text in titles_list]
    
    def processed_text(self, text):
        #lower
        processed = text.lower()
        #remove accents
        processed = self.simplify(processed)
        #remove special characters
        processed = ''.join(c if c.isalnum() or c == ' ' else ' ' for c in processed)
        #remove unnecessary double spaces
        processed = re.sub(' +', ' ', processed)
        return processed
    
    def simplify(self, text):
        try:
            text = unicode(text, 'utf-8')
        except NameError:
            pass
        text = unicodedata.normalize('NFD', text).encode('ascii', 'ignore').decode("utf-8")
        return str(text)
    
    def get_n_results(self, query, n=1000):
        tokenized_query = self.processed_text(query).split()
        results = self.matcher.get_top_n(tokenized_query, self.corpus, n)
        return [' '.join(el) for el in results]
    
    def get_relevant_results(self, query):
        scores = self.matcher.get_scores(query)
        top_n = np.argsort(scores)[::-1][:]
        print(len([el for el in top_n]))
        return [{' '.join(doc), rel} for doc, rel in zip(self.corpus, scores) if rel > 0]

In [23]:
class BM25Z(BM25L):
    def __init__(self, corpus):
        super().__init__(self.process_list(corpus))
        self.corpus = self.process_list(corpus)
    def process_list(self, titles_list):
        return [self.processed_text(text).split() for text in titles_list]
    def processed_text(self, text):
        #lower
        processed = text.lower()
        #remove accents
        processed = self.simplify(processed)
        #remove special characters
        processed = ''.join(c if c.isalnum() or c == ' ' else ' ' for c in processed)
        #remove unnecessary double spaces
        processed = re.sub(' +', ' ', processed)
        return processed
    def simplify(self, text):
        try:
            text = unicode(text, 'utf-8')
        except NameError:
            pass
        text = unicodedata.normalize('NFD', text).encode('ascii', 'ignore').decode("utf-8")
        return str(text)
    def get_corpus_str(self):
        return [''.join(el) for el in self.corpus]
    def get_corpus_scores(self, query):
        return [{'text': x, 'score' :y} for x, y in zip(self.get_corpus_str(), self.get_scores(query))]
    def get_relevant_results(self, query):
        query = self.processed_text(query).split()
        print(query)
        res = [el for el in self.get_corpus_scores(query) if el['score'] > 0]
        res = sorted(res, key=lambda x: x['score'], reverse=True)
        return res

In [24]:
zalando_prod_training = offers_training_df.loc[offers_training_df['shop'] == 'zalando']
aboutyou_prod_training = offers_training_df.loc[offers_training_df['shop'] == 'aboutyou']
fields = ['shop','offer_id', 'brand', 'title']
#aboutyou_prod_training[fields]
brand_collection = BrandCollection(list(zalando_prod_training['brand'].unique()))
#brand_collection.get_match('pieces')
prod_to_match = aboutyou_prod_training.loc[0]

#get subset of possible brands to investigate
match_brands, rel = brand_collection.get_match(prod_to_match['brand'])
aboutyou_prod_brand_match = aboutyou_prod_training\
                            .loc[aboutyou_prod_training['brand']\
                            .apply(lambda x: Brand(x).processed_text())\
                            .isin(match_brands)]
#aboutyou_prod_brand_match
bm25matcher = BM25Z(list(aboutyou_prod_brand_match['title']))
#aboutyou_prod_title_match = 
bm25matcher.get_relevant_results(prod_to_match['title'])
bm25matcher.get_corpus_scores(prod_to_match['title'].lower())



['kleid']


[{'text': 'kleid', 'score': 0.0},
 {'text': 'kleid', 'score': 0.0},
 {'text': 'top', 'score': 0.0},
 {'text': 'kleidadua', 'score': 0.0},
 {'text': 'weste', 'score': 0.0},
 {'text': 'hakelschal', 'score': 0.0},
 {'text': 'hosepip', 'score': 0.0},
 {'text': 'jeansdelly', 'score': 0.0},
 {'text': 'blusenkleid', 'score': 0.0},
 {'text': 'blumenprintkleid', 'score': 0.0},
 {'text': 'hosetoppy', 'score': 0.0},
 {'text': 'tshirtrina', 'score': 0.0},
 {'text': 'midikleid', 'score': 0.0},
 {'text': 'schalerikka', 'score': 0.0},
 {'text': 'bluse', 'score': 0.0},
 {'text': 'hosebastiana', 'score': 0.0},
 {'text': 'minirock', 'score': 0.0},
 {'text': 'printleggings', 'score': 0.0},
 {'text': 'blusenikki', 'score': 0.0},
 {'text': 'pulloverkavala', 'score': 0.0},
 {'text': 'rockkeyla', 'score': 0.0},
 {'text': 'bluse', 'score': 0.0},
 {'text': 'rockgiana', 'score': 0.0},
 {'text': 'shirt', 'score': 0.0},
 {'text': 'pullovercrista', 'score': 0.0},
 {'text': 'hosepam', 'score': 0.0},
 {'text': 'klei