Skip to content

Commit

Permalink
Merge pull request #375 from Netflix/flat-records
Browse files Browse the repository at this point in the history
Flat Records
  • Loading branch information
dkoszewnik committed Jan 14, 2019
2 parents 1f402fa + c7870e2 commit 615201e
Show file tree
Hide file tree
Showing 18 changed files with 1,105 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@
import com.netflix.hollow.core.util.SimultaneousExecutor;
import com.netflix.hollow.core.write.HollowTypeWriteState;
import com.netflix.hollow.core.write.objectmapper.RecordPrimaryKey;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecord;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordDumper;
import com.netflix.hollow.tools.traverse.TransitiveSetTraverser;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Used by HollowIncrementalProducer for Delta-Based Producer Input
Expand All @@ -44,6 +49,7 @@ public class HollowIncrementalCyclePopulator implements HollowProducer.Populator

private final double threadsPerCpu;
private final Map<RecordPrimaryKey, Object> mutations;


HollowIncrementalCyclePopulator(Map<RecordPrimaryKey, Object> mutations, double threadsPerCpu) {
this.mutations = mutations;
Expand Down Expand Up @@ -138,13 +144,32 @@ private void removeRecordsFromNewState(HollowProducer.WriteState newState, Map<S
}

private void addRecords(final HollowProducer.WriteState newState) {
List<Map.Entry<RecordPrimaryKey, Object>> entryList = new ArrayList<>(mutations.entrySet());

AtomicInteger nextMutation = new AtomicInteger(0);

SimultaneousExecutor executor = new SimultaneousExecutor(threadsPerCpu);
for(final Map.Entry<RecordPrimaryKey, Object> entry : mutations.entrySet()) {
executor.execute(new Runnable() {
public void run() {
if(entry.getValue() != DELETE_RECORD)
newState.add(entry.getValue());
for(int i=0;i<executor.getCorePoolSize();i++) {
executor.execute(() -> {
FlatRecordDumper flatRecordDumper = null;
int currentMutationIdx = nextMutation.getAndIncrement();

while(currentMutationIdx < entryList.size()) {
Object currentMutation = entryList.get(currentMutationIdx).getValue();

if(currentMutation != DELETE_RECORD) {
if(currentMutation instanceof FlatRecord) {
if(flatRecordDumper == null)
flatRecordDumper = new FlatRecordDumper(newState.getStateEngine());
flatRecordDumper.dump((FlatRecord)currentMutation);
} else {
newState.add(currentMutation);
}
}

currentMutationIdx = nextMutation.getAndIncrement();
}

});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.hollow.api.consumer.fs.HollowFilesystemAnnouncementWatcher;
import com.netflix.hollow.core.util.SimultaneousExecutor;
import com.netflix.hollow.core.write.objectmapper.RecordPrimaryKey;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecord;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -99,6 +100,11 @@ public void addOrModify(Collection<Object> objList) {
addOrModify(obj);
}
}

public void addOrModify(FlatRecord flatRecord) {
RecordPrimaryKey pk = flatRecord.getRecordPrimaryKey();
mutations.put(pk, flatRecord);
}

public void addOrModifyInParallel(Collection<Object> objList) {
executeInParallel(objList, new Callback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,22 @@ public int copy(long srcPos, byte[] data, int destPos, int length) {

return dataPosition - destPos;
}

/**
* checks equality for a specified range of bytes in two arrays
*
* @param rangeStart the start position of the comparison range in this array
* @param compareTo the other array to compare
* @param cmpStart the start position of the comparison range in the other array
* @param length the length of the comparison range
* @return
*/
public boolean rangeEquals(long rangeStart, SegmentedByteArray compareTo, long cmpStart, int length) {
for(int i=0;i<length;i++)
if(get(rangeStart + i) != compareTo.get(cmpStart + i))
return false;
return true;
}

/**
* Copies the data from the provided source array into this array, guaranteeing that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package com.netflix.hollow.core.write;

import static com.netflix.hollow.core.write.HollowWriteFieldUtils.readIntBits;
import static com.netflix.hollow.core.write.HollowWriteFieldUtils.readLongBits;

import com.netflix.hollow.core.memory.ByteData;
import com.netflix.hollow.core.memory.ByteDataBuffer;
import com.netflix.hollow.core.memory.ThreadSafeBitSet;
Expand Down Expand Up @@ -443,24 +446,4 @@ private ByteDataBuffer getByteArray(ByteDataBuffer buffers[], int index) {
return buffers[index];
}

static int readIntBits(ByteData data, long fieldPosition) {
int intBits = (data.get(fieldPosition++) & 0xFF) << 24;
intBits |= (data.get(fieldPosition++) & 0xFF) << 16;
intBits |= (data.get(fieldPosition++) & 0xFF) << 8;
intBits |= (data.get(fieldPosition) & 0xFF);
return intBits;
}

static long readLongBits(ByteData data, long fieldPosition) {
long longBits = (long) (data.get(fieldPosition++) & 0xFF) << 56;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 48;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 40;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 32;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 24;
longBits |= (data.get(fieldPosition++) & 0xFF) << 16;
longBits |= (data.get(fieldPosition++) & 0xFF) << 8;
longBits |= (data.get(fieldPosition) & 0xFF);
return longBits;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.netflix.hollow.core.write;

import com.netflix.hollow.core.memory.ByteData;

public class HollowWriteFieldUtils {

public static int readIntBits(ByteData data, long fieldPosition) {
int intBits = (data.get(fieldPosition++) & 0xFF) << 24;
intBits |= (data.get(fieldPosition++) & 0xFF) << 16;
intBits |= (data.get(fieldPosition++) & 0xFF) << 8;
intBits |= (data.get(fieldPosition) & 0xFF);
return intBits;
}

public static long readLongBits(ByteData data, long fieldPosition) {
long longBits = (long) (data.get(fieldPosition++) & 0xFF) << 56;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 48;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 40;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 32;
longBits |= (long) (data.get(fieldPosition++) & 0xFF) << 24;
longBits |= (data.get(fieldPosition++) & 0xFF) << 16;
longBits |= (data.get(fieldPosition++) & 0xFF) << 8;
longBits |= (data.get(fieldPosition) & 0xFF);
return longBits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ private int fieldHashCode(HollowObjectSchema schema, int fieldIdx, SegmentedByte
return 0;
return data.get(offset) == 1 ? 1231 : 1237;
case DOUBLE:
long longBits = HollowObjectTypeWriteState.readLongBits(data, offset);
long longBits = HollowWriteFieldUtils.readLongBits(data, offset);
return (int)(longBits ^ (longBits >>> 32));
case FLOAT:
return HollowObjectTypeWriteState.readIntBits(data, offset);
return HollowWriteFieldUtils.readIntBits(data, offset);
default:
throw new IllegalArgumentException("Schema "+schema.getName()+" has unknown field type for field " + schema.getFieldName(fieldIdx) + ": " + schema.getFieldType(fieldIdx));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.netflix.hollow.core.write.HollowListWriteRecord;
import com.netflix.hollow.core.write.HollowTypeWriteState;
import com.netflix.hollow.core.write.HollowWriteRecord;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
Expand Down Expand Up @@ -67,14 +69,31 @@ public int write(Object obj) {

List<?> l = (List<?>)obj;

HollowListWriteRecord rec = (HollowListWriteRecord)writeRecord();
HollowListWriteRecord rec = copyToWriteRecord(l, null);

int assignedOrdinal = writeState.add(rec);

if(obj instanceof MemoizedList) {
((MemoizedList<?>)obj).__assigned_ordinal = (long)assignedOrdinal | cycleSpecificAssignedOrdinalBits();
}

return assignedOrdinal;
}

public int writeFlat(Object obj, FlatRecordWriter flatRecordWriter) {
HollowListWriteRecord rec = copyToWriteRecord((List<?>)obj, flatRecordWriter);
return flatRecordWriter.write(schema, rec);
}

private HollowListWriteRecord copyToWriteRecord(List<?> l, FlatRecordWriter flatRecordWriter) {
HollowListWriteRecord rec = (HollowListWriteRecord)writeRecord();
if(ignoreListOrdering) {
IntList ordinalList = getIntList();
for(Object o : l) {
if(o == null) {
throw new NullPointerException(String.format(NULL_ELEMENT_MESSAGE, schema));
}
int ordinal = elementMapper.write(o);
int ordinal = flatRecordWriter == null ? elementMapper.write(o) : elementMapper.writeFlat(o, flatRecordWriter);
ordinalList.add(ordinal);
}
ordinalList.sort();
Expand All @@ -85,19 +104,12 @@ public int write(Object obj) {
if (o == null) {
throw new NullPointerException(String.format(NULL_ELEMENT_MESSAGE, schema));
}
int ordinal = elementMapper.write(o);
int ordinal = flatRecordWriter == null ? elementMapper.write(o) : elementMapper.writeFlat(o, flatRecordWriter);
rec.addElement(ordinal);
}
}

int assignedOrdinal = writeState.add(rec);

if(obj instanceof MemoizedList) {
((MemoizedList<?>)obj).__assigned_ordinal = (long)assignedOrdinal | cycleSpecificAssignedOrdinalBits();
}

return assignedOrdinal;
}
return rec;
}

@Override
protected HollowWriteRecord newWriteRecord() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.netflix.hollow.core.write.HollowTypeWriteState;
import com.netflix.hollow.core.write.HollowWriteRecord;
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
Expand Down Expand Up @@ -76,7 +78,25 @@ protected int write(Object obj) {

Map<?, ?> m = (Map<?, ?>)obj;

HollowMapWriteRecord rec = (HollowMapWriteRecord)writeRecord();
HollowMapWriteRecord rec = copyToWriteRecord(m, null);

int assignedOrdinal = writeState.add(rec);

if(obj instanceof MemoizedMap) {
((MemoizedMap<?, ?>)obj).__assigned_ordinal = (long)assignedOrdinal | cycleSpecificAssignedOrdinalBits();
}

return assignedOrdinal;
}

@Override
protected int writeFlat(Object obj, FlatRecordWriter flatRecordWriter) {
HollowMapWriteRecord rec = copyToWriteRecord((Map<?,?>)obj, flatRecordWriter);
return flatRecordWriter.write(schema, rec);
}

private HollowMapWriteRecord copyToWriteRecord(Map<?, ?> m, FlatRecordWriter flatRecordWriter) {
HollowMapWriteRecord rec = (HollowMapWriteRecord)writeRecord();
for(Map.Entry<?, ?>entry : m.entrySet()) {
Object key = entry.getKey();
if(key == null) {
Expand All @@ -86,21 +106,22 @@ protected int write(Object obj) {
if(value == null) {
throw new NullPointerException(String.format(NULL_VALUE_MESSAGE, schema));
}
int keyOrdinal = keyMapper.write(key);
int valueOrdinal = valueMapper.write(value);
int hashCode = hashCodeFinder.hashCode(keyMapper.getTypeName(), keyOrdinal, key);

int keyOrdinal, valueOrdinal;
if(flatRecordWriter == null) {
keyOrdinal = keyMapper.write(key);
valueOrdinal = valueMapper.write(value);
} else {
keyOrdinal = keyMapper.writeFlat(key, flatRecordWriter);
valueOrdinal = valueMapper.writeFlat(value, flatRecordWriter);
}

int hashCode = hashCodeFinder.hashCode(keyMapper.getTypeName(), keyOrdinal, key);

rec.addEntry(keyOrdinal, valueOrdinal, hashCode);
}

int assignedOrdinal = writeState.add(rec);

if(obj instanceof MemoizedMap) {
((MemoizedMap<?, ?>)obj).__assigned_ordinal = (long)assignedOrdinal | cycleSpecificAssignedOrdinalBits();
}

return assignedOrdinal;
}
return rec;
}

@Override
protected HollowWriteRecord newWriteRecord() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.netflix.hollow.core.write.objectmapper;

import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.flatrecords.FlatRecordWriter;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashSet;
Expand Down Expand Up @@ -71,6 +73,14 @@ public int add(Object o) {
HollowTypeMapper typeMapper = getTypeMapper(o.getClass(), null, null);
return typeMapper.write(o);
}

/**
* Warning: Experimental. This is a BETA API and is subject to breaking changes.
*/
public void writeFlat(Object o, FlatRecordWriter flatRecordWriter) {
HollowTypeMapper typeMapper = getTypeMapper(o.getClass(), null, null);
typeMapper.writeFlat(o, flatRecordWriter);
}

/**
* Extracts the primary key from the specified POJO.
Expand Down
Loading

0 comments on commit 615201e

Please sign in to comment.