<h1 align="center">BDCC project 1</h1>

<h4 align="center">By: António Almeida, Miguel Ramalho</h4>

<h5 align="center"><a href="http://www.dcc.fc.up.pt/~edrdo/aulas/bdcc">Big Data and Cloud Computing</a>, DCC/FCUP</h5>


## Code necessary to run from the command line 

In [1]:
if __name__ == "__main__":
    # Only when executed as the main module (command line or notebook kernel):
    # start one local Spark session on all available cores ("local[*]") and
    # reduce Spark's log output to warnings and above.
    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("BDCCp1")
        .master("local[*]")
        .getOrCreate()
    )
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

## Provided code - auxiliary functions

__You should not need to edit these.__

#### loadMovieLensData

In [2]:
from pyspark.sql import functions as F

In [3]:
def readCSV(file, debug=False):
    """Read a CSV file with a header row into a DataFrame, inferring column types."""
    if debug:
        print('Reading ' + file)
    reader = spark.read
    return reader.csv(file, header=True, inferSchema=True)

def readParquet(file, debug=False):
    """Read Parquet data at `file` into a DataFrame."""
    if debug:
        print('Reading ' + file)
    reader = spark.read
    return reader.parquet(file)

def loadMovieLensData(path, format='parquet', debug=False):
    """Load the MovieLens movies, ratings and tags tables stored under `path`.

    Args:
        path: Directory holding the data files; '/movies.<ext>',
            '/ratings.<ext>' and '/tags.<ext>' are appended to it.
        format: 'parquet' reads the *.parquet files; any other value reads
            the *.csv files.
        debug: If true, print the name of each file read and show the
            schema and first rows of every table.

    Returns:
        A (movies, ratings, tags) tuple of DataFrames. In `tags`, every tag
        is lower-cased and split on whitespace/punctuation into one row per
        word; the resulting 'tag' column is placed last.
    """
    # Raw string: in a plain literal, sequences such as "\*" or "\[" are
    # invalid Python escapes (DeprecationWarning; SyntaxWarning since 3.12).
    # Spark's Java regex engine reads "\<punctuation>" as that literal
    # character and "\t"/"\n" as tab/newline, so the character class matches
    # exactly the same separators as before.
    tag_separators = r'[ \*\+\&\/\%\-\$\#\'\)\(\[\[\],.!?;:\t\n"]+'

    if format == 'parquet':
        read, extension = readParquet, '.parquet'
    else:
        read, extension = readCSV, '.csv'
    movies = read(path + '/movies' + extension, debug)
    ratings = read(path + '/ratings' + extension, debug)
    tags = read(path + '/tags' + extension, debug)

    # One row per lower-cased word of the original free-text tag.
    tags = tags.withColumn('tagl', F.explode(F.split(F.lower(F.col('tag')), tag_separators)))\
               .drop('tag')\
               .withColumnRenamed('tagl', 'tag')

    if debug:
        for label, table in (('movies', movies), ('ratings', ratings), ('tags', tags)):
            print('> ' + label)
            table.printSchema()
            table.show()
    return (movies, ratings, tags)

#### writeCSV / writeParquet (use them to write a data frame to CSV or Parquet format)

In [4]:
def writeCSV(df, path):
    """Write `df` as CSV (with a header row) to `path`, replacing any existing output."""
    writer = df.write.mode('overwrite')
    writer.csv(path, header=True)

def writeParquet(df, path):
    """Write `df` in Parquet format to `path`, replacing any existing output."""
    writer = df.write.mode('overwrite')
    writer.parquet(path)

#### createTagListDF

In [5]:
def createTagListDF(csvTagList):
    """Return a one-column DataFrame ('tag') with one row per tag in `csvTagList`.

    Despite the parameter name, `csvTagList` is a whitespace-separated string
    such as 'jane austen'. Tags are taken verbatim (no lower-casing and no
    de-duplication).

    Splitting on runs of whitespace (rather than on a single ' ') keeps
    doubled, leading or trailing spaces from producing empty-string tags,
    which could spuriously match empty tokens present in the tag data.
    The explicit schema lets a blank string yield an empty DataFrame
    instead of failing schema inference on an empty list.
    """
    from pyspark.sql.types import StringType, StructField, StructType

    schema = StructType([StructField('tag', StringType(), True)])
    return spark.createDataFrame([(t,) for t in csvTagList.split()], schema)

#### Definition of functions available only in Spark 2.4 (GCP Spark instances run Spark 2.3) 

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType,IntegerType

# Spark < 2.4 lacks F.array_intersect / F.array_union, so Python UDF fallbacks
# are registered under the same names. They are intended to behave like the
# built-ins: null if either input array is null, no duplicate elements, and
# elements kept in first-seen order (first array, then second).

def _distinct_in_order(values):
    """Return the distinct elements of `values`, keeping first-seen order."""
    seen = set()
    result = []
    for value in values:
        if value not in seen:
            seen.add(value)
            result.append(value)
    return result


def _py_array_intersect(x, y):
    """Elements of `x` that also occur in `y` (no duplicates); None if either is None."""
    if x is None or y is None:
        return None
    in_y = set(y)
    return _distinct_in_order(v for v in x if v in in_y)


def _py_array_union(x, y):
    """Elements of `x` followed by new elements of `y` (no duplicates); None if either is None."""
    if x is None or y is None:
        return None
    return _distinct_in_order(list(x) + list(y))


# Define F.array_intersect if not defined (Spark version < 2.4)
if not hasattr(F, 'array_intersect'):
    F.array_intersect = spark.udf.register(
        'array_intersect', _py_array_intersect, ArrayType(IntegerType()))

# Define F.array_union if not defined (Spark version < 2.4)
if not hasattr(F, 'array_union'):
    F.array_union = spark.udf.register(
        'array_union', _py_array_union, ArrayType(IntegerType()))

## Group code - auxilliary functions

These are defined before the functions below because Jupyter runs cells top-down, so a helper must be defined before any cell that uses it.

In [7]:
def get_idf(*, data, w='w', d='d', n=None, debug=False):
    """Calculate the Inverse Document Frequency (IDF) of each word in a DataFrame.

    IDF(w) = log2(|D| / n_w_D), where |D| is the number of distinct values of
    column `d` ("documents") in `data` and n_w_D is the number of distinct
    documents in which word `w` appears.

    By default the columns are named 'w' (word) and 'd' (document); other
    names can be passed to reuse this for, e.g., tags ('tag') over movies
    ('movieId').

    Args:
        data: A DataFrame containing columns `w` and `d`.
        w: Column name for 'words'.
        d: Column name for 'documents'.
        n: If given (truthy), name of an extra output column holding n_w_D,
            the number of documents in which `w` appears.
        debug: If true, show the n_w_D table and print |D|.

    Returns:
        A DataFrame with columns `w`, [`n`,] 'IDF' (in that order).
    """
    n_w_D = data\
        .groupBy(w)\
        .agg(F.countDistinct(d).alias('n_w_D'))
    if debug:
        n_w_D.orderBy('n_w_D', ascending=False).show()

    # Note: triggers a Spark job (count is an action).
    size_of_D = data.select(d).distinct().count()
    if debug:
        print("|D| = %d" % size_of_D)

    # Reference the alias with its exact case so this keeps working when
    # spark.sql.caseSensitive is enabled.
    IDF = n_w_D.withColumn('IDF', F.log2(size_of_D / F.col('n_w_D')))

    if n:
        IDF = IDF.withColumnRenamed('n_w_D', n)
    else:
        IDF = IDF.drop('n_w_D')
    return IDF

## Functions to define 

__This is the section that will be evaluated.__

__Include your code for the various functions required in the assignment below.__

__You may include other auxiliary functions required for computation here
but NOT test code (see below).__

> Some auxiliary functions required for computation are in the [section above](#Group-code---auxilliary-functions)

#### tfidfTags
Calculates the TF-IDF metric for tags in association with movies.

In [8]:
def tfidfTags(tags, debug=False):
    """Compute TF-IDF for every (tag, movie) pair in `tags`.

    TF  = f / f_max, where f is the number of non-null userId values for the
          (tag, movieId) pair and f_max is the largest f of any tag on that
          movie.
    IDF = computed by get_idf with tags as words and movieId as documents;
          'n' is the number of distinct movies the tag was applied to.

    Returns:
        DataFrame with columns
        ['tag', 'movieId', 'f', 'f_max', 'TF', 'n', 'IDF', 'TF_IDF'].
    """
    # Per (tag, movie): how often the tag was applied.
    counts = tags.groupBy('tag', 'movieId').agg(F.count('userId').alias('f'))

    # Per movie: the highest such count, used to normalise TF.
    max_counts = counts.groupBy('movieId').agg(F.max('f').alias('f_max'))

    idf = get_idf(data=tags, w='tag', d='movieId', n='n', debug=debug)

    # Join order (movieId first, then tag) fixes the output column order.
    with_tf = counts.join(max_counts, 'movieId')
    with_tf = with_tf.withColumn('TF', F.col('f') / F.col('f_max'))
    scored = with_tf.join(idf, 'tag')
    return scored.withColumn('TF_IDF', F.col('TF') * F.col('IDF'))

#### recommendByTag
Recommends movies that have the highest TF-IDF value for a given (single) tag.

In [9]:
def recommendByTag(singleTag, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    """Recommend the movies with the highest TF-IDF for a single tag.

    Only rows of `TFIDF_tags` for `singleTag` whose f_max is at least
    `min_fmax` are considered. Ties on TF-IDF are broken by ascending title.

    Returns:
        At most `numberOfResults` rows with columns movieId, title, TF_IDF.
    """
    # Filter first so the join with `movies` only sees the relevant rows.
    matching = TFIDF_tags.where(
        (TFIDF_tags['tag'] == singleTag) & (TFIDF_tags['f_max'] >= min_fmax))
    ranked = matching.join(movies, 'movieId')\
                     .orderBy(['TF_IDF', 'title'], ascending=[False, True])
    return ranked.select('movieId', 'title', 'TF_IDF').limit(numberOfResults)

#### recommendByTags
Recommends movies that have the highest combined (summed) TF-IDF value for several given tags (1 or more).

In [30]:
def recommendByTags(searchTags, TFIDF_tags, movies, min_fmax=10, numberOfResults=10, debug=False):
    """Recommend movies by the sum of their TF-IDF over several search tags.

    `searchTags` is a space-separated string of tags (see createTagListDF).
    Only rows with f_max >= `min_fmax` count. Because the search tags are
    joined row by row, a tag repeated in `searchTags` is summed once per
    occurrence. Ties are broken by ascending title.

    Returns:
        At most `numberOfResults` rows with columns movieId, title, SUM_TF_IDF.
    """
    wanted = createTagListDF(searchTags)
    if debug:
        print('> Search tags DF: ' + searchTags)
        wanted.show()

    eligible = TFIDF_tags.where(TFIDF_tags['f_max'] >= min_fmax)
    # The inner join keeps only the requested tags.
    per_movie = eligible.join(wanted, 'tag', 'inner')\
                        .groupBy('movieId')\
                        .agg(F.sum('TF_IDF').alias('SUM_TF_IDF'))
    ranked = per_movie.join(movies, 'movieId')\
                      .orderBy(['SUM_TF_IDF', 'title'], ascending=[False, True])
    return ranked.select('movieId', 'title', 'SUM_TF_IDF').limit(numberOfResults)

#### jiMovieSimilarity
Calculates the Jaccard index to measure similarity between movies based on user ratings.

In [None]:
def jiMovieSimilarity(ratings, minRatings=10, debug=False):
    """Jaccard-index similarity between movies based on user ratings.

    Not implemented yet: this placeholder ignores its arguments and
    returns None.
    """
    # TODO: implement the Jaccard-index computation.
    return None

#### recommendBySimilarity

In [None]:
def recommendBySimilarity(movieId, movies, jiForMovies, numberOfResults=10, debug=False):
    """Recommend movies similar to `movieId` using Jaccard-index similarities.

    Not implemented yet: this placeholder ignores its arguments and
    returns None.
    """
    # TODO: implement the similarity-based recommendation.
    return None

# Specify input data set and load it

In [13]:
# Load data
bucket = 'gs://bdcc1819'
path = '/p1/data/'
dataset = 'tiny3'
fullPath = bucket + path + dataset

movies, ratings, tags = loadMovieLensData(fullPath, format='csv', debug=True)

Reading gs://bdcc1819/p1/data/tiny3/movies.csv
Reading gs://bdcc1819/p1/data/tiny3/ratings.csv
Reading gs://bdcc1819/p1/data/tiny3/tags.csv
> movies
root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
|      6|         Heat (1995)|
|      7|      Sabrina (1995)|
|      8| Tom and Huck (1995)|
|      9| Sudden Death (1995)|
|     10|    GoldenEye (1995)|
|     11|American Presiden...|
|     12|Dracula: Dead and...|
|     13|        Balto (1995)|
|     14|        Nixon (1995)|
|     15|Cutthroat Island ...|
|     16|       Casino (1995)|
|     17|Sense and Sensibi...|
|     18|   Four Rooms (1995)|
|     19|Ace Ventura: When...|
|     20|  Money Train (1995)|
+-------+--------------------+
only showing top 20 rows

##  Test code 

__Include test code below that you may need here.__

__The initial contents are only meant as an example.__

__This section will NOT be evaluated.__

In [14]:
# Get TF-IDF for tags
tfidf = tfidfTags(tags, debug=False)
tfidf.cache()
# guarantee all columns are present
assert tfidf.columns == ['tag', 'movieId', 'f', 'f_max', 'TF', 'n', 'IDF', 'TF_IDF'],\
    "Columns do not match expected values for tfidfTags"
# preview the dataframe
tfidf.orderBy(['f','TF_IDF','movieId','tag'], ascending=[0,0,1,1]).show()

+-----------+-------+---+-----+------------------+---+-----------------+-----------------+
|        tag|movieId|  f|f_max|                TF|  n|              IDF|           TF_IDF|
+-----------+-------+---+-----+------------------+---+-----------------+-----------------+
|       time|     32|  3|    3|               1.0|  1|5.426264754702098|5.426264754702098|
|     travel|     32|  3|    3|               1.0|  1|5.426264754702098|5.426264754702098|
|      pixar|      1|  2|    2|               1.0|  1|5.426264754702098|5.426264754702098|
|       game|      2|  2|    2|               1.0|  1|5.426264754702098|5.426264754702098|
|apocalyptic|     32|  2|    3|0.6666666666666666|  1|5.426264754702098|3.617509836468065|
|       post|     32|  2|    3|0.6666666666666666|  1|5.426264754702098|3.617509836468065|
|      moldy|      3|  1|    1|               1.0|  1|5.426264754702098|5.426264754702098|
|        old|      3|  1|    1|               1.0|  1|5.426264754702098|5.426264754702098|

In [22]:
# Recommend by tag, tests for tiny3
recommendByTag('twist', tfidf, movies, min_fmax=1).show()
recommendByTag('killer', tfidf, movies, min_fmax=1).show()
recommendByTag('remake', tfidf, movies, min_fmax=1).show()

rm = recommendByTag('twist', tfidf, movies, min_fmax=1, numberOfResults=2)
assert rm.count() == 2, "numberOfResults should be 2"
assert rm.columns == ["movieId", "title", "TF_IDF"], "unexpected columns"

+-------+--------------------+-----------------+
|movieId|               title|           TF_IDF|
+-------+--------------------+-----------------+
|     47|Seven (a.k.a. Se7...|3.841302253980942|
|     50|Usual Suspects, T...|3.841302253980942|
|     32|Twelve Monkeys (a...|1.280434084660314|
+-------+--------------------+-----------------+

+-------+--------------------+-----------------+
|movieId|               title|           TF_IDF|
+-------+--------------------+-----------------+
|     22|      Copycat (1995)|4.426264754702098|
|     47|Seven (a.k.a. Se7...|4.426264754702098|
+-------+--------------------+-----------------+

+-------+--------------------+-----------------+
|movieId|               title|           TF_IDF|
+-------+--------------------+-----------------+
|      5|Father of the Bri...|3.841302253980942|
|      7|      Sabrina (1995)|3.841302253980942|
|     32|Twelve Monkeys (a...|1.280434084660314|
+-------+--------------------+-----------------+



In [32]:
# Recommend by tags, tests for tiny3
recommendByTags('jane austen', tfidf, movies,min_fmax=1).show()
recommendByTags('remake time twist', tfidf, movies, min_fmax=1).show()
recommendByTags('robin williams remake', tfidf, movies, min_fmax=1).show()

rm = recommendByTags('remake time twist', tfidf, movies, min_fmax=1, numberOfResults=4)
assert rm.count() == 4, "numberOfResults should be 4"
assert rm.columns == ["movieId", "title", "SUM_TF_IDF"], "unexpected columns"

# Tests for tiny 1 (?)
# recommendByTags('tom hanks airport', tfidf, movies, numberOfResults=20).show()
# recommendByTags('tom hanks', tfidf, movies, numberOfResults=20).show()
# recommendByTags('hitchcock birds', tfidf, movies, numberOfResults=10).show()

+-------+--------------------+-----------------+
|movieId|               title|       SUM_TF_IDF|
+-------+--------------------+-----------------+
|     39|     Clueless (1995)|7.682604507961884|
|     28|   Persuasion (1995)|7.682604507961884|
|     17|Sense and Sensibi...|7.682604507961884|
+-------+--------------------+-----------------+

+-------+--------------------+-----------------+
|movieId|               title|       SUM_TF_IDF|
+-------+--------------------+-----------------+
|     32|Twelve Monkeys (a...|7.987132924022726|
|      5|Father of the Bri...|3.841302253980942|
|      7|      Sabrina (1995)|3.841302253980942|
|     47|Seven (a.k.a. Se7...|3.841302253980942|
|     50|Usual Suspects, T...|3.841302253980942|
+-------+--------------------+-----------------+

+-------+--------------------+-----------------+
|movieId|               title|       SUM_TF_IDF|
+-------+--------------------+-----------------+
|      2|      Jumanji (1995)|5.426264754702098|
|      5|Father of

In [None]:
# Jaccard-index similarities between movies, using the default minRatings.
jiM = jiMovieSimilarity(ratings=ratings)

# Preview, most similar pairs first:
#jiM.orderBy(['JI','m1','m2'], ascending=[0,1,1]).show()




In [None]:
#jiM.cache()

# Pulp Fiction
#sm = recommendBySimilarity(296, movies, jiM)
#sm.show()

# Fight club
#sm = recommendBySimilarity(2959, movies, jiM)
#sm.show()
    
# Shrek
#sm = recommendBySimilarity(4306, movies, jiM)
#sm.show()


## Made by the teacher
### TODO: Remove ?

In [None]:
def getTF(data, debug=False):
    """Term frequency of each word 'w' in each document 'd'.

    TF = f_wd / f_wd_max, where f_wd counts the rows of `data` for (w, d)
    (non-null 'w' values) and f_wd_max is the largest f_wd in document d.

    Returns:
        DataFrame with columns d, w, TF.
    """
    counts = data.groupBy('w', 'd').agg(F.count('w').alias('f_wd'))
    if debug:
        counts.orderBy('d', 'w').show()

    max_counts = counts.groupBy('d').agg(F.max('f_wd').alias('f_wd_max'))
    if debug:
        max_counts.orderBy('d').show()

    normalised = counts.join(max_counts, 'd')
    normalised = normalised.withColumn('TF', F.col('f_wd') / F.col('f_wd_max'))
    return normalised.drop('f_wd', 'f_wd_max')

def getIDF(data, debug=False):
    """Inverse document frequency of each word 'w' over the documents 'd'.

    IDF = log2(|D| / n_w_D), where |D| is the number of distinct documents
    and n_w_D the number of distinct documents containing w.

    Returns:
        DataFrame with columns w, IDF.
    """
    docs_per_word = data.groupBy('w').agg(F.countDistinct('d').alias('n_w_D'))
    if debug:
        docs_per_word.orderBy('n_w_D', ascending=False).show()

    total_docs = data.select('d').distinct().count()
    if debug:
        print("|D| = %d" % total_docs)

    return docs_per_word\
        .withColumn('IDF', F.log2(total_docs / F.col('n_w_D')))\
        .drop('n_w_D')
    
def getTF_IDF(data, debug=False):
    """TF-IDF of each word 'w' in each document 'd' (TF from getTF, IDF from getIDF).

    Returns:
        DataFrame with columns w, d, TF, IDF, TF_IDF.
    """
    tf = getTF(data, debug)
    if debug:
        tf.orderBy(['d', 'TF'], ascending=[True, False]).show(tf.count())

    idf = getIDF(data, debug)
    if debug:
        idf.orderBy(['IDF', 'w'], ascending=[False, True]).show(idf.count())

    scored = tf.join(idf, 'w').withColumn('TF_IDF', F.col('TF') * F.col('IDF'))
    if debug:
        scored.orderBy(['d', 'TF_IDF', 'w'], ascending=[True, False, True]).show(scored.count())
    return scored
