In [1]:
import pymongo
import pandas as pd
import io
from PIL import Image
import os
from SETTINGS import *
import matplotlib.pyplot as plt
import seaborn
%matplotlib inline
from itertools import combinations
import os
from externals.hungarian import HungarianError
from fuzzywuzzy import fuzz

In [2]:
# Open a MongoDB client with the driver's default connection settings (no
# host or port is passed), then select the source database and the image
# lookup collection. `name_source_database` and `name_image_file_lookup` are
# not defined in this notebook -- presumably they come from
# `from SETTINGS import *`.
connection = pymongo.MongoClient()
db = connection[name_source_database]
collection = db[name_image_file_lookup]

In [3]:
# Projection spec for the aggregation below: every listed field is kept (1).
projections = dict.fromkeys(['_id', 'count', 'filename', 'hash', 'labelAnnotations'], 1)

In [4]:
# Fetch the projected fields of every document that has an `_id` and load the
# whole result set into a DataFrame.
match_stage = {'$match': {'_id': {'$exists': 1}}}
project_stage = {'$project': projections}
image_tag_pipeline = [match_stage, project_stage]

image_tag_cursor = collection.aggregate(
    pipeline=image_tag_pipeline,
    allowDiskUse=True,
)
df = pd.DataFrame(image_tag_cursor)

In [5]:
# Keep only the rows that actually carry label annotations. The explicit
# .copy() makes `df` an independent frame, so the `tag_list` column assigned
# in a later cell is written to this frame directly rather than to a filtered
# view of the unfiltered frame (which triggers pandas' SettingWithCopyWarning).
df = df[df['labelAnnotations'].notna()].copy()

In [6]:
# Preview the first rows of the filtered frame.
df.head()

Unnamed: 0,_id,count,filename,hash,labelAnnotations
0,http://pbs.twimg.com/media/BYPJ4QTCEAArCA7.jpg,1,00000.jpg,94985474d0f0b2f2,"[{'mid': '/m/01mqdt', 'description': 'Traffic ..."
1,http://pbs.twimg.com/media/C-ZZ2hsW0AAGJ4y.jpg,1,00001.jpg,f0ac7c3c3250b2b2,"[{'mid': '/m/05ws7', 'description': 'Police', ..."
2,http://pbs.twimg.com/media/C-Zb4ZqW0AARyIy.jpg,1,00002.jpg,156b665773ebfcde,"[{'mid': '/m/07bsy', 'description': 'Transport..."
4,http://pbs.twimg.com/media/C3HKX9JXgAAoyJe.jpg,1,01046.jpg,557525656565059b,"[{'mid': '/m/03scnj', 'description': 'Line', '..."
5,http://pbs.twimg.com/media/C3HKdBAXgAEVZ-g.jpg,1,01045.jpg,220e181a2a0e0f17,"[{'mid': '/m/07s6nbt', 'description': 'Text', ..."


In [7]:
def _tag_descriptions(annotations):
    """Return the 'description' value of every annotation entry, in order."""
    return [entry['description'] for entry in annotations]


# One list of label descriptions per row, taken from its labelAnnotations.
df['tag_list'] = df.labelAnnotations.apply(_tag_descriptions)

In [8]:

from externals.measures import AverageJaccard, RankingSetAgreement

# Matcher used later to score a pair of tag lists; it wraps the
# AverageJaccard metric.
metric = AverageJaccard()
matcher = RankingSetAgreement(metric)


def _attach_image_fields(pairs, side):
    """Join the tag list and hash of the image named in column `side` onto `pairs`.

    `side` is 'a' or 'b'; the joined columns are named '<side>_list' and
    '<side>_hash', and the helper 'filename' join column is dropped again.
    """
    image_fields = df[['filename', 'tag_list', 'hash']]
    joined = pairs.merge(image_fields, left_on=side, right_on='filename')
    joined = joined.rename(columns={'tag_list': f'{side}_list', 'hash': f'{side}_hash'})
    return joined.drop(columns=['filename'])


# Every unordered pair of filenames, one row per pair.
rank_scores = pd.DataFrame(list(combinations(df.filename, r=2)), columns=['a', 'b'])

# Attach the fields of image `a` first, then those of image `b`.
rank_scores = _attach_image_fields(rank_scores, 'a')
rank_scores = _attach_image_fields(rank_scores, 'b')



In [9]:
# Preview the first pairs together with their joined tag lists and hashes.
rank_scores.head()

Unnamed: 0,a,b,a_list,a_hash,b_list,b_hash
0,00000.jpg,00001.jpg,"[Traffic sign, Signage, Sign, Motor vehicle, S...",94985474d0f0b2f2,"[Police, Event, Alcohol, Vehicle, Crowd, Offic...",f0ac7c3c3250b2b2
1,00000.jpg,00002.jpg,"[Traffic sign, Signage, Sign, Motor vehicle, S...",94985474d0f0b2f2,"[Transport, Vehicle, Mode of transport, Crane,...",156b665773ebfcde
2,00001.jpg,00002.jpg,"[Police, Event, Alcohol, Vehicle, Crowd, Offic...",f0ac7c3c3250b2b2,"[Transport, Vehicle, Mode of transport, Crane,...",156b665773ebfcde
3,00000.jpg,01046.jpg,"[Traffic sign, Signage, Sign, Motor vehicle, S...",94985474d0f0b2f2,"[Line, Parallel, Machine, Scientific instrument]",557525656565059b
4,00001.jpg,01046.jpg,"[Police, Event, Alcohol, Vehicle, Crowd, Offic...",f0ac7c3c3250b2b2,"[Line, Parallel, Machine, Scientific instrument]",557525656565059b


In [10]:
# Rebind `collection` to the pairwise-similarity collection. From this cell
# on, `collection` no longer refers to the image lookup collection selected
# at the top of the notebook.
collection = db['file_similarity']

In [19]:
# Collect the `_id` of every comparison document already stored, so the
# priming step can skip pairs that exist.
existing_comparisons_pipeline = [
    {'$match': {'_id': {'$exists': 1}}},
    {'$project': {'_id': 1}},
]
existing_comparisons_cursor = collection.aggregate(pipeline=existing_comparisons_pipeline)
existing_comparisons = {document['_id'] for document in existing_comparisons_cursor}

In [21]:

# PRIME THE DB
# Queue one placeholder document (only an `_id`) for every image pair that is
# not stored yet. The `_id` has the form "<filename a>_<filename b>".
# Only columns `a` and `b` are read, so iterate over them directly instead of
# DataFrame.iterrows(), which builds a full Series for each row.
package = []
for filename_a, filename_b in zip(rank_scores['a'], rank_scores['b']):
    _id_string = f"{filename_a}_{filename_b}"
    if _id_string not in existing_comparisons:
        package.append(pymongo.InsertOne({'_id': _id_string}))


In [23]:
# Insert the placeholder documents. bulk_write raises InvalidOperation when it
# is given an empty request list, which is the case whenever every pair is
# already primed (for example on a re-run), so only call it when there is
# something to write. The result (or None) is shown as the cell output.
prime_result = collection.bulk_write(package, ordered=False) if package else None
prime_result

<pymongo.results.BulkWriteResult at 0x13a641f48>

In [24]:
# SCORE THE LISTS and HASHES
# Rank score assigned when the matcher raises HungarianError. The original
# note states that rank scores max out at 1; the fallback actually used is 0,
# not an out-of-range sentinel.
SCORE_ON_ERROR = 0


def scorer(chunk):
    """Score one chunk of comparison records and store the scores in MongoDB.

    The two filenames of each record are recovered from its ``_id``
    ("<filename a>_<filename b>") and three scores are computed:

    * ``rank_score`` -- ``matcher.similarity`` of the two tag lists, or
      ``SCORE_ON_ERROR`` when the matcher raises ``HungarianError``;
    * ``hash_score`` -- ``fuzz.ratio`` of the two hash strings divided by 100;
    * ``avg_score``  -- mean of ``rank_score`` and ``hash_score``.

    Args:
        chunk: iterable of dicts, each holding at least an ``_id`` key.

    Returns:
        None. The scores are written with ``$set`` onto the matching
        ``file_similarity`` documents (``upsert=False``).

    Raises:
        KeyError: if a filename parsed from an ``_id`` is missing from
            ``tag_lookup`` or ``hash_lookup``.
    """
    updates = []
    for record in chunk:
        # NOTE: only the first two '_'-separated parts are used, so this
        # relies on the filenames themselves containing no underscore.
        split_id = record['_id'].split('_')
        filename_a, filename_b = split_id[0], split_id[1]
        list_a, list_b = tag_lookup[filename_a], tag_lookup[filename_b]
        hash_a, hash_b = hash_lookup[filename_a], hash_lookup[filename_b]

        try:
            rank_score = matcher.similarity(list_a, list_b)
        except HungarianError:
            rank_score = SCORE_ON_ERROR
        hash_score = fuzz.ratio(hash_a, hash_b) / 100
        avg_score = (rank_score + hash_score) / 2

        updates.append(
            pymongo.UpdateOne(
                filter={'_id': record['_id']},
                update={'$set': {'hash_score': hash_score,
                                 'rank_score': rank_score,
                                 'avg_score': avg_score}},
                upsert=False,
            )
        )

    # bulk_write rejects an empty request list; nothing to store for an
    # empty chunk.
    if not updates:
        return

    # Every call opens its own client (scorer is run through a process pool);
    # the context manager closes it again so connections do not accumulate
    # over the many chunks a worker handles.
    with pymongo.MongoClient() as client:
        client[name_source_database]['file_similarity'].bulk_write(updates, ordered=False)

# filename -> tag list and filename -> hash dictionaries read by scorer().
by_filename = df.set_index('filename')
tag_lookup = by_filename['tag_list'].to_dict()
hash_lookup = by_filename['hash'].to_dict()


# Iterate through the DB: select every comparison document that has no
# `avg_score` field yet.
unscored_stage = {'$match': {'avg_score': {'$exists': 0}}}
iterate_pipeline = [unscored_stage]
iterate_cursor = collection.aggregate(
    pipeline=iterate_pipeline,
    allowDiskUse=True,
)

In [25]:
from multiprocessing import Pool, cpu_count
from functions import chunks

In [26]:
# Half of the reported CPU count (rounded down), shown as the cell output.
cpus = cpu_count() // 2
cpus

4

In [27]:
# Materialize every record from the cursor into a list, so its length can be
# inspected and it can be split into chunks for the worker pool.
todo = list(iterate_cursor)

In [28]:
# Number of comparison records loaded into `todo`.
len(todo)

306469

In [29]:
# Run scorer() over `todo` on a pool of 6 worker processes. The work is split
# by the project helper chunks(todo, 1000) -- presumably pieces of 1000
# records each. NOTE: the pool size is hard-coded and does not use the `cpus`
# value computed in an earlier cell.
work_chunks = chunks(todo, 1000)
with Pool(processes=6) as pool:
    res = pool.map(scorer, work_chunks)