In [None]:
cd <your path>

# Beginning

## Load modules

In [None]:
! pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 40.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=dea8b4d5fa3cb6abee546fd96dbd4d91cf5ffd1c08b617ce9f00ed531132e5d1
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [None]:
import pyspark.sql.functions as func
from pyspark.sql import Window
from pyspark.sql import SparkSession

from pyspark import StorageLevel
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.feature import StringIndexer
# from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics

from operator import itemgetter

## Define the Parameters

In [None]:
# SRC_PATH: where the raw cf_validation / cf_test parquet splits are read from.
# Alternative source location:
# SRC_PATH = '/scratch/work/courses/DSGA1004-2021/MSD'
# PATH: where derived artifacts (encodings, transformed splits, subsamples) are
# written and read back. Both currently point at the local ./data folder.
SRC_PATH, PATH = './data', './data'

# Share of training users kept by the subsampling step.
FRACTION = 0.25
# Not used in this part of the notebook; presumably the recommendation-list
# length and the precision@k cut-off — confirm against later cells.
TOP, PREC_AT = 500, 500


# Driver and executors are given the same memory budget.
MAX_MEMORY = "30g"

# Reuses an already-running session in this kernel if there is one
# (getOrCreate), otherwise starts a new one with the settings below.
spark = (
    SparkSession.builder
    .appName('quq')
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

# Data Preparation

## Full dataset [Only Execute Once]

### Data processing 

In [1]:
# Load the raw interaction splits.
print('Begin loading data')

# NOTE(review): train is read from a hard-coded relative path rather than
# SRC_PATH (the SRC_PATH-based read is kept commented out below) — confirm
# which file is the intended training split.
train = spark.read.parquet('1004_Project/data/cf_train_new.parquet')
# train = spark.read.parquet(SRC_PATH+'/cf_train.parquet')
val = spark.read.parquet(SRC_PATH+'/cf_validation.parquet')
test = spark.read.parquet(SRC_PATH+'/cf_test.parquet')
print('Successfully loaded the data')

# Vocabularies for the encoders: users are taken from the training split only,
# tracks from all three splits so every val/test track receives an index.
unique_user = train.select('user_id').distinct()
unique_track = (
    train.select('track_id').distinct()
    .union(val.select('track_id').distinct())
    .union(test.select('track_id').distinct())
    .distinct()
)
print('Successfully get unique user/track')

# Encode string ids as integer indices with StringIndexer, fitted once on the
# distinct ids above. An RDD zipWithIndex() + join encoding was tried first but
# did not work on Colab. Original note: "Too slow to use StringIndexer() when
# model.fit()".
indexer_user = StringIndexer(inputCol="user_id", outputCol="user_index")
tran_user = indexer_user.fit(unique_user)
indexer_track = StringIndexer(inputCol="track_id", outputCol="track_index")
tran_track = indexer_track.fit(unique_track)
print('Successfully get the encoding function')


def _encode_split(df):
    """Return ``df`` with integer ``user_index`` and ``track_index`` columns appended.

    Applies the fitted user and track StringIndexer models (StringIndexer
    produces double indices) and casts both index columns to ``int``.
    The user model is fitted on training users only; with StringIndexer's
    default handleInvalid setting, a split containing a user absent from
    ``train`` fails when it is evaluated.
    """
    df = tran_user.transform(df)
    df = tran_track.transform(df)
    df = df.withColumn('user_index', df['user_index'].cast('int'))
    return df.withColumn('track_index', df['track_index'].cast('int'))


# Apply the identical encoding to every split.
train, val, test = (_encode_split(df) for df in (train, val, test))
print('Successfully encoding the user and track')


### Save the encoded full dataset

In [None]:
# Persist the encoded splits. The "Sub" section reloads them from exactly these
# paths, so this step must run for a fresh Restart & Run All to succeed. It also
# keeps the saved splits consistent with the encodings saved in the next cells.
train.write.format('parquet').mode('overwrite').save(PATH+'/cf_train_trans.parquet')
val.write.format('parquet').mode('overwrite').save(PATH+'/cf_validation_trans.parquet')
test.write.format('parquet').mode('overwrite').save(PATH+'/cf_test_trans.parquet')

### Save encoding and unique info

In [None]:
# Distinct id -> index pairs as they appear in the encoded splits.
# Users are read from train only; tracks are gathered from all three splits.
user_encoding = train.select('user_id', 'user_index').distinct()
track_encoding = (
    train.select('track_id', 'track_index').distinct()
    .union(val.select('track_id', 'track_index').distinct())
    .union(test.select('track_id', 'track_index').distinct())
    .distinct()
)

# Distinct encoded users present in each evaluation split.
unique_user_index_val = val.select('user_index').distinct()
unique_user_index_test = test.select('user_index').distinct()

# Spot-check one row of the user encoding and of the validation users.
print(user_encoding.take(1))
print(unique_user_index_val.take(1))

[Row(user_index=44596)]


In [None]:
# Persist the id <-> index mappings and the per-split user sets under PATH
# as parquet, overwriting any previous output.
user_encoding.write.mode('overwrite').parquet(PATH+'/user_encoding.parquet')
track_encoding.write.mode('overwrite').parquet(PATH+'/track_encoding.parquet')

unique_user_index_val.write.mode('overwrite').parquet(PATH+'/unique_user_index_val.parquet')
unique_user_index_test.write.mode('overwrite').parquet(PATH+'/unique_user_index_test.parquet')

### Save each user's ground-truth tracks (ordered by `count`)

In [None]:
# Window spec from the previous implementation of this cell. It is no longer
# used here but is left defined in case later cells reference it.
w = Window.partitionBy('user_index').orderBy('count')


def _true_tracks_by_user(df):
    """Return an RDD of ``(user_index, tracks)`` tuples, one per user in ``df``.

    ``tracks`` lists every ``track_index`` the user has in ``df``, ordered by
    ascending ``count`` (ties broken by ascending ``track_index``). It is built
    with a single groupBy: (count, track_index) structs are collected, sorted
    by their first field, and the ``track_index`` field is then projected out.
    The previous running-window collect_list + max(array) approach built a
    cumulative list for every row, which is quadratic in a user's track count.
    """
    per_user = (
        df.groupBy('user_index')
        .agg(func.sort_array(
            func.collect_list(func.struct('count', 'track_index'))).alias('pairs'))
        .select('user_index', func.col('pairs.track_index').alias('tracks'))
    )
    return per_user.rdd.map(tuple)


# Each element is written as the str() of its tuple, e.g. "(7, [3, 1, 9])".
# saveAsTextFile raises if the output directory already exists.
true_rec_tracks_val = _true_tracks_by_user(val)
true_rec_tracks_val.saveAsTextFile(PATH+'/true_rec_tracks_val/')

true_rec_tracks_test = _true_tracks_by_user(test)
true_rec_tracks_test.saveAsTextFile(PATH+'/true_rec_tracks_test/')

print('Successfully transform the true values')

## Subsample [Only Execute Once for Each FRACTION]

### Load the encoded full dataset

In [None]:
# Reload the encoded train / validation / test splits saved under PATH.
train, val, test = (
    spark.read.parquet(PATH + suffix)
    for suffix in ('/cf_train_trans.parquet',
                   '/cf_validation_trans.parquet',
                   '/cf_test_trans.parquet')
)

### Subsampling

In [None]:
# Distinct users in the training split and in the validation split.
unique_user = train.select('user_id').distinct()
unique_user_val = val.select('user_id').distinct()

# Keep a random FRACTION of training users plus every validation user, so each
# validation user's training history stays in the subsample. The fixed seed
# makes the subsample reproducible across runs.
SUBSAMPLE_SEED = 42
subset_unique_user = (
    unique_user.sample(withReplacement=False, fraction=FRACTION, seed=SUBSAMPLE_SEED)
    .union(unique_user_val)
    .distinct()
)
# Left-semi join: keep train rows whose user_id is in the subset; no columns are
# added. Joining on the column name avoids comparing train.user_id with a column
# derived from train itself (a self-join that relies on Spark's special-casing).
train_sub = train.join(subset_unique_user, on='user_id', how='leftsemi')
print('Successfully subsample the data')

Successfully subsample the data


### Save subsampling

In [None]:
# File name encodes the fraction with '.' replaced by '_',
# e.g. FRACTION = 0.25 -> cf_train_trans_0_25.parquet.
train_sub.write.mode('overwrite').parquet(
    PATH+'/cf_train_trans_{}.parquet'.format(str(FRACTION).replace('.','_'))
)

In [None]:
# Number of rows in the training subsample.
n_sub_rows = train_sub.count()
print(n_sub_rows)

2607960
