# Installing Spark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirror.rafal.ca/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Mounting Google Drive to the notebook

In [2]:
# Mount Google Drive under /content/drive; the dataset paths read later in the
# notebook point into this mount.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


# Importing the necessary libraries from Spark, including the MLlib recommendation module

In [0]:
import os
import math

import findspark

# Point the environment at the JDK and at the unpacked Spark distribution.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
# Must run before the pyspark imports below: it is what makes pyspark importable.
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.mllib import recommendation
from pyspark.mllib.recommendation import *

# getOrCreate() hands back the already-running context when this cell is
# executed again; a second bare SparkContext() raises because only one
# context may be active at a time.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Loading each dataset into its corresponding variable

In [0]:
# Open the three raw text files as RDDs of lines (nothing is read until an
# action runs): artist names, artist ID aliases, and per-user play records.
artist_data, artist_alias, user_artist_data = (
    sc.textFile(dataset_path)
    for dataset_path in (
        '/content/drive/My Drive/CMIS550-TermProject/artist_dataset.txt',
        '/content/drive/My Drive/CMIS550-TermProject/artist_alias_dataset.txt',
        '/content/drive/My Drive/CMIS550-TermProject/user_artist_dataset.txt',
    )
)

# Defining functions to process the data

In [0]:
def artist_info(data_set):
  """Parse one line of the artist dataset into an ``(artist_id, name)`` tuple.

  The line is split on the first tab only, so a name that itself contains a
  tab stays in one piece and the result is still a key/value pair that
  ``collectAsMap`` can turn into a dictionary entry.

  Args:
    data_set: One raw text line, expected as ``"<artist id>\\t<artist name>"``.

  Returns:
    A tuple whose first element is the artist ID as an ``int`` and whose
    second element is the artist name. A line without any tab yields a
    one-element tuple holding only the ID.

  Raises:
    ValueError: If the text before the first tab is not an integer.
  """
  fields = data_set.split('\t', 1)
  fields[0] = int(fields[0])
  return tuple(fields)

def alias_info(data_set):
  """Parse one tab-separated alias line into a tuple of artist IDs.

  The first two fields are converted to ``int``. Any further fields are
  appended to the result unchanged, as strings.
  """
  parts = data_set.split('\t')
  return (int(parts[0]), int(parts[1])) + tuple(parts[2:])

def user_info(data_set):
  """Parse one space-separated play record into a tuple of integers.

  The first three fields are converted to ``int``; later cells use them as
  user ID, artist ID and play count respectively. Any further fields are
  appended to the result unchanged, as strings.
  """
  parts = data_set.split(' ')
  return (int(parts[0]), int(parts[1]), int(parts[2])) + tuple(parts[3:])

# Processing Data: applying corresponding functions to the RDDs 

In [0]:
# Parse the raw text lines of each RDD into typed tuples.
# NOTE: each name is rebound to its parsed RDD, so running this cell a second
# time would feed tuples to the parsers; re-run the loading cell first.
artist_data = artist_data.map(artist_info)
artist_alias = artist_alias.map(alias_info)
user_artist_data = user_artist_data.map(user_info)

# Bring the alias pairs to the driver as a dictionary (a repeated key keeps
# a single value).
artist_alias_map = artist_alias.collectAsMap()

# Rewrite the artist ID of every play record: an ID present in
# artist_alias_map is swapped for its mapped ID, any other ID is kept as is.
user_artist_data = user_artist_data.map(
    lambda record: (record[0], artist_alias_map.get(record[1], record[1]), record[2])
)


# Building the recommendation model using ALS implicit collaborative filtering

In [0]:
# Wrap every play record in an MLlib Rating; the third field (the number of
# plays) serves as the rating value.
plays = user_artist_data.map(
    lambda record: Rating(
        user=int(record[0]),
        product=int(record[1]),
        rating=int(record[2]),
    )
)
# Train the implicit-feedback ALS model with rank 10 and a fixed seed.
recommendation_model = ALS.trainImplicit(plays, rank=10, seed=200)

# Defining the function to get the top 10 most-played artists of a user

In [0]:
def played_artists(user_id):
  """Return the ten artists a user has played most, with their play counts.

  Args:
    user_id: Integer ID of the user, compared against ``Rating.user``.

  Returns:
    A list of up to ten ``(artist_name, play_count)`` tuples, highest play
    count first. The name is ``None`` for an artist ID that has no entry in
    ``artist_data``. The list is empty when the user has no play records.
  """
  # Rank the user's own records first, still keyed by artist ID, so that only
  # ten IDs ever need a name.
  top_plays = (
      plays.filter(lambda rating: rating.user == user_id)
      .map(lambda rating: (rating.product, rating.rating))
      .sortBy(lambda pair: pair[1], False)
      .take(10)
  )
  if not top_plays:
    return []

  # Resolve names for just those IDs instead of collecting the whole artist
  # catalogue to the driver and shipping it inside the task closure.
  wanted_ids = {artist_id for artist_id, _ in top_plays}
  names = dict(artist_data.filter(lambda pair: pair[0] in wanted_ids).collect())
  return [(names.get(artist_id), count) for artist_id, count in top_plays]

In [13]:
# Most-played artists of user 1055449.
print('Played artist by user 1055449')
played_artists(user_id=1055449)

Played artist by user 1055449


[('Pearl Jam', 4957),
 ('Queens of the Stone Age', 4697),
 ('Foo Fighters', 3635),
 ('A Perfect Circle', 1967),
 ('Tool', 1637),
 ('Radiohead', 1254),
 ('Jethro Tull', 1185),
 ('The Smashing Pumpkins', 1098),
 ('Desert Sessions', 974),
 ('The Black Crowes', 893)]

In [14]:
# Most-played artists of user 1059637.
print('Played artist by user 1059637')
played_artists(user_id=1059637)

Played artist by user 1059637


[('Something Corporate', 433060),
 ('My Chemical Romance', 155895),
 ('Bright Eyes', 19129),
 ('Straylight Run', 9294),
 ('Elliott Smith', 7665),
 ('Taking Back Sunday', 6880),
 ('The Movielife', 3462),
 ('Underoath', 3434),
 ('The Killers', 3091),
 ('Onelinedrawing', 2321)]

# Defining the function to output the recommendations for new artists the user has not played before

In [0]:
def discovery_weekly(user_id, playlist_size=10, candidate_count=100):
  """Recommend artists that the user has not played yet.

  Args:
    user_id: Integer ID of the user to build the playlist for.
    playlist_size: Maximum number of artist names to return.
    candidate_count: Number of top recommendations requested from
      ``recommendation_model`` before already-played artists are removed.

  Returns:
    A list of at most ``playlist_size`` artist names, in the order the model
    ranked them.
  """
  # Artist IDs the user already has play records for.
  played_ids = set(
      plays.filter(lambda rating: rating.user == user_id)
      .map(lambda rating: rating.product)
      .collect()
  )
  recommended_ids = [
      rec.product
      for rec in recommendation_model.recommendProducts(user_id, candidate_count)
  ]

  # A single filtered pass over artist_data resolves every name needed. This
  # replaces collecting the whole catalogue plus running one lookup() job per
  # recommended artist.
  wanted_ids = played_ids.union(recommended_ids)
  names = dict(artist_data.filter(lambda pair: pair[0] in wanted_ids).collect())

  # Played artists are compared by name, so a recommendation whose name
  # matches an already-played artist is dropped even under a different ID.
  played_names = {names.get(artist_id) for artist_id in played_ids}

  discovery_playlist = []
  for artist_id in recommended_ids:
    if len(discovery_playlist) >= playlist_size:
      break
    name = names.get(artist_id)
    # An ID without a catalogue entry has no name to show; skip it rather
    # than fail on the missing entry.
    if name is None or name in played_names:
      continue
    discovery_playlist.append(name)

  return discovery_playlist

In [16]:
# Build and display the discovery playlist for user 1055449.
print('Recommended artists for user 1055449')
discovery_weekly(user_id=1055449)

Recommended artists for user 1055449


['The Cardigans',
 'Counting Crows',
 'Oasis',
 'The Beach Boys',
 'Moby',
 'Portishead',
 'Michael Jackson',
 'Stevie Wonder',
 'Beastie Boys',
 'Björk']

In [17]:
# Build and display the discovery playlist for user 1059637.
print('Recommended artists for user 1059637')
discovery_weekly(user_id=1059637)

Recommended artists for user 1059637


['Bad Religion',
 'Avenged Sevenfold',
 'Lostprophets',
 'Counting Crows',
 'Eminem',
 'Atreyu',
 'Christina Aguilera',
 'Unwritten Law',
 'The Juliana Theory',
 'Simple Plan']