In [1]:
import sys
import operator
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import re
import unicodedata
from operator import add
from itertools import combinations

In [2]:
spark = SparkSession \
            .builder \
            .appName("clustering") \
            .getOrCreate()
sc = spark.sparkContext

In [3]:
data = spark.read.csv('inspections.csv', header = True)

In [4]:
names = data.rdd.map(lambda x: x['facility_name']).distinct()\
                .map(lambda x: (x, x.strip()))\
                .map(lambda x: (x[0], x[1].lower()))\
                .map(lambda x: (x[0], re.sub(r'[^\w\s]','', x[1])))\
                .map(lambda x: (x[0], unicodedata.normalize('NFC',x[1])))\
                .map(lambda x: (x[0], x[1].split(' ')))\
                .map(lambda x: (x[0], list(filter(None, x[1])))).cache()

In [5]:
print(names.take(10))

[('POLLEN', ['pollen']), ('EMC RESTAURANT GROUP LLC', ['emc', 'restaurant', 'group', 'llc']), ('7 ELEVEN #27069', ['7', 'eleven', '27069']), ('MURAKAMI', ['murakami']), ('ORANGE TREE CAFE', ['orange', 'tree', 'cafe']), ("CAMY'S GRILL", ['camys', 'grill']), ('SABORES DEL PERU', ['sabores', 'del', 'peru']), ('LA INDIA BONITA CAFE', ['la', 'india', 'bonita', 'cafe']), ('BEVERLY FALAFEL', ['beverly', 'falafel']), ('HOT THAI RESTAURANT', ['hot', 'thai', 'restaurant'])]


In [5]:
all_names=names.collect()
name_dict ={}
for each in all_names:
    name_dict[each[0]]=each[1]

In [6]:
tokens = names.map(lambda x: x[1]).collect()

In [7]:
shingles = []
for each in tokens:
    shingles.extend(each)
shingles = sorted(list(set(shingles)))
rep_shingles = {}
i = 1
for each in shingles:
    rep_shingles[each] = i
    i = i+1

In [8]:
print(len(shingles))

9166


In [23]:
print(rep_shingles)

{'001': 1, '0026': 2, '0027': 3, '003': 4, '004554': 5, '007': 6, '0078': 7, '013': 8, '016': 9, '026176': 10, '033': 11, '0375': 12, '0409': 13, '041': 14, '0410': 15, '041500': 16, '051': 17, '05423': 18, '05425': 19, '05426': 20, '05430': 21, '05434': 22, '05435': 23, '05436': 24, '05456': 25, '05457': 26, '05458': 27, '05879': 28, '059': 29, '06023': 30, '06211': 31, '063': 32, '06516': 33, '06854': 34, '06931': 35, '06976': 36, '07016': 37, '07036': 38, '0717': 39, '07482': 40, '083': 41, '0876': 42, '09136': 43, '09144': 44, '09285': 45, '09563': 46, '09985': 47, '1': 48, '10': 49, '100': 50, '1000': 51, '1003': 52, '10070': 53, '101': 54, '10107': 55, '10129': 56, '102': 57, '1021': 58, '10291': 59, '10292': 60, '10295': 61, '10296': 62, '103': 63, '10309': 64, '1039': 65, '103rd': 66, '104': 67, '10433': 68, '10469': 69, '105': 70, '10509': 71, '10586': 72, '105meat': 73, '106': 74, '1061': 75, '10614': 76, '1064': 77, '10663': 78, '107': 79, '10724': 80, '10734': 81, '10791': 

In [9]:
trans_names = names.mapValues(lambda x: [rep_shingles[i] for i in x]).cache()

In [9]:
print(trans_names.take(20))

[('POLLEN', [6912]), ('EMC RESTAURANT GROUP LLC', [3518, 7224, 4241, 5439]), ('7 ELEVEN #27069', [1020, 3494, 481]), ('MURAKAMI', [6142]), ('ORANGE TREE CAFE', [6486, 8533, 2305]), ("CAMY'S GRILL", [2357, 4219]), ('SABORES DEL PERU', [7444, 3172, 6754]), ('LA INDIA BONITA CAFE', [5233, 4693, 2078, 2305]), ('BEVERLY FALAFEL', [1932, 3673]), ('HOT THAI RESTAURANT', [4574, 8350, 7224]), ('LA MONARCA BAKERY', [5233, 6062, 1694]), ('KALBIS KOREAN GRILL', [5019, 5185, 4219]), ('CALIFORNIA MEXICAN GRILL', [2327, 5902, 4219]), ('ETCHEA', [3608]), ('J & F ICE CREAM SHOP', [4773, 3650, 4651, 3004, 7729]), ('DONUT FACTORY', [3332, 3661]), ("JOHNNIE'S NEW YORK PIZZA", [4912, 6266, 9086, 6850]), ('ROSE GARDEN CAFE', [7376, 4004, 2305]), ('SHIN KANG SEO MYUN OAK', [7717, 5025, 7638, 6160, 6383]), ("MCDONALD'S #2084", [5819, 333])]


In [10]:
def min_hash(values):
    sign = []
    for i in range(1, 101):
        hash_list = []
        for v in values:
            h_value = (2*v + 11*i) % 100
            hash_list.append(h_value)
        sign.append(min(hash_list))
    return sign
def band(item):
    band = []
    for i in range(0, 2):
        band.append((i, (item[0], item[1][i])))
    return band



In [11]:
def cluster(values):
    Output = {} 
    for x, y in values: 
        if tuple(y) in Output: 
            Output[tuple(y)].append((x)) 
        else: 
            Output[tuple(y)] = [(x)]
    return Output

In [12]:
def clean(l):
    output = []
    for each in l:
        if len(each[1])>=2:
            output.extend(each(1))
    return output

In [13]:
signiture = trans_names.mapValues(lambda x: min_hash(x))\
                        .mapValues(lambda x: [x[i:i+50] for i in range(0, 100, 50)])\
                        .flatMap(lambda x: band(x))\
                        .groupByKey()\
                        .map(lambda x: (x[0],list(x[1])))\
                        .mapValues(lambda x: cluster(x))\
                        .map(lambda x: list(x[1].items())).cache()

In [14]:
candidates = signiture.collect()
cand = []
for each in candidates:
    cand.extend(each)
candidate = []
for each in cand:
    if len(each[1])>=2:
        candidate.append(each[1])
print(candidate)

[['POLLEN', 'SUN-E-SIDE', 'OMSB', 'CHEZ-ANTOINE', 'LAX-C', 'IHOP', 'APETIT', 'YUPDDUK', 'BALCONY', 'SASSAFRAS', 'XPRESS', 'RHYTHM ROOM', 'NEUEHOUSE', 'SHIBUMI', 'ORSA-WINSTON', 'ROAST'], ['MURAKAMI', 'MOOPONGRI', 'HARIKAWA', 'CRYING TIGER', "PIRATA'S BAKERY", 'CHEGO', 'AVENTINE', 'ACCENTS', 'PERCOLATE', "MATTEO'S", 'DOS AGUILAS BAKERY', 'KOTOYA', 'NEAT', 'SHU', 'D  SHU', "REILLY'S", 'TEAVANA', 'PARAO', 'SCHWARTZ BAKERY', 'SQUAREFRITZ', "O'WOKS", 'YAYO TACO', 'CYNTHIAS', 'KMART', 'D. BISTROS', 'VGTA', 'OLDFIELDS', 'ARCO', 'GULFSTREAM', 'PUNCHBOWL'], ['ORANGE TREE CAFE', 'SMART & FINAL #735'], ["CAMY'S GRILL", 'ASTRO LIQUOR', 'MI CIUDAD', "LUCIA'S DISCOUNT STORE", 'PACIFIC LIQUOR', 'BILLS LIQUOR STORE'], ['BEVERLY FALAFEL', 'FALAFEL ARAX', 'OSTERIA MAMMA', 'O T LIQUORS', 'MALI HOM', 'SUNSET MOBIL', 'MR SUBMARINE'], ['HOT THAI RESTAURANT', 'PIZZA EIGHT', 'RENAISSANCE RESTAURANT', 'LUPITAS RESTAURANT', 'SUNSHINE THAI RESTAURANT', 'HIT BARGAIN', 'THAI PATIO RESTAURANT', 'KUSINA FILIPINA', '

In [109]:
print(len(candidate))

3259


In [15]:
def similarity(values):
    result = []
    permutations = list(combinations(values, 2))
    for each in permutations:
        u1 = each[0]
        u2 = each[1]
        m1 = name_dict[u1]
        m2 = name_dict[u2]
        inter = set(m1).intersection(set(m2))
        union = set(m1).union(set(m1))
        simi = float(len(inter)/len(union))
        if simi>=0.9:
            result.append(each)
    return result

In [16]:
sc_cand = sc.parallelize(candidate).map(lambda x: similarity(x))

In [25]:
results = sc_cand.collect()
result = []
for each in results:
    if len(each)>0:
        result.append(each)
output = []
for i in result:
    clusters = []
    for each in i:
        clusters.append(each[0])
        clusters.append(each[1])
    clusters = list(set(clusters))
    output.append(clusters)
output2 = list(set([tuple(t) for t in output]))

In [26]:
output2

[('K.T. LIQUOR MARKET', 'KT LIQUOR MARKET'),
 ('23RD ST CAFE', '23RD ST. CAFE'),
 ("LUPITA'S MINI MARKET", 'LUPITAS MINI MARKET'),
 ("CARL'S JR #422", 'CARLS JR #422'),
 ("CARL'S JR #102", "CARL'S JR", "CARL'S JR."),
 ("FRANK'S LIQUOR", 'FRANKS LIQUOR & MARKET'),
 ('ANS LIQUOR', 'ANS LIQUOR MARKET'),
 ('REDS LIQUOR', "MCDONALD'S", "RED'S LIQUOR", "MCDONALD'S #5806"),
 ('NUMERO UNO MARKET #103', 'NUMERO UNO MARKET'),
 ("WAHOO'S FISH TACO", 'WAHOOS FISH TACO'),
 ('J & P MARKET', 'P & J MARKET'),
 ('THE DRAGON', 'THE TWIN DRAGON'),
 ('NATURES SUNSHINE', "NATURE'S SUNSHINE"),
 ("MARILYN'S DISCOUNT STORE",
  "JOE'S DISCOUNT STORE",
  'DISCOUNT STORE',
  'MEXICO DISCOUNT STORE'),
 ("IRENE'S LIQUOR", "AL'S LIQUOR", 'ALS LIQUOR', "IRENE'S LIQUOR MARKET"),
 ('DOLLAR TREE STORES, INC', 'DOLLAR TREE STORES , INC .'),
 ('TACOS LOS 4 CARNALES', 'TACOS LOS CARNALES'),
 ("GELSON'S MARKET", "GELSON'S MARKET #394"),
 ('DONUT KING', 'KING DONUT'),
 ('CARLS JR #321', "CARL'S JR #321"),
 ('EAST LA MARKET'

In [27]:
print(len(output2))

197


In [28]:
import csv
with open('LA_token_lsh.csv', "w") as f:
    w = csv.writer(f)
    w.writerows(output2)
    