In [None]:
!pip install pyspark
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .master('local[*]')\
        .appName('Homework9Part1')\
        .config('spark.driver.maxResultSize', '10g')\
        .config('spark.executor.memory' ,'10g')\
        .config('spark.driver.memory', '10g')\
        .getOrCreate()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 30 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 32.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=f9f7fd893bb0b6d191d4bb480d81ebe03834bcc4a6d2233798f0269e7f6144e2
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


## Import data and process it for training

In [None]:
import pandas as pd

# trainIdx2_matrix.txt: one "userId|itemId|score" rating per line, no header row.
columns = ["userId", "itemId", "score"]
train = pd.read_csv(
    'trainIdx2_matrix.txt',
    names=columns,
    sep="|",
    dtype=int,
    index_col=False,
)

train.head()


Unnamed: 0,userId,itemId,score
0,199808,248969,90
1,199808,2663,90
2,199808,28341,90
3,199808,42563,90
4,199808,59092,90


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Used PySpark because plain pandas int64 columns cannot hold NA values.
# Row layout: userId|trackId|albumId|artistId|genreId_1..genreId_21, where the
# literal string 'None' marks a missing id.
id_columns = ['userId', 'trackId', 'albumId', 'artistId']
genre_columns = ['genreId_{}'.format(i) for i in range(1, 22)]
test_schema = StructType([StructField(name, IntegerType()) for name in id_columns + genre_columns])

test = spark.read.csv('testTrack_hierarchy.txt', sep='|', nullValue='None', header=False, schema=test_schema)

test.show(5)

+------+-------+-------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|userId|trackId|albumId|artistId|genreId_1|genreId_2|genreId_3|genreId_4|genreId_5|genreId_6|genreId_7|genreId_8|genreId_9|genreId_10|genreId_11|genreId_12|genreId_13|genreId_14|genreId_15|genreId_16|genreId_17|genreId_18|genreId_19|genreId_20|genreId_21|
+------+-------+-------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+
|199810| 208019| 209288|    null|     null|     null|     null|     null|     null|     null|     null|     null|     null|      null|      null|      null|      null|      null|      null|      null|      null|      null|      null

In [None]:
# Remove users from train that are not included in test.
unique_users = test.select('userId').distinct().coalesce(1)  # every distinct test user
test_user_ids = unique_users.toPandas()['userId']
train = train.loc[train['userId'].isin(test_user_ids)]  # keep only test users

train.head()
# reduced rows from 12403575 to 10643437

Unnamed: 0,userId,itemId,score
73,199810,48050,70
74,199810,1589,50
75,199810,155767,70
76,199810,178994,50
77,199810,195282,50


In [None]:
# Hand the filtered pandas ratings to Spark so ALS can train on them.
train = spark.createDataFrame(data=train)
train.show(n=5)

+------+------+-----+
|userId|itemId|score|
+------+------+-----+
|199810| 48050|   70|
|199810|  1589|   50|
|199810|155767|   70|
|199810|178994|   50|
|199810|195282|   50|
+------+------+-----+
only showing top 5 rows



## Instantiate and Fit ALS Model

In [None]:
from pyspark.ml.recommendation import ALS

# Explicit seed: when none is given, PySpark derives ALS's default seed from
# hash() of the class name, which Python randomises per process, so the learned
# factors (and every score below) would change between kernel restarts.
ALS_SEED = 42

# Explicit-feedback ALS on the 0-100 scores; nonnegative keeps the factors >= 0.
als = ALS(userCol='userId',
          itemCol='itemId',
          ratingCol='score',
          rank=5,
          maxIter=10,
          regParam=0.01,
          nonnegative=True,
          implicitPrefs=False,
          seed=ALS_SEED)

In [None]:
# Fit on the filtered training ratings, then predict those same rows.
model = als.fit(dataset=train)
train_output = model.transform(dataset=train)
train_output.show(n=5)

+------+------+-----+----------+
|userId|itemId|score|prediction|
+------+------+-----+----------+
|199810|275191|   50| 78.732635|
|199810|  1589|   50|  73.12158|
|199810|178994|   50| 64.965836|
|199810| 48050|   70|   71.0759|
|199810|155767|   70|  34.88947|
+------+------+-----+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# In-sample RMSE: train_output scores the very rows the model was fit on, so this
# measures fit quality rather than generalisation.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='score',
    metricName='rmse',
)
train_rmse = evaluator.evaluate(train_output)
print('RMSE: ', train_rmse)

RMSE:  26.98792269513774


In [None]:
# Predict scores of track, album, and artist by pointing the model's item column
# at each id in turn. Rows with a null album/artist id are dropped for that level.
# NOTE: setItemCol mutates `model`; after this cell its item column is 'artistId'.
track_score = model.setItemCol('trackId').transform(test)

has_album = test.where(test['albumId'].isNotNull())
album_score = model.setItemCol('albumId').transform(has_album)

has_artist = test.where(test['artistId'].isNotNull())
artist_score = model.setItemCol('artistId').transform(has_artist)

In [None]:
# Combine all scores into one dataframe: one row per test (userId, trackId) with the
# track-, album- and artist-level predictions side by side.
from pyspark.sql.types import IntegerType

predictions = test.select('userId', 'trackId')

for scored, score_col in ((track_score, 'track_score'),
                          (album_score, 'album_score'),
                          (artist_score, 'artist_score')):
    # Predictions are truncated to int; the left join keeps every test row.
    level = scored.select('userId', 'trackId',
                          scored['prediction'].cast(IntegerType()).alias(score_col))
    predictions = predictions.join(level, ['userId', 'trackId'], 'left')

predictions = predictions.na.fill(0)  # no album/artist score -> 0
predictions.show(5)

+------+-------+-----------+-----------+------------+
|userId|trackId|track_score|album_score|artist_score|
+------+-------+-----------+-----------+------------+
|199810|  74139|         59|         73|          56|
|199810| 242681|         61|         65|          68|
|199810|   9903|         74|          0|           0|
|199810| 208019|         64|         57|           0|
|199810| 105760|         68|         60|          70|
+------+-------+-----------+-----------+------------+
only showing top 5 rows



In [None]:
# Add a column with the number of genres present for each user-track pair.
from pyspark.sql.functions import isnull  # kept in scope for later cells that may use it
from pyspark.sql.types import IntegerType

genre_headers = ['userId', 'trackId'] + ['genreId_{}'.format(i) for i in range(1, 22)]
genres = test.select(genre_headers)

# Count non-null genre ids directly, over the genre columns only, so the id columns
# can never affect the count.
genre_cols = genre_headers[2:]
num_genres = genres.select(
    'userId',
    'trackId',
    sum(genres[c].isNotNull().cast(IntegerType()) for c in genre_cols).alias('num_genres'),
)
predictions = predictions.join(num_genres, ['userId', 'trackId'], 'left')

# The previous path held invalid '\c' / '\s' escapes and, on a Linux runtime, named a
# file literally containing backslashes. Use a relative path like the other Spark
# outputs (the working directory is /content on Colab). mode='overwrite' keeps the
# cell re-runnable.
predictions.coalesce(1).write.csv('scores.csv', header=True, mode='overwrite')
predictions.show(5)

+------+-------+-----------+-----------+------------+----------+
|userId|trackId|track_score|album_score|artist_score|num_genres|
+------+-------+-----------+-----------+------------+----------+
|199810|  74139|         59|         73|          56|         7|
|199810| 242681|         61|         65|          68|         3|
|199810|   9903|         74|          0|           0|         4|
|199810| 208019|         64|         57|           0|         0|
|199810| 105760|         68|         60|          70|         4|
+------+-------+-----------+-----------+------------+----------+
only showing top 5 rows



# Create a train matrix with the same structure as the test data

In [None]:
# Reload the complete, unfiltered training matrix as pandas.
train2 = pd.read_csv(
    'trainIdx2_matrix.txt',
    names=['userId', 'itemId', 'score'],
    sep='|',
    header=None,
)

In [None]:
train2.head(5)

Unnamed: 0,userId,itemId,score
0,199808,248969,90
1,199808,2663,90
2,199808,28341,90
3,199808,42563,90
4,199808,59092,90


In [None]:
def read_id_column(path, name):
    """Return a one-column DataFrame named `name` holding the first '|'-separated field of a headerless file."""
    return pd.read_csv(path, sep='|', usecols=[0], header=None, names=[name])


trackIds = read_id_column('trackData2.txt', 'trackId')
albumIds = read_id_column('albumData2.txt', 'albumId')
artistIds = read_id_column('artistData2.txt', 'artistId')
genreIds = read_id_column('genreData2.txt', 'genreId')

FileNotFoundError: ignored

## Classify whether the itemId within the trainset is a track, album, artist, or genre

In [None]:
# Split the ratings by which id catalogue each itemId belongs to.
item_ids = train2['itemId']
train_tracks = train2.loc[item_ids.isin(trackIds['trackId'])]
train_albums = train2.loc[item_ids.isin(albumIds['albumId'])]
train_artists = train2.loc[item_ids.isin(artistIds['artistId'])]
train_genres = train2.loc[item_ids.isin(genreIds['genreId'])]

In [None]:
# Give each level its own id/score column names so they can be merged side by side.
train_tracks = train_tracks.rename({'score': 'track_score', 'itemId': 'trackId'}, axis='columns')
train_albums = train_albums.rename({'score': 'album_score', 'itemId': 'albumId'}, axis='columns')
train_artists = train_artists.rename({'score': 'artist_score', 'itemId': 'artistId'}, axis='columns')

In [None]:
train_tracks.head(5)

In [None]:
train_albums.head(5)

In [None]:
train_artists.head(5)

## Get the hierarchy (album, artist, genres) of the tracks and albums

In [None]:
# Hierarchy rows: each track's album, artist and genreId_1..21; each album's artist
# and genreId_1..21. The literal 'None' marks a missing id.
genre_names = ['genreId_{}'.format(i) for i in range(1, 22)]

track_h = pd.read_csv('trackData2.txt', sep='|', header=None, na_values=['None'],
                      names=['trackId', 'albumId', 'artistId'] + genre_names)

album_h = pd.read_csv('albumData2.txt', sep='|', header=None, na_values=['None'],
                      names=['albumId', 'artistId'] + genre_names)

In [None]:
train_tracks_h = train_tracks.merge(track_h, on='trackId', how='left')  # attach album/artist/genres

In [None]:
train_albums_h = train_albums.merge(album_h, on='albumId', how='left')  # attach artist/genres

### Keep separately only the train albums that are not already inside `train_tracks_h`

In [None]:
# Keep only the album rows whose albumId does not already appear in train_tracks_h.
# (Previously the operands were swapped, yielding track rows instead of album rows.)
train_albums_h = train_albums_h[~train_albums_h['albumId'].isin(train_tracks_h['albumId'])]

## For each track in `train_tracks_h`, check whether album and artist scores exist in the other matrices

In [None]:
track_score_in_albums = train_albums.loc[train_albums['albumId'].isin(train_tracks_h['albumId'])]  # album ratings for rated tracks' albums

In [None]:
track_score_in_albums.head(5)

In [None]:
track_score_in_artists = train_artists.loc[train_artists['artistId'].isin(train_tracks_h['artistId'])]  # artist ratings for rated tracks' artists

In [None]:
track_score_in_artists.head(5)

### Combine the matrices together

In [None]:
final = train_tracks_h.merge(track_score_in_albums, on=['userId', 'albumId'], how='outer')  # add album_score

In [None]:
final = final.merge(track_score_in_artists, on=['userId', 'artistId'], how='outer')  # add artist_score

In [None]:
final.head(5)

## Remove rows with a `NaN` trackId, since the test set only consists of trackIds

In [None]:
final = final[final['trackId'].notna()]  # keep rows that carry a trackId

In [None]:
## Save final as csv; missing values are written as the literal 'None'.
export_cols = ['userId', 'trackId', 'albumId', 'artistId'] + ['genreId_{}'.format(i) for i in range(1, 22)]
final.to_csv('train_with_empty.csv', columns=export_cols, na_rep='None', index=False)

In [None]:
# Rows still missing an album score but with a known albumId: the (user, album)
# pairs sent to the ALS model for prediction.
empty_album_scores = final[final.album_score.isna()]
empty_album_scores = empty_album_scores.dropna(subset=['albumId'])

empty_album_scores[['userId', 'trackId', 'albumId']].to_csv('empty_album_scores.csv', index=False)
# Last expression so the preview is actually displayed (a mid-cell .head() is discarded).
empty_album_scores.head()

In [None]:
# Rows still missing an artist score but with a known artistId: the (user, artist)
# pairs sent to the ALS model for prediction.
empty_artist_scores = final[final.artist_score.isna()]
# Drop on artistId (was albumId): rows with an artist but no album still need an
# artist prediction, and rows with no artist cannot get one.
empty_artist_scores = empty_artist_scores.dropna(subset=['artistId'])

empty_artist_scores[['userId', 'trackId', 'artistId']].to_csv('empty_artist_scores.csv', index=False)
# Last expression so the preview is actually displayed.
empty_artist_scores.head()

## Make predictions on the empty values

In [None]:
# Read the pairs needing predictions back into Spark (no schema, so columns load as strings).
albums = spark.read.csv('empty_album_scores.csv', header=True, sep=',')
artists = spark.read.csv('empty_artist_scores.csv', header=True, sep=',')

In [None]:
# Inspect the column types as read (no schema was supplied, so all strings).
albums.printSchema()

In [None]:
# Inspect the column types as read (no schema was supplied, so all strings).
artists.printSchema()

In [None]:
# Cast the id columns (read as strings) to integers so they match the model's ids.
for id_col in ('userId', 'trackId', 'albumId'):
    albums = albums.withColumn(id_col, albums[id_col].cast(IntegerType()))

for id_col in ('userId', 'trackId', 'artistId'):
    artists = artists.withColumn(id_col, artists[id_col].cast(IntegerType()))

In [None]:
# Confirm userId/trackId/albumId are now integer columns.
albums.printSchema()

In [None]:
# Confirm userId/trackId/artistId are now integer columns.
artists.printSchema()

In [None]:
 # Point the model's item column back at 'itemId' (the per-level test scoring cell
 # leaves it set to 'artistId').
 # NOTE(review): the stray leading space only works because IPython strips leading
 # indentation from a cell; plain Python would raise IndentationError.
 model.setItemCol('itemId')

In [None]:
# Predict the missing album/artist scores for training rows. Each call sets the
# model's item column explicitly, as the test-set scoring cell does, so this cell
# does not depend on whatever item column an earlier cell left on `model`.
album_score_train = (
    model.setItemCol('albumId')
    .transform(albums.filter('albumId IS NOT NULL'))
    .withColumnRenamed('prediction', 'album_score')
)
artist_scores_train = (
    model.setItemCol('artistId')
    .transform(artists.filter('artistId IS NOT NULL'))
    .withColumnRenamed('prediction', 'artist_score')
)

# mode='overwrite' so re-running the notebook does not fail on existing output directories.
album_score_train.coalesce(1).write.csv('predicted_albums.csv', header=True, mode='overwrite')
artist_scores_train.coalesce(1).write.csv('predicted_artists.csv', header=True, mode='overwrite')

## Find the number of genres per user–track pair

In [None]:
# Reload the exported hierarchy in Spark, mapping the literal 'None' back to null.
final2 = spark.read.csv(
    'train_with_empty.csv',
    header=True,
    nullValue='None',
    sep=',',
)

In [None]:
final2.show(n=5)

In [None]:
# Keep the user/track keys plus genreId_1..genreId_21.
final_genres = final2.select(
    ['userId', 'trackId'] + ['genreId_{}'.format(i) for i in range(1, 22)]
)

In [None]:
# Number of genres present per (userId, trackId), matching the test-set num_genres
# feature: count the non-null genre ids. (Summing isnull() over every column
# counted the *missing* genres instead.)
from pyspark.sql.types import IntegerType

train_genre_cols = [c for c in final_genres.columns if c.startswith('genreId_')]
final_num_genres = final_genres.select(
    'userId',
    'trackId',
    sum(final_genres[c].isNotNull().cast(IntegerType()) for c in train_genre_cols).alias('num_genres'),
)

In [None]:
final_num_genres.show(5)

# mode='overwrite' so re-running the notebook does not fail on an existing output directory.
final_num_genres.coalesce(1).write.csv('train_num_genres.csv', header=True, mode='overwrite')

In [None]:
import glob


def read_spark_csv_dir(directory):
    """Read the single part-*.csv file inside a Spark CSV output directory.

    Spark names part files with a per-write id, so hard-coding the file name
    breaks as soon as the writing cell is re-run. The writers use coalesce(1),
    so exactly one part file is expected.

    Raises:
        FileNotFoundError: if the directory does not hold exactly one part-*.csv.
    """
    parts = sorted(glob.glob(directory + '/part-*.csv'))
    if len(parts) != 1:
        raise FileNotFoundError('expected exactly one part-*.csv in %s, found %d' % (directory, len(parts)))
    return pd.read_csv(parts[0])


predicted_artists = read_spark_csv_dir('/content/predicted_artists.csv')
predicted_albums = read_spark_csv_dir('/content/predicted_albums.csv')
num_genres_df = read_spark_csv_dir('/content/train_num_genres.csv')

In [None]:
predicted_artists = predicted_artists.set_index(keys=['userId', 'trackId'])  # align with final's index

In [None]:
predicted_albums = predicted_albums.set_index(keys=['userId', 'trackId'])  # align with final's index

In [None]:
num_genred_df = num_genres_df.set_index(keys=['userId', 'trackId'])  # indexed copy of num_genres_df

In [None]:
# Keep the keys and the three score columns, indexed by (userId, trackId) so the
# predicted scores below can be aligned on that index.
score_cols = ['userId', 'trackId', 'track_score', 'album_score', 'artist_score']
final = final.loc[:, score_cols].set_index(['userId', 'trackId'])
final.head()

In [None]:
final['artist_score'] = final['artist_score'].fillna(value=predicted_artists['artist_score'])  # fill gaps only, aligned on index

In [None]:
final['album_score'] = final['album_score'].fillna(value=predicted_albums['album_score'])  # fill gaps only, aligned on index

In [None]:
final = final.assign(num_genres=num_genred_df['num_genres'])  # aligned on (userId, trackId); absent pairs -> NaN

In [None]:
final.head(5)

In [None]:
final.isnull().sum()  # missing values per column before filling

In [None]:
final = final.fillna(value=0)  # any score/genre count still missing becomes 0

In [None]:
final.isnull().sum()  # should now be all zeros

In [None]:
final.to_csv(path_or_buf='finalTrainset.csv')  # index (userId, trackId) is written as columns