# In [1]:
import numpy
import math
import time
import threading
from datetime import date, datetime
from numpy.linalg import norm
from pyspark import SparkContext, SparkConf
from pyspark.mllib.recommendation import ALS, Rating
from scipy.spatial.distance import chebyshev, cosine
from scipy.stats import entropy
import json





# =====================================================================================================================
# = ALS
# =====================================================================================================================


def getALSCache(conf, s3Data):
    """Train an implicit-feedback ALS model and return its feature RDDs.

    One ``Rating(userId, productId, weight)`` is emitted for every entry in
    the ``products`` of each row of ``s3Data['viewing']['rdd']``; the weight
    is a linear time decay of that entry's ``updated`` timestamp.

    Relies on a SparkContext named ``sc`` being available at module level.

    Args:
        conf: Configuration dict. ``conf['hyperparams']['als']`` must provide
            ``minDays``, ``decayMin``, ``rank``, ``numIterations``, ``alpha``
            and ``lambda``.
        s3Data: Dict providing ``['viewing']['rdd']``,
            ``['prods']['dictReversed']`` and ``['users']['rdd']``.

    Returns:
        Dict with two RDDs:
        ``'prods'`` -- ``(dictReversed[product id], [feature, ...])`` pairs;
        ``'users'`` -- ``(value joined from the users RDD, [feature, ...])``
        pairs.
    """
    alsParams = conf['hyperparams']['als']
    nowMs = int(time.time() * 1000)
    windowMs = alsParams['minDays'] * 24 * 3600 * 1000

    def decay(ts):
        # Weight is 1 for an event at the current time and goes down linearly
        # to decayMin at an age of minDays; it never drops below decayMin.
        ageMs = float(nowMs - int(ts))
        linear = 1 - (1.0 - alsParams['decayMin']) * ageMs / windowMs
        return max(linear, alsParams['decayMin'])

    def createRating(row):
        return [Rating(row.userId, item.productId, decay(item.updated))
                for item in row.products]

    ratings = s3Data['viewing']['rdd'].flatMap(createRating)
    alsModel = ALS.trainImplicit(ratings,
                                 alsParams['rank'],
                                 alsParams['numIterations'],
                                 alpha=alsParams['alpha'],
                                 lambda_=alsParams['lambda'])

    # Broadcast maps the numeric product id back to its guid,
    # e.g. { 0: 'guid-series-1', 1: 'guid-movie-2' }.
    idToGuid = sc.broadcast(s3Data['prods']['dictReversed'])
    # => ('guid-series-1', [1..rank])
    prodFeatures = alsModel.productFeatures().map(
        lambda kv: (idToGuid.value[kv[0]], list(kv[1])))
    # NOTE(review): the map above is lazy, so this unpersist() runs before any
    # action has used the broadcast on the executors -- confirm it is intended.
    idToGuid.unpersist()

    # Reverse the unique-id mapping for users with a join instead of a
    # broadcast; this is expensive.
    # NOTE(review): with a left outer join the joined value is presumably None
    # for user ids missing from s3Data['users']['rdd'] -- verify.
    usersJoined = alsModel.userFeatures().leftOuterJoin(s3Data['users']['rdd'])
    userFeatures = usersJoined.map(lambda kv: (kv[1][1], list(kv[1][0])))

    return {
        'prods': prodFeatures,
        'users': userFeatures
    }

# =====================================================================================================================
# = Ranking
# ======================================================================================================================


def distance(mainVec, relVec, weights):
    """Return the mean of four weighted distance terms between two vectors.

    Only element ``[1]`` of ``mainVec`` and ``relVec`` (the numeric vector) is
    used. The terms are: cosine distance, and the ``tanh`` of the Euclidean
    distance, of the Chebyshev distance and of ``JensenShanonDistance``
    applied to the element-wise absolute values.

    Args:
        mainVec: Pair whose second element is a sequence of numbers.
        relVec: Pair whose second element is a sequence of numbers.
        weights: Dict with the keys ``'cosine'``, ``'eucl'``, ``'chebychev'``
            and ``'jsdistance'``.

    Returns:
        Sum of the four weighted terms divided by the number of terms.
    """
    mainValues = mainVec[1]
    relValues = relVec[1]
    mainAbs = [abs(component) for component in mainValues]
    relAbs = [abs(component) for component in relValues]

    cosineTerm = weights['cosine'] * cosine(mainValues, relValues)

    difference = numpy.array(mainValues) - numpy.array(relValues)
    euclTerm = weights['eucl'] * math.tanh(numpy.linalg.norm(difference))

    chebyshevTerm = weights['chebychev'] * math.tanh(chebyshev(mainValues, relValues))

    jsTerm = weights['jsdistance'] * math.tanh(JensenShanonDistance(mainAbs, relAbs))

    terms = [cosineTerm, euclTerm, chebyshevTerm, jsTerm]
    return sum(terms) / float(len(terms))


def rank(mainVec, features=None, similarity_weights=None, available_products=None, limit=None):
    """Rank the available products by ascending distance to ``mainVec``.

    Args:
        mainVec: ``(id, vector)`` pair; passed unchanged to ``distance``.
        features: Object whose ``.value`` is an iterable of
            ``(product_id, vector)`` pairs.
        similarity_weights: Weights dict forwarded to ``distance``.
        available_products: Object whose ``.value`` supports ``in``; products
            whose id is not contained in it are skipped.
        limit: Maximum number of entries to return; ``None`` returns all.

    Returns:
        List of ``"<product_id>_<distance>"`` strings, closest product first.
    """
    available_prods = available_products.value

    # Evaluate distance() exactly once per candidate and keep the value, so it
    # serves both as sort key and in the output string.
    scored = [(distance(mainVec, prod, similarity_weights), prod[0])
              for prod in features.value
              if prod[0] in available_prods]

    # Sort on the distance alone: the sort is stable, so candidates with equal
    # distance keep their input order and product ids are never compared.
    scored.sort(key=lambda pair: pair[0])

    return [prod_id + "_" + str(dist) for dist, prod_id in scored[:limit]]


# =====================================================================================================================
# = Saving
# =====================================================================================================================
def saveToS3(conf, rankedLists, s3_path=None, suffix='', folder = ''):
    """Save ``rankedLists`` as text, repartitioned into a single partition.

    The destination is the fixed DBFS prefix ``dbfs:/antonina/output/cf/``
    followed by ``folder`` and a trailing slash. ``conf``, ``s3_path`` and
    ``suffix`` are accepted but not used here.

    Args:
        conf: Unused.
        rankedLists: Object providing ``repartition(n).saveAsTextFile(path)``.
        s3_path: Unused.
        suffix: Unused.
        folder: Sub-folder appended to the fixed output prefix.
    """
    print('save partition')
    print(folder)
    singlePartition = rankedLists.repartition(1)
    destination = 'dbfs:/antonina/output/cf/' + folder + '/'
    singlePartition.saveAsTextFile(destination)
    
    
def markCompleted(s3Folder, conf=None):
    """Print the path of the success-marker file inside ``s3Folder``.

    Args:
        s3Folder: Folder path; joined with the marker name using ``/``.
        conf: Mapping providing ``'successFile'``. Despite the ``None``
            default it is required: subscripting ``None`` raises ``TypeError``.
    """
    successPath = '/'.join([s3Folder, conf['successFile']])
    print(successPath)
    # Writing the marker itself is currently disabled; the intended call was
    # dbutils.fs.put(<success path>, findReplaceDate('%Y-%m-%d')).
# =====================================================================================================================
# = Miscellaneous functions
# =====================================================================================================================


def is_sunday_today():
    """Return True when the current local date is a Sunday."""
    # isoweekday() numbers the days Monday=1 .. Sunday=7.
    return datetime.today().isoweekday() == 7


def findReplaceDate(string):
    """Format today's local date with the ``strftime`` pattern ``string``.

    Text in ``string`` that is not a ``strftime`` directive is returned as is.
    """
    today = date.today()
    return today.strftime(string)


def JensenShanonDistance(P, Q):
    """
    Jensen-Shannon distance between P and Q: the square root of the
    Jensen-Shannon divergence, which is symmetric and finite.

    Both inputs are floored and L1-normalised before comparison (see the
    implementation caveat below), so they do not have to sum to one.

    Args:
        P: Iterable of numbers (iterated once).
        Q: Iterable of numbers of the same length as P (iterated once).

    Returns:
        Non-negative float; NaN propagates if the divergence is NaN.
    """
    def _floored_distribution(values):
        # Implementation caveat: every entry that is not greater than 0.001
        # is replaced by 0.00001, so no component is zero when the
        # relative entropies are taken.
        floored = numpy.array([val if val > 0.001 else 0.00001 for val in values])
        return floored / norm(floored, ord=1)

    Pnorm = _floored_distribution(P)
    Qnorm = _floored_distribution(Q)

    Mtmp = 0.5 * (Pnorm + Qnorm)
    # The same floor is applied to the mixture; entropy() renormalises its
    # arguments when they do not sum to one.
    M = numpy.array([val if val > 0.001 else 0.00001 for val in Mtmp])

    divergence = 0.5 * (entropy(Pnorm, M) + entropy(Qnorm, M))
    # For (almost) identical inputs floating-point rounding can leave the
    # divergence marginally below zero, and math.sqrt would then raise
    # ValueError. Clamp only genuinely negative values so NaN still propagates.
    if divergence < 0.0:
        divergence = 0.0
    return math.sqrt(divergence)


#######################################
# Calculate ALS and Save data to S3
#######################################


def rank_and_save_als_data(conf, features=None, products_available=None, suffix=None, folder_name = ''):
    """Rank products for every product and (optionally) every user, then save.

    Product similarities are always generated. User recommendations are
    generated only when ``conf['recommendForUsers']`` is truthy and either
    today is Sunday or ``conf['forceUserRecommendations']`` is truthy.

    Relies on a SparkContext named ``sc`` being available at module level.

    Args:
        conf: Configuration dict; reads ``['hyperparams']['similarityWeights']``,
            ``['numberRecommended']``, ``['write']['prods']``,
            ``['recommendForUsers']`` and, when recommending for users,
            ``['forceUserRecommendations']`` and ``['write']['users']``.
        features: Dict with the RDDs ``'prods'`` and ``'users'`` of
            ``(id, vector)`` pairs.
        products_available: Collection of product ids that may be ranked.
        suffix: Optional string appended to the dated suffix handed to
            ``saveToS3``; ``None`` is treated as the empty string.
        folder_name: Output folder; ``/similarities`` or ``/recommendations``
            is appended to it.
    """
    # The product features are collected to the driver and broadcast so that
    # every ranking task can compare against the full product set.
    product_features_broadcast = sc.broadcast(features['prods'].collect())
    available_prods_broadcast = sc.broadcast(products_available)

    def rankProds(vec):
        products = rank(vec,
                        features=product_features_broadcast,
                        similarity_weights=conf['hyperparams']['similarityWeights'],
                        available_products=available_prods_broadcast,
                        limit=conf['numberRecommended'])
        return {'guid': vec[0], 'prods': products}

    def generate_similarities(prod_features):
        # Only available products get a similarity list of their own.
        product_similarities = prod_features.filter(
            lambda prod: prod[0] in available_prods_broadcast.value).map(rankProds)
        saveToS3(conf, product_similarities,
                 s3_path=conf['write']['prods'],
                 folder=folder_name + "/similarities")

    def generate_recommendations(user_features):
        if conf['recommendForUsers'] and (is_sunday_today() or conf['forceUserRecommendations']):
            user_recommendations = user_features.map(rankProds)
            # suffix defaults to None; concatenating None to a str raises
            # TypeError, so fall back to the empty string.
            dated_suffix = '/' + findReplaceDate('%Y-%m-%d') + (suffix or '')
            saveToS3(conf, user_recommendations,
                     s3_path=conf['write']['users'],
                     suffix=dated_suffix,
                     folder=folder_name + "/recommendations")

    generate_similarities(features['prods'])
    generate_recommendations(features['users'])


def precalculate_als_data(conf, cleaned):
    """Bundle ``cleaned`` with the ALS feature cache computed from it.

    Returns:
        ``{'s3': cleaned, 'ensemble': {'als': getALSCache(conf, cleaned)}}``.
    """
    alsCache = getALSCache(conf, cleaned)
    ensemble = {'als': alsCache}
    return {'s3': cleaned, 'ensemble': ensemble}
