Skip to content

Commit

Permalink
Addresses in a batch are sequential (#3706)
Browse files Browse the repository at this point in the history
* Addresses in a batch are sequential
  • Loading branch information
PavelZaytsev committed Jul 21, 2023
1 parent 2381213 commit c6238a3
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.corfudb.runtime.clients.LogUnitClient;
import org.corfudb.util.CFUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -106,17 +107,44 @@ TransferSegmentRangeSingle getUnknownAddressesInRangeForRange(TransferSegmentRan
.build();
}

static List<List<Long>> splitNonSequentialAddresses(List<Long> batch) {
List<List<Long>> lists = new ArrayList<>();
if (batch.size() < 2) {
lists.add(batch);
return lists;
}

List<Long> sequentialList = new ArrayList<>();

sequentialList.add(batch.get(0));
for (int i = 0; i < batch.size() - 1; i++) {
long current = batch.get(i);
long next = batch.get(i + 1);
if (current + 1 == next) {
sequentialList.add(next);
} else {
lists.add(sequentialList);
sequentialList = new ArrayList<>();
sequentialList.add(next);
}
}
lists.add(sequentialList);
return lists;
}

/**
* Transform the given range into a stream of batch requests.
*
* @param range A transfer segment range that contains unknown addresses and maybe available
* log unit servers.
* @return A stream of transfer batch requests.
*/
Stream<TransferBatchRequest> rangeToBatchRequestStream(TransferSegmentRangeSingle range) {
public Stream<TransferBatchRequest> rangeToBatchRequestStream(TransferSegmentRangeSingle range) {
ImmutableList<Long> unknownAddressesInRange = range.getUnknownAddressesInRange();
Optional<ImmutableList<String>> availableServers = range.getAvailableServers();
return Lists.partition(unknownAddressesInRange, batchSize).stream()
return StateTransferManager.splitNonSequentialAddresses(unknownAddressesInRange)
.stream()
.flatMap(list -> Lists.partition(list,batchSize).stream())
.map(partition -> TransferBatchRequest
.builder()
.addresses(partition)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.corfudb.infrastructure.log.statetransfer;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.corfudb.common.util.Tuple;
import org.corfudb.infrastructure.log.statetransfer.batch.TransferBatchRequest;
Expand All @@ -18,7 +19,11 @@
import org.corfudb.util.NodeLocator;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
Expand All @@ -30,6 +35,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.corfudb.infrastructure.log.statetransfer.segment.StateTransferType.CONSISTENT_READ;
import static org.corfudb.infrastructure.log.statetransfer.segment.StateTransferType.PROTOCOL_READ;
import static org.corfudb.infrastructure.log.statetransfer.segment.TransferSegmentStatus.SegmentState.RESTORED;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand All @@ -43,6 +49,12 @@ private StateTransferManager getDefaultInstance() {
mock(BasicTransferProcessor.class), mock(ParallelTransferProcessor.class));
}

private StateTransferManager getDefaultInstance(int batchSize) {
return getConfiguredInstance(mock(LogUnitClient.class),
mock(BasicTransferProcessor.class), mock(ParallelTransferProcessor.class),
batchSize);
}

private StateTransferManager getConfiguredInstance(LogUnitClient client,
BasicTransferProcessor processor1,
ParallelTransferProcessor processor2) {
Expand All @@ -54,6 +66,18 @@ private StateTransferManager getConfiguredInstance(LogUnitClient client,
.build();
}

private StateTransferManager getConfiguredInstance(LogUnitClient client,
BasicTransferProcessor processor1,
ParallelTransferProcessor processor2,
int batchSize) {
return StateTransferManager.builder()
.batchSize(batchSize)
.logUnitClient(client)
.basicTransferProcessor(processor1)
.parallelTransferProcessor(processor2)
.build();
}

private TransferSegmentRange getSingleRange(long start,
long end,
StateTransferType type,
Expand Down Expand Up @@ -460,6 +484,136 @@ void testRangeToBatchRequestStreamWithServers() {
assertThat(collect).isEqualTo(expected);
}

boolean sequential(List<Long> batch) {
if (batch.size() < 2) {
return true;
}
for (int i = 0; i < batch.size() - 1; i++) {
Long current = batch.get(i) + 1;
Long next = batch.get(i + 1);
if (!current.equals(next)) {
return false;
}
}
return true;
}

boolean batchedAndSequential(List<TransferBatchRequest> batches, int batchSize) {
for (TransferBatchRequest batch : batches) {
if (batch.getAddresses().size() > batchSize) {
return false;
}
if (!sequential(batch.getAddresses())) {
return false;
}
}
return true;
}

long getSequentialOrRandomNumber(Random random, long num, int bound) {
final int sequentialBound = 2;
final int totalBound = 3;
if (random.nextInt(totalBound) < sequentialBound) {
return num + 1;
} else {
return random.nextInt(bound);
}
}

ImmutableList<Long> generateRandomListOfSize(Random random, int size, int bound) {
LinkedHashSet<Long> set = new LinkedHashSet<>();
for (int i = 0; i < size; i++) {
final long randomNumber = random.nextInt(bound);
set.add(randomNumber);
final long sequentialOrRandomNumber = getSequentialOrRandomNumber(random, randomNumber, bound);
set.add(sequentialOrRandomNumber);

}
return new ArrayList<>(set).stream().sorted().collect(ImmutableList.toImmutableList());
}

TransferSegmentRangeSingle getTestTransferSegmentRange(ImmutableList<Long> addresses) {
Preconditions.checkArgument(!addresses.isEmpty());
return TransferSegmentRangeSingle
.builder()
.availableServers(Optional.empty())
.unknownAddressesInRange(addresses)
.startAddress(addresses.get(0))
.endAddress(addresses.get(addresses.size() - 1))
.typeOfTransfer(PROTOCOL_READ)
.split(false)
.status(TransferSegmentStatus.builder().build())
.build();
}

List<TransferBatchRequest> toBatches(List<List<Long>> addresses) {
return addresses
.stream()
.map(x -> TransferBatchRequest.builder().addresses(x).build())
.collect(Collectors.toList());
}

@Test
void partitionWorkloadTest() {
// Test that partition functionality is preserved
int batchSize = 3;
StateTransferManager stateTransferManager = getDefaultInstance(batchSize);
ImmutableList<Long> initList = ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
TransferSegmentRangeSingle testTransferSegmentRange = getTestTransferSegmentRange(initList);
List<TransferBatchRequest> batches = stateTransferManager.rangeToBatchRequestStream(testTransferSegmentRange).collect(Collectors.toList());

List<List<Long>> expectedList = ImmutableList.of(
ImmutableList.of(1L, 2L, 3L),
ImmutableList.of(4L, 5L, 6L),
ImmutableList.of(7L, 8L, 9L)
);

assertThat(toBatches(expectedList)).isEqualTo(batches);

batchSize = 2;
stateTransferManager = getDefaultInstance(batchSize);
batches = stateTransferManager.rangeToBatchRequestStream(testTransferSegmentRange).collect(Collectors.toList());
expectedList = ImmutableList.of(
ImmutableList.of(1L, 2L),
ImmutableList.of(3L, 4L),
ImmutableList.of(5L, 6L),
ImmutableList.of(7L, 8L),
ImmutableList.of(9L)
);
assertThat(toBatches(expectedList)).isEqualTo(batches);
batchSize = 8;
stateTransferManager = getDefaultInstance(batchSize);
batches = stateTransferManager.rangeToBatchRequestStream(testTransferSegmentRange).collect(Collectors.toList());
expectedList = ImmutableList.of(
ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L),
ImmutableList.of(9L)
);
assertThat(toBatches(expectedList)).isEqualTo(batches);
batchSize = 20;
stateTransferManager = getDefaultInstance(batchSize);
batches = stateTransferManager.rangeToBatchRequestStream(testTransferSegmentRange).collect(Collectors.toList());
expectedList = ImmutableList.of(
ImmutableList.of(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)
);
assertThat(toBatches(expectedList)).isEqualTo(batches);

// Test sequential aspect
int seed = 42;
Random rand = new Random(seed);
int numBound = 100;
int listSizeMax = 100;
int batchSizeMax = 100;
for (int size = 1; size < listSizeMax; size++) {
for (int batchSizeCurrent = 1; batchSizeCurrent < batchSizeMax; batchSizeCurrent++) {
final ImmutableList<Long> longs = generateRandomListOfSize(rand, size, numBound);
testTransferSegmentRange = getTestTransferSegmentRange(longs);
stateTransferManager = getDefaultInstance(batchSizeCurrent);
final List<TransferBatchRequest> batchRequests = stateTransferManager.rangeToBatchRequestStream(testTransferSegmentRange).collect(Collectors.toList());
assertThat(batchedAndSequential(batchRequests, batchSizeCurrent)).isTrue();
}
}
}

@Test
void createBatchWorkload() {
TransferSegmentRangeSingle range1 = (TransferSegmentRangeSingle) getSingleRange(0L,
Expand Down

0 comments on commit c6238a3

Please sign in to comment.