In [1]:
import sys
from collections import defaultdict
from itertools import combinations
import numpy as np
import random
import csv
import pdb

In [2]:
from pyspark import SparkContext
from recsys import evaluate

# 加载数据

In [50]:
lines = sc.textFile('file:///home/hadoop/workspaces/spark_knn_recommender/ratings.txt')

In [51]:
def parseVector(line):
    line = line.split('|')
    return line[0],(line[1],float(line[2]))

In [104]:
def sampleInteractions(user_id,items_with_rating,n):
    '''
    For users with # interactions > n, replace their interaction history
    with a sample of n items_with_rating
    '''
    if len(items_with_rating) > n:
        return user_id,random.sample(list(items_with_rating),n)
    else:
        return user_id,items_with_rating

In [105]:
''' 
Obtain the sparse user-item matrix:
    user_id -> [(item_id_1, rating_1),
               [(item_id_2, rating_2),
                ...]
'''
user_item_pairs = lines.map(parseVector)\
                    .groupByKey()\
                    .map(lambda p : sampleInteractions(p[0],p[1],500)).cache()

In [106]:
user_item_pairs.count()

943

In [107]:
def findItemPairs(user_id,items_with_rating):
    '''
    For each user, find all item-item pairs combos. (i.e. items with the same user) 
    '''
    for item1,item2 in combinations(items_with_rating,2):
        return (item1[0],item2[0]),(item1[1],item2[1])

In [108]:
'''
Get all item-item pair combos:
    (item1,item2) ->    [(item1_rating,item2_rating),
                         (item1_rating,item2_rating),
                         ...]
'''

'\nGet all item-item pair combos:\n    (item1,item2) ->    [(item1_rating,item2_rating),\n                         (item1_rating,item2_rating),\n                         ...]\n'

In [114]:
pairwise_items = user_item_pairs.filter(lambda p : len(p[1]) > 1)\
                                .map(lambda p : findItemPairs(p[0],p[1])).groupByKey()

In [159]:
pairwise_items.count()

938

In [116]:
def cosine(dot_product,rating_norm_squared,rating2_norm_squared):
    '''
    The cosine between two vectors A, B
       dotProduct(A, B) / (norm(A) * norm(B))
    '''
    numerator = dot_product
    denominator = rating_norm_squared * rating2_norm_squared
    return (numerator / (float(denominator))) if denominator else 0.0

In [135]:
def calcSim(item_pair,rating_pairs):
    ''' 
    For each item-item pair, return the specified similarity measure,
    along with co_raters_count
    '''
    sum_xx, sum_xy, sum_yy, n = (0.0, 0.0, 0.0, 0)
#     rating_pairs = list(rating_pairs)
    for rating_pair in rating_pairs:
        sum_xx += np.float(rating_pair[0]) * np.float(rating_pair[0])
        sum_yy += np.float(rating_pair[1]) * np.float(rating_pair[1])
        sum_xy += np.float(rating_pair[0]) * np.float(rating_pair[1])
        n += 1
        
    cos_sim = cosine(sum_xy,np.sqrt(sum_xx),np.sqrt(sum_yy))
    return item_pair,(cos_sim)

In [136]:
def keyOnFirstItem(item_pair,item_sim_data):
    '''
    For each item-item pair, make the first item's id the key
    '''
    (item1_id,item2_id) = item_pair
    return item1_id,(item2_id,item_sim_data)

In [288]:
def nearestNeighbors(item_id,items_and_sims,n):
    '''
    Sort the predictions list by similarity and select the top-N neighbors
    '''
    items_and_sims.sort(key = lambda x : x[1],reverse = True)
    if len(items_and_sims) < n:
        n = len(items_and_sims)
    return item_id,items_and_sims[:n]

In [299]:
'''
Calculate the cosine similarity for each item pair and select the top-N nearest neighbors:
    (item1,item2) ->    (similarity,co_raters_count)
 '''
item_sims = pairwise_items.map(lambda p: calcSim(p[0],p[1]))\
                          .map(lambda p: keyOnFirstItem(p[0],p[1]))\
                          .groupByKey()\
                          .map(lambda p : (p[0], list(p[1])))\
                          .map(lambda p: nearestNeighbors(p[0],p[1],50))

In [None]:
item_sims.count()

In [307]:
item_sims.take(1)

[('Die Hard (1988)',
  [('Star Trek III: The Search for Spock (1984)', 1.0),
   ('Pump Up the Volume (1990)', 1.0),
   ('Flirting With Disaster (1996)', 1.0),
   ('Pulp Fiction (1994)', 1.0),
   ("McHale's Navy (1997)", 1.0)])]

In [303]:
'''
Preprocess the item similarity matrix into a dictionary and store it as a broadcast variable:
'''
item_sim_dict = {}
for (item,data) in item_sims.collect(): 
    item_sim_dict[item] = data
isb = sc.broadcast(item_sim_dict)

In [306]:
item_sim_dict

{'12 Angry Men (1957)': [('Stag (1997)', 1.0), ('Game, The (1997)', 1.0)],
 '187 (1997)': [('English Patient, The (1996)', 1.0)],
 '2001: A Space Odyssey (1968)': [('Get on the Bus (1996)', 1.0),
  ('Victor/Victoria (1982)', 1.0),
  ('Liar Liar (1997)', 1.0),
  ('Twelve Monkeys (1995)', 1.0)],
 '8 1/2 (1963)': [("Schindler's List (1993)", 1.0)],
 'Abyss, The (1989)': [('Some Folks Call It a Sling Blade (1993)', 1.0),
  ('Body Snatchers (1993)', 1.0),
  ('Liar Liar (1997)', 1.0)],
 'Ace Ventura: Pet Detective (1994)': [('Clerks (1994)', 1.0)],
 'Addams Family Values (1993)': [('Great White Hype, The (1996)', 1.0)],
 'Adventures of Pinocchio, The (1996)': [('One Fine Day (1996)', 1.0)],
 'Adventures of Priscilla, Queen of the Desert, The (1994)': [('Crimson Tide (1995)',
   1.0),
  ('High Noon (1952)', 1.0),
  ('Quiet Man, The (1952)', 1.0)],
 'African Queen, The (1951)': [('Quiz Show (1994)', 1.0)],
 'Afterglow (1997)': [('Edge, The (1997)', 1.0)],
 'Age of Innocence, The (1993)': [('My

In [308]:
def topNRecommendations(user_id,items_with_rating,item_sims,n):
    '''
    Calculate the top-N item recommendations for each user using the 
    weighted sums method
    '''
    # initialize dicts to store the score of each individual item,
    # since an item can exist in more than one item neighborhood
    totals = defaultdict(int)
    sim_sums = defaultdict(int)
    for (item,rating) in items_with_rating:
        # lookup the nearest neighbors for this item
        nearest_neighbors = item_sims.get(item,None)
        if nearest_neighbors:
            for (neighbor,(sim,count)) in nearest_neighbors:
                if neighbor != item:
                    # update totals and sim_sums with the rating data
                    totals[neighbor] += sim * rating
                    sim_sums[neighbor] += sim
    # create the normalized list of scored items 
    scored_items = [(total/sim_sums[item],item) for item,total in totals.items()]
    # sort the scored items in ascending order
    scored_items.sort(reverse=True)
    # take out the item score
    # ranked_items = [x[1] for x in scored_items]
    return user_id,scored_items[:n]

In [309]:
'''
Calculate the top-N item recommendations for each user
    user_id -> [item1,item2,item3,...]
'''
user_item_recs = user_item_pairs.map(
        lambda p: topNRecommendations(p[0],p[1],isb.value,500))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 464.0 failed 1 times, most recent failure: Lost task 0.0 in stage 464.0 (TID 269, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-309-66bc8a631887>", line 6, in <lambda>
  File "<ipython-input-308-711d82f90be2>", line 14, in topNRecommendations
TypeError: 'float' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor84.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/hadoop/app/spark-2.4.3-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-309-66bc8a631887>", line 6, in <lambda>
  File "<ipython-input-308-711d82f90be2>", line 14, in topNRecommendations
TypeError: 'float' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


# 测试结果准确性

In [312]:
'''
Read in test data and calculate MAE
'''
test_ratings = defaultdict(list)
# read in the test data
f = open("/home/hadoop/workspaces/spark_knn_recommender/ratings1.txt", 'rt')
reader = csv.reader(f, delimiter='|')
for row in reader:
    user = row[0]
    item = row[1]
    rating = row[2]
    test_ratings[user] += [(item,rating)]

In [313]:
# create train-test rating tuples
preds = []
for (user,items_with_rating) in user_item_recs:
    for (rating,item) in items_with_rating:
        for (test_item,test_rating) in test_ratings[user]:                
            if str(test_item) == str(item):
                preds.append((rating,float(test_rating)))

mae = MAE(preds)
result = mae.compute()
print "Mean Absolute Error: ",result

SyntaxError: Missing parentheses in call to 'print' (<ipython-input-313-a276afcca426>, line 11)