# BDCC project 1 

_[Big Data and Cloud Computing](http://www.dcc.fc.up.pt/~edrdo/aulas/bdcc), DCC/FCUP_


## Code necessary to run from the command line 

In [None]:
if __name__ == "__main__":
    # Command-line entry point: when this file is executed as a script, build
    # the Spark session that the rest of the file reaches through the
    # module-level names `spark` and `sc`.
    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("BDCCp1")
        .master("local[*]")
        .getOrCreate()
    )
    sc = spark.sparkContext
    # Only warnings and errors from Spark are logged.
    sc.setLogLevel("WARN")

## Provided code - auxiliary functions

__You should not need to edit these.__

#### loadMovieLensData

In [None]:
from pyspark.sql import functions as F

def readCSV(file, debug=False):
    """Read a CSV file with a header row into a DataFrame, inferring the schema.

    When ``debug`` is true the path is printed before reading.
    """
    if debug:
        print('Reading ' + file)
    reader = spark.read
    return reader.csv(file, header=True, inferSchema=True)

def readParquet(file, debug=False):
    """Read a Parquet file into a DataFrame.

    When ``debug`` is true the path is printed before reading.
    """
    if debug:
        print('Reading ' + file)
    reader = spark.read
    return reader.parquet(file)

def loadMovieLensData(path, format='parquet', debug=False):
    """Load the movies, ratings and tags tables found under ``path``.

    Args:
        path: directory (without trailing slash) holding ``movies``,
            ``ratings`` and ``tags`` files.
        format: ``'parquet'`` reads ``*.parquet`` files; any other value
            reads ``*.csv`` files.
        debug: when true, print the files being read and the schema and a
            sample of every resulting DataFrame.

    Returns:
        A ``(movies, ratings, tags)`` tuple of DataFrames. In ``tags`` the
        original ``tag`` column is lower-cased and split on runs of
        whitespace/punctuation, one output row per resulting piece.
    """
    if format == 'parquet':
        movies = readParquet(path + '/movies.parquet', debug)
        ratings = readParquet(path + '/ratings.parquet', debug)
        tags = readParquet(path + '/tags.parquet', debug)
    else:
        movies = readCSV(path + '/movies.csv', debug)
        ratings = readCSV(path + '/ratings.csv', debug)
        tags = readCSV(path + '/tags.csv', debug)

    # Raw string so the backslashes reach the regex engine untouched; the
    # non-raw form relied on invalid Python escape sequences (a SyntaxWarning
    # on recent Python versions). The set of separator characters is the same.
    separators = r'[ \*\+\&\/\%\-\$\#\'\)\(\[\[\],.!?;:\t\n"]+'
    tags = tags.withColumn('tagl', F.explode(F.split(F.lower(F.col('tag')), separators)))\
            .drop('tag')\
            .withColumnRenamed('tagl', 'tag')
    if debug:
        print('> movies')
        movies.printSchema()
        movies.show()
        print('> ratings')
        ratings.printSchema()
        ratings.show()
        print('> tags')
        tags.printSchema()
        tags.show()
    return (movies, ratings, tags)

#### writeCSV / writeParquet (use them to write a data frame to CSV or Parquet format)

In [None]:
def writeCSV(df, path):
    """Write ``df`` to ``path`` as CSV with a header row, replacing existing output."""
    writer = df.write.mode('overwrite')
    writer.csv(path, header=True)

def writeParquet(df,path):
    """Write ``df`` to ``path`` as Parquet, replacing existing output."""
    writer = df.write.mode('overwrite')
    writer.parquet(path)


#### createTagListDF

In [None]:
def createTagListDF(csvTagList):
    """Build a one-column (``tag``) DataFrame from a space-separated tag string."""
    rows = []
    for piece in csvTagList.split(' '):
        rows.append((piece,))
    return spark.createDataFrame(rows, ['tag'])

#### Definition of functions available only in Spark 2.4 (GCP Spark instances run Spark 2.3) 

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType,IntegerType

# Fallbacks for array set operations missing from pyspark.sql.functions in
# Spark versions before 2.4. Each one is a Python UDF that is both registered
# under the same name and attached to F, so callers can write F.array_xxx(...)
# regardless of the Spark version.
# NOTE(review): the declared return type is ArrayType(IntegerType()), so these
# fallbacks are only suitable for arrays of integers, and going through a
# Python set means element order is not kept.

# Define F.array_intersect if not defined (Spark version < 2.4)
if not hasattr(F,'array_intersect'):
  F.array_intersect = spark.udf\
    .register('array_intersect', 
       lambda x,y: list(set(x) & set(y)), ArrayType(IntegerType()))

# Define F.array_union if not defined (Spark version < 2.4)
if not hasattr(F,'array_union'):
  F.array_union = spark.udf\
    .register('array_union', 
       lambda x,y: list(set(x) | set(y)), ArrayType(IntegerType()))

# Define F.array_except if not defined (Spark version < 2.4)
if not hasattr(F,'array_except'):
  F.array_except = spark.udf\
    .register('array_except', 
       lambda x,y: list(set(x) - set(y)), ArrayType(IntegerType()))

## Functions to define 

__This is the section that will be evaluated.__

__Include your code for the various functions required in the assignment below.__

__You may include other auxiliary functions required for computation here
but NOT test code (see below).__



#### tfidfTags

In [None]:
# Auxiliary function to compute the tfidf of a given DF
def tfidf_aux(data, term, document, debug=False):
    """Compute TF-IDF weights for every (term, document) pair in ``data``.

    Args:
        data: DataFrame with one row per occurrence of a term in a document.
        term: name of the column containing the terms.
        document: name of the column containing the documents.
        debug: when true, print and show every intermediate DataFrame.

    Returns:
        A DataFrame with the ``term`` and ``document`` columns plus:
        ``f`` (occurrences of the term in the document), ``f_max`` (largest
        ``f`` among the terms of the document), ``TF`` (``f / f_max``),
        ``n`` (number of distinct documents containing the term), ``IDF``
        (``log2(N / n)`` where N is the number of distinct documents in
        ``data``) and ``TF_IDF`` (``TF * IDF``).
    """

    f = data\
        .groupBy(term, document)\
            .agg(F.count(document)\
                 .alias('f')\
                )
    if debug:
        print('>>> TF-IDF Debugger')
        print('>>> Step 1 :: Compute number of times ' + term +
              ' has been used in association to ' + document)
        f.show()

    f_max = f.groupBy(document)\
                .agg(F.max('f')\
                     .alias('f_max')\
                )
    f_f_max = f.join(f_max, document)
    if debug:
        print('>>> Step 2 :: Compute maximum absolute frequence of any ' + term +
              ' used for ' + document)
        f_f_max.show()

    tf = f_f_max\
            .withColumn('TF', f_f_max.f / f_f_max.f_max)
    if debug:
        print('>>> Step 3 :: TF value of ' + term + ' for ' + document)
        tf.show()

    n = data\
        .groupBy(term)\
        .agg(F.countDistinct(document)\
             .alias('n')\
        )
    tf_n = tf.join(n, term)
    if debug:
        print('>>> Step 4 :: Join with the number of ' + document +
              's with ' + term + ' at least once')
        tf_n.show()

    # The document count must come from the DataFrame being processed, not
    # from a module-level `tags` variable: otherwise N and n are measured on
    # different data whenever `data` is anything other than that global.
    # count() is an action, so this line triggers a Spark job.
    N = data.select(document).distinct().count()
    idf = tf_n\
            .withColumn('IDF',  F.log2(N / tf_n.n))
    if debug:
        print('>>> Step 5 :: IDF value of ' + term +
              ' considering all ' + document + 's with ' + term)
        idf.show()

    tfidf = idf\
                .withColumn('TF_IDF',idf.TF * idf.IDF)
    if debug:
        print('>>> Step 6 :: TF-IDF value of ' + term + ' for ' + document)
        tfidf.show()
        print('>>> Finished TF-IDF processing')

    return tfidf

In [None]:
from pyspark.sql import functions as F

def tfidfTags(tags, debug=False):
    """Return the TF-IDF table of ``tags``: terms are tags, documents are movies."""
    term_column = 'tag'
    document_column = 'movieId'
    if debug:
        print('>> Step 1 :: Compute tfidf using "tag" as term and "movieId" as document')
    return tfidf_aux(tags, term_column, document_column, debug)
                

#### recommendByTag

In [None]:
from pyspark.sql import functions as F

def recommendByTag(singleTag, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    """Recommend the movies for which ``singleTag`` has the highest TF-IDF.

    Only rows whose ``f_max`` is at least ``min_fmax`` are considered. The
    result has the columns ``movieId``, ``title`` and ``TF_IDF``, sorted by
    descending ``TF_IDF`` then ascending ``title``, and holds at most
    ``numberOfResults`` rows. With ``debug`` every step is printed and shown.
    """
    matching = TFIDF_tags.where(TFIDF_tags['tag'] == singleTag)
    frequent_enough = matching.where(TFIDF_tags['f_max'] >= min_fmax)
    scores = frequent_enough.drop('tag', 'f', 'f_max', 'n', 'TF', 'IDF')
    if debug:
        print('>> Step 1 :: TFIDF of single tag & Filtered by >= ' + str(min_fmax))
        scores.show()

    with_titles = scores.join(movies, 'movieId')
    if debug:
        print('>> Step 2 :: Join with the corresponding movie')
        with_titles.show()

    ranked = with_titles.orderBy(['TF_IDF', 'title'], ascending=[False, True])
    top = ranked.select('movieId', 'title', 'TF_IDF').limit(numberOfResults)
    if debug:
        print('>> Step 3 :: Limit to ' + str(numberOfResults) + ' ordered results')
        top.show()

    return top

#### recommendByTags

In [None]:
from pyspark.sql import functions as F

# Can it be done by using previous function recommendByTag?
# Even if possible is more computationally heavy
def recommendByTags(searchTags, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    """Recommend movies by the summed TF-IDF of several tags.

    Args:
        searchTags: space-separated string of tags to search for.
        TFIDF_tags: TF-IDF table with at least the columns ``tag``,
            ``movieId``, ``f_max`` and ``TF_IDF``.
        movies: DataFrame with at least ``movieId`` and ``title``.
        min_fmax: rows whose ``f_max`` is below this value are ignored.
        numberOfResults: maximum number of rows returned.
        debug: when true, print and show every intermediate DataFrame.

    Returns:
        A DataFrame with ``movieId``, ``title`` and ``SUM_TF_IDF``, sorted by
        descending ``SUM_TF_IDF`` then ascending ``title``.
    """
    searchTagsDF = createTagListDF(searchTags)
    if debug:
        print('>> Step 1 :: Search tags DF: ' + searchTags)
        searchTagsDF.show()

    filt_tags = TFIDF_tags\
                    .join(searchTagsDF, 'tag')\
                    .filter(F.col('f_max') >= min_fmax)
    if debug:
        print('>> Step 2 :: TFIDF of given tags & filtered by >= ' + str(min_fmax))
        filt_tags.show()

    # Use the exact column name ('movieId', not 'movieID') so the grouping
    # does not depend on Spark resolving column names case-insensitively.
    sum_tfidf = filt_tags\
                    .groupBy('movieId')\
                    .agg(F.sum('TF_IDF')\
                        .alias('SUM_TF_IDF')\
                    )
    if debug:
        print('>> Step 3 :: Sum of TF_IDF on same movies')
        sum_tfidf.show()

    tags_movie = sum_tfidf\
                    .join(movies, 'movieId')\
                    .orderBy(['SUM_TF_IDF', 'title'], ascending=[0, 1])\
                    .select('movieId', 'title', 'SUM_TF_IDF')\
                    .limit(numberOfResults)
    if debug:
        print('>> Step 4 :: Join with the corresponding movie & limit to ' +
              str(numberOfResults) + ' ordered results')
        tags_movie.show()

    return tags_movie

#### jiMovieSimilarity

In [None]:
def jiSimilarity(data, col_ref, col_set, min_col_set=0, debug=False):
    """Compute the Jaccard index between pairs of ``col_ref`` values.

    Every distinct ``col_ref`` value is associated with the set of
    ``col_set`` values that appear with it in ``data``; sets smaller than
    ``min_col_set`` are discarded. The remaining values are paired (first
    value strictly smaller than the second) and, for every pair, the result
    carries ``i`` (intersection size), ``u`` (union size) and ``JI``
    (``i / u``).

    ``col_ref`` must end with the character '1': the column holding the
    second element of a pair is named by replacing that last character
    with '2'.
    """
    left = (
        data
        .groupBy(col_ref)
        .agg(F.collect_set(data[col_set]).alias('f1'))
        .filter(F.size('f1') >= min_col_set)
    )
    if debug:
        print('>>> Jaccard Index debugger')
        print('>>> Step 1 :: ' + col_ref + ' & Set of ' + col_set +
              ' that are related with' + col_ref + ' (f1) & Have sets size ' +
              'bigger than ' + str(min_col_set))
        left.show()

    # Same table again under the "2" names, so it can be paired with itself.
    prefix = col_ref[:-1]
    col_ref_2 = prefix + '2'
    right = left.withColumnRenamed(col_ref, col_ref_2)
    right = right.withColumnRenamed('f1', 'f2')

    # Keeping only first < second yields each unordered pair exactly once.
    pairs = left.crossJoin(right).filter(left[col_ref] < right[col_ref_2])
    if debug:
        print('>>> Step 2 :: Crossing different ' + prefix +
              ' and the respective sets of ' + col_set)
        pairs.show()

    intersection_size = F.size(F.array_intersect(pairs.f1, pairs.f2))
    union_size = F.size(F.array_union(pairs.f1, pairs.f2))
    sizes = (
        pairs
        .withColumn('i', intersection_size)
        .withColumn('u', union_size)
        .drop('f1', 'f2')
    )
    if debug:
        print('>>> Step 3 :: Intersection between ' + col_set +
              ' (i) & Union between ' + col_set + ' (u)')
        sizes.show()

    ji = sizes.withColumn('JI', sizes.i / sizes.u)
    if debug:
        print('>>> Step 4 :: Computed JI out of i & u')
        ji.show()
        print('>>> Finished Jaccard Index processing')

    return ji

In [None]:
from pyspark.sql import functions as F

def jiMovieSimilarity(ratings, minRatings=10, debug=False):
    """Jaccard index between movies, based on the users who liked them.

    A rating of 4.0 or more counts as "liked". Movies with fewer than
    ``minRatings`` liking users are left out. The movie columns of the
    result are named ``m1`` and ``m2``.
    """
    is_liked = ratings.rating >= 4.0
    liked = ratings.filter(is_liked).withColumnRenamed('movieId', 'm1')
    if debug:
        print('>> Step 1 :: Filter ratings for liked movies & rename movieId to m1')
        liked.show()
        print('>> Step 2 :: Compute JI using "m1" as col_ref and "userId" as col_set')

    return jiSimilarity(liked, 'm1', 'userId', minRatings, debug)

#### recommendBySimilarity

In [None]:
def getJiEntries(ji, entry_id, col_name_1, debug=False):
    """Return the Jaccard-index rows that involve ``entry_id``.

    In ``ji`` each pair is stored once, so ``entry_id`` may sit in either the
    first or the second column of a pair. Both cases are collected and the
    *other* member of the pair is returned in a column named
    ``col_name_1[:-1]`` (the first column's name without its trailing
    digit). The ``i`` and ``u`` columns are dropped.

    ``col_name_1`` is the name of the first column of the pair in ``ji``;
    the second column is expected to have the same name ending in '2'.
    """
    base_name = col_name_1[:-1]
    second_name = base_name + '2'

    as_first = ji.filter(ji[col_name_1] == entry_id)
    as_first = as_first.drop(col_name_1, 'i', 'u')
    if debug:
        print('>>> GetJiEntries Debugger')
        print('>>> Step 1 :: Filter ji where col1 is ' + str(entry_id))
        as_first.show()

    as_second = ji.filter(ji[second_name] == entry_id)
    as_second = as_second.drop(second_name, 'i', 'u')
    if debug:
        print('>>> Step 2 :: Filter ji where col2 is ' + str(entry_id))
        as_second.show()

    # After renaming, both halves expose the same columns in the same order,
    # which is what the positional union needs.
    partners_from_first = as_first.withColumnRenamed(second_name, base_name)
    partners_from_second = as_second.withColumnRenamed(col_name_1, base_name)
    merged = partners_from_first.union(partners_from_second)
    if debug:
        print('>>> Step 3 :: Union of the two DFs presented before')
        merged.show()
        print('>>> Finished GetJiEntries processing')

    return merged

In [None]:
def recommendBySimilarity(movieId, movies, jiForMovies, numberOfResults=10, debug=False):
    """Recommend the movies most similar to ``movieId`` by Jaccard index.

    ``jiForMovies`` is a movie-to-movie Jaccard table whose pair columns are
    ``m1``/``m2``. The result has ``movieId``, ``title`` and ``JI``, sorted by
    descending ``JI`` and limited to ``numberOfResults`` rows.
    """
    if debug:
        print('>> Step 1 :: Get JI entries for the given Jaccard Index')

    partners = getJiEntries(jiForMovies, movieId, 'm1', debug)
    partners = partners.withColumnRenamed('m', 'movieId')

    titled = partners.join(movies, 'movieId').select('movieId', 'title', 'JI')
    result = titled.orderBy('JI', ascending=False).limit(numberOfResults)
    if debug:
        print('>> Step 2 :: Join with the respective movies and order results')
        result.show()

    return result

# Specify input data set and load it

In [None]:
# Load data
# The data set location is assembled as <bucket><path><dataset>; the
# commented-out lines are alternative locations kept for convenience.
bucket = 'gs://bdcc_up201503784_311' # Ed's bucket 
#bucket = 'gs://bdcc_up201503316' # Foo's bucket 
path = '/p1/data/'
#path = '/p1/'
dataset = 'medium2'
fullPath = bucket + path + dataset

# Read the CSV version of the data set; debug=True prints the schema and a
# sample of each table. The three DataFrames become module-level names used
# by the test cells below.
(movies, ratings, tags) = \
  loadMovieLensData(fullPath, format='csv', debug=True)

##  Test code 

__Include test code below that you may need here.__

__The initial contents are only meant as an example.__

__This section will NOT be evaluated.__

In [None]:
# Get TF-IDF for tags
tfidf = tfidfTags(tags)

# Alternative view (disabled): cache the table and list it per movie.
# tfidf.cache()
# tfidf.orderBy(['movieId','TF_IDF'],ascending=[1,0]).show()
# Show the rows with the highest absolute frequency first.
tfidf.orderBy(['f','TF_IDF','movieId','tag'],ascending=[0,0,1,1]).show()



In [None]:
# Recommend by tag 

# Default threshold (min_fmax=10).
rm = recommendByTag('cartoon', tfidf, movies)
rm.show()

# Same tag with the f_max threshold lowered to 1.
rm = recommendByTag('cartoon', tfidf, movies, min_fmax=1)
rm.show()


# A different tag, default threshold.
rm = recommendByTag('cruise', tfidf, movies)
rm.show()




In [None]:
# Recommend by Tags
# Each call passes a space-separated list of tags.

rm = recommendByTags('hitchcock birds', tfidf, movies)
rm.show()

rm = recommendByTags('quentin tarantino', tfidf, movies)
rm.show()

rm = recommendByTags('sci fi space', tfidf, movies)
rm.show()

# Disabled variant passing numberOfResults explicitly.
#rm = recommendByTags('hitchcock birds', tfidf, movies, numberOfResults=10)
#rm.show()




In [None]:
# Jaccard index between movies, with the default minimum number of ratings.
jiM = jiMovieSimilarity(ratings)

# Alternative ordering (disabled): highest JI first.
#jiM.orderBy(['JI','m1','m2'], ascending=[0,1,1]).show()
# Show the pairs with the largest intersection first.
jiM.orderBy(['i','JI','m1','m2'], ascending=[0,0,1,1]).show()




In [None]:
# The Jaccard table is queried more than once below, so mark it for caching.
jiM.cache()

# The Dish
sm = recommendBySimilarity(4225, movies, jiM)
sm.show()

# Miami Vice
sm = recommendBySimilarity(47044, movies, jiM)
sm.show()

# Extended Functionalities

### tfidfMovies

In [None]:
def tfidfMoviesAndTags(movies, tags, debug=False):
    """Compute TF-IDF over the words of movie titles together with the tags.

    Args:
        movies: DataFrame with at least ``movieId`` and ``title``.
        tags: DataFrame with at least ``movieId`` and ``tag``.
        debug: when true, print and show every intermediate DataFrame.

    Returns:
        The TF-IDF table produced by ``tfidf_aux`` using ``word`` as the term
        column and ``movieId`` as the document column.
    """
    # Split titles on spaces and on a " (YYYY)" group, so the parenthesised
    # four-digit year does not become a word. The pattern is the same as
    # before, written as a raw string to avoid invalid escape sequences.
    title_separator = r'( \([0-9]{4}\))| '
    movie_title_w = movies\
                .withColumn('word',\
                    F.explode(F.split(F.col('title'), title_separator))\
                           )\
                .filter(F.col('word') != '')\
                .select('movieId', 'word')
    if debug:
        print('>> Step 1 :: Associate to each movie the words belonging to its title')
        movie_title_w.show()

    # union() matches columns by position, so both sides are reduced to the
    # same two columns in the same order instead of relying on the leftover
    # columns of `movies` and `tags` happening to line up.
    tag_w = tags.select('movieId', F.col('tag').alias('word'))

    # Union keeps duplicates - intended
    movie_w = movie_title_w.union(tag_w)
    if debug:
        print('>> Step 2 :: Union of previous DF with the given tags')
        movie_w.orderBy('movieId').show()
        print('>> Step 3 :: Compute tfidf using "word" as term and "movieId" as document')

    return tfidf_aux(movie_w, 'word', 'movieId', debug)

### jiTagSimilarity

In [None]:
def jiTagSimilarity(tags, debug=False):
    """Jaccard index between tags, based on the movies they are applied to.

    The tag columns of the result are named ``t1`` and ``t2``; no minimum
    set size is imposed.
    """
    renamed = tags.withColumnRenamed('tag', 't1')
    if debug:
        print('>> Step 1 :: Rename tag to t1')
        renamed.show()
        print('>> Step 2 :: Compute JI using "t1" as col_ref and "movieId" as col_set')

    minimum_set_size = 0
    return jiSimilarity(renamed, 't1', 'movieId', minimum_set_size, debug)

### recommendTags

In [None]:
from functools import reduce

def recommendTags(movieId, jiTags, tags, numberSimilarTags=5, debug=False):
    """Recommend tags for a movie based on tag-to-tag Jaccard similarity.

    For every tag row of ``movieId`` the similar tags (per ``jiTags``) that
    the movie does not have yet are collected; their JI values are summed per
    candidate tag and the candidates with the highest sums are returned.

    Args:
        movieId: id of the movie to recommend tags for.
        jiTags: tag-to-tag Jaccard table with pair columns ``t1``/``t2``.
        tags: DataFrame with at least ``movieId`` and ``tag``.
        numberSimilarTags: maximum number of tags returned.
        debug: when true, print and show every intermediate DataFrame.

    Returns:
        A DataFrame with the single column ``t``. It is empty when the movie
        has no tags.
    """
    movie_tags = tags\
                    .filter(tags.movieId == movieId)
    # NOTE(review): distinct() runs over whole rows, so a tag used on this
    # movie in several distinct rows appears several times in the list and
    # its similar tags are counted once per row — confirm this is intended.
    movie_tags_list = [row.tag for row\
                               in movie_tags.distinct().collect()]
    if debug:
        print('>> Step 1 :: Get the tags associated to the given movieId')
        movie_tags.show()
        print('>> Step 1 :: Representing the tags in a list: ')
        print(movie_tags_list)

    if not movie_tags_list:
        # Nothing to start from: reduce() below would raise on an empty
        # list, so return an empty result with the usual 't' column instead.
        return tags.select(F.col('tag').alias('t')).limit(0)

    df_array = [getJiEntries(jiTags, movie_tag, 't1')\
                            .filter(~F.col('t').isin(movie_tags_list))\
                            .orderBy('JI', ascending=False)
                    for movie_tag
                    in movie_tags_list]
    if debug:
        print('>> Step 2 :: For each movie tag, get the tags not yet associated' +
                'with movieId that maximize the JI')
        for df in df_array:
            df.show()

    res_df = reduce(\
                lambda acc, df: acc.union(df),\
                df_array)
    if debug:
        print('>> Step 3 :: Union of previous DFs')
        res_df.show()

    result = res_df\
                .groupBy('t')\
                .agg(F.sum('JI')\
                    .alias('SUM_JI')\
                )\
                .orderBy('SUM_JI', ascending=False)\
                .limit(numberSimilarTags)
    if debug:
        print('>> Step 4 :: Summed all the JIs referencing the given tag and ' +
                'limited to ' + str(numberSimilarTags) + ' ordered results. ' +
                'The higher the JI sum, the more relevant is the tag')
        result.show()

    return result.drop('SUM_JI')

### jiUserSimilarity

In [None]:
# Calculate the Jaccard similarity between users based
# on what films they rate (independently of the value of the rating itself). 
def jiUserSimilarity(ratings, debug=False):
    """Jaccard index between users, based on the movies they rated.

    The rating value itself is not used. The user columns of the result are
    named ``u1`` and ``u2``; no minimum set size is imposed.
    """
    renamed = ratings.withColumnRenamed('userId', 'u1')
    if debug:
        print('>> Step 1 :: Rename userId to u1')
        renamed.show()
        print('>> Step 2 :: Compute JI using "u1" as col_ref and "movieId" as col_set')

    minimum_set_size = 0
    return jiSimilarity(renamed, 'u1', 'movieId', minimum_set_size, debug)

### recommendByUserSimilarity

In [None]:
from pyspark.sql.functions import array, lit, udf
from pyspark.sql.types import FloatType

# Given an array returns a new array with the lit function applied to every member
def get_lit_array_from(arr):
    """Build an array column of literals from the sequence stored at ``arr[0]``."""
    literals = []
    for value in arr[0]:
        literals.append(lit(value))
    return array(literals)

# Given an array of movies and an array of ratings 
# returns the top rated movie
# Assumes index relation between arrays 
# ratings[0] rates movie[0]
def get_top_rated(movies, ratings):
    """Return the movie with the highest rating.

    ``ratings[k]`` is the rating of ``movies[k]``. On ties the earliest movie
    wins. Returns -1 when ``movies`` is empty or no rating is above -1.
    """
    best_rating = -1
    best_movie = -1
    for position, movie in enumerate(movies):
        rating = ratings[position]
        if rating > best_rating:
            best_rating, best_movie = rating, movie
    return best_movie

# Given an array of movies, an array of ratings and an array of movies to exclude
# returns the rating of the movies included
def get_ratings(movies, ratings, except_movies):
    """Return the ratings of the movies that are not in ``except_movies``.

    ``ratings[k]`` is the rating of ``movies[k]``; the order of the kept
    ratings follows the order of ``movies``.
    """
    return [
        ratings[position]
        for position, movie in enumerate(movies)
        if movie not in except_movies
    ]

# Given the id of a user, recommend the top-rated film per each of the most n
# similar users to user u, as long as u has not yet rated or tagged the movies at stake.
def recommendByUserSimilarity(userID, ratings, tags, jiForUsers, numberSimilarUsers=10, debug=False):
    """Recommend, for each of the users most similar to ``userID``, that
    user's top-rated movie among those ``userID`` has not rated or tagged.

    Args:
        userID: id of the user to recommend for.
        ratings: DataFrame with at least ``userId``, ``movieId``, ``rating``.
        tags: DataFrame with at least ``userId`` and ``movieId``.
        jiForUsers: user-to-user Jaccard table with pair columns ``u1``/``u2``.
        numberSimilarUsers: maximum number of similar users considered.
        debug: when true, print and show every intermediate DataFrame.

    Returns:
        A DataFrame with ``u`` (the similar user) and ``topRated`` (the
        recommended movie; the UDF producing it is declared without a return
        type).
    """
    related_users = getJiEntries(jiForUsers, userID, 'u1')
    if debug:
        print('>> Step 1 :: Get the Jaccard Index of related users ' \
              'to User' + str(userID))
        related_users.show()

    # One row per related user with two index-aligned lists: the movies the
    # user rated and the corresponding ratings.
    relatedUsers_movies = related_users\
            .join(ratings, ratings.userId == related_users.u)\
            .groupBy('u')\
            .agg(F.collect_list('movieId')\
                 .alias('movies'),\
                 F.collect_list('rating')\
                .alias('ratings'))

    if debug:
        print('>> Step 2 :: Join of the DFs with ratings')
        relatedUsers_movies.show()

    # union() matches columns by position, so both sides are reduced to the
    # same two columns explicitly before being combined.
    u1_mu1 = ratings.select('userId', 'movieId')\
                .union(tags.select('userId', 'movieId'))\
                .filter(F.col('userId') == userID)\
                .agg(F.collect_set(F.col('movieId'))\
                      .alias('movies'))

    if debug:
        print('>> Step 3 :: Set of movies rated/tagged by User' + str(userID))
        u1_mu1.show()

    # The movies already known to the user are collected to the driver and
    # attached to every row as a literal array column.
    arr = get_lit_array_from(u1_mu1.collect()[0])
    relatedUsers_exceptMovies = relatedUsers_movies \
                            .withColumn("except_movies", arr)

    potencialMovies = F.array_except( \
                    relatedUsers_exceptMovies.movies \
                    ,relatedUsers_exceptMovies.except_movies \
                    )
    potencialRatings = udf(get_ratings, ArrayType(FloatType()))

    # get_ratings walks its first two arguments by shared index, so it must
    # receive the ORIGINAL `movies` list that `ratings` is aligned with.
    # Passing the already-filtered `potencialMovies` would simply return the
    # first ratings of the list, unrelated to the remaining movies.
    # NOTE(review): potencialMovies[k] and potencialRatings[k] only match if
    # array_except keeps the order of its first argument — confirm for the
    # Spark version in use.
    relatedUsers_potencialMovies=relatedUsers_exceptMovies \
                                .withColumn("potencialMovies", potencialMovies) \
                                .filter(F.size(F.col('potencialMovies')) > 0) \
                                .withColumn("potencialRatings",\
                                    potencialRatings("movies",\
                                                "ratings", "except_movies")) \
                                .drop('movies', 'except_movies', 'ratings') \
                                .join(related_users, ['u']) \
                                .orderBy('JI', ascending=False) \
                                .drop('JI') \
                                .limit(numberSimilarUsers)

    if debug:
        print('>> Step 4 :: Potencial Movies Only')
        relatedUsers_potencialMovies.show()

    top_rated = udf(get_top_rated)

    result = relatedUsers_potencialMovies \
            .withColumn("topRated", top_rated("potencialMovies", "potencialRatings"))\
            .drop('potencialMovies', 'ratings', 'potencialRatings')

    if debug:
        print('>> Step 5 :: Choose the top rated movies')
        result.show()

    return result

# Test Code for Extended Functionalities

In [None]:
# Tests for tfidfMoviesAndTags

# debug=True prints every intermediate step.
tfidfMT = tfidfMoviesAndTags(movies, tags, debug=True)

# Show the rows with the highest absolute frequency first.
tfidfMT.orderBy(['f','TF_IDF','movieId','word'],ascending=[0,0,1,1]).show()

In [None]:
# Tests for jiTagSimilarity
jiT = jiTagSimilarity(tags)
# The table is reused by recommendTags below, so mark it for caching.
jiT.cache()
jiT.orderBy('JI', ascending=False).show()

# MovieId 2
recommendTags(2, jiT, tags, debug=True).show()

In [None]:
# Tests for jiUserSimilarity
jiU = jiUserSimilarity(ratings, debug=True)

# Alternative views (disabled); the pair columns of jiU are u1/u2.
#jiU.orderBy(['JI','u1','u2'], ascending=[0,1,1]).show()
#jiU.orderBy(['i','JI','u1','u2'], ascending=[0,0,1,1]).show()

# The table is reused by the recommendation below, so mark it for caching.
jiU.cache()

# User 62
sm = recommendByUserSimilarity(62, ratings, tags, jiU, 10, True)
sm.show()