# BDCC project - Spark data processing

**[Big Data and Cloud Computing](https://www.dcc.fc.up.pt/~edrdo/aulas/bdcc), Eduardo R. B. Marques, DCC/FCUP**


## Spark setup

In [None]:
def setupSpark():
  # Spark needs to run with Java 8 ... 
  !pip install -q findspark
  !apt-get install openjdk-8-jdk-headless > /dev/null
  !echo 2 | update-alternatives --config java > /dev/null
  # !java -version
  import os, findspark
  os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
  # !echo JAVA_HOME=$JAVA_HOME
  !pip install -q pyspark
  findspark.init(spark_home='/usr/local/lib/python3.6/dist-packages/pyspark')
  !pyspark --version

setupSpark()

from pyspark import SparkContext
from pyspark.sql import SparkSession
    
spark = SparkSession\
        .builder\
        .master('local[*]')\
        .getOrCreate()
sc = spark.sparkContext

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_242
Branch HEAD
Compiled by user centos on 2020-02-02T19:38:06Z
Revision cee4ecbb16917fa85f02c635925e2687400aa56b
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


## Parameters

In [None]:

#@markdown ---
DEBUG = True #@param {type: "boolean"} 
PROJECT_ID = 'cobalt-catalyst-231416'  #@param {type: "string"}
INPUT_BUCKET = 'bdcc1920_project_datasets' #@param {type: "string"}
DATASET = 'tiny2' #@param ["tiny1", "tiny2", "tiny3", "tiny4", "medium1", "medium2", "medium3", "medium4", "large1", "large2", "large3", "large4", "large5"] {allow-input: true}
OUTPUT_BUCKET = 'up201604170' #@param {type: "string"}
OUTPUT_ZIP_FILE = 'output.zip' #@param {type: "string"}
COPY_PARQUET_FILES_TO_OUTPUT_BUCKET = False #@param {type: "boolean"} 
MIN_TF_IDF = 0.1 #@param {type:"slider", min:0, max:1, step:0.05}
SEND_PUBSUB_MESSAGE = True #@param {type: "boolean"} 
PUBSUB_TOPIC = 'new_output' #@param {type: "string"}
#@markdown ---


## Authenticate to GCP

In [None]:
# The authentication method 
def google_colab_authenticate(projectId, keyFile=None, debug=True):  
    import os
    from google.colab import auth
    if keyFile is None:
      keyFile='/content/bdcc-colab.json'
    if os.access(keyFile,os.R_OK):
      if debug:
        print('Using key file "%s"' % keyFile)
      os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '%s' % keyFile
      os.environ['GCP_PROJECT'] = projectId 
      os.environ['GCP_ACCOUNT'] = 'bdcc-colab@' + projectId + '.iam.gserviceaccount.com'
      !gcloud auth activate-service-account "$GCP_ACCOUNT" --key-file="$GOOGLE_APPLICATION_CREDENTIALS" --project="$GCP_PROJECT"
    else:
      if debug:
        print('No key file given. You may be redirected to the verification code procedure.')
      auth.authenticate_user()
      !gcloud config set project $projectId
    !gcloud info | grep -e Account -e Project

# Copy the key file from Google Drive, if available,
# to a path without spaces (spaces in paths usually cause problems)
!test -f "/content/drive/My Drive/bdcc-colab.json" && cp "/content/drive/My Drive/bdcc-colab.json" /content/bdcc-colab.json

google_colab_authenticate(PROJECT_ID)

No key file given. You may be redirected to the verification code procedure.
Updated property [core/project].
Account: [tsilvacoelho@gmail.com]
Project: [cobalt-catalyst-231416]


## Transfer dataset files (if necessary) from GCS

In [None]:
!test -d $DATASET || gsutil -m cp -r gs://"$INPUT_BUCKET"/"$DATASET" .
!du --human $DATASET

Copying gs://bdcc1920_project_datasets/large3/actors.parquet/._SUCCESS.crc...
Copying gs://bdcc1920_project_datasets/large3/actors.parquet/.part-00000-1435610a-affe-4a9c-9fed-0c5dcf7d5885-c000.snappy.parquet.crc...
Copying gs://bdcc1920_project_datasets/large3/actors.parquet/.part-00001-1435610a-affe-4a9c-9fed-0c5dcf7d5885-c000.snappy.parquet.crc...
Copying gs://bdcc1920_project_datasets/large3/actors.parquet/.part-00002-1435610a-affe-4a9c-9fed-0c5dcf7d5885-c000.snappy.parquet.crc...
Copying gs://bdcc1920_project_datasets/large3/actors.parquet/.part-00004-1435610a-affe-4a9c-9fed-0c5dcf7d5885-c000.snappy.parquet.crc...

## Parquet file read/write methods


In [None]:
def readParquet(file):
    global spark
    if DEBUG:
        print('==> Reading ' + file)
    df = spark.read.parquet(file)
    if DEBUG:
        df.printSchema()
        df.show(10)
    return df
 
def writeParquet(df,path):
    if DEBUG:
        print('==> Writing ' + path)
    !rm -fr $path
    df.write.parquet(path, mode='overwrite')


## Load input dataset from Parquet files

In [None]:
movies =  readParquet(DATASET + '/movies.parquet')
actors =  readParquet(DATASET + '/actors.parquet')
genres =  readParquet(DATASET + '/genres.parquet')
ratings = readParquet(DATASET + '/ratings.parquet')
tags =    readParquet(DATASET + '/tags.parquet')

==> Reading large3/movies.parquet
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- imdbId: integer (nullable = true)

+-------+--------------------+----+-------+
|movieId|               title|year| imdbId|
+-------+--------------------+----+-------+
| 118946|The Aggression Scale|2012|1816597|
| 118954|New York Lightboa...|1961|  55227|
| 118956|   Lines: Horizontal|1962|  56185|
| 118968|           Amour fou|2014|3003800|
| 118989|Riderà! (Cuore ma...|1967|  62199|
| 119029|   Squadra antifurto|1976|  76753|
| 119039|Squadra antigangs...|1979|  79943|
| 119041|Agenzia Riccardo ...|1979|  77130|
| 119049|Delitto a Porta R...|1980|  80606|
| 119084|           Narcissus|1983| 203725|
+-------+--------------------+----+-------+
only showing top 10 rows

==> Reading large3/actors.parquet
root
 |-- movieId: integer (nullable = true)
 |-- name: string (nullable = true)

+-------+-----------------+
|movieId|            

## Calculate aggregate movie data

The aim is to derive a data frame with the same data as __movies__, augmented
with __numRatings__ and __avgRating__ columns, respectively representing the 
number of ratings and average rating per movie.

In [None]:
movies.createOrReplaceTempView('movies')
ratings.createOrReplaceTempView('ratings')

if DEBUG:
  print(movies.count())
  movies.printSchema()
  movies.orderBy('movieId').show()

ratings_agg = spark.sql(
    '''
    SELECT movieId,
          COUNT(*) AS numRatings,
          AVG(rating) AS avgRating
    FROM ratings
    GROUP BY movieId
    '''
)
ratings_agg.createOrReplaceTempView('ratings_agg')

if DEBUG:
  print(ratings_agg.count())
  ratings_agg.printSchema()
  ratings_agg.orderBy('movieId').show()

movies_agg = spark.sql(
    '''
    SELECT movieId, title, year, imdbId, 
           ifnull(numRatings, 0) as numRatings,
           ifnull(avgRating, 0.0) as avgRating
    FROM movies LEFT OUTER JOIN ratings_agg USING(movieId)
    ORDER BY movieId
    '''
)
if DEBUG:
  print(movies_agg.count())
  movies_agg.printSchema()
  movies_agg.show()

# no need for these, clean up
spark.catalog.dropTempView('ratings_agg')
spark.catalog.dropTempView('ratings')
ratings.unpersist()

11590
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- imdbId: integer (nullable = true)

+-------+--------------------+----+------+
|movieId|               title|year|imdbId|
+-------+--------------------+----+------+
|      2|             Jumanji|1995|113497|
|      7|             Sabrina|1995|114319|
|     11|The American Pres...|1995|112346|
|     17|Sense and Sensibi...|1995|114388|
|     24|              Powder|1995|114168|
|     28|          Persuasion|1995|114117|
|     32|      Twelve Monkeys|1995|114746|
|     36|    Dead Man Walking|1995|112818|
|     39|            Clueless|1995|112697|
|     42|     Dead Presidents|1995|112819|
|     43|         Restoration|1995|114272|
|     47|               Se7en|1995|114369|
|     56|Kids of the Round...|1995|113541|
|     57|Home for the Holi...|1995|113321|
|     60|The Indian in the...|1995|113419|
|     61|      Eye for an Eye|1996|116260|
|     64|       Tw

DataFrame[movieId: int, userId: int, rating: double]
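The left outer join with default values can be checked by hand on a toy example. The following is a minimal pure-Python sketch of the same aggregation, not the Spark code itself; the helper `aggregate_movies` and the toy ratings are hypothetical and only illustrate that a movie without ratings ends up with `numRatings = 0` and `avgRating = 0.0`.

```python
from collections import defaultdict

def aggregate_movies(movies, ratings):
    """movies: list of (movieId, title); ratings: list of (movieId, rating).

    Returns (movieId, title, numRatings, avgRating) tuples ordered by movieId,
    mirroring a LEFT OUTER JOIN where missing aggregates default to 0 and 0.0.
    """
    ratings_by_movie = defaultdict(list)
    for movie_id, rating in ratings:
        ratings_by_movie[movie_id].append(rating)

    result = []
    for movie_id, title in sorted(movies):
        movie_ratings = ratings_by_movie.get(movie_id, [])
        num_ratings = len(movie_ratings)
        avg_rating = sum(movie_ratings) / num_ratings if num_ratings else 0.0
        result.append((movie_id, title, num_ratings, avg_rating))
    return result

# Toy data (made up for illustration): movie 7 has no ratings at all.
toy_movies = [(2, "Jumanji"), (7, "Sabrina")]
toy_ratings = [(2, 3.0), (2, 4.0)]
agg = aggregate_movies(toy_movies, toy_ratings)
print(agg)
```

Comparing such a hand-computed result with `movies_agg` on one of the tiny datasets is a cheap way to confirm that unrated movies are not dropped by the join.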

## Derive words for TF-IDF processing

In [None]:
from pyspark.sql import functions as F

# Utility method to explode strings contained in the given field into a 'word' column
def wordDf(df, field):
  # Raw string so that the backslashes reach the regex engine unchanged
  wdf = df.withColumn('word',
                      F.explode(F.split(F.lower(F.col(field)), r'[ \s\*\+\&\/\%\-\$\#\'\)\(\[\],.!?;:\t\n"]+'))).\
                      drop(field)

  return wdf


actor_words = wordDf(actors, 'name')
genre_words = wordDf(genres,'genre')
tag_words = wordDf(tags.drop('userId'), 'tag')
title_words = wordDf(movies.select('movieId','title'), 'title')

all_words = actor_words.union(genre_words)\
                       .union(tag_words)\
                       .union(title_words)

if DEBUG:
  print(all_words.count())
  all_words.orderBy('movieId','word').show()


521106
+-------+----------+
|movieId|      word|
+-------+----------+
|      2|          |
|      2|          |
|      2|          |
|      2|         a|
|      2|         a|
|      2|         a|
|      2|    action|
|      2|   actress|
|      2|adaptation|
|      2|   adapted|
|      2|   adapted|
|      2|   adapted|
|      2| adventure|
|      2| adventure|
|      2| adventure|
|      2|  allsburg|
|      2|  allsburg|
|      2|      also|
|      2|   animals|
|      2|   animals|
+-------+----------+
only showing top 20 rows
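The blank entries in the `word` column are probably empty strings produced by the split when a value starts or ends with a delimiter. The sketch below reproduces this with Python's `re` module, assuming the character class behaves the same way as in the regular expression passed to `F.split`; the helpers `split_words` and `split_words_nonempty` are hypothetical and not part of the notebook.

```python
import re

# Same delimiter characters as in wordDf (assumed equivalent in Python's re).
DELIMITERS = re.compile(r"""[\s*+&/%\-$#'()\[\],.!?;:"]+""")

def split_words(text):
    """Lower-case the text and split it on runs of delimiter characters."""
    return DELIMITERS.split(text.lower())

def split_words_nonempty(text):
    """Same as split_words, but drops the empty strings left at the edges."""
    return [word for word in split_words(text) if word]

raw = split_words("Jumanji (1995)")
clean = split_words_nonempty("Jumanji (1995)")
print(raw)    # the closing parenthesis leaves a trailing empty string
print(clean)
```

In Spark, a similar effect could be obtained by filtering out rows where `word` is the empty string, although doing so changes the word counts used later for TF-IDF.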



## Calculate TF-IDF metric (TODO)

The aim is to derive a data frame with __(movieId, word, tf_idf)__ values
where the TF-IDF calculation is similar to the [one we discussed before during classes](https://colab.research.google.com/drive/15yMxBf1ltxrIaHCZUqqUWZBm6cqdcErF), i.e., the ${\rm TFIDF}$ term we consider for a movie $m$ in the set of movies $M$ and word $w$ is given by
   
   $$
   {\rm TFIDF}(m,w) = {\rm TF}(m,w) \:\times \:{\rm IDF}(w,M)
   $$

Only TF-IDF values equal to or higher than the __MIN\_TF\_IDF__ threshold should be written to the output files. The standard value for __MIN\_TF\_IDF__ is __0.1__, but you may adjust it for testing purposes.
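As a worked illustration of the formula, here is a small pure-Python sketch. It assumes the definitions used by the code cell in this section: TF is the word count divided by the count of the most frequent word of the movie, and IDF is the base-2 logarithm of the number of movies divided by the number of movies containing the word. The function `tf_idf` and the toy data are hypothetical.

```python
import math
from collections import Counter

def tf_idf(words_by_movie, min_tf_idf=0.0):
    """words_by_movie: dict mapping movieId to a non-empty list of words.

    Returns a dict {(movieId, word): tf_idf} with values >= min_tf_idf.
    """
    n_movies = len(words_by_movie)

    # n(w): number of movies whose word list contains w
    doc_freq = Counter()
    for words in words_by_movie.values():
        doc_freq.update(set(words))

    result = {}
    for movie_id, words in words_by_movie.items():
        counts = Counter(words)
        max_f = max(counts.values())
        for word, f in counts.items():
            tf = f / max_f
            idf = math.log2(n_movies / doc_freq[word])
            if tf * idf >= min_tf_idf:
                result[(movie_id, word)] = tf * idf
    return result

# Toy data: "drama" occurs in every movie, so its IDF (and TF-IDF) is 0.
toy = {1: ["space", "space", "drama"], 2: ["drama", "comedy"]}
scores = tf_idf(toy, min_tf_idf=0.1)
print(scores)
```

With the threshold at 0.1, words that appear in every movie are filtered out, which is the main effect of __MIN\_TF\_IDF__ on very common words.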




In [None]:
from pyspark.sql import functions as F

# f(m, w): number of occurrences of word w for movie m
all_words.createOrReplaceTempView("all_words")
all_words_f = spark.sql("SELECT movieId, word, COUNT(*) AS f FROM all_words GROUP BY movieId, word")

# |M|: total number of movies
n_movies = movies.count()

# TF(m, w) = f(m, w) / (maximum f over all words of m)
max_f = all_words_f.groupby('movieId').agg(F.max('f').alias('max_f'))
tfd = all_words_f.join(max_f, 'movieId', 'inner')
tfd = tfd.withColumn('tf', tfd.f / tfd.max_f).select('movieId','word','tf')

# IDF(w, M) = log2(|M| / n), where n is the number of movies containing w.
# The built-in log2 returns a double and avoids an untyped Python UDF.
idfd = tfd.groupby('word').agg(F.count('movieId').alias('n'))
idfd = idfd.withColumn('idf', F.log2(n_movies / idfd.n)).select('word','n','idf')

# TF-IDF(m, w) = TF(m, w) * IDF(w, M)
tf_idfd = tfd.join(idfd, 'word', 'inner').select('movieId','word','tf','n','idf')
tf_idfd = tf_idfd.withColumn('tf_idf', tf_idfd.tf * tf_idfd.idf)

# Keep only the values at or above the MIN_TF_IDF threshold
tf_idfd = tf_idfd.select('movieId','word','tf_idf').where(tf_idfd.tf_idf >= MIN_TF_IDF)


# Calculate the Jaccard index metric: for each pair of movies, the number of users
# who rated or tagged both, divided by the number who rated or tagged at least one
ratings.createOrReplaceTempView("ratings")
ratings_df = spark.sql("SELECT userId, movieId FROM ratings")
tags.createOrReplaceTempView("tags")
tags_df = spark.sql("SELECT DISTINCT userId, movieId FROM tags")
users_df = ratings_df.union(tags_df).distinct()
users_df.createOrReplaceTempView("users_df")

# aggr: size of the intersection of the two user sets (each pair counted once)
movies_intersection = spark.sql("Select movieId1, movieId2, count(*) as count FROM (SELECT a.userId, a.movieId as movieId1, b.movieId as movieId2 FROM users_df as a JOIN users_df as b on a.userId = b.userId) where movieId1 < movieId2 Group by movieId1, movieId2")
movies_intersection.createOrReplaceTempView("movies_intersection")
# m1, m2: number of distinct users per movie
movies_m = spark.sql("SELECT movieId, count(*) as count FROM users_df Group by movieId")
movies_m.createOrReplaceTempView("movies_m")
movies_intersection = spark.sql("SELECT movieId1, movieId2, a.count as aggr, b.count as m1 FROM movies_intersection as a JOIN movies_m as b on a.movieId1 = b.movieId")
spark.catalog.dropTempView('movies_intersection')
movies_intersection.createOrReplaceTempView("movies_intersection")
jackard = spark.sql("SELECT movieId1, movieId2, aggr, m1, b.count as m2 FROM movies_intersection as a JOIN movies_m as b on a.movieId2 = b.movieId")
# Size of the union is m1 + m2 - aggr (the intersection would otherwise be counted twice)
jackard = jackard.withColumn('ji', jackard.aggr / (jackard.m1 + jackard.m2 - jackard.aggr)).select('movieId1','movieId2','ji')
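
For reference, the usual set-based definition of the Jaccard index is the size of the intersection divided by the size of the union. The sketch below, with hypothetical user sets, shows that definition and its equivalent form in terms of the counts `aggr`, `m1` and `m2` used in the cell above; it is an illustration under these assumptions, not the project's reference implementation.

```python
def jaccard_index(users_a, users_b):
    """Jaccard index of two sets: |A & B| / |A | B| (0.0 if both are empty)."""
    union = users_a | users_b
    if not union:
        return 0.0
    return len(users_a & users_b) / len(union)

# Hypothetical sets of users who rated or tagged each movie.
users_by_movie = {2: {10, 11, 12}, 7: {11, 12, 13, 14}}
ji = jaccard_index(users_by_movie[2], users_by_movie[7])

# The same value from counts: the union has m1 + m2 - aggr elements.
aggr, m1, m2 = 2, 3, 4
ji_from_counts = aggr / (m1 + m2 - aggr)
print(ji, ji_from_counts)
```

Subtracting `aggr` in the denominator matters: without it, users who interacted with both movies are counted twice and the index can never reach 1.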


## Write output data to Parquet files and generate ZIP file

In [None]:
# Clean up first
!rm -fr "$DATASET"/output 
!rm -f "$DATASET"/"$OUTPUT_ZIP_FILE"

if DEBUG:
  !ls -l $DATASET

writeParquet(movies_agg, DATASET + '/output/' + 'movies_agg.parquet')
writeParquet(tf_idfd, DATASET + '/output/' + 'tfidf.parquet')
writeParquet(jackard, DATASET + '/output/' + 'jackard.parquet')

if DEBUG:
  print('Creating ZIP file ...')

!cd "$DATASET"/output  && zip -9qr ../"$OUTPUT_ZIP_FILE" .

if DEBUG:
  !ls -l $DATASET "$DATASET"/output
 

total 72
drwxr-xr-x 2 root root  4096 Apr 15 21:20 actors.parquet
drwxr-xr-x 2 root root  4096 Apr 15 21:20 genres.parquet
drwxr-xr-x 2 root root 53248 Apr 15 21:20 movies.parquet
drwxr-xr-x 2 root root  4096 Apr 15 21:20 ratings.parquet
drwxr-xr-x 2 root root  4096 Apr 15 21:20 tags.parquet
==> Writing large3/output/movies_agg.parquet
==> Writing large3/output/tfidf.parquet
==> Writing large3/output/jackard.parquet
Creating ZIP file ...
large3:
total 150112
drwxr-xr-x 2 root root      4096 Apr 15 21:20 actors.parquet
drwxr-xr-x 2 root root      4096 Apr 15 21:20 genres.parquet
drwxr-xr-x 2 root root     53248 Apr 15 21:20 movies.parquet
drwxr-xr-x 5 root root      4096 Apr 15 21:25 output
-rw-r--r-- 1 root root 153635318 Apr 15 21:30 output.zip
drwxr-xr-x 2 root root      4096 Apr 15 21:20 ratings.parquet
drwxr-xr-x 2 root root      4096 Apr 15 21:20 tags.parquet

large3/output:
total 148
drwxr-xr-x 2 root root 49152 Apr 15 21:30 jackard.parquet
drwxr-xr-x 2 root root 49152 Apr 15 21:

## Copy output ZIP file to output bucket

In [None]:
! gsutil cp "$DATASET"/"$OUTPUT_ZIP_FILE" gs://"$OUTPUT_BUCKET"/"$DATASET"/"$OUTPUT_ZIP_FILE"

Copying file://large3/output.zip [Content-Type=application/zip]...
\
Operation completed over 1 objects/146.5 MiB.                                    


## Copy Parquet files to output bucket (optional)

In [None]:
if COPY_PARQUET_FILES_TO_OUTPUT_BUCKET:
  ! gsutil -m cp -r $DATASET/output/movies_agg.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/
  ! gsutil -m cp -r $DATASET/output/tfidf.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/
  ! gsutil -m cp -r $DATASET/output/jackard.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/

## Send PubSub cloud message 

This will trigger the LCF cloud function. 

In [None]:
if SEND_PUBSUB_MESSAGE:
 !gcloud pubsub topics publish $PUBSUB_TOPIC --message $DATASET

messageIds:
- '1127329284852086'
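
The cloud function itself is not part of this notebook. As a rough sketch of the receiving side, a Pub/Sub-triggered Python background function gets the message payload base64-encoded in `event['data']`, so the dataset name published above could be recovered as follows. The handler name `handle_new_output` and the fake event are hypothetical.

```python
import base64

def handle_new_output(event, context=None):
    """Hypothetical Pub/Sub handler: decodes and returns the dataset name."""
    dataset = base64.b64decode(event["data"]).decode("utf-8")
    return dataset

# Simulate the event that Pub/Sub would deliver for `--message tiny2`.
fake_event = {"data": base64.b64encode(b"tiny2").decode("ascii")}
dataset_name = handle_new_output(fake_event)
print(dataset_name)
```

A real handler would presumably go on to fetch the ZIP file for that dataset from the output bucket, but those steps depend on the function's actual code.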
