In [1]:
import pyspark as ps
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col
from pyspark.ml import Pipeline
from pyspark.mllib.recommendation import ALS as lib_ALS
from pyspark.mllib.evaluation import RegressionMetrics
import math

In [2]:
# Start (or reuse) a Spark session running locally on four threads.
spark = (
    ps.sql.SparkSession.builder
    .master("local[4]")
    .appName("building recommender")
    .getOrCreate()
)

# SparkContext handle, needed for the RDD API used below.
sc = spark.sparkContext

In [3]:
# Read the CSV as an RDD of raw text lines; the header row is still part of it.
subset_raw_data = sc.textFile('data/subset.csv')

In [4]:
# Show the RDD type, then peek at the first three raw lines.
# print is written as a call so the cell parses on both Python 2 and Python 3.
print(type(subset_raw_data))
subset_raw_data.take(3)

<class 'pyspark.rdd.RDD'>


[u',id,source,reader_id,from_book_id,book_id,ad_id,boost_id,clicked,claimed,optin,created_at,updated_at',
 u'29295882,29295919,success_page,186643,20370,19203,0,0,1,1,0,2017-02-20 02:16:11,2017-02-20 02:26:24',
 u'29295883,29295920,success_page,186643,20370,19813,0,0,0,0,0,2017-02-20 02:16:11,2017-02-20 02:16:11']

In [5]:
# Capture the first line of the file (the CSV header row) and display it.
subset_raw_data_header = subset_raw_data.take(1)[0]
subset_raw_data_header

u',id,source,reader_id,from_book_id,book_id,ad_id,boost_id,clicked,claimed,optin,created_at,updated_at'

In [6]:
def clicked_not_claimed_to_neg1(claimed, clicked):
    """Recode a clicked-but-unclaimed interaction as -1.

    Returns -1 when ``clicked`` equals 1 and ``claimed`` equals 0;
    in every other case ``claimed`` is returned unchanged.
    """
    clicked_without_claim = (clicked == 1) and (claimed == 0)
    return -1 if clicked_without_claim else claimed

In [7]:
# Drop the header line, split each record on commas and keep four columns:
# the two ids stay as strings, the clicked/claimed flags are parsed as ints.
subset_raw_data_test_schema = (
    subset_raw_data
    .filter(lambda line: line != subset_raw_data_header)
    .map(lambda line: line.split(","))
    .map(lambda p: Row(reader_id=p[3], book_id=p[5],
                       clicked=int(p[8]), claimed=int(p[9])))
)

In [8]:
# Turn the RDD of Rows into a DataFrame.
subset_data_test_df = spark.createDataFrame(subset_raw_data_test_schema)

In [9]:
# Preview the first 15 parsed rows.
subset_data_test_df.show(15)

+-------+-------+-------+---------+
|book_id|claimed|clicked|reader_id|
+-------+-------+-------+---------+
|  19203|      1|      1|   186643|
|  19813|      0|      0|   186643|
|  16281|      0|      0|   523754|
|  22669|      1|      1|   523754|
|  25350|      0|      0|   523754|
|  17251|      0|      0|    17224|
|  19012|      0|      0|    17224|
|  20771|      1|      1|    17224|
|   7544|      0|      0|   239648|
|   8682|      0|      0|   239648|
|  26798|      0|      0|   239648|
|  22094|      0|      1|    13391|
|  23867|      0|      0|    13391|
|  26736|      1|      1|    13391|
|   4132|      0|      0|    31393|
+-------+-------+-------+---------+
only showing top 15 rows



In [10]:
# Expose the recoding helper as a Spark UDF returning an integer, then replace
# the 'claimed' column with the recoded value.
my_udf = udf(clicked_not_claimed_to_neg1, IntegerType())
df_out = subset_data_test_df.withColumn(
    'claimed',
    my_udf(col('claimed'), col('clicked')),
)

In [12]:
# Preview the recoded frame (first 15 rows).
df_out.show(15)

+-------+-------+-------+---------+
|book_id|claimed|clicked|reader_id|
+-------+-------+-------+---------+
|  19203|      1|      1|   186643|
|  19813|      0|      0|   186643|
|  16281|      0|      0|   523754|
|  22669|      1|      1|   523754|
|  25350|      0|      0|   523754|
|  17251|      0|      0|    17224|
|  19012|      0|      0|    17224|
|  20771|      1|      1|    17224|
|   7544|      0|      0|   239648|
|   8682|      0|      0|   239648|
|  26798|      0|      0|   239648|
|  22094|     -1|      1|    13391|
|  23867|      0|      0|    13391|
|  26736|      1|      1|    13391|
|   4132|      0|      0|    31393|
+-------+-------+-------+---------+
only showing top 15 rows



In [13]:
# Switch back to the RDD API: an RDD of Row objects backing the DataFrame.
subset_test_rdd_raw = df_out.rdd

In [14]:
# Inspect the first 15 Row objects.
subset_test_rdd_raw.take(15)

[Row(book_id=u'19203', claimed=1, clicked=1, reader_id=u'186643'),
 Row(book_id=u'19813', claimed=0, clicked=0, reader_id=u'186643'),
 Row(book_id=u'16281', claimed=0, clicked=0, reader_id=u'523754'),
 Row(book_id=u'22669', claimed=1, clicked=1, reader_id=u'523754'),
 Row(book_id=u'25350', claimed=0, clicked=0, reader_id=u'523754'),
 Row(book_id=u'17251', claimed=0, clicked=0, reader_id=u'17224'),
 Row(book_id=u'19012', claimed=0, clicked=0, reader_id=u'17224'),
 Row(book_id=u'20771', claimed=1, clicked=1, reader_id=u'17224'),
 Row(book_id=u'7544', claimed=0, clicked=0, reader_id=u'239648'),
 Row(book_id=u'8682', claimed=0, clicked=0, reader_id=u'239648'),
 Row(book_id=u'26798', claimed=0, clicked=0, reader_id=u'239648'),
 Row(book_id=u'22094', claimed=-1, clicked=1, reader_id=u'13391'),
 Row(book_id=u'23867', claimed=0, clicked=0, reader_id=u'13391'),
 Row(book_id=u'26736', claimed=1, clicked=1, reader_id=u'13391'),
 Row(book_id=u'4132', claimed=0, clicked=0, reader_id=u'31393')]

In [15]:
# Build (reader_id, book_id, claimed) triples for ALS and cache them.
# Fields are read by name rather than by position so the result does not
# depend on the order in which the Row/DataFrame columns happen to be laid out.
subset_data_test = subset_test_rdd_raw.map(
    lambda row: (row.reader_id, row.book_id, row.claimed)
).cache()

In [16]:
# Inspect the first 14 (reader_id, book_id, claimed) triples.
subset_data_test.take(14)

[(u'186643', u'19203', 1),
 (u'186643', u'19813', 0),
 (u'523754', u'16281', 0),
 (u'523754', u'22669', 1),
 (u'523754', u'25350', 0),
 (u'17224', u'17251', 0),
 (u'17224', u'19012', 0),
 (u'17224', u'20771', 1),
 (u'239648', u'7544', 0),
 (u'239648', u'8682', 0),
 (u'239648', u'26798', 0),
 (u'13391', u'22094', -1),
 (u'13391', u'23867', 0),
 (u'13391', u'26736', 1)]

In [26]:
# Same three columns taken straight from the raw text; every field stays a string.
subset_data = (
    subset_raw_data
    .filter(lambda line: line != subset_raw_data_header)
    .map(lambda line: line.split(","))
    .map(lambda tokens: (tokens[3], tokens[5], tokens[9]))
    .cache()
)

In [27]:
# Inspect the first four raw string triples.
subset_data.take(4)

[(u'186643', u'19203', u'1'),
 (u'186643', u'19813', u'0'),
 (u'523754', u'16281', u'0'),
 (u'523754', u'22669', u'1')]

In [17]:
# Split the triples into training / validation / test sets with weights 6:2:2.
# The seed is a plain int literal so the cell parses on both Python 2 and 3.
training_RDD, validation_RDD, test_RDD = subset_data_test.randomSplit([6, 2, 2], seed=0)
# Drop the rating so only (reader_id, book_id) pairs are handed to predictAll.
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

In [18]:
# Check what kind of RDD randomSplit produced.
type(training_RDD)

pyspark.rdd.PipelinedRDD

In [20]:
# Hyper-parameters for ALS training.
seed = 5  # plain int literal: parses on both Python 2 and Python 3
iterations = 20
regularization_parameter = 0.05
ranks = [10, 15, 20, 25, 30]  # candidate ranks for the (disabled) sweep below
rank = 25                     # rank used by the single training run
# One result slot per candidate rank for each of the two RMSE computations.
errors = [0] * len(ranks)
reg_met = [0] * len(ranks)
err = 0  # index of the next slot to fill in errors / reg_met
tolerance = 0.02

# Best-so-far trackers. They are defined here, as live code, because later
# cells compare against min_error and print best_rank.
min_error = float('inf')
best_rank = -1
# best_iteration = -1

# Rank sweep, currently disabled and kept for reference.
# for rank in ranks:
#     model = lib_ALS.train(training_RDD, rank=rank, seed=seed, iterations=iterations,\
#                       lambda_=regularization_parameter)
# #     model = lib_ALS.trainImplicit(training_RDD, rank=rank, seed=seed, iterations=iterations,\
# #                               lambda_=regularization_parameter)
#     predictions = model.predictAll(validation_for_predict_RDD)\
#                         .map(lambda r: ((r[0], r[1]), r[2]))

#     rates_and_preds = validation_RDD\
#                         .map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))\
#                         .join(predictions)
#     valuesAndPreds = rates_and_preds\
#                         .map(lambda p: (p[1]))
#     metrics = RegressionMetrics(valuesAndPreds)
#     error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
#     errors[err] = error
#     reg_met[err] = metrics.rootMeanSquaredError
#     err += 1
#     print 'For rank %s the RMSE is %s the reg_met is %s' % (rank, error,\
#                                                             metrics.rootMeanSquaredError)
#     if error < min_error:
#         min_error = error
#         best_rank = rank

# print 'The best model was trained with rank %s' % best_rank

In [21]:
# Fit an ALS model on the training triples using the settings defined above.
model = lib_ALS.train(
    training_RDD,
    rank=rank,
    seed=seed,
    iterations=iterations,
    lambda_=regularization_parameter,
)

In [22]:
# Predict a rating for every validation pair and key each prediction by
# (user, product) so it can be joined against the observed ratings.
predictions = (
    model.predictAll(validation_for_predict_RDD)
    .map(lambda r: ((r.user, r.product), r.rating))
)

In [23]:
# Key the observed validation ratings by integer (user, product) and join them
# with the predictions: each element becomes ((user, product), (actual, predicted)).
rates_and_preds = (
    validation_RDD
    .map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
    .join(predictions)
)

In [24]:
# Discard the (user, product) keys and keep only the (actual, predicted) pairs.
valuesAndPreds = rates_and_preds.values()

In [25]:
# RegressionMetrics expects (prediction, observation) pairs, whereas
# valuesAndPreds holds (observation, prediction). Swap each pair so that
# order-sensitive metrics come out right; RMSE is symmetric and is unaffected.
metrics = RegressionMetrics(valuesAndPreds.map(lambda pair: (pair[1], pair[0])))

In [26]:
# RMSE computed by hand: square root of the mean squared difference between
# the actual and predicted value of every joined pair.
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job 19 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:806)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1668)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1587)
	at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1826)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1825)
	at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 62658)
----------------------------------------


Traceback (most recent call last):
  File "/Users/davidclausen/anaconda2/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/davidclausen/anaconda2/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/Users/davidclausen/anaconda2/lib/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/davidclausen/anaconda2/lib/python2.7/SocketServer.py", line 652, in __init__
    self.handle()
  File "/Users/davidclausen/spark-2.1.0-bin-hadoop2.7/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/Users/davidclausen/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 557, in read_int
    raise EOFError
EOFError


In [18]:
# Record the hand-computed RMSE in the current result slot.
errors[err] = error

In [19]:
# Record the RMSE reported by RegressionMetrics in the same slot.
reg_met[err] = metrics.rootMeanSquaredError

In [20]:
# Move on to the next result slot.
# NOTE(review): each re-run of this cell increments err again, so it is not idempotent.
err += 1

In [21]:
# Report both RMSE figures for the rank that was trained.
# print is written as a call so the cell parses on both Python 2 and Python 3.
print('For rank %s the RMSE is %s the reg_met is %s' % (
    rank, error, metrics.rootMeanSquaredError))

For rank 25 the RMSE is 0.293608735816 the reg_met is 0.293608735816


In [22]:
# Keep the lowest RMSE seen so far together with the rank that produced it.
# NOTE(review): min_error is read before it is assigned here, so it must
# already be defined when this cell runs.
if error < min_error:
    min_error = error
    best_rank = rank

In [23]:
# Report the best rank found.
# print is written as a call so the cell parses on both Python 2 and Python 3.
print('The best model was trained with rank %s' % best_rank)

The best model was trained with rank 25


In [24]:
# Check the Python type of the first field in the raw string triples.
type(subset_data.take(1)[0][0])

unicode

In [30]:
# Inspect up to 100 joined ((user, product), (actual, predicted)) records.
rates_and_preds.take(100)

[((223813, 20831), (0.0, 0.0899249391151878)),
 ((163115, 19627), (0.0, 0.09408654156834773)),
 ((369590, 19130), (0.0, -0.05952669047578264)),
 ((528665, 21987), (0.0, 0.0)),
 ((86810, 20764), (0.0, 0.24387854400033937)),
 ((136815, 12427), (0.0, 0.0)),
 ((416521, 24939), (1.0, 0.19363965726346982)),
 ((385023, 18231), (0.0, 0.0)),
 ((567643, 24095), (0.0, 0.0)),
 ((538803, 19499), (1.0, 0.7973047413843842)),
 ((304226, 24796), (0.0, 0.04156643639615049)),
 ((81070, 15836), (1.0, 0.13718560371824387)),
 ((236093, 12543), (0.0, 0.0)),
 ((418280, 23924), (0.0, 0.0)),
 ((349699, 26067), (1.0, 0.0)),
 ((481977, 7663), (0.0, 0.36049119541868013)),
 ((12732, 20796), (1.0, 0.10949134786201241)),
 ((129149, 25501), (1.0, 0.1527517510284398)),
 ((300481, 22695), (1.0, 0.42263723221974947)),
 ((166480, 18534), (0.0, 0.0)),
 ((444559, 11395), (0.0, 0.0)),
 ((64813, 15493), (0.0, 0.0)),
 ((508958, 10634), (0.0, 0.0)),
 ((508958, 10634), (0.0, 0.0)),
 ((508958, 10634), (0.0, 0.0)),
 ((508958, 1063

In [26]:
# Inspect five ((user, product), predicted) records.
predictions.take(5)

[((90628, 17981), 0.0),
 ((567356, 7550), 0.0),
 ((182316, 19732), 0.0),
 ((182316, 19730), 0.0),
 ((567788, 8363), 0.0)]

In [27]:
# Inspect five (actual, predicted) pairs.
valuesAndPreds.take(5)

[(0.0, 0.046730506967385704),
 (0.0, 0.18970916262654078),
 (0.0, 0.011221973957054456),
 (0.0, 0.0),
 (0.0, -0.00655216909141193)]