Skip to content

Commit

Permalink
Add worker pool that can leak a byte buffer (#2610)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanhuhta committed Oct 31, 2023
1 parent 325fa01 commit f1f754f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 4 deletions.
88 changes: 88 additions & 0 deletions examples/golang-push/rideshare/utility/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package utility

import (
"os"
"sync"
)

type workerPool struct {
poolLock *sync.Mutex
pool []chan struct{}
limit int
}

// Run a function using a pool.
func (c *workerPool) Run(fn func()) {
if os.Getenv("REGION") != "us-east" {
// Only leak memory for us-east. If not us-east, run the worker
// function in a blocking manner.

fn()
return
}

stop := make(chan struct{}, 1)
done := make(chan struct{}, 1)

c.poolLock.Lock()
size := len(c.pool)
if c.limit != 0 && size >= c.limit {
// We're at max pool limit, release a resource.
last := c.pool[size-1]
last <- struct{}{}
close(last)
c.pool = c.pool[:size-1]
}
c.pool = append(c.pool, stop)
c.poolLock.Unlock()

// Create a goroutine to run the function. It will write to done when the
// work is over, but won't clean up until it receives a signal from stop.
go doWork(fn, stop, done)

// Block until the worker signals it's done.
<-done
close(done)
}

// Closes the pool, cleaning up all resources.
func (c *workerPool) Close() {
c.poolLock.Lock()
defer c.poolLock.Unlock()

for _, c := range c.pool {
c <- struct{}{}
close(c)
}
c.pool = c.pool[:]
}

func doWork(fn func(), stop <-chan struct{}, done chan<- struct{}) {
buf := make([]byte, 0)

// Do work.
fn()

// Simulate the work in fn requiring some data to be added to a buffer.
const mb = 1 << 20
for i := 0; i < mb; i++ {
buf = append(buf, byte(i))
}

// Don't let the compiler optimize away the buf.
var _ = buf

// Signal we're done working.
done <- struct{}{}

// Block until we're told to clean up.
<-stop
}

func newPool(n int) *workerPool {
return &workerPool{
poolLock: &sync.Mutex{},
pool: make([]chan struct{}, 0, n),
limit: n,
}
}
11 changes: 7 additions & 4 deletions examples/golang-push/rideshare/utility/utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

const durationConstant = time.Duration(200 * time.Millisecond)

var pool = newPool(1_000)

func mutexLock(n int64) {
var i int64 = 0

Expand All @@ -30,9 +32,11 @@ func checkDriverAvailability(n int64) {
// start time is number of seconds since epoch
startTime := time.Now()

for time.Since(startTime) < time.Duration(n)*durationConstant {
i++
}
pool.Run(func() {
for time.Since(startTime) < time.Duration(n)*durationConstant {
i++
}
})

// Every other minute this will artificially create make requests in eu-north region slow
// this is just for demonstration purposes to show how performance impacts show up in the
Expand All @@ -41,7 +45,6 @@ func checkDriverAvailability(n int64) {
if os.Getenv("REGION") == "eu-north" && force_mutex_lock {
mutexLock(n)
}

}

func FindNearestVehicle(ctx context.Context, searchRadius int64, vehicle string) {
Expand Down

0 comments on commit f1f754f

Please sign in to comment.