#Big Data Coursework CSC8101

####Group_EX

In [2]:
sc

In [3]:
import pandas as pd
ratingsURL = 'https://csc8101storageblob.blob.core.windows.net/datablobcsc8101/ratings.csv'
ratings = spark.createDataFrame(pd.read_csv(ratingsURL))
ratings = ratings.drop("timestamp").persist()

Deleted timestamp column since it has no use for this coursework so we are going to work with the following schema:

|-- userId: integer (nullable = true)  
|-- movieId: integer (nullable = true)  
|-- rating: double (nullable = true)

##Task 1 : summary statistics from the ratings dataset

the user-item matrix has many missing values since not all the movies were rated by all the users.

In [7]:
display(ratings.select("userId", "movieId", "rating"))

userId,movieId,rating
1,2,3.5
1,29,3.5
1,32,3.5
1,47,3.5
1,50,3.5
1,112,3.5
1,151,4.0
1,223,4.0
1,253,4.0
1,260,4.0


(1) Average number of ratings per users

In [9]:
# To calculate the average number of ratings per user, the ratings dataframe is grouped by the userId and the entries per user counted. 
# In the next command the created dataframe with number of ratings per user is then used to calculate the average. 
from pyspark.sql.functions import avg
ratingsPerUser = ratings.select("userId", "rating").groupBy("userId").count()
ratingsPerUser.show(5)

In [10]:
ratingsPerUser.agg({"count": 'avg'}).show()

(2) Average number of ratings per movie

In [12]:
# Calculating the average number of ratings per movie is similarly done to the average number of ratings per user, only that the ratings dataframe is this time grouped by the movieId. 
ratingsPerMovie = ratings.select("movieId", "rating").groupBy("movieId").count()
ratingsPerMovie.show(5)

In [13]:
ratingsPerMovie.agg({"count": 'avg'}).show()

(3) Histogram shows the distribution of movie ratings per user

count : number of rating per user

Density represent frequency of "count"

users have not rated the majority of movies they watched

In [15]:
# Creating the two dataframes 'ratingsPerUser' and 'ratingsPerMovie' simplify the creation of a histogram showing the distribution of the ratings. 
display(ratings.select("userId", "rating").groupBy("userId").count())

userId,count
26,61
29,177
474,141
964,227
1677,154
1697,39
1806,205
1950,26
2040,35
2214,185


(4) Histogram shows the distribution of movie ratings per movie

count : number of rating per movie

Density represent frequency of "count" 

the majority of movies have not been rated by user they watched

In [17]:
display(ratingsPerMovie)

movieId,count
71530,1272
106002,662
29,8520
474,18836
1950,3449
2529,12702
3506,708
5409,160
5556,597
6721,919


(5) the user-item matrix

matrix has very many missing values since not all the movies were rated by all the users.

In [19]:
display(ratings.select("userId", "movieId", "rating"))

userId,movieId,rating
1,2,3.5
1,29,3.5
1,32,3.5
1,47,3.5
1,50,3.5
1,112,3.5
1,151,4.0
1,223,4.0
1,253,4.0
1,260,4.0


the average rating for each movie
The histogram below shows that most ratings fall between 3-4

In [21]:
display(ratings.select("movieId","rating").groupBy("movieId").mean())

movieId,avg(movieId),avg(rating)
29,29.0,3.952230046948357
2529,2529.0,3.620414108014486
474,474.0,3.732506901677639
45726,45726.0,2.8117010816125863
60756,60756.0,3.3851397409679618
71530,71530.0,3.242138364779874
3764,3764.0,2.6955420466058766
1950,1950.0,4.073789504204117
3091,3091.0,4.054763690922731
106100,106100.0,3.896078431372549


##Task 2: recommendation model

(1) training a recommender model using the ALS algorithm to predict the rating

and measuring the perfomenance, using a RMSE performance metric. RMSE is a measure of how accurately the model predicts the response

In [24]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from time import time

(trainingData, testData) = ratings.randomSplit([0.75, 0.25])
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, coldStartStrategy="drop", implicitPrefs=False)

model = als.fit(trainingData)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(testData)
evaluator = RegressionEvaluator(metricName="rmse", predictionCol="prediction", labelCol="rating")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

(2) using a GridSearch strategy, we have tuned 2 parameters ( the rank and regParam ) hyperparameters of the model with 3 fold cross validation

each with 3 diffirent values and that took 48.64 ​minutes as the algorithm checked 9 combinations

In [26]:
grid = ParamGridBuilder().addGrid(als.rank, [10,25,40]).addGrid(als.regParam, [.01, .1, .05]).build()

validator = CrossValidator(estimator=als, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5)
start = time()
cvModel = validator.fit(trainingData)
print("Grid Search took %.2f seconds " % (time() - start))

bestModel = cvModel.bestModel
print("Best model with parameters>> ","rank: ", bestModel.rank, "\nregParam: ", bestModel._java_obj.parent().getRegParam())

predictions = bestModel.transform(testData)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

(3) check model overfitting issues

First we find the RMSE returned for all combinations

then we find the average of all the accuracies

after that we find the standard deviation of the data to see degree of variance in the results obtained by the model

the standard deviation is extremely low, which means that the model has a very low variance,

The model also perform roughly similar on test sets with Root-mean-square error = 0.8203030304627332 

So that there is no overfiting

In [28]:
import statistics
print("All RMSE for all combinations :",cvModel.avgMetrics,"\n")
list(cvModel.avgMetrics)
print("mean : ",statistics.mean(list(cvModel.avgMetrics)),"\n")
print("Standard Divation :", statistics.stdev(list(cvModel.avgMetrics)))


(4) smaller size of parameter grid

using a GridSearch strategy, we have tuned 2 parameters ( the rank and regParam ) each with 2 diffirent values  with 3 fold cross validation

This took 25.5 minutes as the algorithm checked 4 combinations

that is faster by roughly 75% than with 9 combinations

In [30]:
grid = ParamGridBuilder().addGrid(als.rank, [20,45]).addGrid(als.regParam, [.01, .05]).build()

validator = CrossValidator(estimator=als, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5)
start = time()
cvModel = validator.fit(trainingData)
print("Grid Search took %.2f seconds " % (time() - start))

bestModel = cvModel.bestModel
print("rank: ", bestModel.rank, "\nregParam: ", bestModel._java_obj.parent().getRegParam())

predictions = bestModel.transform(testData)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

(5) tuning one parameter

We can tune one parameter but to improve the performance, it is better to test values for other parameters and check the RMSE if it improves or not

Lower values of RMSE indicate better fit.

In [32]:
grid = ParamGridBuilder().addGrid(als.rank, [10,15,20]).build()

validator = CrossValidator(estimator=als, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)
start = time()
cvModel = validator.fit(trainingData)
print("Grid Search took %.2f seconds " % (time() - start))

bestModel = cvModel.bestModel
print("rank: ", bestModel.rank)

predictions = bestModel.transform(testData)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

In conclusion, we observe better performance as we increase the rank but we also observe that the model does not scale well for rank roughly greater than 50. For such values the time computation is rather high with no significant improvement on RMSE. On the other hand, the golden ration for the regularization parameter is 0.05.

##Task 3 : Downsampling ratings so it includes 27,000 unique users

(1) creating a set of users and sampling 27,000

In [36]:
sampleUsers = ratings.select("userId").distinct().sample(False, 0.5).limit(27000)
sampleUsers.count()

(2) collecting all of their ratings from the ratings dataset.

In [38]:
from pyspark.sql.functions import when
ratings_small = ratings[ratings["userId"].isin(sampleUsers.toPandas()["userId"].tolist())]
ratings_small.count()

(3) Saving sample dataset as a parquet file

In [40]:
ratings_small.write.mode("overwrite").parquet("ratings-small.parquet")

##Task 4: user-user network

(1) join two copies of ratings dataset based on movieId so a row is a record of relationship between two different users rating same movie (including rows of users and theirselves)

In [43]:
ratings_small = spark.read.parquet("ratings-small.parquet")
df1 = ratings_small.select("movieId", "userId")
df2 = ratings_small.select("movieId", "userId").withColumnRenamed("movieId", "movieId2").withColumnRenamed("userId", "userId2")
UserUserMap = df1.join(df2, df1.movieId == df2.movieId2)

(2) select only userId columns. first column represents user1 second column represents user2 

 each user is represented by a node in the graph

In [45]:
UserUserMap = UserUserMap.select("userId", "userId2")
UserUserMap.show(5)

In [46]:
UserUserMap.count()

(3) count is  the weight of the edge that represent the number of the same movies that have both users rated

In [48]:
from pyspark.sql.functions import col, sum
edgesWithDuplicates = UserUserMap.groupBy(UserUserMap.columns).count().filter(UserUserMap.userId != UserUserMap.userId2)
edgesWithDuplicates.orderBy("count", ascending=False).show(5)

Set User 1 to the smaller id and user2 to the larger id in order to remove mirrored edges. 

Since the graph we are working with is undirected this is not necessary.

In [50]:
#Set User 1 to the smaller id and user2 to the larger id in order to remove mirrorred edges
#edgesRDD = edgesWithDuplicates.select("userId", "userId2", "count").rdd.map(lambda x: (min(x["userId"], x["userId2"]), max(x["userId"], x["userId2"]), x["count"]))

As we are using an undirected graph, we do not necessarily need to remove the mirrored edges, hence we can continue using the mirrored edges

In [52]:
edges = edgesWithDuplicates.withColumnRenamed("userId", "src").withColumnRenamed("userId2", "dst")

Calculate the average and standard deviation

In [54]:
average = edges.agg({"count": 'avg'})
average.show()

In [55]:
from pyspark.sql.functions import stddev
std = edges.select(stddev(col("count")).alias("std")).collect()
print(std[0]["std"])

We use (average + (4 * standard deviation)) as our threshold to drop edges, to create a smaller graph, that can be processed in a decent amount of time.

In [57]:
threshold = int(round(average.collect()[0]["avg(count)"] + (4 * std[0]["std"])))
print(threshold) #107

filter the dataset based on the threshold

In [59]:
edgesFiltered= edges.filter("count>" + str(threshold))
edgesFiltered.count()

In [60]:
verticesUnfiltered = ratings_small.select("userId").distinct().withColumnRenamed("userId", "id")

In [61]:
verticesUnfiltered.count()

(5) Generate a user-user network

In [63]:
from graphframes import *
graph = GraphFrame(verticesUnfiltered, edgesFiltered)
display(graph.edges)

src,dst,count
51874,50113,311
51874,28225,157
51874,103668,354
51874,59873,458
52996,20285,266
53837,116317,196
54188,12044,291
54840,13244,272
55251,122035,485
55251,91020,334


##Task 5 : the connected components of the graph

(1) calculate the connected components of the graph

In [66]:
sc.setCheckpointDir("dbfs:/tmp/groupEX/checkpoints")
connectedComponents = graph.connectedComponents()
display(connectedComponents.orderBy("component", ascending = False))

id,component
138478,138478
138473,138473
138470,138470
138460,138460
138452,138452
138451,138451
138442,138442
138438,138438
138434,138434
138426,138426


(2) GroupBy the component to get all nodes within the component and find components that containing the largest number of nodes

* First Result with Threshold 15 (average rounded down) - 26985 in one Component
* Second Result with Threshold 44 (avg + 1 * standard deviation) - 17454 in one Component, else singles
* Third Result with Threshold 72 (avg + 2 * standard deviation) - 12643in one Component, else singles

In [69]:
connectedComponents.groupBy("component").count().agg({"count": 'max'}).show()

In [70]:
sortedCc = connectedComponents.groupBy("component").count().orderBy("count", ascending=False)
maxCC = sortedCc.take(1)[0][0]
print(maxCC)

(3) generate a representation of the subgraph containing the largest component,

In [72]:
subGraphMax = graph.filterEdges("count>1").filterVertices(graph.vertices["id"].isin(connectedComponents.filter("component="+str(maxCC)).toPandas()["id"].tolist())).dropIsolatedVertices()
display(subGraphMax.vertices)

id
29
964
1677
1806
2529
4823
8440
9945
10156
11190


In [73]:
display(subGraphMax.edges)

src,dst,count
16938,586,885
16938,49022,298
18628,84476,960
20249,82633,141
20249,120575,254
20285,101275,467
20285,18628,450
20304,58650,354
20304,43970,158
21408,123334,280


(4) save the subgraph

In [75]:
subGraphMax.edges.write.mode("overwrite").parquet("subGraphEdges.parquet")
subGraphMax.vertices.write.mode("overwrite").parquet("subGraphVertices.parquet")

In [76]:
subGraphMax.edges.count()

##Task 6 
###Newman Girvan parallel implementation with MapReduce

In [78]:
vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)], ["id", "name", "age"])

edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "a", "friend"),
  ("a", "c", "friend"),  
  ("c", "a", "friend"),
  ("b", "c", "friend"),
  ("c", "b", "friend"),
  ("b", "d", "friend"),
  ("d", "b", "friend"),
  ("d", "e", "friend"),
  ("e", "d", "friend"),
  ("d", "g", "friend"),
  ("g", "d", "friend"),
  ("e", "f", "friend"),
  ("f", "e", "friend"),
  ("g", "f", "friend"),
  ("f", "g", "friend"),
  ("d", "f", "friend"),
  ("f", "d", "friend")
], ["src", "dst", "relationship"])
vertices = vertices.drop("name","age")

vertices = vertices.drop("name","age")
edges= edges.drop('relationship')

1- finding shortest paths

In [80]:
e=edges.rdd.map(lambda  x:(x[0],x[1]))
adjList=e.groupByKey().map(lambda x : (x[0], list(x[1])))
adjDict = adjList.collectAsMap() # make a adjacency dictionary of lists

# make a dictionary of key-sets values.
adjDictSet = {}
for k in adjDict:
  adjDictSet[k] = set(adjDict[k])
# broadcast it so every worker can read it since no write operation will be performed
adjBroad = sc.broadcast(adjDictSet)
def getAdjOf(letter):
    return adjBroad.value[letter] 

In [81]:
def traverseNode(key,val):
  """
  k = (currentId,sourceId)
  v = (currentId,[sourceId,distance,visited,denominator,pathList])
           currentId   [the nodeId that we are currently traversing]
  arr[0] = sourceId    [the nodeId from which BFS has started]
  arr[1] = distance    [int | Distance between targetId and sourceId]
  arr[2] = visited     [boolean| False if this node has not been expanded otherwise true]9
  arr[3] = pathSum     [int | Number of shortest paths from sourceId to currentId]
  arr[4] = pathList    [List| list of visited nodes to reach k node]
  """
  k = val[0]
  v = val[1]
  src = val[1][0]
  returnRows = []
  if (v[2] == False):
    # set node to visited
    v[2] = True
    # append current visited Node to pathList
    v[4].append(k)
    # emit Row
    returnRows.append((key,val))
    
    # Get the nodes that are k's neighbors but have not been visited before
    for a in (getAdjOf(k) - set(v[4])):
      # emit each new path that can be discoved by visiting each Neighbor
      returnRows.append(((a,src),(a,[v[0],v[1] + 1,False,v[3],v[4].copy()])))
  else:
    # do nothing - emit tuple
    returnRows.append((key,val))
  return (returnRows)

def getLowestDistance(x,y):
  """
  Return the pair with the minimum pathSum thus returning the shortest Path
  If two pairs have the same pathSum merge their pathList and add one
  to the pathSum 
  """
  if (x[1][1] == y[1][1]):
    # no need for deepcopy here we are not going to change it later
    listToCopy = []
    if isinstance(x[1][4][1], list):
      for sublist in x[1][4]:
          listToCopy += [sublist]
      listToCopy += [y[1][4]]
    else:
      listToCopy = [x[1][4]] + [y[1][4]]
    return ((x[0],[x[1][0],x[1][1],x[1][2],x[1][3] + 1,listToCopy]))
  if (x[1][1] > y[1][1]):
    return y
  else:
    return x

In [82]:
# Perform BFS from every node of the graph.
# Each Iteration explores the graph an extra level till all nodes have been visited.
# Using flatMap because we generate new tuples
BFS = vertices.rdd.flatMap(lambda x:traverseNode((x[0],x[0]),(x[0],[x[0],0,False,1,[]])))
# loop until all nodes are visited
while(BFS.filter(lambda a: a[1][1][2] == False).count() > 0):
  BFS = BFS.flatMap(lambda x:traverseNode(x[0],x[1])) 
# Find the shortest paths
BFS = BFS.reduceByKey(lambda x,y: getLowestDistance(x,y))

2- Calculating Edge Betweenness

In [84]:
def calculateBetwenness(v):
  """
  For each edge calculate its betwenness for the current shortest path (v)
  """
  returnRows = []
  if v[3] > 1: # if more than one available shortest paths exist
    for x in v[4]: # iterate each path in the pathList
      for c,elem in enumerate(x):
        if (c == len(x) - 1):
          break
        # for each edge in the shortest path add its correlating weight.
        # Each edge weight is the result of dividing one by the number of shortest paths
        # that can be explored through that edge
        nextElem = x[c+1]
        # Sort the tuple/key so reduceByKey calculates edgeBetweenness faster
        if (nextElem < elem):
          returnRows.append(((nextElem,elem),1/v[3]))
        else:
          returnRows.append(((elem,nextElem),1/v[3]))
  else:
    for c,y in enumerate(v[4]):
      if (c == len(v[4]) - 1):
        break
      nextElem = v[4][c+1]
      if (nextElem < y):
        returnRows.append(((nextElem,y),1))
      else:
        returnRows.append(((y,nextElem),1))
  return (returnRows)

In [85]:
edgeValues = BFS.flatMap(lambda x: calculateBetwenness(x[1][1])).reduceByKey(lambda x,y:x+y).map(lambda x:(x[0],x[1]/2))
# Edge betweenness of the graph
edgeValues.collect()

3- Selecting the Edges to be Removed

In [87]:
betwenness_values = edgeValues.map(lambda x:x[1])
import statistics
maxBetwennessToDrop = max(betwenness_values.collect()) - 2.5*statistics.stdev(betwenness_values.collect())
edgesToDrop = edgeValues.filter(lambda x: x[1] >= maxBetwennessToDrop).map(lambda x:x[0]).collect()
edgesToDrop

4- Removing the Edges

In [89]:
# deletes edges from adjacency list
def deleteEdgesInAdj(listOfEdgesToDrop):
  for x in listOfEdgesToDrop:
    if x[0] in adjDict:
      adjDict[x[0]].remove(x[1])
    if x[1] in adjDict:
      adjDict[x[1]].remove(x[0])
      
def deleteEdges(edge,listOfEdges):
  if (edge in listOfEdges or (edge[1],edge[0]) in listOfEdges):
    return False
  return True   

newEdges = edges
newEdges.rdd.filter(lambda x: deleteEdges(x,edgesToDrop)).collect()

Create new graph after removing edges in Girvan Newman algorithm

In [91]:
newGraph = GraphFrame(vertices, newEdges)
newConnectedComponents = newGraph.connectedComponents()
newConnectedComponents = newConnectedComponents.groupBy("component").count().orderBy("count", ascending=False)
# create new communities
numOfCommunities = newGraph.labelPropagation(maxIter=5)
numOfCommunities.select("id", "label").show()

In [92]:
# display the number of different communities
numOfCommunities.select("label").distinct().count()

In [93]:
display(newGraph.edges)

src,dst
a,b
b,a
a,c
c,a
b,c
c,b
b,d
d,b
d,e
e,d
