Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

atc/api: limit concurrent requests #5429

Merged
merged 37 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
19bc863
atc: first pass at limiting concurrent requests
Apr 13, 2020
e55b0ed
atc: validate concurrent request limits
Apr 14, 2020
3a9b0af
atc: behaviour: forbid negative limits
Apr 15, 2020
caf545a
atc: structure: integration test covers Validate()
Apr 15, 2020
84c1421
atc: structure: reduce helper function parameters
YoussB Apr 15, 2020
26286c8
atc/api: implement concurrent request limits
YoussB Apr 15, 2020
2bed544
atc/api: behaviour: ensure pool is released
Apr 15, 2020
ff9b5fe
atc/api: behaviour: 503 on concurrency overload
Apr 15, 2020
6d8eeaf
atc: behaviour: improve error message
Apr 16, 2020
0b05831
atc: behaviour: store pool references on policy
Apr 16, 2020
28972c7
atc: behaviour: log error when pool release fails
Apr 16, 2020
4907bbb
atc: structure: standardize concurrent terminology
Apr 16, 2020
edadfc6
atc: structure: all read actions called Get*
vito Apr 16, 2020
e0ce661
atc: behaviour: use colon in limit flags
vito Apr 16, 2020
c886fc4
atc: structure: fakeLogger => TestLogger
vito Apr 16, 2020
76439c0
atc: behaviour: panic on empty pool release
vito Apr 16, 2020
2f84c7e
add release note
Apr 16, 2020
318d9ce
atc: behaviour: fix flag parsing
Apr 20, 2020
a96ff3d
atc: behavior: add metrics for concurrent requests
Apr 20, 2020
87d5166
atc: test that concurrent request metric is emitted
Apr 20, 2020
9d66cd5
atc: behaviour: return 501 when concurrent limit is 0
Apr 20, 2020
faa0156
Merge branch 'issue/5421' of github.com:concourse/concourse into issu…
Apr 20, 2020
198a2b7
atc: use *vec for prometheus' concurrent reqs
Apr 20, 2020
b7b5b1f
atc: update concurrent reqs prometheus description
Apr 20, 2020
12fe9b3
atc: fix rebase issue in tests
Apr 20, 2020
929e3b0
update release-note wrt 501 status when disabled
Apr 20, 2020
0687afe
atc: structure: flatten context in happy path
Apr 21, 2020
e81ef51
atc: behavior: bail on invalid env vars
vito Apr 21, 2020
ddee656
topgun/k8s: fix duration format
Apr 23, 2020
8105be7
topgun/k8s: fix env vars in external worker tests
Apr 24, 2020
c745bc0
enrich release note
Apr 28, 2020
424d574
Improve description of limit_hit metric
Apr 28, 2020
b351976
atc: structure: split out pool tests
Apr 28, 2020
99a31bd
atc: behaviour: env var error mentions flag name
Apr 28, 2020
e1e6c89
cmd: structure: clean up redundant ifrit stuff
May 1, 2020
4ed8d3b
release-notes: typo
May 1, 2020
7fcbcdd
Merge branch 'master' into issue/5421
May 1, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions atc/atccmd/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ type RunCommand struct {

Postgres flag.PostgresConfig `group:"PostgreSQL Configuration" namespace:"postgres"`

APIMaxOpenConnections int `long:"api-max-conns" description:"The maximum number of open connections for the api connection pool." default:"10"`
BackendMaxOpenConnections int `long:"backend-max-conns" description:"The maximum number of open connections for the backend connection pool." default:"50"`
ConcurrentRequestLimits map[wrappa.LimitedRoute]int `long:"concurrent-request-limit" description:"Limit the number of concurrent requests to an API endpoint (Example: ListAllJobs:5)"`
clarafu marked this conversation as resolved.
Show resolved Hide resolved
APIMaxOpenConnections int `long:"api-max-conns" description:"The maximum number of open connections for the api connection pool." default:"10"`
BackendMaxOpenConnections int `long:"backend-max-conns" description:"The maximum number of open connections for the backend connection pool." default:"50"`

CredentialManagement creds.CredentialManagementConfig `group:"Credential Management"`
CredentialManagers creds.Managers
Expand Down Expand Up @@ -1790,6 +1791,10 @@ func (cmd *RunCommand) constructAPIHandler(
}

apiWrapper := wrappa.MultiWrappa{
wrappa.NewConcurrentRequestLimitsWrappa(
logger,
wrappa.NewConcurrentRequestPolicy(cmd.ConcurrentRequestLimits),
),
wrappa.NewAPIMetricsWrappa(logger),
wrappa.NewAPIAuthWrappa(
checkPipelineAccessHandlerFactory,
Expand Down
18 changes: 18 additions & 0 deletions atc/atccmd/command_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package atccmd_test

import (
"fmt"
"testing"

"github.com/concourse/concourse/atc"
"github.com/concourse/concourse/atc/atccmd"
"github.com/jessevdk/go-flags"
"github.com/stretchr/testify/require"
Expand All @@ -27,6 +29,22 @@ func (s *CommandSuite) TestLetsEncryptDefaultIsUpToDate() {
s.Equal(opt.Default, []string{autocert.DefaultACMEDirectory})
}

func (s *CommandSuite) TestInvalidConcurrentRequestLimitAction() {
cmd := &atccmd.RunCommand{}
parser := flags.NewParser(cmd, flags.None)
_, err := parser.ParseArgs([]string{
"--client-secret",
"client-secret",
"--concurrent-request-limit",
fmt.Sprintf("%s:2", atc.GetInfo),
})

s.Contains(
err.Error(),
fmt.Sprintf("action '%s' is not supported", atc.GetInfo),
)
}

func TestSuite(t *testing.T) {
suite.Run(t, &CommandSuite{
Assertions: require.New(t),
Expand Down
32 changes: 32 additions & 0 deletions atc/integration/concurrent_request_limits_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package integration_test

import (
"net/http"

"github.com/concourse/concourse/atc"
"github.com/concourse/concourse/atc/wrappa"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Concurrent request limits", func() {
BeforeEach(func() {
cmd.ConcurrentRequestLimits = map[wrappa.LimitedRoute]int{
wrappa.LimitedRoute(atc.ListAllJobs): 0,
}
})

It("disables ListAllJobs requests", func() {
client := login(atcURL, "test", "test")
httpClient := client.HTTPClient()
request, _ := http.NewRequest(
"GET",
client.URL()+"/api/v1/jobs",
nil,
)

response, _ := httpClient.Do(request)

Expect(response.StatusCode).To(Equal(http.StatusNotImplemented))
})
})
2 changes: 2 additions & 0 deletions atc/metric/emitter/newrelic.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func (emitter *NewRelicEmitter) Emit(logger lager.Logger, event metric.Event) {
"checks queue size",
"worker containers",
"worker volumes",
"concurrent requests",
"concurrent requests limit hit",
"http response time",
"database queries",
"database connections",
Expand Down
25 changes: 25 additions & 0 deletions atc/metric/emitter/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type PrometheusEmitter struct {
buildsStarted prometheus.Counter
buildsRunning prometheus.Gauge

concurrentRequestsLimitHit *prometheus.CounterVec
concurrentRequests *prometheus.GaugeVec

buildDurationsVec *prometheus.HistogramVec
buildsAborted prometheus.Counter
buildsErrored prometheus.Counter
Expand Down Expand Up @@ -151,6 +154,21 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) {
})
prometheus.MustRegister(buildsRunning)

concurrentRequestsLimitHit := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "concourse",
Subsystem: "concurrent_requests",
Name: "limit_hit_total",
Help: "Total number of requests rejected because the server was already serving too many concurrent requests.",
}, []string{"action"})
prometheus.MustRegister(concurrentRequestsLimitHit)

concurrentRequests := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "concourse",
Name: "concurrent_requests",
Help: "Number of concurrent requests being served by endpoints that have a specified limit of concurrent requests.",
}, []string{"action"})
prometheus.MustRegister(concurrentRequests)

buildsFinished := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "concourse",
Subsystem: "builds",
Expand Down Expand Up @@ -367,6 +385,9 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) {
buildsStarted: buildsStarted,
buildsRunning: buildsRunning,

concurrentRequestsLimitHit: concurrentRequestsLimitHit,
concurrentRequests: concurrentRequests,

buildDurationsVec: buildDurationsVec,
buildsAborted: buildsAborted,
buildsErrored: buildsErrored,
Expand Down Expand Up @@ -427,6 +448,10 @@ func (emitter *PrometheusEmitter) Emit(logger lager.Logger, event metric.Event)
emitter.buildsStarted.Add(event.Value)
case "builds running":
emitter.buildsRunning.Set(event.Value)
case "concurrent requests limit hit":
emitter.concurrentRequestsLimitHit.WithLabelValues(event.Attributes["action"]).Add(event.Value)
case "concurrent requests":
emitter.concurrentRequests.WithLabelValues(event.Attributes["action"]).Set(event.Value)
case "build finished":
emitter.buildFinishedMetrics(logger, event)
case "worker containers":
Expand Down
3 changes: 3 additions & 0 deletions atc/metric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ var ChecksQueueSize = &Gauge{}
var ChecksStarted = &Counter{}
var ChecksEnqueued = &Counter{}

var ConcurrentRequests = map[string]*Gauge{}
var ConcurrentRequestsLimitHit = map[string]*Counter{}

type BuildCollectorDuration struct {
Duration time.Duration
}
Expand Down
26 changes: 26 additions & 0 deletions atc/metric/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,32 @@ func tick(logger lager.Logger) {
},
)

for action, gauge := range ConcurrentRequests {
emit(
logger.Session("concurrent-requests"),
Event{
Name: "concurrent requests",
Value: gauge.Max(),
Attributes: map[string]string{
"action": action,
},
},
)
}

for action, counter := range ConcurrentRequestsLimitHit {
emit(
logger.Session("concurrent-requests-limit-hit"),
Event{
Name: "concurrent requests limit hit",
Value: counter.Delta(),
Attributes: map[string]string{
"action": action,
},
},
)
}

emit(
logger.Session("checks-finished-with-error"),
Event{
Expand Down
124 changes: 92 additions & 32 deletions atc/metric/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ var _ = Describe("Periodic emission of metrics", func() {
metric.RegisterEmitter(emitterFactory)
emitterFactory.IsConfiguredReturns(true)
emitterFactory.NewEmitterReturns(emitter, nil)
a := &dbfakes.FakeConn{}
a.NameReturns("A")
b := &dbfakes.FakeConn{}
b.NameReturns("B")
metric.Databases = []db.Conn{a, b}
metric.Initialize(testLogger, "test", map[string]string{}, 1000)

process = ifrit.Invoke(metric.PeriodicallyEmit(lager.NewLogger("dont care"), 250*time.Millisecond))
})

JustBeforeEach(func() {
runner := metric.PeriodicallyEmit(
lager.NewLogger("dont care"),
250*time.Millisecond,
)

process = ifrit.Invoke(runner)
})

AfterEach(func() {
Expand All @@ -46,38 +49,95 @@ var _ = Describe("Periodic emission of metrics", func() {
metric.Deinitialize(nil)
})

It("emits database queries", func() {
Eventually(emitter.EmitCallCount).Should(BeNumerically(">=", 1))
Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
Context("database-related metrics", func() {
BeforeEach(func() {
a := &dbfakes.FakeConn{}
a.NameReturns("A")
b := &dbfakes.FakeConn{}
b.NameReturns("B")
metric.Databases = []db.Conn{a, b}
})

It("emits database queries", func() {
Eventually(emitter.EmitCallCount).Should(BeNumerically(">=", 1))
Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("database queries"),
}),
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("database queries"),
}),
),
),
),
)
)

By("emits database connections for each pool")
Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
By("emits database connections for each pool")
Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("database connections"),
"Attributes": Equal(map[string]string{"ConnectionName": "A"}),
}),
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("database connections"),
"Attributes": Equal(map[string]string{"ConnectionName": "A"}),
}),
),
),
),
)
Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
)
Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("database connections"),
"Attributes": Equal(map[string]string{"ConnectionName": "B"}),
}),
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("database connections"),
"Attributes": Equal(map[string]string{"ConnectionName": "B"}),
}),
),
),
),
)
)
})
})

Context("concurrent requests", func() {
const action = "ListAllSomething"

BeforeEach(func() {
gauge := &metric.Gauge{}
gauge.Set(123)

counter := &metric.Counter{}
counter.IncDelta(10)

metric.ConcurrentRequests[action] = gauge
metric.ConcurrentRequestsLimitHit[action] = counter
})

It("emits", func() {
Eventually(emitter.EmitCallCount).Should(BeNumerically(">=", 1))

Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("concurrent requests"),
"Value": Equal(float64(123)),
"Attributes": Equal(map[string]string{
"action": action,
}),
}),
),
),
)

Expect(emitter.Invocations()["Emit"]).To(
ContainElement(
ContainElement(
MatchFields(IgnoreExtras, Fields{
"Name": Equal("concurrent requests limit hit"),
"Value": Equal(float64(10)),
"Attributes": Equal(map[string]string{
"action": action,
}),
}),
),
),
)
})
})
})
4 changes: 2 additions & 2 deletions atc/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ const (
GetLogLevel = "GetLogLevel"

DownloadCLI = "DownloadCLI"
GetInfo = "Info"
GetInfoCreds = "InfoCreds"
GetInfo = "GetInfo"
GetInfoCreds = "GetInfoCreds"

ListContainers = "ListContainers"
GetContainer = "GetContainer"
Expand Down