/
runner.go
93 lines (78 loc) · 2.55 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package gio
import (
"context"
"fmt"
"log"
"os"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"github.com/chrislusf/gleam/pb"
)
// Serve starts processing stdin and writes output to stdout
func (runner *gleamRunner) runMapperReducer() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if runner.Option.IsProfiling {
cpuProfFile := fmt.Sprintf("mr_cpu_%d-s%d-t%d.pprof", runner.Option.HashCode,
runner.Option.StepId, runner.Option.TaskId)
pwd, _ := os.Getwd()
// println("saving pprof to", pwd+"/"+cpuProfFile)
cf, err := os.Create(cpuProfFile)
if err != nil {
log.Fatalf("failed to create cpu profile file %s: %v", pwd+"/"+cpuProfFile, err)
}
pprof.StartCPUProfile(cf)
defer pprof.StopCPUProfile()
memProfFile := fmt.Sprintf("mr_mem_%d-s%d-t%d.pprof", runner.Option.HashCode,
runner.Option.StepId, runner.Option.TaskId)
mf, err := os.Create(memProfFile)
// println("saving pprof to", pwd+"/"+memProfFile)
if err != nil {
log.Fatalf("failed to create memory profile file %s: %v", pwd+"/"+memProfFile, err)
}
defer func() {
runtime.GC()
pprof.Lookup("heap").WriteTo(mf, 0)
}()
}
stat.FlowHashCode = uint32(runner.Option.HashCode)
stat.Stats = []*pb.InstructionStat{
{
StepId: int32(runner.Option.StepId),
TaskId: int32(runner.Option.TaskId),
},
}
if runner.Option.Mapper != "" {
if fn, ok := mappers[MapperId(runner.Option.Mapper)]; ok {
if err := runner.processMapper(ctx, fn.Mapper); err != nil {
log.Fatalf("Failed to execute mapper %v: %v", os.Args, err)
}
return
}
log.Fatalf("Missing mapper function %v. Args: %v", runner.Option.Mapper, os.Args)
}
if runner.Option.Reducer != "" {
if runner.Option.KeyFields == "" {
log.Fatalf("Also expecting values for -gleam.keyFields! Actual arguments: %v", os.Args)
}
if fn, ok := reducers[ReducerId(runner.Option.Reducer)]; ok {
keyPositions := strings.Split(runner.Option.KeyFields, ",")
var keyIndexes []int
for _, keyPosition := range keyPositions {
keyIndex, keyIndexError := strconv.Atoi(keyPosition)
if keyIndexError != nil {
log.Fatalf("Failed to parse key index positions %v: %v", runner.Option.KeyFields, keyIndexError)
}
keyIndexes = append(keyIndexes, keyIndex)
}
if err := runner.processReducer(ctx, fn.Reducer, keyIndexes); err != nil {
log.Fatalf("Failed to execute reducer %v: %v", os.Args, err)
}
return
}
log.Fatalf("Missing reducer function %v. Args: %v", runner.Option.Reducer, os.Args)
}
log.Fatalf("Failed to find function to execute. Args: %v", os.Args)
}