Skip to content

Commit

Permalink
add interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
Soreing committed Feb 11, 2023
1 parent dad56c9 commit 867fff0
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
32 changes: 18 additions & 14 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ type Poolable interface {
PoolRelease() // Called when the resource is permanently removed from the pool
}

type Pool interface {
Acquire(context.Context) (any, func(), error)
Close()
}

type resource struct {
value Poolable // The pooled value
expiry time.Time // Time till the resource is permanently removed
Expand All @@ -21,7 +26,7 @@ type request struct {
ctx context.Context // Context of the request to allow for cancellation
}

type Pool struct {
type pool struct {
count int // Current count of resources
limit int // Upper count limit of resources
ttl time.Duration // Time To Live duration of idle resources
Expand All @@ -45,8 +50,8 @@ func NewPool(
limit int,
ttl time.Duration,
constr func() Poolable,
) *Pool {
p := &Pool{
) Pool {
p := &pool{
count: 0,
limit: limit,
ttl: ttl,
Expand Down Expand Up @@ -74,7 +79,7 @@ func NewPool(

// Core handler of the pool, serving requests for resources and storing resources
// which have been freed up. Removes old resources if they have expired
func (p *Pool) handler() {
func (p *pool) handler() {
defer p.wg.Done()
for active := true; active; {
if p.expiry {
Expand Down Expand Up @@ -104,7 +109,7 @@ func (p *Pool) handler() {
// Serves a reqest or sets it as pending. If there is an idle resource, the request
// is immediately fulfilled. If there are no idle resources, one is created if there
// is capacity, otherwise the request will need to wait, and is set to pending.
func (p *Pool) serve(r *request) {
func (p *pool) serve(r *request) {
if p.icnt == 0 {
if p.count < p.limit {
p.count++
Expand All @@ -125,7 +130,7 @@ func (p *Pool) serve(r *request) {
// Stores a resource that was freed up. If there is a pending request, the resource
// is immediately used to fulfill it. Otherwise, the respurce is set to be idle, and
// if idle for too long, gets removed. Cancelled requests are skipped
func (p *Pool) store(r *resource) {
func (p *pool) store(r *resource) {
for idx := 0; idx < len(p.pend); {
select {
case <-p.pend[idx].ctx.Done():
Expand All @@ -149,13 +154,13 @@ func (p *Pool) store(r *resource) {

// Removes the oldest resource in the idle list permanently from the pool. The timer
// is set till the expiry of the next idle item, or canceled if the list is empty
func (p *Pool) dropLast() {
func (p *pool) dropLast() {
p.idle[0].value.PoolRelease()
p.count--
p.icnt--

for i := 0; i < p.icnt; i++ {
p.idle[i] = p.idle[i+1]
p.idle[i] = p.idle[i+1]
}

if p.icnt == 0 {
Expand All @@ -167,7 +172,7 @@ func (p *Pool) dropLast() {

// Rejects pending requests, releases idle resources and waits for in use resources
// to return before quitting
func (p *Pool) cleanup() {
func (p *pool) cleanup() {
for _, e := range p.pend {
close(e.ch)
}
Expand All @@ -186,10 +191,10 @@ func (p *Pool) cleanup() {
// Makes a request to the pool to acquire a resource. If the pool is closed, returns
// with an error. If the resource is acquired, it is returned with a done function
// to be called when the resource is no longer needed
func (p *Pool) Acquire(ctx context.Context) (any, func(), error) {
func (p *pool) Acquire(ctx context.Context) (any, func(), error) {
ch := make(chan *resource)
select {
case _, _ = <-p.close:
case <-p.close:
return nil, nil, errors.New("pool's closed")
default:
p.reqs <- &request{
Expand All @@ -216,8 +221,7 @@ func (p *Pool) Acquire(ctx context.Context) (any, func(), error) {
}

// Closes the pool
func (p *Pool) Close() {
func (p *pool) Close() {
close(p.close)
p.wg.Wait()
}

11 changes: 5 additions & 6 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestServe_Empty(t *testing.T) {
pl := NewPool(5, time.Second, func() Poolable {
count++
return NewItem(count)
})
}).(*pool)

val1, done1, err1 := pl.Acquire(context.Background())
val2, done2, err2 := pl.Acquire(context.Background())
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestServe_Empty(t *testing.T) {
func TestServe_Idle(t *testing.T) {
pl := NewPool(5, time.Second, func() Poolable {
return NewItem(0)
})
}).(*pool)

pl.count += 3
pl.exch <- &resource{value: NewItem(1)}
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestServe_Idle(t *testing.T) {
func TestServe_Full(t *testing.T) {
pl := NewPool(3, time.Second, func() Poolable {
return NewItem(0)
})
}).(*pool)

pl.count += 3
done := false
Expand All @@ -109,7 +109,7 @@ func TestServe_Full(t *testing.T) {
func TestRelease(t *testing.T) {
pl := NewPool(3, time.Second, func() Poolable {
return NewItem(0)
})
}).(*pool)

val1, done1, err1 := pl.Acquire(context.Background())
val2, done2, err2 := pl.Acquire(context.Background())
Expand All @@ -135,7 +135,7 @@ func TestRelease(t *testing.T) {
func TestExpire(t *testing.T) {
pl := NewPool(3, time.Millisecond*100, func() Poolable {
return NewItem(0)
})
}).(*pool)

val1, done1, err1 := pl.Acquire(context.Background())
val2, done2, err2 := pl.Acquire(context.Background())
Expand Down Expand Up @@ -168,4 +168,3 @@ func TestExpire(t *testing.T) {
}
}
}

0 comments on commit 867fff0

Please sign in to comment.