-
Notifications
You must be signed in to change notification settings - Fork 0
/
builder.go
175 lines (145 loc) · 4.88 KB
/
builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Copyright (c) 2020-2021 C4 Project
//
// This file is part of c4t.
// Licenced under the MIT licence; see `LICENSE`.
// Package builder describes a set of types and methods for building corpora asynchronously.
package builder
import (
"context"
"errors"
"fmt"
"github.com/c4-project/c4t/internal/subject/compilation"
"github.com/c4-project/c4t/internal/model/recipe"
"github.com/c4-project/c4t/internal/id"
"github.com/c4-project/c4t/internal/subject/corpus"
"github.com/c4-project/c4t/internal/subject"
)
var (
// ErrBadTarget occurs when the target request count for a Builder is non-positive.
// New wraps it, together with the offending count, when the configured NReqs is <= 0.
ErrBadTarget = errors.New("number of builder requests must be positive")
// ErrBadBuilderName occurs when a builder request specifies a name that isn't in the builder's corpus.
// It is wrapped, together with the missing name, by the requests that modify an existing subject
// (compile, recipe and run requests).
ErrBadBuilderName = errors.New("requested subject name not in builder")
// ErrBadBuilderRequest occurs when a builder request has an unknown body type;
// that is, when none of its Add, Compile, Recipe or Run fields is set.
ErrBadBuilderRequest = errors.New("unhandled builder request type")
)
// Builder handles the assembly of corpora from asynchronously-constructed subjects.
// Producers send requests on SendCh; Run receives them and applies them to the corpus one at a time.
type Builder struct {
// c is the corpus being built.
c corpus.Corpus
// m is the manifest for this builder task.
// Run reads its NReqs field to decide how many requests to handle before returning.
m Manifest
// obs is the observer set for the builder.
// It is passed to OnBuild at the start and end of Run and on every request.
obs []Observer
// reqCh is the receiving channel for requests.
// New creates it as the receive-only view of the same channel as SendCh.
reqCh <-chan Request
// SendCh is the channel to which requests for the builder should be sent.
// New creates the underlying channel unbuffered, so a send blocks until Run receives it.
SendCh chan<- Request
}
// New constructs a Builder according to cfg.
// It fails with an error wrapping ErrBadTarget if the number of target requests is not positive
// (zero is rejected as well as negative counts).
func New(cfg Config) (*Builder, error) {
	if cfg.NReqs < 1 {
		return nil, fmt.Errorf("%w: %d", ErrBadTarget, cfg.NReqs)
	}

	// One unbuffered channel backs both directional views stored on the Builder.
	ch := make(chan Request)
	return &Builder{
		c:      initCorpus(cfg.Init, cfg.NReqs),
		m:      cfg.Manifest,
		obs:    cfg.Observers,
		reqCh:  ch,
		SendCh: ch,
	}, nil
}
// initCorpus produces the corpus a new Builder starts from.
// A non-nil init is copied; a nil init yields a fresh corpus made with nreqs as its size argument.
func initCorpus(init corpus.Corpus, nreqs int) corpus.Corpus {
	if init != nil {
		return init.Copy()
	}
	// Most requests are expected to be add requests, so nreqs is a sensible starting capacity.
	return make(corpus.Corpus, nreqs)
}
// Run runs the builder in context ctx.
// It notifies the observers that the build has started, handles as many requests as the
// manifest's NReqs specifies, and notifies the observers again on the way out.
// It returns the built corpus, or a nil corpus and the first error encountered.
// Run is not thread-safe.
func (b *Builder) Run(ctx context.Context) (corpus.Corpus, error) {
	OnBuild(StartMessage(b.m), b.obs...)
	// Wrapped in a closure so that the end message is only constructed when Run returns.
	defer func() {
		OnBuild(EndMessage(), b.obs...)
	}()

	for step := 0; step < b.m.NReqs; step++ {
		err := b.runSingle(step, ctx)
		if err != nil {
			return nil, err
		}
	}
	return b.c, nil
}
// runSingle waits for either the next request or the cancellation of ctx.
// A received request is handled as step i; on cancellation, the context's error is returned.
// NOTE(review): ctx conventionally comes first; the parameter order is kept so existing callers still compile.
func (b *Builder) runSingle(i int, ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case req := <-b.reqCh:
		return b.runRequest(i, req)
	}
}
// runRequest reports request r to the observers as step i, then applies it to the corpus.
// The request's body fields are checked in the order Add, Compile, Recipe, Run, and the
// first non-nil one decides what happens; if all are nil, the result is an error wrapping
// ErrBadBuilderRequest.
func (b *Builder) runRequest(i int, r Request) error {
	OnBuild(StepMessage(i, r), b.obs...)

	if add := r.Add; add != nil {
		return b.add(r.Name, subject.Subject(*add))
	}
	if comp := r.Compile; comp != nil {
		return b.addCompile(r.Name, comp.CompilerID, comp.Result)
	}
	if rec := r.Recipe; rec != nil {
		return b.addRecipe(r.Name, rec.Arch, rec.Recipe)
	}
	if run := r.Run; run != nil {
		return b.addRun(r.Name, run.CompilerID, run.Result)
	}
	return fmt.Errorf("%w: %v", ErrBadBuilderRequest, r)
}
// add pairs s with name and adds the resulting named subject to the corpus,
// passing on whatever error the corpus reports.
func (b *Builder) add(name string, s subject.Subject) error {
	named := subject.Named{Name: name, Subject: s}
	return b.c.Add(named)
}
// addCompile records compile result res, for the compiler with ID cid, on the subject called name.
// It fails if name is not in the corpus or if the subject rejects the result.
func (b *Builder) addCompile(name string, cid id.ID, res compilation.CompileResult) error {
	attach := func(sub *subject.Subject) error {
		return sub.AddCompileResult(cid, res)
	}
	return b.rmwSubject(name, attach)
}
// addRecipe records recipe r, for the architecture with ID arch, on the subject called name.
// It fails if name is not in the corpus or if the subject rejects the recipe.
func (b *Builder) addRecipe(name string, arch id.ID, r recipe.Recipe) error {
	attach := func(sub *subject.Subject) error {
		return sub.AddRecipe(arch, r)
	}
	return b.rmwSubject(name, attach)
}
// addRun records run result r, for the compiler with ID cid, on the subject called name.
// It fails if name is not in the corpus or if the subject rejects the run.
func (b *Builder) addRun(name string, cid id.ID, r compilation.RunResult) error {
	attach := func(sub *subject.Subject) error {
		return sub.AddRun(cid, r)
	}
	return b.rmwSubject(name, attach)
}
// rmwSubject performs a read-modify-write of the corpus subject called name.
// It reads a copy of the subject, lets f mutate that copy, and stores the copy back only if f succeeds.
// The copy is needed because the subject can't be mutated in place inside the corpus.
// It returns an error wrapping ErrBadBuilderName if name is not in the corpus, or f's error if f fails.
func (b *Builder) rmwSubject(name string, f func(*subject.Subject) error) error {
	sub, found := b.c[name]
	if !found {
		return fmt.Errorf("%w: %s", ErrBadBuilderName, name)
	}

	err := f(&sub)
	if err == nil {
		// Only commit the modified copy when the mutation went through.
		b.c[name] = sub
	}
	return err
}
// ParBuild runs f in a parallelised manner across the subjects in src.
// It uses the responses from f in a Builder, and returns the resulting corpus.
// Note that src may be different from cfg.Init; this is useful when building a new corpus from scratch.
//
// Each call to f receives the builder's send channel; the builder itself is run as an
// auxiliary job alongside the workers through src.Par.
// NOTE(review): this relies on src.Par not returning until the builder job has finished,
// so that the built corpus can be read safely afterwards; confirm against Par's contract.
//
// ParBuild fails if cfg is rejected by New, or if src.Par reports an error from a worker or
// from the builder. On any error the returned corpus is nil, never a corpus that happened
// to be complete before a later failure.
func ParBuild(ctx context.Context, nworkers int, src corpus.Corpus, cfg Config, f func(context.Context, subject.Named, chan<- Request) error) (corpus.Corpus, error) {
	b, err := New(cfg)
	if err != nil {
		return nil, err
	}

	var built corpus.Corpus

	// work forwards each subject to f along with the channel on which to send requests.
	work := func(ctx context.Context, named subject.Named) error {
		return f(ctx, named, b.SendCh)
	}
	// collect runs the builder; it keeps its error local and does not write to the
	// enclosing function's err from inside the callback.
	collect := func(ctx context.Context) error {
		var rerr error
		built, rerr = b.Run(ctx)
		return rerr
	}

	if err := src.Par(ctx, nworkers, work, collect); err != nil {
		return nil, err
	}
	return built, nil
}