Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: "1.26"
go-version-file: 'go.mod'

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v9

- name: Run tests
run: go test ./...

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v7
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: "1.26"
go-version-file: 'go.mod'

- name: Login to GHCR
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
Expand Down
26 changes: 26 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/jackc/pgx/v5 v5.9.2
github.com/jferrl/go-githubauth v1.6.0
github.com/joho/godotenv v1.5.1
github.com/ory/dockertest/v3 v3.12.0
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
Expand All @@ -24,16 +25,29 @@ require (
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/monitoring v1.24.2 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.56.0 // indirect
github.com/MicahParks/keyfunc/v2 v2.1.0 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/andybalholm/brotli v1.2.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/continuity v0.4.5 // indirect
github.com/docker/cli v27.4.1+incompatible // indirect
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
github.com/gofiber/schema v1.7.1 // indirect
github.com/gofiber/utils/v2 v2.0.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-querystring v1.2.0 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand All @@ -42,10 +56,21 @@ require (
github.com/klauspost/compress v1.18.6 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.22 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/user v0.3.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.2.3 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/tinylib/msgp v1.6.4 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.71.0 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect
go.opentelemetry.io/otel/sdk v1.43.0 // indirect
Expand All @@ -62,4 +87,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/grpc v1.79.3 // indirect
google.golang.org/protobuf v1.36.10 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
133 changes: 105 additions & 28 deletions go.sum

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,12 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start background jobs with cancellable context
go ghTokenRefresher.RefreshTokens(ctx)
go userSync.StartSyncLoop()
go orgSync.StartSyncLoop()
// Start background jobs with cancellable context. Each runs under
// observability.SuperviseLoop so a panic produces a metric + log instead
// of silently killing the loop.
go observability.SuperviseLoop(ctx, "gh_token_refresher", ghTokenRefresher.RefreshTokens)
go observability.SuperviseLoop(ctx, "user_sync", userSync.StartSyncLoop)
go observability.SuperviseLoop(ctx, "org_sync", orgSync.StartSyncLoop)

// Handle shutdown signals
go func() {
Expand Down
12 changes: 12 additions & 0 deletions pkg/observability/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Metrics struct {
TokenRefreshFailure metric.Int64Counter
TokenRefreshDisabled metric.Int64Counter
TokenRefreshRateLimit metric.Int64Counter

// Background job supervisor metrics
BgJobPanicsTotal metric.Int64Counter
}

// metricsInstance is the singleton instance
Expand Down Expand Up @@ -123,12 +126,21 @@ func createMetrics(meter metric.Meter) (*Metrics, error) {
return nil, err
}

bgJobPanicsTotal, err := meter.Int64Counter(
"bg_job_panics_total",
metric.WithDescription("Number of times a supervised background job has panicked (by job name)"),
)
if err != nil {
return nil, err
}

return &Metrics{
meter: meter,
TokenRefreshTotal: tokenRefreshTotal,
TokenRefreshSuccess: tokenRefreshSuccess,
TokenRefreshFailure: tokenRefreshFailure,
TokenRefreshDisabled: tokenRefreshDisabled,
TokenRefreshRateLimit: tokenRefreshRateLimit,
BgJobPanicsTotal: bgJobPanicsTotal,
}, nil
}
42 changes: 42 additions & 0 deletions pkg/observability/supervisor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package observability

import (
"context"
"runtime/debug"
"time"

"github.com/gofiber/fiber/v3/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const superviseRestartBackoff = time.Second

// SuperviseLoop runs fn in a panic-safe loop until ctx is cancelled.
func SuperviseLoop(ctx context.Context, name string, fn func(context.Context)) {
for {
if ctx.Err() != nil {
log.Infof("supervised loop %q exiting: %v", name, ctx.Err())
return
}
runOnce(ctx, name, fn)
select {
case <-ctx.Done():
log.Infof("supervised loop %q exiting: %v", name, ctx.Err())
return
case <-time.After(superviseRestartBackoff):
}
}
}
Comment thread
fernandonogueira marked this conversation as resolved.

func runOnce(ctx context.Context, name string, fn func(context.Context)) {
defer func() {
if r := recover(); r != nil {
log.Errorf("supervised loop %q panicked: %v\n%s", name, r, debug.Stack())
if m := GetMetrics(); m != nil && m.BgJobPanicsTotal != nil {
m.BgJobPanicsTotal.Add(ctx, 1, metric.WithAttributes(attribute.String("job", name)))
}
}
}()
fn(ctx)
}
5 changes: 5 additions & 0 deletions pkg/repository/drift_analysis_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type DriftAnalysisRepository interface {
CreateDriftAnalysisRun(ctx context.Context, params queries.CreateDriftAnalysisRunParams) (queries.DriftAnalysisRun, error)
CreateDriftAnalysisProject(ctx context.Context, params queries.CreateDriftAnalysisProjectParams) (queries.DriftAnalysisProject, error)
CreateDriftAnalysisProjectsBatch(ctx context.Context, rows []queries.CreateDriftAnalysisProjectsBatchParams) (int64, error)
FindDriftAnalysisRunsByRepositoryID(ctx context.Context, repoId int64, page int) ([]queries.DriftAnalysisRun, error)
FindDriftAnalysisRunByUUID(ctx context.Context, uuid uuid.UUID) (queries.DriftAnalysisRun, error)
FindRunByRepoAndIdempotencyKey(ctx context.Context, repoId int64, idempotencyKey string) (queries.DriftAnalysisRun, error)
Expand Down Expand Up @@ -42,6 +43,10 @@ func (r *DriftAnalysisRepo) CreateDriftAnalysisProject(ctx context.Context, para
return r.db.Queries(ctx).CreateDriftAnalysisProject(ctx, params)
}

func (r *DriftAnalysisRepo) CreateDriftAnalysisProjectsBatch(ctx context.Context, rows []queries.CreateDriftAnalysisProjectsBatchParams) (int64, error) {
return r.db.Queries(ctx).CreateDriftAnalysisProjectsBatch(ctx, rows)
}

func (r *DriftAnalysisRepo) FindDriftAnalysisRunsByRepositoryID(ctx context.Context, repoId int64, page int) ([]queries.DriftAnalysisRun, error) {
params := queries.FindDriftAnalysisRunsByRepositoryIdParams{
RepositoryID: repoId,
Expand Down
49 changes: 49 additions & 0 deletions pkg/repository/queries/copyfrom.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/repository/queries/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/repository/queries/drift_analysis.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ INSERT INTO drift_analysis_project (drift_analysis_run_id, dir, type, drifted, s
VALUES (@drift_analysis_run_id, @dir, @type, @drifted, @succeeded, @init_output, @plan_output, @skipped_due_to_pr)
RETURNING *;

-- name: CreateDriftAnalysisProjectsBatch :copyfrom
INSERT INTO drift_analysis_project (drift_analysis_run_id, dir, type, drifted, succeeded, init_output, plan_output, skipped_due_to_pr)
VALUES (@drift_analysis_run_id, @dir, @type, @drifted, @succeeded, @init_output, @plan_output, @skipped_due_to_pr);

-- name: FindDriftAnalysisRunsByRepositoryId :many
SELECT *
FROM drift_analysis_run
Expand Down
11 changes: 11 additions & 0 deletions pkg/repository/queries/drift_analysis.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 4 additions & 8 deletions pkg/usecase/auth/github/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,6 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error {
}

err = o.db.WithTx(ctx, func(ctx context.Context) error {
if err != nil {
return err
}

accessTokenExpiresAt := time.Unix(epoch+int64(tokenResponse.ExpiresIn), 0)
refreshTokenExpiresAt := time.Unix(epoch+int64(tokenResponse.RefreshTokenExpiresIn), 0)

Expand All @@ -225,7 +221,7 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error {
RefreshTokenExpiresAt: &refreshTokenExpiresAt,
}

_, err = o.userRepository.UpsertUserOnLogin(ctx, upsertUserParams)
_, err := o.userRepository.UpsertUserOnLogin(ctx, upsertUserParams)
if err != nil {
return err
}
Expand All @@ -236,14 +232,14 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error {
existingUser, err := o.userRepository.FindUserByProviderAndProviderId(ctx, args)
if err != nil {
log.Error("error finding user by provider and provider id: ", err)
return c.SendStatus(fiber.StatusInternalServerError)
return err
}

_, err = o.syncStatusUserRepository.CreateOrUpdateSyncStatusUser(ctx, existingUser.ID)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Error("error creating sync status user: ", err)
return c.SendStatus(fiber.StatusInternalServerError)
return err
}
}

Expand All @@ -255,7 +251,7 @@ func (o *OAuthHandler) Callback(c fiber.Ctx) error {
jwtToken, err := jwt.GenerateJWTToken(userToken, o.cfg.Auth.JwtSecret)
if err != nil {
log.Error("error generating jwt token: ", err)
return c.SendStatus(fiber.StatusInternalServerError)
return err
}

// Use redirect URL from state if provided and allowed, otherwise use default
Expand Down
47 changes: 28 additions & 19 deletions pkg/usecase/drift_stream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error {
idemKeyPtr = &idemKey
}

// Pre-validate all project types before opening a transaction.
// (returning c.SendStatus from inside the closure would commit the partial tx).
projectTypes := make([]string, len(state.ProjectResults))
for i, project := range state.ProjectResults {
projectType, err := projectTypeToDBString(project.Project.Type)
if err != nil {
log.Errorf("Invalid project type %v at index %d: %v", project.Project.Type, i, err)
return c.SendStatus(fiber.StatusBadRequest)
}
projectTypes[i] = projectType
}

var runUUID uuid.UUID
err = d.driftAnalysisRepository.WithTx(c.Context(), func(ctx context.Context) error {
params := queries.CreateDriftAnalysisRunParams{
Expand All @@ -161,29 +173,26 @@ func (d *DriftStateHandler) HandleUpdate(c fiber.Ctx) error {
}
runUUID = run.Uuid

for _, project := range state.ProjectResults {
projectType, err := projectTypeToDBString(project.Project.Type)
if err != nil {
log.Errorf("Error converting project type to db string: %v", err)
return c.SendStatus(fiber.StatusBadRequest)
}

projectParams := queries.CreateDriftAnalysisProjectParams{
DriftAnalysisRunID: run.Uuid,
Dir: project.Project.Dir,
Type: projectType,
Drifted: project.Drifted,
Succeeded: project.Succeeded,
InitOutput: &project.InitOutput,
PlanOutput: &project.PlanOutput,
SkippedDueToPr: project.SkippedDueToPR,
if len(state.ProjectResults) > 0 {
batch := make([]queries.CreateDriftAnalysisProjectsBatchParams, len(state.ProjectResults))
for i, project := range state.ProjectResults {
batch[i] = queries.CreateDriftAnalysisProjectsBatchParams{
DriftAnalysisRunID: run.Uuid,
Dir: project.Project.Dir,
Type: projectTypes[i],
Drifted: project.Drifted,
Succeeded: project.Succeeded,
InitOutput: &project.InitOutput,
PlanOutput: &project.PlanOutput,
SkippedDueToPr: project.SkippedDueToPR,
}
}
Comment thread
fernandonogueira marked this conversation as resolved.
res, err := d.driftAnalysisRepository.CreateDriftAnalysisProject(ctx, projectParams)
inserted, err := d.driftAnalysisRepository.CreateDriftAnalysisProjectsBatch(ctx, batch)
if err != nil {
log.Errorf("Error creating drift analysis project: %v", err)
log.Errorf("Error batch-inserting drift analysis projects: %v", err)
return err
}
log.Debugf("Created drift analysis project: [ID: %d, dir: %s]", res.ID, project.Project.Dir)
log.Debugf("Batch-inserted %d drift analysis projects for run %s", inserted, run.Uuid)
}

log.Info("Created drift analysis run: ", run.Uuid)
Expand Down
Loading