In [1]:
import pandas as pd
import numpy as np
import networkx as nx

In [3]:
import os

from pyspark.sql import SparkSession

# Location of the graphframes jar, read from the environment instead of a
# hard-coded, machine-specific path. The jar must be built for the installed
# Spark *and* Scala versions: a jar built for a different Scala version (e.g.
# the spark2.4 / s_2.11 build) fails when a GraphFrame is constructed with
# `NoSuchMethodError: scala.Predef$.refArrayOps`.
GRAPHFRAMES_JAR = os.environ.get('GRAPHFRAMES_JAR')

# NOTE(review): master "local" runs a single worker thread; use "local[3]" if
# the 3-core settings below are meant to take effect.
spark = (
    SparkSession.builder
    .master("local")
    .appName("graph mining")
    .config("spark.executor.memory", '2g')
    .config('spark.executor.cores', '3')
    .config('spark.cores.max', '3')
    .config("spark.driver.memory", '2g')
    .getOrCreate()
)

sc = spark.sparkContext
if GRAPHFRAMES_JAR:
    # Put the jar on the Python path so the graphframes Python bindings
    # (presumably bundled inside the jar) can be imported.
    sc.addPyFile(GRAPHFRAMES_JAR)

TypeError: an integer is required (got type bytes)

# Problem 5


## User-User CF

Note: You need to implement two functions, one for user-based collaborative filtering and one for item-based collaborative filtering. Each function must take as input a user-item matrix (same format as u.data), the neighborhood size, a user id ‘u’, and an item id ‘i’, and must return the predicted rating computed by user-based CF or item-based CF, respectively.

In [3]:
from project_code.file_utils import *
# stack_files('raw_data','output_data')

In [4]:
# udata = pd.read_csv('output_data/full_data.txt',names=['source','target','rating'])
from pyspark.sql.types import StructType,StructField,IntegerType,NullType

# Three nullable integer columns; 'source'/'target' are presumably user/item ids
# (they are used that way by the CF helpers) and 'rating' is the score.
schema = StructType([
    StructField(column_name, IntegerType())
    for column_name in ('source', 'target', 'rating')
])

# Comma-separated file; lines starting with '%' are skipped as comments.
# Rows with any null field are dropped and the result is cached for reuse.
udata = (
    spark.read
    .csv('output_data/full_data.txt', schema, sep=',', comment='%')
    .dropna()
    .cache()
)

In [5]:
from graphframes import GraphFrame
from pyspark.sql import functions as F

# Expose the ratings as a SQL view named 'udata'.
udata.createOrReplaceTempView('udata')

# Every source id and every target id becomes a vertex. DataFrame.union keeps
# duplicates (UNION ALL), so distinct() restores SQL UNION semantics.
source_ids = udata.select(F.col('source').alias('id'))
target_ids = udata.select(F.col('target').alias('id'))
vertices = source_ids.union(target_ids).distinct()

# Each rating row is an edge, renamed to the src/dst columns GraphFrame expects.
edges = (
    udata
    .withColumnRenamed('source', 'src')
    .withColumnRenamed('target', 'dst')
)
# GraphFrame(vertices,edges)
vertices

DataFrame[id: int]

In [6]:
# Toy three-vertex graph used to check that GraphFrame construction works.
people = [
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
]
relationships = [
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
]

v = spark.createDataFrame(people, ["id", "name", "age"])
e = spark.createDataFrame(relationships, ["src", "dst", "relationship"])

g = GraphFrame(v, e)


Py4JJavaError: An error occurred while calling o118.createGraph.
: java.lang.NoSuchMethodError: 'scala.collection.mutable.ArrayOps scala.Predef$.refArrayOps(java.lang.Object[])'
	at org.graphframes.GraphFrame$.apply(GraphFrame.scala:676)
	at org.graphframes.GraphFramePythonAPI.createGraph(GraphFramePythonAPI.scala:10)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [14]:
from scipy import spatial

def create_user_item_matrix(data):
    '''
    Pivot a ratings table into a dense user x item matrix.

    `data` is a pandas DataFrame with a 'rating' column plus either
    ('source', 'target') or ('user id', 'item id') columns naming the user and
    the item. Rows are users, columns are item ids, and missing ratings are 0.
    Duplicate (user, item) pairs are averaged (pivot_table's default aggfunc).
    Any other columns (e.g. a timestamp) are ignored.
    '''
    if {'user id', 'item id'}.issubset(data.columns):
        user_col, item_col = 'user id', 'item id'
    else:
        user_col, item_col = 'source', 'target'
    # Pivot only 'rating' so extra columns cannot leak in as duplicate items.
    user_item = pd.pivot_table(data, values='rating', index=user_col, columns=item_col)
    user_item = user_item.fillna(0)
    # Plain list of item ids as column labels (drops the columns-axis name).
    user_item.columns = list(user_item.columns)
    return user_item

def create_item_user_matrix(data):
    '''
    Pivot a ratings table into a dense item x user matrix.

    `data` is a pandas DataFrame with a 'rating' column plus either
    ('user id', 'item id') or ('source', 'target') columns naming the user and
    the item. Rows are items, columns are user ids, and missing ratings are 0.
    Duplicate (item, user) pairs are averaged (pivot_table's default aggfunc).
    Any other columns (e.g. a timestamp) are ignored.
    '''
    if {'user id', 'item id'}.issubset(data.columns):
        user_col, item_col = 'user id', 'item id'
    else:
        user_col, item_col = 'source', 'target'
    # Pivot only 'rating' so extra columns cannot leak in as duplicate users.
    item_user = pd.pivot_table(data, values='rating', index=item_col, columns=user_col)
    item_user = item_user.fillna(0)
    # Plain list of user ids as column labels (drops the columns-axis name).
    item_user.columns = list(item_user.columns)
    return item_user

def MeanRatings(user_item):
    '''
    Mean non-zero rating of every row of a user x item matrix.

    Zero entries are treated as "not rated" and excluded from the average.
    Returns a dict {user id (row label): mean rating}. A row with no non-zero
    entries yields NaN (numpy 0/0).
    '''
    # axis=1 aggregates across items, i.e. one value per user row; summing over
    # axis=0 would give per-item totals indexed by item id instead.
    totals = user_item.sum(axis=1)
    counts = (user_item != 0).sum(axis=1)
    return {user: totals[user] / counts[user] for user in user_item.index}


def calculate_similiarity_score(user_item,user1,user2):
    '''
    Cosine similarity between the rows labelled `user1` and `user2` of `user_item`.

    Computed as 1 - scipy's cosine distance over the full rows, so unrated
    (zero) entries take part in the comparison.
    '''
    first_row = user_item.loc[user1, :]
    second_row = user_item.loc[user2, :]
    distance = spatial.distance.cosine(first_row, second_row)
    return 1 - distance

def get_similiarity_matrix(user_item,user):
    '''
    Similarity of `user` to every other row of `user_item`.

    Returns a list of `(row_id, similiarity_score)` tuples in index order,
    excluding `user` itself; scores come from `calculate_similiarity_score`.
    '''
    scores = []
    for other in user_item.index:
        if other == user:
            continue
        scores.append((other, calculate_similiarity_score(user_item, user, other)))
    return scores


def get_nearest_neighbours(similiarity_matrix, N, contains_item=False,user=None,item=None,user_item=None):
    '''
    Return the `N` most similar neighbours, highest similarity first.

    Parameters
    ----------
    similiarity_matrix : list of `(neighbour_id, similiarity_score)` tuples.
    N : number of neighbours to keep.
    contains_item : if True, keep only neighbours whose entry for column `item`
        in `user_item` is > 0 (i.e. neighbours that rated the item).
    user, item, user_item : required when `contains_item` is True. `user_item`
        must be a DataFrame indexed by neighbour id; `user` is only checked
        for presence.

    Raises
    ------
    ValueError
        If `contains_item` is True and user, item or user_item is missing.
    '''
    ordered_neighbours = sorted(similiarity_matrix, key=lambda pair: pair[1], reverse=True)
    if contains_item:
        # Compare with None rather than truthiness so an id of 0 is accepted.
        if user is None or item is None or not isinstance(user_item, pd.DataFrame):
            raise ValueError('If `contains_item` is True, then user, item, and '
                             'user_item matrix need to be passed in as parameters')
        ordered_neighbours = [pair for pair in ordered_neighbours
                              if user_item.loc[pair[0], item] > 0]
    return ordered_neighbours[:N]

def get_item_rating_userUser(udata, user, item, N):
    '''
    Predict the rating `user` would give `item` with user-based collaborative filtering.

        pred = mean(user) + sum_v sim(user, v) * (r[v, item] - mean(v)) / sum_v |sim(user, v)|

    where v ranges over the `N` users most cosine-similar to `user` that rated
    `item`, and the means come from `MeanRatings`.

    Parameters
    ----------
    udata : ratings table accepted by `create_user_item_matrix`.
    user : id of the user to predict for (row label of the user-item matrix).
    item : id of the item to predict (column label of the user-item matrix).
    N : neighbourhood size.

    Returns
    -------
    The predicted rating. Falls back to the user's mean when no neighbour
    rated the item (or all neighbour similarities are zero).
    '''
    user_item = create_user_item_matrix(udata)
    average_ratings = MeanRatings(user_item)
    similiarity_matrix = get_similiarity_matrix(user_item, user)
    nearest_neighbours = get_nearest_neighbours(similiarity_matrix, N, True, user, item, user_item)
    prediction = average_ratings[user]
    # |sim| in the denominator is the standard normalisation; for cosine over
    # non-negative ratings it equals the plain sum.
    weight_total = sum(abs(score) for _, score in nearest_neighbours)
    if weight_total == 0:
        # No usable neighbour: avoid ZeroDivisionError and keep the user's mean.
        return prediction
    deviation = sum(score * (user_item.loc[neighbour, item] - average_ratings[neighbour])
                    for neighbour, score in nearest_neighbours)
    return prediction + deviation / weight_total
# get_item_rating_userUser(udata,943,2,20)

## Item-item CF 

In [12]:
def MeanRatingsItem(user_item):
    '''
    Mean non-zero value of every row of the given matrix.

    Intended for an item x user matrix, where each row is an item, so the
    result is {item id: mean rating}. Zero entries are treated as "not rated".
    '''
    row_totals = user_item.sum(axis = 1)
    row_counts = (user_item != 0).sum(axis = 1)
    means = {}
    for row_id in user_item.index:
        means[row_id] = row_totals[row_id] / row_counts[row_id]
    return means




def get_item_rating_itemItem(udata, item, user, N):
    '''
    Predict the rating `user` would give `item` with item-based collaborative filtering.

        pred = mean(item) + sum_j sim(item, j) * (r[j, user] - mean(j)) / sum_j |sim(item, j)|

    where j ranges over the `N` items most cosine-similar to `item` that `user`
    rated, and the means come from `MeanRatingsItem`.

    Parameters
    ----------
    udata : ratings table accepted by `create_item_user_matrix`.
    item : id of the item to predict (row label of the item-user matrix).
    user : id of the user to predict for (column label of the item-user matrix).
    N : neighbourhood size.

    Returns
    -------
    The predicted rating. Falls back to the item's mean when the user rated
    none of the neighbour items (or all neighbour similarities are zero).
    '''
    item_user = create_item_user_matrix(udata)
    average_ratings = MeanRatingsItem(item_user)
    similiarity_matrix = get_similiarity_matrix(item_user, item)
    # Rows of item_user are items, so the row slot ("user") gets the target
    # item and the column slot ("item") gets the user.
    nearest_neighbours = get_nearest_neighbours(similiarity_matrix, N, contains_item=True,
                                                user=item, item=user, user_item=item_user)
    prediction = average_ratings[item]
    weight_total = sum(abs(score) for _, score in nearest_neighbours)
    if weight_total == 0:
        # No usable neighbour: avoid ZeroDivisionError and keep the item's mean.
        return prediction
    deviation = sum(score * (item_user.loc[neighbour, user] - average_ratings[neighbour])
                    for neighbour, score in nearest_neighbours)
    return prediction + deviation / weight_total

# making a prediction: item-based CF rating of item `i` by user `u` using `N` neighbours
i = 1000
u = 100
N = 10

# The CF helpers are built on pandas.pivot_table, so the Spark DataFrame
# `udata` has to be collected into a pandas DataFrame first.
ratings_pdf = udata.toPandas()
get_item_rating_itemItem(ratings_pdf,i,u,N)

2.9166777619520317