In [137]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import udf, log
from pyspark.sql.types import StringType
from pyspark import SparkConf, SparkContext, sql

import pandas as pd
from sqlalchemy import create_engine

import struct
import binascii


In [138]:
# Single SparkSession for the notebook; getOrCreate() returns the active
# session if one already exists.
spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)

In [6]:
!curl http://files.grouplens.org/datasets/movielens/ml-1m.zip --output ml-1m.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 5778k  100 5778k    0     0   842k      0  0:00:06  0:00:06 --:--:-- 1133k


In [7]:
# Extract movies.dat, ratings.dat, users.dat and README into ml-1m/.
# -o overwrites existing files without prompting, so the cell can be re-run.
!unzip -o ml-1m.zip 

Archive:  ml-1m.zip
  inflating: ml-1m/movies.dat        
  inflating: ml-1m/ratings.dat       
  inflating: ml-1m/README            
  inflating: ml-1m/users.dat         


In [162]:
# The .dat files use "::" as separator. A multi-character separator is treated
# as a regex and requires engine='python'. The files are Latin-1 encoded (some
# titles contain accented characters that are not valid UTF-8), so the
# encoding is given explicitly instead of relying on the UTF-8 default.
movies = pd.read_csv(
    'ml-1m/movies.dat', sep="::", header=None, engine='python', encoding='latin-1',
    names=['MovieID', 'Title', 'Genres'])
# Zip codes are kept as strings (they go into a VARCHAR column) so that
# leading zeros can never be lost to numeric parsing.
users = pd.read_csv(
    'ml-1m/users.dat', sep="::", header=None, engine='python', encoding='latin-1',
    names=['UserID', 'Gender', 'Age', 'Occupation', 'Zip-code'],
    dtype={'Zip-code': str})

In [163]:
# Extract the release year from the trailing "(YYYY)" of each title, e.g.
# "Toy Story (1995)" -> "1995". The pattern is a raw string (a plain '\(' is
# an invalid escape sequence). It is anchored at the end of the title so an
# earlier parenthesised 4-digit number cannot be picked up. Titles without a
# trailing year get NaN.
movies['year'] = movies['Title'].str.extract(r'\(([0-9]{4})\)\s*$', expand=False)

In [142]:
# Parse ratings.dat ("::"-separated). Only the first three fields
# (user, movie, rating) are used.
lines = spark.read.text('ml-1m/ratings.dat').rdd
parts = lines.map(lambda row: row.value.split('::'))

ratingsRDD = parts.map(lambda p: Row(userID=int(p[0]), movieID=int(p[1]), rating=int(p[2])))
ratings = spark.createDataFrame(ratingsRDD)

# A fixed seed makes the train/test split (and therefore the evaluation
# metric) reproducible across Restart & Run All.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [143]:
# Peek at the parsed ratings (first 20 rows, long values truncated).
ratings.show(n=20)

+-------+------+------+
|movieID|rating|userID|
+-------+------+------+
|   1193|     5|     1|
|    661|     3|     1|
|    914|     3|     1|
|   3408|     4|     1|
|   2355|     5|     1|
|   1197|     3|     1|
|   1287|     5|     1|
|   2804|     5|     1|
|    594|     4|     1|
|    919|     4|     1|
|    595|     5|     1|
|    938|     4|     1|
|   2398|     4|     1|
|   2918|     4|     1|
|   1035|     5|     1|
|   2791|     4|     1|
|   2687|     3|     1|
|   2018|     4|     1|
|   3105|     5|     1|
|   2797|     4|     1|
+-------+------+------+
only showing top 20 rows



In [144]:
# Matrix-factorisation model.
# - rank=10 is Spark's default. It is spelled out because the BINARY(80)
#   `factors` columns are sized for the hex encoding of 10 floats.
# - seed makes the random factor initialisation reproducible.
# - coldStartStrategy="drop" discards predictions for users/movies that were
#   not seen in training, instead of returning NaN.
als = ALS(rank=10, maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID",
          ratingCol="rating", coldStartStrategy="drop", seed=42)

model = als.fit(training)

In [145]:
# Score the held-out ratings and report root-mean-square error against the
# true `rating` column.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = {}".format(rmse))

Root-mean-square error = 0.8941641483795894


In [146]:
# model.itemFactors is already a DataFrame (columns `id`, `features`). Using it
# directly avoids the .rdd -> createDataFrame round trip, which re-inferred the
# schema from the data (an extra Spark job) and widened the column types.
itemfactors = model.itemFactors

In [147]:
# model.userFactors is already a DataFrame (columns `id`, `features`). Using it
# directly avoids the .rdd -> createDataFrame round trip, which re-inferred the
# schema from the data (an extra Spark job) and widened the column types.
userfactors = model.userFactors

In [148]:
def vector2hex(vector):
    """Encode a sequence of numbers as a hex string of packed 32-bit floats.

    Each element is packed as a little-endian IEEE-754 single (4 bytes, i.e.
    8 hex characters per element). The concatenated bytes are then hex-encoded.

    Args:
        vector: iterable of numbers (here, one ALS factor array).

    Returns:
        Lower-case hex string; '' for an empty vector.
    """
    # '<f' pins byte order and size. Bare 'f' uses the host's native byte
    # order, which would make the stored encoding machine-dependent.
    packed = b"".join(struct.pack('<f', elem) for elem in vector)
    # Decode the ASCII hex bytes rather than slicing "b'...'" out of str(bytes).
    return binascii.hexlify(packed).decode('ascii')

# Spark UDF wrapper so vector2hex can be applied to the `features` array column.
udf_vector2hex = udf(vector2hex, returnType=StringType())

In [149]:
# Add a `factors` column holding the hex encoding of each `features` vector,
# for both the item and the user factor tables.
itemfactors_with_hex, userfactors_with_hex = (
    frame.withColumn("factors", udf_vector2hex("features"))
    for frame in (itemfactors, userfactors)
)

In [150]:
# Collect (id, hex factors) to pandas, renaming `id` to an entity-specific key
# for the merges that follow.
items_frame = (
    itemfactors_with_hex
    .select('id', 'factors')
    .toPandas()
    .rename(columns={'id': 'movie_id'})
)
users_frame = (
    userfactors_with_hex
    .select('id', 'factors')
    .toPandas()
    .rename(columns={'id': 'user_id'})
)

In [151]:
# Attach each user's hex factors and rename columns to the `users` table schema.
# merge() defaults to an inner join, so only users that received a factor
# vector are kept.
db_users = (
    users
    .merge(users_frame, left_on='UserID', right_on='user_id')
    .drop(columns=['user_id'])
    .rename(columns={
        'UserID': 'id',
        'Gender': 'gender',
        'Age': 'age',
        'Occupation': 'occupation',
        'Zip-code': 'zip_code',
    })
)

In [152]:
# Attach each movie's hex factors and rename columns to the `movies` table schema.
# merge() defaults to an inner join, so movies without a factor vector (e.g.
# never rated in the training split) are not kept.
db_movies = (
    movies
    .merge(items_frame, left_on='MovieID', right_on='movie_id')
    .drop(columns=['movie_id'])
    .rename(columns={
        'MovieID': 'id',
        'Title': 'title',
        'Genres': 'genres',
    })
)

In [153]:
from memsql.common import database

import os

# Database server to connect to. Can be overridden with environment variables
# instead of editing the notebook; the defaults target a local server.
HOST = os.environ.get("MEMSQL_HOST", "127.0.0.1")
PORT = int(os.environ.get("MEMSQL_PORT", "3306"))

In [154]:
def get_connection(host=None, port=None, db=None, user='root', password=''):
    """ Returns a new connection to the database.

    Args:
        host: server host; defaults to the module-level HOST when None.
        port: server port; defaults to the module-level PORT when None.
        db: database to select on connect (passed as `database`), or None.
        user: login user; defaults to 'root' (the previously hardcoded value).
        password: login password; defaults to '' (the previously hardcoded value).

    Returns:
        Whatever memsql's `database.connect(...)` returns.
    """
    if host is None:
        host = HOST
    if port is None:
        port = PORT

    return database.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=db)

In [156]:
with get_connection(db="information_schema") as conn:
    # Create the target database first so this cell also works against a
    # fresh server; `USE data` fails if the database does not exist yet.
    conn.query('CREATE DATABASE IF NOT EXISTS data')
    conn.query('USE data')
    # `factors` receives the hex text produced by vector2hex: 8 hex characters
    # per float, so 80 bytes fits a rank-10 ALS model. Widen the column if the
    # rank changes.
    # NOTE(review): the hex string is stored as ASCII text, not as raw packed
    # floats. Confirm that is intended (otherwise UNHEX on insert, BINARY(40)).
    conn.query('CREATE TABLE IF NOT EXISTS movies (id INT AUTO_INCREMENT PRIMARY KEY, title VARCHAR(255), year INT, genres VARCHAR(255), factors BINARY(80))')
    conn.query('CREATE TABLE IF NOT EXISTS users (id INT AUTO_INCREMENT PRIMARY KEY, gender VARCHAR(5), age INT, occupation INT, zip_code VARCHAR(255), factors BINARY(80))')

In [157]:
from sqlalchemy.ext.declarative import declarative_base

# No ORM models are declared on Base here, so create_all() has nothing to
# create. The tables come from the explicit DDL in the previous cell.
Base = declarative_base()

# Select the `data` database in the URL instead of running `USE data` via
# engine.execute(). A USE statement only affects the single pooled connection
# it ran on, so later to_sql() calls were not guaranteed to see it, and
# Engine.execute() no longer exists in SQLAlchemy 2.0. HOST/PORT keep this
# engine pointed at the same server as get_connection().
engine = create_engine('mysql://root@{}:{}/data'.format(HOST, PORT))
Base.metadata.create_all(engine)

# Explicit primary-key ids are inserted, so re-running this cell against
# already-populated tables fails with duplicate-key errors
# (if_exists='append' only appends rows).
db_movies.to_sql('movies', con=engine, if_exists='append', index=False)
db_users.to_sql('users', con=engine, if_exists='append', index=False)