This repository has been archived by the owner on Jan 21, 2022. It is now read-only.
/
director_service.go
119 lines (101 loc) · 3.55 KB
/
director_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package dummy
import (
"strconv"
"strings"
"time"
bltassets "github.com/cloudfoundry-incubator/bosh-load-tests/assets"
bltcom "github.com/cloudfoundry-incubator/bosh-load-tests/command"
bosherr "github.com/cloudfoundry/bosh-utils/errors"
boshsys "github.com/cloudfoundry/bosh-utils/system"
)
type DirectorService struct {
directorMigrationCommand string
directorStartCommand string
workerStartCommand string
assetsProvider bltassets.Provider
directorConfig *DirectorConfig
cmdRunner boshsys.CmdRunner
directorProcess boshsys.Process
workerProcesses []boshsys.Process
portWaiter PortWaiter
numWorkers int
}
// NewDirectorService builds a DirectorService from the given command lines,
// configuration and collaborators. It only stores its arguments: no process is
// launched here, and the process handles keep their zero values.
func NewDirectorService(
	directorMigrationCommand string,
	directorStartCommand string,
	workerStartCommand string,
	directorConfig *DirectorConfig,
	cmdRunner boshsys.CmdRunner,
	assetsProvider bltassets.Provider,
	portWaiter PortWaiter,
	numWorkers int,
) *DirectorService {
	service := new(DirectorService)

	service.directorMigrationCommand = directorMigrationCommand
	service.directorStartCommand = directorStartCommand
	service.workerStartCommand = workerStartCommand

	service.directorConfig = directorConfig
	service.cmdRunner = cmdRunner
	service.assetsProvider = assetsProvider
	service.portWaiter = portWaiter

	service.numWorkers = numWorkers

	return service
}
// Start brings up the director and its workers. In order, it:
//
//  1. writes the director configuration,
//  2. runs the migration command synchronously with "-c <director config>",
//  3. launches the director asynchronously and waits on its port through
//     portWaiter,
//  4. launches numWorkers workers asynchronously, numbered from 1, each with
//     its own worker config path and index, and
//  5. polls until the expected number of worker processes is observed.
//
// The first failing step aborts the sequence and its error is returned
// (wrapped with context for every step except the config write). Processes
// launched before the failure are left running and remain recorded on the
// service, so the caller is expected to call Stop to clean them up.
func (s *DirectorService) Start() error {
	if err := s.directorConfig.Write(); err != nil {
		return err
	}

	migrationCommand := bltcom.CreateCommand(s.directorMigrationCommand)
	migrationCommand.Args = append(migrationCommand.Args, "-c", s.directorConfig.DirectorConfigPath())
	if _, _, _, err := s.cmdRunner.RunComplexCommand(migrationCommand); err != nil {
		return bosherr.WrapError(err, "running migrations")
	}

	directorCommand := bltcom.CreateCommand(s.directorStartCommand)
	directorCommand.Args = append(directorCommand.Args, "-c", s.directorConfig.DirectorConfigPath())

	var err error
	s.directorProcess, err = s.cmdRunner.RunComplexCommandAsync(directorCommand)
	if err != nil {
		return bosherr.WrapError(err, "starting director")
	}
	// The value returned by Wait is intentionally not used here.
	// NOTE(review): presumably Wait must be called so the process is reaped and
	// can later be terminated nicely — confirm against boshsys.Process.
	s.directorProcess.Wait()

	err = s.portWaiter.Wait("DirectorService", "127.0.0.1", s.directorConfig.DirectorPort())
	if err != nil {
		return bosherr.WrapError(err, "Waiting for director to start up")
	}

	for i := 1; i <= s.numWorkers; i++ {
		workerStartCommand := bltcom.CreateCommand(s.workerStartCommand)
		workerStartCommand.Env["QUEUES"] = "normal,urgent"
		workerStartCommand.Args = append(workerStartCommand.Args, "-c", s.directorConfig.WorkerConfigPath(i), "-i", strconv.Itoa(i))

		workerProcess, err := s.cmdRunner.RunComplexCommandAsync(workerStartCommand)
		if err != nil {
			return bosherr.WrapError(err, "starting worker")
		}
		// As with the director, Wait's return value is not consumed. Readiness of
		// the workers is established by waitForWorkersToStart below, not here.
		workerProcess.Wait()
		s.workerProcesses = append(s.workerProcesses, workerProcess)
	}

	return s.waitForWorkersToStart()
}
// Stop asks every recorded worker process, and then the director process, to
// terminate, giving each a five second grace period.
//
// Stop is safe to call when Start failed before the director was launched, or
// was never called: a missing director process is skipped instead of causing
// a nil dereference. The recorded handles are cleared afterwards so that a
// repeated Stop, or a later Start, does not act on stale processes.
func (s *DirectorService) Stop() {
	const gracePeriod = 5 * time.Second

	for _, process := range s.workerProcesses {
		// Stop has no return value, so any result of TerminateNicely is dropped.
		process.TerminateNicely(gracePeriod)
	}
	s.workerProcesses = nil

	if s.directorProcess != nil {
		s.directorProcess.TerminateNicely(gracePeriod)
		s.directorProcess = nil
	}
}
// waitForWorkersToStart polls the process table until the number of
// bosh-director-worker processes equals numWorkers.
//
// It runs a `ps | grep | wc -l` pipeline through bash up to 30 times, sleeping
// one second after each unsuccessful attempt. It returns nil as soon as the
// trimmed output equals numWorkers, and an error once all attempts are used
// up. If the final probe itself failed to run, that error is wrapped into the
// timeout error so the cause is not lost.
func (s *DirectorService) waitForWorkersToStart() error {
	const (
		maxAttempts  = 30
		pollInterval = 1 * time.Second
		timeoutMsg   = "Timed out waiting for workers to start"
	)

	cmd := boshsys.Command{
		Name: "bash",
		Args: []string{"-c", "ps ax | grep bosh-director/bin/bosh-director-worker | grep -v grep | wc -l"},
	}
	expected := strconv.Itoa(s.numWorkers)

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		stdout, _, _, err := s.cmdRunner.RunComplexCommand(cmd)
		// A failed probe is not fatal on its own: keep retrying, but remember the
		// error so a timeout can report it.
		lastErr = err

		if strings.TrimSpace(stdout) == expected {
			return nil
		}
		time.Sleep(pollInterval)
	}

	if lastErr != nil {
		return bosherr.WrapError(lastErr, timeoutMsg)
	}
	return bosherr.Error(timeoutMsg)
}