In [2]:
import pyspark

from pyspark import SparkContext, SQLContext
sc = pyspark.SparkContext()
sqlc = SQLContext(sc)

In [27]:
import os
from pyspark.sql import functions as F, Window
from pyspark.sql.functions import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np


In [4]:
class ActionsData:
    
    def __init__(self,folder,file,file_repos,user,date,item):
        self._data=None
        self._data_items=None
        self.load_data(folder,file,user,date,item)
        self.load_items(folder,file_repos)
            
    def load_data(self,foldername,filename,user,date,item):
        """load interactions data of users with repositories"""
        file=os.path.join(foldername+filename)
        data=sqlc.read.json(file)
        data=data.select(col(user).alias('user_id'),col(date).alias('created_at'), col(item).alias('repo_id'))
        self._data=data
        
    def load_items(self,foldername,filename):
        """load informations about repositories"""
        file_repos=os.path.join(foldername+filename)
        data_repos=sqlc.read.json(file_repos)
        self._data_items=data_repos.select(col('id').alias('repo_id'),'name','language').distinct()
    
    def join_w_repos(self):
        """consider only interactions to repositories contained in self._data_items"""
        self._data=self._data.join(self._data_items,'repo_id','inner')
    
    
    def remove_duplicates(self):
        """remove duplicated of interactions of a user with the same repository"""
        self._data=self._data.sort('user_id','created_at',ascending=True).dropDuplicates(['user_id','repo_id'])

    def filter_actions(self,min_actions,max_actions):
        """filter out users inactive users (users who interacted with less than min_actions
        repositories) and outliers (users who interacted with more than max_actions repositories)"""
        data_with_max=self._data.groupby('user_id').agg(F.count('repo_id').alias('total_actions'))
        data_filter=data_with_max.filter((data_with_max.total_actions>min_actions)\
                                           & (data_with_max.total_actions<max_actions))
        
        self._data=self._data.join(data_filter.select('user_id'),'user_id','inner')

        
    def add_rating(self,rating):
        """add a column with rating value: in a class instance each interaction has the same value"""
        self._data=self._data.groupby('user_id','created_at','repo_id')\
                            .agg((F.count('*')*rating).alias('rating'))
        
        
    def transform(self,min_actions,max_actions,rating):
        """apply data transformations"""
        self.join_w_repos()
        self.remove_duplicates()
        self.filter_actions(min_actions,max_actions)
        self.add_rating(rating)

In [29]:
class SimpleRecommender:
    
    def __init__(self,data):
        
        self._data=data
        self._train=None
        self._test=None
        self._model=None
    
    
    def message(self,x):
        print(x)
        
    def split_train_test(self):
        self._train=self._data.filter('number_of_actions<total_actions')
        self._test=self._data.filter('number_of_actions=total_actions')
         
    def fit(self,param):
        self.split_train_test()
        self.message('Train ALS with maxIter=%d, rank=%d, regParameter=%d.d' % (param['iter'],param['rank'],param['reg']))
        
        als = ALS(maxIter=param['iter'],rank=param['rank'],regParam=param['reg'],userCol="user_idn",\
                    itemCol="repo_idn",ratingCol="rating", seed=1, coldStartStrategy='drop')
        evaluator_reg=RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")      
        model=als.fit(self._train)
        self._model=model
        predictions_train=model.transform(self._train)
        train_rmse=evaluator_reg.evaluate(predictions_train)
        self.message('Train RMSE=' + str(train_rmse))
        predictions_train=predictions_train.withColumn('prediction_bin',when(col('prediction')>0.5,1).otherwise(0))
        multiclass_train= MulticlassMetrics(predictions_train.select(col('prediction_bin').\
                                          alias('score'),col('rating').alias('label')).\
                                          rdd.map(lambda x: (float(x.score),float(x.label))))
        train_rec=multiclass_train.recall()
        self.message('Train Recall=' + str(train_rec))
        predictions_test=model.transform(self._test)
        test_rmse=evaluator_reg.evaluate(predictions_test)
        self.message('Test RMSE=' + str(test_rmse))
        predictions_test=predictions_test.withColumn('prediction_bin',when(col('prediction')>0.5,1).otherwise(0))
        multiclass_test= MulticlassMetrics(predictions_test.select(col('prediction_bin').\
                                          alias('score'),col('rating').alias('label')).\
                                          rdd.map(lambda x: (float(x.score),float(x.label))))
        test_rec=multiclass_test.recall()
        self.message('Test Recall=' + str(test_rec))
        

### Load forks data: consider only users who forked between 5 and 2500 repositories. Each fork has rating value=1

In [6]:
forks=ActionsData(folder='./data',file='projects_forked_2017.json',\
                  file_repos='projects_not_forked_2017.json',\
                  user='owner_id',date='created_at',item='forked_from')

In [7]:
forks.transform(min_actions=5, max_actions=2500,rating=1)

In [8]:
forks_data=forks._data


In [9]:
forks_data.cache()

DataFrame[user_id: string, created_at: string, repo_id: string, rating: bigint]

In [10]:
forks_data.show(5)

+--------+--------------------+--------+------+
| user_id|          created_at| repo_id|rating|
+--------+--------------------+--------+------+
|10013904|2017-03-23 21:00:...|58639826|     1|
|10013904|2017-06-20 05:14:...|66659683|     1|
|10013904|2017-05-06 05:16:...|60058077|     1|
|10013904|2017-03-27 20:22:...|57414261|     1|
|10013904|2017-06-21 19:50:...|60254642|     1|
+--------+--------------------+--------+------+
only showing top 5 rows



### Load forks data: consider only users who forked between 20 and 7000 repositories. Each fork has rating value=1

In [11]:
stars=ActionsData(folder='./data',file='watchers_2017.json',\
                  file_repos='projects_not_forked_2017.json',\
                  user='w_user_id',date='w_created_at',item='w_repo_id')

In [12]:
stars.transform(min_actions=20, max_actions=7000,rating=1)

In [13]:
stars_data=stars._data

In [14]:
stars_data.cache()

DataFrame[user_id: string, created_at: string, repo_id: string, rating: bigint]

In [15]:
stars_data.show(5)

+-------+--------------------+--------+------+
|user_id|          created_at| repo_id|rating|
+-------+--------------------+--------+------+
| 101122|2017-04-27 02:43:...|63294245|     1|
| 101122|2017-07-15 01:51:...|65917216|     1|
| 101122|2017-08-08 07:32:...|70845318|     1|
| 101122|2017-03-26 06:21:...|56265831|     1|
| 101122|2017-08-31 04:23:...|73197055|     1|
+-------+--------------------+--------+------+
only showing top 5 rows



### Create the union data of forks and stars and drop duplicates (users that forked and starred the same repository)

In [16]:
union=forks_data.union(stars_data)
union_dd=union.sort('user_id','created_at').dropDuplicates(['user_id','repo_id'])
union_dd.cache()

DataFrame[user_id: string, created_at: string, repo_id: string, rating: bigint]

### Create two additional columns: number_of_actions is the sequential number of interations (forks,stars) by user ordered by date; total_actions is the total number of interactions by user. These two columns will be used to split train and test set

In [17]:
w=(Window.partitionBy('user_id').orderBy('created_at').rowsBetween(Window.unboundedPreceding, Window.currentRow))
union_dd=union_dd.withColumn('number_of_actions',F.count('user_id').over(w))

In [18]:
total_actions=union_dd.groupby('user_id').agg(F.max('number_of_actions').alias('total_actions'))
union_dd=union_dd.join(total_actions,'user_id','inner')

### Create numeric IDs for users and repos

In [19]:
indexer_user=StringIndexer(inputCol="user_id",outputCol="user_idn")#.setHandleInvalid('skip')
indexer_repo=StringIndexer(inputCol='repo_id',outputCol='repo_idn')
union_dd=indexer_user.fit(union_dd).transform(union_dd)
union_dd=indexer_repo.fit(union_dd).transform(union_dd)

### Create a recommendation model with SimpleRecommender

In [30]:
rec=SimpleRecommender(union_dd)

In [31]:
parameters={'rank':10,'iter':10,'reg':0.1}
rec.fit(param=parameters)

model_final=rec._model


Train ALS with maxIter=10, rank=10, regParameter=0.d
Train RMSE=0.09996531920783683
Train Recall=0.9999653022641679
Test RMSE=0.14636304894348268
Test Recall=0.9875230485556239


In [32]:
model_final.save('/data/als_r10_i10_reg01_fs.parquet')

In [33]:
sc.stop()