# **Necessary Package Imports**

In [1]:
import os
# Pin Python's hash seed in this process's environment; processes started later inherit it.
# Presumably this is for consistent hashing in Spark's Python workers — confirm.
os.environ.update({"PYTHONHASHSEED": "0"})


In [2]:
# List the working directory to confirm the three *_small.txt data files are present.
!ls

artist_alias_small.txt	artist_data_small.txt  sample_data  user_artist_data_small.txt


In [3]:
# Show the current working directory.
!pwd

/content


In [4]:
# Install Java 8 and Spark 3.5.1 into the runtime, plus findspark.
# apt output is silenced so the cell does not dump progress logs into the notebook.
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# archive.apache.org keeps every release, whereas dlcdn.apache.org only serves current ones,
# so a versioned dlcdn link stops working once the release is superseded.
# -nc skips the download when the tarball is already present (safe to re-run the cell).
!wget -q -nc https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
# %pip targets the kernel's own environment; the version is pinned for reproducibility.
%pip install -q findspark==2.0.1

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Connecting to security.ubuntu.com (185.125.190.39)] [Connected to cloud.r-project.org (108.138.1                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
0% [2 InRelease 47.5 kB/119 kB 40%] [Connecting to security.ubuntu.com (185.125.190.39)] [Waiting fo                                                                                                    Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [2 InRelease 62.0 kB/119 kB 52%] [Connecting to security.ubuntu.com (185.125.190.39)] [3 InReleas0% [2 InRelease 64.9 kB/119 kB 55%] [Connecting to security.ubuntu.com (185.125.190.39)] [Connecting0% [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.80)] [Waiting for heade                                                   

In [5]:
import os
# Point the environment at the Java and Spark installs used by this notebook.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.1-bin-hadoop3",
})

In [7]:
import findspark
# Called before the pyspark import below; presumably this is what makes pyspark
# importable from the unpacked Spark directory — confirm.
findspark.init()
from pyspark import SparkContext

# Reuse an existing SparkContext if one is running, otherwise create one.
sc = SparkContext.getOrCreate()
sc  # bare expression so the notebook displays the context



In [8]:
import pyspark
from pyspark.sql import SparkSession
# Reuse an existing SparkSession if there is one, otherwise build a default one.
spark = SparkSession.builder.getOrCreate()
spark  # bare expression so the notebook displays the session

In [9]:
# Show the working directory again, after the Spark download and unpack.
!pwd

/content


In [10]:
# List files again; the Spark tarball and its unpacked directory should now be present.
!ls

artist_alias_small.txt	sample_data		 spark-3.5.1-bin-hadoop3.tgz
artist_data_small.txt	spark-3.5.1-bin-hadoop3  user_artist_data_small.txt


In [11]:
from pyspark.mllib.recommendation import *
import random
from operator import *

# **Loading Data**

In [20]:
""" Load and preprocess user-artist data from files. """
def parseUserArtist(item):
    """Parse one whitespace-separated line into a (userId, artistId, count) tuple of ints.

    Only the first three fields are read; any further fields are ignored.
    """
    fields = item.split()
    return (int(fields[0]), int(fields[1]), int(fields[2]))
def convertBadToGoodIds(item):
    """Return the record with its artist id replaced by the mapped id, when one exists.

    `item` is read as (item[0], artist id, item[2]). The artist id is looked up in the
    module-level `canonicalMap`; ids that are not keys of the map pass through unchanged.
    """
    originalId = item[1]
    resolvedId = canonicalMap.get(originalId, originalId)
    return (item[0], resolvedId, item[2])


# Read the three input files into RDDs.
# artist_data_small.txt: tab-separated, kept as [int id, second field].
artistDataSmall = (
    sc.textFile('artist_data_small.txt')
    .map(lambda line: line.split('\t'))
    .map(lambda fields: [int(fields[0]), fields[1]])
)
# artist_alias_small.txt: tab-separated pairs of int ids.
artistAliasSmall = (
    sc.textFile('artist_alias_small.txt')
    .map(lambda line: line.split('\t'))
    .map(lambda fields: [int(fields[0]), int(fields[1])])
)
# Driver-side dict built from the alias pairs; convertBadToGoodIds reads it.
canonicalMap = artistAliasSmall.collectAsMap()
# Parse each play record, then replace artist ids that appear as keys in canonicalMap.
userArtistDataSmall = (
    sc.textFile('user_artist_data_small.txt')
    .map(parseUserArtist)
    .map(convertBadToGoodIds)
)

# **Data Exploration**

In [21]:
# Per-user listening summary: total and mean play count for the three users with the
# highest totals.
# Reads userArtistDataSmall (built in the loading cell); the name `userArtistData` used
# previously is not defined in the visible notebook and fails on a fresh kernel.
userDataSmallSplit = userArtistDataSmall.map(lambda x: (x[0], x[2]))  # (userId, playCount)
# Sum plays per user, then flip to (total, userId) so sortByKey(False) orders by total, descending.
userResults = userDataSmallSplit.reduceByKey(lambda a,b: a + b).map(lambda x: (x[1], x[0])).sortByKey(False)
playCounts = userDataSmallSplit.countByKey()  # number of records per user
# Only three rows are printed, so take(3) instead of collecting the whole RDD to the driver.
for item in userResults.take(3):
    print("User %d has a total play count of %d and a mean play count of %d." %(item[1],item[0],(item[0]/playCounts[item[1]])))

User 1059637 has a total play count of 674412 and a mean play count of 1878.
User 2064012 has a total play count of 548427 and a mean play count of 9455.
User 2069337 has a total play count of 393515 and a mean play count of 1519.


In [23]:
# Split the data into train, validation, and test sets (weights 0.4 / 0.4 / 0.2).
# The second argument is the random seed, fixed so the split is repeatable.
trainData, validationData, testData = userArtistDataSmall.randomSplit([0.4, 0.4, 0.2], 13)

# Cache the datasets for faster access; each one is read several times below.
trainData.cache()
validationData.cache()
testData.cache()

splits = (
    ("trainData", trainData),
    ("validationData", validationData),
    ("testData", testData),
)

# Print the first 3 records of each dataset, labelled so the output is self-explanatory.
for name, rdd in splits:
    print("First 3 records of %s:" % name)
    print(rdd.take(3))
    print()

# Print the record count of each dataset.
for name, rdd in splits:
    print("Count of records in %s: %d" % (name, rdd.count()))


First 3 records of trainData:
[(1059637, 1000049, 1), (1059637, 1000056, 1), (1059637, 1000114, 2)]

First 3 records of validationData:
[(1059637, 1000010, 238), (1059637, 1000062, 11), (1059637, 1000123, 2)]

First 3 records of testData:
[(1059637, 1000094, 1), (1059637, 1000112, 423), (1059637, 1000113, 5)]

Count of records in trainData: 19769
Count of records in validationData: 19690
Count of records in testData: 10022


# **Model Evaluation**

In [24]:
""" Train the model with the specified parameters. """
def modelEval(model,dataset):
    """Score `model` against `dataset` and print the result.

    For every user in `dataset`, the model rates each known artist that the user does
    not have in the training data. The top X predictions are compared with the user's
    X artists in `dataset`, and the fraction that overlap is that user's score. The
    printed value is the mean of the per-user scores.

    Reads the module-level `trainData`, `artistDataSmall` and `sc`.

    Args:
        model: trained recommendation model exposing `predictAll` and `rank`.
        dataset: RDD of (userId, artistId, count) records to evaluate against.

    Returns:
        None; the score is printed.
    """
    # userId -> artists the user has in the evaluation set / in the training set.
    datasetMap = dataset.map(lambda x:(x[0],x[1])).groupByKey().collectAsMap()
    trainDataMap = trainData.map(lambda x:(x[0],x[1])).groupByKey().collectAsMap()
    # Every artist id listed in the artist data file.
    allArtists = artistDataSmall.map(lambda x:(x[0])).collect()

    total=0.0
    userCount=0
    for user, trueArtists in datasetMap.items():
        # A user may have no training records at all; treat that as an empty history.
        # A set makes the membership test below constant-time per artist.
        artistsInTrainData = set(trainDataMap.get(user, ()))
        # (user, artist) pairs to score: every artist the user has not trained on.
        candidates = [(user, artist) for artist in allArtists if artist not in artistsInTrainData]

        X = len(trueArtists)
        hits = 0
        if candidates:  # nothing to score otherwise; avoid handing predictAll an empty RDD
            finalResult = model.predictAll(sc.parallelize(candidates))
            # Order artists by predicted rating, highest first.
            prediction = finalResult.map(lambda x: (x[2], x[1])).sortByKey(False).map(lambda x: x[1])
            hits = len(set(prediction.take(X)).intersection(set(trueArtists)))
        total += hits/float(X)
        userCount += 1

    # Guard the mean against an empty dataset.
    score = total/float(userCount) if userCount else 0.0
    # Report the rank of the model actually evaluated instead of a module-level variable,
    # which can be stale or unset when cells are run out of order.
    print("The model score for rank %d is %f"%(model.rank,score))

# **Model Construction**

In [29]:
# Train one implicit-feedback ALS model per candidate rank (5, 10, 15) and score each
# on the validation set. The seed is fixed so the runs are repeatable.
ranks=[5, 10, 15]
for rank in ranks:
    Model = ALS.trainImplicit(trainData, rank=rank, seed=345)
    modelEval(Model, validationData)

The model score for rank 5 is 0.094497
The model score for rank 10 is 0.096076
The model score for rank 15 is 0.087339


In [27]:
# Retrain with the rank that scored highest on the validation set (rank 10) and
# evaluate it on the held-out test set.
# `rank` is assigned here so it matches the model being evaluated rather than whatever
# value an earlier cell left behind.
rank = 10
bestModel = ALS.trainImplicit(trainData, rank=rank, seed=345)
modelEval(bestModel, testData)

The model score for rank 20 is 0.062384


# **Trying Some Artist Recommendations**

In [28]:
# Recommend the top 5 artists for user 1059637 and print their names.
topFive = bestModel.recommendProducts(1059637,5)
# artistDataSmall is the RDD of [artistId, name] built in the loading cell; the name
# `artistData` used previously is not defined in the visible notebook.
artistMap=artistDataSmall.collectAsMap()
for i, artist in enumerate(topFive):
    # artist[1] is the recommended artist id; fall back to the raw id when no name is
    # known, so a missing entry cannot break the string concatenation.
    print("Artist " + str(i) + ": " + str(artistMap.get(artist[1], artist[1])))

Artist 0: Something Corporate
Artist 1: My Chemical Romance
Artist 2: Further Seems Forever
Artist 3: Taking Back Sunday
Artist 4: Brand New
