In [0]:
# Colab setup: quietly install a headless Java 8 JDK and the PySpark package.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

[K     |████████████████████████████████| 217.8MB 55kB/s 
[K     |████████████████████████████████| 204kB 63.8MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [0]:
import pyspark, os
from pyspark import SparkConf, SparkContext
# Tell PySpark which Python interpreter and which JDK to use before the JVM starts.
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"

# Connect our python driver to a local Spark JVM running on the Google Colab
# server virtual machine. getOrCreate() hands back the already-running context
# when the cell is executed again, so `sc` is always bound afterwards and no
# ValueError has to be swallowed.
conf = SparkConf().setMaster("local[*]").set("spark.executor.memory", "1g")
sc = SparkContext.getOrCreate(conf=conf)


#also include this short helper function for use later in this lab
def dbg(x):
  """Print debugging information.

  For an RDD, print its first 100 elements; tuple elements are shown as
  (first, second) pairs, with a grouped second field (ResultIterable)
  expanded into a plain list. Anything that is not an RDD is printed as is.
  """
  if not isinstance(x, pyspark.RDD):
    print(x)
    return

  def _printable(item):
    # Non-tuples are shown unchanged.
    if not isinstance(item, tuple):
      return item
    key, value = item[0], item[1]
    # groupByKey() yields lazy ResultIterables; materialise them so they print.
    if isinstance(value, pyspark.resultiterable.ResultIterable):
      value = list(value)
    return (key, value)

  print([_printable(item) for item in x.take(100)])


#Question 1a

In [0]:
from math import sqrt

def dot_product(vector1,vector2):
  """Calculate the dot product of two equal-length numeric vectors.

  Args:
    vector1: sequence of numbers.
    vector2: sequence of numbers with the same length as vector1.

  Returns:
    The sum of the element-wise products (0 for two empty vectors).

  Raises:
    ValueError: if the two vectors differ in length. Previously a shorter
      vector1 silently ignored the tail of vector2.
  """
  if len(vector1) != len(vector2):
    raise ValueError(
        "vectors must have the same length, got %d and %d"
        % (len(vector1), len(vector2)))
  return sum(a * b for a, b in zip(vector1, vector2))

    
def cosine_similarity(x,y):
  """Return the cosine similarity of vectors x and y.

  The length of a vector is the square root of its dot product with itself;
  the similarity is dot(x, y) divided by the product of the two lengths.

  Args:
    x: sequence of numbers.
    y: sequence of numbers of the same length as x.

  Returns:
    dot(x, y) / (|x| * |y|), or 0.0 when either vector has zero length
    (an all-zero vector has no direction, and dividing would raise
    ZeroDivisionError).
  """
  norm_product = sqrt(dot_product(x,x)) * sqrt(dot_product(y,y))
  if norm_product == 0:
    return 0.0
  return dot_product(x,y) / norm_product



In [0]:
# Print the cosine similarity of a few hand-picked vector pairs.
for left, right in [
  ([1,2,-1], [2,1,1]),
  ([1,2,-1], [1,2,1]),
  ([1,2,1], [1,2,1]),
  ([1, 1, 0, 1, 1, 0], [0, 0, 0, 0, 1, 1]),
]:
  print(cosine_similarity(left, right))

0.5000000000000001
0.6666666666666667
1.0000000000000002
0.35355339059327373


#1b


In [0]:
test = "1 1 0 1 1 0 0 0 0 0 1 1 0 1 1 0 0 1 0 0 0 1 1 1 0 0 1 0 0 1 1 1 1 1 0 1 0 1 1 1 0 1 1 1 1 1 1 1 0 0 0 1 1 0 1 1 0 1 1 1 1 1 1 1 0 1 1 1 0 1 1 1 1 1 0 0 1 1 1 0 0 1 1 1 1 1 0 1 1 0 1 1 0 0 0 0 1 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 0 1 1 0 0 0 0 0 1 1 0 1 1 1 0 1 1 1 0 0 0 1 0 1 1 1 1 0 1 1 1 1 0 1 0 1 1 0 1 0 0 1 1 1 0 1 0 0 1 1 1 1 0 1 1 1 1 1 0 0 0 0 1 0 1 1 1 1 1 1 1 1 0 0 0 1 1 0 1 1 1 1 0 0 1 1 1 0 1 1 1 1 1 1 1 1 1 0 1 1 0 1 0 0 0 1 1 1 1 1 1 0 0 1 1 0 1 1 1 1 1 0 1 1 1 1 1 1 1 0 0 1 1 1 0 1 0 1 1 0 1 0 1 1 0 0 0 0 1 1 1 1 0 0 1 1 0 0 1 1 1 1 1 0 1 1 0 1 0 1 1 0 1 0 1 1 1 1 1 0 1 1 1 0 1 0 0 1 1 0 1 0 0 1 0 1 1 1 0 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 0 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 0 1 1 1 0 0 0 1 0 0 1 1 1 1 1 1 1 1 1 1 0 1 0 1 1 1 1 0 1 1 0 0 1 1 1 0 0 0 0 0 1 1 0 1 0 0 0 0 1 0 1 1 1 0 1 1 1 1 1 1 0 0 0 0 0 1 1 1 1 0 1 0 1 1 0 1 1 1 1 1 0 0 1 1 1 1 1 1 0 1 1 1 0 1 1 1 0 1 1 1 1 0 1 1 1 1 1 0 1 0 1 0 1 1 1 1 0 1 1 1 0 1 0 1 1 0 0 1 0 1 1 1 0 1 0 0 1 1 1 0 1 1 1 0 0 1 0 0 1 0 1 1 1 1 0 1 0 1 1 1 1 1 0 1 1 1 1 1 1 1 1 1 1 1 0 1 1 1 1 1 0 1 1 1 0 1 0 1 0 0"
# Parse the whitespace-separated test profile into a list of ints.
test_as_list = [int(x) for x in test.split()]
# print(test_as_list)
file = sc.textFile("user-shows.txt") # RDD of strings, one per line of the file, e.g. '0 0 1 1', '1 1 0 0', '0 1 0 1'

def change(array):
  """Return a new list holding int(element) for every element of `array`."""
  return list(map(int, array))

# Split each line on whitespace and convert the pieces to ints: the result is
# an RDD whose elements are lists of ints, one list per line of the file.
user_shows = file.map(lambda line: change(line.split()))
#dbg(user_shows)

# Score every row against the test profile, order the scores from highest to
# lowest and show the five best.
cosine_sim_of_test_case = user_shows.map(lambda shows: cosine_similarity(test_as_list, shows))
sorted_cosine_sim_of_test_case = cosine_sim_of_test_case.sortBy(lambda score: score, ascending=False)
dbg(sorted_cosine_sim_of_test_case.take(5))


[0.5653963756883335, 0.5611939760061074, 0.5580880342929785, 0.5554706066152114, 0.5496782626513883]


#1c

In [0]:
# Score every row of user_shows against the test profile, remember each row's
# position in the RDD, and order the (similarity, row_number) pairs best first.
zip_with_index = (
  user_shows
  .map(lambda shows: cosine_similarity(test_as_list, shows))
  .zipWithIndex()
  .sortBy(lambda scored: scored[0], ascending=False)
)
collected_index = zip_with_index.take(5)

# Fetch only the rows of those top users instead of collecting the whole data
# set onto the driver. map() keeps element order, so the row numbers produced
# by zipWithIndex() here match the ones attached to the scores above.
wanted_rows = {row_number for _, row_number in collected_index}
rows_by_number = (
  user_shows
  .zipWithIndex()
  .filter(lambda row: row[1] in wanted_rows)
  .map(lambda row: (row[1], row[0]))
  .collectAsMap()
)

# A holds the show vectors of the most similar users, most similar first.
A = [rows_by_number[row_number] for _, row_number in collected_index]
ans = []


In [0]:
# Recommend a show when every similar user in A has it set to 1 and the test
# profile has it set to 0. Works for any number of rows in A (the original
# indexed A[0]..A[4] explicitly); with no rows at all, nothing is recommended.
num_shows = len(A[0]) if A else 0
shows_index_to_watch = [
  index
  for index in range(num_shows)
  if all(row[index] == 1 for row in A) and test_as_list[index] == 0
]
print(shows_index_to_watch)

[5, 9, 68, 74, 170, 265, 337]


#1d

In [0]:
# Way 1: number the lines of shows.txt with zipWithIndex, flip each pair to
# (line_number, line) and keep only the recommended line numbers.
file2 = (
  sc.textFile("shows.txt")
  .zipWithIndex()
  .map(lambda line_and_number: (line_and_number[1], line_and_number[0]))
)
show_five = file2.filter(lambda numbered: numbered[0] in shows_index_to_watch)
dbg(show_five)

# Way 2: collect the numbered lines and pick the recommended ones by position.
file1 = file2.collect()
result = [file1[i][1] for i in shows_index_to_watch]
print(result)

[(5, '"Everybody Loves Raymond"'), (9, '"Two and a Half Men"'), (68, '"Today"'), (74, '"Family Guy"'), (170, '"George Lopez"'), (265, '"The Oprah Winfrey Show"'), (337, '"The Early Show"')]
['"Everybody Loves Raymond"', '"Two and a Half Men"', '"Today"', '"Family Guy"', '"George Lopez"', '"The Oprah Winfrey Show"', '"The Early Show"']


#2a

In [0]:
"""all working :) """

# Read the edge list: each line holds two integer ids separated by a tab.
graph_small = sc.textFile("graph-small.txt")
dbg(graph_small)

# Turn every line into a (first id - 1, second id - 1) pair, then gather all
# second ids that share the same first id under a single key.
rdd_by_key = (
  graph_small
  .map(lambda line: line.split("\t"))
  .map(lambda ids: (int(ids[0]) - 1, int(ids[1]) - 1))
  .groupByKey()
)
dbg(rdd_by_key.sortByKey())

['100\t1', '13\t1', '28\t1', '89\t1', '82\t1', '30\t1', '79\t1', '65\t1', '88\t1', '25\t1', '46\t1', '73\t1', '59\t1', '13\t1', '50\t1', '11\t1', '24\t1', '28\t1', '38\t1', '44\t1', '13\t1', '3\t1', '79\t1', '58\t1', '14\t1', '38\t1', '31\t1', '85\t1', '17\t1', '98\t1', '25\t1', '51\t1', '94\t1', '53\t1', '73\t1', '64\t1', '33\t1', '82\t1', '56\t1', '31\t1', '28\t1', '1\t2', '26\t2', '58\t2', '45\t2', '2\t3', '86\t3', '67\t3', '12\t3', '96\t3', '26\t3', '83\t3', '3\t4', '66\t4', '11\t4', '82\t4', '77\t4', '33\t4', '49\t4', '4\t5', '31\t5', '92\t5', '83\t5', '87\t5', '50\t5', '17\t5', '20\t5', '64\t5', '78\t5', '44\t5', '21\t5', '56\t5', '45\t5', '74\t5', '64\t5', '49\t5', '5\t6', '24\t6', '91\t6', '58\t6', '6\t7', '82\t7', '2\t7', '83\t7', '7\t8', '17\t8', '1\t8', '1\t8', '35\t8', '7\t8', '69\t8', '8\t9', '21\t9', '20\t9', '29\t9', '11\t9', '12\t9', '8\t9', '45\t9', '30\t9']
[(0, [1, 7, 7, 13, 13, 21, 25, 43, 52, 65, 66, 77, 77, 78, 79, 81, 94]), (1, [2, 6, 12, 52, 52, 57, 60, 65, 89])

#2b

In [0]:
"""2b answer as show in the lab doc"""
# Order the grouped (key, values) pairs by key.
sorted_rdd = rdd_by_key.sortByKey()
def create_dict(array):
  """Map each distinct element of `array` to 1 / (number of distinct elements).

  Args:
    array: iterable of hashable values; duplicates are ignored.

  Returns:
    A dict {element: weight} in which every weight is the same and all
    weights sum to 1. An empty input gives an empty dict.
  """
  # Build the set once; the weight is identical for every key, so it is
  # computed a single time instead of once per element.
  unique = set(array)
  if not unique:
    return {}
  weight = 1/len(unique)
  return {element: weight for element in unique}

# Replace every grouped value list by its {element: weight} dict.
mapped_destinations = sorted_rdd.map(lambda entry: (entry[0], create_dict(entry[1])))
dbg(mapped_destinations)
# Target format: (0, {1: 0.07142857142857142, 65: 0.07142857142857142, ...
# 0.07142857142857142 is 1 divided by the number of unique values in
# [1, 7, 7, 13, 13, 21, 25, 43, 52, 65, 66, 77, 77, 78, 79, 81, 94], i.e. 1/14.

[(0, {1: 0.07142857142857142, 65: 0.07142857142857142, 66: 0.07142857142857142, 7: 0.07142857142857142, 43: 0.07142857142857142, 13: 0.07142857142857142, 77: 0.07142857142857142, 78: 0.07142857142857142, 79: 0.07142857142857142, 81: 0.07142857142857142, 52: 0.07142857142857142, 21: 0.07142857142857142, 25: 0.07142857142857142, 94: 0.07142857142857142}), (1, {89: 0.125, 65: 0.125, 2: 0.125, 6: 0.125, 12: 0.125, 52: 0.125, 57: 0.125, 60: 0.125}), (2, {0: 0.07692307692307693, 65: 0.07692307692307693, 34: 0.07692307692307693, 3: 0.07692307692307693, 37: 0.07692307692307693, 71: 0.07692307692307693, 13: 0.07692307692307693, 77: 0.07692307692307693, 47: 0.07692307692307693, 82: 0.07692307692307693, 19: 0.07692307692307693, 20: 0.07692307692307693, 52: 0.07692307692307693}), (3, {65: 0.1111111111111111, 4: 0.1111111111111111, 16: 0.1111111111111111, 50: 0.1111111111111111, 52: 0.1111111111111111, 86: 0.1111111111111111, 26: 0.1111111111111111, 60: 0.1111111111111111, 95: 0.1111111111111111}),

#2c

In [0]:
"""2b with different ordering or will produce the wrong result below"""
def create_dict(array):
  """Map each distinct element of `array` to 1 / (number of distinct elements).

  Keys appear in the order in which the elements are first seen in `array`.

  Args:
    array: iterable of hashable values; duplicates are ignored.

  Returns:
    A dict {element: weight} in which every weight is the same and all
    weights sum to 1. An empty input gives an empty dict.
  """
  # dict.fromkeys() de-duplicates while keeping first-seen order; the input is
  # walked once, and the shared weight is computed once instead of per element.
  distinct = list(dict.fromkeys(array))
  if not distinct:
    return {}
  weight = 1/len(distinct)
  return dict.fromkeys(distinct, weight)

# Attach the {element: weight} dict to every key of the grouped RDD.
small_graph_with_values = rdd_by_key.map(lambda entry: (entry[0], create_dict(entry[1])))
dbg(small_graph_with_values.sortByKey())


[(0, {1: 0.07142857142857142, 7: 0.07142857142857142, 13: 0.07142857142857142, 21: 0.07142857142857142, 25: 0.07142857142857142, 43: 0.07142857142857142, 52: 0.07142857142857142, 65: 0.07142857142857142, 66: 0.07142857142857142, 77: 0.07142857142857142, 78: 0.07142857142857142, 79: 0.07142857142857142, 81: 0.07142857142857142, 94: 0.07142857142857142}), (1, {2: 0.125, 6: 0.125, 12: 0.125, 52: 0.125, 57: 0.125, 60: 0.125, 65: 0.125, 89: 0.125}), (2, {0: 0.07692307692307693, 3: 0.07692307692307693, 13: 0.07692307692307693, 19: 0.07692307692307693, 20: 0.07692307692307693, 34: 0.07692307692307693, 37: 0.07692307692307693, 47: 0.07692307692307693, 52: 0.07692307692307693, 65: 0.07692307692307693, 71: 0.07692307692307693, 77: 0.07692307692307693, 82: 0.07692307692307693}), (3, {4: 0.1111111111111111, 16: 0.1111111111111111, 26: 0.1111111111111111, 50: 0.1111111111111111, 52: 0.1111111111111111, 60: 0.1111111111111111, 65: 0.1111111111111111, 86: 0.1111111111111111, 95: 0.1111111111111111}),

In [0]:
def create_row(array):
  """Re-key one (key, {inner_key: value}) pair by its inner keys.

  Returns a list with one (inner_key, (key, value)) tuple per dict entry,
  in the dict's own order.
  """
  outer_key, weights = array[0], array[1]
  return [(inner_key, (outer_key, value)) for inner_key, value in weights.items()]
  
# Explode every (key, dict) pair into (inner_key, (key, value)) tuples, group
# them by the new key and order the groups by that key.
# This turns [(12, {0: 0.1111111111111111, ... into [(0, [(12, 0.1111111111111111), ...
two_c_answer = (
  small_graph_with_values
  .flatMap(create_row)
  .groupByKey()
  .sortByKey()
)

dbg(two_c_answer)

[(0, [(12, 0.1111111111111111), (88, 0.08333333333333333), (78, 0.07142857142857142), (64, 0.1111111111111111), (24, 0.08333333333333333), (72, 0.125), (58, 0.058823529411764705), (10, 0.07142857142857142), (2, 0.07692307692307693), (30, 0.06666666666666667), (84, 0.1), (16, 0.08333333333333333), (50, 0.14285714285714285), (52, 0.25), (32, 0.08333333333333333), (99, 0.09090909090909091), (27, 0.1111111111111111), (81, 0.08333333333333333), (29, 0.1111111111111111), (87, 0.125), (45, 0.07692307692307693), (49, 0.09090909090909091), (23, 0.1111111111111111), (37, 0.09090909090909091), (43, 0.1111111111111111), (57, 0.06666666666666667), (13, 0.09090909090909091), (97, 0.16666666666666666), (93, 0.1111111111111111), (63, 0.125), (55, 0.1111111111111111)]), (1, [(0, 0.07142857142857142), (44, 0.0625), (57, 0.06666666666666667), (25, 0.14285714285714285)]), (2, [(66, 0.09090909090909091), (82, 0.06666666666666667), (25, 0.14285714285714285), (1, 0.125), (85, 0.14285714285714285), (11, 0.1),

#2d

In [0]:
def row_multiply(list_of_tuples,vector):
  """Multiply a sparse row by a dense vector.

  `list_of_tuples` holds (index, multiplier) pairs; the result is the sum of
  vector[index] * multiplier over all pairs (0 when there are no pairs).
  """
  return sum(vector[position] * factor for position, factor in list_of_tuples)

# Worked examples against the vector [.2, .3, .5, 0].
# First row: index 0 (1 * 0.2) + index 2 (1 * 0.5) = 0.7
demo_vector = [.2,.3,.5,0]
for demo_row in (
  [(0,1),(2,1)],
  [(1,1),(3,5)],
  [(0,.1),(1,.1),(2,.1),(3,.1)],
):
  print(row_multiply(demo_row, demo_vector))


0.7
0.3
0.1


In [0]:
# Power iteration: start from a uniform vector of length N (the number of rows
# in two_c_answer) and multiply it by the sparse matrix 100 times.
N = two_c_answer.count()
R = [1/N]*N

# Keep the matrix rows in memory; without this every iteration re-reads the
# file and redoes the groupByKey/sortByKey that built two_c_answer.
two_c_answer.cache()

for t in range(100):
  # Ship the current vector to the executors once per iteration.
  vecR = sc.broadcast(R)
  row_results = two_c_answer.map(lambda kv: row_multiply(kv[1],vecR.value))
  R = row_results.collect()
  # The collected result no longer needs this broadcast; release its copies.
  vecR.unpersist()

print(R)


[0.03540059180244296, 0.005171063533189883, 0.007109590714645791, 0.006994840443364616, 0.016446111939280406, 0.005496844981748568, 0.0026390553476102504, 0.005708505652182352, 0.01286206891761162, 0.007283518367242056, 0.003884996061450237, 0.0035954991211598992, 0.020690343993951178, 0.039377727003573254, 0.00595446904616268, 0.004211316450735475, 0.004757546407624067, 0.013768088495850591, 0.004719333295801115, 0.0036398895463850113, 0.005253971879864083, 0.017061942554327036, 0.0026746831662129303, 0.006079163246662503, 0.006484629773168002, 0.01538112682133083, 0.03489656776403212, 0.005823318003010778, 0.006223751644838689, 0.006038775174000848, 0.012166055454916372, 0.00352432272451961, 0.00390795213201113, 0.005252714771083757, 0.013281350423884765, 0.004025475654532075, 0.001930479325791477, 0.006184599824253461, 0.011389080370852518, 0.039884267610139156, 0.008390858686994004, 0.012067981145007824, 0.005463367373694924, 0.012766987824747689, 0.0029509121267135034, 0.003727327

#2e

In [0]:
"""all working :) 2a"""

# Read the full edge list (the variable keeps the name graph_small used by the
# earlier cell): each line holds two integer ids separated by a tab.
graph_small = sc.textFile("graph-full.txt")
dbg(graph_small)

# Turn every line into a (first id - 1, second id - 1) pair, then gather all
# second ids that share the same first id under a single key.
rdd_by_key = (
  graph_small
  .map(lambda line: line.split("\t"))
  .map(lambda ids: (int(ids[0]) - 1, int(ids[1]) - 1))
  .groupByKey()
)
dbg(rdd_by_key.sortByKey())

['1\t2', '2\t3', '3\t4', '4\t5', '5\t6', '6\t7', '7\t8', '8\t9', '9\t10', '10\t11', '11\t12', '12\t13', '13\t14', '14\t15', '15\t16', '16\t17', '17\t18', '18\t19', '19\t20', '20\t21', '21\t22', '22\t23', '23\t24', '24\t25', '25\t26', '26\t27', '27\t28', '28\t29', '29\t30', '30\t31', '31\t32', '32\t33', '33\t34', '34\t35', '35\t36', '36\t37', '37\t38', '38\t39', '39\t40', '40\t41', '41\t42', '42\t43', '43\t44', '44\t45', '45\t46', '46\t47', '47\t48', '48\t49', '49\t50', '50\t51', '51\t52', '52\t53', '53\t54', '54\t55', '55\t56', '56\t57', '57\t58', '58\t59', '59\t60', '60\t61', '61\t62', '62\t63', '63\t64', '64\t65', '65\t66', '66\t67', '67\t68', '68\t69', '69\t70', '70\t71', '71\t72', '72\t73', '73\t74', '74\t75', '75\t76', '76\t77', '77\t78', '78\t79', '79\t80', '80\t81', '81\t82', '82\t83', '83\t84', '84\t85', '85\t86', '86\t87', '87\t88', '88\t89', '89\t90', '90\t91', '91\t92', '92\t93', '93\t94', '94\t95', '95\t96', '96\t97', '97\t98', '98\t99', '99\t100', '100\t101']
[(0, [1, 585,

In [0]:
# """2b answer as show in the lab doc 2b"""
# sorted_rdd = rdd_by_key.sortByKey()
# def create_dict(array):
#   result = {}
#   for i in list(set(array)) :
#     result[i] = 1/len(set(array))
#   return result

# mapped_destinations = sorted_rdd.map(lambda b: (b[0], create_dict(b[1])))
# dbg(mapped_destinations)

In [0]:
"""2b with different ordering or will produce the wrong result below"""
def create_dict(array):
  """Map each distinct element of `array` to 1 / (number of distinct elements).

  Keys appear in the order in which the elements are first seen in `array`.

  Args:
    array: iterable of hashable values; duplicates are ignored.

  Returns:
    A dict {element: weight} in which every weight is the same and all
    weights sum to 1. An empty input gives an empty dict.
  """
  # dict.fromkeys() de-duplicates while keeping first-seen order; the input is
  # walked once, and the shared weight is computed once instead of per element.
  distinct = list(dict.fromkeys(array))
  if not distinct:
    return {}
  weight = 1/len(distinct)
  return dict.fromkeys(distinct, weight)

# Attach the {element: weight} dict to every key of the grouped RDD.
small_graph_with_values = rdd_by_key.map(lambda entry: (entry[0], create_dict(entry[1])))
dbg(small_graph_with_values.sortByKey())

[(0, {1: 0.16666666666666666, 585: 0.16666666666666666, 903: 0.16666666666666666, 501: 0.16666666666666666, 530: 0.16666666666666666, 688: 0.16666666666666666}), (1, {2: 0.09090909090909091, 504: 0.09090909090909091, 798: 0.09090909090909091, 780: 0.09090909090909091, 414: 0.09090909090909091, 712: 0.09090909090909091, 689: 0.09090909090909091, 432: 0.09090909090909091, 631: 0.09090909090909091, 439: 0.09090909090909091, 497: 0.09090909090909091}), (2, {3: 0.1111111111111111, 189: 0.1111111111111111, 544: 0.1111111111111111, 561: 0.1111111111111111, 795: 0.1111111111111111, 678: 0.1111111111111111, 980: 0.1111111111111111, 454: 0.1111111111111111, 618: 0.1111111111111111}), (3, {4: 0.16666666666666666, 212: 0.16666666666666666, 974: 0.16666666666666666, 535: 0.16666666666666666, 952: 0.16666666666666666, 985: 0.16666666666666666}), (4, {5: 0.16666666666666666, 930: 0.16666666666666666, 197: 0.16666666666666666, 393: 0.16666666666666666, 703: 0.16666666666666666, 452: 0.1666666666666666

In [0]:
def create_row(array):
  """Re-key one (key, {inner_key: value}) pair by its inner keys.

  Returns a list with one (inner_key, (key, value)) tuple per dict entry,
  in the dict's own order.
  """
  outer_key, weights = array[0], array[1]
  return [(inner_key, (outer_key, value)) for inner_key, value in weights.items()]
  
# Explode every (key, dict) pair into (inner_key, (key, value)) tuples, group
# them by the new key and order the groups by that key.
# This turns [(12, {0: 0.1111111111111111, ... into [(0, [(12, 0.1111111111111111), ...
two_c_answer = (
  small_graph_with_values
  .flatMap(create_row)
  .groupByKey()
  .sortByKey()
)

dbg(two_c_answer)

[(0, [(54, 0.07142857142857142), (100, 0.1), (226, 0.14285714285714285), (250, 0.1111111111111111), (912, 0.2), (167, 0.1), (397, 0.1111111111111111), (487, 0.14285714285714285), (685, 0.1), (893, 0.16666666666666666), (999, 0.07142857142857142)]), (1, [(0, 0.16666666666666666), (30, 0.1), (120, 0.1111111111111111), (790, 0.2), (441, 0.1111111111111111), (469, 0.08333333333333333), (583, 0.09090909090909091)]), (2, [(58, 0.14285714285714285), (236, 0.14285714285714285), (1, 0.09090909090909091), (459, 0.09090909090909091), (611, 0.125), (947, 0.125)]), (3, [(2, 0.1111111111111111), (496, 0.16666666666666666), (676, 0.07142857142857142), (678, 0.1111111111111111), (712, 0.125), (798, 0.14285714285714285), (311, 0.16666666666666666), (333, 0.125), (591, 0.125), (655, 0.08333333333333333), (661, 0.1111111111111111), (747, 0.125), (923, 0.16666666666666666)]), (4, [(534, 0.16666666666666666), (3, 0.16666666666666666), (91, 0.14285714285714285), (275, 0.1), (293, 0.14285714285714285), (669,

In [0]:
# Power iteration: start from a uniform vector of length N (the number of rows
# in two_c_answer) and multiply it by the sparse matrix 100 times.
N = two_c_answer.count()
R = [1/N]*N

# Keep the matrix rows in memory; without this every iteration re-reads the
# file and redoes the groupByKey/sortByKey that built two_c_answer.
two_c_answer.cache()

for t in range(100):
  # Ship the current vector to the executors once per iteration.
  vecR = sc.broadcast(R)
  row_results = two_c_answer.map(lambda kv: (row_multiply(kv[1],vecR.value)))
  R = row_results.collect()
  # The collected result no longer needs this broadcast; release its copies.
  vecR.unpersist()

# Pair every final score with the key of the row that produced it. keys() and
# the map() above walk the same RDD in the same order, so positions line up;
# this replaces the second, identical Spark job the loop used to run each time.
node_ids = two_c_answer.keys().collect()
R2 = list(zip(R, node_ids))

ans = sorted(R2, reverse=True)
print(sorted(R, reverse=True))
print(ans)

# NOTE(review): these ids are the file's ids minus one (see the "- 1" applied
# when the graph is loaded) -- confirm which numbering the answer should use.
top_5_nodes = [node for _, node in ans[:5]]

print("the top 5 nodes are", top_5_nodes)

[0.0023177681473929425, 0.0022954975147433765, 0.002189010552384874, 0.002097424249330295, 0.0020787314135073216, 0.0020349676456453874, 0.0020290537137209815, 0.0020093037888945734, 0.002008035079007209, 0.0019914578133371098, 0.001986714529606316, 0.0019795984039896034, 0.0019287432505472734, 0.0019079461904995974, 0.0018995151096649918, 0.0018989075620997036, 0.0018820671743319155, 0.0018486625789305938, 0.0018476757250520434, 0.0018411507813314773, 0.0018338823043272507, 0.001832403589636454, 0.0018315667741019021, 0.0018143226564717454, 0.0018008595824084478, 0.0017931333889564297, 0.0017912314802871812, 0.0017887557017614952, 0.0017874787889504771, 0.0017772809215796125, 0.0017601053401164277, 0.0017596512908139724, 0.0017590527497953713, 0.0017504532562475528, 0.0017478099700227075, 0.0017452764163879198, 0.0017421081037416956, 0.001736110217401433, 0.0017305265480465706, 0.001730242398296355, 0.0017229096908954287, 0.0017221921444757248, 0.001712115094737416, 0.0017023065197027