In [1]:
# Import Spark library for python
import findspark
findspark.init("/usr/local/spark")
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()

In [2]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql import Row

In [18]:
df2 = spark.read.csv("code/output-1.csv", header=True, inferSchema=True).limit(1000000)

In [19]:
df2.show()


+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
|     1|   1968|   4.0|1425942148|
|     1|   2762|   4.5|1425941300|
|     1|   2918|   5.0|1425941593|
|     1|   2959|   4.0|1425941601|
|     1|   4226|   4.0|1425942228|
|     1|   4878|   5.0|1425941434|
|     1|   5577|   5.0|1425941397|
|     1|  33794|   4.0|1425942005|
|     1|  54503|   3.5|1425941313|
|     1|  58559|   4.0|1425942007|
|     1|  59315|   5.0|1425941502|
|     1|  68358|   5.0|1425941464|
|     1|  69844|   5.0|1425942139|
|     1|  73017|   5.0|1425942699|
|     1|  81834|   5.0|1425942133|
+------+-------+------+----------+
only showing top 20 rows



In [20]:
df2.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [8]:
(training, test) = df2.randomSplit([0.8, 0.2])

In [9]:
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative = True)
model = als.fit(training)

IllegalArgumentException: 'requirement failed: Column userId must be of type numeric but was actually of type string.'

In [7]:
param_grid = ParamGridBuilder().addGrid(als.rank, [11,12,13]).addGrid(als.maxIter,[19,20,21]).addGrid(als.regParam,[.16,.17,.18]).build()

In [8]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")


In [9]:
# tvs = TrainValidationSplit(
#         estimator=als,
#         estimatorParamMaps=param_grid,
#         evaluator=evaluator)

In [10]:
# model=tvs.fit(training)

In [11]:
best_model = model.bestModel

In [12]:
predictions = best_model.transform(test)
rmse=evaluator.evaluate(predictions)

In [13]:
userRecs = best_model.recommendForAllUsers(5)

In [14]:
print("RMSE = "+str(rmse))
print("Best Model --")
print("Rank : ", best_model.rank)
print("MaxIter : ", best_model._java_obj.parent().getMaxIter())
print("RegParam : ", best_model._java_obj.parent().getRegParam())

RMSE = 0.8426955129927696
Best Model --
Rank :  13
MaxIter :  21
RegParam :  0.16


In [15]:
display(predictions.sort("userId", "rating"))

DataFrame[userId: int, movieId: int, rating: double, timestamp: int, prediction: float]

In [16]:
import pandas

In [23]:
halo = predictions.toPandas()

In [25]:
halo2 = halo.to_json()
print (halo2)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [18]:
userRecs.createOrReplaceTempView("movies")

In [19]:
userRecs.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [20]:
query=spark.sql("SELECT * FROM movies where userId = 580")

In [21]:
query.show(truncate=False)

+------+---------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                    |
+------+---------------------------------------------------------------------------------------------------+
|580   |[[96631, 5.7149363], [91582, 5.432895], [25886, 5.424262], [171603, 5.4198103], [44851, 5.4090385]]|
+------+---------------------------------------------------------------------------------------------------+



In [22]:
type(query)

pyspark.sql.dataframe.DataFrame