In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [3]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [4]:
schema_ratings = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("item_id", IntegerType(), False),
    StructField("rating", IntegerType(), False),
    StructField("timestamp", IntegerType(), False)])

schema_items = StructType([
    StructField("item_id", IntegerType(), False),
    StructField("movie", StringType(), False)])

training = spark.read.option("sep", "\t").csv("../data/MovieLens.training", header=False, schema=schema_ratings)
test = spark.read.option("sep", "\t").csv("../data/MovieLens.test", header=False, schema=schema_ratings)
items = spark.read.option("sep", "|").csv("../data/MovieLens.item", header=False, schema=schema_items)

In [22]:
trainDf = training.toPandas()
testDf = test.toPandas()
num_users, num_items = len(trainDf.user_id.unique()), len(trainDf.item_id.unique())
max_uid, max_iid = trainDf.user_id.unique().max(), trainDf.item_id.unique().max()
print(f'num_users: {num_users} num_items : {num_items}')
print(f'max_user_id: ', max_uid, " max_item_id: ", max_iid)

num_users: 943 num_items : 1650
max_user_id:  943  max_item_id:  1682


In [339]:
NUM_SEL_ITEMS = 3
NUM_FILLER_ITEMS = 90

In [340]:
# - Create popular selected item list
item_ratings = dict(trainDf.groupby('item_id').size())
item_pop = [0] * (max_iid + 1)

for item_id in item_ratings.keys():
    item_pop[item_id] = item_ratings[item_id]
    
items_sorted = np.array(item_pop).argsort()[::-1]
selected_items = items_sorted[:NUM_SEL_ITEMS]
print("selected_items: ", selected_items)

selected_items:  [ 50 181 258]


In [341]:
trainDf.groupby('item_id', as_index=False).agg(
    rating_sum= ('rating', 'count')).sort_values('rating_sum', ascending=False).head(5)

Unnamed: 0,item_id,rating_sum
49,50,484
180,181,422
257,258,402
99,100,395
293,294,394


In [342]:
# - Select target item
target_items = [j for i in range(2, 10) for j in
                    items_sorted[i * len(items_sorted) // 10:(i * len(items_sorted) // 10) + 2]][::-1]
target_items = list(
    np.random.choice([i for i in range(len(item_pop)) if item_pop[i] == 3], 4, replace=False)) + target_items
target_items += [243]
print('target_items:', target_items)
print('target_items rating count: ', [(i, item_pop[i]) for i in target_items])

target_items: [1102, 1517, 1247, 104, 1564, 1647, 1301, 927, 917, 1276, 1057, 1029, 1137, 1045, 1142, 224, 277, 145, 421, 699, 243]
target_items rating count:  [(1102, 3), (1517, 3), (1247, 3), (104, 3), (1564, 1), (1647, 1), (1301, 4), (927, 4), (917, 7), (1276, 7), (1057, 12), (1029, 12), (1137, 21), (1045, 21), (1142, 34), (224, 34), (277, 52), (145, 52), (421, 79), (699, 79), (243, 108)]


In [399]:
# - create target users
threshold = testDf.rating.mean()
threshold = threshold if threshold < 3 else 3.0
print(f'threshold: {threshold}')
    
target_item = 243
users_rated_target = set(trainDf[trainDf.item_id == target_item].user_id.values)
# - Users who have not rated target item
data_tmp = trainDf[~trainDf.user_id.isin(users_rated_target)].copy()
#data_tmp = data_tmp[data_tmp.rating >= threshold]

# - Users who have not rated target item and have rated selected_items
target_users = data_tmp[data_tmp.item_id.isin(selected_items)].groupby('user_id').size()
print("target_users[(target_users == selected_num)].shape[0]: ", 
       target_users[(target_users == NUM_SEL_ITEMS)].shape[0])
target_users = sorted(target_users[(target_users == NUM_SEL_ITEMS)].index)
target_users
print("target_users: ", len(target_users))

threshold: 3.0
target_users[(target_users == selected_num)].shape[0]:  162
target_users:  162


In [400]:
# - Get ratings mean and rating std
stdDf = trainDf.groupby('item_id', as_index=False).agg(rating_mean= ('rating', 'mean'), rating_std= ('rating', 'std'), 
                                  rating_count = ('rating', 'count'))
item_mean = dict(zip(stdDf.item_id,stdDf.rating_mean))
item_std = dict(zip(stdDf.item_id,stdDf.rating_std))
rating_mean, rating_std = trainDf.rating.mean(), trainDf.rating.std()
print(f'rating_mean: {rating_mean} rating_std: {rating_std}')

rating_mean: 3.52835 rating_std: 1.118564668374818


In [401]:
# - get filler items
from random import randrange

class FakeProfile(object): 
    MAX_RATING = 5
    
    def __init__(self, target_item, 
                 filler_item_count = 70):
        self.target_item  = target_item
        self.filler_item_count = filler_item_count
        self.selected_items = {}
        self.filler_items = {}
        
    def setSelectedItems(self, selectedItems):
        #selectedItems = freqRatedItems[np.random.choice(len(freqRatedItems), size=self.selected_items_count, replace=False)]
        for item in selectedItems:
            self.selected_items[item] = self.MAX_RATING
    
    def fillerItems(self, selectedItems):
        fillers_candidates = list(set(trainDf.item_id.unique()) - set([self.target_item] + selectedItems))
        fillers = np.random.choice(fillers_candidates, size=self.filler_item_count, replace=False)
        ratings = np.random.normal(loc=rating_mean, scale=rating_std, size=self.filler_item_count)
        for item, rating in zip(fillers, ratings):
            self.filler_items[item] = rating
                
    def create(self, selectedItems):
        self.setSelectedItems(selectedItems)
        self.fillerItems(selectedItems)
        
    def print(self):
        print(f'target_item : {self.target_item}')
        print(f'selected_items : {self.selected_items}')
        print(f'filler_items : {self.filler_items}')
        print("\n")
        
    def getAllItemRatings(self):
        itemRatings = [(self.target_item, self.MAX_RATING)]
        for item in self.selected_items:
            itemRatings.append((item, self.selected_items[item]))
        for item in self.filler_items:
            itemRatings.append((item, self.filler_items[item]))
        return itemRatings

In [402]:
NUM_FAKE_USERS = 70
fake_profiles = []

for u in range(NUM_FAKE_USERS):
    fp = FakeProfile(target_item)
    fp.create(selected_items)
    fake_profiles.append(fp)
    
for fp in fake_profiles:
    fp.print()

target_item : 243
selected_items : {50: 5, 181: 5, 258: 5}
filler_items : {912: 3.3479744783722003, 88: 4.132813422588995, 1303: 4.461824307986662, 1003: 3.410162885530628, 163: 2.93195141561704, 974: 2.865109921477799, 844: 3.0965221074770866, 874: 2.6361484488406832, 1475: 3.948619129660323, 282: 2.365160107742227, 1281: 2.8960868899995305, 818: 3.833997735961874, 849: 4.2546436838561705, 394: 2.850656092648287, 819: 2.9511584926450425, 50: 4.6852736739671945, 226: 2.3340909467749658, 1182: 2.824787217176144, 433: 2.4394136064641705, 1613: 3.0529346482681676, 1371: 3.6345824284973247, 40: 1.852747151096434, 94: 3.453535443181803, 840: 3.077210570521875, 1473: 2.5829590668801092, 732: 5.591479194218834, 1291: 3.278446913547462, 1134: 5.84748420808333, 1289: 4.602400502366104, 1352: 3.870503124103759, 451: 4.002550434464542, 17: 2.7702151837638365, 1547: 4.393704763077473, 635: 4.642659691092159, 1356: 2.4709255151790543, 1438: 3.9880552265624445, 114: 4.829266234668925, 729: 1.8733985

In [403]:
# - Create attack data frame
userId = 1100
timestamp = 874965758
fakeRatingsdata = {'userId': [], 'item_id': [], 'ratings': [], 'timestamp': []}
for fp in fake_profiles:
    userId += 1
    itemRatings = fp.getAllItemRatings()
    for itemRatingPair in itemRatings:
        fakeRatingsdata['userId'].append(userId)
        fakeRatingsdata['item_id'].append(itemRatingPair[0])
        fakeRatingsdata['ratings'].append(itemRatingPair[1])
        fakeRatingsdata['timestamp'].append(timestamp)
        
columnsZipped = zip(fakeRatingsdata['userId'], fakeRatingsdata['item_id'],
                   fakeRatingsdata['ratings'], fakeRatingsdata['timestamp'])
attackDataDf = pd.DataFrame(list(columnsZipped),
               columns =['user_id', 'item_id', 'rating', 'timestamp'])
attackDataDf

Unnamed: 0,user_id,item_id,rating,timestamp
0,1101,243,5.000000,874965758
1,1101,50,5.000000,874965758
2,1101,181,5.000000,874965758
3,1101,258,5.000000,874965758
4,1101,912,3.347974,874965758
...,...,...,...,...
5175,1170,1248,3.282524,874965758
5176,1170,897,1.579563,874965758
5177,1170,964,3.207862,874965758
5178,1170,1209,3.418300,874965758


In [404]:
attackTrainData = pd.concat([trainDf, attackDataDf]).sort_values(by=['user_id', 'item_id'])
attackTrainData

Unnamed: 0,user_id,item_id,rating,timestamp
0,1,1,5.000000,874965758
1,1,2,3.000000,876893171
2,1,3,4.000000,878542960
3,1,4,3.000000,876893119
4,1,5,3.000000,889751712
...,...,...,...,...
5169,1170,1558,1.005717,874965758
5130,1170,1575,5.048238,874965758
5133,1170,1617,3.390225,874965758
5158,1170,1643,3.711430,874965758


<h3>Evaluation</h3>

<h4>Base model</h4>

In [368]:
# 0.1
als = ALS(maxIter=10, rank=100, regParam=0.1, userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
rmse

0.9283872090543989

In [369]:
userRecs = model.recommendForAllUsers(10)
userRecs = userRecs.toPandas()

In [370]:
count = 0
for index, row in userRecs.iterrows():
    recommendations = [r['item_id'] for r in row['recommendations']]
    if target_item in recommendations:
        print(row['user_id'], recommendations)
        count += 1
print(f'Total users with {target_item}: {count}')

Total users with 243: 0


<h4>Model with train data + attack data</h4>

In [405]:
# 0.1
attackDF = spark.createDataFrame(attackTrainData)
als_atk = ALS(maxIter=10, rank=100, regParam=0.1, userCol="user_id", itemCol="item_id", ratingCol="rating", coldStartStrategy="drop")
model_atk = als_atk.fit(attackDF)
predictions_atk = model_atk.transform(test)
evaluator_atk = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse_atk = evaluator_atk.evaluate(predictions_atk)
rmse_atk

0.9265142840198731

In [412]:
predAtk = predictions_atk.toPandas()
pred = predictions.toPandas()

np.mean(predAtk[predAtk.user_id.isin(target_users)].prediction - pred[pred.user_id.isin(target_users)].prediction)

0.0044556605

In [382]:
userRecs = model_atk.recommendForAllUsers(12)
userRecs = userRecs.toPandas()

In [383]:
count = 0
for index, row in userRecs.iterrows():
    recommendations = [r['item_id'] for r in row['recommendations']]
    if target_item in recommendations:
        print(row['user_id'], recommendations)
        count += 1
print(f'Total users with {target_item}: {count}')

1127 [1329, 1612, 1253, 955, 50, 1370, 913, 181, 243, 258, 1131, 1449]
1143 [609, 369, 1058, 593, 243, 1289, 1267, 1136, 1172, 1424, 258, 1120]
1157 [1453, 516, 1472, 181, 50, 551, 258, 431, 1594, 1194, 169, 243]
1160 [1234, 1069, 1385, 57, 1528, 258, 50, 181, 243, 1344, 220, 1472]
1165 [243, 1596, 50, 1034, 598, 913, 181, 258, 668, 1671, 1019, 974]
1135 [1347, 50, 181, 1594, 243, 1344, 258, 1581, 1268, 908, 1483, 1650]
1148 [1323, 220, 57, 1344, 50, 181, 1653, 456, 243, 344, 1540, 258]
1125 [1314, 759, 258, 243, 181, 50, 1284, 481, 1449, 1137, 320, 1170]
1114 [1594, 1508, 181, 50, 1278, 1194, 265, 174, 1344, 713, 243, 258]
1170 [1321, 718, 1658, 243, 1616, 895, 50, 181, 258, 1403, 1102, 1385]
1122 [1355, 258, 1426, 1006, 899, 277, 1292, 243, 1344, 181, 50, 466]
1137 [1235, 50, 1430, 1649, 1612, 243, 258, 1034, 1578, 1344, 524, 1307]
1155 [1486, 1019, 1472, 1045, 258, 667, 181, 50, 1630, 1302, 243, 590]
1130 [75, 1369, 1315, 258, 181, 243, 1604, 50, 316, 64, 272, 1482]
1113 [817, 258, 

In [356]:
len(testDf.user_id.unique())

459

In [357]:
35/459

0.07625272331154684