In [15]:
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS
import os
import pandas as pd
import numpy as np

In [16]:
def predictions_ALS(train, test, **kwargs):
    """
    ALS with PySpark.
    Compute the predictions on a test_set after training on a train_set using the method ALS from PySpark.
    Args:
        train (pandas.DataFrame): train set
        test (pandas.DataFrame): test set
        **kwargs: Arbitrary keyword arguments. Passed to ALS.train() (Except for the spark_context)
            spark_context (SparkContext): SparkContext passed from the main program. (Useful when using Jupyter)
            rank (int): Rank of the matrix for the ALS
            lambda (float): Regularization parameter for the ALS
            iterations (int): Number of iterations for the ALS
            nonnegative (bool): Boolean to allow negative values or not.
    Returns:
        pandas.DataFrame: predictions, sorted by (Movie, User)
    """

    # Delete folders that causes troubles
    os.system('rm -rf metastore_db')
    os.system('rm -rf __pycache__')

    # Extract Spark Context from the kwargs
    spark_context = kwargs.pop('spark_context')

    # Convert pd.DataFrame to Spark.rdd
    sqlContext = SQLContext(spark_context)

    train_sql = sqlContext.createDataFrame(train).rdd
    test_sql = sqlContext.createDataFrame(test).rdd

    # Train the model
    model = ALS.train(train_sql, **kwargs)

    # Get the predictions
    data_for_predictions = test_sql.map(lambda x: (x[0], x[1]))
    predictions = model.predictAll(data_for_predictions).map(lambda r: ((r[0], r[1]), r[2]))

    # Convert Spark.rdd to pd.DataFrame
    df = predictions.toDF().toPandas()

    # Post processing database
    df['User'] = df['_1'].apply(lambda x: x['_1'])
    df['Movie'] = df['_1'].apply(lambda x: x['_2'])
    df['Rating'] = df['_2']
    df = df.drop(['_1', '_2'], axis=1)
    df = df.sort_values(by=['Movie', 'User'])
    df.index = range(len(df))

    return df

In [17]:
from pyspark import SparkContext, SparkConf

In [18]:
conf = (SparkConf()
            .setMaster("local")
            .setAppName("My app")
            .set("spark.executor.memory", "1g")
            )
sc.stop()
sc = SparkContext(conf=conf)


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:64655)

In [5]:
 sc.setLogLevel("ERROR")

In [6]:
df = pd.read_csv("data_train.csv")
df_test = pd.read_csv("data_test.csv")


# In[24]:


df['User'] = df['Id'].apply(lambda x: int(x.split('_')[0][1:]))
df['Movie'] = df['Id'].apply(lambda x: int(x.split('_')[1][1:]))
df['Rating'] = df['Prediction']
df = df.drop(['Id', 'Prediction'], axis=1)


# In[25]:


df_test['User'] = df_test['Id'].apply(lambda x: int(x.split('_')[0][1:]))
df_test['Movie'] = df_test['Id'].apply(lambda x: int(x.split('_')[1][1:]))
df_test['Rating'] = df_test['Prediction']
df_test = df_test.drop(['Id', 'Prediction'], axis=1)


# In[26]:


In [None]:
pred_als = predictions_ALS(df, df_test, spark_context=sc, rank=8, lambda_=0.081, iterations=24)

In [None]:
from implementations import *

In [None]:

submission = submission_table(pred_als, 'User', 'Movie', 'Rating')

In [None]:
pred = list(submission['Prediction'])
    
for i in range(len(pred)):
    if pred[i] > 5:
        pred[i] = 5
    elif pred[i] < 1:
        pred[i] = 1
preds = [round(x) for x in pred]
submission['Prediction'] = preds

In [None]:
submission

In [None]:
submission['Prediction'] = round(submission['Prediction']).astype(int) 

In [None]:
submission.loc[submission['Prediction']>5] = 5

In [None]:
submission.loc[submission['Prediction']<1] = 1

In [None]:
submission['Prediction'] = submission['Prediction'] .astype(int)

In [None]:
file_name = 'prediction_als.csv'
submission.to_csv(file_name, index=False)

In [None]:
df_sub = pd.read_csv("prediction_als.csv")

In [7]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


als = ALS(userCol="User", itemCol="Movie", ratingCol="Rating", nonnegative = True, implicitPrefs = False)
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder().addGrid(als.rank, [10, 50, 100, 150]).addGrid(als.maxIter, [5, 50, 100, 200]).addGrid(als.regParam, [.01, .05, .1, .15]).build()
           
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 

In [8]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator="evaluator", numFolds=5)

# Confirm cv was built
print (cv)


CrossValidator_fc3cc6dbd3f9


In [9]:
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(df)

In [11]:
(train, test) = sdf.randomSplit([0.8, 0.2], seed = 1234)

In [13]:
model = cv.fit(train)

AttributeError: 'str' object has no attribute 'evaluate'

----------------------------------------
Exception happened during processing of request fromERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving 
('127.0.0.1', 6469

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.de

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.de

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.de

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.de

ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.de

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.de

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.de

In [14]:
train

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:64655)
Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/erdembocugoz/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:64655)

DataFrame[User: bigint, Movie: bigint, Rating: bigint]

In [None]:
[0::10]