Skip to content

Commit

Permalink
Merge pull request #12 from Netflix/shard-large-types
Browse files Browse the repository at this point in the history
Shard large types
  • Loading branch information
dkoszewnik committed Jan 27, 2017
2 parents 8f0215f + 2341c73 commit 14adb52
Show file tree
Hide file tree
Showing 44 changed files with 3,194 additions and 1,571 deletions.
Expand Up @@ -41,7 +41,6 @@
*/
public class HollowBlobHeader {

public static final int HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER = 1029;
public static final int HOLLOW_BLOB_VERSION_HEADER = 1030;

private Map<String, String> headerTags = new HashMap<String, String>();
Expand Down
Expand Up @@ -41,7 +41,7 @@ public HollowBlobHeader readHeader(InputStream is) throws IOException {
DataInputStream dis = new DataInputStream(is);

int headerVersion = dis.readInt();
if(headerVersion != HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER && headerVersion != HollowBlobHeader.HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER) {
if(headerVersion != HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER) {
throw new IOException("The HollowBlob you are trying to read is incompatible. "
+ "The expected Hollow blob version was " + HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER + " but the actual version was " + headerVersion);
}
Expand Down
Expand Up @@ -17,8 +17,6 @@
*/
package com.netflix.hollow.core.read.engine;

import static com.netflix.hollow.core.HollowBlobHeader.HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER;

import com.netflix.hollow.core.memory.encoding.VarInt;

import com.netflix.hollow.core.schema.HollowListSchema;
Expand Down Expand Up @@ -160,34 +158,33 @@ private void notifyEndUpdate() {
private String readTypeStateSnapshot(DataInputStream is, HollowBlobHeader header, HollowFilterConfig filter) throws IOException {
HollowSchema schema = HollowSchema.readFrom(is);

if(header.getBlobFormatVersion() != HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER)
skipForwardsCompatibilityBytes(is);

int numShards = readNumShards(is);

if(schema instanceof HollowObjectSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowObjectTypeReadState.discardSnapshot(is, (HollowObjectSchema)schema);
HollowObjectTypeReadState.discardSnapshot(is, (HollowObjectSchema)schema, numShards);
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
populateTypeStateSnapshot(is, new HollowObjectTypeReadState(stateEngine, filteredSchema, unfilteredSchema));
populateTypeStateSnapshot(is, new HollowObjectTypeReadState(stateEngine, filteredSchema, unfilteredSchema, numShards));
}
} else if (schema instanceof HollowListSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowListTypeReadState.discardSnapshot(is);
HollowListTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowListTypeReadState(stateEngine, (HollowListSchema)schema));
populateTypeStateSnapshot(is, new HollowListTypeReadState(stateEngine, (HollowListSchema)schema, numShards));
}
} else if(schema instanceof HollowSetSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowSetTypeReadState.discardSnapshot(is);
HollowSetTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowSetTypeReadState(stateEngine, (HollowSetSchema)schema));
populateTypeStateSnapshot(is, new HollowSetTypeReadState(stateEngine, (HollowSetSchema)schema, numShards));
}
} else if(schema instanceof HollowMapSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowMapTypeReadState.discardSnapshot(is);
HollowMapTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowMapTypeReadState(stateEngine, (HollowMapSchema)schema));
populateTypeStateSnapshot(is, new HollowMapTypeReadState(stateEngine, (HollowMapSchema)schema, numShards));
}
}

Expand All @@ -202,19 +199,29 @@ private void populateTypeStateSnapshot(DataInputStream is, HollowTypeReadState t
private String readTypeStateDelta(DataInputStream is, HollowBlobHeader header) throws IOException {
HollowSchema schema = HollowSchema.readFrom(is);

if(header.getBlobFormatVersion() != HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER)
skipForwardsCompatibilityBytes(is);
int numShards = readNumShards(is);

HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
typeState.applyDelta(is, schema, stateEngine.getMemoryRecycler());
} else {
discardDelta(is, schema);
discardDelta(is, schema, numShards);
}

return schema.getName();
}

private int readNumShards(DataInputStream is) throws IOException {
int backwardsCompatibilityBytes = VarInt.readVInt(is);

if(backwardsCompatibilityBytes == 0)
return 1; /// produced by a version of hollow prior to 2.1.0, always only 1 shard.

skipForwardsCompatibilityBytes(is);

return VarInt.readVInt(is);
}

private void skipForwardsCompatibilityBytes(DataInputStream is) throws IOException {
int bytesToSkip = VarInt.readVInt(is);
while(bytesToSkip > 0) {
Expand All @@ -226,15 +233,15 @@ private void skipForwardsCompatibilityBytes(DataInputStream is) throws IOExcepti
}


private void discardDelta(DataInputStream dis, HollowSchema schema) throws IOException {
private void discardDelta(DataInputStream dis, HollowSchema schema, int numShards) throws IOException {
if(schema instanceof HollowObjectSchema)
HollowObjectTypeReadState.discardDelta(dis, (HollowObjectSchema)schema);
HollowObjectTypeReadState.discardDelta(dis, (HollowObjectSchema)schema, numShards);
else if(schema instanceof HollowListSchema)
HollowListTypeReadState.discardDelta(dis);
HollowListTypeReadState.discardDelta(dis, numShards);
else if(schema instanceof HollowSetSchema)
HollowSetTypeReadState.discardDelta(dis);
HollowSetTypeReadState.discardDelta(dis, numShards);
else if(schema instanceof HollowMapSchema)
HollowMapTypeReadState.discardDelta(dis);
HollowMapTypeReadState.discardDelta(dis, numShards);
}

}
Expand Up @@ -147,20 +147,20 @@ public HollowReadStateEngine getStateEngine() {
return stateEngine;
}

protected void notifyListenerAboutDeltaChanges(GapEncodedVariableLengthIntegerReader removals, GapEncodedVariableLengthIntegerReader additions) {
protected void notifyListenerAboutDeltaChanges(GapEncodedVariableLengthIntegerReader removals, GapEncodedVariableLengthIntegerReader additions, int shardNumber, int numShards) {
for(HollowTypeStateListener stateListener : stateListeners) {
removals.reset();
int removedOrdinal = removals.nextElement();
while(removedOrdinal < Integer.MAX_VALUE) {
stateListener.removedOrdinal(removedOrdinal);
stateListener.removedOrdinal((removedOrdinal * numShards) + shardNumber);
removals.advance();
removedOrdinal = removals.nextElement();
}

additions.reset();
int addedOrdinal = additions.nextElement();
while(addedOrdinal < Integer.MAX_VALUE) {
stateListener.addedOrdinal(addedOrdinal);
stateListener.addedOrdinal((addedOrdinal * numShards) + shardNumber);
additions.advance();
addedOrdinal = additions.nextElement();
}
Expand Down Expand Up @@ -193,5 +193,10 @@ public HollowTypeReadState getTypeState() {
* @return an approximate accounting of the current cost of the "ordinal holes" in this type state.
*/
public abstract long getApproximateHoleCostInBytes();

/**
* @return The number of shards into which this type is split. Sharding is transparent, so this has no effect on normal usage.
*/
public abstract int numShards();

}
Expand Up @@ -18,7 +18,6 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;

import com.netflix.hollow.core.util.RemovedOrdinalIterator;
import com.netflix.hollow.core.util.IntMap;
import com.netflix.hollow.core.memory.pool.WastefulRecycler;
Expand All @@ -33,10 +32,13 @@
public class HollowListDeltaHistoricalStateCreator {

private final HollowListTypeReadState typeState;
private final HollowListTypeDataElements stateEngineDataElements;
private final HollowListTypeDataElements stateEngineDataElements[];
private final HollowListTypeDataElements historicalDataElements;
private final RemovedOrdinalIterator iter;

private final int shardNumberMask;
private final int shardOrdinalShift;

private IntMap ordinalMapping;
private int nextOrdinal = 0;
private long nextStartElement = 0;
Expand All @@ -46,6 +48,8 @@ public HollowListDeltaHistoricalStateCreator(HollowListTypeReadState typeState)
this.stateEngineDataElements = typeState.currentDataElements();
this.historicalDataElements = new HollowListTypeDataElements(WastefulRecycler.DEFAULT_INSTANCE);
this.iter = new RemovedOrdinalIterator(typeState.getListener(PopulatedOrdinalListener.class));
this.shardNumberMask = stateEngineDataElements.length - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(stateEngineDataElements.length);
}

public void populateHistory() {
Expand All @@ -70,7 +74,7 @@ public IntMap getOrdinalMapping() {
}

public HollowListTypeReadState createHistoricalTypeReadState() {
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema());
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema(), 1);
historicalTypeState.setCurrentData(historicalDataElements);
return historicalTypeState;
}
Expand All @@ -89,18 +93,21 @@ private void populateStats() {
historicalDataElements.maxOrdinal = removedEntryCount - 1;
historicalDataElements.totalNumberOfElements = totalElementCount;
historicalDataElements.bitsPerListPointer = totalElementCount == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalElementCount);
historicalDataElements.bitsPerElement = stateEngineDataElements.bitsPerElement;
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement;

ordinalMapping = new IntMap(removedEntryCount);
}

private void copyRecord(int ordinal) {
long bitsPerElement = stateEngineDataElements.bitsPerElement;
long fromStartElement = ordinal == 0 ? 0 : stateEngineDataElements.listPointerArray.getElementValue((long)(ordinal - 1) * stateEngineDataElements.bitsPerListPointer, stateEngineDataElements.bitsPerListPointer);
long fromEndElement = stateEngineDataElements.listPointerArray.getElementValue((long)ordinal * stateEngineDataElements.bitsPerListPointer, stateEngineDataElements.bitsPerListPointer);
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;

long bitsPerElement = stateEngineDataElements[shard].bitsPerElement;
long fromStartElement = shardOrdinal == 0 ? 0 : stateEngineDataElements[shard].listPointerArray.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long fromEndElement = stateEngineDataElements[shard].listPointerArray.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long size = fromEndElement - fromStartElement;

historicalDataElements.elementArray.copyBits(stateEngineDataElements.elementArray, bitsPerElement * fromStartElement, bitsPerElement * nextStartElement, size * bitsPerElement);
historicalDataElements.elementArray.copyBits(stateEngineDataElements[shard].elementArray, bitsPerElement * fromStartElement, bitsPerElement * nextStartElement, size * bitsPerElement);
historicalDataElements.listPointerArray.setElementValue(historicalDataElements.bitsPerListPointer * nextOrdinal, historicalDataElements.bitsPerListPointer, nextStartElement + size);

ordinalMapping.put(ordinal, nextOrdinal);
Expand Down
Expand Up @@ -75,23 +75,28 @@ private void readFromStream(DataInputStream dis, boolean isDelta) throws IOExcep
elementArray = FixedLengthElementArray.deserializeFrom(dis, memoryRecycler);
}

static void discardFromStream(DataInputStream dis, boolean isDelta) throws IOException {
VarInt.readVInt(dis); /// max ordinal

if(isDelta) {
/// addition/removal ordinals
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
static void discardFromStream(DataInputStream dis, int numShards, boolean isDelta) throws IOException {
if(numShards > 1)
VarInt.readVInt(dis); /// max ordinal

for(int i=0;i<numShards;i++) {
VarInt.readVInt(dis); /// max ordinal

if(isDelta) {
/// addition/removal ordinals
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
}

/// statistics
VarInt.readVInt(dis);
VarInt.readVInt(dis);
VarInt.readVLong(dis);

/// fixed-length data
FixedLengthElementArray.discardFrom(dis);
FixedLengthElementArray.discardFrom(dis);
}

/// statistics
VarInt.readVInt(dis);
VarInt.readVInt(dis);
VarInt.readVLong(dis);

/// fixed-length data
FixedLengthElementArray.discardFrom(dis);
FixedLengthElementArray.discardFrom(dis);
}

public void applyDelta(HollowListTypeDataElements fromData, HollowListTypeDataElements deltaData) {
Expand Down

0 comments on commit 14adb52

Please sign in to comment.