Skip to content

Commit

Permalink
change cache readAll() to utilize ranged read instead of single reads (
Browse files Browse the repository at this point in the history
  • Loading branch information
zjohnny226 authored and no2chem committed Jun 21, 2017
1 parent 4516f5f commit 0bc84e9
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -252,18 +253,23 @@ public void invalidateClientCache() {
}

/**
* Fetch an address for insertion into the cache.
* Fetch a collection of addresses for insertion into the cache.
*
* @param addresses An address to read from.
* @return A result to be cached. If the readresult is empty,
* This entry will be scheduled to self invalidate.
* @param addresses collection of addresses to read from.
* @return A result to be cached
*/
private @Nonnull Map<Long, ILogData> cacheFetch(Iterable<Long> addresses) {
return StreamSupport.stream(addresses.spliterator(), true)
.map(address -> layoutHelper(l ->
l.getReplicationMode(address).getReplicationProtocol(runtime)
.read(l, address)))
.collect(Collectors.toMap(ILogData::getGlobalAddress, r -> r));
//turn the addresses into Set for now to satisfy signature requirement down the line
Set<Long> readAddresses = new TreeSet<>();
Iterator<Long> iterator = addresses.iterator();
while(iterator.hasNext()){
readAddresses.add(iterator.next());
}

//doesn't handle the case where some address have a different replication mode
return layoutHelper(l -> l.getReplicationMode(readAddresses.iterator().next())
.getReplicationProtocol(runtime)
.readAll(l, readAddresses));
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package org.corfudb.runtime.view.replication;

import com.google.common.collect.Range;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.protocols.wireprotocol.ILogData;
import org.corfudb.protocols.wireprotocol.LogData;
import org.corfudb.runtime.exceptions.OverwriteException;
import org.corfudb.runtime.exceptions.RecoveryException;
import org.corfudb.runtime.view.Layout;
import org.corfudb.util.CFUtils;



/**
* Created by mwei on 4/6/17.
*/
Expand Down Expand Up @@ -66,6 +71,35 @@ public ILogData peek(Layout layout, long globalAddress) {
return ret == null || ret.isEmpty() ? null : ret;
}

/**
* {@inheritDoc}
*/
@Override
public Map<Long, ILogData> readAll(Layout layout, Set<Long> globalAddresses) {
Range<Long> range = Range.encloseAll(globalAddresses);
long startAddress = range.lowerEndpoint();
long endAddress = range.upperEndpoint();
int numUnits = layout.getSegmentLength(startAddress);
log.trace("ReadAll[{}-{}]: chain {}/{}", startAddress, endAddress, numUnits, numUnits);

Map<Long, LogData> logResult = CFUtils.getUninterruptibly(layout
.getLogUnitClient(startAddress, numUnits - 1)
.read(null, range)).getReadSet();

//in case of a hole, do a normal read and use its hole fill policy
Map<Long, ILogData> returnResult = new TreeMap<>();
for (Map.Entry<Long, LogData> entry: logResult.entrySet()){
ILogData value = entry.getValue();
if (value == null || value.isEmpty()){
value = read(layout, entry.getKey());
}

returnResult.put(entry.getKey(), value);
}

return returnResult;
}

/** Propagate a write down the chain, ignoring
* any overwrite errors. It is expected that the
* write has already successfully completed at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.corfudb.runtime.view;

import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.Range;
import org.corfudb.infrastructure.LogUnitServerAssertions;
import org.corfudb.infrastructure.TestLayoutBuilder;
import org.corfudb.protocols.wireprotocol.*;
import org.corfudb.runtime.CorfuRuntime;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
Expand All @@ -15,10 +19,8 @@
*/
public class AddressSpaceViewTest extends AbstractViewTest {

@Test
@SuppressWarnings("unchecked")
public void ensureStripingWorks()
throws Exception {
@Before
public void setup(){
addServer(SERVERS.PORT_0);
addServer(SERVERS.PORT_1);
addServer(SERVERS.PORT_2);
Expand All @@ -40,7 +42,12 @@ public void ensureStripingWorks()
.addToSegment()
.addToLayout()
.build());
}

@Test
@SuppressWarnings("unchecked")
public void ensureStripingWorks()
throws Exception {
CorfuRuntime r = getRuntime().connect();

UUID streamA = UUID.nameUUIDFromBytes("stream A".getBytes());
Expand Down Expand Up @@ -81,31 +88,9 @@ public void ensureStripingWorks()
@SuppressWarnings("unchecked")
public void ensureStripingReadAllWorks()
throws Exception {
addServer(SERVERS.PORT_0);
addServer(SERVERS.PORT_1);
addServer(SERVERS.PORT_2);

bootstrapAllServers(new TestLayoutBuilder()
.setEpoch(1L)
.addLayoutServer(SERVERS.PORT_0)
.addSequencer(SERVERS.PORT_0)
.buildSegment()
.buildStripe()
.addLogUnit(SERVERS.PORT_0)
.addToSegment()
.buildStripe()
.addLogUnit(SERVERS.PORT_1)
.addToSegment()
.buildStripe()
.addLogUnit(SERVERS.PORT_2)
.addToSegment()
.addToLayout()
.build());

//configure the layout accordingly
CorfuRuntime r = getRuntime().connect();

UUID streamA = UUID.nameUUIDFromBytes("stream A".getBytes());
byte[] testPayload = "hello world".getBytes();

final long ADDRESS_0 = 0;
Expand Down Expand Up @@ -138,4 +123,33 @@ public void ensureStripingReadAllWorks()
assertThat(m.get(ADDRESS_2).getPayload(getRuntime()))
.isEqualTo("3".getBytes());
}

@Test
@SuppressWarnings("unchecked")
public void readAllWithHoleFill()
throws Exception {
//configure the layout accordingly
CorfuRuntime r = getRuntime().connect();

byte[] testPayload = "hello world".getBytes();

final long ADDRESS_0 = 0;
final long ADDRESS_1 = 1;
final long ADDRESS_2 = 3;
Token token = new Token(ADDRESS_0, r.getLayoutView().getLayout().getEpoch());
r.getAddressSpaceView().write(token, testPayload);

assertThat(r.getAddressSpaceView().read(ADDRESS_0).getPayload(getRuntime()))
.isEqualTo("hello world".getBytes());

Range range = Range.closed(ADDRESS_0, ADDRESS_2);
ContiguousSet<Long> addresses = ContiguousSet.create(range, DiscreteDomain.longs());

Map<Long, ILogData> m = r.getAddressSpaceView().read(addresses);

assertThat(m.get(ADDRESS_0).getPayload(getRuntime()))
.isEqualTo("hello world".getBytes());
assertThat(m.get(ADDRESS_1).isHole());
assertThat(m.get(ADDRESS_2).isHole());
}
}

0 comments on commit 0bc84e9

Please sign in to comment.