Our original dataset consists of 1,000 files containing 1 million playlists, about 30 GB of storage in total. We preprocessed 10% of them (100 files) with a local Spark session and saved the result in Parquet format, because Parquet is a compressed, columnar format that makes loading the data into Spark faster. After multiple rounds of trial and error, we found that our GCP configuration could comfortably handle only half of those 100 files. The analysis below is therefore based on 50,000 playlists, which is 5% of the entire dataset. Although this sounds small, the subset is 1.6 GB in size, which in our opinion is large enough to serve the course's aim of studying distributed computation in big data analysis.  

In [1]:
path = "gs://ncf446-201929129/rdd_data_metadata/" 

In [42]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import time
import math
import random

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, explode, count, monotonically_increasing_id
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType, DoubleType, FloatType

In [3]:
sqlContext = SQLContext(sc)



In [4]:
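spark.conf.set("spark.sql.execution.arrow.enabled", "true") # use Apache Arrow to make Spark <-> pandas DataFrame conversion more efficient (this key is deprecated since Spark 3.0 in favour of spark.sql.execution.arrow.pyspark.enabled)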

23/04/22 12:37:39 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.


In [5]:
#loading data
bigframe=[] 
for i in range(1, 6, 1): #reading only 5 parquet files
    index = f'batch_{i}'
    fpath = path+index
    bigframe.append(spark.read.parquet(fpath)) 

                                                                                

In [6]:
#merge all dataframes into one big dataframe
from functools import reduce
metaframe = reduce(lambda left, right: left.union(right), bigframe)  # batch_1 ∪ batch_2 ∪ ... ∪ batch_5

In [7]:
metaframe.show(10)


+-----+---------------+----------+----------+-------------+--------------------+
|  pid|           name|num_tracks|num_albums|num_followers|              tracks|
+-----+---------------+----------+----------+-------------+--------------------+
|10499|          Chill|        26|        24|            1|[{0, Heroin, null...|
|10500|         cardio|       146|       138|            1|[{0, Sweat (A La ...|
|10501|      Acoustic |        38|        38|            1|[{0, Hey - Acoust...|
|10502|        Its lit|        13|        13|            1|[{0, Faith - Radi...|
|10503|         Lounge|       168|       154|            1|[{0, Amazon Dawn,...|
|10504|Play it forward|        31|        29|            1|[{0, The General,...|
|10505|        Country|       116|        86|            2|[{0, Who Are You ...|
|10506|        country|        62|        53|            2|[{0, 19 You + Me,...|
|10507|   Chilllllllll|       106|        95|            2|[{0, Let Her Go, ...|
|10508|          Exit |     

                                                                                

In [8]:
df = metaframe
print(f'number of playlists: {df.count()}')



number of playlists: 50000


                                                                                

In [9]:
df_playlistNtrack = df.select(col("tracks"),col('pid')).withColumn("tracks", explode(col("tracks"))).select("tracks.*", 'pid')
print(f'Length of all playlists combined without removing duplicates: {df_playlistNtrack.count()}')
df_playlistNtrack.show(10)

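# album_name is selected twice (album_uri was probably intended); since track_uri
# determines all the other columns, the duplicate does not change the distinct count
distinct_tracks = df_playlistNtrack.select(col("track_name"), col("track_uri"), col("album_name"), col("album_name"), col("artist_name"), col("artist_uri")).distinct()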
print(f'Length of all playlists combined with removing duplicates: {distinct_tracks.count()}')
distinct_tracks.show(10)

                                                                                

Length of all playlists combined without removing duplicates: 3348258
+---+--------------------+------------+--------------------+--------------------+--------------------+--------------+--------------------+-----+
|pos|          track_name|track_artist|           track_uri|          album_name|           album_uri|   artist_name|          artist_uri|  pid|
+---+--------------------+------------+--------------------+--------------------+--------------------+--------------+--------------------+-----+
|  0|              Heroin|        null|spotify:track:3qo...|       Lust For Life|spotify:album:1nP...|  Lana Del Rey|spotify:artist:00...|10499|
|  1|Press Play and Es...|        null|spotify:track:1Fp...|Press Play and Es...|spotify:album:0io...|   Teflon Sega|spotify:artist:0J...|10499|
|  2|               Bones|        null|spotify:track:3xS...|            Bones EP|spotify:album:33R...|Dustin Tebbutt|spotify:artist:0z...|10499|
|  3|   Always In My Head|        null|spotify:track:0FM...|

                                                                                

Length of all playlists combined with removing duplicates: 457016




+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          track_name|           track_uri|          album_name|          album_name|         artist_name|          artist_uri|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|The Drying Of The...|spotify:track:6ly...|       The Wild Hunt|       The Wild Hunt|The Tallest Man O...|spotify:artist:2B...|
|     Finna Get Loose|spotify:track:160...|     Finna Get Loose|     Finna Get Loose|               Diddy|spotify:artist:59...|
|          Be In Love|spotify:track:4Ck...|              Better|              Better|   Chrisette Michele|spotify:artist:3Y...|
|              X-tasy|spotify:track:42H...|Miss E...So Addic...|Miss E...So Addic...|       Missy Elliott|spotify:artist:2w...|
|Someday At Christmas|spotify:track:4XT...| Under The Mistletoe| Under The Mistletoe|       Justin Biebe

                                                                                

In [10]:
distinct_tracks_with_id = distinct_tracks.withColumn('unique_id', monotonically_increasing_id())
distinct_tracks_with_id.show(10)



+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|          track_name|           track_uri|          album_name|          album_name|         artist_name|          artist_uri|unique_id|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|The Drying Of The...|spotify:track:6ly...|       The Wild Hunt|       The Wild Hunt|The Tallest Man O...|spotify:artist:2B...|        0|
|     Finna Get Loose|spotify:track:160...|     Finna Get Loose|     Finna Get Loose|               Diddy|spotify:artist:59...|        1|
|          Be In Love|spotify:track:4Ck...|              Better|              Better|   Chrisette Michele|spotify:artist:3Y...|        2|
|              X-tasy|spotify:track:42H...|Miss E...So Addic...|Miss E...So Addic...|       Missy Elliott|spotify:artist:2w...|        3|
|Someday At Christmas|spotify:trac

                                                                                

Since we are only interested in modelling item-item relationships from implicit feedback data, the only useful information here is track_uri and the playlist id (pid above). The remaining track columns (track, album and artist names and URIs) are all determined by track_uri, i.e. they are perfectly correlated with it, so they add no information for this purpose. Models cannot work with strings directly, so we convert each track URI into an integer index using Spark's StringIndexer.
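StringIndexer's default ordering (`stringOrderType="frequencyDesc"`) gives index 0 to the most frequent label, which is why the most popular tracks end up with the smallest indices in the results further down. The plain-Python function below is a minimal sketch that mimics that ordering, assuming (as the Spark 3 documentation describes) that ties in frequency are broken alphabetically; `frequency_desc_index` is a hypothetical helper, not part of Spark.

```python
from collections import Counter

def frequency_desc_index(labels):
    """Map each distinct label to a float index, most frequent label first.

    Ties in frequency are broken alphabetically (assumed to match
    StringIndexer's default frequencyDesc ordering).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

tracks = ["spotify:track:b", "spotify:track:a", "spotify:track:b",
          "spotify:track:c", "spotify:track:a", "spotify:track:b"]
index = frequency_desc_index(tracks)
encoded = [index[t] for t in tracks]   # StringIndexer also outputs doubles, e.g. 25451.0
print(index)
```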

In [17]:
pid_trackID = df_playlistNtrack.select(col('pid'), col('track_uri')) #only keep the useful information
indexer = StringIndexer(inputCol="track_uri", outputCol="encoded_tracks") 
indexed_DF = indexer.fit(pid_trackID).transform(pid_trackID) 
indexed_DF.show(10) 

23/04/22 12:50:13 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB

+-----+--------------------+--------------+
|  pid|           track_uri|encoded_tracks|
+-----+--------------------+--------------+
|10499|spotify:track:3qo...|       25451.0|
|10499|spotify:track:1Fp...|      139062.0|
|10499|spotify:track:3xS...|       18628.0|
|10499|spotify:track:0FM...|       17239.0|
|10499|spotify:track:7zD...|        8548.0|
|10499|spotify:track:2Gn...|        4899.0|
|10499|spotify:track:4tI...|       31790.0|
|10499|spotify:track:6Pm...|        8057.0|
|10499|spotify:track:48U...|       89267.0|
|10499|spotify:track:0Z9...|        4730.0|
+-----+--------------------+--------------+
only showing top 10 rows



                                                                                

The co_occurance column serves as the label: a value of 1 means that the track appears in the playlist.

In [18]:
indexed_DF = indexed_DF.select(col('pid'),col('encoded_tracks'), F.lit(1).alias('co_occurance'))
indexed_DF = indexed_DF.withColumn('encoded_tracks',col('encoded_tracks').cast('int'))
print(indexed_DF.count())
indexed_DF.show(10)

                                                                                

3348258


23/04/22 12:50:24 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB

+-----+--------------+------------+
|  pid|encoded_tracks|co_occurance|
+-----+--------------+------------+
|10499|         25451|           1|
|10499|        139062|           1|
|10499|         18628|           1|
|10499|         17239|           1|
|10499|          8548|           1|
|10499|          4899|           1|
|10499|         31790|           1|
|10499|          8057|           1|
|10499|         89267|           1|
|10499|          4730|           1|
+-----+--------------+------------+
only showing top 10 rows



                                                                                

## Modelling starts here: 

We treat playlists as users and tracks as items, which gives a user-item matrix whose rows represent users and whose columns represent items. Storing such a large yet sparse matrix densely would be extremely inefficient, so we represent it in coordinate form without loss of information: since every entry is 0 or 1, it is enough to keep the (row, column) coordinates of the entries that equal 1.
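To put numbers on the sparsity argument, the sketch below uses the counts reported in this notebook (50,000 playlists, 457,016 distinct tracks and 3,348,258 playlist-track rows): only about 0.015% of the roughly 22.85 billion cells are non-zero. It assumes every playlist-track row is a distinct non-zero entry, so the density is an upper bound if some playlists repeat a track. The toy coordinate list and the helper `to_dense` are hypothetical, for illustration only.

```python
m, n = 50_000, 457_016          # playlists (rows) and distinct tracks (columns)
nnz = 3_348_258                 # playlist-track rows after explode

dense_cells = m * n
density = nnz / dense_cells
print(f"dense cells: {dense_cells:,}  density: {density:.4%}")

# Coordinate (COO) form: keep only the (row, col) positions that hold a 1.
coo = [(0, 2), (0, 5), (1, 2), (2, 0)]

def to_dense(coo, shape):
    """Rebuild the 0/1 matrix from its coordinates (only sensible for tiny shapes)."""
    rows, cols = shape
    dense = [[0] * cols for _ in range(rows)]
    for r, c in coo:
        dense[r][c] = 1
    return dense

toy = to_dense(coo, (3, 6))     # no information is lost by storing only `coo`
```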

Normalise the interaction column, which is the co_occurance column:  
This means normalising the whole user-item matrix with respect to items. Since we want to compute item similarity scores and each column represents an item, we normalise each column of the matrix by its L2 norm.  
L2 norm = $\sqrt{x_1^2 + \dots + x_m^2}$  
where $x_i$ is the entry in row $i$ of that column and $m$ is the number of rows (playlists).  
The code below computes the L2 norm of every column and stores it in a new column called c. Because every entry is 0 or 1, the norm of a column is simply the square root of the number of times the track occurs across the playlists.
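The same column-norm computation on a toy coordinate list, written in plain Python as a minimal sketch (toy data, hypothetical helper name `column_l2_norms`). With 0/1 entries each norm is the square root of the track's occurrence count; for example, the value 11.874342087037917 shown for track 3918 further down is √141, i.e. that track occurs 141 times across the playlists.

```python
import math
from collections import defaultdict

def column_l2_norms(coo):
    """coo: iterable of (rowidx, colidx, value) triples -> {colidx: L2 norm}."""
    sq_sums = defaultdict(float)
    for _, colidx, value in coo:
        sq_sums[colidx] += value ** 2      # same role as the sqval column
    return {colidx: math.sqrt(s) for colidx, s in sq_sums.items()}

# (pid, encoded track, co_occurance) triples for three toy playlists
coo = [(10, 0, 1), (10, 1, 1), (11, 0, 1), (12, 0, 1), (12, 1, 1), (12, 2, 1)]
norms = column_l2_norms(coo)   # track 0 occurs 3 times, track 1 twice, track 2 once
```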

In [27]:
squareDF = indexed_DF.withColumn("sqval", indexed_DF['co_occurance'] ** 2) 
squareDF = squareDF.selectExpr("pid as rowidx","encoded_tracks as colidx" ,"co_occurance", "sqval")
summsquare = squareDF.groupBy('colidx').agg(_sum("sqval").alias("c"))
summsquare2 = summsquare.withColumn("c", F.sqrt("c"))  

In [28]:
new_df = squareDF.join(summsquare2, on='colidx').select(col('rowidx'), col('colidx'), col('c'))
new_df.show(10)

23/04/22 13:14:53 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:14:54 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:15:06 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+------+------+------------------+
|rowidx|colidx|                 c|
+------+------+------------------+
|106163|432875|               1.0|
|137235|  3918|11.874342087037917|
|136572|  3918|11.874342087037917|
|134440|  3918|11.874342087037917|
|141898|  3918|11.874342087037917|
|141760|  3918|11.874342087037917|
|141385|  3918|11.874342087037917|
|139451|  3918|11.874342087037917|
|139271|  3918|11.874342087037917|
| 14775|  3918|11.874342087037917|
+------+------+------------------+
only showing top 10 rows



                                                                                

In [30]:
m = new_df.select(col('rowidx')).distinct().count() #count the number of users
n = new_df.select(col('colidx')).distinct().count() #count the number of items

23/04/22 13:33:07 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:33:23 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:33:28 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:33:38 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:33:39 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:33:48 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:33:50 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:33:58 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
                                                                                

The algorithm scales well as the number of rows grows, and the paper focuses on tall-and-skinny matrices with many more rows than columns (m >> n in our notation, where m is the number of users and n the number of items). That setting is realistic in practice, because most companies have far more customers than products. However, we cannot use the entire dataset of a million playlists, so our matrix has the opposite shape (n >> m, as the counts below show), and we experiment with that case instead.

In [31]:
print(f'number of users: {m}')
print(f'number of items: {n}')

number of users: 50000
number of items: 457016


Compute the parameter sqrtgamma.  
The formula is a heuristic. Following the suggestion of the DIMSUM author [1], we use:  

$\sqrt{\gamma} = \sqrt{\frac{4 \log{(n)}}{s}}$   

where s is similarity_threshold. 

**n**: the number of items. As the number of items increases, so does the number of item pairs whose similarities have to be estimated. Because γ grows with log(n), a larger catalogue raises the sampling probabilities slightly, which keeps the estimates reliable across the larger number of pairs.

**similarity_threshold**: the minimum similarity value we are interested in. A high similarity threshold means that we are more selective about the item pairs we consider, so it leads to a smaller γ, lower sampling probabilities and fewer emitted pairs.

**4 * math.log(n)**: this term serves as a scaling factor. Because it depends on the logarithm of the number of items, it grows much more slowly than n, balancing the number of computed similarities against the probability of missing relevant ones. 

[1]: https://blog.twitter.com/engineering/en_us/a/2014/all-pairs-similarity-via-dimsum 
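Plugging in the values used in this notebook (n = 457,016 items, s = 0.1) gives √γ ≈ 22.83. Since a track is sampled with probability min(1, √γ/c) and c = √(count) for 0/1 data, every track that occurs at most 521 times is always kept, and only more popular tracks are subsampled. The helper `sampling_prob` is a hypothetical name used for illustration.

```python
import math

n = 457_016                 # number of items (distinct tracks)
similarity_threshold = 0.1  # s
sqrtgamma = math.sqrt(4 * math.log(n) / similarity_threshold)

def sampling_prob(count):
    """Probability that a track occurring `count` times is sampled by the mapper."""
    c = math.sqrt(count)    # L2 norm of a 0/1 column
    return min(1.0, sqrtgamma / c)

print(round(sqrtgamma, 2))  # about 22.83
print(sampling_prob(521), sampling_prob(522))
```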

In [33]:
# start_time = time.time()
similarity_threshold = 0.1
sqrtgamma = math.sqrt(4 * math.log(n) / similarity_threshold)
adjlist = new_df.groupBy("rowidx").agg(F.collect_list(F.struct("colidx", "c")).alias("r_i")) 

In [35]:
adjlist.show(10)

23/04/22 13:42:44 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:42:45 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:43:02 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:43:07 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB

+------+--------------------+
|rowidx|                 r_i|
+------+--------------------+
|    31|[{75131, 2.236067...|
|    34|[{4469, 11.090536...|
|    53|[{355, 25.9229627...|
|    65|[{882, 20.9045449...|
|    78|[{272711, 1.0}, {...|
|    85|[{747, 21.9544984...|
|   108|[{19216, 5.0}, {2...|
|   133|[{74662, 2.236067...|
|   137|[{1311, 18.384776...|
|   148|[{1837, 16.492422...|
+------+--------------------+
only showing top 10 rows



                                                                                

### The mapper: LeanDIMSUMMapper

- Here, the objective is to compute approximate pairwise similarities between items while reducing the computational cost through adaptive sampling.   

- The pairwise similarity measure is based on cosine similarity, which is the cosine of the angle between two high-dimensional vectors in a sparse vector space. A brute-force cosine-similarity recommender computes the similarity of every pair of item vectors. This algorithm instead reduces the number of item pairs it has to consider: because each row is sparse, most products are one times zero or zero times zero, so it only pairs up the non-zero entries within a row, and it samples those pairs further. Hence, the algorithm does not go through every possible pair of vectors in the space.   

- The sampling uses the L2 norms of the columns (items), stored in c. Within a row, the pair (j, k) is emitted with probability min(1, sqrtgamma/c_j) · min(1, sqrtgamma/c_k), so once both norms exceed sqrtgamma the probability is inversely proportional to the product of the two column norms. Popular items, which appear in many rows and would otherwise generate the most pairs, are therefore sampled less often, and the emitted value is scaled up to compensate.   

- r_i is a list of (colidx, c) structs, one for each non-zero entry in the i-th row of the input matrix: colidx is the column (item) index and c is the L2 norm of that column.   

- We iterate over every entry in r_i to get its column index colidx_j and use its norm to calculate the sampling probability prob_j. We then draw a random number uniformly from $[0, 1]$ and check whether it is less than prob_j; if it is, we proceed to the inner loop. This accept/reject step loosely resembles the acceptance step of an MCMC algorithm.   

- In the inner loop, we iterate over r_i again to get the column index colidx_k and the norm d of each non-zero entry, and draw a new random number. If it is not less than prob_k, we move on to the next entry. Otherwise, we compute the key and value of the emitted pair. The key is the string concatenation of colidx_j and colidx_k, separated by an underscore. 

- The value is the product of c and d divided by min(sqrtgamma, c) · min(sqrtgamma, d), and the key-value pair is stored in the emit dictionary. Since min(1, sqrtgamma/c) / min(sqrtgamma, c) = 1/c for any c, this value multiplied by its emission probability is exactly 1, so after the summation reducer the score estimates the number of playlists in which the two tracks co-occur. Emitting 1/(min(sqrtgamma, c) · min(sqrtgamma, d)) instead, as in the DIMSUM formulation (every matrix entry is 1 here), would estimate the cosine similarity, which lies in [0, 1]; this is why the similarity scores reported below are much larger than 1.  
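To see what the summed values estimate, the sketch below computes exactly the expected contribution of one row to a pair (j, k): the emission probability min(1, √γ/c)·min(1, √γ/d) times the emitted value. With the value used in the mapper, c·d/(min(√γ, c)·min(√γ, d)), the expectation is 1, so the sum over rows estimates the co-occurrence count; with the DIMSUM value 1/(min(√γ, c)·min(√γ, d)) (all entries are 1 here) it is 1/(c·d), whose sum is the cosine similarity. `expected_emit` is a hypothetical helper for illustration.

```python
import math

def expected_emit(c, d, sqrtgamma, numerator):
    """Exact expected value emitted for pair (j, k) by one row containing both items.

    c, d: column L2 norms of items j and k. numerator="cd" reproduces the
    mapper in this notebook; "one" is the DIMSUM value for 0/1 entries.
    """
    p_emit = min(1.0, sqrtgamma / c) * min(1.0, sqrtgamma / d)
    scale = min(sqrtgamma, c) * min(sqrtgamma, d)
    value = (c * d if numerator == "cd" else 1.0) / scale
    return p_emit * value

sqrtgamma = 22.83
for c, d in [(2.0, 3.0), (30.0, 3.0), (40.0, 50.0)]:
    print(c, d, expected_emit(c, d, sqrtgamma, "cd"), expected_emit(c, d, sqrtgamma, "one"))
```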


In [43]:
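def mapper(r_i):
    """DIMSUM-style mapper for one playlist (row).

    r_i is a list of (colidx, c) structs: the track index and the L2 norm
    of that track's column. Returns {"j_k": value} for the sampled pairs.
    """
    emit = {}
    for dict_j in r_i:
        colidx_j = dict_j['colidx']
        c = dict_j['c']                          # column norm of track j
        prob_j = min(1.0, sqrtgamma / c)         # popular tracks are sampled less often
        if random.random() < prob_j:
            for dict_k in r_i:
                colidx_k = dict_k['colidx']
                d = dict_k['c']                  # column norm of track k
                prob_k = min(1.0, sqrtgamma / d)
                if random.random() < prob_k:
                    key = f"{colidx_j}_{colidx_k}"
                    # in expectation this is 1 per playlist containing both tracks,
                    # so the summed score estimates their co-occurrence count
                    val = c * d / (min(sqrtgamma, c) * min(sqrtgamma, d))
                    emit[key] = val
    return emit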

In [44]:
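# the mapper draws random numbers, so mark the UDF as non-deterministic; unless the
# result is cached, every action re-runs the sampling and the values can differ between cells
mapper_udf = F.udf(mapper, MapType(StringType(), FloatType())).asNondeterministic()
adjlist1 = adjlist.withColumn('emit', mapper_udf(adjlist.r_i))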

In [45]:
adjlist1.show(10)

23/04/22 13:45:01 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:45:02 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:45:16 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:45:19 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+------+--------------------+--------------------+
|rowidx|                 r_i|                emit|
+------+--------------------+--------------------+
|    31|[{623, 23.0}, {56...|{529_570 -> 1.074...|
|    34|[{1507, 17.578395...|{31744_2605 -> 1....|
|    53|[{8928, 7.7459666...|{1160_320116 -> 1...|
|    65|[{65, 34.13209633...|{126_2484 -> 1.35...|
|    78|[{3699, 12.206555...|{3699_11502 -> 1....|
|    85|[{392, 25.5147016...|{4612_189 -> 1.27...|
|   108|[{13832, 6.0}, {1...|{18811_19108 -> 1...|
|   133|[{5120, 10.295630...|{181842_3218 -> 1...|
|   137|[{6622, 9.0553851...|{4292_10585 -> 1....|
|   148|[{340, 26.1533936...|{84185_3438 -> 1....|
+------+--------------------+--------------------+
only showing top 10 rows



                                                                                

In [47]:
# number of sampled pairs emitted for each playlist (built-in size() avoids a Python UDF)
adjlistfilter = adjlist1.withColumn('num_emits', F.size(adjlist1['emit']))
adjlistfilter.show(10)

23/04/22 13:45:50 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:45:51 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:46:02 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:46:06 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+------+--------------------+--------------------+---------+
|rowidx|                 r_i|                emit|num_emits|
+------+--------------------+--------------------+---------+
|    31|[{76, 33.68976105...|{529_570 -> 1.074...|     8869|
|    34|[{3127, 13.152946...|{31744_2605 -> 1....|     6091|
|    53|[{2007, 15.937377...|{1160_320116 -> 1...|     9253|
|    65|[{2484, 14.525839...|{29642_66091 -> 1...|    16527|
|    78|[{45087, 3.0}, {1...|{3699_11502 -> 1....|      256|
|    85|[{28, 38.03945320...|{546_376 -> 1.161...|     4129|
|   108|[{17147, 5.385164...|{18811_19108 -> 1...|     2500|
|   133|[{41449, 3.162277...|{181842_3218 -> 1...|      400|
|   137|[{8340, 8.0}, {42...|{105_5720 -> 1.39...|      532|
|   148|[{5526, 9.9498743...|{14903_90038 -> 1...|    19026|
+------+--------------------+--------------------+---------+
only showing top 10 rows



                                                                                

In [48]:
adjlistfilter.count()  #to check with the following filtered result

23/04/22 13:48:02 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:48:18 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:48:22 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:48:30 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
                                                                                

50000

In [49]:
adjlistfilter0 = adjlistfilter.filter('num_emits > 0')
adjlistfilter0.count() # with a small similarity_threshold, sqrtgamma is large and almost every track is kept, so the count is expected to stay the same

23/04/22 13:48:32 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:48:33 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:48:46 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:48:51 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
                                                                                

50000

In [50]:
exploded = adjlistfilter.select(explode('emit'))
exploded.show(10)

23/04/22 13:51:04 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:51:05 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:51:17 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:51:22 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+----------+---------+
|       key|    value|
+----------+---------+
|   529_570|1.0740683|
|8360_16770|      1.0|
| 2793_9991|      1.0|
| 6864_5052|      1.0|
| 462_36568|1.0799618|
|94969_1159|      1.0|
|  1805_692|      1.0|
|    462_90|  1.53999|
|   486_623|1.0744058|
|6757_36568|      1.0|
+----------+---------+
only showing top 10 rows



                                                                                

### The reduce: summation reducer

The reducer is a simple summation: it adds up the values emitted for each key across all playlists.
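A self-contained, single-machine sketch of the whole map and summation-reduce pipeline, in plain Python for a toy 0/1 matrix. Unlike the Spark cells, this version (hypothetical helpers `dimsum_map` and `dimsum_cosine`) emits each unordered pair once (j < k) and divides only by min(√γ, ‖c_j‖)·min(√γ, ‖c_k‖), following the DIMSUM formulation in the Twitter post [1], so the summed values estimate cosine similarity. When √γ is at least as large as every column norm, all sampling probabilities are 1 and the result is the exact cosine similarity, which makes the sketch easy to check.

```python
import math
import random
from collections import defaultdict

def dimsum_map(row, col_norms, sqrtgamma, rng):
    """Map step for one row ({colidx: value}); yields ((j, k), value) with j < k."""
    cols = sorted(row)
    for pos, j in enumerate(cols):
        if rng.random() >= min(1.0, sqrtgamma / col_norms[j]):
            continue                       # column j not sampled in this row
        for k in cols[pos + 1:]:
            if rng.random() >= min(1.0, sqrtgamma / col_norms[k]):
                continue                   # column k not sampled for this pair
            scale = min(sqrtgamma, col_norms[j]) * min(sqrtgamma, col_norms[k])
            yield (j, k), row[j] * row[k] / scale

def dimsum_cosine(rows, sqrtgamma, seed=0):
    """Estimate cosine similarities between the columns of a sparse matrix."""
    sq_sums = defaultdict(float)
    for row in rows:
        for j, value in row.items():
            sq_sums[j] += value ** 2
    col_norms = {j: math.sqrt(s) for j, s in sq_sums.items()}
    rng = random.Random(seed)
    sims = defaultdict(float)
    for row in rows:                                   # map
        for key, value in dimsum_map(row, col_norms, sqrtgamma, rng):
            sims[key] += value                         # reduce: summation
    return dict(sims)

# four toy playlists over three tracks (all entries are 1)
rows = [{0: 1, 1: 1, 2: 1}, {0: 1, 1: 1}, {0: 1, 2: 1}, {1: 1}]
exact = dimsum_cosine(rows, sqrtgamma=10.0)     # sqrtgamma >= all norms: no sampling
sampled = dimsum_cosine(rows, sqrtgamma=1.0)    # sampling kicks in
```

Each emitted value is an unbiased estimate of that row's contribution, so averaging `dimsum_cosine` over several seeds with a small `sqrtgamma` should approach the exact values.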

In [51]:
edges = exploded.groupby(exploded['key']).agg(_sum('value').alias('similarity'))
edges.show(10) 

23/04/22 13:51:26 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:51:27 WARN DAGScheduler: Broadcasting large task binary with size 27.9 MiB
23/04/22 13:51:38 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:51:41 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB
23/04/22 13:57:06 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+------------+-----------------+
|         key|       similarity|
+------------+-----------------+
|   4064_6422|              3.0|
|   5852_4064|              3.0|
|   20104_645|              3.0|
|350066_10683|              1.0|
| 10683_23754|              3.0|
|   4238_1261|              6.0|
|      843_19|74.26603364944458|
|     665_413|6.648103952407837|
|   5669_3747|              8.0|
|     424_483|69.40363001823425|
+------------+-----------------+
only showing top 10 rows



                                                                                

Clean the key column by splitting it into two item-index columns.

In [52]:
edgesdf = edges.withColumn("key_split", F.split(edges["key"], "_"))  
edgesdf = edgesdf.withColumn("key1", edgesdf["key_split"].getItem(0)).withColumn("key2", edgesdf["key_split"].getItem(1))
edgesdf = edgesdf.drop("key_split")

In [53]:
edgesdf.show(10)

23/04/22 14:03:12 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+-----------+------------------+-----+------+
|        key|        similarity| key1|  key2|
+-----------+------------------+-----+------+
|   377_1159|12.349004745483398|  377|  1159|
| 16841_1041|               3.0|16841|  1041|
|1041_187886|               1.0| 1041|187886|
| 16841_2403|               3.0|16841|  2403|
|   5199_377|15.716915130615234| 5199|   377|
|    462_720| 86.39694213867188|  462|   720|
|  1041_5254|               4.0| 1041|  5254|
|  16841_720|               4.0|16841|   720|
| 2232_39878|               1.0| 2232| 39878|
|43637_22340|               1.0|43637| 22340|
+-----------+------------------+-----+------+
only showing top 10 rows



                                                                                

In [54]:
edgesdf_filter = edgesdf.filter(edgesdf["key1"] != edgesdf["key2"]) # self-pairs (an item with itself) always have the top scores,
edgesdf_filter.show(10) # so remove them

23/04/22 14:15:30 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+-----------+------------------+-----+------+
|        key|        similarity| key1|  key2|
+-----------+------------------+-----+------+
|   377_1159|13.471641540527344|  377|  1159|
| 16841_1041|               3.0|16841|  1041|
|1041_187886|               1.0| 1041|187886|
| 16841_2403|               3.0|16841|  2403|
|   5199_377|14.594278335571289| 5199|   377|
|    462_720| 88.55686569213867|  462|   720|
|  1041_5254|               4.0| 1041|  5254|
|  16841_720|               4.0|16841|   720|
| 2232_39878|               1.0| 2232| 39878|
|43637_22340|               1.0|43637| 22340|
+-----------+------------------+-----+------+
only showing top 10 rows



                                                                                

In [55]:
finaldf = edgesdf_filter.select(col('key1'), col('key2'), col('similarity')) 
finaldf.orderBy(col('similarity').desc()).show(10) 

23/04/22 14:21:45 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+----+----+------------------+
|key1|key2|        similarity|
+----+----+------------------+
|   0|   4|1113.5994210243225|
|  38|   0|1091.8673872947693|
|   4|   0|1059.7823486328125|
|   0|  15|1022.0927059650421|
|   0|  38|1021.3159561157227|
|   0|  10|1013.8881661891937|
|  10|   0| 934.1441531181335|
|  15|   0| 908.5268497467041|
|   2|   5|  892.601514339447|
|   0|   6| 892.4115419387817|
|   9|   8| 880.1872181892395|
|  10|   4| 879.6758544445038|
|   5|   2|  873.849381685257|
|   4|  10| 872.5529730319977|
|   2|  12| 871.1944704055786|
|   6|   4| 869.7254869937897|
|   9|   0| 864.7741928100586|
|   4|   6| 862.4474494457245|
|  19|   0| 856.3447895050049|
|  15|  10| 854.0915157794952|
+----+----+------------------+
only showing top 20 rows



                                                                                

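Every pair appears twice, once as (x_i, x_j) and once as (x_j, x_i), so we eliminate one of the two orderings. The two entries are not identical copies: the mapper samples each ordering independently, so their scores differ (in the table above, pair 0/4 scores 1113.6 as 0_4 and 1059.8 as 4_0).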
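A minimal plain-Python sketch of an alternative deduplication step (hypothetical helper `merge_orderings`): it compares keys as integers rather than strings and averages the two orderings of each pair instead of discarding one, since both are independent estimates of the same quantity. The scores are copied from the table above; this averaging is an illustrative alternative, not the step used in the notebook.

```python
from collections import defaultdict

def merge_orderings(edges):
    """edges: iterable of (key1, key2, similarity) with string keys.

    Returns {(low, high): mean similarity}, comparing keys as integers.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for key1, key2, sim in edges:
        a, b = int(key1), int(key2)
        if a == b:
            continue                      # drop self-pairs
        pair = (min(a, b), max(a, b))
        totals[pair] += sim
        counts[pair] += 1
    return {pair: totals[pair] / counts[pair] for pair in totals}

edges = [("0", "4", 1113.5994210243225), ("4", "0", 1059.7823486328125),
         ("20104", "645", 3.0)]
merged = merge_orderings(edges)
print("20104" < "645", 20104 < 645)   # string vs integer comparison
```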

In [56]:
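# key1 and key2 are strings (from F.split), so '<' compares them lexicographically
# (e.g. '20104' < '645'); this still keeps exactly one ordering of every pair
finaldf = finaldf.filter(col("key1") < col("key2"))  
finaldf.show(20)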

23/04/22 14:27:15 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+------+-----+------------------+
|  key1| key2|        similarity|
+------+-----+------------------+
|  4064| 6422|               3.0|
| 20104|  645|               3.0|
| 10683|23754|               3.0|
|   424|  483| 75.28529357910156|
|  3584| 5673|               1.0|
|  3747| 9926|               4.0|
|  1511| 3747|              10.0|
|  4275|  859|               2.0|
|   122| 1635| 85.53794574737549|
|111237| 1563|               1.0|
|111237| 5127|               1.0|
|  1757| 2489|               3.0|
|  1199|15463|               2.0|
|  1147|19415|               1.0|
| 12384| 7473|               1.0|
|   259|78889|1.2026582956314087|
|  2448|  640|12.019562244415283|
|  5518| 5667|               1.0|
| 38168|  780|               8.0|
|   309| 7927| 3.483818292617798|
+------+-----+------------------+
only showing top 20 rows



                                                                                

Final results:

In [61]:
finaldf.orderBy(col('similarity').desc()).show(20)

23/04/22 14:42:16 WARN DAGScheduler: Broadcasting large task binary with size 28.0 MiB

+----+----+-----------------+
|key1|key2|       similarity|
+----+----+-----------------+
|   0|  38|1128.822898864746|
|   0|   4| 976.986852645874|
|   0|  10| 975.914826631546|
|   0|  15| 926.843923330307|
|  10|   4|925.9745836257935|
|   2|   5| 892.601514339447|
|  10|  15|869.8496618270874|
|   0|  19|867.2305283546448|
|   8|   9|840.4795241355896|
|   4|   6|829.6962804794312|
|   0|   8|812.7971563339233|
|   0|   6| 791.530237197876|
|   3|  50|791.0237140655518|
|   2|   8|788.5979545116425|
|   5|   8| 786.571681022644|
|   0|   9|773.3443222045898|
|   1|  45|771.5560340881348|
|   5|   9|768.0380697250366|
|  12|   2| 760.509927034378|
|   6|   9|760.1692526340485|
+----+----+-----------------+
only showing top 20 rows



                                                                                

In [57]:
# end_time = time.time()

In [58]:
# time_used = end_time - start_time

In [59]:
# tt = f"time consumed when similarity = 0.1: {time_used}\n"

The MapReduce framework here is similar to, but not the same as, the MapReduce mechanism Spark uses for distributed computation. Here, we applied the MapReduce pattern to a randomised algorithm to reduce the cost of computing the similarity scores. Compared with wrapping the whole algorithm into a single function and applying it as a UDF to every entry of a column, splitting the computation into a map step (sampling pairs within each playlist) and a reduce step (summing the emitted values per pair) makes the computation more efficient, because each playlist is processed independently and the per-pair sums are combined by Spark's grouped aggregation.

To collect timing statistics, we ran the entire script manually several times instead of using a for loop, to avoid crashing the kernel. After every run we restarted the kernel to free the memory held by the computation on the master node and avoid out-of-memory failures; 60 GB of RAM was the minimum with which we could run the script above. Without a restart, variables and data from the previous run remain in RAM, so when a new iteration starts (for example, when adjlist is recomputed), two copies of adjlist exist in memory before the assignment takes effect, which crashes the resource allocator. (This is our observation and understanding after many rounds of trial and error.)
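One way to avoid restarting the kernel by hand, sketched here under the assumption that the cells above are saved as a standalone script, is to run each timed trial in a fresh Python process, so that all of its memory is released when the process exits. `dimsum_run.py` is a hypothetical script name; the example below times a trivial command instead, and `time_fresh_runs` is a hypothetical helper.

```python
import subprocess
import sys
import time

def time_fresh_runs(cmd, trials):
    """Run `cmd` `trials` times, each in a new process, and return wall-clock seconds."""
    durations = []
    for _ in range(trials):
        start = time.perf_counter()
        subprocess.run(cmd, check=True)     # memory is freed when the process exits
        durations.append(time.perf_counter() - start)
    return durations

# hypothetical real use: time_fresh_runs(["spark-submit", "dimsum_run.py"], trials=5)
durations = time_fresh_runs([sys.executable, "-c", "pass"], trials=2)
print(durations)
```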