Skip to content

Commit

Permalink
Optimize Checkpoint Batching (#721)
Browse files Browse the repository at this point in the history
The CP implementation used to create an SMREntry for each key,
this patch changes that behavior. Now multiple keys are grouped
into a single SMREntry as a putAll.
  • Loading branch information
Maithem authored and dahliamalkhi committed Jun 19, 2017
1 parent 94915fe commit 9bc848d
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 32 deletions.
59 changes: 32 additions & 27 deletions runtime/src/main/java/org/corfudb/runtime/CheckpointWriter.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.corfudb.runtime;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Iterables;
import lombok.Getter;
import lombok.Setter;
import org.corfudb.protocols.logprotocol.CheckpointEntry;
Expand All @@ -23,6 +23,8 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static com.google.common.collect.Iterables.partition;

/** Checkpoint writer for SMRMaps: take a snapshot of the
* object via TXBegin(), then dump the frozen object's
* state into CheckpointEntry records into the object's
Expand All @@ -43,12 +45,11 @@ public class CheckpointWriter {

final UUID checkpointStreamID;


Map<CheckpointEntry.CheckpointDictKey, String> mdKV = new HashMap<>();

/** Mutator lambda to change map key. Typically used for
* testing but could also be used for type conversion, etc.
*/
* testing but could also be used for type conversion, etc.
*/
@Getter
@Setter
Function<Object,Object> keyMutator = (x) -> x;
Expand Down Expand Up @@ -191,29 +192,33 @@ public List<Long> appendObjectState() {
ImmutableMap<CheckpointEntry.CheckpointDictKey,String> mdKV = ImmutableMap.copyOf(this.mdKV);
List<Long> continuationAddresses = new ArrayList<>();

Iterators.partition(map.keySet().stream()
.map(k -> {
return new SMREntry("put",
new Object[]{keyMutator.apply(k), valueMutator.apply(map.get(k))},
serializer);
}).iterator(), batchSize)
.forEachRemaining(entries -> {
MultiSMREntry smrEntries = new MultiSMREntry();
for (int i = 0; i < ((List) entries).size(); i++) {
smrEntries.addTo((SMREntry) ((List) entries).get(i));
}
CheckpointEntry cp = new CheckpointEntry(CheckpointEntry.CheckpointEntryType.CONTINUATION,
author, checkpointID, mdKV, smrEntries);
long pos = sv.append(Collections.singleton(checkpointStreamID), cp, null);

postAppendFunc.accept(cp, pos);
continuationAddresses.add(pos);

numEntries++;
// CheckpointEntry::serialize() has a side-effect we use
// for an accurate count of serialized bytes of SMREntries.
numBytes += cp.getSmrEntriesBytes();
});

Iterable<List<Object>> partitions = Iterables.partition(map.keySet(), batchSize);

for (List<Object> partition : partitions) {
Map tmp = new HashMap();
for (Object k : partition) {
tmp.put(keyMutator.apply(k), valueMutator.apply(map.get(k)));
}

SMREntry smrEntry = new SMREntry("putAll", new Object[]{tmp}, serializer);
MultiSMREntry smrEntries = new MultiSMREntry();
smrEntries.addTo(smrEntry);

CheckpointEntry cp = new CheckpointEntry(CheckpointEntry.CheckpointEntryType.CONTINUATION,
author, checkpointID, mdKV, smrEntries);

long pos = sv.append(Collections.singleton(checkpointStreamID), cp, null);

postAppendFunc.accept(cp, pos);
continuationAddresses.add(pos);

numEntries++;
// CheckpointEntry::serialize() has a side-effect we use
// for an accurate count of serialized bytes of SMREntries.
numBytes += cp.getSmrEntriesBytes();
}

return continuationAddresses;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.corfudb.runtime.checkpoint;

import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.util.serializer.ISerializer;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
* A special serializer for checkpoint testing
*/
public class CPSerializer implements ISerializer {
GsonBuilder gsonBuilder = new GsonBuilder();
Gson gson = gsonBuilder.create();

private final byte type;

public CPSerializer(byte type) {
this.type = type;
}

public byte getType() {
return type;
}

public Object deserialize(ByteBuf b, CorfuRuntime rt) {
Type mapType = new TypeToken<Map<String, Long>>(){}.getType();

int classNameLength = b.readShort();
byte[] classNameBytes = new byte[classNameLength];
b.readBytes(classNameBytes, 0, classNameLength);
String className = new String(classNameBytes);
boolean isMap = false;

if (className.equals("null")) {
return null;
} else if (className.endsWith("HashMap")) {
isMap = true;
}

try (ByteBufInputStream bbis = new ByteBufInputStream(b)) {
try (InputStreamReader r = new InputStreamReader(bbis)) {
if (isMap) {
return gson.fromJson(r, mapType);
} else {
return gson.fromJson(r, Class.forName(className));
}
}
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}

public void serialize(Object o, ByteBuf b) {
String className = o == null ? "null" : o.getClass().getName();
byte[] classNameBytes = className.getBytes();
b.writeShort(classNameBytes.length);
b.writeBytes(classNameBytes);
if (o == null) {
return;
}
try (ByteBufOutputStream bbos = new ByteBufOutputStream(b)) {
try (OutputStreamWriter osw = new OutputStreamWriter(bbos)) {
gson.toJson(o, osw);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.corfudb.runtime.object.transactions.TransactionType;
import org.corfudb.runtime.view.AbstractViewTest;
import org.corfudb.runtime.view.stream.BackpointerStreamView;
import org.corfudb.util.serializer.ISerializer;
import org.corfudb.util.serializer.Serializers;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -30,7 +31,8 @@
*/

public class CheckpointSmokeTest extends AbstractViewTest {

final byte serilizerByte = (byte) 20;
ISerializer serializer = new CPSerializer(serilizerByte);
public CorfuRuntime r;

@Before
Expand Down Expand Up @@ -220,6 +222,7 @@ public void checkpointWriterTest() throws Exception {
// Set up CP writer. Add fudgeFactor to all CP data,
// also used for assertion checks later.
CheckpointWriter cpw = new CheckpointWriter(getRuntime(), streamId, author, (SMRMap) m);
cpw.setSerializer(serializer);
cpw.setValueMutator((l) -> (Long) l + fudgeFactor);
cpw.setBatchSize(smallBatchSize);

Expand Down Expand Up @@ -290,6 +293,7 @@ public void checkpointWriterInterleavedTest() throws Exception {
// Set up CP writer, with interleaved writes for middle keys
middleTracker = -1;
CheckpointWriter cpw = new CheckpointWriter(getRuntime(), streamId, author, (SMRMap) m);
cpw.setSerializer(serializer);
cpw.setBatchSize(1);
cpw.setPostAppendFunc((cp, pos) -> {
// No mutation, but we need to add a history snapshot at this START/END location.
Expand Down Expand Up @@ -372,10 +376,12 @@ public void checkpointWriterInterleavedTest() throws Exception {
}

/**
 * Open (or attach to) the SMRMap<String, Long> backed by the given stream,
 * configured to use this test's custom checkpoint serializer.
 *
 * @param streamName name of the Corfu stream backing the map
 * @return the map view opened through runtime {@code r}
 */
private Map<String, Long> instantiateMap(String streamName) {
// NOTE(review): registration appears to be required before opening so the
// runtime can resolve the serializer by its type byte — confirm.
Serializers.registerSerializer(serializer);
return r.getObjectsView()
.build()
.setStreamName(streamName)
.setTypeToken(new TypeToken<SMRMap<String, Long>>() {})
.setSerializer(serializer)
.open();
}

Expand Down Expand Up @@ -415,7 +421,7 @@ private void writeCheckpointRecords(UUID streamId, String checkpointAuthor, UUID
MultiSMREntry smrEntries = new MultiSMREntry();
if (objects != null) {
for (int i = 0; i < objects.length; i++) {
smrEntries.addTo(new SMREntry("put", (Object[]) objects[i], Serializers.JSON));
smrEntries.addTo(new SMREntry("put", (Object[]) objects[i], serializer));
}
}
CheckpointEntry cp2 = new CheckpointEntry(CheckpointEntry.CheckpointEntryType.CONTINUATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.object.AbstractObjectTest;
import org.corfudb.runtime.object.transactions.TransactionType;
import org.corfudb.util.serializer.ISerializer;
import org.corfudb.util.serializer.Serializers;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -32,12 +34,15 @@ void setRuntime() {
}

Map<String, Long> instantiateMap(String mapName) {
final byte serializerByte = (byte) 20;
ISerializer serializer = new CPSerializer(serializerByte);
Serializers.registerSerializer(serializer);
return (SMRMap<String, Long>)
instantiateCorfuObject(
getMyRuntime(),
new TypeToken<SMRMap<String, Long>>() {
},
mapName);
new TypeToken<SMRMap<String, Long>>() {},
mapName,
serializer);
}

final String streamNameA = "mystreamA";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.reflect.TypeToken;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.view.AbstractViewTest;
import org.corfudb.util.serializer.ISerializer;

/**
* Created by dalia on 4/11/17.
Expand Down Expand Up @@ -60,4 +61,14 @@ protected <T> Object instantiateCorfuObject(TypeToken<T> tType, String name) {
return instantiateCorfuObject(getRuntime(), tType, name);
}

/**
 * Open a Corfu object of the given type on the named stream, using the
 * supplied serializer for its SMR entries.
 *
 * @param r          runtime to open the object through
 * @param tType      TypeToken describing the object type to instantiate
 * @param name       name of the backing stream
 * @param serializer serializer used for the object's entries
 * @param <T>        declared type of the Corfu object
 * @return the instantiated object view
 */
protected <T> Object instantiateCorfuObject(CorfuRuntime r, TypeToken<T> tType, String name, ISerializer serializer) {
    // The former "(T)" cast was redundant — the value is returned as Object
    // regardless — and only generated an unchecked-cast warning.
    return r.getObjectsView()
            .build()
            .setStreamName(name)       // stream name
            .setTypeToken(tType)       // a TypeToken of the specified class
            .setSerializer(serializer)
            .open();                   // instantiate the object!
}

}

0 comments on commit 9bc848d

Please sign in to comment.