In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import udf, log
from pyspark.sql.types import StringType
from pyspark import SparkConf, SparkContext, sql

import pandas as pd
from sqlalchemy import create_engine

import struct
import binascii


In [None]:
# Reuse the running Spark session if there is one, otherwise start one named 'ops'.
spark = (
    SparkSession.builder
    .appName('ops')
    .getOrCreate()
)

In [None]:
!curl http://files.grouplens.org/datasets/movielens/ml-25m.zip --output ml-25m.zip

In [None]:
!unzip -o ml-25m.zip 

In [None]:
# movies.csv is a comma-separated file with a header row, which pandas' defaults handle
# (quoted titles containing commas included). `sep` must not be passed positionally:
# pandas 2.x makes every argument after the path keyword-only.
movies = pd.read_csv('ml-25m/movies.csv')

In [None]:
# Pull the 4-digit year in parentheses out of the title, e.g. "Toy Story (1995)" -> "1995";
# titles without one get NaN. Raw string: '\(' in a normal literal is an invalid escape
# sequence (SyntaxWarning on Python 3.12+). The captured group is digits only, so no strip is needed.
movies['year'] = movies['title'].str.extract(r'\(([0-9]{4})\)', expand=False)

In [None]:
# Explicit schema instead of inferSchema=True: inference costs an extra full scan of the
# ~25M-row ratings file. The header row is skipped; column names come from the schema.
ratings = (
    spark.read
    .options(header=True)
    .schema('userId INT, movieId INT, rating DOUBLE, timestamp BIGINT')
    .csv('ml-25m/ratings.csv')
)

In [None]:
# 80/20 train/test split. A fixed seed makes the split, and therefore the RMSE reported
# below, reproducible across runs.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Matrix factorization of the rating matrix into `rank`-dimensional user and item vectors.
# NOTE: rank must equal the `dims` of the `profile_vector` dense_vector mapping in the
# Elasticsearch index created below.
# coldStartStrategy="drop" removes rows whose prediction is NaN (users/items unseen in
# training) so the RMSE evaluation stays finite. The seed fixes the random initialization
# of the factors so results are reproducible.
als = ALS(maxIter=10, regParam=0.05, rank=48, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)

model = als.fit(training)

In [None]:
predictions = model.transform(test)

# Root-mean-square error of predicted vs. actual ratings on the held-out split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

In [None]:
# ALSModel.itemFactors is already a DataFrame with `id` and `features` columns. Use it
# directly rather than converting to an RDD and re-inferring the schema.
itemfactors = model.itemFactors

In [None]:
# ALSModel.userFactors is already a DataFrame with `id` and `features` columns. Use it
# directly rather than converting to an RDD and re-inferring the schema.
userfactors = model.userFactors

In [None]:
# Collect the learned factor vectors to pandas, keyed by movie / user id, for joining with
# the metadata frames. Built from the `itemfactors` / `userfactors` frames defined above
# (no `*_with_hex` frames exist in this notebook).
items_frame = itemfactors.select('id', 'features').toPandas().rename(columns={'id': 'movie_id'})
users_frame = userfactors.select('id', 'features').toPandas().rename(columns={'id': 'user_id'})

In [None]:
# No `users` frame is loaded anywhere in this notebook. MovieLens 25M ships no
# user-demographics file; gender/age/occupation/zip-code only exist in ML-1M's users.dat.
# So each user document is just the learned factor vector keyed by user id.
# TODO: merge real user metadata here if a source for it is added.
db_users = users_frame.rename(columns={'user_id': 'id'})

In [None]:
# Attach each movie's factor vector to its metadata. ml-25m/movies.csv uses `movieId`
# (the same key as ratings.csv), and `title`/`genres` are already lower-case, so only the
# key needs renaming. The inner merge keeps only movies that received a factor vector.
db_movies = (
    movies
    .merge(items_frame, left_on='movieId', right_on='movie_id')
    .drop(columns=['movie_id'])
    .rename(columns={'movieId': 'id'})
)

In [None]:
from elasticsearch import Elasticsearch
from elasticsearch import helpers

# Explicit URL: elasticsearch-py 7.x falls back to localhost:9200 on its own, but 8.x
# refuses to build a client without hosts. http_compress gzips request bodies, which
# helps for the bulk upload.
es_client = Elasticsearch('http://localhost:9200', http_compress=True)

index_name = "movielens"

# Drop any previous copy of the index so it can be recreated with a fresh mapping.
# Check for existence rather than catching every exception, so real failures
# (e.g. a refused connection) surface instead of being printed and ignored.
if es_client.indices.exists(index=index_name):
    es_client.indices.delete(index=index_name)

In [None]:
# Index settings plus mapping:
#   - a custom `stem_english` analyzer (lowercase -> English stop words -> English stemmer);
#   - `title` analyzed with the standard analyzer, with a stemmed `title.english` sub-field;
#   - `year` as an integer;
#   - `profile_vector` as a 48-dim dense_vector. 48 must match the length of the vectors
#     being indexed (and the ALS rank).
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "analysis": {
            "filter": {
                "english_stop": {"type": "stop", "stopwords": "_english_"},
                "english_stemmer": {"type": "stemmer", "language": "english"},
            },
            "analyzer": {
                "stem_english": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "english_stop", "english_stemmer"],
                },
            },
        },
    },
    "mappings": {
        "properties": {
            "title": {
                "type": "text",
                "analyzer": "standard",
                "fields": {
                    "english": {"type": "text", "analyzer": "stem_english"},
                },
            },
            "year": {"type": "integer"},
            "profile_vector": {"type": "dense_vector", "dims": 48},
        },
    },
}

es_client.indices.create(index=index_name, body=index_body)

In [None]:
import pickle

# NOTE(review): es_data is not built from db_movies/db_users in this notebook. It comes
# from a pre-built pickle file (presumably 48-dim vectors, per the file name; confirm).
# pickle.load can execute arbitrary code, so only load this file from a trusted source.
with open('es_data_48.pickle', mode='rb') as pickle_file:
    es_data = pickle.load(pickle_file)

In [None]:
# Bulk-index the loaded documents. The (success count, errors) tuple returned by
# helpers.bulk is shown as the cell output.
helpers.bulk(client=es_client, actions=es_data)