# Cálculo de Pi com Monte Carlo com Spark

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
        .master('local')\
        .appName('PI')\
        .getOrCreate()

In [3]:
spark

In [4]:
from random import random

In [7]:
def f(_):
    x = random()
    y = random()
    if x**2 + y**2 <1:
        return 1
    else:
        return 0

In [52]:
num_samples = 99999

#### Criar o Contexto do Spark

In [53]:
sc = spark.sparkContext

In [54]:
sample_nums = sc.parallelize(range(0,num_samples))

In [55]:
sample_nums.take(5)

[0, 1, 2, 3, 4]

In [56]:
sample = sample_nums.map(f)

In [57]:
sample.take(5)

[0, 1, 1, 1, 1]

In [58]:
count = sample.reduce(lambda a,b : a + b)

In [59]:
type(count)

int

In [60]:
count

78350

In [61]:
print("PI é aproximadamente = {0:.5f}".format(4.0*count/num_samples))

PI é aproximadamente = 3.13403


# Page Rank

In [62]:
import sys

In [63]:
with open('links.tsv', 'r') as f:
    f = f.read()
    d = {}
    i = 0
    for l in f.strip().split("\n"):
        if len(l) == 0 or l[0] == "#":
            continue
        s, t = tuple(l.split())
        if s not in d:
            d[s] = i
            i += 1
        if t not in d:
            d[t] = i
            i += 1

In [64]:
d

{'%C3%81ed%C3%A1n_mac_Gabr%C3%A1in': 0,
 'Bede': 1,
 'Columba': 2,
 'D%C3%A1l_Riata': 3,
 'Great_Britain': 4,
 'Ireland': 5,
 'Isle_of_Man': 6,
 'Monarchy': 7,
 'Orkney': 8,
 'Picts': 9,
 'Scotland': 10,
 'Wales': 11,
 '%C3%85land': 12,
 '20th_century': 13,
 'Baltic_Sea': 14,
 'Crimean_War': 15,
 'Currency': 16,
 'Euro': 17,
 'European_Union': 18,
 'Finland': 19,
 'League_of_Nations': 20,
 'List_of_countries_by_system_of_government': 21,
 'Nationality': 22,
 'Parliamentary_system': 23,
 'Police': 24,
 'Russia': 25,
 'Stockholm': 26,
 'Sweden': 27,
 'Time_zone': 28,
 'Tourism': 29,
 'United_Kingdom': 30,
 'World_War_II': 31,
 '%C3%89douard_Manet': 32,
 'Absinthe': 33,
 'Beer': 34,
 'Claude_Monet': 35,
 'Diego_Vel%C3%A1zquez': 36,
 'Edgar_Allan_Poe': 37,
 'France': 38,
 'Francisco_Goya': 39,
 'Germany': 40,
 'Impressionism': 41,
 'Italy': 42,
 'Landscape': 43,
 'Netherlands': 44,
 'Painting': 45,
 'Paris': 46,
 'Photography': 47,
 'Raphael': 48,
 'Renaissance': 49,
 'United_States_dollar

In [70]:
with open('links.tsv','r') as f:
    f = f.read()
    output = open('wiki_links.txt','a')
    for l in f.strip().split('\n'):
        if len(l) ==0 or l[0] == "#":
            continue
        s, t = tuple(l.split())
        output.write("{} {}\n".format(d[s],d[t]))
        print("{} {}".format(d[s],d[t]))
    output.close()

0 1
0 2
0 3
0 4
0 5
0 6
0 7
0 8
0 9
0 10
0 11
12 13
12 14
12 15
12 16
12 17
12 18
12 19
12 20
12 21
12 22
12 23
12 24
12 25
12 26
12 27
12 28
12 29
12 30
12 31
32 33
32 34
32 35
32 36
32 37
32 38
32 39
32 40
32 41
32 42
32 43
32 44
32 45
32 46
32 47
32 48
32 49
32 27
32 50
32 51
52 53
52 54
52 55
52 5
52 56
52 57
52 58
52 30
59 3
59 60
59 61
59 4
59 5
59 62
59 63
59 10
59 64
59 65
66 67
66 68
66 69
66 70
66 71
66 72
66 73
66 74
66 75
66 76
66 77
66 78
66 18
66 19
66 38
66 79
66 40
66 80
66 81
66 82
66 83
66 84
66 5
66 42
66 85
66 86
66 87
66 88
66 89
66 44
66 90
66 91
66 92
66 93
66 94
66 95
66 96
66 97
66 98
66 99
66 100
66 30
66 101
66 102
66 103
66 104
66 105
106 107
106 108
106 109
106 34
106 110
106 111
106 112
106 113
106 38
106 40
106 114
106 115
106 42
106 116
106 62
106 117
106 118
106 7
106 119
106 120
106 121
106 10
106 122
106 27
106 123
106 64
107 106
107 124
107 125
107 126
107 127
107 128
107 129
107 130
107 131
107 132
107 133
107 13
107 134
107 135
107 136
107 137
107 

In [71]:
sc2 = spark.sparkContext

In [73]:
adjList = sc2.textFile('wiki_links.txt')

In [75]:
adjList.take(5)

['0 1', '0 2', '0 3', '0 4', '0 5']

In [76]:
adjList2 = adjList.map(lambda line: line.split(" "))

In [79]:
adjList2.take(5)

[['0', '1'], ['0', '2'], ['0', '3'], ['0', '4'], ['0', '5']]

In [80]:
adjList3 = adjList2.map(lambda x: [int(x[0]),int(x[1])])

In [81]:
adjList3.take(5)

[[0, 1], [0, 2], [0, 3], [0, 4], [0, 5]]

In [82]:
adjList4 = adjList3.groupByKey()

In [83]:
adjList4.take(5)

[(0, <pyspark.resultiterable.ResultIterable at 0x1193ba898>),
 (12, <pyspark.resultiterable.ResultIterable at 0x1193a0c50>),
 (32, <pyspark.resultiterable.ResultIterable at 0x10fa36400>),
 (52, <pyspark.resultiterable.ResultIterable at 0x10fa366a0>),
 (59, <pyspark.resultiterable.ResultIterable at 0x10fa36198>)]

In [84]:
for i in adjList4.take(5):
    print(i[0], '-->',[v for v in i[1]])

0 --> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
12 --> [13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]
32 --> [33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 27, 50, 51]
52 --> [53, 54, 55, 5, 56, 57, 58, 30]
59 --> [3, 60, 61, 4, 5, 62, 63, 10, 64, 65]


In [85]:
numberOfNodes = adjList4.count()

In [86]:
numberOfNodes

4587

In [87]:
PageRnakValues = adjList4.mapValues(lambda v: 1.0)

In [88]:
PageRnakValues.take(5)

[(0, 1.0), (12, 1.0), (32, 1.0), (52, 1.0), (59, 1.0)]

In [91]:
for i in range(1,31):
    print("Interação nro. ",i)
    joinRDD = adjList4.join(PageRnakValues)
    #print(joinRDD.take(2))
    contributions = joinRDD.flatMap(lambda x: [(v,x[1][1]/len(x[1][0])) for v in x[1][0]])
    accumulations = contributions.reduceByKey(lambda x, y: x + y)
    PageRnakValues = accumulations.mapValues(lambda v: v*0.85 +.15/float(numberOfNodes))

Interação nro.  1
Interação nro.  2
Interação nro.  3
Interação nro.  4
Interação nro.  5
Interação nro.  6
Interação nro.  7
Interação nro.  8
Interação nro.  9
Interação nro.  10
Interação nro.  11
Interação nro.  12
Interação nro.  13
Interação nro.  14
Interação nro.  15
Interação nro.  16
Interação nro.  17
Interação nro.  18
Interação nro.  19
Interação nro.  20
Interação nro.  21
Interação nro.  22
Interação nro.  23
Interação nro.  24
Interação nro.  25
Interação nro.  26
Interação nro.  27
Interação nro.  28
Interação nro.  29
Interação nro.  30


In [92]:
PageRnakValues.take(100)

[(107, 0.025350229186307277),
 (108, 0.040790069670530636),
 (109, 0.03835338358432998),
 (34, 0.013062330973240367),
 (110, 0.03133688350481),
 (111, 0.06305204270537902),
 (112, 0.014438446069571722),
 (113, 0.014199708005505926),
 (38, 0.27622789958058397),
 (40, 0.20689717002151595),
 (114, 0.005605009261544385),
 (115, 0.17853110063524222),
 (42, 0.1613809322904772),
 (116, 0.008791950903623677),
 (62, 0.01576225093564215),
 (117, 0.007638507920158619),
 (118, 0.058437230727520245),
 (7, 0.03914680438194219),
 (119, 0.03622532982572327),
 (120, 0.04809197665613418),
 (121, 0.0866056277252083),
 (10, 0.08269182926560532),
 (122, 0.022239651233419564),
 (27, 0.08649242755484707),
 (123, 0.028680627568179164),
 (64, 0.03205058030946733),
 (106, 0.02867533927241636),
 (124, 0.03156683305751023),
 (125, 0.0333623018332024),
 (126, 0.026739073270661173),
 (127, 0.043975872680085536),
 (128, 0.051830516148752656),
 (129, 0.043718128198936),
 (130, 0.06813302159478837),
 (131, 0.106083977