-
-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding concurrent batch writes #96
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no power over if this gets merged or not, I'm just waiting for PR in the same repo to get merged myself. I thought I'd provide some feedback on your PR. I think you need some big changes to it if you want this thing merged.
batchwrite.go
Outdated
|
||
channel := make(chan BatchResponse) | ||
|
||
batchCounter := 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need this variable when you could just do
for _, batch := range totalBatch {
go bw.writeBatch(ctx, batch, channel)
}
and I don't think you even need the batch counter. It doesn't really seem like you're using it for any reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was using this for testing and debugging. Forgot to remove it!
batchwrite.go
Outdated
} | ||
|
||
if batchResponse.Error != nil { | ||
err = batchResponse.Error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you rather use some kind of multi error thing here? To me you could have multiple errors that occur from all your batches that you'd want to record. I'd suggest something like hashicorp/go-multierror
batchwrite.go
Outdated
} | ||
|
||
// Receive | ||
for j := 0; j < end; j++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you don't need to do it this way. I'd think a better way to do this would be
for {
select {
case batchResponse, ok := <- channel:
// logic for handing errors / unexpected stream close / updating written records
// when all batches responded, return the wrote count.
case <-ctx.Done():
// do some error logic because we timed out the ctx, return the currently recorded wrote records
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will try this. However the final writeBatch does not close the channel. Would it suffice to count and break within the case or is there a smarter way about it??
batchwrite.go
Outdated
batchCounter := 0 | ||
wrote = 0 | ||
err = nil | ||
for i := 0; i < iterations; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you take those suggestions, you could get rid of this outer for loop. and just have those two.
batchwrite.go
Outdated
} | ||
} | ||
|
||
close(channel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make this a deferred operation, as I wouldn't want to prematurely close the channel. I think with your current code this is a real possibility.
return 0, bw.err | ||
} | ||
|
||
cfg := new(batchWriteConfig) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're not really using the notion of your threads really. I think it's overly complicating your operation as it's currently implemented. All you are really doing here is spinning up a go routine for each batch you're running, and you'll make too many operation if your threads number is larger than your total batches.
If you were truly concerned about how many go routines were concurrently running, you'd be making a worker pool, like https://gobyexample.com/worker-pools and send your batches to be processed by the worker pool. I'd also not call these things "Threads" as a common Go convention would be to refer to it as "Pool Size".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will try this. Thanks!
batchwrite.go
Outdated
} | ||
|
||
// Sets the number of threads to process the request | ||
func WithThreards(threads int) BatchWriteOption { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use the word thread, use pool size.
@greggjs Thanks for your suggestions. I've made the changes! Let me know what you think! |
a7c082f
to
dcb6fb3
Compare
dcb6fb3
to
a18f213
Compare
Added
RunConcurrently()
to concurrently executeBatchWriteItem()