In [1]:
%config IPCompleter.greedy=True

In [2]:
from pyspark import SparkContext
import findspark as fs
import re
import random

In [3]:
fs.init()
sc = SparkContext(appName="Graphs")

## Exercise 31

In [4]:
# generate file with graph
vertices = random.randint(100, 1000)

with open('./data/graph.txt', 'w') as f:
    f.write('[\n')
    for v in range(1, vertices+1):
        neighbours = []
        neighbours_count = random.randint(1, 10)

        while len(neighbours) < neighbours_count:
            next = random.randint(1, vertices)
            
            if next not in neighbours:
                neighbours.append(next)
        
        neighbours.sort()
        
        f.write(str([v, neighbours]) + ',\n')
    
    f.write(']\n')

In [5]:
def get_edges(row):
    res = re.search(r'\[(\d+), \[([\d, ]+)\]\]', row)
    print(res)
    return (res.group(1), res.group(2).split(','))

In [6]:
edges = sc.textFile('./data/graph.txt').filter(lambda row: ',' in row).map(get_edges)
mapped = edges.flatMap(lambda pair: [(v, pair[0]) for v in pair[1]])
groupped = mapped.groupByKey().map(lambda pair: (pair[0], list(pair[1]))).sortByKey()
groupped.collect()[0:10]

[(' 10', ['344']),
 (' 100', ['117', '164']),
 (' 101', ['71', '87', '290', '292', '658']),
 (' 103', ['187', '562', '710']),
 (' 104', ['612']),
 (' 105', ['9', '730']),
 (' 106', ['159', '275', '451']),
 (' 107', ['126', '222', '454']),
 (' 108', ['501', '560', '748']),
 (' 109', ['49', '591', '714'])]

## Exercise 36

In [7]:
def map_vertices(row):
    splitted = row.split()
    return (splitted[0], splitted[1])

In [8]:
edges = sc.textFile('./data/stanford_graph.txt').map(map_vertices) # (v1 -> v2)

In [9]:
in_out_pairs = edges.flatMap(lambda pair: [(pair[0], 'out'), (pair[1], 'in')]) # [(v1, 'out'), (v2, 'in')]

In [10]:
counted_pairs = in_out_pairs.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b) # ((v, 'out/in'), 1) -> aggregate by key ((v, 'out/in'), m)

In [11]:
sorted_pairs = counted_pairs.sortBy(lambda pair: pair[0][1], ascending=True) # ['in', 'in', ..., 'out', 'out']

In [12]:
pairs_with_edge_as_key = sorted_pairs.map(lambda pair: (pair[0][0], (pair[0][1], pair[1]))) # (v, ('in/out', n))

In [13]:
grouped_pairs = pairs_with_edge_as_key.groupByKey() # (v, [('in', n), ('out', m)])

In [14]:
def get_triplet(pair):
    counted = list(pair[1])
    
    if len(counted) == 2:
        return (pair[0], counted[0][1], counted[1][1])
    elif counted[0][0] == 'in':
        return (pair[0], counted[0][1], 0)
    else:
        return (pair[0], 0, counted[0][1])

In [15]:
vertices_set = grouped_pairs.map(get_triplet) # (v, n, m)
vertices_set.take(5)

[('15409', 3, 1),
 ('17794', 2, 1),
 ('25202', 2, 1),
 ('53625', 2, 1),
 ('54582', 2, 1)]

In [16]:
mapped_vertices = vertices_set.map(lambda x: (x[1], 1))
mapped_vertices.reduce(lambda a, b: ((a[0] * a[1] + b[0] * b[1]) / (a[1] + b[1]), a[1] + b[1]))[0]

8.203165627893243

In [17]:
mapped_vertices = vertices_set.map(lambda x: (x[2], 1))
mapped_vertices.reduce(lambda a, b: ((a[0] * a[1] + b[0] * b[1]) / (a[1] + b[1]), a[1] + b[1]))[0]

8.2031656278933

# Exercise 37

In [4]:
data = sc.textFile('./data/stanford_graph.txt')

In [5]:
def to_pairs(line):
    vs = line.split('\t')
    return (int(vs[0]), int(vs[1]))

In [6]:
edges = data.map(to_pairs)

In [7]:
adjs = edges.groupByKey().mapValues(list).sortByKey()

In [8]:
degs = adjs.map(lambda pair: (pair[0], len(pair[1])))

In [9]:
vertices = degs.keys()

In [10]:
br_adjs = sc.broadcast(adjs.collectAsMap())
br_degs = sc.broadcast(degs.collectAsMap())

In [14]:
def calc_clustering_coeff(v):
    deg = br_degs.value[v]
    denom = deg * (deg - 1.0)
    
    if denom == 0.0:
        return 0.0
    
    edges = []
    
    for u in br_adjs.value[v]:
        for w in br_adjs.value[v]:
            if w in br_adjs.value[u]:
                edges.append((u, w))
    
    return len(set(edges)) / denom

In [15]:
result = vertices.map(lambda v: calc_clustering_coeff(v)).map(lambda v: float(v))

In [39]:
result.mean()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 84.0 failed 1 times, most recent failure: Lost task 0.0 in stage 84.0 (TID 39, 192.168.1.112, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 595, in process
    out_iter = func(split_index, iterator)
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 425, in func
    return f(iterator)
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 1151, in <lambda>
    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/statcounter.py", line 42, in __init__
    for v in values:
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-15-f9695c209808>", line 1, in <lambda>
  File "<ipython-input-14-f39b2b77c137>", line 12, in calc_clustering_coeff
KeyError: 69931

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 595, in process
    out_iter = func(split_index, iterator)
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 2596, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 425, in func
    return f(iterator)
  File "/home/jakub/.local/lib/python3.8/site-packages/pyspark/rdd.py", line 1151, in <lambda>
    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/statcounter.py", line 42, in __init__
    for v in values:
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-15-f9695c209808>", line 1, in <lambda>
  File "<ipython-input-14-f39b2b77c137>", line 12, in calc_clustering_coeff
KeyError: 69931

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
