In [1]:
import numpy as np 
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession


In [3]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

training = spark.read.csv("trainItem.data",header=False)
training.show(5)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/15 17:23:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/04/15 17:23:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


+------+------+---+
|   _c0|   _c1|_c2|
+------+------+---+
|199808|248969| 90|
|199808|  2663| 90|
|199808| 28341| 90|
|199808| 42563| 90|
|199808| 59092| 90|
+------+------+---+
only showing top 5 rows



In [16]:
training = training.withColumnRenamed("_c0", "userID").withColumnRenamed("_c1", "itemID").withColumnRenamed("_c2", "rating")
training.show(5)

+------+------+------+
|userID|itemID|rating|
+------+------+------+
|199808|248969|    90|
|199808|  2663|    90|
|199808| 28341|    90|
|199808| 42563|    90|
|199808| 59092|    90|
+------+------+------+
only showing top 5 rows



In [17]:
from pyspark.sql.types import IntegerType
training = training.withColumn("userID", training["userID"].cast(IntegerType()))
training = training.withColumn("itemID", training["itemID"].cast(IntegerType()))
training = training.withColumn("rating", training["rating"].cast('float'))
training.show(3)

+------+------+------+
|userID|itemID|rating|
+------+------+------+
|199808|248969|  90.0|
|199808|  2663|  90.0|
|199808| 28341|  90.0|
+------+------+------+
only showing top 3 rows



In [4]:
# Create ALS model
als = ALS(
    maxIter=5, 
    rank = 5,
    regParam=0.01,
    userCol="userID", 
    itemCol="itemID",
    ratingCol="rating", 
    nonnegative = True, 
    implicitPrefs = False,
    coldStartStrategy="drop"
)

22/04/15 18:24:42 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 950455 ms exceeds timeout 120000 ms
22/04/15 18:24:42 WARN SparkContext: Killing executors is not supported by current scheduler.


In [19]:
model = als.fit(training)

22/04/15 11:26:40 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/04/15 11:26:40 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

In [20]:
testing = spark.read.csv("testItem.data", header = False)

In [21]:
testing = testing.withColumnRenamed("_c0", "userID").withColumnRenamed("_c1", "itemID").withColumnRenamed("_c2", "rating")
testing.show(5)

+------+------+------+
|userID|itemID|rating|
+------+------+------+
|199810|208019|     0|
|199810| 74139|     0|
|199810|  9903|     0|
|199810|242681|     0|
|199810| 18515|     0|
+------+------+------+
only showing top 5 rows



In [22]:
testing = testing.withColumn("userID", testing["userID"].cast(IntegerType()))
testing = testing.withColumn("itemID", testing["itemID"].cast(IntegerType()))
testing = testing.withColumn("rating", testing["rating"].cast('float'))
testing.show(3)

+------+------+------+
|userID|itemID|rating|
+------+------+------+
|199810|208019|   0.0|
|199810| 74139|   0.0|
|199810|  9903|   0.0|
+------+------+------+
only showing top 3 rows



In [23]:
predictions = model.transform(testing)
predictions.show(5)

                                                                                

+------+------+------+----------+
|userID|itemID|rating|prediction|
+------+------+------+----------+
|233686|     1|   0.0|103.265686|
|200179|    13|   0.0| 46.781307|
|206335|    27|   0.0| 59.619865|
|248282|    27|   0.0|  90.33147|
|223017|    65|   0.0| 222.13089|
+------+------+------+----------+
only showing top 5 rows



In [25]:
# save dataframe to a single csv file 
# save dataframe into a folder 'predictions' 
# with a single file (coalesce(1))
# but you can not assign the filename
predictions.coalesce(1).write.csv("predictions")

                                                                                

In [26]:
predictions.toPandas().to_csv('myprediction.csv')


In [49]:
import pandas as pd 

df = pd.read_csv("myprediction.csv")
df = df.iloc[: , 1:5]
display(df)

Unnamed: 0,userID,itemID,rating,prediction
0,233686,1,0.0,103.265686
1,200179,13,0.0,46.781307
2,206335,27,0.0,59.619865
3,248282,27,0.0,90.331470
4,223017,65,0.0,222.130890
...,...,...,...,...
119969,237358,296000,0.0,43.422146
119970,248383,296016,0.0,54.893906
119971,235949,296028,0.0,69.734470
119972,220168,296042,0.0,69.251625


In [50]:
df = df.sort_values(by=['userID'])
#display(df.iloc[0:20,:])
display(df)

Unnamed: 0,userID,itemID,rating,prediction
83952,199810,242681,0.0,51.552810
91442,199810,18515,0.0,63.371460
30962,199810,9903,0.0,58.694900
97116,199810,74139,0.0,40.278970
40757,199810,105760,0.0,60.111965
...,...,...,...,...
70678,249010,110470,0.0,56.495970
86206,249010,262811,0.0,68.190000
98157,249010,86104,0.0,101.365875
37209,249010,72192,0.0,107.374115


In [57]:
test = df.iloc[:119970,:]
test


Unnamed: 0,userID,itemID,rating,prediction
83952,199810,242681,0.0,51.552810
91442,199810,18515,0.0,63.371460
30962,199810,9903,0.0,58.694900
97116,199810,74139,0.0,40.278970
40757,199810,105760,0.0,60.111965
...,...,...,...,...
43028,249008,127497,0.0,41.112793
30437,249008,4967,0.0,5.049868
76418,249008,165413,0.0,94.791820
59508,249010,293818,0.0,130.840970


In [58]:
a = 0
newdf = pd.DataFrame([])
print(len(test))
for i in range(len(test)):
    #print(a)
    x = test.iloc[a:a+6,:]
    x = x.sort_values(by=['prediction'])
    x['rating'] = [0,0,0,1,1,1]
    a = a+6
    #print(a)
    #print(i)
    #isplay(x)
    newdf = pd.concat([newdf,x])
    newdf = newdf.dropna()
    #isplay(newdf)

newdf["userID"] = newdf["userID"].round().astype(int)
newdf["itemID"] = newdf["itemID"].round().astype(int)
display(newdf)
    




119970


Unnamed: 0,userID,itemID,rating,prediction
97116,199810,74139,0,40.278970
83952,199810,242681,0,51.552810
110786,199810,208019,0,51.897552
30962,199810,9903,1,58.694900
40757,199810,105760,1,60.111965
...,...,...,...,...
43028,249008,127497,0,41.112793
70678,249010,110470,0,56.495970
50196,249008,197975,1,83.366890
76418,249008,165413,1,94.791820


In [59]:
newdf.to_csv('mypredz2.csv')