Imports
-----------

In [56]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from IPython.display import clear_output
import pyspark.sql.types as types
import pyspark.ml.stat as stat
import os
from pyspark.sql import functions as F
from math import ceil

Deploy PySpark Session
------------------------------------

In [2]:
# deploy a PySpark session
_cpu_count = os.cpu_count()
# os.cpu_count() may return None; fall back to "all available cores" in that case
_master = f"local[{_cpu_count}]" if _cpu_count else "local[*]"
_conf = SparkConf()
_conf.setMaster(_master)
_conf.setAppName("Exercise 01")
_conf.set('spark.driver.host', '127.0.0.1')
# pivot() below is called without explicit values; this raises the cap on how many
# distinct pivot values Spark will collect automatically
_conf.set('spark.sql.pivotMaxValues', '99999999')
_conf.set('spark.driver.memory', '8g')
# Call getOrCreate on the class: constructing SparkContext(...) first (and then
# calling getOrCreate on the instance) fails when this cell is re-run while a
# context is already active.
sc = SparkContext.getOrCreate(conf=_conf)
# log verbosity is controlled here rather than through a SparkConf key
sc.setLogLevel('ERROR')
clear_output()
spark = SparkSession(sc)

Exercise 5a
-----------------

In [3]:
# Load artist_alias_small: tab-separated (id_1, id_2) integer pairs
alias_schema = (
    types.StructType()
    .add("id_1", types.IntegerType(), True)
    .add("id_2", types.IntegerType(), True)
)
alias_df = spark.read.csv(
    "dataset-problemset2-ex5/artist_alias_small.txt",
    sep="\t",
    schema=alias_schema,
)
alias_df.show()

+-------+-------+
|   id_1|   id_2|
+-------+-------+
|1027859|1252408|
|1017615|    668|
|6745885|1268522|
|1018110|1018110|
|1014609|1014609|
|6713071|   2976|
|1014175|1014175|
|1008798|1008798|
|1013851|1013851|
|6696814|1030672|
|1036747|1239516|
|1278781|1021980|
|2035175|1007565|
|1327067|1308328|
|2006482|1140837|
|1314530|1237371|
|1160800|1345290|
|1255401|1055061|
|1307351|1055061|
|1234249|1005225|
+-------+-------+
only showing top 20 rows



In [4]:
# Load user_artist_data_small: (user_id, artist_id, playcount) integer triples
main_schema = types.StructType([
    types.StructField(column_name, types.IntegerType(), True)
    for column_name in ("user_id", "artist_id", "playcount")
])
main_df = spark.read.csv(
    "dataset-problemset2-ex5/user_artist_data_small.txt",
    schema=main_schema,
    sep=" ",  # it is not tab-separated as stated in the exercise!
)
main_df.show()

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1059637|  1000010|      238|
|1059637|  1000049|        1|
|1059637|  1000056|        1|
|1059637|  1000062|       11|
|1059637|  1000094|        1|
|1059637|  1000112|      423|
|1059637|  1000113|        5|
|1059637|  1000114|        2|
|1059637|  1000123|        2|
|1059637|  1000130|    19129|
|1059637|  1000139|        4|
|1059637|  1000241|      188|
|1059637|  1000263|      180|
|1059637|  1000289|        2|
|1059637|  1000305|        1|
|1059637|  1000320|       21|
|1059637|  1000340|        1|
|1059637|  1000427|       20|
|1059637|  1000428|       12|
|1059637|  1000433|       10|
+-------+---------+---------+
only showing top 20 rows



In [5]:
# Attach alias information to every play record so artist_id can be remapped to id_2.
# A LEFT join keeps each play record and fills id_1/id_2 only where an alias exists.
# A full outer join would additionally emit alias rows that match no play record,
# with null user_id/playcount, which would leak phantom rows into the utility matrix.
# NOTE(review): if alias_df can contain the same id_1 more than once, this join
# duplicates the matching play records — confirm id_1 is unique.
main_df = main_df.join(alias_df, on=main_df['artist_id'] == alias_df['id_1'], how='left')
main_df.show()

+-------+---------+---------+----+----+
|user_id|artist_id|playcount|id_1|id_2|
+-------+---------+---------+----+----+
|1046559|        1|      147|null|null|
|1052461|        1|     3481|null|null|
|1031009|        1|        4|null|null|
|2062243|        1|       22|null|null|
|1058890|        1|        3|null|null|
|2288164|        1|     4006|null|null|
|1063644|        1|      331|null|null|
|1048402|        1|       53|null|null|
|1017610|        1|      376|null|null|
|2023686|        1|       49|null|null|
|2010008|        1|       27|null|null|
|1072684|        1|      100|null|null|
|1026084|        1|      590|null|null|
|1029563|        1|      125|null|null|
|1041919|        1|      334|null|null|
|2102019|        1|       22|null|null|
|1059637|        2|        3|null|null|
|1046559|        2|      155|null|null|
|1031009|        2|      436|null|null|
|2020513|        2|      846|null|null|
+-------+---------+---------+----+----+
only showing top 20 rows



In [9]:
# Replace artist_id with id_2 wherever the join found an alias;
# rows without an alias keep their original artist_id.
canonical_artist = F.coalesce(main_df['id_2'], main_df['artist_id'])
main_df = main_df.withColumn('artist_id', canonical_artist)
# Validation: on rows that matched an alias, artist_id should now equal id_2
matched_rows = main_df.filter(main_df['id_1'].isNotNull())
matched_rows.show()

+-------+---------+---------+-------+-------+
|user_id|artist_id|playcount|   id_1|   id_2|
+-------+---------+---------+-------+-------+
|1072684|  1252408|       31|1027859|1252408|
|1001440|      668|        3|1017615|    668|
|1024631|  1268522|        1|6745885|1268522|
|1059334|  1018110|        1|1018110|1018110|
|2010008|  1014609|       10|1014609|1014609|
|2023686|  1014609|       35|1014609|1014609|
|1017610|  1014609|        8|1014609|1014609|
|1009943|  1014609|      182|1014609|1014609|
|2023742|  1014609|      284|1014609|1014609|
|2014936|  1014609|        2|1014609|1014609|
|2062243|  1014609|      392|1014609|1014609|
|2069337|  1014609|       61|1014609|1014609|
|2010008|     2976|        2|6713071|   2976|
|1024631|  1014175|       88|1014175|1014175|
|1029563|  1008798|        6|1008798|1008798|
|1026084|  1008798|        8|1008798|1008798|
|2010008|  1008798|        4|1008798|1008798|
|2023686|  1008798|        1|1008798|1008798|
|1070641|  1008798|      114|10087

In [14]:
# The alias helper columns are no longer needed once artist_id has been remapped
columns_to_remove = ['id_1', 'id_2']
main_df = main_df.drop(*columns_to_remove)
main_df.show()
total_play_rows = main_df.count()
print(total_play_rows)

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1046559|        1|      147|
|1052461|        1|     3481|
|1031009|        1|        4|
|2062243|        1|       22|
|1058890|        1|        3|
|2288164|        1|     4006|
|1063644|        1|      331|
|1048402|        1|       53|
|1017610|        1|      376|
|2023686|        1|       49|
|2010008|        1|       27|
|1072684|        1|      100|
|1026084|        1|      590|
|1029563|        1|      125|
|1041919|        1|      334|
|2102019|        1|       22|
|1059637|        2|        3|
|1046559|        2|      155|
|1031009|        2|      436|
|2020513|        2|      846|
+-------+---------+---------+
only showing top 20 rows

49481


In [31]:
# Utility matrix: one row per artist, one column per user,
# each cell holding that user's summed playcount for the artist (null if never played)
utility_df = (
    main_df
    .groupBy('artist_id')
    .pivot('user_id')
    .sum('playcount')
)

In [41]:
# Show one pivoted row to illustrate the layout, then report the matrix shape
sample_row = utility_df.first()
print(sample_row)
print(f'\nTotal number of rows: {utility_df.count()}')
print(f'\nTotal number of columns: {len(utility_df.columns)}')

Row(artist_id=6911438, 1000647=None, 1001440=None, 1007308=None, 1009943=None, 1017610=None, 1021501=None, 1021940=None, 1024631=1, 1026084=None, 1029563=None, 1031009=None, 1035511=None, 1041919=None, 1042223=None, 1046559=None, 1047812=None, 1048402=None, 1052054=None, 1052461=None, 1055449=None, 1058890=None, 1059245=None, 1059334=None, 1059637=None, 1059765=None, 1063644=None, 1070641=None, 1070932=None, 1072684=None, 1073421=None, 1076906=None, 2000668=None, 2005710=None, 2007381=None, 2010008=None, 2010581=None, 2014936=None, 2017397=None, 2020513=None, 2023686=None, 2023742=None, 2023977=None, 2030069=None, 2062243=None, 2064012=None, 2069337=None, 2069889=None, 2070757=None, 2102019=None, 2288164=None)

Total number of rows: 30100

Total number of columns: 51


Exercise 5b
-----------------

In [83]:
# function to get the similarity between two users based on the Pearson correlation
def get_similarity(df, user_one, user_two):
    """Return the Pearson correlation between two user columns of ``df``.

    Parameters
    ----------
    df : DataFrame
        Utility matrix with one column per user. Pivoting on ``user_id`` names
        those columns with the user id as a string (e.g. ``'1000647'``).
    user_one, user_two : str or int
        User ids. Ints are accepted and converted to the matching column name.

    Returns
    -------
    tuple
        ``(int(user_one), int(user_two), similarity)`` where ``similarity`` is
        whatever ``df.stat.corr(..., method='pearson')`` returns.

    NOTE(review): utility-matrix cells are null where a user never played an
    artist. How ``stat.corr`` treats those nulls is decided by Spark. Confirm
    whether missing plays should be filled with 0 before correlating.
    """
    # Column names are strings, so normalise int ids before looking them up
    column_one, column_two = str(user_one), str(user_two)
    similarity = df.stat.corr(column_one, column_two, method='pearson')
    return int(user_one), int(user_two), similarity

# Example: similarity between users 1000647 and 2030069
example_similarity = get_similarity(utility_df, '1000647', '2030069')
example_similarity

(1000647, 2030069, 0.006601958200178793)

Exercise 5c
-----------------

In [87]:
def get_most_similar_users(df, user, k):
    """Return the ``k`` users whose play counts correlate most with ``user``.

    Parameters
    ----------
    df : DataFrame
        Utility matrix with an ``artist_id`` column plus one column per user.
    user : str or int
        Id of the reference user. Ints are converted to the column-name string.
    k : int
        Number of most similar users to return.

    Returns
    -------
    DataFrame
        Columns ``(user_one, user_two, similarity)``, sorted by ``similarity``
        descending, at most ``k`` rows. Pairs whose similarity is null or NaN
        (undefined correlation) are excluded.

    Raises
    ------
    ValueError
        If ``user`` is not one of the user columns of ``df``.
    """
    # Column names are strings; an int id would never match and the user would be
    # compared with itself (similarity 1.0)
    user = str(user)
    user_columns = df.drop('artist_id').columns
    if user not in user_columns:
        raise ValueError(f"user {user!r} is not a column of the utility matrix")

    # calculate similarity of the provided user with all the other users
    # NOTE(review): each call launches a separate Spark job over df; caching df
    # beforehand would avoid recomputing it for every user.
    scores = [
        get_similarity(df, user, user_two) for user_two in user_columns
        if user_two != user
    ]
    schema = types.StructType([
        types.StructField('user_one', types.IntegerType(), True),
        types.StructField('user_two', types.IntegerType(), True),
        types.StructField('similarity', types.DoubleType(), True)
    ])
    result = spark.createDataFrame(data=scores, schema=schema)
    # Spark orders NaN above every number, so an undefined correlation would
    # otherwise be ranked as the best match; drop null and NaN scores first.
    result = result.filter(
        F.col('similarity').isNotNull() & ~F.isnan(F.col('similarity'))
    )
    result = result.sort(F.col('similarity').desc()).limit(k)
    return result

# Top 10 users most similar to user 1000647
most_similar = get_most_similar_users(utility_df, user='1000647', k=10)
most_similar.show()

+--------+--------+--------------------+
|user_one|user_two|          similarity|
+--------+--------+--------------------+
| 1000647| 2023686| 0.11398225362054047|
| 1000647| 1009943| 0.10851019404398154|
| 1000647| 1001440| 0.07701264082513853|
| 1000647| 1024631| 0.07648002945468792|
| 1000647| 1052461| 0.07479034338033153|
| 1000647| 1072684| 0.04908567689073624|
| 1000647| 2007381| 0.04434030978136511|
| 1000647| 1017610| 0.03101525929697622|
| 1000647| 1029563|0.030152642173836192|
| 1000647| 2010008|0.026769569018660885|
+--------+--------+--------------------+



Exercise 5d
-----------------

In [88]:
# Load artist_data_small: tab-separated (artist_id, name) pairs
artist_schema = (
    types.StructType()
    .add("artist_id", types.IntegerType(), True)
    .add("name", types.StringType(), True)
)
artist_df = spark.read.csv(
    "dataset-problemset2-ex5/artist_data_small.txt",
    sep="\t",
    schema=artist_schema,
)
artist_df.show()

+---------+--------------------+
|artist_id|                name|
+---------+--------------------+
|  1240105|        André Visior|
|  1240113|           riow arai|
|  1240132|Outkast & Rage Ag...|
|  6776115|            小松正夫|
|  1030848|       Ravers Nature|
|  6671601|      Erguner, Kudsi|
|  1106617|              Bloque|
|  1240185|      Lexy & K. Paul|
|  6671631|    Rev. W.M. Mosley|
|  6671632|      Labelle, Patti|
|  1240238|   the Chinese Stars|
|  1240262|            The Gufs|
|  6718605|          Bali Music|
|  6828988|Southern Conferen...|
|  1240415|        Paul & Paula|
|  1009439|            Cinnamon|
|  1018275|      School Of Fish|
|  6671680|Armstrong, Louis ...|
|  1240508|The Ozark Mountai...|
|  1240510| The Mercury Program|
+---------+--------------------+
only showing top 20 rows



In [90]:
# add new entries for the new user
chosen_artists = ['6776115', '1018275', '1240508', '1240238', '1240113']
new_user = 777
# playcount for each entry of chosen_artists, in the same order
new_user_playcounts = [5, 10, 1, 2, 2]
# Build the rows with main_df's own schema. The ids above are strings, and letting
# Spark infer the types would widen main_df.artist_id to a string column in the union.
new_user_rows = spark.createDataFrame(
    [
        (new_user, int(artist_id), playcount)
        for artist_id, playcount in zip(chosen_artists, new_user_playcounts)
    ],
    schema=main_df.schema,
)
# Remove rows a previous run of this cell added, so re-running it does not
# double-count the new user's plays (eqNullSafe keeps rows with a null user_id).
main_df = (
    main_df
    .filter(~main_df['user_id'].eqNullSafe(new_user))
    .unionByName(new_user_rows)
)
# update utility df
# each row is one artist, each column is one user
utility_df = main_df.groupBy('artist_id').pivot('user_id').sum('playcount')