In [1]:
import sys
import os

sys.path.insert(0, '/opt/spark/python')
sys.path.insert(0, '/opt/spark/python/lib/py4j-0.10.9.7-src.zip')

os.environ['SPARK_HOME'] = '/opt/spark'

In [9]:
import pyspark
conf = pyspark.SparkConf()
conf.setMaster("spark://spark-master:7077")
conf.set("spark.driver.memory","1g")

sc = pyspark.SparkContext(conf=conf)

In [10]:
textFile = sc.textFile("/opt/spark-data/gutenberg-shakespeare.txt")

In [11]:
%%time
textFile.count()

CPU times: user 7.25 ms, sys: 3.2 ms, total: 10.4 ms
Wall time: 682 ms


                                                                                

124213

In [12]:
wordcount = textFile.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)

In [13]:
wc_result = wordcount.collect()

In [14]:
wc_result

[('', 516839),
 ('Shakespeare', 39),
 ('1', 13),
 ('fairest', 38),
 ('creatures', 24),
 ('we', 2483),
 ('increase,', 9),
 ('That', 2734),
 ('thereby', 19),
 ("beauty's", 28),
 ('rose', 40),
 ('never', 865),
 ('But', 2321),
 ('as', 4267),
 ('riper', 3),
 ('tender', 121),
 ('heir', 68),
 ('bear', 421),
 ('his', 6218),
 ('memory:', 1),
 ('thou', 4247),
 ('thine', 315),
 ('own', 659),
 ('bright', 50),
 ('thy', 3630),
 ("light's", 1),
 ('flame', 19),
 ('fuel,', 1),
 ('Making', 26),
 ('famine', 5),
 ('where', 605),
 ('abundance', 8),
 ('self', 98),
 ('foe,', 14),
 ('sweet', 580),
 ('cruel:', 1),
 ('Thou', 890),
 ('now', 1126),
 ("world's", 50),
 ('ornament,', 5),
 ('And', 7068),
 ('only', 219),
 ('herald', 16),
 ('Within', 82),
 ('churl', 3),
 ("mak'st", 21),
 ('in', 9565),
 ('niggarding:', 1),
 ('Pity', 14),
 ('world,', 127),
 ('this', 4761),
 ('due,', 8),
 ('grave', 80),
 ('When', 814),
 ('forty', 22),
 ('winters', 8),
 ('besiege', 8),
 ('dig', 9),
 ("youth's", 5),
 ('Will', 454),
 ('weed'

In [8]:
sc.stop()

In [19]:
%%file /opt/spark-data/small_graph.dat
y y
y a
a y
a m
m a

Writing /opt/spark-data/small_graph.dat


In [20]:
graph_data = sc.textFile("/opt/spark-data/small_graph.dat")
graph_data.take(5)

['y y', 'y a', 'a y', 'a m', 'm a']

In [21]:
links = graph_data.map(lambda line: (line.split(" ")[0], line.split(" ")[1]))
print(links.take(10))
links = links.groupByKey()
print(links.take(10))
links = links.mapValues(list)
print(links.take(10))

[('y', 'y'), ('y', 'a'), ('a', 'y'), ('a', 'm'), ('m', 'a')]
[('y', <pyspark.resultiterable.ResultIterable object at 0xffff941d63b0>), ('a', <pyspark.resultiterable.ResultIterable object at 0xffff9401ce80>), ('m', <pyspark.resultiterable.ResultIterable object at 0xffff9401cc70>)]
[('y', ['y', 'a']), ('a', ['y', 'm']), ('m', ['a'])]


In [22]:
N = links.count()
ranks = links.map(lambda line: (line[0], 1/N))
ranks.take(3)

[('y', 0.3333333333333333),
 ('a', 0.3333333333333333),
 ('m', 0.3333333333333333)]

In [23]:
print(links.take(3))
print(ranks.take(3))

[('y', ['y', 'a']), ('a', ['m', 'y']), ('m', ['a'])]
[('y', 0.3333333333333333), ('a', 0.3333333333333333), ('m', 0.3333333333333333)]


In [24]:
def calculateVotes(t):
    res = []
    for item in t[1][1]:
        count = len(t[1][1])
        res.append((item, t[1][0] / count))
    return res
    ###############################
calculateVotes(('y', (0.3333333333333333, ['y', 'a'])))

[('y', 0.16666666666666666), ('a', 0.16666666666666666)]

In [25]:
votes = ranks.join(links)
votes.take(10)

[('y', (0.3333333333333333, ['y', 'a'])),
 ('a', (0.3333333333333333, ['m', 'y'])),
 ('m', (0.3333333333333333, ['a']))]

In [26]:
votes = votes.flatMap(calculateVotes)
print(votes.take(10))

[('y', 0.16666666666666666), ('a', 0.16666666666666666), ('m', 0.16666666666666666), ('y', 0.16666666666666666), ('a', 0.3333333333333333)]


In [27]:
ranks = votes.reduceByKey(lambda x,y: x + y)
ranks.take(10)

[('y', 0.3333333333333333), ('m', 0.16666666666666666), ('a', 0.5)]

In [28]:
%%time
for i in range(10):
    votes = ranks.join(links).flatMap(calculateVotes)
    ranks = votes.reduceByKey(lambda x,y: x + y)
    print(ranks.take(3))

[('y', 0.41666666666666663), ('a', 0.3333333333333333), ('m', 0.25)]
[('y', 0.375), ('a', 0.4583333333333333), ('m', 0.16666666666666666)]
[('m', 0.22916666666666666), ('y', 0.41666666666666663), ('a', 0.35416666666666663)]
[('y', 0.38541666666666663), ('m', 0.17708333333333331), ('a', 0.4375)]
[('y', 0.4114583333333333), ('m', 0.21875), ('a', 0.36979166666666663)]
[('y', 0.390625), ('m', 0.18489583333333331), ('a', 0.42447916666666663)]
[('y', 0.4075520833333333), ('a', 0.3802083333333333), ('m', 0.21223958333333331)]
[('m', 0.19010416666666666), ('a', 0.416015625), ('y', 0.3938802083333333)]


                                                                                

[('a', 0.3870442708333333), ('y', 0.40494791666666663), ('m', 0.2080078125)]


                                                                                

[('y', 0.39599609375), ('a', 0.4104817708333333), ('m', 0.19352213541666666)]
CPU times: user 126 ms, sys: 63.5 ms, total: 189 ms
Wall time: 12.5 s


In [29]:
%%time
sum = 1
while sum > 0.01:
    old_ranks = ranks
    votes = ranks.join(links).flatMap(calculateVotes)
    ranks = votes.reduceByKey(lambda x,y: x + y)
    errors = old_ranks.join(ranks).mapValues(lambda v: abs(v[0] - v[1]))
    sum = errors.values().sum()
    print(sum)
    print(ranks.take(3))

                                                                                

0.037923177083333315
[('m', 0.20524088541666666), ('y', 0.40323893229166663), ('a', 0.39152018229166663)]


                                                                                

0.030680338541666713
[('y', 0.39737955729166663), ('m', 0.19576009114583331), ('a', 0.4068603515625)]


                                                                                

0.02482096354166674
[('m', 0.20343017578125), ('y', 0.4021199544270833), ('a', 0.39444986979166663)]


                                                                                

0.02008056640625
[('a', 0.40449015299479163), ('m', 0.19722493489583331), ('y', 0.398284912109375)]


                                                                                

0.01624552408854163
[('y', 0.4013875325520833), ('m', 0.20224507649739581), ('a', 0.3963673909505208)]


                                                                                

0.013142903645833343
[('y', 0.39887746175130206), ('a', 0.4029388427734375), ('m', 0.1981836954752604)]


                                                                                

0.010632832845052093
[('m', 0.20146942138671875), ('a', 0.39762242635091144), ('y', 0.40090815226236975)]


                                                                                

0.008602142333984347
[('m', 0.19881121317545572), ('a', 0.4019234975179036), ('y', 0.3992652893066406)]
CPU times: user 308 ms, sys: 133 ms, total: 441 ms
Wall time: 43.7 s
