Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
MarvinJWendt committed Nov 27, 2023
1 parent 3405ef6 commit 7cae1ab
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pool

import (
"context"
"errors"
"sync"
"time"
)
Expand Down Expand Up @@ -64,46 +65,53 @@ func (p *Pool[T]) SetErrorHandler(handler ErrorHandler[T]) *Pool[T] {
func (p *Pool[T]) Start() {
for i := 0; i < p.config.MaxWorkers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case item, ok := <-p.queue:
if !ok {
return // Stop the goroutine if the queue is closed
}

// Create a timeout context for each task
workerCtx, cancel := context.WithTimeout(p.ctx, p.config.Timeout)
defer cancel()

// Run the handler in a separate goroutine to enable timeout control
done := make(chan struct{})
var err error
go func() {
err = p.handler(workerCtx, item)
close(done)
}()

// Wait for the task to complete or for the timeout
select {
case <-workerCtx.Done():
// Timeout occurred, handle the timeout error
if p.errorHandler != nil {
p.errorHandler(workerCtx.Err(), p)
}
case <-done:
// Task completed, handle any error returned by the handler
if err != nil && p.errorHandler != nil {
p.errorHandler(err, p)
}
}

case <-p.ctx.Done():
return // Stop the goroutine if the context is cancelled
}
go p.worker()
}
}

func (p *Pool[T]) worker() {
defer p.wg.Done()
for {

Check failure on line 74 in pool.go

View workflow job for this annotation

GitHub Actions / lint

for statement without condition should never be cuddled (wsl)
select {
case item, ok := <-p.queue:
if !ok {
return
}
}()
p.processTask(item)

Check failure on line 80 in pool.go

View workflow job for this annotation

GitHub Actions / lint

expressions should not be cuddled with blocks (wsl)
case <-p.ctx.Done():
return
}
}
}

func (p *Pool[T]) processTask(item T) {
var workerCtx context.Context
var cancel context.CancelFunc

Check failure on line 89 in pool.go

View workflow job for this annotation

GitHub Actions / lint

declarations should never be cuddled (wsl)
if p.config.Timeout > 0 {

Check failure on line 90 in pool.go

View workflow job for this annotation

GitHub Actions / lint

only one cuddle assignment allowed before if statement (wsl)
workerCtx, cancel = context.WithTimeout(p.ctx, p.config.Timeout)
} else {
workerCtx, cancel = context.WithCancel(p.ctx)
}
defer cancel()

Check failure on line 95 in pool.go

View workflow job for this annotation

GitHub Actions / lint

only one cuddle assignment allowed before defer statement (wsl)

done := make(chan error)
go func() {
done <- p.handler(workerCtx, item)
}()

select {
case <-workerCtx.Done():
if !errors.Is(workerCtx.Err(), context.Canceled) {
p.handleError(workerCtx.Err())
}
case err := <-done:
p.handleError(err)
}
}

func (p *Pool[T]) handleError(err error) {
if err != nil && p.errorHandler != nil {
p.errorHandler(err, p)
}
}

Expand Down

0 comments on commit 7cae1ab

Please sign in to comment.