Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: ["1.24.13", "1.25.7"]
go-version: ["1.25.8", "1.26.1"]

services:
redis:
Expand Down Expand Up @@ -80,7 +80,7 @@ jobs:
WQ_CONCURRENCY: 32

- name: Check coverage for new code
if: github.event_name == 'pull_request' && matrix.go-version == '1.25.7'
if: github.event_name == 'pull_request' && matrix.go-version == '1.26.1'
run: |
# Setup Python for diff-cover
python -m pip install --upgrade pip
Expand Down Expand Up @@ -147,7 +147,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.25.7"
go-version: "1.26.1"
cache: true

- name: Download dependencies
Expand Down Expand Up @@ -181,7 +181,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.25.7"
go-version: "1.26.1"
cache: true

- name: Install govulncheck
Expand All @@ -208,7 +208,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.25.7"
go-version: "1.26.1"
cache: true

- name: Build application
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/retest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.25.7"
go-version: "1.26.1"
cache: true

- name: Generate mocks
Expand Down
14 changes: 14 additions & 0 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/IsaacDSC/gqueue/internal/fetcher"
"github.com/IsaacDSC/gqueue/internal/interstore"
"github.com/IsaacDSC/gqueue/internal/storests"
"github.com/IsaacDSC/gqueue/pkg/telemetry"
"github.com/redis/go-redis/v9"
)

Expand Down Expand Up @@ -58,6 +59,13 @@ func main() {
scope := flag.String("scope", "all", "service to run")
flag.Parse()

_, err := telemetry.New(telemetry.Config{
Enabled: conf.MetricsEnabled,
})
if err != nil {
panic(err)
}

redisClient := redis.NewClient(&redis.Options{Addr: conf.Cache.CacheAddr})
if err := redisClient.Ping(ctx).Err(); err != nil {
panic(err)
Expand Down Expand Up @@ -116,6 +124,12 @@ func main() {
for _, closeFn := range closers {
closeFn()
}

shutdownCtx, shutdownCancel := context.WithTimeout(ctx, 5*time.Second)
defer shutdownCancel()
if err := telemetry.Shutdown(shutdownCtx); err != nil {
log.Printf("Error shutting down telemetry: %v", err)
}
}

func scopeOrAll(scope, expected string) bool {
Expand Down
8 changes: 7 additions & 1 deletion cmd/setup/backoffice/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/IsaacDSC/gqueue/internal/domain"
"github.com/IsaacDSC/gqueue/internal/interstore"
"github.com/IsaacDSC/gqueue/pkg/httpadapter"
"github.com/IsaacDSC/gqueue/pkg/telemetry"
"github.com/redis/go-redis/v9"
)

Expand All @@ -27,6 +28,9 @@ func Start(
) *http.Server {
mux := http.NewServeMux()

// Rota de métricas para Prometheus.
mux.Handle("/metrics", telemetry.Handler())

routes := []httpadapter.HttpHandle{
health.GetHealthCheckHandler(),
backofficeapp.PatchConsumer(store),
Expand All @@ -47,7 +51,9 @@ func Start(
// config.ProjectID: config.SecretKey,
// })

handler := middleware.CORSMiddleware(middleware.LoggerMiddleware(mux))
handler := middleware.CORSMiddleware(
middleware.MetricsMiddleware(cfg.BACKOFFICE_APP_NAME, middleware.LoggerMiddleware(mux)),
)
// h := authorization.Middleware(handler.ServeHTTP)

env := cfg.Get()
Expand Down
10 changes: 8 additions & 2 deletions cmd/setup/httpsvc/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import (
"github.com/IsaacDSC/gqueue/internal/app/health"
"github.com/IsaacDSC/gqueue/internal/cfg"
"github.com/IsaacDSC/gqueue/pkg/httpadapter"
"github.com/IsaacDSC/gqueue/pkg/telemetry"
)

func StartHttpServer(ctx context.Context, env cfg.Config, routes []httpadapter.HttpHandle, port string) *http.Server {
func StartHttpServer(ctx context.Context, env cfg.Config, routes []httpadapter.HttpHandle, port string, serviceName string) *http.Server {

mux := http.NewServeMux()

// Rota de métricas para Prometheus.
mux.Handle("/metrics", telemetry.Handler())

routes = append(routes, health.GetHealthCheckHandler())

for _, route := range routes {
Expand All @@ -28,7 +32,9 @@ func StartHttpServer(ctx context.Context, env cfg.Config, routes []httpadapter.H
// config.ProjectID: config.SecretKey,
// })

handler := middleware.CORSMiddleware(middleware.LoggerMiddleware(mux))
handler := middleware.CORSMiddleware(
middleware.MetricsMiddleware(serviceName, middleware.LoggerMiddleware(mux)),
)
// h := authorization.Middleware(handler.ServeHTTP)

server := &http.Server{
Expand Down
5 changes: 5 additions & 0 deletions cmd/setup/memstore/task_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/IsaacDSC/gqueue/internal/interstore"
"github.com/IsaacDSC/gqueue/pkg/ctxlogger"
"github.com/IsaacDSC/gqueue/pkg/telemetry"
)

func SyncMemStore(ctx context.Context, memStore *interstore.MemStore) {
Expand All @@ -14,11 +15,15 @@ func SyncMemStore(ctx context.Context, memStore *interstore.MemStore) {
for {
select {
case <-trigger.C:
start := time.Now()
if err := memStore.LoadInMemStore(ctx); err != nil {
l.Error("Error refreshing mem store with events from persistent store", "error", err)
continue
}

duration := time.Since(start).Seconds()
telemetry.MemActivityDuration.Record(ctx, duration)

l.Debug("Executed periodic refresh of mem store with events from persistent store", "scope", "pubsub")
case <-ctx.Done():
trigger.Stop()
Expand Down
77 changes: 76 additions & 1 deletion cmd/setup/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (

"github.com/IsaacDSC/gqueue/pkg/ctxlogger"
"github.com/IsaacDSC/gqueue/pkg/logs"
"github.com/IsaacDSC/gqueue/pkg/telemetry"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"go.opentelemetry.io/otel/attribute"
)

type CORSConfig struct {
Expand All @@ -29,7 +31,7 @@ func DefaultCORSConfig() CORSConfig {
AllowedHeaders: []string{"Content-Type", "Authorization", "X-Requested-With"},
ExposedHeaders: []string{},
AllowCredentials: false,
MaxAge: 86400, // 24 horas
MaxAge: 86400, // 24 hours
}
}

Expand Down Expand Up @@ -128,6 +130,29 @@ func AsynqLogger(h asynq.Handler) asynq.Handler {
})
}

func AsynqMetrics(next asynq.Handler) asynq.Handler {
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
meter := telemetry.Meter("task-consumer")
ctx = telemetry.WithMeter(ctx, meter)

attrs := []attribute.KeyValue{
attribute.String("task.event_name", t.Type()),
}

telemetry.TaskConsumerTotalProcessing.Increment(ctx, attrs...)
defer telemetry.TaskConsumerTotalProcessing.Decrement(ctx, attrs...)

if err := next.ProcessTask(ctx, t); err != nil {
telemetry.TaskConsumerTotalFailure.Count(ctx, 1, attrs...)
return err
}

telemetry.TaskConsumerTotalSuccess.Count(ctx, 1, attrs...)

return nil
})
}

func LoggerMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
Expand All @@ -145,3 +170,53 @@ func LoggerMiddleware(next http.Handler) http.Handler {
next.ServeHTTP(w, r)
})
}

type statusRecorder struct {
http.ResponseWriter
statusCode int
}

func (r *statusRecorder) WriteHeader(code int) {
r.statusCode = code
r.ResponseWriter.WriteHeader(code)
}

// MetricsMiddleware records HTTP metrics per service using OpenTelemetry.
func MetricsMiddleware(serviceName string, next http.Handler) http.Handler {
// Create instruments once per middleware chain.
meter := telemetry.Meter(serviceName)

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
if path == "/metrics" || path == "/health" {
next.ServeHTTP(w, r)
return
}

start := time.Now()

rec := &statusRecorder{
ResponseWriter: w,
statusCode: http.StatusOK,
}

// Ensure the Meter is available in the request context.
ctxWithMeter := telemetry.WithMeter(r.Context(), meter)
r = r.WithContext(ctxWithMeter)

next.ServeHTTP(rec, r)

duration := time.Since(start).Seconds()

attrs := []attribute.KeyValue{
attribute.String("http.method", r.Method),
attribute.String("http.route", path),
attribute.Int("http.response_code", rec.statusCode),
attribute.String("service.name", serviceName),
}

ctx := r.Context()
telemetry.HTTPServerRequests.Increment(ctx, attrs...)
telemetry.HTTPServerRequestDuration.Record(ctx, duration, attrs...)
})
}
2 changes: 1 addition & 1 deletion cmd/setup/pubsub/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ func (s *Service) startHttpServer(ctx context.Context, env cfg.Config) *http.Ser
pubsubapp.PublisherEvent(s.memStore, s.gcppublisher, s.insightsStore),
}

return httpsvc.StartHttpServer(ctx, env, routes, env.PubsubApiPort.String())
return httpsvc.StartHttpServer(ctx, env, routes, env.PubsubApiPort.String(), cfg.PUBSUB_APP_NAME)
}
1 change: 1 addition & 0 deletions cmd/setup/task/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func (s *Service) consumer(ctx context.Context, env cfg.Config, asynqCfg asynq.C

mux := asynq.NewServeMux()
mux.Use(middleware.AsynqLogger)
mux.Use(middleware.AsynqMetrics)

events := []asynqsvc.AsynqHandle{
taskapp.GetRequestHandle(s.fetch, s.insightsStore).ToAsynqHandler(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/setup/task/http_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ func (s *Service) startHttpServer(ctx context.Context, env cfg.Config) *http.Ser
taskapp.PublisherEvent(s.memStore, s.asynqPublisher, s.insightsStore),
}

return httpsvc.StartHttpServer(ctx, env, routes, env.TaskApiPort.String())
return httpsvc.StartHttpServer(ctx, env, routes, env.TaskApiPort.String(), cfg.TASK_APP_NAME)
}
Loading