### System rekomendujący na bazie Netflix Prize award

1. Zacznijmy od obejrzenia wykładu [rozdział 9](http://www.mmds.org/)
2. proszę ściągnac bazę netflixa o której była mowa w/w wykładzie [z kaggle](https://www.kaggle.com/netflix-inc/netflix-prize-data) (2GB) 

Zaczniemy od wczytania danych, ponizszy listing wczytuje dane z jednego pliku i robi z nich trójkę (user, product, rating). Wykorzystamy do tego predefiniowany obiekt Rating z mlib.recommendation (prosze zwrócic uwagę że w tej konwencji nasz film będzie produktem)

#### Zadanie 1:
zmodyfikuj poniższy listing tak aby wczytywać wsystkie pięc plików

In [1]:
import pyspark
from pyspark.mllib.recommendation import Rating
from pyspark import SparkContext

# tutaj zaincjalizujemy klaster (na jednym komputerze) sparka
# zmodyfikujemy nieco std ustawienia maszyny java zwiększając domyslne (bardzo małe) limity pamieci
# local[*] oznacza że użyjemy wsystkich rdzeni - jeśli zabrankie nam ramu możemy zmniejszyć ilość tą ilość
# polecam spoglądać do konsoli na http://localhost:4040/ aby monitorować zużycie zasobów

conf = pyspark.SparkConf().setAppName("recommendation")
conf = (conf.setMaster('local[*]')
        .set('spark.executor.memory', '4G')
        .set('spark.driver.memory', '20G')
        .set('spark.driver.maxResultSize', '10G'))
sc = SparkContext(conf=conf)

#files = ['./RS/combined_data_1.txt',
        #'./RS/combined_data_2.txt',
        #'./RS/combined_data_3.txt',
        #'./RS/combined_data_4.txt'] # Ufam, że użytkownik wprowadził nazwy plików, które istnieją...
files = [ './RS/combined_data_1.txt' ]

ratings = []
for single_name in files:
    f = open(single_name)
    for i, line in enumerate(f):
        line = line.strip()
        if line.endswith(':'):
            movie_id = int(line[:-1])        
        else:
            user_id, rating, _ = line.split(',')
            r = Rating(int(user_id), int(movie_id), int(rating))
            ratings.append(r)
    f.close()
    print("File '" + single_name + "' loaded!")

print('Finished! Processed lines: ' + str(len(ratings)))

File './RS/combined_data_1.txt' loaded!
Finished! Processed lines: 24053764


In [2]:
#tutaj wczytamy identyfikatory filmów i ich tytuły

f = open('./RS/movie_titles.csv', encoding = "ISO-8859-1")
g = [l.strip().split(',') for l in f.readlines()]
id2title = {int(a[0]):','.join(a[2:]) for a in g}
f.close()
print('Finished!')

Finished!


In [3]:
id2title
print('Finished!')

Finished!


In [4]:
import time

# utworzmy z naszej listy ratingow zasob rdd (rozproszenie)
start = time.time()
ratings_rdd = sc.parallelize(ratings)
end = time.time()
print('Finished! Elapsed time: ' + str(round(end - start, 2)) + ' seconds.')

Finished! Elapsed time: 31.56 seconds.


#### Zadanie 2
Przefiltruj ratings_rdd aby wziac pod uwage filmy ktory maja co najmniej 50 ocen
tip: zrób napierw liste par (movie_id, Rating) i pogrupuj ją przy użyciu GroupByKey a nstępnie przefiltruj

In [8]:
ratings_pairs = ratings_rdd.map(lambda r: (r.product, r))
rg = ratings_pairs.groupByKey() # ???

# ???
ratings_filtered = rg.filter(lambda r: len(r[1]) >= 50).collect() # ???


# zasob rdd na przefiltrowanym obiekcie
ratings_filtered = sc.parallelize(ratings_filtered)

PythonRDD[1] at RDD at PythonRDD.scala:53


In [6]:
# TROCHĘ INNYM SPOSOBEM - na około (zmiana na DataFrame po to by korzystać z SQL Queries)
import pyspark.sql.functions as ps_functions
from pyspark.sql import SQLContext
from pyspark.sql import Window
import time

start = time.time()
data = SQLContext(sc)
print('Stage 1 - finished!')

all_ratings = data.createDataFrame(ratings_rdd)
print('Stage 2 - finished!')

fragment = Window.partitionBy('product')
print('Stage 3 - finished!')

query = all_ratings.select('user', 'product', 'rating', ps_functions.count('product').over(fragment).alias("ratings_count"))
print('Stage 4 - finished!')

marker = query.filter("ratings_count >= 50").select('user','product','rating')
print('Stage 5 - finished!')

ratings = sc.parallelize(marker.collect())
print('Stage 6 - finished!')

end = time.time()
print('Finished! Elapsed time: ' + str(round(end - start, 2)) + ' seconds.')
# TROCHĘ INNYM SPOSOBEM - na około (zmiana na DataFrame po to by korzystać z SQL Queries)

Stage 1 - finished!
Stage 2 - finished!
Stage 3 - finished!
Stage 4 - finished!
Stage 5 - finished!
Stage 6 - finished!
Finished! Elapsed time: 168.47 seconds.


Dokonamy teraz faktoryzacji macierzy, nasej utility Matrix (zasób RDD ratings_filtered jest własnie taką macierzą) przy użyciu aproksymacji algorytmu spadku gradientowego
![alt](https://edersoncorbari.github.io/assets/images/blog/als-matrix-rec-calc.png)

In [7]:
from pyspark.mllib.recommendation import ALS
import time

rank = 10
numIterations = 10

start = time.time()
model = ALS.train(ratings, rank, numIterations)
end = time.time()
print('Finished! Elapsed time: ' + str(round(end - start, 2)) + ' seconds.')

Finished! Elapsed time: 80.86 seconds.


Zrzucilismy uzytkowników i filmy na nowy wymiar o wielkości 10. Możemy wprost poprosić o te macierze

In [8]:
users  = model.userFeatures()
movies = model.productFeatures()

Gęsta ale 'wąska' macierz movies zmieści się nam już bezproblemowo w RAM, a zatem zróbmy z niej po prostu macierz numpy

In [9]:
import numpy as np

movies_mtx = np.array(movies.map(lambda rv:rv[1]).collect())
movie2row_number = movies.map(lambda rv:rv[0]).collect() #movie id to row number

Zróbmy prosty ekesperyment, zobaczmy czy jakie filmy są 'podobne' do 'Władcy pierścieni'

In [10]:
id2title[1757] # 'The Lord of the Rings'

lotr_idx = movie2row_number.index(1757)
lotr_vector = movies_mtx[lotr_idx]
print('Title: ' + id2title[1757])
print(lotr_vector) #to jest wladca pierscienie rzutowany na latent space

Title: The Lord of the Rings
[-0.22731483  0.16489471 -0.4131501  -0.38900298 -0.48030087  0.04403076
 -0.03553521 -0.34933752  0.33792284  0.77826786]


In [11]:
# obliczmy macierz odleglosci 
from scipy.spatial import distance

ds = distance.cdist([lotr_vector], movies_mtx, 'cosine')[0]
dist = ds.argsort()[:10] # 10 - liczba filmów

print('Distance: ' + str(dist))

for i in dist:
    print('Movie title using ID ' + str(i) + ': ' + id2title[movie2row_number[i]])

Distance: [1561  174  844  141 3245 2782 1957 3005 1147 4302]
Movie title using ID 1561: The Lord of the Rings
Movie title using ID 174: Todd McFarlane's Spawn
Movie title using ID 844: Shaka Zulu
Movie title using ID 141: Knights of Ramune
Movie title using ID 3245: Cosplay Complex
Movie title using ID 2782: The Return of the King
Movie title using ID 1957: Mezzo
Movie title using ID 3005: A Million to Juan
Movie title using ID 1147: Complete Shamanic Princess
Movie title using ID 4302: Hercules: The Legendary Journeys: Season 2


In [12]:
for i in dist:
    print('Distance value: ' + str(ds[i]) + " refers movie: " + id2title[movie2row_number[i]])

# print(str(ds[2018]) + ': ' + id2title[movie2row_number[2018]])
# print(str(ds[928]) + ': ' + id2title[movie2row_number[928]])

Distance value: 0.0 refers movie: The Lord of the Rings
Distance value: 0.055450248493897236 refers movie: Todd McFarlane's Spawn
Distance value: 0.05566620568613523 refers movie: Shaka Zulu
Distance value: 0.0593692100299571 refers movie: Knights of Ramune
Distance value: 0.07500270642058404 refers movie: Cosplay Complex
Distance value: 0.08188804371612912 refers movie: The Return of the King
Distance value: 0.08383961407035634 refers movie: Mezzo
Distance value: 0.08502933677577662 refers movie: A Million to Juan
Distance value: 0.09167598711851499 refers movie: Complete Shamanic Princess
Distance value: 0.10279860640752103 refers movie: Hercules: The Legendary Journeys: Season 2


Odległosć 0.0 to oczywiscie ten sam obiekt a zatem interesuje nas drugi w kolejności wpis. I cóż za niespodzianka? ludzie którzy ocenili wysoko Władce pierścieni ocenili również wysokos Powrót Króla

Na ile cech powinniśmy faktoryzować Utility Matrix? 

#### Zadanie 3
Dobierz paramter rank (ilość cech) dla obiektu ALS dzieląc zbiór ratings_filtered na 80% vs 20% (treningowy i testowy, tak jak na wykłdzie). Następnie wykorzystać metodę ALS.recommendProductsForUsers aby polecić filmy i zweryfikuj ile filmów udało Ci sie prawidłowo polecić ze zbioru testowego. Odpowiednio modyfikuj parametr rank aby otrzymać względnie wysoki wynik rekomendacji jednocześnie utrymując w miare wąskie macierze

In [13]:
#files = ['./RS/combined_data_1.txt',
        #'./RS/combined_data_2.txt',
        #'./RS/combined_data_3.txt',
        #'./RS/combined_data_4.txt'] # Ufam, że użytkownik wprowadził nazwy plików, które istnieją...
files = [ './RS/combined_data_1.txt' ]

ratings = []
for single_name in files:
    f = open(single_name)
    for i, line in enumerate(f):
        line = line.strip()
        if line.endswith(':'):
            movie_id = int(line[:-1])        
        else:
            user_id, rating, _ = line.split(',')
            r = Rating(int(user_id), int(movie_id), int(rating))
            ratings.append(r)
    f.close()
    print("File '" + single_name + "' loaded!")

print('Loading finished! Processed lines: ' + str(len(ratings)))

f = open('./RS/movie_titles.csv', encoding = "ISO-8859-1")
g = [l.strip().split(',') for l in f.readlines()]
id2title = {int(a[0]):','.join(a[2:]) for a in g}
f.close()
print('IDs and titles loaded!!')

start = time.time()
ratings_rdd = sc.parallelize(ratings)
end = time.time()
print('RDD resource created! Elapsed time: ' + str(round(end - start, 2)) + ' seconds.')

File './RS/combined_data_1.txt' loaded!
Loading finished! Processed lines: 24053764
IDs and titles loaded!!
RDD resource created! Elapsed time: 30.99 seconds.


In [14]:
import pyspark.sql.functions as ps_functions
from pyspark.sql import SQLContext
from pyspark.sql import Window
import time

start = time.time()
data = SQLContext(sc)
print('Stage 1 - finished!')

all_ratings = data.createDataFrame(ratings_rdd) # Tu zawsze pojawia się error przy pierwszym uruchomieniu. Drugie uruchomienie już go nie wywołuje
print('Stage 2 - finished!')

fragment = Window.partitionBy('product')
print('Stage 3 - finished!')

query = all_ratings.select('user', 'product', 'rating', ps_functions.count('product').over(fragment).alias("ratings_count"))
print('Stage 4 - finished!')

marker = query.filter("ratings_count >= 50").select('user','product','rating')
print('Stage 5 - finished!')

ratings = sc.parallelize(marker.collect())
print('Stage 6 - finished!')

end = time.time()
print('Finished! Elapsed time: ' + str(round(end - start, 2)) + ' seconds.')

Stage 1 - finished!
Stage 2 - finished!
Stage 3 - finished!
Stage 4 - finished!
Stage 5 - finished!
Stage 6 - finished!
Finished! Elapsed time: 205.19 seconds.


In [16]:
from pyspark.mllib.recommendation import ALS
import numpy as np
import math
import time

############################################################

train, valid, test = ratings.randomSplit([6,2,2], seed=0)
emptyTest = test.map(lambda r: (r[0], r[1]))
emptyValid = valid.map(lambda r: (r[0], r[1]))
print('Finished!')

############################################################

ranks = range(1, 21)

errorsList = []
models = []
for i in range(len(ranks)):
    errorsList.append(0)
ind = 0

remembered_error = float('inf')
the_best_rank = -1
for rank in ranks:
    start = time.time()
    model = ALS.train(train, rank, seed=5, iterations=20, lambda_=0.1)
    pred = model.predictAll(emptyValid).map(lambda r: ((r[0], r[1]), r[2]))
    rates = valid.map(lambda r: ((r[0], r[1]), r[2])).join(pred)
    newError = math.sqrt(rates.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errorsList[ind] = newError
    ind += 1
    if newError < remembered_error:
        remembered_error = newError
        the_best_rank = rank
    end = time.time()
    print('Training completed for rank: ' + str(rank) + '/' + str(len(ranks)) + '. Elapsed time: ' + str(round(end - start, 2)) + ' seconds.')
print("The best rank %s" % the_best_rank)

############################################################

train, test = ratings.randomSplit([8,2], seed=0)
emptyTest = test.map(lambda r: (r[0], r[1]))

############################################################

start = time.time()
model = ALS.train(train, the_best_rank, seed=5, iterations=20, lambda_=0.1)
pred = model.predictAll(emptyTest).map(lambda r: ((r[0], r[1]), r[2]))
rates = test.map(lambda r: ((r[0], r[1]), r[2])).join(pred)
end = time.time()
print('Test completed for rank: ' + str(the_best_rank) + ' Elapsed time: ' + str(round(end - start, 2)) + ' seconds.')

############################################################

products = model.recommendProductsForUsers(10)
products.first()

Finished!
Training completed for rank: 1/20. Elapsed time: 207.87 seconds.
Training completed for rank: 2/20. Elapsed time: 185.76 seconds.
Training completed for rank: 3/20. Elapsed time: 193.15 seconds.
Training completed for rank: 4/20. Elapsed time: 187.55 seconds.


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 4780.0 failed 1 times, most recent failure: Lost task 5.0 in stage 4780.0 (TID 3734, localhost, executor driver): java.net.SocketException: Software caused connection abort: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:212)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Software caused connection abort: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:212)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)


In [9]:
import numpy as np
import math

productions = model.productFeatures()
productions_matrix = np.array(movies.map(lambda rv:rv[1]).collect())
m2rn = productions.map(lambda rv:rv[0]).collect()

############################################################

print('Recommended movies: ')
for i in enumerate(products.first()[1]):
    product_index = i[1][1]
    print("\"" + id2title[m2rn.index(product_index)] + "\" for value %s" % i[1][2])

id2title[1757]
lotr_idx = m2rn.index(1757)
lotr_vector = productions_matrix[lotr_idx]

############################################################

print('Title of watched movie: ' + id2title[1757])
print('Vector of watched movie:')
print(lotr_vector)

"Fight Club: Bonus Material" dla wartości 4.866043521149377
"Thomas & Sarah" dla wartości 4.721042942324603
"The Element of Crime" dla wartości 4.708888030404897
"Niea 7" dla wartości 4.6854508074529715
"Tuck Everlasting" dla wartości 4.6836380275702485
"Cinderella II" dla wartości 4.674958529953608
"Creepshow" dla wartości 4.671594230892056
"Scratch" dla wartości 4.635454454397707
"Black Orpheus" dla wartości 4.603810422278698
"King Cobra" dla wartości 4.603664821979537


#### Zadanie 4
Zrób program "co obejrzeć dziś wieczorem".
Program powinien pokazaywać 5 tytułow, uzytkownik wskazuje jeden lub kilka z nich i system pokazuje kolejne rekomendacje. Każda kolejna rekomendacja powinna wskazywać coraz lepsze rekomendacje. 

problem można rozwiazać na kilka sposób. Można pogrupować przestrzeń filmów (macierz P.T) i pokazywać z różnych np. centroidy klastrów (które będą reprezentować gatunki filmów (prawdopodobnie) ). 

każde kolejne wybranie filmu będzie wymagało stworzenie nowego sztuznego użytkownika z ratingiem. W każdej iteracji takiego użytkownika należy rzucic na macierz Q (czyli na latentną macierz użytkownika). Przypatrz się dokładnie sposobowi mnożenia na obrazku powyżej i zastanów się jak otrzymać takiego użytkownika w przestrzeni cech?). Przypominam że macierz P.T jest nieodracalna ale można wykorzystać pseudo odwrotnosć.