## Recommender Class
    Build api for recommendation system
    
    This notebook is still in progress...

In [1]:
import numpy as np
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import cPickle
from time import time
from sklearn.utils import shuffle
from collections import defaultdict, Counter
import pyspark

In [2]:
from  pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
import math

In [7]:
class wine_recommender(object):
    def __init__(self, home_path, ratings_path, products_path, rec_results_path, model_path):
        
        self.seed = 5L
        self.iterations = 30
        self.regularization_parameter = 0.1
        self.rank = 20
        # number of varietals to include in curated recommendations for users
        self.n_varietals = 3
        # threshold value validated in Spark_Recommendation_Model_Validation notebook
        self.threshold = 9
        self.home = home_path
        self.ratings_path =  ratings_path
        self.products_path = products_path
        self.rec_results_path = rec_results_path
        self.save_model_path = model_path
        
        self.data = None
        self.unique_user_tags = None
        self.products_df = None
        self.products_rdd = None
        self.index_to_int = None
        self.cust_tag_bridge = None
        self.data_rdd = None
        self.tag_data_bridge_rdd = None
        self.clean_data_rdd = None
        self.sc = None
        self.training_RDD = None
        self.movie_ids = None
        self.unpurchased_wines_rdd = None
        self.unpurchased_wines = None
        self.user_recs = None
        self.validated_user_recs = None
        self.wineID_rating_userHash = None
        self.user_recs_dicts = None
        self.varietals  = None
        self.var_count = None
        self.top_varietals = None
        self.top_varietal_recs = None
        self.final_recs = None
        self.red_white_recs_dict = None
        self.user_ids = None
        self.user_id = None

    
    def get_ratings_data(self):
        self.data = cPickle.load(open(self.ratings_path, 'r'))

    def create_cust_tag_bridge_rdd(self):
        '''Create user tags/user ids bride rdd, 
           create int:cust_tag key value pairs,
           spark can't read string user ids'''

        self.unique_user_tags = np.unique([row[0] for row in self.data])
        self.index_to_int = np.arange(0, len(self.unique_user_tags) * 100, 100)
        self.cust_tag_bridge = [ (tag_hash, tag_int) \
                                for tag_hash, tag_int in zip(self.unique_user_tags, self.index_to_int)]

        self.cust_tag_bridge_rdd = self.sc.parallelize(self.cust_tag_bridge)

    def create_products_rdd(self):
        '''Creates products_rdd
        Input: products_df, pandas dataframe
        Output: products_rdd, spark rdd'''
        # create products_rdd
        self.products_rdd = self.sc.parallelize(self.products_df.values.tolist())

        # format --> (productKey, (productID, Appellation, Varietal, Vinyard) )
        self.products_rdd = self.products_rdd.map(lambda row: \
                                                  (row[0], (row[1], row[2], row[3], row[4], row[5]) ))

    def create_clean_data_rdd(self):
        '''Loads ratings from master file and formats data into model readable form.
           data --> (user_id, productKey, rating)'''
        
        # load raw data
        print "load ratings data..."
        self.get_ratings_data()
        # assigne each user's hash tag a corresponding user_id
        print "create customer hashtag/id rdd bridge..."
        self.create_cust_tag_bridge_rdd()
        # model readable format
        print "create_clean_data_rdd..."
        self.data_rdd = self.sc.parallelize(self.data)

        self.tag_data_bridge_rdd = self.data_rdd.map(lambda row: (row[0], (row[1], row[2]) ))

        self.clean_data_rdd = \
        self.tag_data_bridge_rdd.sortByKey()\
                            .join( self.cust_tag_bridge_rdd.sortByKey())\
                            .map(lambda row: ( row[1][1], row[1][0][0], row[1][0][1]))
                
        # cache rdd since rdd is called on a lot
        #print "cache clean_data_rdd..."
        #self.clean_data_rdd = self.clean_data_rdd.cache()

    def create_spark_context(self, n_cups = 3, local = True, remote_cluster_path=None):
        # number of nodes in local spark cluster
        n_worker_cups = n_cups
        if local:
            print "Create spark context for local cluster..."
            self.sc = pyspark.SparkContext(master = "local[{}]".format(n_worker_cups))
        elif local == False:
            print "Create spark context for remote cluster..."
            self.sc = pyspark.SparkContext(master = remote_cluster_path)
        else:
            print "ERROR: local is set to False, however remote_cluster_path is not specified!"
            
    def stop_spark_context(self):
        print "stoping spark context..."
        self.sc.stop()

    def get_clean_data_rdd(self):
        '''Loads ratings from master file and formats data into model readable form.
           data --> (user_id, productKey, rating)'''
        # load data
        print "load ratings data..."
        self.get_ratings_data()
        # assigne each user hash tag a user_id
        print "create customer hashtag/id rdd bridge..."
        self.create_cust_tag_bridge_rdd()
        # model readable format
        print "create_clean_data_rdd..."
        self.create_clean_data_rdd() 

    def train_model(self, save_model_to_file=True, unpersist_clean_data_rdd=True):
        

        print "Training Model..."
        start = time()
        self.model = ALS.train(self.clean_data_rdd, 
                            rank=self.rank, 
                            seed=self.seed, 
                            iterations=self.iterations,
                            lambda_=self.regularization_parameter,
                            nonnegative=True)
        end = time()
        print "Training Model: Time Elapsed = {:.3} \n".format(end - start)
        
        
        # PUT A TRY/EXCEPT STATEMENT HERE FOR
        # sanity check that error is thrown is an attempt is made to override the file
        if save_model_to_file==True:
            # Save model
            print "saving model to path: {}".format(self.save_model_path)
            self.model.save(self.sc ,self.save_model_path)
        
        
        # DO NOT UNPERSIST - rdd is need later
        # free memory resources
#         if unpersist_clean_data_rdd:
#             print "unpersisting clean_data_rdd..."
#             self.clean_data_rdd.unpersist()

    def load_model(self):
        '''Load trained model that has been saved to file. 
           It is more efficient to train a model once, then make predictions.'''
        # load model
        self.model = MatrixFactorizationModel.load(self.sc, self.save_model_path)


    # accept a user_id for DEBUGGING
    def get_userID_moiveID_pairs(self, user_id):
        '''In order to get recommendations for a user, we need to build an RDD with (user_id, wine_id)
           pairs for wines that the user has not previously purchased.'''
        
        #self.user_id = user_id
        # ( user_id, movie_id, rating  )
        # get user_id's  movie ids in a list
        movie_ids = self.clean_data_rdd.filter(lambda row: row[0] == user_id )\
                                  .map(lambda row: row[1]).collect()

        # get wine_ids that user_id has not purchased 
        self.unpurchased_wines_rdd = self.clean_data_rdd.filter(lambda row: row[0] != user_id )\
                                          .filter(lambda row: row[2] not in  movie_ids)\
                                          .map(lambda row: (user_id, row[1] ) ).distinct()


    def get_user_recommendations(self):
        self.user_recs = self.model.predictAll(self.unpurchased_wines_rdd)
# -------
    def format_user_recs(self):
        '''Reformat user recommendations so it's human readable and in preperation for curation.
           This function swaps the user_id back to the original user hash tag, and attachs the wine
           features (i.e. productID, appellation, varieatl, ...) '''


        self.validated_user_recs = self.user_recs.filter(lambda row: row[2] >= self.threshold )

        # format --> (product key, predicted rating, user hash tag)
        self.wineID_rating_userHash = \
        self.validated_user_recs.map(lambda row:  (row[0], (row[1], row[2]) )  )\
                           .join(self.cust_tag_bridge_rdd\
                           .map(lambda row: (row[1], row[0]) ))\
                           .map(lambda row: (row[1][0][0],
                                            (row[1][0][1],
                                             row[1][1] ) ))  

        self.products_df = pd.read_pickle(self.products_path)                  
        create_products_rdd(self)
        # Key:Value pair RDD
        # format --> (custumer tag, (productKey , productID, Appellation, Varietal, Vineyard, wine type, Rating ))  
        self.clean_user_recs_rdd = \
        self.wineID_rating_userHash.join(self.products_rdd)\
                              .map(lambda row: ( row[1][0][1], 
                                                 (row[0], 
                                                  row[1][1][0], 
                                                  row[1][1][1], 
                                                  row[1][1][2], 
                                                  row[1][1][3],
                                                  row[1][1][4],
                                                  row[1][0][0])))
            
        # keep return ?
        self.clean_user_recs_rdd

    def curate_top_wines(self):
        self.final_recs = defaultdict(list)
        for var in self.top_varietals:
            var_cnt = 1
            for row in self.top_varietal_recs:
                if row[1][3] == var:
                    if var_cnt <= 3:
                        var_cnt += 1
                        #final_recs.append((row[0], row[1][:-1]))
                        self.final_recs[row[0]].append(row[1][:-1])
        return self.final_recs
    
    
    def save_rec_results_to_file(self):
        '''Results will be appended to file. If you want old results deleted from the file,
           you will have to do it manually from outside this class.'''
        print "saving final_recs to file..."
        print "Results will be appended to file. If you want old results deleted from the file,"+\
              "you will have to do it manually from outside this class"
        # save recommendation results to file
        cPickle.dump(self.final_recs, open(self.rec_results_path, 'a'))

    def get_top_rec_varietals(self):
        '''Returns the top 3 wines from the top 3 varietals for user'''

        # { custumer tag : (productKey , productID, Appellation, Varietal, Vineyard, wine type, Rating  ) }
        self.user_recs_dicts = self.clean_user_recs_rdd.collect()
        self.varietals = [row[1][3] for row in self.user_recs_dicts]

        self.var_count = Counter(self.varietals)

        # get top 3 most recommender varietals for this user
        self.top_varietals =  [row[0] for row in self.var_count.most_common()[0:self.n_varietals]] 

        self.top_varietal_recs = self.clean_user_recs_rdd.filter(lambda row: \
                                                                 row[1][3] in  self.top_varietals ).collect()

        return curate_top_wines(self)
    
    def get_top_reds_and_whites(self):
        '''Returns top rated wines, 5 red and 5 white for user'''

        # { custumer tag : (productKey , productID, Appellation, Varietal, Vineyard, wine type, Rating  ) }
        self.user_recs_dicts = self.clean_user_recs_rdd.collect()
        self.red_white_recs_dict = defaultdict(list)
        
        white_cnt = 1
        red_cnt = 1
        for rec in self.user_recs_dicts:
            if rec[1][5] == "White Wines":
                if white_cnt <= 5:
                    self.red_white_recs_dict[rec[0]].append(rec[1])
                    white_cnt += 1
            else:
                if red_cnt <= 5:
                    self.red_white_recs_dict[rec[0]].append(rec[1])
                    red_cnt += 1

        return self.red_white_recs_dict

    def get_user_ids_for_recommendations(self):
        '''This function returns user ids from the cust_tag_bridge_rdd. 
           '''
        # NOTE: results are inside of a list !!!!!!
        self.user_ids = self.cust_tag_bridge_rdd.map(lambda row: row[1]).collect()

    def check_top_varietal_wine_count(self):
        '''Checks if top variatls have at lease 3 wines'''
        cnt = 0
        for row in self.top_varietals:
            if row[1] >= 3:
                cnt += 1
        return cnt

    
    

In [8]:
# paths to data
home_path = "/Users/Alexander/Wine_Recommender/data/"
model_path = "/Users/Alexander/Wine_Recommender/models/spark_recommender"
ratings_path = home_path + "spark_ready_data.pkl"
products_path = home_path + "wine_products.pkl"
rec_results_path = home_path + "user_rec_results.pkl"

In [9]:
# initialize recommender class and pass in data paths
WRS = wine_recommender(home_path, ratings_path, products_path, rec_results_path, model_path)

In [10]:
# create sparkContext 
WRS.create_spark_context()

Create spark context for local cluster...


In [11]:
# clean data for model
WRS.create_clean_data_rdd()

load ratings data...
create customer hashtag/id rdd bridge...
create_clean_data_rdd...


In [12]:
WRS.load_model()

In [13]:
WRS.get_user_ids_for_recommendations()

In [14]:
len(WRS.user_ids)

165679

In [15]:
# seems to have worked -- check !!
user_id_test = 0 
WRS.get_userID_moiveID_pairs(user_id_test)

In [16]:
WRS.get_user_recommendations()

In [17]:
WRS.format_user_recs()

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

In [29]:
result_two = \
WRS.validated_user_recs.map(lambda row:  (row[0], (row[1], row[2]) )  )

In [33]:
cust_tag_test = WRS.cust_tag_bridge_rdd

In [None]:
cust_tag_test.

In [34]:
result_one = result_two.join(WRS.cust_tag_bridge_rdd)

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

In [25]:
     
result_three = result_one \
                   .map(lambda row: (row[1], row[0]) ))\
                   .map(lambda row: (row[1][0][0],
                                    (row[1][0][1],
                                     row[1][1] ) ))  

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

In [13]:
WRS.stop_spark_context()

stoping spark context...
