In [2]:
# Mount the user's Google Drive into the Colab VM at /content/drive.
# NOTE(review): the data files read further down use bare relative paths rather
# than paths under /content/drive — confirm this mount is still required.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 41 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 62.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=d046cc179e5a20275d823c2f1b5e516fd30630dab3946bd0d4fd7cd480cd33bc
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [4]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session running in local mode, named "Colab",
# with the Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [5]:
# Echo the session object as the cell result to confirm it was created.
spark

In [6]:
# load pyspark modules
from pyspark.ml.evaluation import RegressionEvaluator    
from pyspark.ml.recommendation import ALS    
from pyspark.sql.types import IntegerType       
from os import path   
import pandas as pd   
import numpy                             
import shutil

In [7]:
# Load the training ratings. The file has no header row, so Spark assigns the
# default column names _c0, _c1, _c2; no schema is given, so every column is
# read as a string.
training_df = spark.read.csv("trainItem.data", header = False)
# Preview the first 5 rows.
training_df.show(5)

+------+------+---+
|   _c0|   _c1|_c2|
+------+------+---+
|199808|248969| 90|
|199808|  2663| 90|
|199808| 28341| 90|
|199808| 42563| 90|
|199808| 59092| 90|
+------+------+---+
only showing top 5 rows



In [8]:
# Number of rows in the training set (triggers a full pass over the file).
training_df.count()

12403575

In [9]:
# Replace Spark's positional default names with meaningful column names.
training_df = (
    training_df
    .withColumnRenamed('_c0', 'userID')
    .withColumnRenamed('_c1', 'itemID')
    .withColumnRenamed('_c2', 'rating')
)

In [10]:
# Inspect column types before casting (all strings at this point).
training_df.dtypes

[('userID', 'string'), ('itemID', 'string'), ('rating', 'string')]

In [11]:
# Convert the string columns to the numeric types used for model fitting:
# integer IDs and a float rating.
for column_name, column_type in (
    ('userID', IntegerType()),
    ('itemID', IntegerType()),
    ('rating', 'float'),
):
    training_df = training_df.withColumn(column_name, training_df[column_name].cast(column_type))

In [12]:
# Sanity check: confirm the casts above produced int/int/float columns.
training_df.dtypes

[('userID', 'int'), ('itemID', 'int'), ('rating', 'float')]

In [13]:
# Configure an explicit-feedback ALS matrix-factorisation estimator over the
# (userID, itemID, rating) columns of the training frame.
#
# A fixed seed is passed because, when none is given, PySpark derives the
# default from a Python string hash, which changes between interpreter
# processes — so the factor initialisation (and therefore the predictions)
# would differ after every kernel restart.
#
# coldStartStrategy is not set here; with Spark's documented default ("nan"),
# users/items unseen during fitting receive NaN predictions.
als = ALS(
    maxIter=5,             # number of alternating optimisation sweeps
    rank=5,                # number of latent factors per user/item
    regParam=0.01,         # regularisation strength
    userCol="userID",
    itemCol="itemID",
    ratingCol="rating",
    nonnegative=True,      # constrain the learned factors to be non-negative
    implicitPrefs=False,   # treat ratings as explicit scores, not implicit feedback
    seed=42,               # fixed seed for reproducible factor initialisation
)

In [14]:
# Fit the ALS model on the whole training frame.
# NOTE(review): no validation split or evaluation is visible in this notebook —
# confirm whether RegressionEvaluator (imported above) was meant to be used.
model = als.fit(training_df)

In [15]:
# Load the test pairs the same way as the training data: no header row, default
# column names _c0.._c2, all columns read as strings.
testing_df = spark.read.csv('testItem.data', header=False)
# Preview the first 5 rows.
testing_df.show(5)

+------+------+---+
|   _c0|   _c1|_c2|
+------+------+---+
|199810|208019|  0|
|199810| 74139|  0|
|199810|  9903|  0|
|199810|242681|  0|
|199810| 18515|  0|
+------+------+---+
only showing top 5 rows



In [16]:
# Number of (user, item) pairs to score.
testing_df.count()

120000

In [17]:
# Give the test columns the same names as the training columns so the fitted
# model can look them up.
testing_df = (
    testing_df
    .withColumnRenamed("_c0", "userID")
    .withColumnRenamed("_c1", "itemID")
    .withColumnRenamed("_c2", "rating")
)

In [18]:
# Inspect column types before casting (all strings at this point).
testing_df.dtypes

[('userID', 'string'), ('itemID', 'string'), ('rating', 'string')]

In [19]:
# Cast the test columns to the same numeric types as the training frame:
# integer IDs and a float rating.
for column_name, column_type in (
    ("userID", IntegerType()),
    ("itemID", IntegerType()),
    ("rating", 'float'),
):
    testing_df = testing_df.withColumn(column_name, testing_df[column_name].cast(column_type))

In [20]:
# Sanity check: confirm the casts above produced int/int/float columns.
testing_df.dtypes

[('userID', 'int'), ('itemID', 'int'), ('rating', 'float')]

In [26]:
# Score every (userID, itemID) pair in the test frame; the result carries the
# input columns plus a `prediction` column.
predictions = model.transform(testing_df)

In [27]:
# Drop the test file's rating column so only IDs and the model score remain.
# NOTE(review): the rating looks like a placeholder (0 in the previewed rows) — confirm.
predictions = predictions.drop('rating')

In [28]:
# Preview the first 5 scored rows.
predictions.show(5)

+------+------+----------+
|userID|itemID|prediction|
+------+------+----------+
|199810|  9903| 15.583747|
|199810| 18515|  83.21027|
|199810| 74139| 39.558212|
|199810|105760|  75.45437|
|199810|208019| 60.328117|
+------+------+----------+
only showing top 5 rows



In [29]:
# Sanity check: the scored frame should have one row per test pair.
predictions.count()

120000

In [30]:
# Remove output left over from a previous run so the Spark write that follows
# (which uses the default save mode) does not fail on an existing path.
output_dir = 'predictions'
if path.exists(output_dir):
    shutil.rmtree(output_dir)

In [31]:
# Write the predictions as CSV through Spark. "predictions" is an output
# directory; coalescing to one partition makes Spark emit a single part file in it.
predictions.coalesce(numPartitions=1).write.csv("predictions")

In [32]:
# Collect all predictions to the driver as a pandas DataFrame and save them as
# one plain CSV file; this is the file the post-processing cells read.
# to_csv is called without index=False, so the pandas row index is written as an
# extra leading column.
predictions.toPandas().to_csv('myprediction.csv')

In [48]:
# File names used by the post-processing cells.
ratings_file = 'myprediction.csv'  # CSV written from the pandas copy of the predictions
predictions_file = 'predictions.csv'  # intermediate file name
als_submission_file = 'als_submission_2.csv'  # final submission file

# NOTE(review): these handles are opened here and closed in later cells instead
# of being scoped with `with`, so an error in between leaves them open.
# f_ratings is never read through this handle in the visible cells (pandas loads
# ratings_file by name). Opening predictions_file with 'w' truncates it at once.
f_ratings = open(ratings_file)
f_predictions = open(predictions_file, 'w')
f_als_submission = open(als_submission_file, 'w')

In [49]:
# Write the header row of the submission CSV. The value echoed by the cell is
# the number of characters written.
f_als_submission.write('TrackID,Predictor\n')

18

In [50]:
# Load only the three named columns back into pandas; selecting by name skips
# any other column present in the file (such as a saved row index).
column_list = ('userID', 'itemID', 'prediction')
ratings_df = pd.read_csv(ratings_file, usecols=column_list)

In [51]:
# Replace missing predictions with a fixed score of 50.0.
# NOTE(review): presumably the NaNs come from users/items the model never saw,
# and 50.0 is meant as the midpoint of the rating scale — confirm both.
ratings_df['prediction'] = ratings_df['prediction'].fillna(50.0)

In [52]:
# Display the frame (pandas truncates the middle rows in the rendered output).
ratings_df

Unnamed: 0,userID,itemID,prediction
0,233686,1,43.141533
1,215400,3,37.245644
2,224379,5,13.557379
3,200179,13,44.289536
4,199859,17,17.466816
...,...,...,...
119995,203390,296078,68.375640
119996,232887,296081,8.313974
119997,239053,296095,76.775620
119998,204230,296098,0.000000


In [53]:
# Order rows by user and, within each user, from highest to lowest predicted
# score. The sort is done in place, so ratings_df itself is modified.
ratings_df.sort_values(["userID", "prediction"], ascending = (True, False), inplace=True)

In [54]:
# Renumber the rows 0..n-1 in the new sorted order; drop=True discards the old
# index instead of keeping it as a column.
ratings_df.reset_index(drop=True, inplace=True)

In [55]:
# Preview the first user's rows to check the per-user descending order.
ratings_df.head(5)

Unnamed: 0,userID,itemID,prediction
0,199810,18515,83.21027
1,199810,105760,75.45437
2,199810,208019,60.328117
3,199810,74139,39.558212
4,199810,242681,38.155624


In [56]:
# Save the sorted frame with no header and no index, so each line is exactly
# "userID,itemID,prediction" for the line-by-line reader further down.
# The path comes from predictions_file rather than a repeated literal, so this
# writer and the later reader cannot drift apart if the file name changes.
# NOTE(review): f_predictions already holds this same path open in 'w' mode at
# this point; pandas opens the file again by name here.
ratings_df.to_csv(predictions_file, index=False, header=False)

In [57]:
# Close the write handle opened earlier. Nothing was written through it in the
# visible cells; the file content was written by pandas.
f_predictions.close()

In [58]:
# Reopen the intermediate file, this time for reading (open()'s default mode).
f_predictions = open(predictions_file)

In [59]:
# Initial state for the line-by-line pass over the intermediate file.
ratings_array = numpy.zeros(shape=(6))  # six-slot float buffer
last_user_id = -1  # sentinel: IDs parsed from the file are strings, so none can equal it
track_id_out_vec = [0] * 6  # six-slot list for track IDs

In [60]:
# Turn the per-user ranking into binary labels for the submission file.
#
# Each input line is "userID,itemID,prediction". The rows were sorted by userID
# and then by prediction (descending) before being saved, so within one user's
# run of lines the highest-scored tracks come first. For every user with exactly
# GROUP_SIZE rows, the first TOP_K tracks are labelled 1 and the rest 0, written
# as "<userID>_<itemID>,<label>". A user with fewer rows produces no output.
#
# Labels are derived from rank position instead of being stored in an array
# named `predictions`, which would overwrite the Spark DataFrame of that name
# and break any re-run of the earlier cells.
GROUP_SIZE = 6  # rows expected per user
TOP_K = 3       # how many of them receive label 1

last_user_id = None
track_id_out_vec = []

for line in f_predictions:
    arr_out = line.strip().split(',')
    user_id_out = arr_out[0]
    track_id_out = arr_out[1]

    if user_id_out != last_user_id:
        # New user: start a fresh group (an incomplete previous group is dropped).
        track_id_out_vec = []
        last_user_id = user_id_out

    track_id_out_vec.append(track_id_out)

    if len(track_id_out_vec) > GROUP_SIZE:
        # Fail with a clear message rather than an opaque IndexError.
        raise ValueError(
            'user ' + str(user_id_out) + ' has more than '
            + str(GROUP_SIZE) + ' rows in ' + str(predictions_file)
        )

    if len(track_id_out_vec) == GROUP_SIZE:
        for position, track_id in enumerate(track_id_out_vec):
            label = 1 if position < TOP_K else 0
            out_str = str(user_id_out) + '_' + str(track_id) + ',' + str(label)
            f_als_submission.write(out_str + '\n')

In [61]:
# Release the three file handles; closing the submission handle also flushes
# any buffered output to disk.
for handle in (f_ratings, f_predictions, f_als_submission):
    handle.close()