Skip to content

Commit

Permalink
STEP-4 solution code used for outbox service.
Browse files Browse the repository at this point in the history
  • Loading branch information
ekhvalov committed Nov 21, 2023
1 parent 0526b38 commit f93d9e6
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 105 deletions.
60 changes: 29 additions & 31 deletions internal/repositories/jobs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

store "github.com/ekhvalov/bank-chat-service/internal/store/gen"
"github.com/ekhvalov/bank-chat-service/internal/store/gen/job"
"github.com/ekhvalov/bank-chat-service/internal/types"
)
Expand All @@ -18,41 +17,40 @@ type Job struct {
}

func (r *Repo) FindAndReserveJob(ctx context.Context, until time.Time) (Job, error) {
var j *store.Job
query := `
with cte as (
select "id" from "jobs"
where "available_at" <= now()
and "reserved_until" <= now()
limit 1 for update skip locked
)
update "jobs" as "j"
set "attempts" = "attempts" + 1, "reserved_until" = $1
from cte
where "cte"."id" = "j"."id" returning
"j".id,
"j".name,
"j".payload,
"j".attempts;`

findAndReserve := func(ctx context.Context) error {
now := time.Now()
var err error
j, err = r.db.Job(ctx).Query().
Where(
job.AvailableAtLTE(now),
job.ReservedUntilLTE(now),
).
ForUpdate().
First(ctx)
if err != nil {
if store.IsNotFound(err) {
return ErrNoJobs
}
return fmt.Errorf("find job: %v", err)
}
rows, err := r.db.Job(ctx).QueryContext(ctx, query, until)
if err != nil {
return Job{}, fmt.Errorf("query context: %w", err)
}
defer rows.Close()

j, err = j.Update().AddAttempts(1).SetReservedUntil(until).Save(ctx)
if err != nil {
return fmt.Errorf("update job: %v", err)
if !rows.Next() {
if err := rows.Err(); err != nil {
return Job{}, fmt.Errorf("rows err: %v", err)
}
return nil
}
if err := r.db.RunInTx(ctx, findAndReserve); err != nil {
return Job{}, err
return Job{}, ErrNoJobs
}

return Job{
ID: j.ID,
Name: j.Name,
Payload: j.Payload,
Attempts: j.Attempts,
}, nil
var j Job
if err := rows.Scan(&j.ID, &j.Name, &j.Payload, &j.Attempts); err != nil {
return Job{}, fmt.Errorf("scan job: %v", err)
}
return j, nil
}

func (r *Repo) CreateJob(ctx context.Context, name, payload string, availableAt time.Time) (types.JobID, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/services/outbox/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func (s *Service) Put(ctx context.Context, name, payload string, availableAt time.Time) (types.JobID, error) {
jobID, err := s.repo.CreateJob(ctx, name, payload, availableAt)
jobID, err := s.jobsRepo.CreateJob(ctx, name, payload, availableAt)
if err != nil {
return types.JobIDNil, fmt.Errorf("create job: %v", err)
}
Expand Down
149 changes: 89 additions & 60 deletions internal/services/outbox/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ekhvalov/bank-chat-service/internal/types"
)

const serviceName = "outbox"

type jobsRepository interface {
CreateJob(ctx context.Context, name, payload string, availableAt time.Time) (types.JobID, error)
FindAndReserveJob(ctx context.Context, until time.Time) (jobsrepo.Job, error)
Expand All @@ -30,38 +32,38 @@ type Options struct {
workers int `option:"mandatory" validate:"min=1,max=32"`
idleTime time.Duration `option:"mandatory" validate:"min=100ms,max=10s"`
reserveFor time.Duration `option:"mandatory" validate:"min=1s,max=10m"`
repo jobsRepository `option:"mandatory" validate:"required"`
db transactor `option:"mandatory" validate:"required"`
jobsRepo jobsRepository `option:"mandatory" validate:"required"`
txtor transactor `option:"mandatory" validate:"required"`
lg *zap.Logger `option:"mandatory" validate:"required"`
}

type Service struct {
Options
registry map[string]Job
mutex *sync.Mutex
jobs map[string]Job
mutex *sync.Mutex
}

func New(opts Options) (*Service, error) {
if err := opts.Validate(); err != nil {
return nil, fmt.Errorf("validate options: %v", err)
}
return &Service{
Options: opts,
registry: make(map[string]Job),
mutex: &sync.Mutex{},
Options: opts,
jobs: make(map[string]Job),
mutex: &sync.Mutex{},
}, nil
}

func (s *Service) RegisterJob(job Job) error {
s.mutex.Lock()
defer s.mutex.Unlock()

_, exists := s.registry[job.Name()]
_, exists := s.jobs[job.Name()]
if exists {
return fmt.Errorf("job %q is already registered", job.Name())
}

s.registry[job.Name()] = job
s.jobs[job.Name()] = job

return nil
}
Expand All @@ -74,81 +76,108 @@ func (s *Service) MustRegisterJob(job Job) {

func (s *Service) Run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)

for i := 0; i < s.workers; i++ {
eg.Go(func() error { return s.handleJobs(ctx) })
}
if err := eg.Wait(); err != nil && errors.Is(err, context.Canceled) {
return fmt.Errorf("run: %v", err)
logger := zap.L().Named(serviceName).With(zap.Int("worker", i+1))
eg.Go(func() error {
for {
// Process all available jobs in one go.
if err := s.processAvailableJobs(ctx, logger); err != nil {
if ctx.Err() != nil {
return nil //nolint:nilerr
}
logger.Warn("process jobs error", zap.Error(err))
return err
}

select {
case <-ctx.Done():
return nil
case <-time.After(s.idleTime):
}
}
})
}
return nil

return eg.Wait()
}

func (s *Service) handleJobs(ctx context.Context) error {
for nil == ctx.Err() {
job, err := s.repo.FindAndReserveJob(ctx, time.Now().Add(s.reserveFor))
if err != nil {
func (s *Service) processAvailableJobs(ctx context.Context, log *zap.Logger) error {
for {
select {
case <-ctx.Done():
return nil
default:
}

if err := s.findAndProcessJob(ctx, log); err != nil {
if errors.Is(err, jobsrepo.ErrNoJobs) {
idle(ctx, s.idleTime)
continue
log.Debug("no jobs found to process")
return nil
}
return fmt.Errorf("find and reserve: %v", err)
}
if err = s.handleJob(ctx, job); err != nil {
return fmt.Errorf("handle job %q: %v", job.ID, err)
return err
}
}
return nil
}

func (s *Service) handleJob(ctx context.Context, job jobsrepo.Job) error {
s.mutex.Lock()
defer s.mutex.Unlock()
func (s *Service) findAndProcessJob(ctx context.Context, log *zap.Logger) error {
job, err := s.jobsRepo.FindAndReserveJob(ctx, time.Now().Local().Add(s.reserveFor))
if err != nil {
return fmt.Errorf("find and reserve job: %w", err)
}

log = log.With(
zap.String("job_name", job.Name),
zap.Stringer("job_id", job.ID),
zap.Int("attempt_number", job.Attempts))

handler, ok := s.registry[job.Name]
j, ok := s.jobs[job.Name]
if !ok {
if err := s.moveJobToFailed(ctx, job, "handler is not found"); err != nil {
return fmt.Errorf("fail no handler job: %v", err)
}
return nil
log.Warn("drop to dlq: job is not registered")
return s.dlq(ctx, job.ID, job.Name, job.Payload, "unknown job")
}

ctxTimeout, cancel := context.WithTimeout(ctx, handler.ExecutionTimeout())
defer cancel()
err := handler.Handle(ctxTimeout, job.Payload)
if nil == err {
if err := s.repo.DeleteJob(ctx, job.ID); err != nil {
return fmt.Errorf("delete job: %v", err)
func() {
ctx, cancel := context.WithTimeout(ctx, j.ExecutionTimeout())
defer cancel()

err = j.Handle(ctx, job.Payload)
}()

if err != nil {
log.Warn("handle job error", zap.Error(err))

if job.Attempts >= j.MaxAttempts() {
log.Warn("drop to dlq: job max attempts exceeded")
return s.dlq(
ctx,
job.ID,
job.Name,
job.Payload,
fmt.Sprintf("max attempts exceeded: %v", err),
)
}
return nil
}

s.lg.Error("handle payload", zap.String("job_id", job.ID.String()), zap.Error(err))
if job.Attempts >= handler.MaxAttempts() {
if err := s.moveJobToFailed(ctx, job, "attempts limit exceeded"); err != nil {
return fmt.Errorf("fail attempts limit exceeded job: %v", err)
}
// Intentionally delete job with context.Background() to avoid case when job is handled,
// but ctx is already closed before deleting.
if err := s.jobsRepo.DeleteJob(context.Background(), job.ID); err != nil {
log.Warn("delete job error", zap.Error(err))
}
return nil
}

func (s *Service) moveJobToFailed(ctx context.Context, job jobsrepo.Job, reason string) error {
return s.db.RunInTx(ctx, func(ctx context.Context) error {
if err := s.repo.DeleteJob(ctx, job.ID); err != nil {
return fmt.Errorf("delete job %q: %v", job.ID, err)
}
if err := s.repo.CreateFailedJob(ctx, job.Name, job.Payload, reason); err != nil {
func (s *Service) dlq(ctx context.Context, jobID types.JobID, name, payload, reason string) error {
return s.txtor.RunInTx(ctx, func(ctx context.Context) error {
if err := s.jobsRepo.CreateFailedJob(ctx, name, payload, reason); err != nil {
return fmt.Errorf("create failed job: %v", err)
}
return nil
})
}

func idle(ctx context.Context, idleTime time.Duration) {
timer := time.NewTimer(idleTime)
defer timer.Stop()
if err := s.jobsRepo.DeleteJob(ctx, jobID); err != nil {
return fmt.Errorf("delete job: %v", err)
}

select {
case <-timer.C:
case <-ctx.Done():
}
return nil
})
}
24 changes: 12 additions & 12 deletions internal/services/outbox/service_options.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions internal/store/gen/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f93d9e6

Please sign in to comment.