In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import lit
from pyspark.ml.evaluation import RegressionEvaluator

import re
import math


In [2]:

# initialize spark
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [7]:
train_path = "temp/pruned_out/train_links"
test_path = "temp/pruned_out/test_links"

train_rdd = sc.textFile(train_path)
test_rdd = sc.textFile(test_path)

In [25]:

def mapLineToUserPairs(line):
    pairs = re.split(' ', line)
    return (int(pairs[0]), int(pairs[1]))


def getDataframeForALS(rdd, sc):
    schema = StructType([
        StructField("user", IntegerType(), True),
        StructField("item", IntegerType(), True)])
    df = sc.createDataFrame(rdd, schema)
    df = df.withColumn("rating", lit(1))
    return df

### Top 10 recommendations per user
def convertRecResult(row):
    user = row.user
    items = []
    for item in row.recommendations:
        items.append(item.item)
    return (user, items)


# use test_df to calculate accuracy
def get_k_accuracy(row, k):
    result = []
    no_result = False
    try:
        predictions = row[1][0]
        actuals = row[1][1]
    except:
        no_result = True
    
    if not no_result and not isinstance(actuals, list):
        no_result = True

    for i in range(1, k+1):
        if no_result:
            result.append((i, 0))
        else:
            result.append((i, len([x for x in predictions[:i] if x in actuals])) / i)
    return result

In [22]:
train_rdd_proc = train_rdd.map(lambda row: mapLineToUserPairs(row))
test_rdd_proc = test_rdd.map(lambda row: mapLineToUserPairs(row))

train_df = getDataframeForALS(train_rdd_proc, sqlContext)
test_df = getDataframeForALS(test_rdd_proc, sqlContext)

In [24]:
k = 10
als = ALS(rank=8, maxIter=10, regParam=0.01, userCol="user", itemCol="item", ratingCol='rating',
          coldStartStrategy="drop")
als_model = als.fit(train_df)
user_top10_rdd = als_model.recommendForAllUsers(k).rdd.map(lambda l: convertRecResult(l))
validation_rdd = user_top10_rdd.join(test_rdd_proc)

In [27]:
als_recs = als_model.recommendForAllUsers(100235959).collect()

# output als results
with open("results/als_pruned_results.txt", 'w') as f:
    for result in als_recs:
        f.write(str(bytes("{}: {}".format(str(result.user), str(result.recommendations)), encoding='utf-8')))
        f.write("\n")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Document

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:60093)
Traceback (most recent call last):
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3291, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-27-c6e844805155>", line 1, in <module>
    als_recs = als_model.recommendForAllUsers(100235959).collect()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/recommendation.py", line 414, in recommendForAllUsers
    return self._call_java("recommendForAllUsers", numItems)
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/ml/wrapper.py", line 55, in _call_java
    return _java2py(sc, m(*java_args))
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/

Py4JError: An error occurred while calling o383.recommendForAllUsers

In [None]:

accuracy_result = validation_rdd.flatMap(lambda l : get_k_accuracy(l, k)) \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) \
    .collectAsMap()

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 60197)
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socketserver.py", line 720, in __init__
    self.handle()
  File "/Users/zian.li/Documents/py_env/py3/lib/python3.7/site-packages/pyspark/accumulators.py", lin