In [1]:
from pyspark import SparkContext

In [2]:
sc = SparkContext()

In [3]:
raw_data = sc.textFile('/data/twitter/twitter_sample.txt')

In [4]:
def parse_edge(s):
    user, follower = s.split('\t')
    return (int(user), int(follower))

In [5]:
edges = raw_data.map(parse_edge).cache()

In [6]:
edges.take(5)

[(12, 18), (12, 41), (12, 57), (12, 62), (12, 235)]

### Find users with the largest number of followers

In [7]:
follower_counts = edges.mapValues(lambda v: 1).reduceByKey(lambda x, y: x + y)

In [8]:
follower_counts.top(5, key=lambda x: x[1])

[(20, 121425), (13, 102854), (12, 100113), (586, 92777), (291, 92026)]

In [9]:
# alternatively
import operator
follower_counts = edges.aggregateByKey(0, lambda a, x: a + 1, operator.iadd)
follower_counts.top(5, operator.itemgetter(1))

[(20, 121425), (13, 102854), (12, 100113), (586, 92777), (291, 92026)]

In [10]:
follower_counts.top(5, lambda x: -x[1])

[(518, 1), (540, 1), (32, 1), (612, 1), (700, 1)]

### Computing distances of all other vertices w.r.t. vertex - 12

In [12]:
def parse_edge(s):
    user, follower = s.split("\t")
    return (int(user), int(follower))

def step(item):
    prev_v, prev_d, next_v = item[0], item[1][0], item[1][1]
    return (next_v, prev_d + 1)

def complete(item):
    v, old_d, new_d = item[0], item[1][0], item[1][1]
    return (v, old_d if old_d is not None else new_d)

n = 400  # number of partitions
edges = sc.textFile("/data/twitter/twitter_sample_small.txt").map(parse_edge).cache()
forward_edges = edges.map(lambda e: (e[1], e[0])).partitionBy(n).persist()

In [13]:
x = 12
d = 0
distances = sc.parallelize([(x, d)]).partitionBy(n)
while True:
    candidates = distances.join(forward_edges, n).map(step)
    new_distances = distances.fullOuterJoin(candidates, n).map(complete, True).persist()
    count = new_distances.filter(lambda i: i[1] == d + 1).count()
    if count > 0:
        d += 1
        distances = new_distances
    else:
        break

In [27]:
distances.lookup(34)

[8]

### Find the shortest path from 12 to 34

In [29]:
from __future__ import print_function
print(12, end='')
target = 34
dist = distances.lookup(target)[0]
while dist:
    

12

In [30]:
distances.filter(lambda x: x[1] == 1).collect()

[(422, 1), (126, 1), (648, 1), (380, 1)]

In [33]:
distances.filter(lambda x: x[1] == 4).collect()

[(13, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4),
 (107, 4)]