Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions commands/command_filter_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func filterCommand(cmd *cobra.Command, args []string) {
// until a read from that channel becomes blocking (in
// other words, we read until there are no more items
// immediately ready to be sent back to Git).
paths := pathnames(readAvailable(available))
paths := pathnames(readAvailable(available, q.BatchSize()))
if len(paths) == 0 {
// If `len(paths) == 0`, `tq.Watch()` has
// closed, indicating that all items have been
Expand Down Expand Up @@ -302,8 +302,8 @@ func incomingOrCached(r io.Reader, ptr *lfs.Pointer) (io.Reader, error) {
// 1. Reading from the channel of available items blocks, or ...
// 2. There is one item available, or ...
// 3. The 'tq.TransferQueue' is completed.
func readAvailable(ch <-chan *tq.Transfer) []*tq.Transfer {
ts := make([]*tq.Transfer, 0, 100)
func readAvailable(ch <-chan *tq.Transfer, cap int) []*tq.Transfer {
ts := make([]*tq.Transfer, 0, cap)

for {
select {
Expand Down
6 changes: 6 additions & 0 deletions tq/transfer_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,12 @@ func (q *TransferQueue) finishAdapter() {
}
}

// BatchSize returns the batch size of the receiving *TransferQueue, or, the
// number of transfers to accept before beginning work on them.
func (q *TransferQueue) BatchSize() int {
return q.batchSize
}

func (q *TransferQueue) Skip(size int64) {
q.meter.Skip(size)
}
Expand Down
7 changes: 7 additions & 0 deletions tq/transfer_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@ func TestRetryCounterCanNotRetryAfterExceedingRetryCount(t *testing.T) {
assert.Equal(t, 1, count)
assert.False(t, canRetry)
}

func TestBatchSizeReturnsBatchSize(t *testing.T) {
q := NewTransferQueue(
Upload, NewManifest(), "origin", WithBatchSize(3))

assert.Equal(t, 3, q.BatchSize())
}