Skip to content

Commit

Permalink
Add task timeout handling and update README
Browse files Browse the repository at this point in the history
- Added a timeout field to the goPool struct to store the global timeout duration.
- Created a WithTimeout function to set this timeout duration.
- Added timeout handling logic in the worker.start method. If a task is not completed within the specified timeout period, it is considered failed and a timeout error is returned.
- Updated the README file to include the new task timeout handling feature and removed outdated "For example" descriptions.

Signed-off-by: Daniel Hu <tao.hu@merico.dev>
  • Loading branch information
daniel-hutao committed Jul 25, 2023
1 parent 8ea252a commit ade1387
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 7 deletions.
10 changes: 5 additions & 5 deletions README.md
Expand Up @@ -12,17 +12,17 @@ GoPool is a high-performance, feature-rich, and easy-to-use worker pool library

- **Graceful Shutdown**: GoPool can shut down gracefully. It stops accepting new tasks and waits for all ongoing tasks to complete before shutting down when there are no more tasks or a shutdown signal is received.

- **Error Handling**: GoPool can handle errors that occur during task execution. For example, it can provide an error callback function.
- **Error Handling**: GoPool can handle errors that occur during task execution.

- **Task Timeout Handling**: GoPool can handle task execution timeouts. For example, it can set a timeout period, and if a task is not completed within this period, the task is considered failed.
- **Task Timeout Handling**: GoPool can handle task execution timeouts. If a task is not completed within the specified timeout period, the task is considered failed and a timeout error is returned.

- **Task Priority**: GoPool supports task priority. Tasks with higher priority are processed first.

- **Task Result Retrieval**: GoPool provides a way to retrieve task results. For example, it can provide a result callback function.
- **Task Result Retrieval**: GoPool provides a way to retrieve task results.

- **Task Retry**: GoPool provides a retry mechanism for failed tasks. For example, it can set the number of retries and the retry interval.
- **Task Retry**: GoPool provides a retry mechanism for failed tasks.

- **Task Progress Tracking**: GoPool provides task progress tracking. For example, it can provide a progress callback function or a method to query the current task progress.
- **Task Progress Tracking**: GoPool provides task progress tracking.

- **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload.

Expand Down
3 changes: 3 additions & 0 deletions gopool.go
Expand Up @@ -2,6 +2,7 @@ package gopool

import (
"sync"
"time"
)

// Task represents a function that will be executed by a worker.
Expand All @@ -16,6 +17,7 @@ type goPool struct {
taskQueue chan task
lock sync.Locker
cond *sync.Cond
timeout time.Duration
}

// NewGoPool creates a new pool of workers.
Expand All @@ -27,6 +29,7 @@ func NewGoPool(maxWorkers int, opts ...Option) *goPool {
workerStack: make([]int, maxWorkers),
taskQueue: make(chan task, 1e6),
lock: new(sync.Mutex),
timeout: 0,
}
for _, opt := range opts {
opt(pool)
Expand Down
12 changes: 11 additions & 1 deletion option.go
@@ -1,6 +1,9 @@
package gopool

import "sync"
import (
"sync"
"time"
)

// Option represents an option for the pool.
type Option func(*goPool)
Expand All @@ -19,3 +22,10 @@ func WithMinWorkers(minWorkers int) Option {
p.minWorkers = minWorkers
}
}

// WithTimeout sets the timeout for the pool.
func WithTimeout(timeout time.Duration) Option {
return func(p *goPool) {
p.timeout = timeout
}
}
32 changes: 31 additions & 1 deletion worker.go
@@ -1,5 +1,10 @@
package gopool

import (
"context"
"fmt"
)

// worker represents a worker in the pool.
type worker struct {
taskQueue chan task
Expand All @@ -15,7 +20,32 @@ func (w *worker) start(pool *goPool, workerIndex int) {
go func() {
for t := range w.taskQueue {
if t != nil {
t()
if pool.timeout > 0 {
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), pool.timeout)
defer cancel()

// Create a channel to receive the result of the task
done := make(chan struct{})

// Run the task in a separate goroutine
go func() {
t()
close(done)
}()

// Wait for the task to finish or for the context to timeout
select {
case <-done:
// The task finished successfully
case <-ctx.Done():
// The context timed out, the task took too long
fmt.Println("Task timed out")
}
} else {
// If timeout is not set or is zero, just run the task
t()
}
}
pool.pushWorker(workerIndex)
}
Expand Down

0 comments on commit ade1387

Please sign in to comment.