# ALS [Link Spark](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html)

In [1]:
# Import dependencies
from __future__ import print_function

import sys
if sys.version >= '3':
    long = int
    
import pandas as pd
from pyspark.sql import SparkSession

# PySpark ML Imports
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# SparkContext
from pyspark import SparkContext

# PySpark ML Imports

from pprint import pprint

In [2]:
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .getOrCreate()

In [3]:
# Load data
df = pd.DataFrame()
df = pd.read_csv('TopStaredRepositories.csv')

In [4]:
# Remove columns not needed
#df.set_index(['Repository Name'])
COLUMNS_TO_REMOVE_LIST = ['Description',
                          'Last Update Date', 'Language', 
                          'Tags', 'Url','Gravatar' ,'Unnamed: 0']



for column in COLUMNS_TO_REMOVE_LIST :
    try:
        del df[column]
    except Exception:
        pass
print(df.head(2))

       Username Repository Name Number of Stars
0  freeCodeCamp    freeCodeCamp            290k
1          twbs       bootstrap            112k


In [5]:
# DF User
def get_metadata(df, key, val):
    #create a new column with index 
    df['index'] = df.index
    if key == "Username":
        return {str(row[key]): row[val] for _, row in df.iterrows()}
    else:
        return {row[key]: row[val] for _, row in df.iterrows()}

# embd2idx is a mapping. type(embd2idx) = dictionary    
emb2idxU = get_metadata(df, "index", "Username")

# print(emb2idxU)
len(emb2idxU)


980

In [6]:
# DF User
# Create DataFrame from 0...n
d = dict()
size = len(emb2idxU)
for x in range(0,size):
    d[x]=x
# print(d)

# Convert dict to DataFrame
dfU = pd.DataFrame.from_dict(d,orient='index')
# type(dfU)

In [7]:
# DF Repo
def get_metadata(df, key, val):
    #create a new column with index 
    df['index'] = df.index
    if key == "Repository Name":
        return {str(row[key]): row[val] for _, row in df.iterrows()}
    else:
        return {row[key]: row[val] for _, row in df.iterrows()}

# embd2idx is a mapping. type(embd2idx) = dictionary    
emb2idxR = get_metadata(df, "index", "Repository Name")

# print(emb2idx)
len(emb2idxR)

980

In [8]:
# DF Repo
# Create DataFrame from 0...n
d = dict()
size = len(emb2idxR)
for x in range(0,size):
    d[x]=x
# print(d)

# Convert dict to DataFrame
dfR = pd.DataFrame.from_dict(d,orient='index')
type(dfR)


pandas.core.frame.DataFrame

In [9]:
df1 = df['Repository Name'].map(lambda x: x) # pandas.core.series.Series
df1 = df1.to_frame()                         # pandas.core.frame.DataFrame
df2 = df['Number of Stars'].map(lambda x: x.rstrip('k'))
df2 = df2.to_frame()

In [10]:
# df1.set_index(['Repository Name'])
df2.set_index(['Number of Stars'])
type(df2)

pandas.core.frame.DataFrame

In [11]:
#Combine the three data frames
# fdf = User, Repo, of stars
fdf = pd.concat([dfU,dfR, df2], axis=1, ignore_index=False,sort = False) #Sort Warning

#print(fdf)

In [12]:
fdf.to_csv('Preprocessing.csv')

In [13]:
# Remove top Row containing Coloumn names
with open("Preprocessing.csv",'r+') as f:
    with open("Dataset.csv",'w') as f1:
        f.readline() # skip header line
        for line in f:
            f1.write(line)


In [14]:
df = pd.read_csv("Dataset.csv")

df.head(2)

Unnamed: 0,0,0.1,0.2,290
0,1,1,1,112.0
1,2,2,2,87.8


In [23]:
lines = spark.read.text("Dataset.csv").rdd
print(lines.take(2))

[Row(value='0,0,0,290'), Row(value='1,1,1,112')]


In [16]:
# Need to convert p[1] from str to int
parts = lines.map(lambda row: row.value.split(","))
print(parts.take(2))
type(parts)

[['0', '0', '0', '290'], ['1', '1', '1', '112']]


pyspark.rdd.PipelinedRDD

In [17]:
# RDD mapped as int and float from Dataset

ratingsRDD = parts.map(lambda p: Row(userId=int(p[1]),
                                     repoId=int(p[2]),
                                     repoCount=float(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
type(ratings) #pyspark.sql.dataframe.DataFrame
# print(ratings.head(10))

pyspark.sql.dataframe.DataFrame

In [18]:
def parse(s):
    x, y, z  = s.split(",")
    return Rating(int(x), int(y), float(z))
SparkContext.stop()
sc = SparkContext()
ratings = (sc.textFile("Dataset.csv")
  .map(parse)
  .union(sc.parallelize([Rating(1, 5, 4.0)])))

model = ALS.train(ratings, 10, 10)


TypeError: stop() missing 1 required positional argument: 'self'

In [19]:
(training, test) = ratings.randomSplit([0.8, 0.2])

In [20]:
# Build the recommendation model using ALS on the training data
# Cold start strategy is set to '"drop" to make sure there is 
# no NaN evaluation metrics which would result in error.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", 
                                    itemCol="repoId", 
                                    ratingCol="repoCount"
        ,coldStartStrategy="drop") #Cold-start is set to DROP
model = als.fit(training)

#model.show(2)

#type(als)
#type(model)


In [21]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)


predictions.show(3)

+---------+------+------+----------+
|repoCount|repoId|userId|prediction|
+---------+------+------+----------+
+---------+------+------+----------+



In [None]:
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="repoCount", 
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))