-
Notifications
You must be signed in to change notification settings - Fork 52
/
instance.go
1274 lines (1207 loc) · 41.2 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2017 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package ec2cluster
//go:generate go run ../cmd/ec2instances/main.go instances
import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"fmt"
"io"
"math/rand"
"net/http"
"strings"
"text/template"
"time"
"unicode"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/grailbio/base/digest"
"github.com/grailbio/base/limiter"
"github.com/grailbio/base/retry"
"github.com/grailbio/base/status"
"github.com/grailbio/base/sync/once"
"github.com/grailbio/infra"
"github.com/grailbio/reflow"
bootc "github.com/grailbio/reflow/bootstrap/client"
"github.com/grailbio/reflow/bootstrap/common"
"github.com/grailbio/reflow/errors"
infra2 "github.com/grailbio/reflow/infra"
"github.com/grailbio/reflow/internal/ecrauth"
"github.com/grailbio/reflow/internal/execimage"
"github.com/grailbio/reflow/log"
"github.com/grailbio/reflow/pool"
poolc "github.com/grailbio/reflow/pool/client"
"github.com/grailbio/reflow/taskdb"
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"
)
var (
	// commonArgs are the arguments passed to both the bootstrap and reflow binary.
	commonArgs = []string{"-config", "/etc/reflowconfig"}
	// bootstrapArgs are the arguments passed to the bootstrap image.
	bootstrapArgs = append(commonArgs, "-session", "ec2metadata")
	// reflowletArgs are the arguments passed to the reflow binary to run a reflowlet.
	// NOTE: commonArgs has len == cap, so each append here allocates a fresh
	// backing array; bootstrapArgs and reflowletArgs do not alias each other.
	reflowletArgs = append(commonArgs, "serve", "-ec2cluster")
)
const (
	// ebsThroughputPremiumCost defines the higher premium in USD dollars
	// we are willing to pay for an instance with at least ebsThroughputBenefitPct more EBS throughput.
	ebsThroughputPremiumCost = 0.03
	// ebsThroughputPremiumPct defines the higher premium (as a percentage)
	// we are willing to pay for at least ebsThroughputBenefitPct increased EBS throughput
	// when choosing instance types.
	ebsThroughputPremiumPct = 15.0
	// ebsThroughputBenefitPct is the percentage higher EBS throughput we require
	// to justify paying the premium.
	ebsThroughputBenefitPct = 50.0
	// gp2MaxThroughputMinSizeGiB is the minimum disk size of a gp2 EBS volume
	// for maximum throughput.
	// 334GiB is the smallest disk size that yields maximum throughput, as per
	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSVolumeTypes.html
	gp2MaxThroughputMinSizeGiB = 334
	// gp2PerEBSThresholdGiB is the disk size beyond which we would instead prefer
	// to have EBS volumes of size gp2MaxThroughputMinSizeGiB to optimize for throughput.
	gp2PerEBSThresholdGiB = 200
)
// minDiskSizes holds the smallest acceptable disk sizes (GiB) per EBS volume type.
var minDiskSizes = map[string]uint64{
	// EBS does not allow you to create ST1 volumes smaller than 500GiB.
	ec2.VolumeTypeSt1: 500,
	ec2.VolumeTypeGp2: 1,
	ec2.VolumeTypeGp3: 1,
}
// instanceConfig represents an instance configuration: the static properties
// of a single EC2 instance type as known to this package.
type instanceConfig struct {
	// Type is the EC2 instance type to be launched.
	Type string
	// EBSOptimized is true if we should request an EBS optimized instance.
	EBSOptimized bool
	// EBSThroughput is the max throughput for the EBS optimized instance.
	EBSThroughput float64
	// Resources holds the Reflow resources that are presented by this configuration.
	// It does not include disk sizes; they are dynamic.
	Resources reflow.Resources
	// Price is the on-demand price for this instance type in fractional dollars,
	// keyed by region.
	Price map[string]float64
	// SpotOk tells whether spot is supported for this instance type.
	SpotOk bool
	// NVMe specifies whether EBS is exposed as NVMe devices.
	NVMe bool
}
// Size returns the instance's instance size: the component after the "." in
// the EC2 instance type (e.g. "xlarge" in "c5.xlarge"). It returns an error
// if the type does not consist of exactly two dot-separated components.
func (ic *instanceConfig) Size() (string, error) {
	if parts := strings.Split(ic.Type, "."); len(parts) == 2 {
		return parts[1], nil
	}
	return "", errors.New(fmt.Sprintf("cannot infer instance size of type %s", ic.Type))
}
var (
	// localDigest and localSize cache the digest and size of the embedded
	// reflow linux image; they are computed at most once via digestOnce
	// (see imageDigestAndSize).
	localDigest digest.Digest
	localSize   int64
	digestOnce  once.Task
	// reflowletOnce guards the one-time upload of the reflowlet image to the
	// repository; reflowletFile caches the repository's metadata for it
	// (see getReflowletFile).
	reflowletOnce once.Task
	reflowletFile reflow.File
)
// instance represents a concrete instance; it is launched from an instanceConfig
// and additional parameters.
type instance struct {
	// HTTPClient is used to reach the bootstrap and reflowlet servers on the instance.
	HTTPClient *http.Client
	// Config is the instance type configuration to launch.
	Config instanceConfig
	// ReflowConfig is marshaled (as YAML) into /etc/reflowconfig on the instance.
	ReflowConfig infra.Config
	Log          *log.Logger
	// Authenticator produces the ECR login command written to /etc/ecrlogin.
	Authenticator ecrauth.Interface
	EC2           ec2iface.EC2API
	// TaskDB, if non-nil, records the new pool row and its initial keepalive.
	TaskDB taskdb.TaskDB
	// InstanceTags and Labels are both applied as EC2 tags to the instance
	// and its EBS volumes.
	InstanceTags map[string]string
	Labels       pool.Labels
	// Spot requests a spot instance instead of on-demand.
	Spot            bool
	InstanceProfile string
	SecurityGroup   string
	Region          string
	// BootstrapImage is the URL of the bootstrap binary fetched and executed
	// by the instance's bootstrap script.
	BootstrapImage string
	// BootstrapExpiry is passed to the bootstrap via -expiry (clamped to a minimum;
	// see bootstrapExpiryArgs).
	BootstrapExpiry time.Duration
	// Price is the maximum price (spot bid) in fractional dollars.
	Price float64
	// EBSType, EBSSize (GiB) and NEBS describe the EBS volumes to attach;
	// configureEBS adjusts them before launch.
	EBSType string
	EBSSize uint64
	NEBS    int
	AMI     string
	KeyName string
	// SshKeys are installed as authorized keys via cloud-config.
	SshKeys []string
	// Immortal disables poweroff-on-failure for the instance's systemd units.
	Immortal bool
	// NodeExporterMetricsPort, if nonzero, installs and runs a prometheus
	// node exporter listening on this port.
	NodeExporterMetricsPort int
	// CloudConfig is the user's cloud config, merged in before the reflowlet unit.
	CloudConfig   cloudConfig
	ReflowVersion string
	// Task, if non-nil, receives launch status updates.
	Task *status.Task
	// SpotProber checks for EC2 spot capacity before launching.
	SpotProber      *spotProber
	DescInstLimiter *limiter.BatchLimiter
	DescSpotLimiter *limiter.BatchLimiter
	ReqSpotLimiter  *rate.Limiter

	// userData is the gzipped, base64-encoded cloud-config payload (set by launch).
	userData string
	// err holds the launch error, if any.
	err error
	// ec2inst holds the launched EC2 instance metadata.
	ec2inst *ec2.Instance
}
// reflowletInstance is an ec2.Instance augmented with reflowlet metadata
// extracted from the instance's EC2 tags.
type reflowletInstance struct {
	ec2.Instance
	// Version of the reflowlet instance running on EC2 (populated on instance startup).
	Version string
	// Digest of the executable running on the reflowlet instance.
	Digest string
}
// newReflowletInstance wraps inst in a reflowletInstance, populating the
// reflowlet version and digest from the instance's EC2 tags.
func newReflowletInstance(inst *ec2.Instance) *reflowletInstance {
	ri := reflowletInstance{Instance: *inst}
	ri.update()
	return &ri
}
// update scans the instance's EC2 tags for the reflowlet version and digest,
// stopping early once both have been found.
func (i *reflowletInstance) update() {
	for _, tag := range i.Instance.Tags {
		switch *tag.Key {
		case "reflowlet:version":
			i.Version = *tag.Value
		case "reflowlet:digest":
			i.Digest = *tag.Value
		}
		if i.Digest != "" && i.Version != "" {
			break
		}
	}
}
// Instance returns the EC2 instance metadata returned by a successful launch.
func (i *instance) Instance() *reflowletInstance {
	ri := newReflowletInstance(i.ec2inst)
	return ri
}
// ManagedInstance returns a ManagedInstance for this instance, carrying the
// EC2 instance ID when the launch succeeded and an empty ID otherwise.
func (i *instance) ManagedInstance() ManagedInstance {
	spec := InstanceSpec{i.Config.Type, i.Config.Resources}
	if i.err != nil || i.ec2inst == nil {
		return spec.Instance("")
	}
	return spec.Instance(*i.ec2inst.InstanceId)
}
// stateT enumerates the states of the instance-launch state machine driven
// by (*instance).Go; states advance in declaration order.
type stateT int

const (
	// stateCapacity: perform capacity check for EC2 spot.
	stateCapacity stateT = iota
	// stateLaunch: launch the instance via EC2.
	stateLaunch
	// stateWaitInstance: wait for the instance to enter running state.
	stateWaitInstance
	// stateTag: tag the instance.
	stateTag
	// stateDescribeDns: describe the instance via EC2 to get the DNS name.
	stateDescribeDns
	// stateWaitBootstrap: wait for the bootstrap to become live (and metadata to become available).
	stateWaitBootstrap
	// stateInstallImage: install the reflowlet image.
	stateInstallImage
	// stateWaitReflowlet: wait for reflowlet to become live (and metadata to become available).
	stateWaitReflowlet
	// stateDescribeTags: describe the instance via EC2 to get an updated version tag.
	stateDescribeTags
	// stateDone: terminal state; the instance is ready.
	stateDone
)

// String describes the work performed in each state; it returns the empty
// string for unknown states.
func (s stateT) String() string {
	switch s {
	case stateCapacity:
		return "probing for EC2 capacity"
	case stateLaunch:
		return "launching EC2 instance"
	case stateWaitInstance:
		return "waiting for instance to become ready"
	case stateTag:
		return "tagging instance and EBS volumes"
	case stateDescribeDns:
		return "describing instance (dns)"
	case stateWaitBootstrap:
		return "waiting for the bootstrap to load"
	case stateInstallImage:
		return "installing reflowlet image"
	case stateWaitReflowlet:
		return "waiting for the reflowlet to load"
	case stateDescribeTags:
		return "waiting for reflowlet version tag"
	case stateDone:
		return "instance ready"
	}
	return ""
}
// Go launches an instance, and returns when it fails or the context is done.
// On success (i.Err() == nil), the returned instance is in running state.
// Launch status is reported to the instance's task, if any.
func (i *instance) Go(ctx context.Context) {
	i.configureEBS()
	const maxTries = 10
	var (
		state       stateT
		id          string
		poolId      reflow.StringDigest
		dns         string
		n           int
		retryPolicy = retry.MaxRetries(retry.Backoff(5*time.Second, 30*time.Second, 1.75), maxTries)
	)
	defer func() {
		// Nothing to clean up unless we got past launching but failed before
		// a viable reflowlet came up.
		if id == "" || state <= stateLaunch || state >= stateWaitReflowlet {
			return
		}
		// Perform cleanup tasks
		i.Log.Debugf("cleaning up non-reflowlet EC2 instance: %s (state: %s)", id, state)
		// Terminate a successfully provisioned but un-viable instance.
		ec2TerminateInstance(i.EC2, id, i.Log)
		if i.TaskDB == nil || !poolId.IsValid() || state <= stateDescribeDns || state >= stateWaitReflowlet {
			return
		}
		// Set end time of the taskDB row corresponding to this un-viable pool.
		// Use a background context since ctx may already be done.
		if err := i.TaskDB.SetEndTime(context.Background(), poolId.Digest(), time.Now()); err != nil {
			i.Log.Debugf("taskdb pool %s SetEndTime: %v", poolId, err)
		}
	}()
	for state < stateDone && ctx.Err() == nil {
		switch state {
		case stateCapacity:
			if !i.Spot {
				break
			}
			var ok bool
			ok, i.err = i.SpotProber.HasCapacity(ctx, i.Config.Type)
			if i.err == nil && !ok {
				i.err = errors.E(errors.Unavailable, errors.New("ec2 capacity is likely exhausted"))
			}
		case stateLaunch:
			i.Task.Print(state.String())
			id, i.err = i.launch(ctx)
			if i.err != nil {
				i.Task.Printf("launch error: %v", i.err)
				i.Log.Errorf("instance launch error: %v", i.err)
			} else {
				i.Log = i.Log.Tee(nil, fmt.Sprintf("[%s]: ", id))
				i.Task.Title(id)
			}
		case stateWaitInstance:
			i.print(id, state.String())
			i.ec2inst, i.err = waitUntilRunning(ctx, i.DescInstLimiter, id, i.Log)
		case stateTag:
			i.print(id, state.String())
			// Tag the instance itself and each of its EBS volumes.
			resources := make([]*string, 1+len(i.ec2inst.BlockDeviceMappings))
			resources[0] = aws.String(id)
			// Use idx (not i) as the loop index so the receiver is not shadowed.
			for idx, bdm := range i.ec2inst.BlockDeviceMappings {
				resources[idx+1] = bdm.Ebs.VolumeId
			}
			_, i.err = i.EC2.CreateTags(&ec2.CreateTagsInput{Resources: resources, Tags: i.getTags()})
		case stateDescribeDns:
			i.print(id, state.String())
			if dnsPtr := i.ec2inst.PublicDnsName; dnsPtr == nil || aws.StringValue(dnsPtr) == "" {
				i.ec2inst, i.err = describeInstance(ctx, i.DescInstLimiter, id, i.Log)
			}
			if i.err != nil {
				i.err = errors.E(state.String(), errors.Temporary, i.err)
				break
			}
			dns = aws.StringValue(i.ec2inst.PublicDnsName)
			spot := ""
			if i.Spot {
				spot = "spot "
			}
			i.Log.Debugf("launched %sinstance %v (%s): %s%s", spot, id, dns, i.Config.Type, i.Config.Resources)
			if i.TaskDB != nil {
				// Record the start of the new pool in TaskDB and set an initial KeepAlive until
				// the pool (ie, reflowlet) has the chance to come up and takeover maintaining the row.
				// TaskDB row id for the pool is based on the EC2 instance ID.
				// Assign poolId here (previously it was never assigned and stayed
				// at its zero value) so that the deferred cleanup's SetEndTime,
				// KeepIDAlive below, and the log messages all refer to the real pool id.
				poolId = reflow.NewStringDigest(id)
				p := taskdb.Pool{
					PoolID:   poolId,
					PoolType: aws.StringValue(i.ec2inst.InstanceType),
					URI:      dns,
				}
				p.Start = aws.TimeValue(i.ec2inst.LaunchTime)
				p.ClusterName = i.InstanceTags[clusterNameKey]
				p.User = i.InstanceTags[userKey]
				p.ReflowVersion = i.ReflowVersion
				if err := i.TaskDB.StartPool(ctx, p); err != nil {
					i.Log.Debugf("taskdb pool %s StartPool: %v", poolId, err)
				} else if err = i.TaskDB.KeepIDAlive(ctx, poolId.Digest(), time.Now().Add(1*time.Minute)); err != nil {
					i.Log.Debugf("taskdb pool %s KeepIDAlive: %v", poolId, err)
				}
			}
		case stateWaitBootstrap:
			if n > maxTries/2 {
				i.print(id, fmt.Sprintf("confirming instance still running (%d/%d)", n, maxTries))
				if err := stillRunning(ctx, i.DescInstLimiter, id, i.Log); err != nil {
					// Instance not running anymore
					i.err = errors.E("wait bootstrap instance running", errors.Fatal, err)
					break
				}
			}
			i.print(id, state.String())
			var c *bootc.Client
			c, i.err = bootc.New(fmt.Sprintf("https://%s:9000/v1/", dns), i.HTTPClient, nil)
			if i.err != nil {
				i.err = errors.E(errors.Fatal, i.err)
				break
			}
			// ctx is deliberately shadowed: the timeout applies to this probe only.
			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
			i.err = c.Status(ctx)
			cancel()
			if i.err != nil && strings.HasSuffix(i.err.Error(), "connection refused") {
				i.err = errors.E(errors.Temporary, i.err)
			}
		case stateInstallImage:
			i.print(id, state.String())
			clnt, err := bootc.New(fmt.Sprintf("https://%s:9000/v1/", *i.ec2inst.PublicDnsName), i.HTTPClient, nil)
			if err != nil {
				i.err = errors.E(errors.Fatal, err)
				break
			}
			var repo reflow.Repository
			err = i.ReflowConfig.Instance(&repo)
			if err != nil {
				i.err = errors.E(errors.Fatal, err)
				break
			}
			ctx2, cancel := context.WithTimeout(ctx, 5*time.Minute)
			err = getReflowletFile(ctx2, repo, i.Log)
			cancel()
			if err != nil {
				i.err = errors.E(errors.Fatal, err)
				break
			}
			ctx2, cancel = context.WithTimeout(ctx, 1*time.Minute)
			reflowletimage := common.Image{
				Path: reflowletFile.Source,
				Args: reflowletArgs,
				Name: "reflowlet",
			}
			i.Log.Debugf("installing reflowlet image %v", reflowletimage)
			// Once the reflowlet image (a reflow binary) is installed on the instance,
			// the instance goes from being a bootstrap instance to a reflowlet instance.
			err = clnt.InstallImage(ctx2, reflowletimage)
			cancel()
			if err != nil && !errors.Is(errors.Net, err) {
				if strings.Contains(err.Error(), common.ExecImageErrPrefix) {
					i.err = errors.E(errors.Temporary, err)
					// This indicates that for some reason, we were unable to install the image.
					// 'Reset'ing 'reflowletOnce' eliminates errors due to a missing/corrupted image.
					reflowletOnce.Reset()
				} else {
					i.err = errors.E(errors.Fatal, err)
				}
				break
			}
		case stateWaitReflowlet:
			if n > maxTries/2 {
				i.print(id, fmt.Sprintf("confirming instance still running (%d/%d)", n, maxTries))
				if err := stillRunning(ctx, i.DescInstLimiter, id, i.Log); err != nil {
					// Instance not running anymore
					i.err = errors.E("wait bootstrap instance running", errors.Fatal, err)
					break
				}
			}
			i.print(id, state.String())
			var c *poolc.Client
			c, i.err = poolc.New(fmt.Sprintf("https://%s:9000/v1/", dns), i.HTTPClient, nil)
			if i.err != nil {
				i.err = errors.E(errors.Fatal, i.err)
				break
			}
			// ctx is deliberately shadowed: the timeout applies to this probe only.
			ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
			_, i.err = c.Config(ctx)
			cancel()
			if i.err != nil && strings.HasSuffix(i.err.Error(), "connection refused") {
				i.err = errors.E(errors.Temporary, i.err)
			}
		case stateDescribeTags:
			i.print(id, state.String())
			i.ec2inst, i.err = describeInstance(ctx, i.DescInstLimiter, id, i.Log)
			if i.err != nil {
				// Expand the message and preserve the underlying error; previously
				// the format verbs were passed to errors.E unexpanded and the
				// describe error was dropped.
				i.err = errors.E(errors.Temporary, fmt.Sprintf("%s: describe instance: %v", id, i.err))
				break
			}
			ri := i.Instance()
			if ri.Version == "" {
				i.err = errors.E(errors.Temporary, "version tag unavailable")
			}
		default:
			panic("unknown state")
		}
		if i.err == nil {
			n = 0
			state++
			continue
		}
		if awserr, ok := i.err.(awserr.Error); ok {
			switch awserr.Code() {
			// According to EC2 API docs, these codes indicate
			// capacity issues.
			//
			// http://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html
			//
			// TODO(marius): add a separate package for interpreting AWS errors.
			case "InsufficientCapacity", "InsufficientInstanceCapacity", "InsufficientHostCapacity", "InsufficientReservedInstanceCapacity", "InstanceLimitExceeded", "Unsupported":
				i.err = errors.E(errors.Unavailable, awserr)
			}
		}
		switch {
		case i.err == nil:
		case errors.Is(errors.Fatal, i.err):
			i.Log.Errorf("instance %v: %v", id, i.err)
			return
		case errors.Is(errors.Unavailable, i.err):
			i.Log.Errorf("instance %v: %v", id, i.err)
			// Return these immediately because our caller may be able to handle
			// them by selecting a different instance type.
			return
		case !errors.Recover(i.err).Timeout() && !errors.Recover(i.err).Temporary():
			i.Log.Errorf("error while %s: %v", state, i.err)
		}
		if err := retry.Wait(ctx, retryPolicy, n); err != nil {
			if state == stateWaitReflowlet && errors.Is(errors.TooManyTries, err) {
				// treat reflowlet timeouts as possibly problematic instance types.
				i.err = errors.E(errors.Unavailable, errors.Errorf("reflowlet on %s not available: %v", *i.ec2inst.InstanceId, err))
			}
			break
		}
		n++
		i.Log.Debugf("%s recoverable error (%d/%d): %v", id, n, maxTries, i.err)
	}
	if i.err != nil {
		i.print(id, fmt.Sprintf("%v", i.err))
		return
	}
	i.err = ctx.Err()
	if i.err == nil {
		i.print(id, "instance ready")
	}
}
// print logs msg to the instance's debug log and reports it to the status
// task, prefixed with the instance id.
func (i *instance) print(id, msg string) {
	i.Log.Debug(msg)
	i.Task.Printf("[%s] %s", id, msg)
}
// getReflowletFile ensures (once per process, via reflowletOnce) that the
// reflowlet image -- the embedded reflow linux binary -- is present in repo,
// uploading it if necessary, and caches the repository's file metadata in the
// package-level reflowletFile.
func getReflowletFile(ctx context.Context, repo reflow.Repository, log *log.Logger) error {
	return reflowletOnce.Do(func() error {
		if !hasEmbedded() {
			return execimage.ErrNoEmbeddedImage
		}
		d, sz, err := imageDigestAndSize()
		if err != nil {
			return err
		}
		// Check whether the repo already holds a matching image.
		reflowletFile, err = repo.Stat(ctx, d)
		switch {
		case err != nil:
			// Stat failed; fall through and (re)upload.
		case !reflowletFile.ContentHash.IsZero() && reflowletFile.ContentHash == d:
			return nil
		// TODO(swami/pboyapalli): The size check shouldn't be needed (since the ContentHash check is superior)
		// but for some reason, the SHA values aren't getting set on the object, so we need to investigate why.
		case reflowletFile.Size == sz:
			return nil
		}
		// Image doesn't exist in repo (or doesn't match the local image's digest or size), so upload it.
		r, err := execimage.EmbeddedLinuxImage()
		if err != nil {
			return err
		}
		defer r.Close()
		log.Debugf("uploading reflow image (%s) to repo", d.Short())
		repoDigest, err := repo.Put(ctx, r)
		if err != nil {
			return err
		}
		if repoDigest != d {
			return errors.New("digests mismatch")
		}
		// Re-stat so reflowletFile reflects the uploaded object's metadata.
		reflowletFile, err = repo.Stat(ctx, d)
		return err
	})
}
// imageDigestAndSize computes (at most once, via digestOnce) the digest and
// size of the embedded reflow linux image, caching the results in the
// package-level localDigest and localSize. Subsequent calls return the
// cached values (or the cached error).
func imageDigestAndSize() (digest.Digest, int64, error) {
	err := digestOnce.Do(func() error {
		r, err := execimage.EmbeddedLinuxImage()
		if err != nil {
			return err
		}
		// Register the close immediately after the successful open so the
		// reader is released even if DigestAndSize panics (previously the
		// defer was placed after the read).
		defer r.Close()
		localDigest, localSize, err = execimage.DigestAndSize(r)
		return err
	})
	return localDigest, localSize, err
}
// hasEmbedded reports whether this binary carries an embedded reflow linux
// image whose digest and size can be computed.
func hasEmbedded() bool {
	if _, _, err := imageDigestAndSize(); err != nil {
		return false
	}
	return true
}
// getTags gets the EC2 tags for the instance, combining the configured
// instance tags with the pool labels.
func (i *instance) getTags() []*ec2.Tag {
	var tags []*ec2.Tag
	add := func(k, v string) {
		tags = append(tags, &ec2.Tag{Key: aws.String(k), Value: aws.String(v)})
	}
	for k, v := range i.InstanceTags {
		add(k, v)
	}
	for k, v := range i.Labels {
		add(k, v)
	}
	return tags
}
// configureEBS configures the EBS volume size and number for optimal performance.
func (i *instance) configureEBS() {
	// Always provision at least one volume.
	if i.NEBS < 1 {
		i.NEBS = 1
	}
	if min, ok := minDiskSizes[i.EBSType]; ok {
		// Split the requested total size evenly across the volumes.
		perEBS := i.EBSSize / uint64(i.NEBS)
		// For larger gp2 volumes, prefer the smallest per-volume size that
		// yields maximum throughput (see gp2MaxThroughputMinSizeGiB).
		if i.EBSType == ec2.VolumeTypeGp2 && perEBS > gp2PerEBSThresholdGiB {
			min = gp2MaxThroughputMinSizeGiB
		}
		// Respect the volume type's minimum (or throughput-optimal) per-volume size.
		if perEBS < min {
			perEBS = min
		}
		// Cap the number of volumes so the total doesn't grossly exceed the
		// request once the per-volume size was raised to the minimum.
		nmin := 1 + int(i.EBSSize/perEBS)
		if i.NEBS > nmin {
			i.NEBS = nmin
		}
		// Recompute the total as an exact multiple of the per-volume size.
		i.EBSSize = perEBS * uint64(i.NEBS)
	}
}
// bootstrapExpiryArgs returns the command-line arguments that configure the
// bootstrap's expiry, clamping the configured value to a sane minimum.
func (i *instance) bootstrapExpiryArgs() []string {
	// We don't want to set too small of an expiry.
	const minExpiry = 30 * time.Second
	expiry := i.BootstrapExpiry
	if expiry >= minExpiry {
		return []string{"-expiry", fmt.Sprintf("%s", expiry)}
	}
	i.Log.Debugf("overriding bootstrap expiry %s with minimum %s", expiry, minExpiry)
	return []string{"-expiry", fmt.Sprintf("%s", minExpiry)}
}
// ReflowletCloudwatchFlushMs is the flush interval (in milliseconds) used for
// queue_flush_log_ms in the journald-cloudwatch-logs configuration written to
// launched instances (see launch).
const ReflowletCloudwatchFlushMs = 5000
// launch launches an EC2 instance (spot or on-demand, per i.Spot), configured
// via a gzipped, base64-encoded cloud-config user-data payload that installs
// the reflow config, disks, logging and the bootstrap/reflowlet units.
// It returns the launched instance's ID.
func (i *instance) launch(ctx context.Context) (string, error) {
	// First we need to construct the cloud-config that's passed to
	// our instances via EC2's user-data mechanism.
	var c cloudConfig
	if len(i.SshKeys) == 0 {
		i.Log.Debugf("instance launch: missing public SSH key")
	} else {
		c.SshAuthorizedKeys = i.SshKeys
	}
	// /etc/ecrlogin contains the login command for ECR.
	ecrFile := CloudFile{
		Path:        "/etc/ecrlogin",
		Permissions: "0644",
		Owner:       "root",
	}
	var err error
	// Use the caller's context (previously context.TODO()) so the ECR login
	// is abandoned when the launch is canceled.
	ecrFile.Content, err = ecrauth.Login(ctx, i.Authenticator)
	if err != nil {
		return "", err
	}
	c.AppendFile(ecrFile)
	// /etc/reflowconfig contains the (YAML) marshaled configuration file
	b, err := i.ReflowConfig.Marshal(true)
	if err != nil {
		return "", err
	}
	keys := make(infra.Keys)
	err = yaml.Unmarshal(b, &keys)
	if err != nil {
		return "", err
	}
	// The remote side does not need a cluster implementation.
	delete(keys, infra2.Cluster)
	// If prometheus metrics are enabled, but a port for metrics is not set,
	// then set a default port for reflowlet metrics.
	if p := keys[infra2.Metrics]; p != nil {
		if m := p.(string); strings.HasPrefix(m, "prometrics") {
			var hasPort bool
			for _, v := range strings.Split(m, ",") {
				if hasPort = strings.HasPrefix(v, "port="); hasPort {
					break
				}
			}
			if !hasPort {
				keys[infra2.Metrics] = fmt.Sprintf("%s,port=9100", m)
			}
		}
	}
	b, err = yaml.Marshal(keys)
	if err != nil {
		return "", err
	}
	// Raise systemd's default file-descriptor limit for all services.
	c.AppendFile(CloudFile{
		Path:        "/etc/systemd/system.conf",
		Permissions: "0644",
		Owner:       "root",
		Content: `
[Manager]
DefaultLimitNOFILE=65536
`,
	})
	c.AppendFile(CloudFile{
		Path:        "/etc/reflowconfig",
		Permissions: "0644",
		Owner:       "root",
		Content:     string(b),
	})
	// Write the bootstrapping script. It fetches the binary and runs it.
	c.AppendFile(CloudFile{
		Permissions: "0755",
		Path:        "/opt/bin/bootstrap",
		Owner:       "root",
		Content: tmpl(`
#!/bin/bash
set -e
bin=/tmp/reflowbootstrap
echo "fetching: {{.binary}}"
curl -s {{.binary}} >$bin
chmod +x $bin
export V23_CREDENTIALS=/opt/.v23
export V23_CREDENTIALS_NO_LOCK=1
export V23_CREDENTIALS_NO_AGENT=1
echo "starting: $bin {{.bootstrapArgs}}"
$bin {{.bootstrapArgs}} || true
sleep 5
exit 1
`, args{
			"binary":        i.BootstrapImage,
			"bootstrapArgs": strings.Join(append(bootstrapArgs, i.bootstrapExpiryArgs()...), " "),
		}),
	})
	// Turn off CoreOS services that would restart or otherwise disrupt the instances.
	c.CoreOS.Update.RebootStrategy = "off"
	c.AppendUnit(CloudUnit{Name: "update-engine.service", Command: "stop"})
	c.AppendUnit(CloudUnit{Name: "locksmithd.service", Command: "stop"})
	// Configure the disks.
	lvmGroupName := "data"
	deviceName := fmt.Sprintf("%s_group/%s_vol", lvmGroupName, lvmGroupName)
	if i.NEBS < 1 {
		i.NEBS = 1
	}
	// Device names depend on whether EBS is exposed via NVMe on this instance type.
	devices := make([]string, i.NEBS)
	for idx := range devices {
		if i.Config.NVMe {
			devices[idx] = fmt.Sprintf("nvme%dn1", idx+1)
		} else {
			devices[idx] = fmt.Sprintf("xvd%c", 'b'+idx)
		}
	}
	// Assemble the EBS volumes into a single LVM RAID0 volume and format it.
	c.AppendUnit(CloudUnit{
		Name:    fmt.Sprintf("format-%s.service", lvmGroupName),
		Command: "start",
		Content: tmpl(`
[Unit]
Description=Format /dev/{{.name}}_group/{{.name}}_vol (after setting up LVM RAID0)
After={{range $_, $name := .devices}}dev-{{$name}}.device {{end}}
Requires={{range $_, $name := .devices}}dev-{{$name}}.device {{end}}
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStartPre=/usr/sbin/pvcreate {{range $_, $name := .devices}}/dev/{{$name}} {{end}}
ExecStartPre=/usr/sbin/vgcreate {{.name}}_group {{range $_, $name := .devices}}/dev/{{$name}} {{end}}
ExecStartPre=/usr/bin/echo "sleeping to avoid race condition where lvcreate runs before the volume group is ready"
ExecStartPre=/usr/bin/sleep 1
ExecStartPre=/usr/sbin/lvcreate -l 100%%VG --stripes {{.devices|len}} --stripesize 256 -n {{.name}}_vol {{.name}}_group
ExecStart=-/usr/sbin/mkfs.ext4 /dev/{{.name}}_group/{{.name}}_vol
`, args{"devices": devices, "name": lvmGroupName}),
	})
	c.AppendUnit(CloudUnit{
		Name:    "mnt-data.mount",
		Command: "start",
		Content: tmpl(`
[Unit]
Description=device /dev/{{.name}} on path /mnt/data
{{if .mortal}}
OnFailure=poweroff.target
OnFailureJobMode=replace-irreversibly
{{end}}
[Mount]
What=/dev/{{.name}}
Where=/mnt/data
Type=ext4
Options=data=writeback
`, args{"mortal": !i.Immortal, "name": deviceName}),
	})
	// Ship the systemd journal to CloudWatch Logs.
	c.AppendFile(CloudFile{
		Path:        "/etc/journald-cloudwatch-logs.conf",
		Permissions: "0644",
		Owner:       "root",
		Content: tmpl(`log_group = "reflow/reflowlet"
fields = ["_HOSTNAME", "PRIORITY", "MESSAGE"]
queue_poll_duration_ms = 1000
queue_flush_log_ms = {{.flush_log_ms}}
field_length = 1024
`, args{"flush_log_ms": ReflowletCloudwatchFlushMs}),
	})
	c.AppendUnit(CloudUnit{
		Name:    "journald-cloudwatch-logs.service",
		Enable:  true,
		Command: "start",
		Content: tmpl(`
[Unit]
Description=journald-cloudwatch-logs
Wants=basic.target
After=basic.target network.target
[Service]
LogLevelMax=5
Type=simple
ExecStartPre=/usr/bin/sh -c "/usr/bin/echo 'log_stream = \"'$(curl http://169.254.169.254/latest/meta-data/public-hostname)'\"' | /usr/bin/cat - /etc/journald-cloudwatch-logs.conf > /tmp/journald-cloudwatch-logs.conf"
ExecStartPre=/usr/bin/wget https://github.com/advantageous/systemd-cloud-watch/releases/download/v0.2.1/systemd-cloud-watch_linux -O /tmp/systemd-cloud-watch_linux
ExecStartPre=/usr/bin/chmod +x /tmp/systemd-cloud-watch_linux
ExecStart=/tmp/systemd-cloud-watch_linux /tmp/journald-cloudwatch-logs.conf
Restart=on-failure
RestartSec=30s
`, args{}),
	})
	// Configure the AWS X-Ray daemon, authenticated either via the instance
	// profile or via static credentials from the reflow config.
	var profile, akey, secret, token string
	if i.InstanceProfile != "" {
		profile = fmt.Sprintf("-a %s", i.InstanceProfile)
	} else {
		var creds *credentials.Credentials
		err := i.ReflowConfig.Instance(&creds)
		if err == nil {
			if c, err := creds.Get(); err == nil {
				akey = fmt.Sprintf("AWS_ACCESS_KEY_ID=%s", c.AccessKeyID)
				secret = fmt.Sprintf("AWS_SECRET_ACCESS_KEY=%s", c.SecretAccessKey)
				token = fmt.Sprintf("AWS_SESSION_TOKEN=%s", c.SessionToken)
			}
		}
	}
	if akey != "" || profile != "" {
		c.AppendUnit(CloudUnit{
			Name:    "aws-xray.service",
			Enable:  true,
			Command: "start",
			Content: tmpl(`
[Unit]
Description=xray
Requires=network.target
After=network.target
[Service]
Environment="{{.aws_access_key_id}}"
Environment="{{.aws_secret_access_key}}"
Environment="{{.aws_session_token}}"
Type=simple
ExecStartPre=/usr/bin/wget https://s3.dualstack.us-east-2.amazonaws.com/aws-xray-assets.us-east-2/xray-daemon/aws-xray-daemon-linux-2.x.zip
ExecStartPre=/usr/bin/unzip aws-xray-daemon-linux-2.x.zip -d /tmp
ExecStart=/tmp/xray {{.profile}} -l debug`, args{"profile": profile, "aws_access_key_id": akey, "aws_secret_access_key": secret, "aws_session_token": token})})
	}
	// Optionally install and run the prometheus node exporter.
	if i.NodeExporterMetricsPort != 0 {
		c.AppendFile(CloudFile{
			Permissions: "0755",
			Path:        "/opt/bin/setup_node_exporter",
			Owner:       "root",
			Content: tmpl(`
#!/bin/bash
cd /tmp
/usr/bin/curl -L https://github.com/prometheus/node_exporter/releases/download/v1.0.1/node_exporter-1.0.1.linux-amd64.tar.gz | tar -xvzf - && mv /tmp/node_exporter-1.0.1.linux-amd64/node_exporter /opt/bin/
rm -rf /tmp/node_exporter-1.0.1.linux-amd64
`, args{})})
		c.AppendUnit(CloudUnit{
			Name:    "node_exporter.service",
			Enable:  true,
			Command: "start",
			Content: tmpl(`[Unit]
Description=node_exporter
[Service]
Restart=always
ExecStart=/opt/bin/node_exporter --web.listen-address=":{{.port}}"
ExecStartPre=/opt/bin/setup_node_exporter
[Install]
WantedBy=multi-user.target`, args{"port": i.NodeExporterMetricsPort}),
		})
	}
	// We merge the user's cloud config before appending the bootstrap unit
	// so that system units can be run before the bootstrap.
	c.Merge(&i.CloudConfig)
	c.AppendUnit(CloudUnit{
		Name:    "reflowlet.service",
		Enable:  true,
		Command: "start",
		Content: tmpl(`
[Unit]
Description=reflowlet
Requires=network.target
After=network.target
{{if .mortal}}
OnFailure=poweroff.target
OnFailureJobMode=replace-irreversibly
{{end}}
[Service]
OOMScoreAdjust=-1000
Type=oneshot
ExecStart=/opt/bin/bootstrap
`, args{"mortal": !i.Immortal}),
	})
	b, err = c.Marshal()
	if err != nil {
		return "", err
	}
	// Compress file so that we are below the 16KB limit for user data.
	var gb bytes.Buffer
	b64e := base64.NewEncoder(base64.StdEncoding, &gb)
	ctr := byteCounter{writer: b64e}
	gw := gzip.NewWriter(&ctr)
	_, err = gw.Write(b)
	if err != nil {
		return "", err
	}
	err = gw.Close()
	if err != nil {
		return "", err
	}
	err = b64e.Close()
	if err != nil {
		return "", err
	}
	if ctr.count > 16<<10 {
		return "", errors.New(fmt.Sprintf("size of userdata > 16384: %v ", ctr.count))
	}
	i.userData = gb.String()
	if !i.Spot {
		// TODO(swami): Support cycling AZs using appropriate subnets for on-demand instances also.
		return i.ec2RunInstance()
	}
	// TODO(swami): Use spot placement score to determine best availability zone instead of cycling through.
	// The spot placement score API is only available in aws-sdk-go-v2: v1.22.0
	// https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/ec2@v1.22.0
	// As of this change, we are still at: aws-sdk-go v1.38.24
	var azs []string
	if azs, err = availabilityZones(i.EC2, i.Region); err != nil {
		i.Log.Debugf("instance launch: unable to determine AZs (will try without specifying AZ): %v", err)
	}
	// We shuffle the AZs so that we start with a random one before cycling through them (if unavailable).
	rand.Shuffle(len(azs), func(i, j int) { azs[i], azs[j] = azs[j], azs[i] })
	if len(azs) == 0 {
		azs = append(azs, "")
	}
	var id string
	var errs errors.Multi
	for _, az := range azs {
		id, err = i.ec2RunSpotInstance(ctx, az)
		// In case of any error, we cycle through the AZs.
		if err == nil {
			break
		}
		if errors.Is(errors.Unavailable, err) {
			i.Log.Debugf("spot instance (type: %s) seems to be unavailable in AZ %s: %v", i.Config.Type, az, err)
		} else {
			i.Log.Debugf("spot instance (type: %s) failed to launch in AZ %s: %v", i.Config.Type, az, err)
		}
		errs.Add(err)
	}
	// If we still have an error, we've exhausted all AZs, so we explicitly consider the instance type unavailable.
	if err != nil {
		i.Log.Debugf("spot instance (type: %s) seems to be unavailable in AZs (%s)", i.Config.Type, strings.Join(azs, ", "))
		err = errors.E(errors.Unavailable, errs.Combined())
	}
	return id, err
}
type byteCounter struct {
count int64
writer io.Writer
}
func (b *byteCounter) Write(p []byte) (n int, err error) {
n, err = b.writer.Write(p)
if err != nil {
return
}
b.count += int64(n)
return
}
const (
	// spotReqTtl is how long a spot instance request remains valid.
	spotReqTtl = 2 * time.Minute
	// spotReqRetryLim caps the retry policy used when placing spot requests
	// (see ec2RunSpotInstance).
	spotReqRetryLim = 5
)
func (i *instance) ec2RunSpotInstance(ctx context.Context, az string) (string, error) {
i.Log.Debugf("generating ec2 spot instance request for instance type %v", i.Config.Type)
// First make a spot instance request.
params := &ec2.RequestSpotInstancesInput{
SpotPrice: aws.String(fmt.Sprintf("%.3f", i.Price)),
LaunchSpecification: &ec2.RequestSpotLaunchSpecification{
ImageId: aws.String(i.AMI),
EbsOptimized: aws.Bool(i.Config.EBSOptimized),
InstanceType: aws.String(i.Config.Type),
BlockDeviceMappings: i.ebsDeviceMappings(),
KeyName: nonemptyString(i.KeyName),
UserData: aws.String(i.userData),
IamInstanceProfile: &ec2.IamInstanceProfileSpecification{
Arn: aws.String(i.InstanceProfile),
},
SecurityGroupIds: []*string{aws.String(i.SecurityGroup)},
},
}
if az != "" {
// Use an availability zone only if specified.
params.LaunchSpecification.Placement = &ec2.SpotPlacement{AvailabilityZone: aws.String(az)}
// And if an availability zone is specified, determine if a specific subnet is known for it.
if subnet := subnetForAZ(az); subnet != "" {
params.LaunchSpecification.SubnetId = aws.String(subnet)
}
}
var (
policy = retry.MaxRetries(retry.Jitter(retry.Backoff(5*time.Second, 10*time.Second, 1.2), 0.2), spotReqRetryLim)
resp *ec2.RequestSpotInstancesOutput
err error