In [68]:
import os
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
import sys
import itertools

from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.evaluation import RegressionEvaluator
#from pyspark.ml.recommendation import ALS

from pyspark.sql import Row
print sc


<pyspark.context.SparkContext object at 0x10f119e90>


In [69]:
plays_df_schema = StructType(
  [StructField('userId', StringType()),
   StructField('songId', StringType()),
   StructField('Plays', IntegerType())]
)

In [70]:
raw_plays_df = sqlContext.read.format('com.databricks.spark.csv') \
                              .options(delimiter = '\t', header=True,inferSchema=False) \
                              .schema(plays_df_schema) \
                              .load("/Users/samanthapace/Desktop/SongData/train_triplets_sample.txt")
            
            
            

In [71]:
# change ids from strings to integers .F.shiftRightUnsigned()
userId_change = raw_plays_df.select('userId').distinct().select('userId', F.sqrt(F.monotonically_increasing_id()).alias('new_userId'))

userId_change.show(20)
songId_change = raw_plays_df.select('songId').distinct().select('songId', F.sqrt(F.monotonically_increasing_id()).alias('new_songId'))
songId_change.show(20)
#userId_change.select('new_userId', ((userId_change['new_userId'])/100).alias("use_userId")).show()



+--------------------+------------------+
|              userId|        new_userId|
+--------------------+------------------+
|b80344d063b5ccb32...|490426.51699923404|
+--------------------+------------------+

+------------------+------------------+
|            songId|        new_songId|
+------------------+------------------+
|SOBSUJE12A6D4F8CF5|185363.80004736633|
|SODACBL12A8C13C273|507639.67315409856|
|SOBFNSP12AF72A0E22| 711903.1822712973|
|SOAPDEY12A81C210A9| 780951.5708621118|
|SOBFOVM12A58A7D494|  864479.212881374|
|SOCNMUH12A6D4F6E6D| 898584.3597837656|
|SOBVFZR12A6D4F8AE3| 980853.0339984681|
|SOBYHAJ12A6701BF1D|1084813.8269325295|
|SOBXALG12A8C13C108|1135116.8172483395|
|SOBXHDL12A81C204C0|1142659.2046555264|
|SOBNZDC12A6D4FC103| 1176001.475046694|
|SOBBMDR12A8C13253B|1239999.3112772282|
+------------------+------------------+



In [72]:
unique_users = userId_change.count()
unique_songs = songId_change.count()
print('Number of unique users: {0}'.format(unique_users))
print('Number of unique songs: {0}'.format(unique_songs))

# join dataframes
raw_plays_df_with_int_ids = raw_plays_df.join(userId_change, 'userId')
raw_plays_df_with_int_ids = raw_plays_df_with_int_ids.join(songId_change, 'songId')

# remove half users to make more manageable
#raw_plays_df_with_int_ids = raw_plays_df_with_int_ids.filter(raw_plays_df_with_int_ids.new_userId < unique_users/2)


# cache
raw_plays_df_with_int_ids.cache()
raw_plays_df_with_int_ids.show(20)


Number of unique users: 1
Number of unique songs: 12
+------------------+--------------------+-----+------------------+------------------+
|            songId|              userId|Plays|        new_userId|        new_songId|
+------------------+--------------------+-----+------------------+------------------+
|SOAPDEY12A81C210A9|b80344d063b5ccb32...|    1|490426.51699923404| 780951.5708621118|
|SOBBMDR12A8C13253B|b80344d063b5ccb32...|    2|490426.51699923404|1239999.3112772282|
|SOBFNSP12AF72A0E22|b80344d063b5ccb32...|    1|490426.51699923404| 711903.1822712973|
|SOBFOVM12A58A7D494|b80344d063b5ccb32...|    1|490426.51699923404|  864479.212881374|
|SOBNZDC12A6D4FC103|b80344d063b5ccb32...|    1|490426.51699923404| 1176001.475046694|
|SOBSUJE12A6D4F8CF5|b80344d063b5ccb32...|    2|490426.51699923404|185363.80004736633|
|SOBVFZR12A6D4F8AE3|b80344d063b5ccb32...|    1|490426.51699923404| 980853.0339984681|
|SOBXALG12A8C13C108|b80344d063b5ccb32...|    1|490426.51699923404|1135116.8172483395|
|

In [73]:
song_ids_with_total_listens = raw_plays_df_with_int_ids.groupBy('songId') \
                                                       .agg(F.count(raw_plays_df_with_int_ids.Plays).alias('User_Count'),
                                                            F.sum(raw_plays_df_with_int_ids.Plays).alias('Total_Plays')) \
                                                       .orderBy('Total_Plays', ascending = False)

print 'song_ids_with_total_listens:'
song_ids_with_total_listens.show(3, truncate=False)


song_ids_with_total_listens:
+------------------+----------+-----------+
|songId            |User_Count|Total_Plays|
+------------------+----------+-----------+
|SOBSUJE12A6D4F8CF5|1         |2          |
|SOBBMDR12A8C13253B|1         |2          |
|SOAPDEY12A81C210A9|1         |1          |
+------------------+----------+-----------+
only showing top 3 rows



In [74]:
# We'll hold out 60% for training, 20% of our data for validation, and leave 20% for testing
seed = 1800009193L
(split_60_df, split_a_20_df, split_b_20_df) = raw_plays_df_with_int_ids.randomSplit([0.6, 0.2, 0.2], seed = seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(3)
validation_df.show(3)
test_df.show(3)

#Number of plays needs to be double type, not integers
validation_df = validation_df.withColumn("Plays", validation_df["Plays"].cast(DoubleType()))
validation_df.show(10)


Training: 6, validation: 4, test: 2

+------------------+--------------------+-----+------------------+------------------+
|            songId|              userId|Plays|        new_userId|        new_songId|
+------------------+--------------------+-----+------------------+------------------+
|SOAPDEY12A81C210A9|b80344d063b5ccb32...|    1|490426.51699923404| 780951.5708621118|
|SOBBMDR12A8C13253B|b80344d063b5ccb32...|    2|490426.51699923404|1239999.3112772282|
|SOBFOVM12A58A7D494|b80344d063b5ccb32...|    1|490426.51699923404|  864479.212881374|
+------------------+--------------------+-----+------------------+------------------+
only showing top 3 rows

+------------------+--------------------+-----+------------------+------------------+
|            songId|              userId|Plays|        new_userId|        new_songId|
+------------------+--------------------+-----+------------------+------------------+
|SOBFNSP12AF72A0E22|b80344d063b5ccb32...|    1|490426.51699923404| 711903.1822

In [84]:
#convert dataframe to rdd
rddTrain = training_df.select("new_userId", "new_songId", "Plays").rdd
rddTrain.collect()


[Row(new_userId=490426.51699923404, new_songId=780951.5708621118, Plays=1),
 Row(new_userId=490426.51699923404, new_songId=1239999.3112772282, Plays=2),
 Row(new_userId=490426.51699923404, new_songId=864479.212881374, Plays=1),
 Row(new_userId=490426.51699923404, new_songId=1176001.475046694, Plays=1),
 Row(new_userId=490426.51699923404, new_songId=1135116.8172483395, Plays=1),
 Row(new_userId=490426.51699923404, new_songId=1084813.8269325295, Plays=1)]

In [81]:
model = ALS.trainImplicit(rddTrain, 5, seed=seed)

In [66]:
# Let's initialize our ALS learner
als = ALS()

# Now set the parameters for the method
als.setMaxIter(5)\https://smile.amazon.com/gp/product/B000UW2DTE/ref=oh_aui_detailpage_o07_s00?ie=UTF8&psc=1
   .setSeed(seed)\
   .setItemCol("new_songId")\
   .setRatingCol("Plays")\
   .setUserCol("new_userId")

# Now let's compute an evaluation metric for our test dataset
# We Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="Plays", metricName="rmse")
tolerance = 0.03
ranks = [4, 8, 12, 16]
regParams = [0.15, 0.2, 0.25]
errors = [[0]*len(ranks)]*len(regParams)
models = [[0]*len(ranks)]*len(regParams)
err = 0
min_error = float('inf')
best_rank = -1
i = 0


for regParam in regParams:
    j = 0
    for rank in ranks:
        # Set the rank here:
        als.setParams(rank = rank, regParam = regParam)
        # Create the model with these parameters.
       # model = als.fit(training_df)
        model = ALS.trainImplicit(training_df)
        # Run the model to create a prediction. Predict against the validation_df.
        predict_df = model.transform(validation_df)
        
        # Remove NaN values from prediction (due to SPARK-14489)
        predicted_plays_df = predict_df.filter(predict_df.prediction != float('nan'))
        predicted_plays_df = predicted_plays_df.withColumn("prediction", F.abs(F.round(predicted_plays_df["prediction"],0)))
        
        predict_df.show()
        predicted_plays_df.show()
        # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
        error = reg_eval.evaluate(predicted_plays_df)
        errors[i][j] = error
        models[i][j] = model
        print 'For rank %s, regularization parameter %s the RMSE is %s' % (rank, regParam, error)
        if error < min_error:
            min_error = error
            best_params = [i,j]
        j += 1
    i += 1
    
als.setRegParam(regParams[best_params[0]])
als.setRank(ranks[best_params[1]])
print 'The best model was trained with regularization parameter %s' % regParams[best_params[0]]
print 'The best model was trained with rank %s' % ranks[best_params[1]]
my_model = models[best_params[0]][best_params[1]]



SyntaxError: unexpected character after line continuation character (<ipython-input-66-69d844e0727c>, line 5)

In [77]:
#read file and parse data
#train
#model
#make recommendation

In [78]:
lines = sc.textFile("/Users/samanthapace/Desktop/SongData/train_triplets.txt")
parts = lines.map(lambda l: array([int(x) for x in l.split("\t")]))


In [81]:
rank=1
numIterations=1
model = ALS.trainImplicit(parts, rank, numIterations, alpha = 0.01).fit()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 26, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-78-a69c00da4a42>", line 2, in <lambda>
NameError: global name 'array' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor94.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-78-a69c00da4a42>", line 2, in <lambda>
NameError: global name 'array' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
rawPredictions = model.transform(playCountRDD)
predictions = rawPredictions\
    .withColumn("playCount", rawPredictions.rating.cast("double"))\
    .withColumn("prediction", rawPredictions.prediction.cast("double"))
evaluator =\
    RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

In [30]:
training = sc.textFile("/Users/samanthapace/Desktop/SongData/train_triplets.txt").map(parse_playCount).cache()
test = sc.textFile("/Users/samanthapace/Desktop/SongData/train_triplets.txt").map(parse_playCount)

test.load_playCount()

model = ALS.train(test, rank = 10, iterations = 5)

predictions = model.predictAll(test.map(lambda x: (x[0], x[1])))



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 13, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-23-9c402299c360>", line 4, in parse_playCount
TypeError: int() takes at most 2 arguments (3 given)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/samanthapace/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-23-9c402299c360>", line 4, in parse_playCount
TypeError: int() takes at most 2 arguments (3 given)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
