In [1]:
# Importing the required packages
import sys
from pyspark.sql import SparkSession

from math import sqrt


In [2]:
# Reuse the active SparkSession if there is one, otherwise create it
spark = SparkSession.builder.getOrCreate()

# Smoke test: run a trivial SQL query and print it to confirm the session works
df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
# SparkContext handle; used below to read the text file into an RDD
sc = spark.sparkContext

In [4]:
# Import the raw data into an RDD of text lines.
# NOTE(review): the path is relative - presumably resolved against the
# notebook's working directory / default filesystem; confirm before running elsewhere.
rdd = sc.textFile("customers_books.txt")

In [5]:
# Check the imported data: each record is a "user:book" string and may repeat
rdd.take(5)

['u1:book1', 'u1:book2', 'u1:book2', 'u1:book3', 'u1:book3']

In [6]:
# Step 1. Remove duplicates
# A repeated "user:book" record must count only once, otherwise the per-book
# user vectors summed later would contain values above 1.
rdd_distinct = rdd.distinct()

# Number of unique purchase records. Only the last expression of a cell is
# rendered, so this is the single action the cell runs.
rdd_distinct.count()

77

## Step 2 - Create Similarity Matrix

In [7]:
# Build the list of unique user ids (the text before ":") on the driver.
# The position of a user in this list is that user's slot in every book vector.
users = (
    rdd_distinct
    .map(lambda record: record.split(":")[0])
    .distinct()
    .collect()
)

In [8]:
# Mapper that turns one purchase record into a key/value pair
# Key   - book number
# Value - list with one 0/1 flag per user (1 for the user in the record, 0 otherwise)
# For example (u1:book1) ==> (Book number, user vector) ==> (1, [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0])
def mapper_book_user(x, user_list=None):
    """Convert a "user:bookN" record into ``(N, user_vector)``.

    Args:
        x: Record string; the part before the first ":" is the user id and
            the part after it is the book token.
        user_list: Ordered user ids that define the vector positions.
            When omitted, the notebook-level ``users`` list is used, which
            keeps existing single-argument calls (e.g. ``rdd.map``) working.

    Returns:
        Tuple ``(book_number, purchase)`` where ``book_number`` is the book
        token with its first four characters removed and parsed as ``int``,
        and ``purchase`` holds 1 at every position whose user id equals the
        record's user and 0 elsewhere.

    Raises:
        IndexError: if ``x`` contains no ":".
        ValueError: if the book token minus its first four characters is not
            an integer.
    """
    if user_list is None:
        # Fall back to the driver-side list built earlier in the notebook.
        user_list = users
    tokens = x.split(":")
    purchase = [1 if user == tokens[0] else 0 for user in user_list]
    # tokens[1][4:] drops the 4-character prefix ("book" in the sample data).
    return (int(tokens[1][4:]), purchase)

#mapper_book_user("u1:book1")

In [9]:
# Apply the mapper to every de-duplicated record, giving an RDD of
# (book number, user vector) pairs - one pair per purchase record
rdd_book_vector = rdd_distinct.map(mapper_book_user)
rdd_book_vector.take(5)

[(2, [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]),
 (3, [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]),
 (4, [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]),
 (5, [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]),
 (0, [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0])]

In [10]:
# Reducer: merges two user vectors of the same book into one
def list_add(x, y):
    """Return the element-wise sum of ``x`` and ``y``.

    Pairs are formed with ``zip``, so the result is as long as the shorter
    of the two inputs.
    """
    return [left + right for left, right in zip(x, y)]

In [11]:
# One row per book: sum all of that book's per-record vectors element-wise
# to obtain its aggregated user vector
rdd_book_vector_f = rdd_book_vector.reduceByKey(list_add)
rdd_book_vector_f.take(2)

[(2, [1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0]),
 (4, [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0])]

In [12]:
# Phi correlation between two 0/1 lists of the same length
def phi_corr(x, y):
    """Compute the phi coefficient of two binary vectors.

    The positions are tallied into a 2x2 contingency table:
        a - x is 1 and y is 1
        b - x is 1 and y differs
        c - every remaining case (for 0/1 input: x is 0 and y is 1)
        d - x is 0 and y is 0
    and phi = (a*d - b*c) / sqrt((a+b)*(c+d)*(a+c)*(b+d)).

    Args:
        x: First vector; its length drives the iteration.
        y: Second vector; must be at least as long as ``x``.

    Returns:
        The phi coefficient as a float. When any marginal total is zero
        (for example one vector is constant, or both are empty) the
        coefficient is undefined and 0.0 is returned instead of raising
        ZeroDivisionError.

    Raises:
        IndexError: if ``y`` is shorter than ``x``.
    """
    a = b = c = d = 0
    for i, x_i in enumerate(x):
        y_i = y[i]
        if x_i == y_i and x_i == 1:
            a += 1
        elif x_i == y_i and x_i == 0:
            d += 1
        elif x_i != y_i and x_i == 1:
            b += 1
        else:
            c += 1

    denominator = sqrt((a + b) * (c + d) * (a + c) * (b + d))
    if denominator == 0:
        # Undefined correlation: treat as "no association".
        return 0.0

    return (a * d - b * c) / denominator

In [13]:
# One-sided cartesian product: pair every book with every other book, then
# keep only pairs whose first book number is lower than the second, so each
# unordered pair appears once and self-pairs are dropped
rdd_book_1side_cartesian = (
    rdd_book_vector_f
    .cartesian(rdd_book_vector_f)
    .filter(lambda pair: pair[0][0] < pair[1][0])
)

# Part 1 output - (Book1, Book2, phi-correlation) for every retained pair
similarity_matrix = rdd_book_1side_cartesian.map(
    lambda pair: (pair[0][0], pair[1][0], phi_corr(pair[0][1], pair[1][1]))
)


In [14]:
# Peek at two ((book, vector), (book, vector)) pairs from the one-sided product
rdd_book_1side_cartesian.take(2)

[((2, [1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0]),
  (4, [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0])),
 ((2, [1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0]),
  (6, [0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1]))]

In [15]:
# Peek at two (book1, book2, phi correlation) rows
similarity_matrix.take(2)

[(2, 4, 0.3042903097250923), (2, 6, 0.24444444444444444)]

In [16]:
# Bring every (book1, book2, phi correlation) row back to the driver for display
similarity_matrix.collect()

[(2, 4, 0.3042903097250923),
 (2, 6, 0.24444444444444444),
 (2, 10, 0.20672455764868078),
 (2, 12, -0.4472135954999579),
 (2, 16, -0.3373495424699933),
 (2, 18, -0.3721042037676254),
 (2, 22, -0.12171612389003691),
 (0, 2, 0.3892494720807615),
 (0, 4, -0.21320071635561044),
 (4, 6, 0.12171612389003691),
 (4, 10, -0.11322770341445956),
 (4, 12, -0.40824829046386296),
 (4, 16, -0.21320071635561044),
 (0, 6, 0.3373495424699933),
 (0, 10, -0.14484136487558028),
 (0, 12, -0.17407765595569785),
 (0, 16, 0.15151515151515152),
 (4, 18, -0.11322770341445956),
 (4, 22, -0.16666666666666666),
 (0, 18, -0.14484136487558028),
 (0, 22, -0.21320071635561044),
 (6, 10, 0.3721042037676254),
 (6, 12, 0.14907119849998599),
 (6, 16, -0.025949964805384102),
 (10, 12, 0.2773500981126146),
 (10, 16, -0.14484136487558028),
 (12, 16, 0.5222329678670935),
 (6, 18, -0.20672455764868078),
 (6, 22, -0.3042903097250923),
 (10, 18, -0.07692307692307693),
 (10, 22, -0.11322770341445956),
 (12, 18, -0.2773500981126146

### Part 2 - Generate recommendations

In [17]:
# Generating recommendations for a single book, here book 21
book_number = 21

In [18]:
# Keep the rows that mention the chosen book on either side, then take the
# two rows with the highest correlation (key is negated to sort descending)
op = (
    similarity_matrix
    .filter(lambda row: row[0] == book_number or row[1] == book_number)
    .takeOrdered(2, key=lambda row: -row[2])
)

In [19]:
# Show the two best-correlated (book1, book2, phi correlation) rows
op

[(21, 22, 1.0), (21, 23, 0.6793662204867574)]

In [20]:
# From each top-correlated row, keep whichever of the two book numbers is not
# the book we asked about. The loop variable is deliberately not named `op`:
# rebinding `op` to a single row would make this cell fail when re-run.
recs = [book for row in op for book in row[:2] if book != book_number]

print('Recommendations for book'+str(book_number)+ ' are book'+str(recs[0])+' and book'+str(recs[1]))

Recommendations for book21 are book22 and book23


### Generate all recommendations

In [21]:
#rev_matrix = similarity_matrix.map(lambda x: (x[1],x[0],x[2]))
#all_comb = rev_matrix.union(similarity_matrix)

In [22]:
# Emit every similarity row in both directions with flatMap, so that each
# book shows up as the first element of all of its pairs
all_comb = similarity_matrix.flatMap(
    lambda row: ((row[1], row[0], row[2]), (row[0], row[1], row[2]))
)
all_comb.take(5)

[(4, 2, 0.3042903097250923),
 (2, 4, 0.3042903097250923),
 (6, 2, 0.24444444444444444),
 (2, 6, 0.24444444444444444),
 (10, 2, 0.20672455764868078)]

In [23]:
def select_top_2(x):
    """Return the two keys with the highest values.

    Args:
        x: Iterable of ``(key, value)`` pairs. It is passed through ``dict``
            first, so a repeated key keeps only its last value.

    Returns:
        Tuple ``(best_key, second_best_key)`` ordered by descending value;
        ties keep their original relative order.

    Raises:
        IndexError: if fewer than two distinct keys are supplied.
    """
    # Sort once and read both winners from the same ranking.
    ranked = sorted(dict(x).items(), key=lambda item: item[1], reverse=True)
    return (ranked[0][0], ranked[1][0])

In [24]:
# Key each row by its first book, group the (other book, correlation) pairs
# per book, pick the two best per book and collect the result to the driver
all_recommendations = (
    all_comb
    .map(lambda row: (row[0], (row[1], row[2])))
    .groupByKey()
    .mapValues(select_top_2)
    .collect()
)

In [25]:
# Print one line per book with its two recommended books
for book, (first, second) in all_recommendations:
    print('Recommendations for book'+str(book)+' are book'+str(first)+' and book'+str(second))

Recommendations for book4 are book9 and book7
Recommendations for book12 are book16 and book22
Recommendations for book16 are book15 and book13
Recommendations for book0 are book9 and book13
Recommendations for book5 are book2 and book9
Recommendations for book9 are book2 and book0
Recommendations for book13 are book0 and book16
Recommendations for book17 are book7 and book0
Recommendations for book61 are book31 and book6
Recommendations for book21 are book22 and book23
Recommendations for book1 are book3 and book11
Recommendations for book2 are book5 and book9
Recommendations for book6 are book7 and book10
Recommendations for book10 are book6 and book11
Recommendations for book18 are book19 and book11
Recommendations for book22 are book21 and book23
Recommendations for book3 are book1 and book2
Recommendations for book7 are book17 and book6
Recommendations for book11 are book22 and book19
Recommendations for book15 are book16 and book12
Recommendations for book19 are book18 and book11