In [33]:
from pyspark.mllib.linalg.distributed import RowMatrix

from pyspark.sql import Row
from pyspark.sql import SparkSession 

from pyspark.mllib.linalg import Vectors

import tqdm

from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import SparseVector

from scipy.sparse import find

import numpy as np

from collections import Counter

NUMCLASTERS = 15

import matplotlib.pyplot as plt
import numpy as np

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [2]:
# Spark session on YARN. Hive support is enabled, presumably because the
# queries below read the Hive table prod.mles_sopr.
spark = (
    SparkSession.builder
    .master('yarn')
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Interaction weights: an event's score is (page-type weight) * (event-type weight).
DICT_W_FOR_PAGE_TYPE = {"Card": 3,
                        "CardJK": 2,
                        "Listing": 1,
                        "ListingFavorites": 5}

DICT_W_FOR_EVENT_TYPE = {"card_show": 3,
                         "phone_show": 10}

# One (page_type, event_type, value) row per combination, grouped by event type
# (outer loop) and then page type (inner loop), in dict insertion order.
data = [(pageType, eventType, int(pageWeight * eventWeight))
        for eventType, eventWeight in DICT_W_FOR_EVENT_TYPE.items()
        for pageType, pageWeight in DICT_W_FOR_PAGE_TYPE.items()]

In [4]:
# Register the weight lookup table as the SQL view "dfdict" so the
# interaction query can join against it.
dfdict = spark.createDataFrame(data, schema=['page_type', 'event_type', 'value'])
dfdict.createOrReplaceTempView("dfdict")

In [5]:
def compareRdd(x, y):
    """Return whichever of two rows has the larger ``user_num``.

    Intended as an RDD ``reduce`` function. When both ``user_num`` values are
    equal, the second argument ``y`` is returned.
    """
    return x if x['user_num'] > y['user_num'] else y

In [6]:
def createVector(x, y):
    """Merge two partial results of ``reduceByKey`` into one flat list of pairs.

    Each argument is either a single ``(index, value)`` tuple (a raw record
    that has not been merged yet) or a list of such tuples (an earlier merge
    result).

    Returns:
        A new list holding the pairs of ``x`` followed by those of ``y``.
        Neither input list is modified.

    Raises:
        TypeError: if ``x`` or ``y`` is neither a list nor a tuple
            (``x`` is checked first).
    """
    def _as_list(part, name):
        # A bare tuple is a single record; wrap it so it can be concatenated.
        if type(part) is list:
            return part
        if type(part) is tuple:
            return [part]
        raise TypeError('Wrong type of ' + name)

    return _as_list(x, 'x') + _as_list(y, 'y')

In [7]:
def sortVector(a):
    """Turn one key's collected ``(index, value)`` pairs into sparse-vector parts.

    Args:
        a: pair ``(key, entries)`` as produced by ``reduceByKey`` with
            ``createVector``. ``entries`` is either a single ``(index, value)``
            tuple (the key had only one record, so the reducer never ran) or a
            list of such tuples.

    Returns:
        ``(key, (indices, values))``. ``indices`` is strictly increasing. When an
        index occurs more than once, only its largest value is kept. This makes
        the pair suitable for ``Vectors.sparse(size, indices, values)``.

    Raises:
        TypeError: if ``entries`` is neither a list nor a tuple.
    """
    key, entries = a[0], a[1]
    if type(entries) is list:
        # Single pass keeping the maximum value per index. Every index is kept,
        # including the smallest one. The input list is not modified.
        best = {}
        for index, value in entries:
            if index not in best or value > best[index]:
                best[index] = value
        indices = sorted(best)
        return (key, (indices, [best[index] for index in indices]))
    elif type(entries) is tuple:
        return (key, ([entries[0]], [entries[1]]))
    else:
        raise TypeError('Wrong type of a: ' + str(type(entries)))

In [18]:
# Weighted interactions: join raw events with the weight table, drop anonymous
# users ('noid'), and keep an arbitrary subset of 10000 rows.
# `createOrReplaceTempView` returns None, so the DataFrame is kept in `user_item`
# and registered separately. LIMIT without ORDER BY is non-deterministic, and the
# view is referenced several times by the next query. Caching lets those
# references reuse the same materialised rows (as long as the cache is not evicted).
user_item = spark.sql("""select user_id, offer_id, value from dfdict as a, prod.mles_sopr as b 
                            where a.page_type = b.page_type and a.event_type = b.event_type and user_id != 'noid'
                        limit 10000
                  """).repartition(100).cache()
user_item.createOrReplaceTempView("user_item")

In [20]:
# Give users (user_num) and offers (offer_num) dense 1-based indices with
# row_number(), keep distinct (offer, user, value) rows, and drop to the RDD API.
# Several later actions consume this RDD: the maxUserNum reduce, the vector
# build and the cluster join. It is cached so those actions do not each
# recompute the joins and the un-partitioned row_number() windows.
rdd = spark.sql("""select distinct a.offer_num, a.offer_id, b.user_num, b.user_id, c.value
                    from user_item as c
                         INNER JOIN (select row_number() OVER (ORDER BY a.user_id) as user_num, a.user_id 
                            from (select distinct user_id from user_item) as a) as b on c.user_id = b.user_id
                         INNER JOIN (select row_number() OVER (ORDER BY a.offer_id) as offer_num, a.offer_id 
                            from (select distinct offer_id from user_item) as a) as a on c.offer_id = a.offer_id 
                        """) \
              .repartition(100).rdd.cache()

In [21]:
# Largest user index (row_number() starts at 1); sizes the sparse vectors below.
maxUserNum = rdd.map(lambda row: row['user_num']).max()

In [23]:
# Key each interaction by offer: offer_num -> (user_num, weighted value).
rddKey = rdd.map(lambda row: (row['offer_num'], (row['user_num'], row['value'])))

# Per offer: merge all (user_num, value) pairs with createVector, normalise them
# with sortVector, and build a sparse vector over user indices. The size is
# maxUserNum + 1 because user indices start at 1.
targetWithId = (
    rddKey
    .reduceByKey(createVector)
    .map(sortVector)
    .map(lambda kv: (kv[0], Vectors.sparse(maxUserNum + 1, kv[1][0], kv[1][1])))
)

# Vectors only (offer key dropped); this is what KMeans is trained on.
target = targetWithId.values()

In [24]:
# Number of offer vectors; used to turn a fixed sample size into a split fraction.
targetSize = target.count()

In [27]:
# Randomly draw roughly KMEANS_SAMPLE_SIZE offer vectors to train KMeans on.
# The fraction is clamped to 1.0 so that having fewer vectors than the sample size
# does not produce a negative randomSplit weight (which Spark rejects).
# The fixed seed makes the sample reproducible.
KMEANS_SAMPLE_SIZE = 100
if targetSize == 0:
    raise ValueError('No offer vectors to sample for KMeans training')
sampleFraction = min(1.0, KMEANS_SAMPLE_SIZE / float(targetSize))
X = target.randomSplit([1.0 - sampleFraction, sampleFraction], seed=42)[1]

In [28]:
# Cluster the sampled offer vectors into NUMCLASTERS groups.
# A fixed seed makes the k-means|| initialisation, and hence the clustering
# for a given X, reproducible.
model = KMeans.train(X,
                     NUMCLASTERS,
                     initializationMode="k-means||",
                     initializationSteps=5,
                     epsilon=1e-4,
                     seed=42)

In [29]:
def addpredict(x, kmeans_model=None):
    """Attach a cluster id to a keyed vector.

    Args:
        x: pair ``(key, vector)``; the key is passed through unchanged.
        kmeans_model: object with a ``predict(vector)`` method. When omitted,
            the global ``model`` is looked up at call time. Because that lookup
            follows any later rebinding of the global name, pass the model
            explicitly when that matters.

    Returns:
        ``(key, kmeans_model.predict(vector))``.
    """
    predictor = model if kmeans_model is None else kmeans_model
    return (x[0], predictor.predict(x[1]))

In [30]:
# Tag every offer with its KMeans cluster id: (offer_num, cluster_id).
# Binding `model.predict` here captures the KMeans model object now. The
# alternative, a lambda that resolves the global `model` when the job is
# serialised, would pick up whatever a later cell rebinds `model` to.
targetWithIdPred = targetWithId.mapValues(model.predict)

In [40]:
# Attach the original interaction rows to each offer's cluster id once, and cache
# the result so the per-cluster DataFrames below do not each redo the full join.
# Element shape: (offer_num, (cluster_id, Row)).
offersWithCluster = targetWithIdPred.join(rdd.map(lambda row: (row['offer_num'], row))).cache()

# One DataFrame of interaction rows per cluster (despite the name, these are
# DataFrames, not RDDs). `c=cluster` binds the cluster id when the lambda is
# created, instead of relying on the closure being serialised before the
# comprehension moves on to the next value.
# NOTE(review): createDataFrame infers the schema from the data, so a cluster
# with no offers would likely fail here -- confirm every cluster is non-empty.
listrdd = [spark.createDataFrame(offersWithCluster
                                 .filter(lambda kv, c=cluster: kv[1][0] == c)
                                 .map(lambda kv: kv[1][1]))
           for cluster in range(NUMCLASTERS)]

In [43]:
def PoolALS(x):
    """Fit an ALS recommender on one cluster's interaction DataFrame.

    Users are identified by ``user_num``, items by ``offer_id``, and the
    weighted interaction ``value`` is used as the rating. Rows for users or
    items unseen at fit time are dropped at prediction time
    (``coldStartStrategy="drop"``).

    NOTE(review): the user column is the dense ``user_num`` index, but the item
    column is the raw ``offer_id``; ALS needs integer ids in int range, and the
    dense ``offer_num`` may have been intended -- confirm.

    Returns:
        The fitted ALS model.
    """
    estimator = ALS(
        maxIter=5,
        regParam=0.01,
        userCol="user_num",
        itemCol="offer_id",
        ratingCol="value",
        coldStartStrategy="drop",
    )
    return estimator.fit(x)

In [44]:
# Train one ALS model per offer cluster and keep each model's top-3
# recommendations for every user.
# The fitted models go into `alsModel`, not `model`, so the global `model`
# (the KMeans model, which addpredict reads by default) is not overwritten.
modelList = []
recsList = []
for i in tqdm.tqdm(range(NUMCLASTERS)):
    alsModel = PoolALS(listrdd[i])
    modelList.append(alsModel)
    user_recs = alsModel.recommendForAllUsers(3)
    recsList.append(user_recs)

100%|██████████| 15/15 [01:33<00:00,  5.95s/it]
