-
Notifications
You must be signed in to change notification settings - Fork 847
/
command.go
2110 lines (1784 loc) · 67.3 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package atccmd
import (
"context"
"crypto/tls"
"crypto/x509"
"database/sql"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
_ "net/http/pprof"
"net/url"
"os"
"strings"
"time"
"code.cloudfoundry.org/clock"
"code.cloudfoundry.org/lager/v3"
"code.cloudfoundry.org/lager/v3/lagerctx"
"github.com/concourse/concourse"
"github.com/concourse/concourse/atc"
"github.com/concourse/concourse/atc/api"
"github.com/concourse/concourse/atc/api/accessor"
"github.com/concourse/concourse/atc/api/auth"
"github.com/concourse/concourse/atc/api/buildserver"
"github.com/concourse/concourse/atc/api/containerserver"
"github.com/concourse/concourse/atc/api/pipelineserver"
"github.com/concourse/concourse/atc/api/policychecker"
"github.com/concourse/concourse/atc/auditor"
"github.com/concourse/concourse/atc/builds"
"github.com/concourse/concourse/atc/component"
"github.com/concourse/concourse/atc/compression"
"github.com/concourse/concourse/atc/creds"
"github.com/concourse/concourse/atc/creds/noop"
"github.com/concourse/concourse/atc/db"
"github.com/concourse/concourse/atc/db/encryption"
"github.com/concourse/concourse/atc/db/lock"
"github.com/concourse/concourse/atc/db/migration"
"github.com/concourse/concourse/atc/engine"
"github.com/concourse/concourse/atc/gc"
"github.com/concourse/concourse/atc/lidar"
"github.com/concourse/concourse/atc/metric"
"github.com/concourse/concourse/atc/pauser"
"github.com/concourse/concourse/atc/policy"
"github.com/concourse/concourse/atc/scheduler"
"github.com/concourse/concourse/atc/scheduler/algorithm"
"github.com/concourse/concourse/atc/syslog"
"github.com/concourse/concourse/atc/util"
"github.com/concourse/concourse/atc/worker"
"github.com/concourse/concourse/atc/wrappa"
"github.com/concourse/concourse/skymarshal/dexserver"
"github.com/concourse/concourse/skymarshal/legacyserver"
"github.com/concourse/concourse/skymarshal/skycmd"
"github.com/concourse/concourse/skymarshal/skyserver"
"github.com/concourse/concourse/skymarshal/storage"
"github.com/concourse/concourse/skymarshal/token"
"github.com/concourse/concourse/tracing"
"github.com/concourse/concourse/web"
"github.com/concourse/flag/v2"
"github.com/cppforlife/go-semi-semantic/version"
"github.com/go-jose/go-jose/v3/jwt"
"github.com/hashicorp/go-multierror"
"github.com/jessevdk/go-flags"
gocache "github.com/patrickmn/go-cache"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
"github.com/tedsuo/ifrit/http_server"
"github.com/tedsuo/ifrit/sigmon"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/oauth2"
"golang.org/x/time/rate"
"gopkg.in/yaml.v2"
// dynamically registered metric emitters
_ "github.com/concourse/concourse/atc/metric/emitter"
// dynamically registered policy checkers
_ "github.com/concourse/concourse/atc/policy/opa"
// dynamically registered credential managers
_ "github.com/concourse/concourse/atc/creds/conjur"
_ "github.com/concourse/concourse/atc/creds/credhub"
_ "github.com/concourse/concourse/atc/creds/dummy"
_ "github.com/concourse/concourse/atc/creds/kubernetes"
_ "github.com/concourse/concourse/atc/creds/secretsmanager"
_ "github.com/concourse/concourse/atc/creds/ssm"
_ "github.com/concourse/concourse/atc/creds/vault"
)
// algorithmLimitRows caps the number of rows considered by the scheduling
// algorithm's candidate queries.
const algorithmLimitRows = 100

var (
	// schedulerCache memoizes scheduler state with a 10s TTL and a 10s
	// cleanup sweep.
	schedulerCache = gocache.New(10*time.Second, 10*time.Second)

	// Driver names: the plain postgres driver, and a wrapper that retries
	// on "too many connections" errors (registered in Runner).
	defaultDriverName  = "postgres"
	retryingDriverName = "too-many-connections-retrying"

	// OAuth client credentials used by the fly CLI's login flow.
	flyClientID     = "fly"
	flyClientSecret = "Zmx5"
)
// ATCCommand is the top-level ATC command. It dispatches to either the
// long-running web/API node ("run") or the one-shot database migration
// tool ("migrate").
type ATCCommand struct {
	RunCommand RunCommand `command:"run"`
	Migration  Migration  `command:"migrate"`
}
// RunCommand is the "run" subcommand: it holds every configuration knob for
// an ATC web node (API server, scheduler, resource checking, GC, auth,
// metrics, ...). Fields carrying `long:"..."` tags are populated from
// command-line flags by go-flags; untagged fields are internal state or
// wired dynamically (see WireDynamicFlags).
type RunCommand struct {
	Logger flag.Lager

	// varSourcePool is constructed at startup in Runner; it pools
	// per-pipeline var_source credential managers for reuse.
	varSourcePool creds.VarSourcePool

	// HTTP / TLS listeners.
	BindIP      flag.IP   `long:"bind-ip" default:"0.0.0.0" description:"IP address on which to listen for web traffic."`
	BindPort    uint16    `long:"bind-port" default:"8080" description:"Port on which to listen for HTTP traffic."`
	TLSBindPort uint16    `long:"tls-bind-port" description:"Port on which to listen for HTTPS traffic."`
	TLSCert     flag.File `long:"tls-cert" description:"File containing an SSL certificate."`
	TLSKey      flag.File `long:"tls-key" description:"File containing an RSA private key, used to encrypt HTTPS traffic."`
	TLSCaCert   flag.File `long:"tls-ca-cert" description:"File containing the client CA certificate, enables mTLS"`

	LetsEncrypt struct {
		Enable  bool     `long:"enable-lets-encrypt" description:"Automatically configure TLS certificates via Let's Encrypt/ACME."`
		ACMEURL flag.URL `long:"lets-encrypt-acme-url" description:"URL of the ACME CA directory endpoint." default:"https://acme-v02.api.letsencrypt.org/directory"`
	} `group:"Let's Encrypt Configuration"`

	ExternalURL flag.URL `long:"external-url" description:"URL used to reach any ATC from the outside world."`

	// Database configuration and per-pool connection limits.
	Postgres flag.PostgresConfig `group:"PostgreSQL Configuration" namespace:"postgres"`

	ConcurrentRequestLimits   map[wrappa.LimitedRoute]int `long:"concurrent-request-limit" description:"Limit the number of concurrent requests to an API endpoint (Example: ListAllJobs:5)"`
	APIMaxOpenConnections     int                         `long:"api-max-conns" description:"The maximum number of open connections for the api connection pool." default:"10"`
	BackendMaxOpenConnections int                         `long:"backend-max-conns" description:"The maximum number of open connections for the backend connection pool." default:"50"`

	CredentialManagement creds.CredentialManagementConfig `group:"Credential Management"`
	// CredentialManagers is populated in WireDynamicFlags from the set of
	// registered credential manager factories.
	CredentialManagers creds.Managers

	EncryptionKey flag.Cipher `long:"encryption-key" description:"A 16 or 32 length key used to encrypt sensitive information before storing it in the database."`
	// NOTE(review): this help text says "data is encrypted" where the
	// migrate subcommand's equivalent flag says "data is decrypted" —
	// possibly a typo here; confirm before changing (tag text is
	// user-facing).
	OldEncryptionKey flag.Cipher `long:"old-encryption-key" description:"Encryption key previously used for encrypting sensitive information. If provided without a new key, data is encrypted. If provided with a new key, data is re-encrypted."`

	// pprof debug endpoint listener.
	DebugBindIP   flag.IP `long:"debug-bind-ip" default:"127.0.0.1" description:"IP address on which to listen for the pprof debugger endpoints."`
	DebugBindPort uint16  `long:"debug-bind-port" default:"8079" description:"Port on which to listen for the pprof debugger endpoints."`

	InterceptIdleTimeout time.Duration `long:"intercept-idle-timeout" default:"0m" description:"Length of time for a intercepted session to be idle before terminating."`

	// Component and resource-checking cadence.
	ComponentRunnerInterval             time.Duration `long:"component-runner-interval" default:"10s" description:"Interval on which runners are kicked off for builds, locks, scans, and checks"`
	LidarScannerInterval                time.Duration `long:"lidar-scanner-interval" default:"10s" description:"Interval on which the resource scanner will run to see if new checks need to be scheduled"`
	GlobalResourceCheckTimeout          time.Duration `long:"global-resource-check-timeout" default:"1h" description:"Time limit on checking for new versions of resources."`
	ResourceCheckingInterval            time.Duration `long:"resource-checking-interval" default:"1m" description:"Interval on which to check for new versions of resources."`
	ResourceTypeCheckingInterval        time.Duration `long:"resource-type-checking-interval" default:"1m" description:"Interval on which to check for new versions of resource types."`
	ResourceWithWebhookCheckingInterval time.Duration `long:"resource-with-webhook-checking-interval" default:"1m" description:"Interval on which to check for new versions of resources that has webhook defined."`
	MaxChecksPerSecond                  int           `long:"max-checks-per-second" description:"Maximum number of checks that can be started per second. If not specified, this will be calculated as (# of resources)/(resource checking interval). -1 value will remove this maximum limit of checks per second."`

	// Automatic pausing of idle pipelines; zero disables the component.
	PausePipelinesAfter    int           `long:"pause-pipelines-after" default:"0" description:"The number of days after which a pipeline will be automatically paused if none of its jobs have run in more than the given number of days. A value of zero disables this component."`
	PipelinePauserInterval time.Duration `long:"pipeline-pauser-interval" default:"24h" hidden:"true" description:"The frequency on which the Pipeline Pauser component will be run to check if any pipelines need to be paused."`

	ContainerPlacementStrategyOptions worker.PlacementOptions `group:"Container Placement Strategy"`

	BaggageclaimResponseHeaderTimeout time.Duration `long:"baggageclaim-response-header-timeout" default:"1m" description:"How long to wait for Baggageclaim to send the response header."`
	StreamingArtifactsCompression     string        `long:"streaming-artifacts-compression" default:"gzip" choice:"gzip" choice:"zstd" choice:"raw" description:"Compression algorithm for internal streaming."`
	StreamingSizeLimitationInMB       float64       `long:"streaming-size-limitation" default:"0.0" description:"Internal volume streaming size limitation in MB. In case of small limitation needed, float can be used like 0.01."`

	GardenRequestTimeout time.Duration `long:"garden-request-timeout" default:"5m" description:"How long to wait for requests to Garden to complete. 0 means no timeout."`

	CLIArtifactsDir flag.Dir `long:"cli-artifacts-dir" description:"Directory containing downloadable CLI binaries."`
	WebPublicDir    flag.Dir `long:"web-public-dir" description:"Web public/ directory to serve live for local development."`

	Metrics struct {
		HostName            string            `long:"metrics-host-name" description:"Host string to attach to emitted metrics."`
		Attributes          map[string]string `long:"metrics-attribute" description:"A key-value attribute to attach to emitted metrics. Can be specified multiple times." value-name:"NAME:VALUE"`
		BufferSize          uint32            `long:"metrics-buffer-size" default:"1000" description:"The size of the buffer used in emitting event metrics."`
		CaptureErrorMetrics bool              `long:"capture-error-metrics" description:"Enable capturing of error log metrics"`
	} `group:"Metrics & Diagnostics"`

	Tracing tracing.Config `group:"Tracing" namespace:"tracing"`

	PolicyCheckers struct {
		Filter policy.Filter
	} `group:"Policy Checking"`

	Server struct {
		XFrameOptions         string `long:"x-frame-options" default:"deny" description:"The value to set for the X-Frame-Options header."`
		ContentSecurityPolicy string `long:"content-security-policy" default:"frame-ancestors 'none'" description:"The value to set for the Content-Security-Policy header."`
		ClusterName           string `long:"cluster-name" description:"A name for this Concourse cluster, to be displayed on the dashboard page."`
		ClientID              string `long:"client-id" default:"concourse-web" description:"Client ID to use for login flow"`
		ClientSecret          string `long:"client-secret" required:"true" description:"Client secret to use for login flow"`
	} `group:"Web Server"`

	LogDBQueries   bool `long:"log-db-queries" description:"Log database queries."`
	LogClusterName bool `long:"log-cluster-name" description:"Log cluster name."`

	GC struct {
		Interval               time.Duration `long:"interval" default:"30s" description:"Interval on which to perform garbage collection."`
		OneOffBuildGracePeriod time.Duration `long:"one-off-grace-period" default:"5m" description:"Period after which one-off build containers will be garbage-collected."`
		MissingGracePeriod     time.Duration `long:"missing-grace-period" default:"5m" description:"Period after which to reap containers and volumes that were created but went missing from the worker."`
		HijackGracePeriod      time.Duration `long:"hijack-grace-period" default:"5m" description:"Period after which hijacked containers will be garbage collected"`
		FailedGracePeriod      time.Duration `long:"failed-grace-period" default:"120h" description:"Period after which failed containers will be garbage collected"`
		CheckRecyclePeriod     time.Duration `long:"check-recycle-period" default:"1m" description:"Period after which to reap checks that are completed."`
		VarSourceRecyclePeriod time.Duration `long:"var-source-recycle-period" default:"5m" description:"Period after which to reap var_sources that are not used."`
	} `group:"Garbage Collection" namespace:"gc"`

	BuildTrackerInterval time.Duration `long:"build-tracker-interval" default:"10s" description:"Interval on which to run build tracking."`

	TelemetryOptIn bool `long:"telemetry-opt-in" hidden:"true" description:"Enable anonymous concourse version reporting."`

	// Build log retention; zero values mean "unset" per the descriptions.
	DefaultBuildLogsToRetain     uint64 `long:"default-build-logs-to-retain" description:"Default build logs to retain, 0 means all"`
	MaxBuildLogsToRetain         uint64 `long:"max-build-logs-to-retain" description:"Maximum build logs to retain, 0 means not specified. Will override values configured in jobs"`
	DefaultDaysToRetainBuildLogs uint64 `long:"default-days-to-retain-build-logs" description:"Default days to retain build logs. 0 means unlimited"`
	MaxDaysToRetainBuildLogs     uint64 `long:"max-days-to-retain-build-logs" description:"Maximum days to retain build logs, 0 means not specified. Will override values configured in jobs"`

	JobSchedulingMaxInFlight uint64 `long:"job-scheduling-max-in-flight" default:"32" description:"Maximum number of jobs to be scheduling at the same time"`

	// Pointer types distinguish "flag not given" (nil) from an explicit 0.
	DefaultCpuLimit    *int    `long:"default-task-cpu-limit" description:"Default max number of cpu shares per task, 0 means unlimited"`
	DefaultMemoryLimit *string `long:"default-task-memory-limit" description:"Default maximum memory per task, 0 means unlimited"`

	Auditor struct {
		EnableBuildAuditLog     bool `long:"enable-build-auditing" description:"Enable auditing for all api requests connected to builds."`
		EnableContainerAuditLog bool `long:"enable-container-auditing" description:"Enable auditing for all api requests connected to containers."`
		EnableJobAuditLog       bool `long:"enable-job-auditing" description:"Enable auditing for all api requests connected to jobs."`
		EnablePipelineAuditLog  bool `long:"enable-pipeline-auditing" description:"Enable auditing for all api requests connected to pipelines."`
		EnableResourceAuditLog  bool `long:"enable-resource-auditing" description:"Enable auditing for all api requests connected to resources."`
		EnableSystemAuditLog    bool `long:"enable-system-auditing" description:"Enable auditing for all api requests connected to system transactions."`
		EnableTeamAuditLog      bool `long:"enable-team-auditing" description:"Enable auditing for all api requests connected to teams."`
		EnableWorkerAuditLog    bool `long:"enable-worker-auditing" description:"Enable auditing for all api requests connected to workers."`
		EnableVolumeAuditLog    bool `long:"enable-volume-auditing" description:"Enable auditing for all api requests connected to volumes."`
	}

	Syslog struct {
		Hostname      string        `long:"syslog-hostname" description:"Client hostname with which the build logs will be sent to the syslog server." default:"atc-syslog-drainer"`
		Address       string        `long:"syslog-address" description:"Remote syslog server address with port (Example: 0.0.0.0:514)."`
		Transport     string        `long:"syslog-transport" description:"Transport protocol for syslog messages (Currently supporting tcp, udp & tls)."`
		DrainInterval time.Duration `long:"syslog-drain-interval" description:"Interval over which checking is done for new build logs to send to syslog server (duration measurement units are s/m/h; eg. 30s/30m/1h)" default:"30s"`
		CACerts       []string      `long:"syslog-ca-cert" description:"Paths to PEM-encoded CA cert files to use to verify the Syslog server SSL cert."`
	} ` group:"Syslog Drainer Configuration"`

	Auth struct {
		AuthFlags     skycmd.AuthFlags
		MainTeamFlags skycmd.AuthTeamFlags `group:"Authentication (Main Team)" namespace:"main-team"`
	} `group:"Authentication"`

	ConfigRBAC flag.File `long:"config-rbac" description:"Customize RBAC role-action mapping."`

	SystemClaimKey    string   `long:"system-claim-key" default:"aud" description:"The token claim key to use when matching system-claim-values"`
	SystemClaimValues []string `long:"system-claim-value" default:"concourse-worker" description:"Configure which token requests should be considered 'system' requests."`

	// FeatureFlags are copied into atc package globals at startup (see
	// Runner).
	FeatureFlags struct {
		EnableGlobalResources                bool `long:"enable-global-resources" description:"Enable equivalent resources across pipelines and teams to share a single version history."`
		EnableRedactSecrets                  bool `long:"enable-redact-secrets" description:"Enable redacting secrets in build logs."`
		EnableBuildRerunWhenWorkerDisappears bool `long:"enable-rerun-when-worker-disappears" description:"Enable automatically build rerun when worker disappears or a network error occurs"`
		EnableAcrossStep                     bool `long:"enable-across-step" description:"Enable the experimental across step to be used in jobs. The API is subject to change."`
		EnablePipelineInstances              bool `long:"enable-pipeline-instances" description:"Enable pipeline instances"`
		EnableP2PVolumeStreaming             bool `long:"enable-p2p-volume-streaming" description:"Enable P2P volume streaming. NOTE: All workers must be on the same LAN network"`
		EnableCacheStreamedVolumes           bool `long:"enable-cache-streamed-volumes" description:"When enabled, streamed resource volumes will be cached on the destination worker."`
		EnableResourceCausality              bool `long:"enable-resource-causality" description:"Enable the resource causality page. Computing causality can be expensive for the database. "`
	} `group:"Feature Flags"`

	BaseResourceTypeDefaults flag.File `long:"base-resource-type-defaults" description:"Base resource type defaults"`

	P2pVolumeStreamingTimeout time.Duration `long:"p2p-volume-streaming-timeout" description:"Timeout value of p2p volume streaming" default:"15m"`

	DisplayUserIdPerConnector map[string]string `long:"display-user-id-per-connector" description:"Define how to display user ID for each authentication connector. Format is <connector>:<fieldname>. Valid field names are user_id, name, username and email, where name maps to claims field username, and username maps to claims field preferred username"`

	// Default step timeouts; the zero value means no default is applied.
	DefaultGetTimeout  time.Duration `long:"default-get-timeout" description:"Default timeout of get steps"`
	DefaultPutTimeout  time.Duration `long:"default-put-timeout" description:"Default timeout of put steps"`
	DefaultTaskTimeout time.Duration `long:"default-task-timeout" description:"Default timeout of task steps"`

	NumGoroutineThreshold int `long:"num-goroutine-threshold" description:"When number of goroutines reaches to this threshold, then slow down current ATC. This helps distribute workloads across ATCs evenly."`

	DBNotificationBusQueueSize int `long:"db-notification-bus-queue-size" default:"10000" description:"DB notification bus queue size, default is 10000. If UI often misses loading running build logs, then consider to increase the queue size."`
}
// Migration is the "migrate" subcommand: one-shot database maintenance
// operations — print the current or max-supported schema version, migrate
// to a specific or the latest version, or rotate the encryption key.
type Migration struct {
	// lockFactory is constructed in Execute from freshly-opened lock
	// connections; it is not flag-configurable.
	lockFactory lock.LockFactory

	Postgres               flag.PostgresConfig `group:"PostgreSQL Configuration" namespace:"postgres"`
	EncryptionKey          flag.Cipher         `long:"encryption-key" description:"A 16 or 32 length key used to encrypt sensitive information before storing it in the database."`
	OldEncryptionKey       flag.Cipher         `long:"old-encryption-key" description:"Encryption key previously used for encrypting sensitive information. If provided without a new key, data is decrypted. If provided with a new key, data is re-encrypted."`
	CurrentDBVersion       bool                `long:"current-db-version" description:"Print the current database version and exit"`
	SupportedDBVersion     bool                `long:"supported-db-version" description:"Print the max supported database version and exit"`
	MigrateDBToVersion     int                 `long:"migrate-db-to-version" description:"Migrate to the specified database version and exit"`
	MigrateToLatestVersion bool                `long:"migrate-to-latest-version" description:"Migrate to the latest migration version and exit"`
}
// Execute acquires the database lock connections, builds the lock factory,
// and dispatches to exactly one migration sub-operation depending on which
// flag was set. Flags are checked in priority order: migrate-to-latest,
// current version, supported version, migrate-to-version, key rotation.
//
// Returns an error if no operation flag was specified.
//
// Receiver renamed from m to cmd for consistency with every other Migration
// method in this file.
func (cmd *Migration) Execute(args []string) error {
	lockConns, err := constructLockConns(defaultDriverName, cmd.Postgres.ConnectionString())
	if err != nil {
		return err
	}
	// Release every lock connection regardless of which sub-operation runs
	// (or whether it fails).
	defer func() {
		for _, conn := range lockConns {
			conn.Close()
		}
	}()

	cmd.lockFactory = lock.NewLockFactory(lockConns, metric.LogLockAcquired, metric.LogLockReleased)

	switch {
	case cmd.MigrateToLatestVersion:
		return cmd.migrateToLatestVersion()
	case cmd.CurrentDBVersion:
		return cmd.currentDBVersion()
	case cmd.SupportedDBVersion:
		return cmd.supportedDBVersion()
	case cmd.MigrateDBToVersion > 0:
		return cmd.migrateDBToVersion()
	case cmd.OldEncryptionKey.AEAD != nil:
		return cmd.rotateEncryptionKey()
	default:
		return errors.New("must specify one of `--migrate-to-latest-version`, `--current-db-version`, `--supported-db-version`, `--migrate-db-to-version`, or `--old-encryption-key`")
	}
}
// currentDBVersion prints the database's current migration version to
// stdout. No encryption keys are needed for a read-only version query.
func (cmd *Migration) currentDBVersion() error {
	helper := migration.NewOpenHelper(
		defaultDriverName,
		cmd.Postgres.ConnectionString(),
		cmd.lockFactory,
		nil,
		nil,
	)

	current, err := helper.CurrentVersion()
	if err != nil {
		return err
	}

	fmt.Println(current)

	return nil
}
// supportedDBVersion prints the newest migration version this binary
// supports to stdout. No encryption keys are needed for a version query.
func (cmd *Migration) supportedDBVersion() error {
	helper := migration.NewOpenHelper(
		defaultDriverName,
		cmd.Postgres.ConnectionString(),
		cmd.lockFactory,
		nil,
		nil,
	)

	supported, err := helper.SupportedVersion()
	if err != nil {
		return err
	}

	fmt.Println(supported)

	return nil
}
// migrateDBToVersion migrates the database schema to the version requested
// via --migrate-db-to-version, deriving new/old encryption keys from the
// configured ciphers (nil when a key was not provided).
//
// The migration error is now wrapped with %w (instead of flattening it via
// err.Error()) so callers can inspect the cause with errors.Is/errors.As.
func (cmd *Migration) migrateDBToVersion() error {
	version := cmd.MigrateDBToVersion

	var newKey *encryption.Key
	var oldKey *encryption.Key
	if cmd.EncryptionKey.AEAD != nil {
		newKey = encryption.NewKey(cmd.EncryptionKey.AEAD)
	}
	if cmd.OldEncryptionKey.AEAD != nil {
		oldKey = encryption.NewKey(cmd.OldEncryptionKey.AEAD)
	}

	helper := migration.NewOpenHelper(
		defaultDriverName,
		cmd.Postgres.ConnectionString(),
		cmd.lockFactory,
		newKey,
		oldKey,
	)

	err := helper.MigrateToVersion(version)
	if err != nil {
		return fmt.Errorf("could not migrate to version %d: %w", version, err)
	}

	fmt.Println("Successfully migrated to version:", version)

	return nil
}
// rotateEncryptionKey re-encrypts stored data by re-running the migration to
// the database's current version with the new/old key pair in effect.
func (cmd *Migration) rotateEncryptionKey() error {
	var newKey, oldKey *encryption.Key
	if aead := cmd.EncryptionKey.AEAD; aead != nil {
		newKey = encryption.NewKey(aead)
	}
	if aead := cmd.OldEncryptionKey.AEAD; aead != nil {
		oldKey = encryption.NewKey(aead)
	}

	helper := migration.NewOpenHelper(
		defaultDriverName,
		cmd.Postgres.ConnectionString(),
		cmd.lockFactory,
		newKey,
		oldKey,
	)

	current, err := helper.CurrentVersion()
	if err != nil {
		return err
	}

	// Migrating to the version we are already at is a no-op for the schema
	// but drives the key rotation.
	return helper.MigrateToVersion(current)
}
// migrateToLatestVersion migrates the database schema up to the newest
// version this binary supports. No encryption keys are passed.
func (cmd *Migration) migrateToLatestVersion() error {
	helper := migration.NewOpenHelper(
		defaultDriverName,
		cmd.Postgres.ConnectionString(),
		cmd.lockFactory,
		nil,
		nil,
	)

	latest, err := helper.SupportedVersion()
	if err != nil {
		return err
	}

	return helper.MigrateToVersion(latest)
}
// WireDynamicFlags registers dynamically-discovered flag groups (credential
// managers, metric emitters, policy checkers, auth connectors) by delegating
// to the "run" subcommand.
func (cmd *ATCCommand) WireDynamicFlags(commandFlags *flags.Command) {
	cmd.RunCommand.WireDynamicFlags(commandFlags)
}
// WireDynamicFlags locates four well-known flag groups by breadth-first
// search over the command's (possibly nested) groups, then registers the
// dynamically-discovered credential managers, metric emitters, policy
// checkers, and auth connectors onto them. Panics if any group is missing,
// since that indicates a programming error in flag setup.
func (cmd *RunCommand) WireDynamicFlags(commandFlags *flags.Command) {
	var credsGroup, metricsGroup, policyGroup, authGroup *flags.Group

	// Breadth-first traversal: subgroups are appended to the queue as each
	// group is visited; the first group matching each description wins.
	queue := commandFlags.Groups()
	for i := 0; i < len(queue); i++ {
		g := queue[i]

		switch g.ShortDescription {
		case "Credential Management":
			if credsGroup == nil {
				credsGroup = g
			}
		case "Metrics & Diagnostics":
			if metricsGroup == nil {
				metricsGroup = g
			}
		case "Policy Checking":
			if policyGroup == nil {
				policyGroup = g
			}
		case "Authentication":
			if authGroup == nil {
				authGroup = g
			}
		}

		if metricsGroup != nil && credsGroup != nil && authGroup != nil && policyGroup != nil {
			break
		}

		queue = append(queue, g.Groups()...)
	}

	if metricsGroup == nil {
		panic("could not find Metrics & Diagnostics group for registering emitters")
	}
	if policyGroup == nil {
		panic("could not find Policy Checking group for registering policy checkers")
	}
	if credsGroup == nil {
		panic("could not find Credential Management group for registering managers")
	}
	if authGroup == nil {
		panic("could not find Authentication group for registering connectors")
	}

	// Each registered credential manager factory contributes its own flags
	// to the Credential Management group.
	managerConfigs := make(creds.Managers)
	for name, factory := range creds.ManagerFactories() {
		managerConfigs[name] = factory.AddConfig(credsGroup)
	}
	cmd.CredentialManagers = managerConfigs

	metric.Metrics.WireEmitters(metricsGroup)

	policy.WireCheckers(policyGroup)

	skycmd.WireConnectors(authGroup)
	skycmd.WireTeamConnectors(authGroup.Find("Authentication (Main Team)"))
}
// Execute constructs the composite runner for the web node, starts it under
// signal monitoring, and blocks until it exits, returning its exit error.
func (cmd *RunCommand) Execute(args []string) error {
	runner, err := cmd.Runner(args)
	if err != nil {
		return err
	}

	process := ifrit.Invoke(sigmon.New(runner))
	return <-process.Wait()
}
// Runner validates configuration, initializes process-wide state (feature
// flags, DB drivers, metrics, tracing, debug endpoints), opens the database
// connection pools, and assembles all ATC components into a single
// ifrit.Runner that runs them in parallel and closes the connections on
// exit. Left intentionally sequential: later steps depend on earlier ones.
func (cmd *RunCommand) Runner(positionalArguments []string) (ifrit.Runner, error) {
	// Fall back to a URL derived from the bind address when no external
	// URL was configured.
	if cmd.ExternalURL.URL == nil {
		cmd.ExternalURL = cmd.DefaultURL()
	}

	// The run command takes flags only; reject stray positional args.
	if len(positionalArguments) != 0 {
		return nil, fmt.Errorf("unexpected positional arguments: %v", positionalArguments)
	}

	err := cmd.validate()
	if err != nil {
		return nil, err
	}

	logger, reconfigurableSink := cmd.Logger.Logger("atc")
	if cmd.LogClusterName {
		logger = logger.WithData(lager.Data{
			"cluster": cmd.Server.ClusterName,
		})
	}

	commandSession := logger.Session("cmd")
	startTime := time.Now()

	commandSession.Info("start")
	defer commandSession.Info("finish", lager.Data{
		"duration": time.Since(startTime),
	})

	// Copy flag-configured feature flags and intervals into atc
	// package-level globals consumed throughout the codebase.
	atc.EnableGlobalResources = cmd.FeatureFlags.EnableGlobalResources
	atc.EnableRedactSecrets = cmd.FeatureFlags.EnableRedactSecrets
	atc.EnableBuildRerunWhenWorkerDisappears = cmd.FeatureFlags.EnableBuildRerunWhenWorkerDisappears
	atc.EnableAcrossStep = cmd.FeatureFlags.EnableAcrossStep
	atc.EnablePipelineInstances = cmd.FeatureFlags.EnablePipelineInstances
	atc.EnableCacheStreamedVolumes = cmd.FeatureFlags.EnableCacheStreamedVolumes
	atc.EnableResourceCausality = cmd.FeatureFlags.EnableResourceCausality

	atc.DefaultCheckInterval = cmd.ResourceCheckingInterval
	atc.DefaultWebhookInterval = cmd.ResourceWithWebhookCheckingInterval
	atc.DefaultResourceTypeInterval = cmd.ResourceTypeCheckingInterval

	// Optionally load base resource type defaults from a YAML file mapping
	// resource type name -> default source config.
	if cmd.BaseResourceTypeDefaults.Path() != "" {
		content, err := os.ReadFile(cmd.BaseResourceTypeDefaults.Path())
		if err != nil {
			return nil, err
		}

		defaults := map[string]atc.Source{}
		err = yaml.Unmarshal(content, &defaults)
		if err != nil {
			return nil, err
		}

		atc.LoadBaseResourceTypeDefaults(defaults)
	}

	err = db.SetNotificationBusQueueSize(cmd.DBNotificationBusQueueSize)
	if err != nil {
		return nil, err
	}

	//FIXME: These only need to run once for the entire binary. At the moment,
	//they rely on state of the command.
	db.SetupConnectionRetryingDriver(
		"postgres",
		cmd.Postgres.ConnectionString(),
		retryingDriverName,
	)

	// Register the sink that collects error metrics
	if cmd.Metrics.CaptureErrorMetrics {
		errorSinkCollector := metric.NewErrorSinkCollector(
			logger,
			metric.Metrics,
		)
		logger.RegisterSink(&errorSinkCollector)
	}

	err = cmd.Tracing.Prepare()
	if err != nil {
		return nil, err
	}

	// Connection tracker is off by default. Can be turned on/off at runtime
	// via the debug endpoints below (served on the pprof debug listener).
	http.HandleFunc("/debug/connections", func(w http.ResponseWriter, r *http.Request) {
		for _, stack := range db.GlobalConnectionTracker.Current() {
			fmt.Fprintln(w, stack)
		}
	})
	http.HandleFunc("/debug/connections/on", func(w http.ResponseWriter, r *http.Request) {
		db.InitConnectionTracker(true)
	})
	http.HandleFunc("/debug/connections/off", func(w http.ResponseWriter, r *http.Request) {
		db.InitConnectionTracker(false)
	})

	if err := cmd.configureMetrics(logger); err != nil {
		return nil, err
	}

	lockConns, err := constructLockConns(retryingDriverName, cmd.Postgres.ConnectionString())
	if err != nil {
		return nil, err
	}
	lockFactory := lock.NewLockFactory(lockConns, metric.LogLockAcquired, metric.LogLockReleased)

	// Separate connection pools per concern: API requests, backend
	// components, GC, and worker state.
	apiConn, err := cmd.constructDBConn(retryingDriverName, logger, cmd.APIMaxOpenConnections, cmd.APIMaxOpenConnections/2, "api", lockFactory)
	if err != nil {
		return nil, err
	}
	backendConn, err := cmd.constructDBConn(retryingDriverName, logger, cmd.BackendMaxOpenConnections, cmd.BackendMaxOpenConnections/2, "backend", lockFactory)
	if err != nil {
		return nil, err
	}
	gcConn, err := cmd.constructDBConn(retryingDriverName, logger, 5, 2, "gc", lockFactory)
	if err != nil {
		return nil, err
	}
	workerConn, err := cmd.constructDBConn(retryingDriverName, logger, 1, 1, "worker", lockFactory)
	if err != nil {
		return nil, err
	}

	err = db.CacheWarmUp(backendConn)
	if err != nil {
		return nil, err
	}

	// NOTE: the local variable shadows the storage package from here on.
	storage, err := storage.NewPostgresStorage(logger, cmd.Postgres)
	if err != nil {
		return nil, err
	}

	secretManager, err := cmd.secretManager(logger)
	if err != nil {
		return nil, err
	}

	cmd.varSourcePool = creds.NewVarSourcePool(
		logger.Session("var-source-pool"),
		cmd.CredentialManagement,
		cmd.GC.VarSourceRecyclePeriod,
		1*time.Minute,
		clock.NewClock(),
	)

	members, err := cmd.constructMembers(logger, reconfigurableSink, apiConn, workerConn, backendConn, gcConn, storage, lockFactory, secretManager)
	if err != nil {
		return nil, err
	}

	members = append(members, grouper.Member{
		Name: "periodic-metrics",
		Runner: metric.PeriodicallyEmit(
			logger.Session("periodic-metrics"),
			metric.Metrics,
			10*time.Second,
		),
	})

	// Log the listen addresses once everything is up.
	onReady := func() {
		logData := lager.Data{
			"http":  cmd.nonTLSBindAddr(),
			"debug": cmd.debugBindAddr(),
		}

		if cmd.isTLSEnabled() {
			logData["https"] = cmd.tlsBindAddr()
		}

		logger.Info("listening", logData)
	}

	// Close every connection and pool on shutdown.
	onExit := func() {
		for _, closer := range []Closer{apiConn, backendConn, gcConn, storage, workerConn} {
			closer.Close()
		}
		for _, closer := range lockConns {
			closer.Close()
		}

		cmd.varSourcePool.Close()
	}

	return run(grouper.NewParallel(os.Interrupt, members), onReady, onExit), nil
}
// constructMembers assembles every long-running process for the web node:
// the API members, plus — for each backend and GC component — a lock-
// coordinated component runner, and a companion drain runner for any
// component whose Runnable implements component.Drainable.
func (cmd *RunCommand) constructMembers(
	logger lager.Logger,
	reconfigurableSink *lager.ReconfigurableSink,
	apiConn db.Conn,
	workerConn db.Conn,
	backendConn db.Conn,
	gcConn db.Conn,
	storage storage.Storage,
	lockFactory lock.LockFactory,
	secretManager creds.Secrets,
) ([]grouper.Member, error) {
	if cmd.TelemetryOptIn {
		url := fmt.Sprintf("http://telemetry.concourse-ci.org/?version=%s", concourse.Version)
		go func() {
			// Best-effort version ping. Bound the request so a slow or
			// unreachable telemetry endpoint cannot pin this goroutine
			// forever (http.Get uses the default client, which has no
			// timeout), and close the response body so the connection
			// is not leaked on success.
			client := &http.Client{Timeout: 30 * time.Second}
			resp, err := client.Get(url)
			if err != nil {
				logger.Error("telemetry-version", err)
				return
			}
			resp.Body.Close()
		}()
	}

	policyChecker, err := policy.Initialize(logger, cmd.Server.ClusterName, concourse.Version, cmd.PolicyCheckers.Filter)
	if err != nil {
		return nil, err
	}

	workerCache, err := db.NewWorkerCache(logger.Session("worker-cache"), backendConn, 1*time.Minute)
	if err != nil {
		return nil, err
	}

	// Buffered so that producers of check builds don't block the consumer
	// side under bursts; 2000 bounds the in-memory backlog. Shared between
	// the API members and the backend components.
	checkBuildsChan := make(chan db.Build, 2000)

	apiMembers, err := cmd.constructAPIMembers(logger, reconfigurableSink, apiConn, workerConn, storage, lockFactory, secretManager, policyChecker, workerCache, checkBuildsChan)
	if err != nil {
		return nil, err
	}

	backendComponents, err := cmd.backendComponents(logger, backendConn, lockFactory, secretManager, policyChecker, workerCache, checkBuildsChan)
	if err != nil {
		return nil, err
	}

	gcComponents, err := cmd.gcComponents(logger, gcConn, lockFactory)
	if err != nil {
		return nil, err
	}

	// use backendConn so that the Component objects created by the factory use
	// the backend connection pool when reloading.
	componentFactory := db.NewComponentFactory(
		backendConn,
		cmd.NumGoroutineThreshold,
		rander{},
		clock.NewClock(),
		db.RealGoroutineCounter{})
	bus := backendConn.Bus()

	members := apiMembers
	components := append(backendComponents, gcComponents...)
	for _, c := range components {
		// Persist (or refresh) the component row so the runner below can
		// coordinate scheduling through the database.
		dbComponent, err := componentFactory.CreateOrUpdate(c.Component)
		if err != nil {
			return nil, err
		}

		componentLogger := logger.Session(c.Component.Name)

		members = append(members, grouper.Member{
			Name: c.Component.Name,
			Runner: &component.Runner{
				Logger:    componentLogger,
				Interval:  cmd.ComponentRunnerInterval,
				Component: dbComponent,
				Bus:       bus,
				// The Coordinator takes a lock before running, so only one
				// ATC in the cluster runs a given component at a time.
				Schedulable: &component.Coordinator{
					Locker:    lockFactory,
					Component: dbComponent,
					Runnable:  c.Runnable,
				},
			},
		})

		// Drainable components get a dedicated runner that invokes their
		// drain behavior when the process group shuts down.
		if drainable, ok := c.Runnable.(component.Drainable); ok {
			members = append(members, grouper.Member{
				Name: c.Component.Name + "-drainer",
				Runner: drainRunner{
					logger:  componentLogger.Session("drain"),
					drainer: drainable,
				},
			})
		}
	}

	return members, nil
}
func (cmd *RunCommand) constructAPIMembers(
logger lager.Logger,
reconfigurableSink *lager.ReconfigurableSink,
dbConn db.Conn,
workerConn db.Conn,
storage storage.Storage,
lockFactory lock.LockFactory,
secretManager creds.Secrets,
policyChecker policy.Checker,
workerCache *db.WorkerCache,
checkBuildsChan chan db.Build,
) ([]grouper.Member, error) {
httpClient, err := cmd.skyHttpClient()
if err != nil {
return nil, err
}
teamFactory := db.NewTeamFactory(dbConn, lockFactory)
workerTeamFactory := db.NewTeamFactory(workerConn, lockFactory)
_, err = teamFactory.CreateDefaultTeamIfNotExists()
if err != nil {
return nil, err
}
err = cmd.configureAuthForDefaultTeam(teamFactory)
if err != nil {
return nil, err
}
userFactory := db.NewUserFactory(dbConn)
dbResourceConfigFactory := db.NewResourceConfigFactory(dbConn, lockFactory)
pool, err := cmd.constructPool(dbConn, lockFactory, workerCache)
if err != nil {
return nil, err
}
// The worker factory has its own connection pool (for worker registration)
dbWorkerFactory := db.NewWorkerFactory(workerConn, workerCache)
credsManagers := cmd.CredentialManagers
dbPipelineFactory := db.NewPipelineFactory(dbConn, lockFactory)
dbJobFactory := db.NewJobFactory(dbConn, lockFactory)
dbResourceFactory := db.NewResourceFactory(dbConn, lockFactory)
dbContainerRepository := db.NewContainerRepository(dbConn)
dbVolumeRepository := db.NewVolumeRepository(dbConn)
gcContainerDestroyer := gc.NewDestroyer(logger, dbContainerRepository, dbVolumeRepository)
dbBuildFactory := db.NewBuildFactory(dbConn, lockFactory, cmd.GC.OneOffBuildGracePeriod, cmd.GC.FailedGracePeriod)
dbCheckFactory := db.NewCheckFactory(dbConn, lockFactory, secretManager, cmd.varSourcePool, checkBuildsChan, nil)
dbAccessTokenFactory := db.NewAccessTokenFactory(dbConn)
dbClock := db.NewClock()
dbWall := db.NewWall(dbConn, &dbClock)
tokenVerifier := cmd.constructTokenVerifier(dbAccessTokenFactory)
teamsCacher := accessor.NewTeamsCacher(
logger,
dbConn.Bus(),
teamFactory,
time.Minute,
time.Minute,
)
displayUserIdGenerator, err := skycmd.NewSkyDisplayUserIdGenerator(cmd.DisplayUserIdPerConnector)
if err != nil {
return nil, err
}
accessFactory := accessor.NewAccessFactory(
tokenVerifier,
teamsCacher,
cmd.SystemClaimKey,
cmd.SystemClaimValues,
displayUserIdGenerator,
)
middleware := token.NewMiddleware(cmd.Auth.AuthFlags.SecureCookies)
apiHandler, err := cmd.constructAPIHandler(
logger,
reconfigurableSink,
teamFactory,
workerTeamFactory,
dbPipelineFactory,
dbJobFactory,
dbResourceFactory,
dbWorkerFactory,
dbVolumeRepository,
dbContainerRepository,
gcContainerDestroyer,
dbBuildFactory,
dbCheckFactory,
dbResourceConfigFactory,
userFactory,
pool,
secretManager,
credsManagers,
accessFactory,
dbWall,
policyChecker,
)
if err != nil {
return nil, err
}
webHandler, err := cmd.constructWebHandler(logger)
if err != nil {
return nil, err
}
authHandler, err := cmd.constructAuthHandler(
logger,
storage,
dbAccessTokenFactory,
userFactory,
displayUserIdGenerator,
)
if err != nil {
return nil, err
}
loginHandler, err := cmd.constructLoginHandler(
logger,
httpClient,
middleware,
)
if err != nil {
return nil, err
}
legacyHandler, err := cmd.constructLegacyHandler(
logger,
)
if err != nil {
return nil, err
}
var httpHandler, httpsHandler http.Handler
if cmd.isTLSEnabled() {
httpHandler = cmd.constructHTTPHandler(
logger,
tlsRedirectHandler{
matchHostname: cmd.ExternalURL.URL.Hostname(),
externalHost: cmd.ExternalURL.URL.Host,
baseHandler: webHandler,
},
// note: intentionally not wrapping API; redirecting is more trouble than
// it's worth.
// we're mainly interested in having the web UI consistently https:// -
// API requests will likely not respect the redirected https:// URI upon
// the next request, plus the payload will have already been sent in
// plaintext
apiHandler,
tlsRedirectHandler{
matchHostname: cmd.ExternalURL.URL.Hostname(),
externalHost: cmd.ExternalURL.URL.Host,
baseHandler: authHandler,
},
tlsRedirectHandler{
matchHostname: cmd.ExternalURL.URL.Hostname(),
externalHost: cmd.ExternalURL.URL.Host,
baseHandler: loginHandler,
},
tlsRedirectHandler{
matchHostname: cmd.ExternalURL.URL.Hostname(),
externalHost: cmd.ExternalURL.URL.Host,
baseHandler: legacyHandler,
},
middleware,
)
httpsHandler = cmd.constructHTTPHandler(
logger,
webHandler,
apiHandler,
authHandler,
loginHandler,
legacyHandler,
middleware,
)
} else {
httpHandler = cmd.constructHTTPHandler(
logger,
webHandler,
apiHandler,
authHandler,
loginHandler,
legacyHandler,
middleware,
)
}
members := []grouper.Member{
{Name: "debug", Runner: http_server.New(
cmd.debugBindAddr(),
http.DefaultServeMux,
)},
{Name: "web", Runner: http_server.New(
cmd.nonTLSBindAddr(),
httpHandler,
)},
}
if httpsHandler != nil {
tlsConfig, err := cmd.tlsConfig(logger, dbConn)