# Task two

The client requires a function that detects similarity between films. The function takes a
film's `film_id` and a `threshold percentage` as input, and returns a `dataframe` that contains all
films with a similarity percentage above the threshold. The way similarity is calculated is up
to you, but the output should be sensible.
(For example, any Star Wars film should be similar to all other Star Wars films, and films by the
same director should score as similar because they share a style.)
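
As a rough illustration of the requested interface, the sketch below assumes the pairwise similarities have already been computed and are available as `(film_id, other_id, similarity)` triples, stored once per unordered pair with `film_id < other_id` and a similarity between 0 and 1. The function name `similar_films`, the toy scores, and the plain Python list standing in for a `dataframe` are all hypothetical; in Spark the same logic would presumably be a filter over both id columns.

```python
def similar_films(pairs, film_id, threshold_pct):
    """Return (other_film_id, similarity_pct) rows above the threshold, best match first.

    pairs: iterable of (film_id, other_id, similarity) triples, one per unordered pair,
           with similarity expressed as a fraction in [0, 1] (assumed layout).
    """
    matches = []
    for a, b, sim in pairs:
        # The film may sit in either id column, because each pair is stored only once.
        if film_id not in (a, b):
            continue
        other = b if a == film_id else a
        pct = sim * 100.0
        if pct > threshold_pct:
            matches.append((other, pct))
    return sorted(matches, key=lambda row: row[1], reverse=True)


# Hypothetical similarity scores for four films.
toy_pairs = [
    (1, 2, 0.75),
    (1, 3, 0.125),
    (2, 3, 0.5),
    (2, 4, 0.875),
    (3, 4, 0.0625),
]

result = similar_films(toy_pairs, film_id=2, threshold_pct=30)
print(result)
```

Checking both id columns matters: looking only at the first column would miss every pair in which the requested film has the larger id.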


In [1]:
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import DoubleType

In [2]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

In [4]:
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "30g") \
    .appName('imdb-munging') \
    .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/22 19:11:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# load the IMDb films data prepared in previous task
input_path = "../output/films"
df = spark.read.parquet(input_path)


In [6]:
df.printSchema()

root
 |-- film_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: date (nullable = true)
 |-- duration: integer (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rating: decimal(4,2) (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- persons: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [7]:
# A few sample queries

#df.sample(withReplacement=False, fraction=0.10, seed=2).show(truncate=False)

#df.filter( df.year >= '2022-01-01').filter( ~(f.array_contains( df['genres'], 'Documentary')) ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)Spielberg') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)George Lucas') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)Ridley Scott') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)James Cameron') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)Keanu') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)Sigourney Weaver') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)Jennifer Lawrence') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)Dave Bautista') ) >= 1 ).show(100, truncate=False)
#df.filter(  f.regexp_count( f.array_join('persons', ','), f.lit(r'(?i)Harrison Ford') ) >= 1 ).show(100, truncate=False)
#df.filter(  df['title'].rlike(r'(?i)hunger games') ).show(100, truncate=False)
#df.filter(  df['title'].rlike(r'(?i)terminator') ).show(100, truncate=False)
#df.filter(  df['title'].rlike(r'(?i)star wars') ).show(100, truncate=False)
#df.groupBy('genres').count().sort(f.desc('count')).show(100, truncate=False)
#df.count() # 39_427

In [8]:
# Choose which columns we will calculate the cosine similarity index for
#feature_cols = [c for c in df.columns if c != 'film_id']
#feature_cols = ['title', 'persons', 'genres', 'year', 'duration']

# Feature columns used for this run (the initial test used the title only)
feature_cols = ['title', 'genres', 'rating', 'persons']
print(feature_cols)

['title', 'genres', 'rating', 'persons']


## Another approach -- calculate the cosine similarity across all rows

You can use Spark's ML feature transformers (`HashingTF`, `IDF`, `Normalizer`) to compute the TF-IDF vector of every row and scale it to unit L2 norm.
The cosine similarity of two films is then the dot product of their two normalized vectors, so joining the table with itself gives the similarity of every pair of films.

If this approach has already been run once, and the filtering of the source `df` above has not changed since, you can skip this section and jump to Stage 2 below, which loads the saved results back in.
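
The claim that the dot product of two L2-normalized vectors equals their cosine similarity can be checked with a few lines of plain Python. This is only a sanity-check sketch: the two weight vectors are made-up stand-ins for TF-IDF rows, and the helper names are hypothetical.

```python
import math


def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(vec):
    """Scale a vector to unit Euclidean length (zero vectors are returned unchanged)."""
    norm = math.sqrt(dot(vec, vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)


def cosine_similarity(a, b):
    """Textbook definition: a.b / (|a| * |b|)."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


# Hypothetical TF-IDF weights for two films over a four-term vocabulary.
film_a = [1.2, 0.0, 3.4, 0.5]
film_b = [0.9, 2.1, 3.0, 0.0]

via_normalized_dot = dot(l2_normalize(film_a), l2_normalize(film_b))
via_definition = cosine_similarity(film_a, film_b)
print(via_normalized_dot, via_definition)
```

Normalizing once per film and then taking plain dot products avoids recomputing both norms for every one of the pairs.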

In [9]:
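# Concatenate the feature columns into one comma-separated string, then split it back into one long array of tokens.
# Note: splitting on ',' alone leaves a leading space on every token after the first, and also splits titles that contain commas.
df_film = df.withColumn("features", f.split(f.concat_ws(", ", *feature_cols), ',' ))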

# drop all the source feature columns from the df_film dataframe
#df_film = df_film.drop(*feature_cols)
drop_cols = [c for c in df.columns if c != 'film_id']
df_film = df_film.drop(*drop_cols)


In [10]:
df_film.show(truncate=False)

+-------+--------+
|film_id|features|
+-------+--------+

In [11]:
# Compute the TF-IDF (term freq inverse doc freq)

# The inputCol must be an array
hashingTF = HashingTF(inputCol='features', outputCol="features_tf")

#hashingTF.explainParam('numFeatures')
# numFeatures: Number of features. Should be greater than 0. (default: 262144)
print(hashingTF.explainParams())
#hashingTF.setNumFeatures(10)

tf = hashingTF.transform(df_film)

idf = IDF(inputCol="features_tf", outputCol="features_tfidf").fit(tf)

# we can also drop the text list of features and the TF column
tfidf = idf.transform(tf).drop('features').drop('features_tf')


binary: If True, all non zero counts are set to 1. This is useful for discrete probabilistic models that model binary events rather than integer counts. Default False. (default: False)
inputCol: input column name. (current: features)
numFeatures: Number of features. Should be greater than 0. (default: 262144)
outputCol: output column name. (default: HashingTF_7d37565e64a0__output, current: features_tf)
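
To make the hashing step concrete, here is a small pure-Python sketch of the idea behind `HashingTF` followed by `IDF`. It is not Spark's implementation: `zlib.crc32` stands in for Spark's hash function, `NUM_FEATURES` is set to 16 rather than the default 262144, and the three token lists are made up. The IDF weight uses the smoothed form log((m + 1) / (df + 1)) that Spark documents for `IDF`, where m is the number of rows and df is the number of rows containing the bucket.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # deliberately tiny for illustration; Spark's default is 262144


def bucket(term, num_features=NUM_FEATURES):
    """Map a term to a vector index. crc32 is a stand-in hash for this sketch only."""
    return zlib.crc32(term.encode("utf-8")) % num_features


def hashed_tf(terms, num_features=NUM_FEATURES):
    """Sparse term-frequency vector as {bucket index: count}; colliding terms share a bucket."""
    return dict(Counter(bucket(t, num_features) for t in terms))


def idf_weights(tf_docs):
    """Smoothed inverse document frequency per bucket: log((m + 1) / (df + 1))."""
    m = len(tf_docs)
    doc_freq = Counter(index for doc in tf_docs for index in doc)
    return {index: math.log((m + 1) / (d + 1)) for index, d in doc_freq.items()}


def tfidf(tf_doc, idf):
    """Weight each bucket count by its IDF."""
    return {index: count * idf[index] for index, count in tf_doc.items()}


# Hypothetical token lists, one per film.
docs = [
    ["Star Wars", "Sci-Fi", "George Lucas"],
    ["Alien", "Sci-Fi", "Ridley Scott"],
    ["Blade Runner", "Sci-Fi", "Ridley Scott"],
]

tf_docs = [hashed_tf(d) for d in docs]
idf = idf_weights(tf_docs)
vectors = [tfidf(d, idf) for d in tf_docs]
print(vectors)
```

A token that appears in every row, such as the shared genre here, gets an IDF of zero and so contributes nothing to the similarity, while a smaller `numFeatures` raises the chance that unrelated tokens collide in the same bucket.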


                                                                                

In [12]:
#tfidf.select('*', f.spark_partition_id()).show(truncate=False)
tfidf.sample(withReplacement=False, fraction=0.10, seed=2).show(truncate=False)

24/12/22 19:11:47 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
24/12/22 19:11:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


+-------+--------------+
|film_id|features_tfidf|

In [13]:
# Normalize each TF-IDF vector to unit L2 norm
normalizer = Normalizer(inputCol="features_tfidf", outputCol="norm")
print(normalizer.explainParams())
data = normalizer.transform(tfidf).drop('features_tfidf')

inputCol: input column name. (current: features_tfidf)
outputCol: output column name. (default: Normalizer_45d4b5b1442d__output, current: norm)
p: the p norm value. (default: 2.0)


In [14]:
#data.select('*', f.spark_partition_id()).show(truncate=False)
data.sample(withReplacement=False, fraction=0.10, seed=2).show(truncate=False)

24/12/22 19:11:48 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
24/12/22 19:11:48 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


+-------+----+
|film_id|norm|

In [15]:
#data = data.localCheckpoint().repartitionByRange(96, 'film_id')
data.printSchema()

root
 |-- film_id: integer (nullable = true)
 |-- norm: vector (nullable = true)



In [16]:
# Collecting the full similarity matrix locally (.toLocalMatrix) is too large, so use a JOIN instead

# use a join and a UDF for the dot product:
dot_udf = f.udf(lambda x, y: float(x.dot(y)), DoubleType())


In [17]:
# Compute the cosine similarity as the dot product over a self-join of the normalized vectors.
# The join condition i.film_id < j.film_id keeps each unordered pair of films exactly once.
# The Jaccard index or Euclidean distance would have been reasonable alternatives (Word2Vec is another idea to explore).
from os import path

sc.setLogLevel("INFO")

# Write the cosine similarity IMDb data to a parquet file
output_path = "../output/csfilms"

# Check that the output path doesn't already exist before wasting lots of compute time
if not path.exists(output_path):
    data.alias("i").join(data.alias("j"), f.col("i.film_id") < f.col("j.film_id"))\
        .select(
            f.col("i.film_id").alias("film_id"), 
            f.col("j.film_id").alias("other_id"), 
            dot_udf("i.norm", "j.norm").alias("similarity"))\
        .sort("film_id", "other_id")\
        .write.parquet(output_path, mode='error')

24/12/22 19:11:49 INFO FileSourceStrategy: Pushed Filters: IsNotNull(film_id)
24/12/22 19:11:49 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(film_id#0)
24/12/22 19:11:49 INFO FileSourceStrategy: Pushed Filters: IsNotNull(film_id)
24/12/22 19:11:49 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(film_id#92)
24/12/22 19:11:49 INFO CodeGenerator: Code generated in 8.18012 ms
24/12/22 19:11:49 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 210.1 KiB, free 17.8 GiB)
24/12/22 19:11:49 INFO MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 35.8 KiB, free 17.8 GiB)
24/12/22 19:11:49 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on europa:42721 (size: 35.8 KiB, free: 17.8 GiB)
24/12/22 19:11:49 INFO SparkContext: Created broadcast 14 from parquet at NativeMethodAccessorImpl.java:0
24/12/22 19:11:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 5567939 bytes, open cost is considered a

### First cut
- About 1hr on 32 workers per 20K films matched, 1.6kb per film on disk.
- 29,162 films in 1.5hrs, 4.7GB on disk, for 425,196,444 film_id -> other_id -> cos_sim pairs.
- Essentially 29,162^2 / 2 pairs.

### Second draft
- About 2hrs.
- 39,427 films in 2hrs, 8.7GB on disk, for 777,224,451 pairs.
- Exactly 39,427 x 39,426 / 2 pairs, i.e. roughly 39,427^2 / 2.

### Final cut
- Under 2hrs, but numFeatures was left at its default of 262k.
- 39,427 films in 1hr 48mins, 6.5GB on disk, for 777,224,451 pairs.
- Exactly 39,427 x 39,426 / 2 pairs, i.e. roughly 39,427^2 / 2.
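
The pair counts in these notes follow from the join condition `i.film_id < j.film_id`, which keeps each unordered pair once and drops self-pairs, giving n(n-1)/2 rows for n films. A quick check in plain Python (the helper name and the five sample ids are hypothetical):

```python
from itertools import combinations


def unordered_pair_count(n):
    """Number of pairs (i, j) with i < j among n distinct ids: n * (n - 1) / 2."""
    return n * (n - 1) // 2


# Brute-force check on a handful of made-up film ids.
ids = [101, 102, 103, 104, 105]
pairs = list(combinations(sorted(ids), 2))

# Expected row count for the 39,427-film runs.
final_run_pairs = unordered_pair_count(39_427)
print(len(pairs), final_run_pairs)
```

The quadratic growth is why going from 29,162 to 39,427 films nearly doubles the number of pairs to compute and store.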

In [18]:
sc.stop()

24/12/22 20:59:09 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/12/22 20:59:09 INFO SparkUI: Stopped Spark web UI at http://europa:4040
24/12/22 20:59:09 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/12/22 20:59:10 INFO MemoryStore: MemoryStore cleared
24/12/22 20:59:10 INFO BlockManager: BlockManager stopped
24/12/22 20:59:10 INFO BlockManagerMaster: BlockManagerMaster stopped
24/12/22 20:59:10 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/12/22 20:59:10 INFO SparkContext: Successfully stopped SparkContext
