In [4]:
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName("MyGraph").setMaster("local"))

In [5]:
raw_data = sc.textFile("/data/twitter/twitter_sample_small.txt")

In [7]:
! head -3 /data/twitter/twitter_sample_small.txt

12	2241
12	13349
12	41873


In [8]:
def parse_edge(s):
    user, follower = s.split("\t")
    return (int(user), int(follower))

In [9]:
edges = raw_data.map(parse_edge).cache()

In [10]:
edges.take(3)

[(12, 2241), (12, 13349), (12, 41873)]

In [19]:
import operator
follower_counts = edges.mapValues(lambda x: 1).reduceByKey(lambda a,b: a+b)

In [46]:
follower_counts.top(5, operator.itemgetter(1))

[(20, 12352), (13, 10333), (12, 10030), (586, 9261), (291, 9054)]

In [43]:
r = operator.itemgetter(1)

In [44]:
a=[1,2,3,4]
r(a)

2

https://docs.python.org/2.7/library/operator.html

 operator.itemgetter(item)
operator.itemgetter(*items)

    Return a callable object that fetches item from its operand using the operand’s __getitem__() method. If multiple items are specified, returns a tuple of lookup values. For example:

        After f = itemgetter(2), the call f(r) returns r[2].
        After g = itemgetter(2, 5, 3), the call g(r) returns (r[2], r[5], r[3]).

    Equivalent to:

    def itemgetter(*items):
        if len(items) == 1:
            item = items[0]
            def g(obj):
                return obj[item]
        else:
            def g(obj):
                return tuple(obj[item] for item in items)

In [47]:
follower_counts2 = edges.aggregateByKey(0, lambda a, x: a+1, operator.iadd)
follower_counts2.top(5,operator.itemgetter(1))

[(20, 12352), (13, 10333), (12, 10030), (586, 9261), (291, 9054)]

__aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])__ When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. 

In [61]:
forward_edges = edges.map(lambda e: (e[1], e[0])).cache() # swap vrx
forward_edges.take(3)

[(2241, 12), (13349, 12), (41873, 12)]

In [66]:
forward_edges.lookup(12)

[126, 380, 422, 648]

In [67]:
x = 12 # apriory vrx
distances = sc.parallelize([(x,0)]) # first distance
distances.take(1)

[(12, 0)]

In [68]:
distances.lookup(12)

[0]

In [69]:
distances.join(forward_edges)
distances.take(3)

[(12, 0)]

square topology
```
1-2
| |
3-4
```

In [70]:
edg = sc.parallelize([(1,2),(1,3),(2,1),(2,4),(3,1),(3,4),(4,3),(4,2)])

In [71]:
fwd_edg = edg.map(lambda e: (e[1], e[0])).cache()

In [72]:
edg.collect()

[(1, 2), (1, 3), (2, 1), (2, 4), (3, 1), (3, 4), (4, 3), (4, 2)]

In [73]:
fwd_edg.collect()

[(2, 1), (3, 1), (1, 2), (4, 2), (1, 3), (4, 3), (3, 4), (2, 4)]

In [74]:
x=4
dst = sc.parallelize([(x,0)])

In [77]:
dst.join(fwd_edg).collect() # vrx,((distance, neighbour),...)

[(4, (0, 2)), (4, (0, 3))]

In [78]:
def step(item):
    prev_v, prev_d, next_v = item[0], item[1][0], item[1][1]
    return (next_v, prev_d + 1)

In [79]:
dst.join(fwd_edg).map(step).collect()

[(2, 1), (3, 1)]

In [84]:
candidates = dst.join(fwd_edg).map(step)
candidates.collect()

[(2, 1), (3, 1)]

In [85]:
dst.fullOuterJoin(candidates).collect()

[(3, (None, 1)), (4, (0, None)), (2, (None, 1))]

In [86]:
def complete(item): # check for visited vrx
    v, old_d, new_d = item[0], item[1][0], item[1][1]
    return (v, old_d if old_d is not None else new_d)

In [82]:
new_dst = dst.fullOuterJoin(candidates).map(complete)

In [83]:
new_dst.collect()

[(3, 1), (4, 0), (2, 1)]

In [93]:
x = 4
d = 0
distances = sc.parallelize([(x, d)])
while True:
    candidates = distances.join(fwd_edg).map(step)
    new_dst = distances.fullOuterJoin(candidates).map(complete)
    count = new_dst.filter(lambda i: i[1] == d + 1).count() # number of new vrx
    if count > 0:
        d += 1
        distances = new_dst
        print("d = ", d, "count = ", count)
    else:
        break

('d = ', 1, 'count = ', 2)
('d = ', 2, 'count = ', 2)


In [89]:
distances.collect()

[(1, 2), (1, 2), (2, 1), (3, 1), (4, 0), (4, 0)]

In [92]:
x = 4
d = 0
n = 4
distances = sc.parallelize([(x, d)]).partitionBy(n)
fwd_edg = edg.map(lambda e: (e[1], e[0])).partitionBy(n).persist()
while True:
    candidates = distances.join(fwd_edg).map(step)
    new_dst = distances.fullOuterJoin(candidates).map(complete, True).persist()
    count = new_dst.filter(lambda i: i[1] == d + 1).count() # number of new vrx
    if count > 0:
        d += 1
        distances = new_dst
        print("d = ", d, "count = ", count)
    else:
        break

('d = ', 1, 'count = ', 2)
('d = ', 2, 'count = ', 2)
