In [None]:
# Optimize Hamming Distance calculation - avoid using 2 for loops
# Retain nos. of items (change the index in hashed RDD)
# Find actual cosine similarity and compare it with the cosine similarity obtained from LSH

In [2]:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import Matrices
from random import gauss
import math

np.random.seed(10)
count_users = 0

In [39]:
# Original data item : (user, rating)
def split_data(x):
    sp_x = x.split(",")
    return int(sp_x[1]), (int(sp_x[0]), float(sp_x[2]))

# data = sc.textFile("/Users/mrunmayee/AMLProject/Data/train_0_sub_1mil.txt").repartition(10)
# data = sc.textFile("/Users/mrunmayee/AdvancedML/Data_AML/netflix_data/test.txt")
data = sc.textFile("/Users/mrunmayee/AdvancedML/Data_AML/netflix_data/training_subset.txt")
or_data = data.map(lambda x: split_data(x))
#print or_data.collect()

In [4]:
#s = or_data.reduceByKey(lambda x, y: [x] + [y])
#print s.collect()
    
# m = s.map(lambda x: sort(x))

In [40]:
# Convert values to list
sp_data = or_data.map(lambda x: ((x[0]), ([x[1][0]], [x[1][1]])))
# print sp_data.collect()

In [41]:
# Convert to a sparse vector format
def conv(x, y):
    return x[0] + y[0], x[1] + y[1]

sp_format = sp_data.reduceByKey(lambda x, y: conv(x, y))
# print sp_format.collect()

In [42]:
# Count the number of users
cn = sp_format.flatMap(lambda x: x[1][0]).distinct().cache()
count_users = cn.count()
sorted_indices = sorted(cn.collect())


print count_users

1667


In [43]:
# Convert to a sparse vector format by sorting the indices
def sort_index(x):
    x = list(x)
    m = sorted(x[0])
    n = [x[1] for (x[0],x[1]) in sorted(zip(x[0],x[1]))]
    return m, n
    
sv = sp_format.mapValues(lambda x: sort_index(x))
#sv.collect()

In [44]:
# Create sparse vectors 'sp_vecs' is an RDD of sparse vectors
def conv_sv(x):
    return Vectors.sparse(count_users, x[1][0], x[1][1])
    
sp_vecs = sv.map(lambda x: conv_sv(x))
# sv.collect()

In [64]:
# Item ids
item_ids = sv.map(lambda x: x[0]).collect()
# print item_ids

In [49]:
# Total no. of users = 147371
print count_users
# Total no. of items = 125951
len_items = sp_vecs.count()
print len_items

1667
1000


In [50]:
# This function creates random vectors with a norm = 1 and dimension = no. of users
def make_rand_vector(dims):
    vec = [gauss(0, 1) for i in range(dims)]
    mag = sum(x**2 for x in vec) ** .5
    return [x/mag for x in vec]


# Create a list of vectors
list_rand_vecs = []
no_vectors = 10
for i in xrange(0, no_vectors):
    list_rand_vecs.append(Vectors.sparse(count_users, sorted_indices, make_rand_vector(count_users)))

def find_hash(x, list_rand_vecs):
    global ito, it
    ones_zeroes = []
    #it += 1
    # ito += 1
    for i in list_rand_vecs:
        if i.dot(x) >= 0:
            ones_zeroes.append(1)
        else: ones_zeroes.append(0)
    
    # return (it, ones_zeroes)
    return ones_zeroes
            

hashed = sp_vecs.map(lambda x: find_hash(x, list_rand_vecs)).cache()

In [51]:
hashed.collect()[1]

[1, 0, 0, 1, 1, 1, 0, 0, 1, 1]

In [52]:
# hashed_in = sc.parallelize(list(enumerate(hashed.collect(), 1)))
hashed_in = list(enumerate(hashed.collect(), 1))

In [54]:
# Hashed values for item 1
print hashed_in[1][1]

[1, 0, 0, 1, 1, 1, 0, 0, 1, 1]


In [55]:
# Hamming distance gives the probability that the two items are similar
def hamming(x,y):
    """Calculate the Hamming distance between two vectors"""
    assert len(x) == len(y)
    c = 0
    for i in xrange(0, len(x)):
        if x[i] != y[i]:
            c += 1
    return c * 1.0/ len(x)

# Find hamming distance between each pair of items
def group_func(ls):
    ls_tuples = []
    for i in ls:
        for j in ls:
            if j[0] > i[0]:
                ls_tuples.append(((i[0], j[0]), hamming(i[1], j[1])))
    return ls_tuples

ls_hamming = group_func(hashed_in)
for i in xrange(0, 4):
    print ls_hamming[i]

((1, 2), 0.7)
((1, 3), 0.5)
((1, 4), 0.6)
((1, 5), 0.6)


In [57]:
# Find cosine similarity for all pairs of items
import math

cosine_sim = []
for i in xrange(0, len(ls_hamming)):
    cosine_sim.append((ls_hamming[i][0], round(math.cos((1 - ls_hamming[i][1])* math.pi), 4)))

In [58]:
for i in xrange(0, 4):
    print cosine_sim[i]

((1, 2), 0.5878)
((1, 3), 0.0)
((1, 4), 0.309)
((1, 5), 0.309)


In [81]:
# Dictionary to store the original item id and the corresponding newly assigned serial no. to it
corr_item_ids = {}
for i in xrange(0, len_items):
    corr_item_ids[item_ids[i]] = i + 1
# print corr_item_ids

In [65]:
# Test file
testdata = sc.textFile("/Users/mrunmayee/AdvancedML/Data_AML/netflix_data/testing_subset.txt")
td = testdata.map(lambda x: split_data(x))

In [80]:
# Convert original item ids to the new ones (1,2,....)
def dict_conv(x, corr_item_ids):
    return (corr_item_ids[x[0]] , (x[1][0], x[1][1]))
    
conv_td = td.map(lambda x: dict_conv(x, corr_item_ids))


In [None]:
# Ignore the code after this

In [None]:
m = Vectors.sparse(sparse_vec.first()[0], sparse_vec.first()[1][0], sparse_vec.first()[1][1])
# print sparse_vec.first()[1][1]
print m
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
sv1.dot(sp_vecs.take(4)[3])

In [None]:
x = ["b", "a", "d", "c"]
y = [3, 2, 1, 4]

m = [x for (y,x) in sorted(zip(y,x))]
print m

In [None]:
s = [4, 3, 2]
''.join([str(x) for x in s])