Skip to content

Commit

Permalink
optimize: remove useless panic
Browse files Browse the repository at this point in the history
  • Loading branch information
justlorain committed Jun 26, 2023
1 parent a5d79af commit ca7f7a2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 54 deletions.
54 changes: 24 additions & 30 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ func (v *Violin) submit(wait bool, task func()) {
}
if wait {
ctx, done := context.WithCancel(context.Background())
v.taskChan <- func() {
v.taskC <- func() {
task()
done()
}
<-ctx.Done()
} else {
v.taskChan <- task
v.taskC <- task
}
_ = atomic.AddUint32(&v.taskNum, 1)
}
Expand All @@ -52,7 +52,7 @@ func (v *Violin) pause(ctx context.Context) {
wg.Done()
select {
case <-ctx.Done():
case <-v.pauseChan:
case <-v.pauseC:
}
})
}
Expand All @@ -61,33 +61,27 @@ func (v *Violin) pause(ctx context.Context) {

func (v *Violin) shutdown(wait bool) {
v.once.Do(func() {
close(v.taskChan)
close(v.pauseChan)
close(v.taskC)
close(v.pauseC)
if wait {
if !atomic.CompareAndSwapUint32(&v.status, statusPlaying, statusCleaning) {
panic(cleaningFailed)
}
_ = atomic.CompareAndSwapUint32(&v.status, statusPlaying, statusCleaning)
v.clean()
v.waitClose()
} else {
v.waitClose()
if !atomic.CompareAndSwapUint32(&v.status, statusPlaying, statusShutdown) {
panic(shutdownFailed)
}
_ = atomic.CompareAndSwapUint32(&v.status, statusPlaying, statusShutdown)
}
})
}

func (v *Violin) play() {
defer func() {
close(v.shutdownChan)
close(v.shutdownC)
if v.IsCleaning() {
_ = atomic.CompareAndSwapUint32(&v.status, statusCleaning, statusShutdown)
}
}()
if !atomic.CompareAndSwapUint32(&v.status, statusInitialized, statusPlaying) {
panic(playingFailed)
}
_ = atomic.CompareAndSwapUint32(&v.status, statusInitialized, statusPlaying)
wg := new(sync.WaitGroup)
timer := time.NewTimer(v.options.workerIdleTimeout)
defer timer.Stop()
Expand All @@ -98,26 +92,26 @@ LOOP:
go v.recruit(wg)
}
select {
case task, ok := <-v.waitingChan:
case task, ok := <-v.waitingC:
if !ok {
break LOOP
}
select {
case v.workerChan <- task:
case v.workerC <- task:
_ = atomic.AddUint32(&v.waitingTaskNum, ^uint32(0))
default:
v.waitingChan <- task
v.waitingC <- task
}
default:
select {
case task, ok := <-v.taskChan:
case task, ok := <-v.taskC:
if !ok {
break LOOP
}
select {
case v.workerChan <- task:
case v.workerC <- task:
default:
v.waitingChan <- task
v.waitingC <- task
_ = atomic.AddUint32(&v.waitingTaskNum, 1)
}
case <-timer.C:
Expand All @@ -134,12 +128,12 @@ LOOP:

func (v *Violin) clean() {
for v.WaitingTaskNum() > 0 {
task, ok := <-v.waitingChan
task, ok := <-v.waitingC
if !ok {
break
}
_ = atomic.AddUint32(&v.waitingTaskNum, ^uint32(0))
v.workerChan <- task
v.workerC <- task
}
}

Expand All @@ -155,9 +149,9 @@ LOOP:
break
}
select {
case <-v.dismissChan:
case <-v.dismissC:
break LOOP
case task, ok := <-v.workerChan:
case task, ok := <-v.workerC:
if !ok {
break LOOP
}
Expand All @@ -169,7 +163,7 @@ LOOP:

func (v *Violin) tryDismiss() {
select {
case v.dismissChan <- struct{}{}:
case v.dismissC <- struct{}{}:
default:
}
}
Expand All @@ -178,11 +172,11 @@ func (v *Violin) dismissAll() {
for v.WorkerNum() > 0 {
v.tryDismiss()
}
close(v.dismissChan)
close(v.dismissC)
}

func (v *Violin) waitClose() {
<-v.shutdownChan
close(v.waitingChan)
close(v.workerChan)
<-v.shutdownC
close(v.waitingC)
close(v.workerC)
}
38 changes: 15 additions & 23 deletions violin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync/atomic"
)

// Violin VIOLIN worker pool
type Violin struct {
options *options

Expand All @@ -32,12 +33,12 @@ type Violin struct {
waitingTaskNum uint32
status uint32

workerChan chan func()
taskChan chan func()
waitingChan chan func()
dismissChan chan struct{}
pauseChan chan struct{}
shutdownChan chan struct{}
workerC chan func()
taskC chan func()
waitingC chan func()
dismissC chan struct{}
pauseC chan struct{}
shutdownC chan struct{}
}

const (
Expand All @@ -48,28 +49,19 @@ const (
statusShutdown
)

const (
initializedFailed = "violin initialize failed"
playingFailed = "violin playing failed"
cleaningFailed = "violin cleaning failed"
shutdownFailed = "violin shutdown failed"
)

// New VIOLIN worker pool
func New(opts ...Option) *Violin {
options := newOptions(opts...)
v := &Violin{
options: options,
workerChan: make(chan func()),
taskChan: make(chan func()),
waitingChan: make(chan func(), options.waitingQueueSize),
dismissChan: make(chan struct{}),
pauseChan: make(chan struct{}),
shutdownChan: make(chan struct{}),
}
if !atomic.CompareAndSwapUint32(&v.status, 0, statusInitialized) {
panic(initializedFailed)
options: options,
workerC: make(chan func()),
taskC: make(chan func()),
waitingC: make(chan func(), options.waitingQueueSize),
dismissC: make(chan struct{}),
pauseC: make(chan struct{}),
shutdownC: make(chan struct{}),
}
_ = atomic.CompareAndSwapUint32(&v.status, 0, statusInitialized)
go v.play()
return v
}
Expand Down
2 changes: 1 addition & 1 deletion violin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestMaxWorkers(t *testing.T) {
}

// Wait for all queued tasks to be dispatched to workers.
assert.Equal(t, len(v.waitingChan), int(v.WaitingTaskNum()))
assert.Equal(t, len(v.waitingC), int(v.WaitingTaskNum()))

timeout := time.After(5 * time.Second)
for startCount := 0; startCount < max; {
Expand Down

0 comments on commit ca7f7a2

Please sign in to comment.