In [48]:
import sys
import pyspark
import json
import time
# sys.path.append('/home/jovyan/Recommenders/')
import pickle
import pandas as pd

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession


# from reco_utils.common.spark_utils import start_or_get_spark
# from reco_utils.dataset.spark_splitters import spark_random_split
from spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation


# Log the interpreter and PySpark versions so the printed results below can be
# tied to the environment that produced them.
print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))

System version: 3.7.1 | packaged by conda-forge | (default, Mar 13 2019, 12:57:14) 
[GCC 7.3.0]
Spark version: 2.4.0


In [49]:
def start_or_get_spark(app_name="Sample", url="local[*]", memory="10G"):
    """Configure a SparkSession builder and return the session from ``getOrCreate()``.

    Args:
        app_name (str): Application name handed to the builder.
        url (str): Master URL handed to the builder.
        memory (str): Value stored under the ``spark.driver.memory`` option.

    Returns:
        Whatever ``SparkSession.builder...getOrCreate()`` returns.
    """
    builder = SparkSession.builder.appName(app_name)
    builder = builder.master(url)
    builder = builder.config("spark.driver.memory", memory)
    return builder.getOrCreate()



def spark_random_split(data, ratio=0.75, seed=42):
    """Randomly split ``data`` using its ``randomSplit`` method.

    Args:
        data: Object exposing ``randomSplit(weights, seed=...)``.
        ratio (float or list): A single float produces a two-way split with
            weights ``[ratio, 1 - ratio]``; a list produces one split per
            weight. Validation is delegated to ``process_split_ratio``.
        seed (int): Seed forwarded to ``randomSplit``.

    Returns:
        The value returned by ``data.randomSplit``.
    """
    is_multi, weights = process_split_ratio(ratio)
    if not is_multi:
        # A single float describes the first part; the remainder is the second.
        weights = [weights, 1 - weights]
    return data.randomSplit(weights, seed=seed)
    


def process_split_ratio(ratio):
    """Validate a split ratio and report whether it describes a multi-way split.

    Args:
        ratio (float or list): Either a single float strictly between 0 and 1,
            or a non-empty list of strictly positive weights.

    Returns:
        tuple: ``(multi, ratio)``. ``multi`` is False for a float input and
        True for a list input. For a list whose sum is not exactly 1.0 the
        returned list is rescaled so that it sums to 1; otherwise the input
        is returned unchanged.

    Raises:
        ValueError: If a float ratio is not strictly inside (0, 1), or if a
            list ratio is empty or contains a value that is not > 0
            (NaN is rejected in both cases).
        TypeError: If ``ratio`` is neither a float nor a list.
    """
    if isinstance(ratio, float):
        # Written as a positive range test so NaN (all comparisons False) is rejected.
        if not (0 < ratio < 1):
            raise ValueError("Split ratio has to be between 0 and 1")
        return False, ratio

    if isinstance(ratio, list):
        # An empty list would otherwise pass through as (True, []).
        if not ratio:
            raise ValueError("Split ratio list should not be empty.")

        if any(not (x > 0) for x in ratio):
            raise ValueError(
                "All split ratios in the ratio list should be larger than 0."
            )

        # normalize split ratios if they are not summed to 1
        total = sum(ratio)  # computed once instead of once per element
        if total != 1.0:
            ratio = [x / total for x in ratio]

        return True, ratio

    raise TypeError("Split ratio should be either float or a list of floats.")

In [50]:
# Show which Python executable the notebook's shell resolves to.
!which python

/opt/conda/bin/python


In [51]:
def prepro(
    in_path='video_games/reviews_Video_Games_5.json',
    out_path='review_rating',
    decoder_path='prodid_to_asin_decoder.pickle',
    max_rows=200000,
):
    """
    Encode amazon data into rating

    Reads one JSON object per line from ``in_path`` and writes one
    ``userid,prodid,overall`` CSV row per review to ``out_path``. The string
    keys ``reviewerID`` and ``asin`` are replaced by integer ids assigned in
    order of first appearance, starting at 1. The mapping from integer product
    id back to ``asin`` is pickled to ``decoder_path``.

    Args:
        in_path (str): JSON-lines input file; each record must contain the
            keys ``reviewerID``, ``asin`` and ``overall``.
        out_path (str): Destination of the encoded CSV rows (overwritten).
        decoder_path (str): Destination of the pickled ``{prodid: asin}`` dict.
        max_rows (int or None): Maximum number of reviews to encode; ``None``
            means no limit.

    Returns:
        None. Prints the next unused user id, the next unused product id and
        the number of entries in the product decoder.
    """
    user_dict = {}
    prod_dict = {}
    prod_decode_dict = {}
    rows_written = 0

    # The input is opened first so that a missing input file does not
    # truncate an existing output file.
    with open(in_path, 'rb') as in_f, open(out_path, 'w') as out_fh:
        for line in in_f:
            if max_rows is not None and rows_written >= max_rows:
                break
            # A blank line (e.g. a trailing newline) would make json.loads fail.
            if not line.strip():
                continue

            r = json.loads(line)

            # setdefault evaluates len(user_dict) + 1 before inserting, so a
            # new reviewer receives the next consecutive id.
            userid = user_dict.setdefault(r['reviewerID'], len(user_dict) + 1)

            asin = r['asin']
            prodid = prod_dict.get(asin)
            if prodid is None:
                prodid = len(prod_dict) + 1
                prod_dict[asin] = prodid
                prod_decode_dict[prodid] = asin

            out_fh.write("%s,%s,%s\n" % (userid, prodid, r['overall']))
            rows_written += 1

    # These are the *next free* ids (number of distinct keys + 1), which is
    # what the counters held at this point before the parameters were added.
    print(len(user_dict) + 1)
    print(len(prod_dict) + 1)
    print(len(prod_decode_dict))
    with open(decoder_path, 'wb') as f:
        pickle.dump(prod_decode_dict, f)
        
# prepro()

In [52]:
# Keep only the ratings of users who have more than 20 rows in review_rating
# and write them to review_rating_20.
df=pd.read_csv('review_rating', names=["UserId","ProdId","Rating"])
df_count=df.groupby(by="UserId").count()['Rating']
user_20_idx = df_count[df_count>20].index
# index=False keeps the output in the same 3-column "UserId,ProdId,Rating"
# layout as review_rating; without it the row number is written as an extra
# leading column and any parser that reads columns by position picks up the
# wrong fields.
df.set_index('UserId').loc[user_20_idx].reset_index().to_csv('review_rating_20', index=False, header=False)

In [53]:
# Sanity check: number of rows in review_rating (prepro caps it at MAX_ROW).
!wc -l review_rating

200000 review_rating


In [None]:
def do_ALS(TOP_K, RANK, REGULARIZATION,
           data_path='review_rating', sample_fraction=0.01, sample_seed=None):
    """Train an ALS model on a sample of the ratings file and print its metrics.

    Steps: start Spark, load and sample ``data_path``, split it 90/10
    (seed 123), fit ALS on the training part, score every (user, item) pair
    seen in training, then print RMSE and precision/recall@K for the training
    and the test part together with the elapsed wall-clock time.

    Args:
        TOP_K (int): ``k`` passed to ``SparkRankingEvaluation``.
        RANK (int): ``rank`` passed to ``ALS``.
        REGULARIZATION (float): ``regParam`` passed to ``ALS``.
        data_path (str): Text file with one ``UserId,ProdId,Rating`` row per
            line (the format written by ``prepro``).
        sample_fraction (float): Fraction passed to ``RDD.sample``.
        sample_seed (int or None): Seed passed to ``RDD.sample``. The default
            ``None`` leaves the sample unseeded, so repeated runs may draw
            different rows.

    Returns:
        None. All results are printed.

    Note:
        The Spark session is always stopped before returning, including when
        a step raises, so a failed run does not leave the session alive.
        ``SparkRatingEvaluation`` / ``SparkRankingEvaluation`` come from the
        local ``spark_evaluation`` module; their exact semantics are not
        visible here -- TODO confirm against that module.
    """
    print("param: \t%d,%d,%f"%(TOP_K,RANK,REGULARIZATION))
    spark = start_or_get_spark("ALS PySpark", memory="25g")
    try:
        # Parse the CSV rows, sample them and pull the sample to the driver.
        # First argument of sample() is withReplacement=True, so a rating row
        # can appear more than once in the sample.
        rows = (
            spark.sparkContext.textFile(data_path)
            .map(lambda l: l.split(','))
            .map(lambda v: [int(v[0]), int(v[1]), float(v[2])])
            .sample(True, sample_fraction, sample_seed)
            .collect()
        )
        schema = StructType([
            StructField("UserId", IntegerType()),
            StructField("ProdId", IntegerType()),
            StructField("Rating", FloatType()),
        ])
        data = spark.createDataFrame(rows, schema)

        # train test data split
        train, test = spark_random_split(data, ratio=0.90, seed=123)

        header = {
            "userCol": "UserId",
            "itemCol": "ProdId",
            "ratingCol": "Rating",
        }

        # Model training
        als = ALS(
            rank=RANK,
            maxIter=15,
            implicitPrefs=False,
            # NOTE(review): alpha is documented by Spark as an implicit-feedback
            # parameter; it is kept here unchanged although implicitPrefs=False.
            alpha=0.1,
            regParam=REGULARIZATION,
            coldStartStrategy='drop',
            nonnegative=False,
            seed=42,
            **header
        )

        # The timer starts here, so "Took(sec)" covers fitting, scoring and
        # every evaluation below, but not Spark start-up or data loading.
        start_time = time.time()
        model = als.fit(train)

        # Get the cross join of all user-item pairs and score them.
        users = train.select('UserId').distinct()
        items = train.select('ProdId').distinct()
        user_item = users.crossJoin(items)
        dfs_pred = model.transform(user_item)

        # Remove seen items: after the join, rows whose train.Rating is null
        # are pairs that do not occur in the training data.
        dfs_pred_exclude_train = dfs_pred.alias("pred").join(
            train.alias("train"),
            (dfs_pred['UserId'] == train['UserId']) & (dfs_pred['ProdId'] == train['ProdId']),
            how='outer'
        )

        top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \
            .select('pred.' + 'UserId', 'pred.' + 'ProdId', 'pred.' + "prediction")

        # Same projection without the filter, i.e. seen pairs are kept; used
        # for the training-set ranking metrics.
        top_all_train = dfs_pred_exclude_train \
            .select('pred.' + 'UserId', 'pred.' + 'ProdId', 'pred.' + "prediction")

        # In Spark, transformations are lazy evaluation
        # Use an action to force execute and measure the test time
        top_all.cache().count()

        # Train prediction
        prediction0 = model.transform(train)
        rating_eval0 = SparkRatingEvaluation(train, prediction0, col_user="UserId", col_item="ProdId",
                                             col_rating="Rating", col_prediction="prediction")
        print("train RMSE:\t%f" % rating_eval0.rmse())

        rank_eval = SparkRankingEvaluation(train, top_all_train, k=TOP_K, col_user="UserId", col_item="ProdId",
                                           col_rating="Rating", col_prediction="prediction",
                                           relevancy_method="top_k")
        print("train Prec@K:\t%f" % rank_eval.precision_at_k(),
              "train Reca@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

        #TEST
        rank_eval = SparkRankingEvaluation(test, top_all, k=TOP_K, col_user="UserId", col_item="ProdId",
                                           col_rating="Rating", col_prediction="prediction",
                                           relevancy_method="top_k")
        print("test Prec@K:\t%f" % rank_eval.precision_at_k(),
              "test Reca@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

        # Generate predicted ratings.
        prediction = model.transform(test)

        #model loss on testing data
        rating_eval = SparkRatingEvaluation(test, prediction, col_user="UserId", col_item="ProdId",
                                            col_rating="Rating", col_prediction="prediction")
        print("test RMSE:\t%f" % rating_eval.rmse())

        print("Took(sec) \t{}".format(time.time() - start_time))
    finally:
        # Release the session even when a step above fails.
        spark.stop()
    
    
    
# Hyper-parameter sweep. Arguments are (TOP_K, RANK, REGULARIZATION).
# Only the first configuration is enabled; un-comment a line to run it.
print("===========================")
do_ALS(TOP_K=30, RANK=1, REGULARIZATION=0.1)
# -- vary RANK at REGULARIZATION=0.1
# do_ALS(30,10,0.1)
# do_ALS(30,20,0.1)
# do_ALS(30,50,0.1)
# do_ALS(30,100,0.1)
# do_ALS(30,200,0.1)
# -- vary REGULARIZATION at RANK=50
# do_ALS(30,50,0.01)
# do_ALS(30,50,0.05)
# do_ALS(30,50,0.1)
# do_ALS(30,50,0.2)
# do_ALS(30,50,0.3)
print("===========================")


param: 	30,1,0.100000
