Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker pool that can leak a byte buffer #2610

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions examples/golang-push/rideshare/utility/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package utility

import (
"os"
"sync"
)

type workerPool struct {
poolLock *sync.Mutex
pool []chan struct{}
limit int
}

// Run a function using a pool.
func (c *workerPool) Run(fn func()) {
if os.Getenv("REGION") != "us-east" {
// Only leak memory for us-east. If not us-east, run the worker
// function in a blocking manner.

fn()
return
}

stop := make(chan struct{}, 1)
done := make(chan struct{}, 1)

c.poolLock.Lock()
size := len(c.pool)
if c.limit != 0 && size >= c.limit {
// We're at max pool limit, release a resource.
last := c.pool[size-1]
last <- struct{}{}
close(last)
c.pool = c.pool[:size-1]
}
c.pool = append(c.pool, stop)
c.poolLock.Unlock()

// Create a goroutine to run the function. It will write to done when the
// work is over, but won't clean up until it receives a signal from stop.
go doWork(fn, stop, done)

// Block until the worker signals it's done.
<-done
close(done)
}

// Closes the pool, cleaning up all resources.
func (c *workerPool) Close() {
c.poolLock.Lock()
defer c.poolLock.Unlock()

for _, c := range c.pool {
c <- struct{}{}
close(c)
}
c.pool = c.pool[:]
}

func doWork(fn func(), stop <-chan struct{}, done chan<- struct{}) {
buf := make([]byte, 0)

// Do work.
fn()

// Simulate the work in fn requiring some data to be added to a buffer.
const mb = 1 << 20
for i := 0; i < mb; i++ {
buf = append(buf, byte(i))
}

// Don't let the compiler optimize away the buf.
var _ = buf

// Signal we're done working.
done <- struct{}{}

// Block until we're told to clean up.
<-stop
}

func newPool(n int) *workerPool {
return &workerPool{
poolLock: &sync.Mutex{},
pool: make([]chan struct{}, 0, n),
limit: n,
}
}
11 changes: 7 additions & 4 deletions examples/golang-push/rideshare/utility/utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

const durationConstant = time.Duration(200 * time.Millisecond)

var pool = newPool(1_000)

func mutexLock(n int64) {
var i int64 = 0

Expand All @@ -30,9 +32,11 @@ func checkDriverAvailability(n int64) {
// start time is number of seconds since epoch
startTime := time.Now()

for time.Since(startTime) < time.Duration(n)*durationConstant {
i++
}
pool.Run(func() {
for time.Since(startTime) < time.Duration(n)*durationConstant {
i++
}
})

// Every other minute this will artificially create make requests in eu-north region slow
// this is just for demonstration purposes to show how performance impacts show up in the
Expand All @@ -41,7 +45,6 @@ func checkDriverAvailability(n int64) {
if os.Getenv("REGION") == "eu-north" && force_mutex_lock {
mutexLock(n)
}

}

func FindNearestVehicle(ctx context.Context, searchRadius int64, vehicle string) {
Expand Down
Loading