In [12]:
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS
import os
import pandas as pd
import numpy as np

In [3]:
def predictions_ALS(train, test, **kwargs):
    """
    ALS with PySpark.
    Compute the predictions on a test_set after training on a train_set using the method ALS from PySpark.
    Args:
        train (pandas.DataFrame): train set
        test (pandas.DataFrame): test set
        **kwargs: Arbitrary keyword arguments. Passed to ALS.train() (Except for the spark_context)
            spark_context (SparkContext): SparkContext passed from the main program. (Useful when using Jupyter)
            rank (int): Rank of the matrix for the ALS
            lambda (float): Regularization parameter for the ALS
            iterations (int): Number of iterations for the ALS
            nonnegative (bool): Boolean to allow negative values or not.
    Returns:
        pandas.DataFrame: predictions, sorted by (Movie, User)
    """

    # Delete folders that causes troubles
    os.system('rm -rf metastore_db')
    os.system('rm -rf __pycache__')

    # Extract Spark Context from the kwargs
    spark_context = kwargs.pop('spark_context')

    # Convert pd.DataFrame to Spark.rdd
    sqlContext = SQLContext(spark_context)

    train_sql = sqlContext.createDataFrame(train).rdd
    test_sql = sqlContext.createDataFrame(test).rdd

    # Train the model
    model = ALS.train(train_sql, **kwargs)

    # Get the predictions
    data_for_predictions = test_sql.map(lambda x: (x[0], x[1]))
    predictions = model.predictAll(data_for_predictions).map(lambda r: ((r[0], r[1]), r[2]))

    # Convert Spark.rdd to pd.DataFrame
    df = predictions.toDF().toPandas()

    # Post processing database
    df['User'] = df['_1'].apply(lambda x: x['_1'])
    df['Movie'] = df['_1'].apply(lambda x: x['_2'])
    df['Rating'] = df['_2']
    df = df.drop(['_1', '_2'], axis=1)
    df = df.sort_values(by=['Movie', 'User'])
    df.index = range(len(df))

    return df

In [4]:
from pyspark import SparkContext, SparkConf

In [7]:
conf = (SparkConf()
            .setMaster("local")
            .setAppName("My app")
            .set("spark.executor.memory", "1g")
            )
sc.stop()
sc = SparkContext(conf=conf)


In [8]:
 sc.setLogLevel("ERROR")

In [13]:
df = pd.read_csv("data_train.csv")
df_test = pd.read_csv("data_test.csv")


# In[24]:


df['User'] = df['Id'].apply(lambda x: int(x.split('_')[0][1:]))
df['Movie'] = df['Id'].apply(lambda x: int(x.split('_')[1][1:]))
df['Rating'] = df['Prediction']
df = df.drop(['Id', 'Prediction'], axis=1)


# In[25]:


df_test['User'] = df_test['Id'].apply(lambda x: int(x.split('_')[0][1:]))
df_test['Movie'] = df_test['Id'].apply(lambda x: int(x.split('_')[1][1:]))
df_test['Rating'] = df_test['Prediction']
df_test = df_test.drop(['Id', 'Prediction'], axis=1)


# In[26]:


train_file = 'tmp_train.csv'
header = ['user_id','item_id','rating']
df.to_csv(train_file, index=False, header=False)
ratings_df = pd.read_csv('tmp_train.csv',
                          sep=',',
                         names=header,
                         
                         dtype={
                           'user_id': np.int32,
                           'item_id': np.int32,
                           'rating': np.float32,
                           'timestamp': np.int32,
                         })


# In[27]:


test_file = 'tmp_test.csv'
header = ['user_id','item_id','rating']
df_test.to_csv(test_file, index=False, header=False)
ratings_df_test = pd.read_csv('tmp_test.csv',
                          sep=',',
                         names=header,
                         
                         dtype={
                           'user_id': np.int32,
                           'item_id': np.int32,
                           'rating': np.float32,
                           'timestamp': np.int32,
                         })


In [15]:
pred_als = predictions_ALS(df, df_test, spark_context=sc, rank=8, lambda_=0.081, iterations=24)

In [16]:
pred_als

Unnamed: 0,User,Movie,Rating
0,37,1,3.189018
1,73,1,3.030078
2,156,1,3.677778
3,160,1,3.249707
4,248,1,3.486249
5,256,1,3.336539
6,284,1,2.984229
7,400,1,3.384151
8,416,1,3.587439
9,456,1,3.387924


In [20]:
from implementations import *

In [23]:
pred_als.g

KeyError: 1

In [None]:

submission = submission_table(pred_als, 'User', 'Movie', 'Rating')