In this example we use a MovieLens dataset with 100,000 rows to illustrate how to use Apache Spark and PySpark.

MovieLens (movielens.org) is a database of movie ratings.

To download the data, go to https://grouplens.org/. Datasets of different sizes are available, depending on how many movies and ratings they include. At this stage we run Spark locally on the desktop (`local[*]`) rather than on a cluster, so we use a small dataset that only covers movies up to 1998. Download ml-100k.zip (100,000 ratings, i.e. 100,000 lines) or use the copy in the folder that accompanies this worksheet.

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 42 kB/s
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 43.8 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=b72f63bcbfeec3026156311937ba7419c508050e258db1e792164dab99065f72
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [4]:
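# Start the Spark context (the older, RDD-based entry point; SparkSession is the newer one)
import pyspark
from pyspark.sql import SparkSession
# 'local[*]' runs Spark locally with as many worker threads as there are CPU cores
sc = pyspark.SparkContext('local[*]')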

In [5]:
# Load the files from Google Drive, or upload them from the desktop
from google.colab import drive
drive.mount('/content/drive')
#from google.colab import files
#uploaded = files.upload()
# Files to import: u.data and u.item

Mounted at /content/drive


In [None]:
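# Quote paths that contain spaces, e.g. !ls "/content/drive/MyDrive/Colab Notebooks/";
# the errors below are what an unquoted "Colab Notebooks" path produces
!ls /content/drive/MyDrive/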

ls: cannot access '/content/drive/MyDrive/Colab': No such file or directory
ls: cannot access 'Notebooks': No such file or directory


In [6]:
# Load the uploaded file into an RDD
# Each line holds the user ID, movie ID, rating and timestamp
linhas = sc.textFile('/content/drive/MyDrive/Colab Notebooks/MADSAD BIGDATA/PyTrigo-V2-11-PySpark/u.data')
linhas.take(10)

['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488',
 '253\t465\t5\t891628467',
 '305\t451\t3\t886324817',
 '6\t86\t3\t883603013']
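The output shows that each line of u.data holds four tab-separated fields: user ID, movie ID, rating and a timestamp (Unix seconds, according to the MovieLens README as we understand it). The cells below split on any whitespace with `split()`; a small plain-Python parser, assuming that layout, makes the field positions explicit. The `Rating` namedtuple and `parse_rating_line` helper are hypothetical names, not part of PySpark.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Hypothetical record type for one line of u.data (not part of PySpark)
Rating = namedtuple("Rating", ["user_id", "movie_id", "rating", "timestamp"])


def parse_rating_line(line):
    """Parse a 'user<TAB>movie<TAB>rating<TAB>timestamp' line into a Rating."""
    user, movie, rating, ts = line.split("\t")
    return Rating(int(user), int(movie), int(rating), int(ts))


# First line returned by linhas.take(10) above
r = parse_rating_line("196\t242\t3\t881250949")
print(r)
# Assumption: the timestamp counts seconds since the Unix epoch (UTC)
print(datetime.fromtimestamp(r.timestamp, tz=timezone.utc).isoformat())
```

In the notebook, a function like this could be passed as `linhas.map(parse_rating_line)` in place of the inline `split()` lambdas.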

In [7]:
# Keep only the rating (third field) of each line
ratings = linhas.map(lambda x: x.split()[2])
ratings.take(10)

['3', '3', '1', '2', '1', '4', '2', '5', '3', '3']

In [8]:
# Ex1. Count how many ratings there are for each rating value (1 to 5)
import collections
result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
  print("Rating: %s Number of ratings: %i" % (key, value))

Rating: 1 Number of ratings: 6110
Rating: 2 Number of ratings: 11370
Rating: 3 Number of ratings: 27145
Rating: 4 Number of ratings: 34174
Rating: 5 Number of ratings: 21201
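`countByValue()` is an action: it brings a dictionary of value → count back to the driver, which is why the loop can sort it with plain Python. Because every line of u.data is one rating, the counts are numbers of ratings, not of distinct movies (they add up to 100,000). A plain-Python sketch of the same semantics, using `collections.Counter` on the ten ratings printed earlier:

```python
from collections import Counter

# The first ten ratings shown by ratings.take(10) earlier in the notebook
sample = ['3', '3', '1', '2', '1', '4', '2', '5', '3', '3']

# Counter plays the role of RDD.countByValue(): value -> number of occurrences
counts = Counter(sample)

# Sort by rating value, as the OrderedDict loop does
for rating in sorted(counts):
    print("Rating: %s Number of ratings: %i" % (rating, counts[rating]))
```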


In [9]:
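# u.item is pipe-separated: movie ID | title | release date | video release date | IMDb URL | genre flags
nomes = sc.textFile('/content/drive/MyDrive/Colab Notebooks/MADSAD BIGDATA/PyTrigo-V2-11-PySpark/u.item')
nomes.take(100)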

['1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0',
 '2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0',
 '3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0',
 '4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0',
 '5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0',
 '6|Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|01-Jan-1995||http://us.imdb.com/Title?Yao+a+yao+yao+dao+waipo+qiao+(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|0|0|0|0',
 '7|Twelve Monkeys (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Twelve%20Monkeys%20(1995)|0|0|0|0|0|0|0|0|1|0|0|0|0|0|0|1|0|0|0',
 '8|Babe (1995)|01-Jan-1995||http://us.imdb.com/M/title
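The lines above are split on `|`. A minimal parser, assuming the layout visible in the output (ID, title, release date, an often empty video release date, IMDb URL, then one 0/1 flag per genre, 19 flags in the lines shown), is sketched below; `parse_item_line` is a hypothetical helper, and only the ID and title are actually used later in the notebook.

```python
def parse_item_line(line):
    """Split one u.item line into (movie ID, title, release date, genre flags)."""
    fields = line.rstrip("\n").split("|")
    movie_id, title, release_date = int(fields[0]), fields[1], fields[2]
    # fields[3] is the video release date and fields[4] the IMDb URL;
    # the remaining fields are 0/1 genre indicators
    genre_flags = [int(f) for f in fields[5:]]
    return movie_id, title, release_date, genre_flags


# First line returned by nomes.take(100) above
line = ("1|Toy Story (1995)|01-Jan-1995||"
        "http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|"
        "0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0")
movie_id, title, date, flags = parse_item_line(line)
print(movie_id, title, date, len(flags), sum(flags))
```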

In [10]:
# Ex2a. Find the most popular movie (the one with the most ratings)
movies = linhas.map(lambda x: (int(x.split()[1]), 1))
movieCounts = movies.reduceByKey(lambda x, y: x + y)
flipped = movieCounts.map(lambda x: (x[1], x[0]))
# Sort in descending order by key, i.e. by the number of ratings, so the most-rated movies come first
sortedMovies = flipped.sortByKey(False)
# Show the first 10 as (number of ratings, movie ID)
for m in sortedMovies.take(10):
  print(m)

(583, 50)
(509, 258)
(508, 100)
(507, 181)
(485, 294)
(481, 286)
(478, 288)
(452, 1)
(431, 300)
(429, 121)
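Flipping to `(count, movieID)` and calling `sortByKey(False)` sorts the whole RDD just to read ten entries. PySpark also provides `RDD.takeOrdered(num, key=None)`, so something like `movieCounts.takeOrdered(10, key=lambda x: -x[1])` should return the ten most-rated `(movieID, count)` pairs without the flip (ties may come out in a different order). The plain-Python sketch below, with hypothetical helper names and toy data, mirrors the two steps: add up a 1 per rating for each movie, as `reduceByKey` does, then keep the N largest counts with `heapq.nlargest`.

```python
import heapq
from collections import defaultdict


def count_ratings_per_movie(lines):
    """Mimic map((movieID, 1)) + reduceByKey(add): movie ID -> number of ratings."""
    counts = defaultdict(int)
    for line in lines:
        movie_id = int(line.split()[1])
        counts[movie_id] += 1
    return dict(counts)


def top_n_movies(counts, n):
    """Return the n most-rated movies as (count, movie ID), largest count first."""
    return heapq.nlargest(n, ((c, m) for m, c in counts.items()))


# Hypothetical toy input in u.data layout (user, movie, rating, timestamp)
toy = [
    "1\t50\t5\t0", "2\t50\t4\t0", "3\t50\t5\t0",
    "1\t100\t3\t0", "2\t100\t4\t0",
    "3\t1\t2\t0",
]
counts = count_ratings_per_movie(toy)
print(top_n_movies(counts, 2))
```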


In [11]:
# Ex2b. Find the most popular movie, this time by title
# This part of the code translates movie IDs into titles using u.item,
# which contains, among other fields, the ID, the title, the release date
# and the link to the IMDb database
def loadMovieNames():
  movieNames = {}
  # Other encodings are possible; latin1 reads this file without decoding errors
  #with open('u.item',encoding='ascii', errors='ignore') as f:
  with open('/content/drive/MyDrive/Colab Notebooks/MADSAD BIGDATA/PyTrigo-V2-11-PySpark/u.item',encoding='latin1') as f:
    for linha in f:
      fields = linha.split('|')
      movieNames[int(fields[0])] = fields[1]
  return movieNames

# Broadcast the ID -> title dictionary so each node receives one read-only copy
nameDict = sc.broadcast(loadMovieNames())
sortedMoviesWithNames = sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))
# Use collect() instead of take(10) to see every title
results = sortedMoviesWithNames.take(10)

for result in results:
  print(result)

# The most popular movie is Star Wars

('Star Wars (1977)', 583)
('Contact (1997)', 509)
('Fargo (1996)', 508)
('Return of the Jedi (1983)', 507)
('Liar Liar (1997)', 485)
('English Patient, The (1996)', 481)
('Scream (1996)', 478)
('Toy Story (1995)', 452)
('Air Force One (1997)', 431)
('Independence Day (ID4) (1996)', 429)
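The next cell scores each pair of movies with the cosine similarity of their rating vectors, computed only over the users who rated both. For a pair (A, B) with n co-ratings (x_i, y_i), `computeCosineSimilarity` returns the score below together with n, which is printed as "strength":

```latex
\mathrm{sim}(A,B) = \frac{\sum_{i=1}^{n} x_i\, y_i}{\sqrt{\sum_{i=1}^{n} x_i^{2}}\;\sqrt{\sum_{i=1}^{n} y_i^{2}}}
```

For example, two users who rated (A, B) as (5, 4) and (3, 3) give 29 / (√34 · √25) ≈ 0.995. Since every rating lies between 1 and 5, the score always falls in (0, 1] and tends to be high, which helps explain why `scoreThreshold` is set as high as 0.97; `coOccurenceThreshold` discards pairs supported by only a handful of users.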


In [12]:
# Ex3. Which movies are similar to each other, based on how similarly
# different users rated them

#import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

def loadMovieNames():
  movieNames = {}
  with open('/content/drive/MyDrive/Colab Notebooks/MADSAD BIGDATA/PyTrigo-V2-11-PySpark/u.item',encoding='latin1') as f:
    for linha in f:
      fields = linha.split('|')
      movieNames[int(fields[0])] = fields[1]
  return movieNames

# Python 3 removed tuple-parameter unpacking in function signatures (PEP 3113),
# so the ratings are unpacked explicitly inside the function.
def makePairs( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

def filterDuplicates( userRatings ):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

linhas = sc.textFile('/content/drive/MyDrive/Colab Notebooks/MADSAD BIGDATA/PyTrigo-V2-11-PySpark/u.data')
# Splitting the data on white spaces and map it again in the format we want
# Map ratings to key / value pairs: user ID => movie ID, rating
ratings = linhas.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))
# Emit every movie rated together by the same user.
# Self-join to find every combination.
joinedRatings = ratings.join(ratings)
# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))
# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)
# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs)
# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()
# We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()
# Save the results if desired
#moviePairSimilarities.sortByKey()
#moviePairSimilarities.saveAsTextFile("movie-sims")
# Extract similarities for the movie we care about that are "good".

scoreThreshold = 0.97
coOccurenceThreshold = 50

movieID = 56  # Pulp Fiction

# Filter for movies with this sim that are "good" as defined by
# our quality thresholds above
filteredResults = moviePairSimilarities.filter(lambda pairSim: \
    (pairSim[0][0] == movieID or pairSim[0][1] == movieID) \
    and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)

# Sort by quality score.
results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)

nameDict = loadMovieNames()
# Shows at most 10 movies; fewer appear when fewer pairs pass both thresholds
print("Top 10 similar movies for " + nameDict[movieID])
for result in results:
  (sim, pair) = result
  # Display the similarity result that isn't the movie we're looking at
  similarMovieID = pair[0]
  if (similarMovieID == movieID):
    similarMovieID = pair[1]
  print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))

Top 10 similar movies for Pulp Fiction (1994)
Smoke (1995)	score: 0.9743848338030823	strength: 68
Reservoir Dogs (1992)	score: 0.9740674165782123	strength: 134
Donnie Brasco (1997)	score: 0.9738247291149608	strength: 75
Sling Blade (1996)	score: 0.9713796344244161	strength: 111
True Romance (1993)	score: 0.9707295689679896	strength: 99
Jackie Brown (1997)	score: 0.9706179145690377	strength: 55
Carlito's Way (1993)	score: 0.9706021261759088	strength: 52
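The self-join on user ID produces, for each user, every ordered pair of movies that user rated (including each movie paired with itself); `filterDuplicates` keeps only pairs with `movie1 < movie2`, which drops both the self-pairs and the mirrored (B, A) copies. The plain-Python sketch below replays the pipeline (join, filter, key by movie pair, group, score) on a hypothetical three-user toy dataset so the intermediate shapes can be inspected without Spark; the function names are illustrative, not from PySpark.

```python
from collections import defaultdict
from itertools import product
from math import sqrt


def cosine(pairs):
    """Same formula as computeCosineSimilarity: returns (score, number of pairs)."""
    sum_xx = sum(x * x for x, _ in pairs)
    sum_yy = sum(y * y for _, y in pairs)
    sum_xy = sum(x * y for x, y in pairs)
    denom = sqrt(sum_xx) * sqrt(sum_yy)
    return (sum_xy / denom if denom else 0.0, len(pairs))


def movie_pair_similarities(ratings):
    """ratings: list of (userID, (movieID, rating)), like the `ratings` RDD."""
    by_user = defaultdict(list)
    for user, movie_rating in ratings:
        by_user[user].append(movie_rating)

    grouped = defaultdict(list)  # (movie1, movie2) -> [(rating1, rating2), ...]
    for items in by_user.values():
        # Self-join: every (movie, rating) combined with every other, per user
        for (m1, r1), (m2, r2) in product(items, repeat=2):
            if m1 < m2:  # same rule as filterDuplicates
                grouped[(m1, m2)].append((r1, r2))

    # Equivalent of groupByKey().mapValues(computeCosineSimilarity)
    return {pair: cosine(rs) for pair, rs in grouped.items()}


# Hypothetical toy data: three users, three movies
toy = [
    (1, (10, 5.0)), (1, (20, 4.0)), (1, (30, 1.0)),
    (2, (10, 3.0)), (2, (20, 3.0)),
    (3, (20, 2.0)), (3, (30, 5.0)),
]
sims = movie_pair_similarities(toy)
for pair, (score, n) in sorted(sims.items()):
    print(pair, round(score, 3), n)
```

On real data, the second element of each value is what the cell compares against `coOccurenceThreshold`.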

