# Task 1: Average rating per user and per movie

In [2]:
import pandas as pd

# Ratings CSV (userId, movieId, rating, timestamp): read on the driver with
# pandas, then converted into a Spark DataFrame.
ratingsURL = 'https://csc8101storageblob.blob.core.windows.net/datablobcsc8101/ratings.csv'
ratings = spark.createDataFrame(pd.read_csv(filepath_or_buffer=ratingsURL))

In [3]:
# Average rating given by each user (mean of `rating` per userId), ordered by userId.
# orderBy is ascending by default. The former `asscending=True` was misspelled and
# was passed to display() rather than orderBy().
from pyspark.sql.functions import avg
display(ratings.groupBy("userId").agg(avg("rating")).orderBy("userId"))

userId,avg(rating)
1,3.742857142857143
2,4.0
3,4.122994652406417
4,3.571428571428572
5,4.2727272727272725
6,3.75
7,3.289855072463768
8,3.8
9,3.057142857142857
10,3.8947368421052633


In [4]:
# Average rating received by each movie (mean of `rating` per movieId), ordered by movieId.
# orderBy is ascending by default, so no extra (misspelled) keyword is needed.
display(ratings.groupBy("movieId").agg(avg("rating")).orderBy("movieId"))

movieId,avg(rating)
1,3.921239561324077
2,3.2119768016904198
3,3.1510404397330194
4,2.8613933236574747
5,3.064591727653976
6,3.834930331813047
7,3.3664840675873777
8,3.142049469964664
9,3.0049242424242424
10,3.430029305292191


# Task 2: ALS recommendations and hyperparameter tuning

In [6]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Fixed seed so the train/test split (and every metric computed from it) is reproducible.
SPLIT_SEED = 42
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(rank=10, maxIter=5, regParam=0.08, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

# Grid over ALS rank and regularisation strength (2 x 3 = 6 combinations).
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10,15]) \
    .addGrid(als.regParam, [0.06, 0.07, 0.08]) \
    .build()

crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)  # reproducible fold assignment
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Score the best cross-validated model on the held-out test set.
# (Previously this evaluated `predictions` from the baseline ALS cell, so the
# printed RMSE was not the cross-validated model's.)
prediction = cvModel.transform(test)
rmse = evaluator.evaluate(prediction)
print("Root-mean-square error = " + str(rmse))
# Show a sample instead of collecting and printing every row of the test split on the driver.
prediction.select("userId", "rating", "prediction").show()

In [8]:
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=als,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8,
                           seed=42)  # reproducible train/validation split
# Run TrainValidationSplit, and choose the best set of parameters.
# Named tvsModel so it does not shadow the baseline ALS `model`.
tvsModel = tvs.fit(training)

# Make predictions on test data with the best parameter combination, then score them.
# (Previously the RMSE was computed on the baseline `predictions`, not on this model.)
prediction = tvsModel.transform(test)
rmse = evaluator.evaluate(prediction)
print("Root-mean-square error = " + str(rmse))

# Reuse the predictions above instead of running transform(test) a second time.
prediction.select("userId", "rating", "prediction").show()

# Task 3: Sample 27,000 users and save their ratings to Parquet

In [10]:
# Sample up to 27000 distinct userIds, reproducibly.
import random

SAMPLE_SIZE = 27000
SAMPLE_SEED = 42
# distinct() + collect() returns ids in no guaranteed order. Sort them so a seeded
# sample picks the same users on every run.
UserSet = sorted(ratings.select("userId").distinct().rdd.map(lambda row: row.userId).collect())
# random.sample raises ValueError when asked for more items than exist, so cap at the population size.
UserSet2 = random.Random(SAMPLE_SEED).sample(UserSet, min(SAMPLE_SIZE, len(UserSet)))
# Printing every sampled id floods the notebook; show the size and a few examples instead.
print(len(UserSet2), UserSet2[:10])

In [11]:
# Keep only the ratings made by the sampled users.
UserRating = ratings.filter(ratings.userId.isin(UserSet2))
# Display the DataFrame just built rather than constructing the same filter a second time.
display(UserRating)

userId,movieId,rating,timestamp
1,2,3.5,1112486027
1,29,3.5,1112484676
1,32,3.5,1112484819
1,47,3.5,1112484727
1,50,3.5,1112484580
1,112,3.5,1094785740
1,151,4.0,1094785734
1,223,4.0,1112485573
1,253,4.0,1112484940
1,260,4.0,1112484826


In [12]:
# Persist the sampled ratings (replacing any previous copy) so later tasks can reload them.
UserRating.write.parquet("ratings-small.parquet", mode="overwrite")

In [13]:
# Summary statistics (count, mean, stddev, min, max) of the sampled ratings.
# describe() only builds a lazy DataFrame; display() computes and renders it.
display(UserRating.describe())

# Task 4: Build the user-similarity graph and find connected components

In [15]:
# Reload the sampled ratings and make two column-renamed copies for the user-user self-join.
allUserRating = spark.read.parquet("ratings-small.parquet")
allUserRating1 = allUserRating.select("userId", "movieId", "rating").toDF("userId1", "movieId1", "rating1")
allUserRating2 = allUserRating.select("userId", "movieId", "rating").toDF("userId2", "movieId2", "rating2")
allUserRating1.count()

In [16]:
# Count, for every ordered pair of distinct users, how many movies both of them rated.
from functools import reduce
from pyspark.sql.functions import col, lit, when
from graphframes import *
# Self-join on movieId: one row per (user1, user2, movie) where both users rated the movie.
allUserRatingJoin = allUserRating1.join(
    allUserRating2,
    allUserRating1.movieId1 == allUserRating2.movieId2,
)
# Drop self-pairs. Both orderings (u, v) and (v, u) survive, so the resulting edges are symmetric.
allUserRatingFilter = allUserRatingJoin.where(col("userId1") != col("userId2"))
allUserRatingPairs = allUserRatingFilter.groupBy("userId1", "userId2").count()
allUserRatingPairs.show()

In [17]:
# Mean and (sample) standard deviation of the co-rated movie counts over all user pairs.
from pyspark.sql.functions import col, udf, mean as _mean, stddev as _stddev
rating_stats = allUserRatingPairs.agg(
    _mean("count").alias("mean"),
    _stddev("count").alias("std"),
).collect()

first_row = rating_stats[0]
mean, std = first_row["mean"], first_row["std"]

print([mean, std])

In [18]:
# Keep only user pairs whose co-rating count is more than one standard deviation above the mean.
threshold = mean + std
selectUserRating = allUserRatingPairs.where(col("count") > threshold)

In [19]:
# Vertices: every sampled user. Edges: the user pairs above the threshold, weighted by co-rating count.
vertices = allUserRating.select(col("userId").alias("id")).distinct()
edges = selectUserRating.selectExpr("userId1 AS src", "userId2 AS dst", "`count` AS weight")
g = GraphFrame(vertices, edges)
print(g)

In [20]:
# Vertices: one row (column `id`) per distinct userId in the sampled ratings.
display(g.vertices)

id
52001
52051
52743
54349
57157
57651
60756
60882
61135
61749


In [21]:
# Edges between user pairs: src/dst user ids, weight = number of movies both users rated.
display(g.edges)

src,dst,weight
69481,63046,271
69481,105871,101
69481,80920,487
69481,35609,261
69481,35669,201
69738,89307,52
69793,59025,229
69793,110211,162
69793,78526,47
69793,84195,124


In [22]:
# Heaviest edge weight: the largest co-rated movie count of any retained user pair.
largest = g.edges.agg({"weight": "max"})
display(largest)

In [23]:
# GraphFrames' connectedComponents() needs a Spark checkpoint directory to be configured first.
spark.sparkContext.setCheckpointDir("/tmp/graphframes-example-connected-components")
result = g.connectedComponents()
display(result)

id,component
17499,1
18295,1
18628,1
18730,1
19141,19141
19158,1
20532,20532
21899,1
21965,21965
22129,1


# Task 5: Component sizes and saving the graph

In [25]:
# Number of users in each connected component, largest component first.
result.groupBy("component").count().sort(col("count").desc()).show()

In [26]:
# Persist the graph so Task 6 can reload it without recomputing the self-join.
g.vertices.write.parquet("vertices.parquet", mode="overwrite")
g.edges.write.parquet("edges.parquet", mode="overwrite")

# Task 6: Girvan–Newman edge betweenness

In [28]:
# Small friendship graph used to sanity-check the Girvan-Newman implementation.
vertices = spark.createDataFrame(
  [
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60),
  ],
  ["id", "name", "age"],
)

# Each friendship is undirected, so it is stored as two directed edges: u->v then v->u.
friend_pairs = [("a", "b"), ("a", "c"), ("b", "c"), ("b", "d"), ("d", "e"),
                ("d", "g"), ("e", "f"), ("g", "f"), ("d", "f")]
edges = spark.createDataFrame(
  [(src, dst, "friend") for u, v in friend_pairs for (src, dst) in ((u, v), (v, u))],
  ["src", "dst", "relationship"],
)

from graphframes import *
g2 = GraphFrame(vertices, edges)

In [29]:
# Build the toy graph's adjacency list: [(vertex_id, [neighbor ids]), ...].
# Use map (not flatMap): flatMap over a string id would split multi-character ids into characters.
vertice = g2.vertices.rdd.map(lambda row: row.id).collect()
edge = g2.edges.rdd.map(lambda row: (row.src, row.dst)).collect()
# Work on the RDDs directly instead of collecting to the driver and parallelizing back.
verticeRDD = g2.vertices.rdd.map(lambda row: (row.id, ''))
edgeRDD = g2.edges.rdd.map(lambda row: (row.src, row.dst))
# Inner join keeps only edges whose source is a known vertex; then group destinations by source.
joinRDD = verticeRDD.join(edgeRDD).map(lambda x: (x[0], x[1][1])).groupByKey()
neigh_list = joinRDD.mapValues(list).collect()
print(vertice)
print(edge)
print(neigh_list)

In [30]:
# Look up the neighbors of a single node.
def find_neighbors(node, neighlist):
  """Return the neighbor list stored for `node`.

  Args:
    node: vertex id to look up.
    neighlist: either a list of ``(vertex_id, [neighbor_ids])`` pairs (as
      produced by collecting a grouped adjacency RDD), or a dict mapping
      vertex id -> neighbor list (O(1) lookup).

  Returns:
    The neighbor list for `node`, or an empty list when `node` has no entry.
  """
  if isinstance(neighlist, dict):
    return neighlist.get(node, [])
  # Vertex ids are expected to be unique (they come from groupByKey), so the
  # first match is the only match and we can stop scanning there.
  for vertex_id, neighbors in neighlist:
    if vertex_id == node:
      return neighbors
  return []

In [31]:
# Breadth-first search from one root: the root is level 0, its neighbors level 1, and so on.
def bfs(neighlist, start):
  """Return the BFS level (hop distance from `start`) of every reachable node.

  Args:
    neighlist: list of ``(vertex_id, [neighbor_ids])`` pairs. If a vertex id
      appears more than once, the last entry wins.
    start: id of the root vertex.

  Returns:
    dict node -> level with ``start`` at level 0. Insertion order is BFS
    discovery order, so levels are non-decreasing when iterating the dict.
    shortest_path walks this dict in order and needs every parent before its
    children.
  """
  from collections import deque

  # Build the adjacency map once: O(1) neighbor lookups instead of a linear
  # scan of neighlist for every dequeued node.
  adjacency = dict(neighlist)
  levels = {start: 0}        # also serves as the visited set (O(1) membership)
  queue = deque([start])     # popleft() is O(1), unlike list.pop(0)
  while queue:
    node = queue.popleft()
    for neighbor in adjacency.get(node, []):
      if neighbor not in levels:
        levels[neighbor] = levels[node] + 1
        queue.append(neighbor)
  return levels

In [32]:
# Count the shortest paths from the BFS root to every node.
def shortest_path(neighlist, bfs_d):
  """Return the number of distinct shortest paths from the BFS root to each node.

  Args:
    neighlist: list of ``(vertex_id, [neighbor_ids])`` pairs.
    bfs_d: dict node -> BFS level, as returned by ``bfs``. It must be in BFS
      order (non-decreasing level) so every parent is counted before its children.

  Returns:
    dict node -> shortest-path count. The root (level 0) has exactly 1.
    A node's count is the sum of its parents' counts, where its parents are
    its neighbors one level closer to the root.
  """
  adjacency = dict(neighlist)
  # Group nodes by level once instead of rescanning bfs_d for every node.
  nodes_by_level = {}
  for node, level in bfs_d.items():
    nodes_by_level.setdefault(level, []).append(node)

  shortpath = {x: 0 for x in bfs_d}
  for key, level in bfs_d.items():
    if level == 0:
      shortpath[key] = 1
      continue
    neighbors = set(adjacency.get(key, []))
    for parent in nodes_by_level.get(level - 1, []):
      if parent in neighbors:
        shortpath[key] += shortpath[parent]
  return shortpath

In [33]:
# Girvan-Newman edge credits for a single BFS tree.
def calculate_betweenness(neighlist, bfs_d, sp):
  """Compute the Girvan-Newman edge credits for the BFS tree described by `bfs_d`.

  Processing levels from the deepest toward the root, every node holds a credit
  of 1 plus all credit received from edges to its children. It splits that credit
  among its parents (neighbors one level closer to the root) in proportion to
  each parent's shortest-path count.

  Args:
    neighlist: list of ``(vertex_id, [neighbor_ids])`` pairs.
    bfs_d: dict node -> BFS level (as returned by ``bfs``), in BFS order.
    sp: dict node -> shortest-path count from the root (as returned by ``shortest_path``).

  Returns:
    list of ``([child, parent], credit)`` tuples, ordered deepest level first
    and, within a level, in ``bfs_d`` order. The root (level 0) sends no credit.
  """
  adjacency = dict(neighlist)
  # Group nodes by level once instead of rescanning bfs_d for every node.
  nodes_by_level = {}
  for node, level in bfs_d.items():
    nodes_by_level.setdefault(level, []).append(node)

  # Every node starts with credit 1. Credit flowing up from a child is added to the
  # parent as soon as the child's edge is created (in the same order the edge list
  # is built), so no scan over the accumulated edges is needed.
  node_credit = {x: 1 for x in bfs_d}
  edges = []
  for level in sorted(nodes_by_level, reverse=True):
    if level == 0:
      continue
    upper_nodes = nodes_by_level.get(level - 1, [])
    for node in nodes_by_level[level]:
      neighbors = set(adjacency.get(node, []))
      parent_paths = {k: sp[k] for k in upper_nodes if k in neighbors and k in sp}
      total_paths = sum(parent_paths.values())
      if total_paths == 0:
        # No parent appears in this node's own adjacency list. This can only
        # happen with non-symmetric neighbor lists. There is nowhere to send
        # the credit, so skip the node instead of dividing by zero.
        continue
      credit_share = node_credit[node] / total_paths
      for parent, paths in parent_paths.items():
        credit = credit_share * paths
        edges.append(([node, parent], credit))
        node_credit[parent] += credit
  return edges

In [34]:
def GN(neighlist, start):
  """Girvan-Newman edge credits for the BFS tree rooted at `start`.

  Runs bfs for the levels, counts shortest paths, then passes credit from the
  deepest nodes back toward the root. Returns the ``([child, parent], credit)``
  list produced by calculate_betweenness.
  """
  levels = bfs(neighlist, start)
  path_counts = shortest_path(neighlist, levels)
  return calculate_betweenness(neighlist, levels, path_counts)

In [35]:
# Run GN from every vertex of the toy graph, normalising each edge key so that
# (u, v) and (v, u) collapse onto the same undirected edge.
betweeness = (
    g2.vertices.rdd
    .flatMap(lambda row: GN(neigh_list, row.id))
    .map(lambda edge_credit: (tuple(sorted(edge_credit[0])), edge_credit[1]))
)
# Summing over every root counts each (source, target) pair twice, so halve the totals.
(betweeness
    .reduceByKey(lambda a, b: a + b)
    .mapValues(lambda total: total / 2)
    .sortBy(lambda kv: kv[1])
    .collect())

In [36]:
# Inspect the raw ([child, parent], credit) list for a single BFS root ('g') of the toy graph.
GN(neigh_list, 'g')

In [37]:
# Reload the user-similarity graph saved as Parquet.
vertices2 = spark.read.format("parquet").load("vertices.parquet")
edges2 = spark.read.format("parquet").load("edges.parquet")

In [38]:
# Build the adjacency list of the saved user graph: [(userId, [neighbor userIds]), ...].
# Work on the RDDs directly; previously every vertex and edge was collected to the
# driver and parallelized back, and only the grouped result is actually needed there.
verticeRDD2 = vertices2.rdd.map(lambda row: (row.id, ''))
edgeRDD2 = edges2.rdd.map(lambda row: (row.src, row.dst))
# Inner join keeps only edges whose source is a known vertex; then group destinations by source.
joinRDD2 = verticeRDD2.join(edgeRDD2).map(lambda x: (x[0], x[1][1])).groupByKey()
neigh_list2 = joinRDD2.mapValues(list).collect()
# The full lists are far too large to print usefully; show sizes and a small sample instead.
print(vertices2.count(), edges2.count(), len(neigh_list2))
print(neigh_list2[:5])

In [39]:
# Ship the (large) adjacency list to the executors once as a broadcast variable,
# instead of pickling it into the closure of every task.
neigh_list2_bc = sc.broadcast(neigh_list2)
betweeness2 = (vertices2.rdd
    .flatMap(lambda row: GN(neigh_list2_bc.value, row.id))
    # Normalise edge keys so (u, v) and (v, u) collapse onto one undirected edge.
    .map(lambda x: (tuple(sorted(x[0])), x[1])))
# Summing over every root counts each (source, target) pair twice, so halve the totals.
betweeness2.reduceByKey(lambda x, y: x + y).mapValues(lambda x: x / 2).sortBy(lambda x: x[1]).collect()