/
pi_estimation.go
171 lines (137 loc) · 3.43 KB
/
pi_estimation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package main
import (
"flag"
"fmt"
"math/rand"
"os"
"runtime/pprof"
"time"
"github.com/ducminhnguyen/gleam/distributed"
"github.com/ducminhnguyen/gleam/flow"
"github.com/ducminhnguyen/gleam/gio"
"github.com/ducminhnguyen/gleam/util"
glow "github.com/chrislusf/glow/flow"
"sync"
"sync/atomic"
)
// Package-level flags, gleam function registrations, and workload size.
var (
	// master is the gleam master address used for the distributed run.
	master = flag.String("master", "localhost:45326", "master server location")

	// Mapper/reducer must be registered with gio before gio.Init() so that
	// remote executors can look them up by id.
	monteCarloMapperId = gio.RegisterMapper(monteCarloMapper)
	sumReducerId       = gio.RegisterReducer(sumReducer)

	// times is the total number of Monte Carlo samples; factor is the number
	// of samples drawn per mapped work item (times/factor items are emitted).
	times  = 1024 * 1024 * 2560
	factor = 1024 * 1024
)
// main runs the same pi estimation through several execution strategies
// (gleam distributed/local, plain Go, concurrent Go, glow) and prints the
// estimate and wall-clock cost of each.
func main() {
	// gio.Init() must run first: in mapper/reducer executor processes it
	// takes over and never returns.
	gio.Init()
	flag.Parse()

	// Best effort CPU profiling: report and continue rather than silently
	// profiling into a nil file.
	if f, err := os.Create("p.prof"); err != nil {
		fmt.Fprintf(os.Stderr, "create p.prof: %v\n", err)
	} else {
		defer f.Close()
		if err := pprof.StartCPUProfile(f); err != nil {
			fmt.Fprintf(os.Stderr, "start CPU profile: %v\n", err)
		} else {
			defer pprof.StopCPUProfile()
		}
	}

	// The distributed run requires a gleam master and agents to be set up.
	testPureGoGleam("distributed parallel 7", false)
	testPureGoGleam("local mode parallel 7", true)

	testDirectGo()
	testDirectGoConcurrent()

	// this is not fair since many optimization is not applied
	testLocalFlow()
}
// testPureGoGleam estimates pi with a gleam flow: times/factor work items are
// partitioned 7 ways, each mapped to a count of factor Monte Carlo hits, and
// the counts are summed into count. isLocal selects in-process execution;
// otherwise the flow runs against the configured master with profiling on.
func testPureGoGleam(name string, isLocal bool) {
	var count int64
	startTime := time.Now()

	f := flow.New("pi estimation").
		Source("iteration times", util.Range(0, times/factor)).
		PartitionByKey("partition", 7).
		Map("monte carlo", monteCarloMapperId).
		Reduce("sum", sumReducerId).
		SaveFirstRowTo(&count)

	if isLocal {
		f.Run()
	} else {
		f.Run(distributed.Option().SetMaster(*master).SetProfiling(true))
	}

	fmt.Printf("pi = %f\n", 4.0*float64(count)/float64(times))
	// time.Since is the idiomatic form of time.Now().Sub(startTime).
	fmt.Printf("pure go gleam %s time cost: %s\n", name, time.Since(startTime))
	fmt.Println()
}
// testDirectGo is the single-goroutine baseline: draw `times` uniform points
// in the unit square and estimate pi as 4 * (fraction inside the unit circle).
func testDirectGo() {
	startTime := time.Now()

	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	var count int
	for i := 0; i < times; i++ {
		x, y := r.Float64(), r.Float64()
		if x*x+y*y < 1 {
			count++
		}
	}

	fmt.Printf("pi = %f\n", 4.0*float64(count)/float64(times))
	// time.Since is the idiomatic form of time.Now().Sub(startTime).
	fmt.Printf("direct pure go time cost: %s\n", time.Since(startTime))
	fmt.Println()
}
// testDirectGoConcurrent splits the `times` samples across 7 goroutines, each
// with its own RNG, and accumulates hit counts atomically.
//
// Fixes over the original: every goroutine was seeded with
// time.Now().Unix() — second resolution, so all 7 got the *same* seed and
// produced identical, fully correlated sample streams; and times/7 silently
// dropped the division remainder while the final estimate still divided by
// times. Seeds are now unique per worker and the last worker absorbs the
// remainder so exactly `times` samples are drawn.
func testDirectGoConcurrent() {
	startTime := time.Now()

	const workers = 7
	var wg sync.WaitGroup
	var finalCount int64
	for i := 0; i < workers; i++ {
		n := times / workers
		if i == workers-1 {
			n += times % workers // last worker takes the remainder
		}
		seed := time.Now().UnixNano() + int64(i) // distinct seed per worker
		wg.Add(1)
		go func(n int, seed int64) {
			defer wg.Done()
			r := rand.New(rand.NewSource(seed))
			var count int64
			for j := 0; j < n; j++ {
				x, y := r.Float64(), r.Float64()
				if x*x+y*y < 1 {
					count++
				}
			}
			atomic.AddInt64(&finalCount, count)
		}(n, seed)
	}
	wg.Wait()

	fmt.Printf("pi = %f\n", 4.0*float64(finalCount)/float64(times))
	fmt.Printf("concurrent pure go time cost: %s\n", time.Since(startTime))
	fmt.Println()
}
// testLocalFlow runs the same estimation through the glow local-flow library:
// times/factor item ids are fed over a channel, partitioned 7 ways, each
// mapped to a count of `factor` Monte Carlo hits, and the counts reduced to a
// single total. (As noted in main, this comparison is not entirely fair.)
func testLocalFlow() {
	startTime := time.Now()

	ch := make(chan int)
	go func() {
		for i := 0; i < times/factor; i++ {
			ch <- i
		}
		close(ch)
	}()

	glow.New().Channel(ch).Partition(7).Map(func(t int) int {
		// Seed with nanosecond resolution: the original used Unix seconds,
		// so the many per-item RNGs created within the same second all
		// produced identical, correlated sample streams.
		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		count := 0
		for i := 0; i < factor; i++ {
			x, y := r.Float32(), r.Float32()
			if x*x+y*y < 1 {
				count++
			}
		}
		return count
	}).Reduce(func(x, y int) int {
		return x + y
	}).Map(func(count int) {
		fmt.Printf("pi = %f\n", 4.0*float64(count)/float64(times))
	}).Run()

	fmt.Printf("flow parallel 7 time cost: %s\n", time.Since(startTime))
	fmt.Println()
}
// monteCarloMapper is the gleam mapper: for each input row it draws `factor`
// uniform points in the unit square and emits the number that fall inside the
// unit circle. The row content itself is unused; it only drives how many
// batches run.
func monteCarloMapper(row []interface{}) error {
	// Nanosecond seed: mappers starting within the same second previously
	// all got time.Now().Unix() and emitted identical, correlated counts.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	var count int
	for i := 0; i < factor; i++ {
		x, y := r.Float32(), r.Float32()
		if x*x+y*y < 1 {
			count++
		}
	}
	// Propagate any emit failure instead of discarding it.
	return gio.Emit(count)
}
// sumReducer is the gleam reducer: it adds two partial hit counts. Values
// arrive as int64 after gleam's serialization; unexpected types are reported
// through the error return instead of panicking on a failed type assertion.
func sumReducer(x, y interface{}) (interface{}, error) {
	a, ok := x.(int64)
	if !ok {
		return nil, fmt.Errorf("sumReducer: unexpected type %T for x", x)
	}
	b, ok := y.(int64)
	if !ok {
		return nil, fmt.Errorf("sumReducer: unexpected type %T for y", y)
	}
	return a + b, nil
}