Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shard large types #12

Merged
merged 22 commits into from Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5530753
Type sharding: initial commit sharding OBJECT types
dkoszewnik Jan 10, 2017
c2e0d4c
Merge branch 'master' into shard-large-types
dkoszewnik Jan 10, 2017
bf2c8dc
Type sharding: minor improvements and fixes related to sharding OBJEC…
dkoszewnik Jan 10, 2017
bc7558c
Type sharding: sharding LIST types
dkoszewnik Jan 11, 2017
c6c9e9d
Type sharding: minor modifications
dkoszewnik Jan 11, 2017
4e3a89e
Type sharding: List pointers may require fewer bits if sharding resul…
dkoszewnik Jan 11, 2017
bba6a3b
Type sharding: sharding SET types
dkoszewnik Jan 11, 2017
82ca191
Type sharding: minor modifications
dkoszewnik Jan 11, 2017
12f0cda
Type sharding: sharding MAP types
dkoszewnik Jan 12, 2017
66a36e2
Type sharding: minor modifications
dkoszewnik Jan 12, 2017
890c568
Type sharding: added annotation @HollowShardLargeType for HollowObjec…
dkoszewnik Jan 12, 2017
3ba62b5
Type sharding: need to swap() the ArraySegmentRecycler between applyi…
dkoszewnik Jan 13, 2017
95cd0f5
Type sharding: HollowWriteStateEngine will refuse to restore a state …
dkoszewnik Jan 14, 2017
afe4554
Type sharding: unless otherwise specified, the number of shards is au…
dkoszewnik Jan 18, 2017
788352c
Type sharding: HollowObjectMapper must default number of shards to -1…
dkoszewnik Jan 18, 2017
dd23044
Type sharding: unit tests
dkoszewnik Jan 20, 2017
ef1900d
Type sharding: default target max type shard size is set to Long.MAX_…
dkoszewnik Jan 20, 2017
06b3db9
Type sharding: Added missing license headers
dkoszewnik Jan 20, 2017
e0bdb21
Type sharding: Added comments based on Tim's review
dkoszewnik Jan 25, 2017
c27eeb2
Type sharding: Correct min/max shard ordinal calculation
dkoszewnik Jan 25, 2017
3ed20ff
Type sharding: Revert min/max shard ordinal calculation, rename varia…
dkoszewnik Jan 26, 2017
2341c73
Type sharding: minor variable rename
dkoszewnik Jan 26, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -41,7 +41,6 @@
*/
public class HollowBlobHeader {

public static final int HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER = 1029;
public static final int HOLLOW_BLOB_VERSION_HEADER = 1030;

private Map<String, String> headerTags = new HashMap<String, String>();
Expand Down
Expand Up @@ -41,7 +41,7 @@ public HollowBlobHeader readHeader(InputStream is) throws IOException {
DataInputStream dis = new DataInputStream(is);

int headerVersion = dis.readInt();
if(headerVersion != HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER && headerVersion != HollowBlobHeader.HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER) {
if(headerVersion != HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER) {
throw new IOException("The HollowBlob you are trying to read is incompatible. "
+ "The expected Hollow blob version was " + HollowBlobHeader.HOLLOW_BLOB_VERSION_HEADER + " but the actual version was " + headerVersion);
}
Expand Down
Expand Up @@ -17,8 +17,6 @@
*/
package com.netflix.hollow.core.read.engine;

import static com.netflix.hollow.core.HollowBlobHeader.HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER;

import com.netflix.hollow.core.memory.encoding.VarInt;

import com.netflix.hollow.core.schema.HollowListSchema;
Expand Down Expand Up @@ -160,34 +158,33 @@ private void notifyEndUpdate() {
private String readTypeStateSnapshot(DataInputStream is, HollowBlobHeader header, HollowFilterConfig filter) throws IOException {
HollowSchema schema = HollowSchema.readFrom(is);

if(header.getBlobFormatVersion() != HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER)
skipForwardsCompatibilityBytes(is);

int numShards = readNumShards(is);

if(schema instanceof HollowObjectSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowObjectTypeReadState.discardSnapshot(is, (HollowObjectSchema)schema);
HollowObjectTypeReadState.discardSnapshot(is, (HollowObjectSchema)schema, numShards);
} else {
HollowObjectSchema unfilteredSchema = (HollowObjectSchema)schema;
HollowObjectSchema filteredSchema = unfilteredSchema.filterSchema(filter);
populateTypeStateSnapshot(is, new HollowObjectTypeReadState(stateEngine, filteredSchema, unfilteredSchema));
populateTypeStateSnapshot(is, new HollowObjectTypeReadState(stateEngine, filteredSchema, unfilteredSchema, numShards));
}
} else if (schema instanceof HollowListSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowListTypeReadState.discardSnapshot(is);
HollowListTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowListTypeReadState(stateEngine, (HollowListSchema)schema));
populateTypeStateSnapshot(is, new HollowListTypeReadState(stateEngine, (HollowListSchema)schema, numShards));
}
} else if(schema instanceof HollowSetSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowSetTypeReadState.discardSnapshot(is);
HollowSetTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowSetTypeReadState(stateEngine, (HollowSetSchema)schema));
populateTypeStateSnapshot(is, new HollowSetTypeReadState(stateEngine, (HollowSetSchema)schema, numShards));
}
} else if(schema instanceof HollowMapSchema) {
if(!filter.doesIncludeType(schema.getName())) {
HollowMapTypeReadState.discardSnapshot(is);
HollowMapTypeReadState.discardSnapshot(is, numShards);
} else {
populateTypeStateSnapshot(is, new HollowMapTypeReadState(stateEngine, (HollowMapSchema)schema));
populateTypeStateSnapshot(is, new HollowMapTypeReadState(stateEngine, (HollowMapSchema)schema, numShards));
}
}

Expand All @@ -202,19 +199,29 @@ private void populateTypeStateSnapshot(DataInputStream is, HollowTypeReadState t
/**
 * Consumes one type's delta from the stream. The schema is read first; the delta is then
 * applied to the type state registered under the schema's name, or discarded when the
 * state engine returns no type state for that name.
 *
 * NOTE(review): this span is rendered from a diff, so superseded and replacement lines
 * appear back to back (the header-version check vs. readNumShards, and the two
 * discardDelta calls) -- confirm which lines are live before editing.
 *
 * @param is stream from which the schema and then the delta are read
 * @param header header whose blob format version is consulted by the version check
 * @return the name of the schema that was read
 * @throws IOException if reading from the stream fails
 */
private String readTypeStateDelta(DataInputStream is, HollowBlobHeader header) throws IOException {
HollowSchema schema = HollowSchema.readFrom(is);

if(header.getBlobFormatVersion() != HOLLOW_BLOB_OLD_FORMAT_VERSION_HEADER)
skipForwardsCompatibilityBytes(is);
int numShards = readNumShards(is);

/// a null type state means nothing is registered under this schema name; the delta is then discarded instead of applied
HollowTypeReadState typeState = stateEngine.getTypeState(schema.getName());
if(typeState != null) {
typeState.applyDelta(is, schema, stateEngine.getMemoryRecycler());
} else {
discardDelta(is, schema);
discardDelta(is, schema, numShards);
}

return schema.getName();
}

/**
 * Determines how many shards the upcoming type was written with.
 *
 * The first VInt read acts as a marker: a value of 0 means the blob was produced by a
 * version of hollow prior to 2.1.0, which always used a single shard. For any other value,
 * the forwards-compatibility section is skipped and the shard count is the next VInt.
 *
 * @param is stream from which the marker and, when present, the shard count are read
 * @return the number of shards for the type; 1 when the marker is 0
 * @throws IOException if reading from the stream fails
 */
private int readNumShards(DataInputStream is) throws IOException {
    final int leadingMarker = VarInt.readVInt(is);

    if(leadingMarker != 0) {
        /// step over the forwards-compatibility section; the shard count follows it
        skipForwardsCompatibilityBytes(is);
        return VarInt.readVInt(is);
    }

    /// produced by a version of hollow prior to 2.1.0, always only 1 shard.
    return 1;
}

private void skipForwardsCompatibilityBytes(DataInputStream is) throws IOException {
int bytesToSkip = VarInt.readVInt(is);
while(bytesToSkip > 0) {
Expand All @@ -226,15 +233,15 @@ private void skipForwardsCompatibilityBytes(DataInputStream is) throws IOExcepti
}


/**
 * Skips a delta by dispatching on the concrete schema class (object, list, set or map)
 * to the static discardDelta of the matching type read state. A schema that is none of
 * these four classes results in no call at all.
 *
 * NOTE(review): this span is rendered from a diff, so each call appears twice -- once
 * without and once with numShards -- and the signature is shown in both forms. Confirm
 * which lines are live before editing.
 *
 * @param dis stream from which the delta is consumed
 * @param schema schema of the type whose delta is being skipped
 * @param numShards shard count forwarded to the type-specific discardDelta
 * @throws IOException if reading from the stream fails
 */
private void discardDelta(DataInputStream dis, HollowSchema schema) throws IOException {
private void discardDelta(DataInputStream dis, HollowSchema schema, int numShards) throws IOException {
if(schema instanceof HollowObjectSchema)
HollowObjectTypeReadState.discardDelta(dis, (HollowObjectSchema)schema);
HollowObjectTypeReadState.discardDelta(dis, (HollowObjectSchema)schema, numShards);
else if(schema instanceof HollowListSchema)
HollowListTypeReadState.discardDelta(dis);
HollowListTypeReadState.discardDelta(dis, numShards);
else if(schema instanceof HollowSetSchema)
HollowSetTypeReadState.discardDelta(dis);
HollowSetTypeReadState.discardDelta(dis, numShards);
else if(schema instanceof HollowMapSchema)
HollowMapTypeReadState.discardDelta(dis);
HollowMapTypeReadState.discardDelta(dis, numShards);
}

}
Expand Up @@ -147,20 +147,20 @@ public HollowReadStateEngine getStateEngine() {
return stateEngine;
}

protected void notifyListenerAboutDeltaChanges(GapEncodedVariableLengthIntegerReader removals, GapEncodedVariableLengthIntegerReader additions) {
protected void notifyListenerAboutDeltaChanges(GapEncodedVariableLengthIntegerReader removals, GapEncodedVariableLengthIntegerReader additions, int shardNumber, int numShards) {
for(HollowTypeStateListener stateListener : stateListeners) {
removals.reset();
int removedOrdinal = removals.nextElement();
while(removedOrdinal < Integer.MAX_VALUE) {
stateListener.removedOrdinal(removedOrdinal);
stateListener.removedOrdinal((removedOrdinal * numShards) + shardNumber);
removals.advance();
removedOrdinal = removals.nextElement();
}

additions.reset();
int addedOrdinal = additions.nextElement();
while(addedOrdinal < Integer.MAX_VALUE) {
stateListener.addedOrdinal(addedOrdinal);
stateListener.addedOrdinal((addedOrdinal * numShards) + shardNumber);
additions.advance();
addedOrdinal = additions.nextElement();
}
Expand Down Expand Up @@ -193,5 +193,10 @@ public HollowTypeReadState getTypeState() {
* @return an approximate accounting of the current cost of the "ordinal holes" in this type state.
*/
public abstract long getApproximateHoleCostInBytes();

/**
 * Reports the shard count of this type.
 *
 * @return The number of shards into which this type is split. Sharding is transparent, so this has no effect on normal usage.
 */
public abstract int numShards();

}
Expand Up @@ -18,7 +18,6 @@
package com.netflix.hollow.core.read.engine.list;

import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;

import com.netflix.hollow.core.util.RemovedOrdinalIterator;
import com.netflix.hollow.core.util.IntMap;
import com.netflix.hollow.core.memory.pool.WastefulRecycler;
Expand All @@ -33,10 +32,13 @@
public class HollowListDeltaHistoricalStateCreator {

private final HollowListTypeReadState typeState;
private final HollowListTypeDataElements stateEngineDataElements;
private final HollowListTypeDataElements stateEngineDataElements[];
private final HollowListTypeDataElements historicalDataElements;
private final RemovedOrdinalIterator iter;

private final int shardNumberMask;
private final int shardOrdinalShift;

private IntMap ordinalMapping;
private int nextOrdinal = 0;
private long nextStartElement = 0;
Expand All @@ -46,6 +48,8 @@ public HollowListDeltaHistoricalStateCreator(HollowListTypeReadState typeState)
this.stateEngineDataElements = typeState.currentDataElements();
this.historicalDataElements = new HollowListTypeDataElements(WastefulRecycler.DEFAULT_INSTANCE);
this.iter = new RemovedOrdinalIterator(typeState.getListener(PopulatedOrdinalListener.class));
this.shardNumberMask = stateEngineDataElements.length - 1;
this.shardOrdinalShift = 31 - Integer.numberOfLeadingZeros(stateEngineDataElements.length);
}

public void populateHistory() {
Expand All @@ -70,7 +74,7 @@ public IntMap getOrdinalMapping() {
}

/**
 * Builds a list type read state that carries this creator's historical data elements.
 * The new state is constructed with a null first argument and the schema of the wrapped
 * type state, and historicalDataElements is installed as its current data.
 *
 * NOTE(review): this span is rendered from a diff, so the constructor call appears in
 * both its two-argument form and its three-argument form. Elsewhere in this change the
 * third argument is the shard count, so the literal 1 presumably means the historical
 * state is always a single shard -- confirm.
 *
 * @return the newly constructed historical type read state
 */
public HollowListTypeReadState createHistoricalTypeReadState() {
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema());
HollowListTypeReadState historicalTypeState = new HollowListTypeReadState(null, typeState.getSchema(), 1);
historicalTypeState.setCurrentData(historicalDataElements);
return historicalTypeState;
}
Expand All @@ -89,18 +93,21 @@ private void populateStats() {
historicalDataElements.maxOrdinal = removedEntryCount - 1;
historicalDataElements.totalNumberOfElements = totalElementCount;
historicalDataElements.bitsPerListPointer = totalElementCount == 0 ? 1 : 64 - Long.numberOfLeadingZeros(totalElementCount);
historicalDataElements.bitsPerElement = stateEngineDataElements.bitsPerElement;
historicalDataElements.bitsPerElement = stateEngineDataElements[0].bitsPerElement;

ordinalMapping = new IntMap(removedEntryCount);
}

private void copyRecord(int ordinal) {
long bitsPerElement = stateEngineDataElements.bitsPerElement;
long fromStartElement = ordinal == 0 ? 0 : stateEngineDataElements.listPointerArray.getElementValue((long)(ordinal - 1) * stateEngineDataElements.bitsPerListPointer, stateEngineDataElements.bitsPerListPointer);
long fromEndElement = stateEngineDataElements.listPointerArray.getElementValue((long)ordinal * stateEngineDataElements.bitsPerListPointer, stateEngineDataElements.bitsPerListPointer);
int shard = ordinal & shardNumberMask;
int shardOrdinal = ordinal >> shardOrdinalShift;

long bitsPerElement = stateEngineDataElements[shard].bitsPerElement;
long fromStartElement = shardOrdinal == 0 ? 0 : stateEngineDataElements[shard].listPointerArray.getElementValue((long)(shardOrdinal - 1) * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long fromEndElement = stateEngineDataElements[shard].listPointerArray.getElementValue((long)shardOrdinal * stateEngineDataElements[shard].bitsPerListPointer, stateEngineDataElements[shard].bitsPerListPointer);
long size = fromEndElement - fromStartElement;

historicalDataElements.elementArray.copyBits(stateEngineDataElements.elementArray, bitsPerElement * fromStartElement, bitsPerElement * nextStartElement, size * bitsPerElement);
historicalDataElements.elementArray.copyBits(stateEngineDataElements[shard].elementArray, bitsPerElement * fromStartElement, bitsPerElement * nextStartElement, size * bitsPerElement);
historicalDataElements.listPointerArray.setElementValue(historicalDataElements.bitsPerListPointer * nextOrdinal, historicalDataElements.bitsPerListPointer, nextStartElement + size);

ordinalMapping.put(ordinal, nextOrdinal);
Expand Down
Expand Up @@ -75,23 +75,28 @@ private void readFromStream(DataInputStream dis, boolean isDelta) throws IOExcep
elementArray = FixedLengthElementArray.deserializeFrom(dis, memoryRecycler);
}

/**
 * Reads past one type's serialized data elements without retaining any of it.
 *
 * NOTE(review): this span is rendered from a diff, so the two-argument form and the
 * numShards form are interleaved -- the statements at the top and bottom of the span
 * belong to the former, the for-loop in the middle to the latter. Confirm which lines
 * are live before editing.
 *
 * In the numShards form, one extra VInt is consumed up front when numShards > 1, and
 * the per-shard sequence (max ordinal, the two delta ordinal sets when isDelta, the
 * statistics, the two fixed-length arrays) is consumed numShards times.
 *
 * @param dis stream from which the data is consumed
 * @param numShards number of per-shard sections to skip
 * @param isDelta whether the encoded addition/removal ordinals are present and must also be skipped
 * @throws IOException if reading from the stream fails
 */
static void discardFromStream(DataInputStream dis, boolean isDelta) throws IOException {
VarInt.readVInt(dis); /// max ordinal

if(isDelta) {
/// addition/removal ordinals
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
static void discardFromStream(DataInputStream dis, int numShards, boolean isDelta) throws IOException {
/// this leading value is only consumed when there is more than one shard
if(numShards > 1)
VarInt.readVInt(dis); /// max ordinal

/// everything below is repeated once per shard
for(int i=0;i<numShards;i++) {
VarInt.readVInt(dis); /// max ordinal

if(isDelta) {
/// addition/removal ordinals
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
GapEncodedVariableLengthIntegerReader.discardEncodedDeltaOrdinals(dis);
}

/// statistics
/// NOTE(review): presumably bitsPerListPointer, bitsPerElement and totalNumberOfElements -- confirm against readFromStream
VarInt.readVInt(dis);
VarInt.readVInt(dis);
VarInt.readVLong(dis);

/// fixed-length data
/// NOTE(review): presumably the list pointer array followed by the element array -- confirm against readFromStream
FixedLengthElementArray.discardFrom(dis);
FixedLengthElementArray.discardFrom(dis);
}

/// statistics
VarInt.readVInt(dis);
VarInt.readVInt(dis);
VarInt.readVLong(dis);

/// fixed-length data
FixedLengthElementArray.discardFrom(dis);
FixedLengthElementArray.discardFrom(dis);
}

public void applyDelta(HollowListTypeDataElements fromData, HollowListTypeDataElements deltaData) {
Expand Down