atc: behaviour: emit tasks waiting prometheus metric #5448

Merged
54 commits merged on May 12, 2020

Commits (54)
cbed34e
add configuration for NewRelic insights url
shyamz-22 May 1, 2020
9197cd2
add release notes
shyamz-22 May 1, 2020
002d8a3
fix test by using correct assertion
shyamz-22 May 1, 2020
511bcb5
Merge release notes with upstream changes
shyamz-22 May 1, 2020
97e925b
Merge changes from upstream branch
shyamz-22 May 2, 2020
013cfb3
atc: handle groups claims properly
May 4, 2020
b647326
atc: behaviour: emit tasks waiting prometheus metric
Apr 15, 2020
3e74ba7
Add periodic test for tasks waiting metrics
Apr 30, 2020
947c351
update latest.md
Apr 30, 2020
35f6115
add client test to check if metrics are gauged properly
shyamz-22 May 4, 2020
256dc8a
add test to check if task waiting metric is exposed by PrometheusEmitter
shyamz-22 May 4, 2020
e0ec268
test if task lock is released in RunTaskStep when waiting for worker …
shyamz-22 May 4, 2020
82a042d
make test less flaky
shyamz-22 May 5, 2020
008ec24
[#5082] Close ssh conn on worker when keepalive fails
xtreme-sameer-vohra Mar 17, 2020
f82ed96
[#5082] Refactor keepalive into a separate func
xtreme-sameer-vohra Mar 27, 2020
d70ca26
[#5082] Increase tsa/client keepalive timeout
xtreme-sameer-vohra Apr 9, 2020
858edcf
[#5082] add release note
xtreme-sameer-vohra Mar 27, 2020
a41fb70
Merge remote-tracking branch 'upstream/master'
shyamz-22 May 6, 2020
2b635f2
simplify url function
shyamz-22 May 6, 2020
94ed811
Merge pull request #5532 from shyamz-22/master
xtreme-sameer-vohra May 6, 2020
ad94403
Merge pull request #5542 from concourse/fix-groups-claims
May 7, 2020
31b4e9e
contributing: add instructions for local k8s
Apr 30, 2020
206c85d
contributing: instructions for -nodes=N
May 7, 2020
df98844
Merge branch 'master' into 5082-workers-stall
xtreme-sameer-vohra May 7, 2020
3df973f
Merge pull request #5323 from concourse/5082-workers-stall
xtreme-sameer-vohra May 7, 2020
5d30607
atc/db: less cryptic error message
taylorsilva May 5, 2020
62dec49
Update atc/db/check_factory.go
taylorsilva May 7, 2020
7c5c115
Update testflight/custom_resource_check_test.go
taylorsilva May 7, 2020
41c013c
Update testflight/resource_type_test.go
taylorsilva May 7, 2020
d961aa7
atc: log db transaction query
Apr 30, 2020
41c9523
Merge pull request #5518 from concourse/install-tiller-contributing
May 7, 2020
3b07fdc
Make k8s-topgun produce less output
taylorsilva May 7, 2020
22c8f0a
oops: remove a focus
taylorsilva May 7, 2020
ba65bca
topgun: prometheus: update deps and disable worker
May 7, 2020
bc2ff0c
Merge pull request #5546 from concourse/less-cryptic-err
taylorsilva May 7, 2020
9ffa9e2
Merge branch 'master' into log-tx-query
xtreme-sameer-vohra May 8, 2020
53ee9e6
Run go mod tidy
taylorsilva May 8, 2020
0e06ef7
Reduce the frequency of call to kubectl logs
taylorsilva May 8, 2020
8f150bd
Move kubeClient setup to BeforeEach
taylorsilva May 8, 2020
44cea28
Remove call to kubectl logs
taylorsilva May 8, 2020
e9de452
Merge pull request #5520 from concourse/log-tx-query
May 9, 2020
3abdcd3
Merge pull request #5567 from concourse/make-k8s-topgun-less-noisy
taylorsilva May 11, 2020
8e9f120
add config file for PullRequest bot automation
May 7, 2020
313b9c6
Merge pull request #5568 from concourse/k8s-topgun-prometheus-fix
May 11, 2020
3ac5db5
Merge pull request #5564 from concourse/contributor-automation
May 11, 2020
5f3215b
atc: behaviour: emit tasks waiting prometheus metric
Apr 15, 2020
ce8d014
Add periodic test for tasks waiting metrics
Apr 30, 2020
e5b2275
update latest.md
Apr 30, 2020
c4bfdcd
add client test to check if metrics are gauged properly
shyamz-22 May 4, 2020
08a4485
add test to check if task waiting metric is exposed by PrometheusEmitter
shyamz-22 May 4, 2020
b267a1f
test if task lock is released in RunTaskStep when waiting for worker …
shyamz-22 May 4, 2020
07e1b4a
make test less flaky
shyamz-22 May 5, 2020
11a0216
Refactor choose task worker to make it more testable
shyamz-22 May 12, 2020
920eeb5
fix merge conflicts with latest.md
shyamz-22 May 12, 2020
2 changes: 2 additions & 0 deletions .github/auto_pr_team.yml
@@ -0,0 +1,2 @@
org: concourse
team: contributors
45 changes: 45 additions & 0 deletions CONTRIBUTING.md
@@ -565,6 +565,43 @@ Kubernetes-related testing are all end-to-end, living under `topgun/k8s`. They
require access to a real Kubernetes cluster with access granted through a
properly configured `~/.kube/config` file.

[`kind`] is a great choice for running a local Kubernetes cluster - all you need
is `docker` and the `kind` CLI. If you wish to run the tests with a high degree
of concurrency, it's advisable to have multiple Kubernetes nodes. This can be
achieved with the following `kind` config:

```yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker
```


With the cluster up, the next step is to have a proper [Tiller] setup (the tests
still run with Helm 2):


```bash
kubectl create serviceaccount \
--namespace kube-system \
tiller

kubectl create clusterrolebinding \
tiller-cluster-rule \
--clusterrole=cluster-admin \
--serviceaccount=kube-system:tiller

helm init \
--service-account tiller \
--upgrade
```


The tests require a few environment variables to be set:

- `CONCOURSE_IMAGE_TAG` or `CONCOURSE_IMAGE_DIGEST`: the tag or digest to use
@@ -576,12 +613,20 @@ to define the postgres chart that Concourse depends on.
- `CONCOURSE_CHART_DIR`: location in the filesystem where a copy of [`the Concourse Helm
chart`][concourse-helm-chart] exists.


With those set, go to `topgun/k8s` and run Ginkgo:

```sh
# run the test cases serially
ginkgo .

# run the test cases with a concurrency level of 16
ginkgo -nodes=16 .
```

[`kind`]: https://kind.sigs.k8s.io/
[Tiller]: https://v2.helm.sh/docs/install/


### A note on `topgun`

11 changes: 8 additions & 3 deletions atc/api/accessor/accessor.go
@@ -231,12 +231,17 @@ func (a *access) connectorID() string {
}

func (a *access) groups() []string {
groups := []string{}
if raw, ok := a.claims()["groups"]; ok {
if claim, ok := raw.([]string); ok {
return claim
if rawGroups, ok := raw.([]interface{}); ok {
for _, rawGroup := range rawGroups {
if group, ok := rawGroup.(string); ok {
groups = append(groups, group)
}
}
}
}
return []string{}
return groups
}

func (a *access) adminTeams() []string {
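
The rewrite above follows from how Go decodes JSON: token claims land in a `map[string]interface{}`, so an array-valued claim such as `groups` arrives as `[]interface{}`, never as `[]string`. A minimal, standalone sketch of why the old type assertion always failed and what the new loop does:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Claims decoded from a token end up in a map[string]interface{}.
	var claims map[string]interface{}
	_ = json.Unmarshal([]byte(`{"groups": ["team-a", "team-b"]}`), &claims)

	raw := claims["groups"]

	// Old behaviour: a JSON array always decodes to []interface{},
	// so asserting []string directly never succeeds.
	_, ok := raw.([]string)
	fmt.Println(ok) // false

	// New behaviour: walk the []interface{} and keep the string elements.
	groups := []string{}
	if rawGroups, ok := raw.([]interface{}); ok {
		for _, rawGroup := range rawGroups {
			if group, ok := rawGroup.(string); ok {
				groups = append(groups, group)
			}
		}
	}
	fmt.Println(groups) // [team-a team-b]
}
```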
4 changes: 2 additions & 2 deletions atc/api/accessor/accessor_test.go
@@ -222,7 +222,7 @@ var _ = Describe("Accessor", func() {
verification.HasToken = true
verification.IsTokenValid = true
verification.RawClaims = map[string]interface{}{
"groups": []string{"some-group"},
"groups": []interface{}{"some-group"},
"federated_claims": map[string]interface{}{
"connector_id": "some-connector",
},
@@ -649,7 +649,7 @@ var _ = Describe("Accessor", func() {
"user_id": "some-user-id",
"user_name": "some-user-name",
},
"groups": []string{"some-group"},
"groups": []interface{}{"some-group"},
}
})

11 changes: 9 additions & 2 deletions atc/atccmd/command.go
@@ -93,6 +93,9 @@ var retryingDriverName = "too-many-connections-retrying"
var flyClientID = "fly"
var flyClientSecret = "Zmx5"

var workerAvailabilityPollingInterval = 5 * time.Second
var workerStatusPublishInterval = 1 * time.Minute

type ATCCommand struct {
RunCommand RunCommand `command:"run"`
Migration Migration `command:"migrate"`
@@ -643,7 +646,7 @@ func (cmd *RunCommand) constructAPIMembers(
)

pool := worker.NewPool(workerProvider)
workerClient := worker.NewClient(pool, workerProvider, compressionLib)
workerClient := worker.NewClient(pool, workerProvider, compressionLib, workerAvailabilityPollingInterval, workerStatusPublishInterval)

credsManagers := cmd.CredentialManagers
dbPipelineFactory := db.NewPipelineFactory(dbConn, lockFactory)
@@ -871,7 +874,11 @@ func (cmd *RunCommand) constructBackendMembers(
)

pool := worker.NewPool(workerProvider)
workerClient := worker.NewClient(pool, workerProvider, compressionLib)
workerClient := worker.NewClient(pool,
workerProvider,
compressionLib,
workerAvailabilityPollingInterval,
workerStatusPublishInterval)

defaultLimits, err := cmd.parseDefaultLimits()
if err != nil {
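
For context, the two new package-level intervals mean the worker client re-checks worker availability every 5 seconds and reports that a task is still waiting roughly once a minute. The sketch below only illustrates that pattern with hypothetical names; it is not the actual worker-client code, which lives in the `worker` package and feeds the "tasks waiting" metric.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Illustrative stand-ins; the real intervals are defined in atc/atccmd/command.go.
var (
	availabilityPollingInterval = 5 * time.Second
	statusPublishInterval       = 1 * time.Minute
)

type workerStub struct{ name string }

// waitForWorker polls for an available worker and periodically reports that
// the task is still waiting (which is what ends up in the waiting-tasks gauge).
func waitForWorker(ctx context.Context, find func() (*workerStub, bool), reportWaiting func()) (*workerStub, error) {
	poll := time.NewTicker(availabilityPollingInterval)
	status := time.NewTicker(statusPublishInterval)
	defer poll.Stop()
	defer status.Stop()

	for {
		if w, found := find(); found {
			return w, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-status.C:
			reportWaiting() // in Concourse this would bump the "tasks waiting" metric
		case <-poll.C:
			// loop around and look for a worker again
		}
	}
}

func main() {
	w, err := waitForWorker(context.Background(),
		func() (*workerStub, bool) { return &workerStub{name: "worker-1"}, true },
		func() { fmt.Println("still waiting for a worker...") },
	)
	fmt.Println(w.name, err)
}
```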
3 changes: 1 addition & 2 deletions atc/db/check_factory.go
@@ -3,7 +3,6 @@ package db
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"time"

@@ -143,7 +142,7 @@ func (c *checkFactory) TryCreateCheck(logger lager.Logger, checkable Checkable,
parentType, found := resourceTypes.Parent(checkable)
if found {
if parentType.Version() == nil {
return nil, false, errors.New("parent type has no version")
return nil, false, fmt.Errorf("resource type '%s' has no version", parentType.Name())
}
}

46 changes: 42 additions & 4 deletions atc/db/log_conn.go
@@ -1,13 +1,16 @@
package db

import (
"context"
"database/sql"
"strings"

"code.cloudfoundry.org/lager"
"github.com/Masterminds/squirrel"
)

// Log returns a wrapper around the given DB connection that also wraps the
// transactions it creates, so that all queries can be logged by the given logger.
func Log(logger lager.Logger, conn Conn) Conn {
return &logConn{
Conn: conn,
@@ -21,21 +24,56 @@ type logConn struct {
logger lager.Logger
}

func (c *logConn) Begin() (Tx, error) {
tx, err := c.Conn.Begin()
if err != nil {
return nil, err
}

return &logDbTx{Tx: tx, logger: c.logger}, nil
}

func (c *logConn) Query(query string, args ...interface{}) (*sql.Rows, error) {
c.logger.Debug("query", lager.Data{"query": c.strip(query)})
c.logger.Debug("query", lager.Data{"query": strip(query)})
return c.Conn.Query(query, args...)
}

func (c *logConn) QueryRow(query string, args ...interface{}) squirrel.RowScanner {
c.logger.Debug("query-row", lager.Data{"query": c.strip(query)})
c.logger.Debug("query-row", lager.Data{"query": strip(query)})
return c.Conn.QueryRow(query, args...)
}

func (c *logConn) Exec(query string, args ...interface{}) (sql.Result, error) {
c.logger.Debug("exec", lager.Data{"query": c.strip(query)})
c.logger.Debug("exec", lager.Data{"query": strip(query)})
return c.Conn.Exec(query, args...)
}

func (c *logConn) strip(query string) string {
func strip(query string) string {
return strings.Join(strings.Fields(query), " ")
}

type logDbTx struct {
Tx

logger lager.Logger
}

func (t *logDbTx) Query(query string, args ...interface{}) (*sql.Rows, error) {
t.logger.Debug("tx-query", lager.Data{"query": strip(query)})
return t.Tx.Query(query, args...)
}

func (t *logDbTx) QueryRow(query string, args ...interface{}) squirrel.RowScanner {
t.logger.Debug("tx-query-row", lager.Data{"query": strip(query)})
return t.Tx.QueryRow(query, args...)
}

func (t *logDbTx) Exec(query string, args ...interface{}) (sql.Result, error) {
t.logger.Debug("tx-exec", lager.Data{"query": strip(query)})
return t.Tx.Exec(query, args...)
}

func (t *logDbTx) QueryRowContext(ctx context.Context, query string, args ...interface{}) squirrel.RowScanner {
t.logger.Debug("tx-query-row-context", lager.Data{"query": strip(query)})
return t.Tx.QueryRowContext(ctx, query, args...)
}
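
With the transaction wrapper in place, queries issued inside a `Begin`/`Commit` block are now logged as well, not just those run directly on the connection. A usage sketch (assuming a `lager.Logger` and an existing `db.Conn`; the query and function name are illustrative only):

```go
// Sketch only: wrap an existing connection so every query, including those
// issued inside a transaction, is logged at debug level with the SQL
// collapsed to a single line.
func countBuilds(logger lager.Logger, conn db.Conn) (int, error) {
	conn = db.Log(logger.Session("db"), conn)

	tx, err := conn.Begin() // now returns the logging transaction wrapper
	if err != nil {
		return 0, err
	}
	defer tx.Rollback()

	var count int
	// Emitted as a "tx-query-row" debug entry before the query runs.
	if err := tx.QueryRow("SELECT COUNT(*) FROM builds").Scan(&count); err != nil {
		return 0, err
	}

	return count, tx.Commit()
}
```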
4 changes: 3 additions & 1 deletion atc/metric/emitter/newrelic.go
@@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
@@ -39,6 +40,7 @@ type (
NewRelicConfig struct {
AccountID string `long:"newrelic-account-id" description:"New Relic Account ID"`
APIKey string `long:"newrelic-api-key" description:"New Relic Insights API Key"`
Url string `long:"newrelic-insights-api-url" default:"https://insights-collector.newrelic.com" description:"Base Url for insights Insert API"`
ServicePrefix string `long:"newrelic-service-prefix" default:"" description:"An optional prefix for emitted New Relic events"`
BatchSize uint64 `long:"newrelic-batch-size" default:"2000" description:"Number of events to batch together before emitting"`
BatchDuration time.Duration `long:"newrelic-batch-duration" default:"60s" description:"Length of time to wait between emitting until all currently batched events are emitted"`
@@ -65,7 +67,7 @@ func (config *NewRelicConfig) NewEmitter() (metric.Emitter, error) {

return &NewRelicEmitter{
Client: client,
Url: "https://insights-collector.newrelic.com/v1/accounts/" + config.AccountID + "/events",
Url: fmt.Sprintf("%s/v1/accounts/%s/events", config.Url, config.AccountID),
apikey: config.APIKey,
prefix: config.ServicePrefix,
containers: new(stats),
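
Making the base URL configurable (via `--newrelic-insights-api-url`) is what allows pointing the emitter at region-specific Insights endpoints; the account-scoped path is appended when the emitter is constructed. A small standalone illustration of the composition (the EU-style base URL below is only an example value):

```go
package main

import "fmt"

func main() {
	// Defaults to https://insights-collector.newrelic.com when the flag is not set.
	baseURL := "https://insights-collector.eu01.nr-data.net" // example value only
	accountID := "123456"

	url := fmt.Sprintf("%s/v1/accounts/%s/events", baseURL, accountID)
	fmt.Println(url) // https://insights-collector.eu01.nr-data.net/v1/accounts/123456/events
}
```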
20 changes: 20 additions & 0 deletions atc/metric/emitter/newrelic_test.go
@@ -3,6 +3,7 @@ package emitter_test
import (
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"
@@ -142,6 +143,25 @@ var _ = Describe("NewRelicEmitter", func() {
Entry("is disabled", true, ""),
)
})

Context("NewRelicConfiguration", func() {
It("sends events to configured endpoint", func() {
config := &emitter.NewRelicConfig{
AccountID: "123456",
APIKey: "eu019347923874648573934074",
Url: server.URL(),
}

server.RouteToHandler(http.MethodPost, "/v1/accounts/123456/events", verifyEvents(1))

e, _ := config.NewEmitter()
e.Emit(testLogger, testEvent)

newRelicEmitter := e.(*emitter.NewRelicEmitter)
Expect(newRelicEmitter.Url).To(Equal(fmt.Sprintf("%s/v1/accounts/123456/events", server.URL())))
Eventually(server.ReceivedRequests).Should(HaveLen(1))
})
})
})

func verifyEvents(expectedEvents int) http.HandlerFunc {
14 changes: 14 additions & 0 deletions atc/metric/emitter/prometheus.go
@@ -27,6 +27,8 @@ type PrometheusEmitter struct {
concurrentRequestsLimitHit *prometheus.CounterVec
concurrentRequests *prometheus.GaugeVec

tasksWaiting prometheus.Gauge

buildDurationsVec *prometheus.HistogramVec
buildsAborted prometheus.Counter
buildsErrored prometheus.Counter
@@ -169,6 +171,14 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) {
}, []string{"action"})
prometheus.MustRegister(concurrentRequests)

tasksWaiting := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "concourse",
Subsystem: "tasks",
Name: "waiting",
Help: "Number of Concourse tasks currently waiting.",
})
prometheus.MustRegister(tasksWaiting)

buildsFinished := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "concourse",
Subsystem: "builds",
@@ -388,6 +398,8 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) {
concurrentRequestsLimitHit: concurrentRequestsLimitHit,
concurrentRequests: concurrentRequests,

tasksWaiting: tasksWaiting,

buildDurationsVec: buildDurationsVec,
buildsAborted: buildsAborted,
buildsErrored: buildsErrored,
@@ -452,6 +464,8 @@ func (emitter *PrometheusEmitter) Emit(logger lager.Logger, event metric.Event)
emitter.concurrentRequestsLimitHit.WithLabelValues(event.Attributes["action"]).Add(event.Value)
case "concurrent requests":
emitter.concurrentRequests.WithLabelValues(event.Attributes["action"]).Set(event.Value)
case "tasks waiting":
emitter.tasksWaiting.Set(event.Value)
case "build finished":
emitter.buildFinishedMetrics(logger, event)
case "worker containers":
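
Given the gauge options above (namespace `concourse`, subsystem `tasks`, name `waiting`), the new metric is scraped as `concourse_tasks_waiting`. A standalone sketch, using only `client_golang`, of how an identically configured gauge shows up on a metrics endpoint (not Concourse code, just an illustration of the naming):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same options as the emitter registers for the "tasks waiting" event.
	tasksWaiting := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "concourse",
		Subsystem: "tasks",
		Name:      "waiting",
		Help:      "Number of Concourse tasks currently waiting.",
	})
	reg.MustRegister(tasksWaiting)
	tasksWaiting.Set(3)

	srv := httptest.NewServer(promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	defer srv.Close()

	resp, _ := http.Get(srv.URL)
	body, _ := ioutil.ReadAll(resp.Body)
	resp.Body.Close()

	// The exposition contains a line like: concourse_tasks_waiting 3
	fmt.Println(strings.Contains(string(body), "concourse_tasks_waiting 3")) // true
}
```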