In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.use('bmh')

import pyspark
import pyspark.sql.functions
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator


# The following is only for working with RDDs
# from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Setup a SparkSession
spark = SparkSession.builder.getOrCreate()

In [4]:
traindata = pd.read_csv('data/new_subset_data/final_train_data.csv')
testdata = pd.read_csv('data/new_subset_data/final_test_data.csv')

traindata = traindata.iloc[:300000,:]
testdata = testdata.iloc[:100000,:]

sp_train = spark.createDataFrame(traindata)
sp_test = spark.createDataFrame(testdata)

In [5]:
#Change column names to something Spark has hardcoded into their GridSearch feature for dataframes
oldColumns = sp_train.schema.names
newColumns = ["user", "item", "rating"]

sp_train = reduce(lambda sp_train, idx: sp_train.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sp_train)
sp_train.printSchema()

#Change column names to something Spark has hardcoded into their GridSearch feature for dataframes
oldColumns = sp_test.schema.names
newColumns = ["user", "item", "rating"]

sp_test = reduce(lambda sp_test, idx: sp_test.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sp_test)
sp_test.printSchema()

root
 |-- user: long (nullable = true)
 |-- item: long (nullable = true)
 |-- rating: double (nullable = true)

root
 |-- user: long (nullable = true)
 |-- item: long (nullable = true)
 |-- rating: double (nullable = true)



### Small test set

In [None]:
sp_test_smaller = sc.parallelize(sp_test.take(100000))

# Load entire dataset for use in crossval gridsearch

In [31]:
entiredata = pd.read_csv('data/new_subset_data/ratings_data.csv', sep='\t')

sp_entire = spark.createDataFrame(entiredata)

In [32]:
#Change column names to something Spark has hardcoded into their GridSearch feature for dataframes
oldColumns = sp_entire.schema.names
newColumns = ["user", "item", "rating"]

sp_entire = reduce(lambda sp_entire, idx: sp_entire.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sp_entire)
sp_entire.printSchema()
sp_entire.show()

root
 |-- user: long (nullable = true)
 |-- item: long (nullable = true)
 |-- rating: double (nullable = true)

+----+----+------+
|user|item|rating|
+----+----+------+
|   4|   2|   4.0|
|  23|   3|   4.0|
|  28|   5|   4.0|
|  31|   6|   4.0|
|  67|  10|   6.0|
|  69|  11|   4.0|
|  78|  19|   4.0|
|  86|  23|   4.0|
|  89|  27|   4.0|
| 111|  29|   4.0|
| 136|  31|   4.0|
| 142|  34|   4.0|
| 145|  35|   4.0|
| 159|  37|   4.0|
| 161|  39|   4.0|
| 166|  40|   4.0|
| 178|  42|   4.0|
| 208|  49|   4.0|
| 216|  52|   4.0|
| 272|  55|   4.0|
+----+----+------+
only showing top 20 rows



# Create ALS models and fit recommenders

In [15]:
# define models using different ranks
model0 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=1,
    maxIter=15
    ) 

model1 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=5,
    maxIter=15
    ) 

model2 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=7,
    maxIter=15
    )

model3 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=10,
    maxIter=15
    )

model4 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=15,
    maxIter=15
    )

model5 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=20,
    maxIter=15
    )

model6 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=25,
    maxIter=15
    )

model7 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=30,
    maxIter=15
    )

model8 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=40,
    maxIter=15
    )

model9 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=55,
    maxIter=15
    )

model10 = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=.10,
    rank=65,
    maxIter=15
    )

In [16]:
# Fit models
recommender0 = model0.fit(sp_train)
recommender1 = model1.fit(sp_train)
recommender2 = model2.fit(sp_train)
recommender3 = model3.fit(sp_train)
recommender4 = model4.fit(sp_train)
recommender5 = model5.fit(sp_train)
recommender6 = model6.fit(sp_train)
recommender7 = model7.fit(sp_train)
recommender8 = model8.fit(sp_train)
recommender9 = model9.fit(sp_train)
recommender10 = model10.fit(sp_train)

models = [recommender0, recommender1, recommender2, recommender3, recommender4, 
          recommender5, recommender6, recommender7, recommender8, recommender9, recommender10]

In [17]:
def evaluate_models(trained_models, testdf, metric='rmse'):
    '''
    INPUT: list of trained models, and spark dataframe of training data
    OUTPUT: list of rmses, and list of ranks associated with models
    
    trained_models = [recommender1, recommender2]
    testdf = spark.createDataFrame(test_pandas_df)
    
    rmses, ranks =  evaluate_models(trained_models, testdf)
    '''
    ranks = []
    rmses = []
    for model in trained_models:
        predictions = model.transform(testdf)
        pred_df = predictions.toPandas()
        rawPredictions = spark.createDataFrame(pred_df.dropna(axis=0))
        
        predictions = rawPredictions\
        .withColumn("rating", rawPredictions.rating.cast("double"))\
        .withColumn("prediction", rawPredictions.prediction.cast("double"))
       
        evaluator =\
        RegressionEvaluator(metricName=metric, labelCol="rating", predictionCol="prediction")
        rmse = evaluator.evaluate(predictions)
        
        ranks.append(model.rank)
        rmses.append(rmse)
    return rmses, ranks
    

In [None]:
rmses, ranks = evaluate_models(models, sp_test, metric='rmse')

In [None]:
rmsestrain, rankstrain = evaluate_models(models, sp_train, metric='rmse')

In [None]:
rmses

In [None]:
rmsestrain

# Use 20-25 latent features (rank)

In [None]:
fig, ax = plt.subplots(1,1, figsize=(10,7))

ax.plot(ranks, rmses, label='Test', c='darkred')
ax.plot(rankstrain, rmsestrain, label='Train', c='indianred', ls='dashed')
ax.axhline(mean_rmse, label='Mean', ls='dashed', c='dimgray')
#ax.set_facecolor('red')

ax.set_title("Latent Features to RMSE", fontsize=25)
ax.set_ylabel("Root Mean Squared Error", fontsize=15)
ax.set_xlabel("# Latent Features", fontsize=15)
ax.legend(fontsize=15)
#fig.savefig('/home/ubuntu/PROJECT/github-collaborator/matplots/RMSE2.png');

In [None]:
fig, ax = plt.subplots(1,1, figsize=(10,7))

ax.plot(ranks, rmses, label='Test', c='darkred')
ax.plot(rankstrain, rmsestrain, label='Train', c='indianred', ls='dashed')
ax.axhline(mean_rmse, label='Mean', ls='dashed', c='dimgray')
#ax.set_facecolor('red')

ax.set_title("Latent Features to RMSE", fontsize=25)
ax.set_ylabel("Root Mean Squared Error", fontsize=15)
ax.set_xlabel("# Latent Features", fontsize=15)
ax.legend(fontsize=15)
fig.savefig('/home/ubuntu/PROJECT/github-collaborator/matplots/RMSE2.png');

### playing with imputing random 'predictions' instead of actual predictions

In [None]:
np.random.choice([2.0, 4.0, 6.0])

In [None]:
fakedf = pd.DataFrame({'actual': [2.0, 2.0, 6.0, 4.0]})
fakedf['prediciton'] = pd.Series([np.random.choice([2.0, 4.0, 6.0]) for _ in range(fakedf.shape[0])])
fakedf

In [None]:
fakemean = np.mean(fakedf.actual)
fakemean

In [None]:
fakedf['prediciton'] = fakemean
fakedf

# Final model performance

In [None]:
sp_test.printSchema()

In [34]:
final_als = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=0.1,
    rank=50,
    maxIter=15
    ) 
final_model = final_als.fit(sp_entire)

In [None]:
fin_rmses, fin_ranks = evaluate_models([final_model], sp_test, metric='rmse')

In [None]:
"Final model has RMSE of : {:.4f}".format(fin_rmses[0])

# Versus predicting random rank for all (2, 4, or 6)

In [None]:
import numpy as np

# Using random choice of 2, 4, or 6
predictions_random = alsModel.transform(sp_test)

pred_df = predictions_random.toPandas()
pred_df = pred_df.dropna(axis=0)
pred_df['prediction'] = pd.Series([np.random.choice([2.0, 4.0, 6.0]) for _ in range(pred_df.shape[0])])

rawPredictions = spark.createDataFrame(pred_df.dropna(axis=0))

predictions = rawPredictions\
.withColumn("rating", rawPredictions.rating.cast("double"))\
.withColumn("prediction", rawPredictions.prediction.cast("double"))

evaluator =\
RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rand_rmse = evaluator.evaluate(predictions)

print "Model imputed with random rank values for predictions has RMSE of : {:.4f}".format(rand_rmse)

In [12]:
import numpy as np

# Using mean rank
predictions_random = recommender10.transform(sp_test)

pred_df = predictions_random.toPandas()
mean_rank = np.mean(pred_df.rating)
pred_df = pred_df.dropna(axis=0)
pred_df['prediction'] = mean_rank

rawPredictions = spark.createDataFrame(pred_df.dropna(axis=0))

predictions = rawPredictions\
.withColumn("rating", rawPredictions.rating.cast("double"))\
.withColumn("prediction", rawPredictions.prediction.cast("double"))

evaluator =\
RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
mean_rmse = evaluator.evaluate(predictions)

print "Model imputed with random rank values for predictions has RMSE of : {:.4f}".format(mean_rmse)

Model imputed with random rank values for predictions has RMSE of : 0.6152


# Save Final Model

In [None]:
final_model.save("/home/ubuntu/PROJECT/github-collaborator/data/models/finalModel")

# GridSearch

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
als = ALS(nonnegative=True)

paramGrid = ParamGridBuilder() \
    .addGrid(als.maxIter, [15, 30]) \
    .addGrid(als.regParam, [1, 0.1, 0.01, 5]) \
    .addGrid(als.rank, [10,15,20,25,30,35,40]) \
    .build()
    
crossval = CrossValidator(estimator=als,
                      estimatorParamMaps=paramGrid,
                      evaluator=RegressionEvaluator(
                          metricName="rmse", 
                          labelCol="rating"),
                      numFolds=7)

# Run cross-validation, and choose the best set of parameters.
alsModel = crossval.fit(sp_train)
alsModel.bestModel.save('/home/ubuntu/PROJECT/github-collaborator/data/models/CVmodel')

In [None]:
alsModel = ALSModel.load('/home/ubuntu/PROJECT/github-collaborator/data/models/CVmodel')
alsModel.

In [None]:
cvrmses, cvranks = evaluate_models([alsModel.bestModel], sp_test, metric='rmse')

In [None]:
cvrmses

In [None]:
cvranks

# Using log of number of commits per repo

In [3]:
entirecommdata = pd.read_csv('data/new_subset_data/entire_commits.csv')
entirecommdata.head()

Unnamed: 0,user_id,repo_id,log10_commits
0,1,1,0.954243
1,1,51746271,1.0
2,1,56271530,1.176091
3,1,63915386,1.146128
4,2,1,1.681241


In [4]:
sp_entire_commits = spark.createDataFrame(entirecommdata)

In [5]:
#Change column names to something Spark has hardcoded into their GridSearch feature for dataframes
oldColumns = sp_entire_commits.schema.names
newColumns = ["user", "item", "rating"]

sp_entire_commits = reduce(lambda sp_entire_commits, idx: sp_entire_commits.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sp_entire_commits)
sp_entire_commits.printSchema()
sp_entire_commits.show()

root
 |-- user: long (nullable = true)
 |-- item: long (nullable = true)
 |-- rating: double (nullable = true)

+----+--------+------------------+
|user|    item|            rating|
+----+--------+------------------+
|   1|       1|    0.954242509439|
|   1|51746271|               1.0|
|   1|56271530|1.1760912590600001|
|   1|63915386|1.1461280356799999|
|   2|       1|     1.68124123738|
|   2|       2|1.7634279935599997|
|   2|   25532|     1.94939000664|
|   2|   28923|3.3398487830400003|
|   2|   49907|2.8864907251700003|
|   2|   76547|     1.27875360095|
|   2|  133800|     1.86332286012|
|   2|  169210|     2.18469143082|
|   2|  187332|1.5910646070299999|
|   2|  258837|     2.12710479836|
|   2|  398246|1.5440680443500001|
|   2|  504851|2.0453229787900002|
|   2|  525267|2.3344537511500003|
|   2|  623841|1.5682017240700001|
|   2|  638698|2.0334237554900003|
|   2|  650825|     1.90308998699|
+----+--------+------------------+
only showing top 20 rows



In [7]:
sp_train_commits, sp_test_commits, sp_val_commits = sp_entire_commits.randomSplit([0.6, 0.2, 0.2])

In [8]:
commits_als = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=10.0,
    rank=55,
    maxIter=20,
    implicitPrefs=True,
    alpha=40.0
    ) 
commits_als = commits_als.fit(sp_train_commits)


In [None]:
small_commits_als = ALS(
    itemCol='item',
    userCol='user',
    ratingCol='rating',
    nonnegative=True,    
    regParam=10.0,
    rank=55,
    maxIter=20,
    implicitPrefs=True,
    alpha=40.0
    ) 
small_commits_als = commits_als.fit(sp_test_commits)


In [9]:
users_repos = pd.read_csv('data/user_repo_lookup.csv', sep='\t')
users_repos.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 59366945 entries, 0 to 59366944
Data columns (total 6 columns):
user_id        int64
login          object
repo_id        int64
forked_from    object
repo_name      object
url            object
dtypes: int64(2), object(4)
memory usage: 2.7+ GB


In [16]:
(users_repos['user_id'].loc[users_repos['login'] == 'caitriggs'])[:1]

7623079    3384558
Name: user_id, dtype: int64

In [22]:
def recommend_repos(model, lookupdf, username):
    user_id = int((lookupdf['user_id'].loc[lookupdf['login'] == username])[:1])
    prediction_df = pd.DataFrame(lookupdf['repo_id'])
    prediction_df['user'] = user_id
    prediction_df = prediction_df.rename(columns={'id':'item'})
    
    pred_df = spark.createDataFrame(prediction_df)
    predictions_user = model.transform(pred_df)
    return predictions_user.sort(desc("prediction")).show(10)

#toPandas().sort_values('prediction', ascending=False).head(10)

In [28]:
commits_als.itemFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.0, 0.0, 0.0, 0...|
| 20|[0.0, 0.0, 0.0, 0...|
| 30|[0.0, 0.0, 0.0, 0...|
| 40|[1.292633E-4, 0.0...|
| 80|[0.0, 0.0, 0.0, 0...|
| 90|[0.0, 0.0, 0.0, 0...|
|100|[0.03258656, 0.0,...|
|130|[0.0, 0.0, 0.0, 0...|
|160|[0.0, 0.0, 0.0, 0...|
|170|[0.0, 0.0, 0.0, 0...|
|190|[0.0, 0.002047706...|
|230|[0.0, 0.0, 0.0, 0...|
|240|[0.0, 0.0, 0.0, 0...|
|250|[0.0, 0.0, 1.9977...|
|280|[0.0, 0.0, 0.0, 0...|
|310|[0.0, 0.0, 0.0, 0...|
|340|[0.0, 0.0, 0.0, 0...|
|350|[0.0, 0.0, 0.0, 0...|
|370|[0.0, 0.0, 0.0, 1...|
|380|[0.0, 0.0, 0.0, 0...|
+---+--------------------+
only showing top 20 rows



In [23]:
cait_reco = recommend_repos(commits_als, users_repos, 'caitriggs')
cait_reco

KeyboardInterrupt: 

In [None]:
gavin_reco = recommend_repos(commits_als, users_repos, 'gavin-peterkin')
gavin_reco

## Recommend

In [35]:
final_model.save("/home/ubuntu/PROJECT/github-collaborator/data/models/finalModel_2")

In [36]:
Model = ALSModel.load("/home/ubuntu/PROJECT/github-collaborator/data/models/finalModel_2")

In [None]:
commitsModel = ALSModel.load("/home/ubuntu/PROJECT/github-collaborator/data/models/commitsModel")

In [None]:
users_repos = pd.read_csv('data/user_repo_lookup.csv', sep='\t')
users_repos.info()

In [None]:
int(users['id'].loc[users['login'] == 'caitriggs'])

In [39]:
def recommend_repos(model, lookupdf, username):
    user_id = int((lookupdf['user_id'].loc[lookupdf['login'] == username])[:1])
    prediction_df = pd.DataFrame(lookupdf['repo_id'])
    prediction_df['user'] = user_id
    prediction_df = prediction_df.rename(columns={'id':'item'})
    
    pred_df = spark.createDataFrame(prediction_df)
    predictions_user = model.transform(pred_df)
    return predictions_user.sort(desc("prediction")).show(10)
    

In [None]:
cait_reco = recommend_repos(Model, users_repos, 'caitriggs')
cait_reco

In [None]:
#Top recommendations from Model
users_repos[users_repos.repo_id == 2559614]

In [None]:
# Top recommendations from CVModel
repos[repos.id == 25260835]