# BDCC project - Spark data processing

**[Big Data and Cloud Computing](https://www.dcc.fc.up.pt/~edrdo/aulas/bdcc), Eduardo R. B. Marques, DCC/FCUP**


## Spark setup

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [0]:

def setupSpark():
  # Spark needs to run with Java 8 ... 
  !pip install -q findspark
  !apt-get install openjdk-8-jdk-headless > /dev/null
  !echo 2 | update-alternatives --config java > /dev/null
  # !java -version
  import os, findspark
  os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
  # !echo JAVA_HOME=$JAVA_HOME
  !pip install -q pyspark
  findspark.init(spark_home='/usr/local/lib/python3.6/dist-packages/pyspark')
  !pyspark --version

setupSpark()

from pyspark import SparkContext
from pyspark.sql import SparkSession
    
spark = SparkSession\
        .builder\
        .master('local[*]')\
        .getOrCreate()
sc = spark.sparkContext

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_242
Branch HEAD
Compiled by user centos on 2020-02-02T19:38:06Z
Revision cee4ecbb16917fa85f02c635925e2687400aa56b
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


## Parameters

In [0]:

#@markdown ---
DEBUG = True #@param {type: "boolean"} 
PROJECT_ID = 'bigdata-269209'  #@param {type: "string"}
INPUT_BUCKET = 'zebigbucket' #@param {type: "string"}
DATASET = 'large4' #@param ["tiny1", "tiny2", "tiny3", "tiny4", "medium1", "medium2", "medium3", "medium4", "large1", "large2", "large3", "large4", "large5"] {allow-input: true}
OUTPUT_BUCKET = 'zaoutputbucket' #@param {type: "string"}
OUTPUT_ZIP_FILE = 'output.zip' #@param {type: "string"}
COPY_PARQUET_FILES_TO_OUTPUT_BUCKET = True #@param {type: "boolean"} 
MIN_TF_IDF = 0.1 #@param {type:"slider", min:0, max:1, step:0.05}
SEND_PUBSUB_MESSAGE = True #@param {type: "boolean"} 
PUBSUB_TOPIC = 'new_output' #@param {type: "string"}
#@markdown ---


## Authenticate to GCP

In [0]:
# The authentication method 
def google_colab_authenticate(projectId, keyFile=None, debug=True):  
    import os
    from google.colab import auth
    if keyFile is None:
      keyFile='/content/bdcc-colab.json'
    if os.access(keyFile,os.R_OK):
      if debug:
        print('Using key file "%s"' % keyFile)
      os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '%s' % keyFile
      os.environ['GCP_PROJECT'] = projectId 
      os.environ['GCP_ACCOUNT'] = 'bdcc-colab@' + projectId + '.iam.gserviceaccount.com'
      !gcloud auth activate-service-account "$GCP_ACCOUNT" --key-file="$GOOGLE_APPLICATION_CREDENTIALS" --project="$GCP_PROJECT"
    else:
      if debug:
        print('No key file given. You may be redirected to the verification code procedure.')
      auth.authenticate_user()
      !gcloud config set project $projectId
    !gcloud info | grep -e Account -e Project

# Copy the key file from Google Drive, if available,
# to a path without spaces (spaces in paths tend to cause problems)
!test -f "/content/drive/My Drive/bdcc-colab.json" && cp "/content/drive/My Drive/bdcc-colab.json" /content/bdcc-colab.json

google_colab_authenticate(PROJECT_ID)

No key file given. You may be redirected to the verification code procedure.
Updated property [core/project].
Account: [jprfpa@gmail.com]
Project: [bigdata-269209]


In [0]:
#download movielens dataset
!wget -p "http://files.grouplens.org/datasets/movielens/ml-latest.zip"
#unzip it
!unzip /content/files.grouplens.org/datasets/movielens/ml-latest.zip
#copy links file to top level folder
!cp /content/ml-latest/links.csv /content
#clean up, remove downloaded and unzipped files
!sudo rm -rf /content/files.grouplens.org
!sudo rm -rf /content/ml-latest/

--2020-04-12 23:40:22--  http://files.grouplens.org/datasets/movielens/ml-latest.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 277113433 (264M) [application/zip]
Saving to: ‘files.grouplens.org/datasets/movielens/ml-latest.zip’


2020-04-12 23:40:26 (69.0 MB/s) - ‘files.grouplens.org/datasets/movielens/ml-latest.zip’ saved [277113433/277113433]

FINISHED --2020-04-12 23:40:26--
Total wall clock time: 4.0s
Downloaded: 1 files, 264M in 3.8s (69.0 MB/s)
Archive:  /content/files.grouplens.org/datasets/movielens/ml-latest.zip
   creating: ml-latest/
  inflating: ml-latest/links.csv     
  inflating: ml-latest/tags.csv      
  inflating: ml-latest/genome-tags.csv  
  inflating: ml-latest/ratings.csv   
  inflating: ml-latest/README.txt    
  inflating: ml-latest/genome-scores.csv  
  inflating: ml-latest/movies.csv    


In [0]:
# Download the IMDb crew dataset
!wget -p "https://datasets.imdbws.com/title.crew.tsv.gz"
# Decompress it
!gunzip /content/datasets.imdbws.com/title.crew.tsv.gz
# Move the crew file to the top-level folder
!mv /content/datasets.imdbws.com/title.crew.tsv /content

--2020-04-12 23:40:43--  https://datasets.imdbws.com/title.crew.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 13.35.112.121, 13.35.112.17, 13.35.112.95, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|13.35.112.121|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47588411 (45M) [binary/octet-stream]
Saving to: ‘datasets.imdbws.com/title.crew.tsv.gz’


2020-04-12 23:40:58 (3.42 MB/s) - ‘datasets.imdbws.com/title.crew.tsv.gz’ saved [47588411/47588411]

FINISHED --2020-04-12 23:40:58--
Total wall clock time: 15s
Downloaded: 1 files, 45M in 13s (3.42 MB/s)


In [0]:
# Download the IMDb names dataset
!wget -p "https://datasets.imdbws.com/name.basics.tsv.gz"
# Decompress it
!gunzip /content/datasets.imdbws.com/name.basics.tsv.gz
# Move the names file to the top-level folder
!mv /content/datasets.imdbws.com/name.basics.tsv /content

--2020-04-12 23:41:02--  https://datasets.imdbws.com/name.basics.tsv.gz
Resolving datasets.imdbws.com (datasets.imdbws.com)... 54.192.160.10, 54.192.160.57, 54.192.160.76, ...
Connecting to datasets.imdbws.com (datasets.imdbws.com)|54.192.160.10|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 195554085 (186M) [binary/octet-stream]
Saving to: ‘datasets.imdbws.com/name.basics.tsv.gz’


2020-04-12 23:41:05 (76.6 MB/s) - ‘datasets.imdbws.com/name.basics.tsv.gz’ saved [195554085/195554085]

FINISHED --2020-04-12 23:41:05--
Total wall clock time: 2.6s
Downloaded: 1 files, 186M in 2.4s (76.6 MB/s)


In [0]:
#clean imdb downloads, delete folders
!sudo rm -rf /content/datasets.imdbws.com

## Transfer dataset files (if necessary) from GCS

In [0]:
!test -d $DATASET || gsutil -m cp -r gs://"$INPUT_BUCKET"/"bdcc1920_project_datasets"/"$DATASET" .
!du --human $DATASET

Copying gs://zebigbucket/bdcc1920_project_datasets/large4/actors.parquet/._SUCCESS.crc...
Copying gs://zebigbucket/bdcc1920_project_datasets/large4/actors.parquet/.part-00000-bbcacd10-b008-4da8-95fd-9edd158bb17b-c000.snappy.parquet.crc...
Copying gs://zebigbucket/bdcc1920_project_datasets/large4/actors.parquet/.part-00001-bbcacd10-b008-4da8-95fd-9edd158bb17b-c000.snappy.parquet.crc...
Copying gs://zebigbucket/bdcc1920_project_datasets/large4/actors.parquet/.part-00002-bbcacd10-b008-4da8-95fd-9edd158bb17b-c000.snappy.parquet.crc...
Copying gs://zebigbucket/bdcc1920_project_datasets/large4/actors.parquet/.part-00003-bbcacd10-b008-4da8-95fd-9edd158bb17b-c000.snappy.parquet.crc...
Copying gs://zebigbucket/bdcc1920_project_datasets/large4/actors.parquet/.part-00005-bbcacd10-b008-4da8-95fd-9edd158bb17b-c000.snappy.parquet.crc...
Copying gs://zebigbucket/bdcc1920_project_datasets/large4/actors.parquet/.part-00006-bbcacd10-b008-4da8-95fd-9edd158bb17b-c000.snappy.parquet.crc...
Copying gs://zeb

## CSV, TSV and Parquet file read/write methods


In [0]:
# Read a CSV file (with a header row) into a Spark data frame
def readCSV(file):
    if DEBUG:
        print('==> Reading ' + file)
    df = spark.read.option("header", "true").csv(file)
    if DEBUG:
        df.printSchema()
        df.show(10)
    return df

# Read a tab-separated file (with a header row) into a Spark data frame
def readTSV(file):
    if DEBUG:
        print('==> Reading ' + file)
    df = spark.read.option("delimiter", "\t").option("header", "true").csv(file)
    if DEBUG:
        df.printSchema()
        df.show(10)
    return df

# Read a Parquet file (directory) into a Spark data frame
def readParquet(file):
    if DEBUG:
        print('==> Reading ' + file)
    df = spark.read.parquet(file)
    if DEBUG:
        df.printSchema()
        df.show(10)
    return df

# Write a data frame to the given path in Parquet format, replacing previous output
def writeParquet(df, path):
    if DEBUG:
        print('==> Writing ' + path)
    !rm -fr $path
    df.write.parquet(path, mode='overwrite')


## Load input dataset from Parquet files

In [0]:
movies =  readParquet(DATASET + '/movies.parquet')
actors =  readParquet(DATASET + '/actors.parquet')
genres =  readParquet(DATASET + '/genres.parquet')
ratings = readParquet(DATASET + '/ratings.parquet')
tags =    readParquet(DATASET + '/tags.parquet')

# Load the IMDb and MovieLens datasets as Spark data frames
links = readCSV("links.csv")
names = readTSV("name.basics.tsv")
crew = readTSV("title.crew.tsv")

==> Reading large4/movies.parquet
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- imdbId: integer (nullable = true)

+-------+--------------------+----+-------+
|movieId|               title|year| imdbId|
+-------+--------------------+----+-------+
|  98583|     Hecho en México|2012|2062645|
|  98585|           Hitchcock|2012| 975645|
|  98587|  Cesare deve morire|2012|2177511|
|  98589|         Interceptor|1992| 104516|
|  98591|Mais où est donc ...|1973|  70356|
|  98593|     D'autres mondes|2004| 347718|
|  98595|      Diabolo menthe|1977|  75939|
|  98601|   American Roulette|1988|  94644|
|  98604|  Kokuriko-zaka kara|2011|1798188|
|  98607|             Redline|2009|1483797|
+-------+--------------------+----+-------+
only showing top 10 rows

==> Reading large4/actors.parquet
root
 |-- movieId: integer (nullable = true)
 |-- name: string (nullable = true)

+-------+-----------------+
|movieId|            

## Process crew dataset

In [0]:
from pyspark.sql import functions as F

#turn directors and writers columns into arrays of id numbers
crew_array = crew\
  .withColumn('directors', F.split('directors', ","))\
  .withColumn('writers', F.split('writers', ","))

#split df into directors and writers df
directors_array = crew_array.select("tconst", "directors")
writers_array = crew_array.select("tconst", "writers")

#get one entry for each movie, director combination and remove nulls
directors_df = directors_array\
  .withColumn('directors', F.explode('directors'))\
  .filter(F.col("directors") != "\\N")

#get one entry for each movie, writer combination and remove nulls
writers_df = writers_array\
  .withColumn('writers', F.explode('writers'))\
  .filter(F.col("writers") != "\\N")

#change names of columns from directors and writers to crew_member (to join later)
directors_crew = directors_df.select("tconst", F.col("directors").alias("crew_member"))
writers_crew = writers_df.select("tconst", F.col("writers").alias("crew_member"))

#combine writers and directors with a union (duplicates are kept), and remove "tt" from the movie id
crew_clean = writers_crew\
  .union(directors_crew)\
  .withColumn('tconst', F.expr("substring(tconst,3,length(tconst))"))

if DEBUG:
  print(crew_clean.count())
  crew_clean.printSchema()
  crew_clean.orderBy('tconst').show()




13102317
root
 |-- tconst: string (nullable = true)
 |-- crew_member: string (nullable = true)

+-------+-----------+
| tconst|crew_member|
+-------+-----------+
|0000001|  nm0005690|
|0000002|  nm0721526|
|0000003|  nm0721526|
|0000004|  nm0721526|
|0000005|  nm0005690|
|0000006|  nm0005690|
|0000007|  nm0005690|
|0000007|  nm0374658|
|0000008|  nm0005690|
|0000009|  nm0085156|
|0000009|  nm0085156|
|0000010|  nm0525910|
|0000011|  nm0804434|
|0000012|  nm0525910|
|0000012|  nm0525908|
|0000013|  nm0525910|
|0000014|  nm0525910|
|0000015|  nm0721526|
|0000016|  nm0525910|
|0000017|  nm0804434|
+-------+-----------+
only showing top 20 rows
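
To make the split, explode and filter steps concrete, here is a minimal pure-Python sketch of the same transformation. The helper name `explode_column` and the sample rows are illustrative assumptions, not part of the notebook or of the IMDb data; the real pipeline uses the Spark calls above.

```python
# Pure-Python sketch of the split / explode / filter logic used for the crew table.
# The helper name and the sample rows are hypothetical.

def explode_column(rows, index):
    """Return one (numeric id string, crew_member) pair per id found in rows[i][index]."""
    pairs = []
    for row in rows:
        for member in row[index].split(","):
            if member != "\\N":                      # \N marks a missing value
                pairs.append((row[0][2:], member))   # drop the "tt" prefix
    return pairs

# Columns: tconst, directors, writers
sample = [
    ("tt0000007", "nm0005690,nm0374658", "\\N"),
    ("tt0000009", "nm0085156", "nm0085156"),
]

# Writers first, then directors, mirroring writers_crew.union(directors_crew)
crew_pairs = explode_column(sample, 2) + explode_column(sample, 1)
print(crew_pairs)
```

Because a union keeps duplicates, a person credited as both writer and director of the same title appears twice in the result. Whether that double counting is wanted for the later word statistics is a design choice; dropping duplicates first would be the alternative.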



## Calculate aggregate movie data

The aim is to derive a data frame with the same data as __movies__, augmented
with __numRatings__ and __avgRating__ columns, respectively representing the 
number of ratings and average rating per movie.
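
As a rough illustration of the intended result, the sketch below computes the same two columns in plain Python for a handful of made-up ratings. The names `aggregate_ratings`, `sample_movies` and `sample_ratings` are hypothetical. Movies without ratings get 0 and 0.0, which is the default this section's SQL applies through `ifnull`.

```python
from collections import defaultdict

def aggregate_ratings(movie_ids, ratings):
    """Map each movie id to (numRatings, avgRating); unrated movies get (0, 0.0)."""
    totals = defaultdict(lambda: [0, 0.0])  # movieId -> [count, sum of ratings]
    for movie_id, rating in ratings:
        totals[movie_id][0] += 1
        totals[movie_id][1] += rating
    result = {}
    for movie_id in movie_ids:
        count, total = totals.get(movie_id, (0, 0.0))
        result[movie_id] = (count, total / count if count else 0.0)
    return result

sample_movies = [1, 2, 3]                        # made-up movie ids
sample_ratings = [(1, 4.0), (1, 5.0), (2, 3.0)]  # made-up (movieId, rating) pairs
agg = aggregate_ratings(sample_movies, sample_ratings)
print(agg)
```

Iterating over the movie ids rather than over the ratings plays the role of the left outer join: every movie appears in the result, rated or not.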

In [0]:
movies.createOrReplaceTempView('movies')
ratings.createOrReplaceTempView('ratings')

if DEBUG:
  print(movies.count())
  movies.printSchema()
  movies.orderBy('movieId').show()

ratings_agg = spark.sql(
    '''
    SELECT movieId,
          COUNT(*) AS numRatings,
          AVG(rating) AS avgRating
    FROM ratings
    GROUP BY movieId
    '''
)
ratings_agg.createOrReplaceTempView('ratings_agg')

if DEBUG:
  print(ratings_agg.count())
  ratings_agg.printSchema()
  ratings_agg.orderBy('movieId').show()

movies_agg = spark.sql(
    '''
    SELECT movieId, title, year, imdbId, 
           ifnull(numRatings, 0) as numRatings,
           ifnull(avgRating, 0.0) as avgRating
    FROM movies LEFT OUTER JOIN ratings_agg USING(movieId)
    ORDER BY movieId
    '''
)
if DEBUG:
  print(movies_agg.count())
  movies_agg.printSchema()
  movies_agg.show()

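# Clean up: ratings_agg is no longer needed once movies_agg is defined.
# The views were registered with createOrReplaceTempView, so dropTempView
# (not dropGlobalTempView) is the call that removes them. The 'ratings' view
# is kept because the Jaccard preparation step further down still queries it.
spark.catalog.dropTempView('ratings_agg')
ratings.unpersist()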


58016
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- imdbId: integer (nullable = true)

+-------+--------------------+----+------+
|movieId|               title|year|imdbId|
+-------+--------------------+----+------+
|      1|           Toy Story|1995|114709|
|      2|             Jumanji|1995|113497|
|      3|    Grumpier Old Men|1995|113228|
|      4|   Waiting to Exhale|1995|114885|
|      5|Father of the Bri...|1995|113041|
|      6|                Heat|1995|113277|
|      7|             Sabrina|1995|114319|
|      8|        Tom and Huck|1995|112302|
|      9|        Sudden Death|1995|114576|
|     10|           GoldenEye|1995|113189|
|     11|The American Pres...|1995|112346|
|     12|Dracula: Dead and...|1995|112896|
|     13|               Balto|1995|112453|
|     14|               Nixon|1995|113987|
|     15|    Cutthroat Island|1995|112760|
|     16|              Casino|1995|112641|
|     17|Sense and

DataFrame[userId: int, movieId: int, rating: double]

## Join datasets to obtain (movieId, crew name) pairs

In [0]:
movies.createOrReplaceTempView("movies")
links.createOrReplaceTempView('links')
names.createOrReplaceTempView('names')
crew_clean.createOrReplaceTempView('crew')

#restrict links to the ones in the movies dataset
movie_links = spark.sql(
    '''
    SELECT links.movieId, links.imdbId
    FROM movies
    JOIN links ON(movies.movieId=links.movieId)
    '''
)
movie_links.createOrReplaceTempView('movie_links')

if DEBUG:
  print(movie_links.count())
  movie_links.printSchema()
  movie_links.orderBy('movieId').show()

#get crew ids for each movie
crew_ids = spark.sql(
    '''
    SELECT movie_links.movieId, crew.crew_member
    FROM movie_links
    JOIN crew ON(movie_links.imdbId=crew.tconst)
    '''
)
crew_ids.createOrReplaceTempView('crew_ids')

if DEBUG:
  print(crew_ids.count())
  crew_ids.printSchema()
  crew_ids.orderBy('movieId').show()

#get names of crew for each movie
crew_names = spark.sql(
    '''
    SELECT crew_ids.movieId, names.primaryName
    FROM crew_ids
    JOIN names ON(crew_ids.crew_member=names.nconst)
    '''
)
crew_names.createOrReplaceTempView('crew_names')

if DEBUG:
  print(crew_names.count())
  crew_names.printSchema()
  crew_names.orderBy('movieId').show()

58016
root
 |-- movieId: string (nullable = true)
 |-- imdbId: string (nullable = true)

+-------+-------+
|movieId| imdbId|
+-------+-------+
|      1|0114709|
|     10|0113189|
|    100|0115907|
|   1000|0115994|
| 100001|0064179|
| 100003|0051143|
| 100006|0189525|
| 100008|1787810|
| 100010|1758570|
| 100013|2120779|
| 100015|0995033|
| 100017|2011953|
| 100032|2040281|
| 100034|1806911|
| 100036|2370140|
| 100038|1211890|
| 100040|0267989|
| 100042|0050470|
| 100044|1806234|
| 100046|1842793|
+-------+-------+
only showing top 20 rows

188975
root
 |-- movieId: string (nullable = true)
 |-- crew_member: string (nullable = true)

+-------+-----------+
|movieId|crew_member|
+-------+-----------+
|      1|  nm0923736|
|      1|  nm0004056|
|      1|  nm0230032|
|      1|  nm0005124|
|      1|  nm0169505|
|      1|  nm0005124|
|      1|  nm0812513|
|      1|  nm0710020|
|     10|  nm0289833|
|     10|  nm0001220|
|     10|  nm0270761|
|     10|  nm0128997|
|     10|  nm0132709|
|    1

## Derive words for TF-IDF processing

In [0]:
from pyspark.sql import functions as F

# Utility method to explode strings contained in the given field into a 'word' column
def wordDf(df, field):
  # Split on runs of whitespace and punctuation; the raw string avoids invalid-escape warnings
  separators = r'[\s*+&/%\-$#\'()\[\],.!?;:"]+'
  wdf = df.withColumn('word', F.explode(F.split(F.lower(F.col(field)), separators)))\
          .drop(field)

  return wdf


actor_words = wordDf(actors, 'name')
genre_words = wordDf(genres,'genre')
tag_words = wordDf(tags.drop('userId'), 'tag')
title_words = wordDf(movies.select('movieId','title'), 'title')
#add crew names to list of words
crew_words = wordDf(crew_names.select('movieId','primaryName'), 'primaryName')
crew_words.show(10)
all_words = actor_words.union(genre_words)\
                       .union(tag_words)\
                       .union(title_words)\
                       .union(crew_words)

if DEBUG:
  print(all_words.count())
  all_words.orderBy('movieId','word').show()


+-------+-----+
|movieId| word|
+-------+-----+
| 146674|louis|
| 146674|   de|
| 146674|funès|
| 134659|louis|
| 134659|   de|
| 134659|funès|
| 142020|louis|
| 142020|   de|
| 142020|funès|
|  95615|louis|
+-------+-----+
only showing top 10 rows

3000616
+-------+------+
|movieId|  word|
+-------+------+
|      1|  2009|
|      1|   250|
|      1|   250|
|      1|   250|
|      1|   250|
|      1|   250|
|      1|   250|
|      1|   250|
|      1|   250|
|      1|   250|
|      1|     3|
|      1|    3d|
|      1|    3d|
|      1|    3d|
|      1|    55|
|      1|     a|
|      1|acting|
|      1|action|
|      1|action|
|      1|action|
+-------+------+
only showing top 20 rows
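
For intuition about what `wordDf` yields for a single string, here is a pure-Python sketch that uses `re.split` with an equivalent character class. It is only an approximation: Spark evaluates the pattern with Java's regex engine, so edge cases may differ. `tokenize` and the input strings are hypothetical.

```python
import re

# Same separator set as in wordDf: whitespace plus a fixed list of punctuation marks.
SEPARATORS = re.compile(r"""[\s*+&/%\-$#'()\[\],.!?;:"]+""")

def tokenize(text):
    """Lower-case the text and split it on runs of separator characters."""
    return SEPARATORS.split(text.lower())

print(tokenize("Louis de Funès"))
print(tokenize("Sci-Fi (great)"))
```

In Python, a string that ends with a separator leaves an empty string as its last token. If the Spark output shows the same behaviour, filtering out empty words before counting may be worth considering, since they would otherwise be scored like any other word.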



## Calculate TF-IDF metric

The aim is to derive a data frame with __(movieId, word, tf_idf)__ values,
where the TF-IDF calculation is similar to the [one we discussed before during classes](https://colab.research.google.com/drive/15yMxBf1ltxrIaHCZUqqUWZBm6cqdcErF), i.e., the ${\rm TFIDF}$ term we consider for a movie $m$ in the set of movies $M$ and a word $w$ is given by
   
   $$
   {\rm TFIDF}(m,w) = {\rm TF}(m,w) \:\times \:{\rm IDF}(w,M)
   $$

Only TF-IDF values equal to or higher than the __MIN\_TF\_IDF__ threshold should be written to the output files. The standard value for __MIN\_TF\_IDF__ is __0.1__, but you may adjust it for testing purposes.
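
The formula leaves TF and IDF undefined. The sketch below assumes the definitions that the code cell in this section implements: TF is the count of the word in the movie divided by the highest word count in that movie, and IDF is the base-2 logarithm of the number of movies divided by the number of movies containing the word. The function `tf_idf` and the toy corpus are hypothetical and only meant as a small worked example.

```python
import math
from collections import Counter

def tf_idf(docs):
    """docs maps a movie id to its list of words; returns {(movie, word): score}."""
    counts = {movie: Counter(words) for movie, words in docs.items()}
    n_docs = len(docs)
    # Number of movies in which each word occurs at least once
    doc_freq = Counter(word for c in counts.values() for word in c)
    scores = {}
    for movie, c in counts.items():
        max_count = max(c.values())  # count of the most frequent word in this movie
        for word, n in c.items():
            tf = n / max_count
            idf = math.log2(n_docs / doc_freq[word])
            scores[(movie, word)] = tf * idf
    return scores

MIN_TF_IDF = 0.1
toy = {1: ["toy", "story", "toy"], 2: ["story", "heat"]}  # made-up corpus
scores = tf_idf(toy)
kept = {key for key, value in scores.items() if value >= MIN_TF_IDF}
print(scores)
print(kept)
```

In this toy corpus "story" occurs in every movie, so its IDF is 0 and it drops below the threshold, while "toy" and "heat" each score 1.0 and are kept.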




In [0]:
# Calculate the TF-IDF metric
# Note: importing max from pyspark.sql.functions shadows Python's built-in max in this notebook
from pyspark.sql.functions import count, max
from pyspark.sql.functions import log2

def freq(table):
  freq_table = table.groupBy("movieId","word").agg(count("*").alias("count_f"))

  if DEBUG:
    freq_table.orderBy('movieId','word').show()

  return freq_table

def make_tf_table(freq, max_freq):
  tf_table_temp = freq.join(max_freq, ["movieId"])
  tf_table = tf_table_temp.withColumn("TF", tf_table_temp.count_f/tf_table_temp.max_f)

  if DEBUG:
    tf_table.show()

  return tf_table.drop("count_f","max_f")

def make_idf_table(table):
  doc_num = table.select("movieId").distinct().count()
  idf_table = table.groupBy("word").agg(count("*").alias("count_inter_f"))
  idf_table = idf_table.withColumn("IDF", log2(doc_num/idf_table.count_inter_f))

  if DEBUG:
    print(doc_num)
    idf_table.orderBy("word").show()

  return idf_table.drop("count_inter_f")

def tfidf(table):
    table_freq = freq(table)
    max_f = table_freq.groupBy('movieId').agg(max("count_f").alias("max_f"))
    if DEBUG:
      max_f.orderBy('movieId').show()

    tf_table = make_tf_table(table_freq, max_f)
    idf_table = make_idf_table(table_freq)

    tf_idf_table = tf_table.join(idf_table, ["word"])
    tf_idf_table = tf_idf_table.withColumn("tf_idf", tf_idf_table.TF * tf_idf_table.IDF)

    if DEBUG:
      tf_idf_table.show()
    return tf_idf_table.drop("TF","IDF").select("movieId","word","tf_idf")

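TF_IDF = tfidf(all_words)
# Keep only the entries at or above the MIN_TF_IDF threshold, as required above
TF_IDF = TF_IDF.filter(TF_IDF.tf_idf >= MIN_TF_IDF)
if DEBUG:
  TF_IDF.show()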

+-------+----------+-------+
|movieId|      word|count_f|
+-------+----------+-------+
|      1|      2009|      1|
|      1|       250|      9|
|      1|         3|      1|
|      1|        3d|      3|
|      1|        55|      1|
|      1|         a|      1|
|      1|    acting|      1|
|      1|    action|      3|
|      1| adventure|     19|
|      1|     again|      2|
|      1|      alec|      1|
|      1|       all|      1|
|      1|     allen|      8|
|      1|    almost|      1|
|      1|  american|      1|
|      1|       and|      2|
|      1|    andrew|      1|
|      1|  animated|     19|
|      1| animation|    121|
|      1|animmation|      1|
+-------+----------+-------+
only showing top 20 rows

+-------+-----+
|movieId|max_f|
+-------+-----+
|      1|  128|
|     10|   41|
|    100|    4|
|   1000|    2|
| 100001|    2|
| 100003|    1|
| 100006|    2|
| 100008|    2|
| 100010|    2|
| 100013|    3|
| 100015|    3|
| 100017|    3|
| 100032|    3|
| 100034|    2|
| 1000

## Prepare ratings and tags for Jaccard

In [0]:
tags.createOrReplaceTempView('tags')

ratings_tags_by_movie = spark.sql(
    '''
    SELECT movieId, title, userId 
    FROM movies JOIN (
      SELECT T.movieId, T.userId  
      FROM tags as T UNION
      SELECT R.movieId, R.userId
      FROM ratings as R
    ) USING(movieId)
    ORDER BY movieId,userId
    '''
)
if DEBUG:
  print(ratings_tags_by_movie.count())
  ratings_tags_by_movie.printSchema()
  ratings_tags_by_movie.show()

27838099
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: integer (nullable = true)

+-------+---------+------+
|movieId|    title|userId|
+-------+---------+------+
|      1|Toy Story|     4|
|      1|Toy Story|    10|
|      1|Toy Story|    14|
|      1|Toy Story|    15|
|      1|Toy Story|    22|
|      1|Toy Story|    25|
|      1|Toy Story|    27|
|      1|Toy Story|    31|
|      1|Toy Story|    32|
|      1|Toy Story|    38|
|      1|Toy Story|    42|
|      1|Toy Story|    43|
|      1|Toy Story|    51|
|      1|Toy Story|    54|
|      1|Toy Story|    55|
|      1|Toy Story|    56|
|      1|Toy Story|    58|
|      1|Toy Story|    62|
|      1|Toy Story|    67|
|      1|Toy Story|    70|
+-------+---------+------+
only showing top 20 rows
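
The (movieId, userId) pairs above are the kind of input a Jaccard index works on. The notebook does not show that step, so the following is only a minimal sketch, assuming the similarity of two movies is defined over the sets of users who rated or tagged them. The function `jaccard` and the sample user sets are hypothetical.

```python
def jaccard(a, b):
    """Jaccard index |A ∩ B| / |A ∪ B| of two collections, 0.0 if both are empty."""
    a, b = set(a), set(b)
    union = a | b
    if not union:
        return 0.0
    return len(a & b) / len(union)

# Made-up user sets per movie id
users_by_movie = {
    1: {4, 10, 14, 15},
    2: {10, 15, 22},
}
similarity = jaccard(users_by_movie[1], users_by_movie[2])
print(similarity)
```

Here two users are shared out of five distinct users overall, so the similarity is 2/5.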



## Write output data to Parquet files and generate ZIP file

In [0]:
# Clean up first
!rm -fr "$DATASET"/output 
!rm -f "$DATASET"/"$OUTPUT_ZIP_FILE"

if DEBUG:
  !ls -l $DATASET

writeParquet(movies_agg, DATASET + '/output/' + 'movies_agg.parquet')
writeParquet(TF_IDF, DATASET + '/output/' + 'tfidf.parquet')
writeParquet(ratings_tags_by_movie, DATASET + '/output/' + 'ratings_tags.parquet')
if DEBUG:
  print('Creating ZIP file ...')

!cd "$DATASET"/output  && zip -9qr ../"$OUTPUT_ZIP_FILE" .

if DEBUG:
  !ls -l $DATASET "$DATASET"/output
 

total 148
drwxr-xr-x 2 root root 45056 Apr 13 01:27 actors.parquet
drwxr-xr-x 2 root root 49152 Apr 13 01:27 genres.parquet
drwxr-xr-x 2 root root 49152 Apr 13 01:27 movies.parquet
drwxr-xr-x 2 root root  4096 Apr 13 01:28 ratings.parquet
drwxr-xr-x 2 root root  4096 Apr 13 01:27 tags.parquet
==> Writing large4/output/movies_agg.parquet
==> Writing large4/output/tfidf.parquet
==> Writing large4/output/ratings_tags.parquet
Creating ZIP file ...
large4:
total 79320
drwxr-xr-x 2 root root    45056 Apr 13 01:27 actors.parquet
drwxr-xr-x 2 root root    49152 Apr 13 01:27 genres.parquet
drwxr-xr-x 2 root root    49152 Apr 13 01:27 movies.parquet
drwxr-xr-x 5 root root     4096 Apr 13 01:56 output
-rw-r--r-- 1 root root 81065209 Apr 13 01:58 output.zip
drwxr-xr-x 2 root root     4096 Apr 13 01:28 ratings.parquet
drwxr-xr-x 2 root root     4096 Apr 13 01:27 tags.parquet

large4/output:
total 144
drwxr-xr-x 2 root root 49152 Apr 13 01:53 movies_agg.parquet
drwxr-xr-x 2 root root 53248 Apr 13 01

## Copy output ZIP file to output bucket

In [0]:
! gsutil cp "$DATASET"/"$OUTPUT_ZIP_FILE" gs://"$OUTPUT_BUCKET"/"$DATASET"/output.zip 

Copying file://large4/output.zip [Content-Type=application/zip]...
Operation completed over 1 objects/77.3 MiB.                                     


## Copy Parquet files to output bucket (optional)

In [0]:
if COPY_PARQUET_FILES_TO_OUTPUT_BUCKET:
  ! gsutil -m cp -r $DATASET/output/movies_agg.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/
  ! gsutil -m cp -r $DATASET/output/tfidf.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/
  ! gsutil -m cp -r $DATASET/output/ratings_tags.parquet gs://"$OUTPUT_BUCKET"/"$DATASET"/

Copying file://large4/output/movies_agg.parquet/part-00065-1948ddf0-6f58-4f96-9728-7d17c8261ccc-c000.snappy.parquet [Content-Type=application/octet-stream]...
/ [0 files][    0.0 B/519.6 KiB]                                                Copying file://large4/output/movies_agg.parquet/part-00022-1948ddf0-6f58-4f96-9728-7d17c8261ccc-c000.snappy.parquet [Content-Type=application/octet-stream]...
Copying file://large4/output/movies_agg.parquet/part-00113-1948ddf0-6f58-4f96-9728-7d17c8261ccc-c000.snappy.parquet [Content-Type=application/octet-stream]...
Copying file://large4/output/movies_agg.parquet/part-00023-1948ddf0-6f58-4f96-9728-7d17c8261ccc-c000.snappy.parquet [Content-Type=application/octet-stream]...
/ [0 files][    0.0 B/519.6 KiB]                                                / [0 files][    0.0 B/519.6 KiB]                                                / [0 files][    0.0 B/519.6 KiB]                                                Copying file://large4/output/movies_agg.

## Send PubSub cloud message 

This will trigger the LCF cloud function. 

In [0]:
if SEND_PUBSUB_MESSAGE:
 !gcloud pubsub topics publish $PUBSUB_TOPIC --message $DATASET

messageIds:
- '1120327702080525'
