Skip to content

Commit

Permalink
[Java] add reference tracking support (#74)
Browse files Browse the repository at this point in the history
* add reference tracking support

* fix testWriteReferenceOrNull

* fix testWriteReferenceOrNull
  • Loading branch information
chaokunyang committed May 5, 2023
1 parent c9926ae commit d99e829
Show file tree
Hide file tree
Showing 6 changed files with 672 additions and 1 deletion.
29 changes: 28 additions & 1 deletion java/fury-core/src/main/java/io/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,31 @@

package io.fury;

// NOTE(review): this empty declaration appears to be the line removed by the commit (the diff
// header reports "1 deletion"); the full definition of Fury follows further down — confirm.
public class Fury {}
import io.fury.util.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;

/**
 * Declares the one-byte head flags that are written in front of a value.
 *
 * <p>Cross-language data layout: a 1-byte mask whose bits mean:
 *
 * <ul>
 *   <li>1 bit null flag: 0 -> null, 1 -> not null.
 *   <li>1 bit endianness: 0 -> little endian, 1 -> big endian.
 *   <li>1 bit target language: 0 -> native, 1 -> x_lang. If x_lang, the current process language
 *       is written as a byte into the buffer.
 *   <li>1 bit out-of-band serialization enable flag: 0 -> not enabled, 1 -> enabled.
 *   <li>Other bits are reserved.
 * </ul>
 *
 * <p>serialize/deserialize is the user API for root object serialization; the write/read API is
 * for inner serialization.
 *
 * @author chaokunyang
 */
@NotThreadSafe
public final class Fury {
private static final Logger LOG = LoggerFactory.getLogger(Fury.class);

// This flag indicates that the object is null.
public static final byte NULL_FLAG = -3;
// This flag indicates that the object is a reference to an object that was already written;
// the reference id follows the flag.
// We don't use another byte to indicate REF, so that we can save one byte.
public static final byte REF_FLAG = -2;
// This flag indicates that the object is a non-null value that is not a referenceable value.
public static final byte NOT_NULL_VALUE_FLAG = -1;
// This flag indicates that the object is a referenceable value that is seen for the first time.
public static final byte REF_VALUE_FLAG = 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
/*
* Copyright 2023 The Fury authors
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fury.resolver;

import com.google.common.base.Preconditions;
import io.fury.Fury;
import io.fury.collection.IdentityObjectIntMap;
import io.fury.collection.IntArray;
import io.fury.collection.MapStatistics;
import io.fury.collection.ObjectArray;
import io.fury.memory.MemoryBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* Resolves references by tracking written objects in an identity map and read objects by their
* reference id.
*
* @author chaokunyang
*/
// FIXME Will binding a separate reference resolver to every type have better performance?
// If so, we can have sophisticated reference control for every type.
public final class MapReferenceResolver implements ReferenceResolver {
// True when the system property `fury.enable_ref_profiling` equals "true" (case-insensitive);
// the write methods then use the profiling variant of the map lookup.
private static final boolean ENABLE_FURY_REF_PROFILING =
"true".equalsIgnoreCase(System.getProperty("fury.enable_ref_profiling"));

// Map clean will zero all key array elements, which is unnecessary for
// NOTE(review): the sentence above is truncated in the original; presumably it ends with
// "small maps", which would motivate the small default capacities below — confirm.
private static final int DEFAULT_MAP_CAPACITY = 4;
private static final int DEFAULT_ARRAY_CAPACITY = 4;
// use average size to amortise resize/clear cost.
// exponential smoothing can't reflect overall reference size, thus not
// suitable for amortization.
// FIXME median may be better, but calculate median streaming is complicated.
// FIXME is there a more accurate way to predict reference size?
// maybe more complicated exponential smoothing?
// Number of resetWrite() calls accumulated so far (restarted at 1 on overflow).
private long writeCounter;
// Sum of writtenObjects.size observed at each resetWrite() call.
private long writeTotalObjectSize = 0;
// Number of resetRead() calls accumulated so far (restarted at 1 on overflow).
private long readCounter;
// Sum of readObjects.size() observed at each resetRead() call.
private long readTotalObjectSize = 0;
// Maps every written object (by identity) to the reference id it was assigned.
// NOTE(review): 0.51f is presumably the load factor of the map — confirm.
private final IdentityObjectIntMap<Object> writtenObjects =
new IdentityObjectIntMap<>(DEFAULT_MAP_CAPACITY, 0.51f);
// Objects read so far, indexed by reference id; a slot is null until the object is bound.
private final ObjectArray readObjects = new ObjectArray(DEFAULT_ARRAY_CAPACITY);
// Stack of preserved reference ids that have not been bound to an object yet.
private final IntArray readReferenceIds = new IntArray(DEFAULT_ARRAY_CAPACITY);

// Object resolved by the most recent head-flag read when that flag was REF_FLAG;
// null when the most recent head flag was anything else.
private Object readObject;

// No-arg constructor; all state is initialised by the field initialisers above.
public MapReferenceResolver() {}

/**
 * Writes the head flag for {@code obj} and, when the object was written before, its reference
 * id.
 *
 * @param buffer buffer the flag (and id) is written to
 * @param obj object about to be serialized, may be null
 * @return {@code true} if nothing more has to be written for {@code obj} (it is null or a
 *     back-reference was emitted), {@code false} if this is its first occurrence and the caller
 *     still has to write the value
 */
@Override
public boolean writeReferenceOrNull(MemoryBuffer buffer, Object obj) {
  // Called before any of the unchecked ("unsafe") writes below.
  buffer.grow(10);
  if (obj == null) {
    buffer.unsafeWriteByte(Fury.NULL_FLAG);
    return true;
  }
  // Id a first-time object would get; it should be consistent with `#nextReadRefId`.
  final int candidateId = writtenObjects.size;
  // replaceReference is rare, just ignore it for profiling.
  final int knownId =
      ENABLE_FURY_REF_PROFILING
          ? writtenObjects.profilingPutOrGet(obj, candidateId)
          : writtenObjects.putOrGet(obj, candidateId);
  if (knownId < 0) {
    // First time this object is written: only the flag goes out, the value follows.
    buffer.unsafeWriteByte(Fury.REF_VALUE_FLAG);
    return false;
  }
  // Already written earlier: emit a back-reference to the id it received then.
  buffer.unsafeWriteByte(Fury.REF_FLAG);
  buffer.unsafeWritePositiveVarInt(knownId);
  return true;
}

/**
 * Writes the head flag for a non-null {@code obj} and, when the object was written before, its
 * reference id.
 *
 * @param buffer buffer the flag (and id) is written to
 * @param obj object about to be serialized, must not be null
 * @return {@code true} if this is the first occurrence and the caller has to write the value,
 *     {@code false} if a back-reference was emitted instead
 */
@Override
public boolean writeReferenceValueFlag(MemoryBuffer buffer, Object obj) {
  assert obj != null;
  // Called before any of the unchecked ("unsafe") writes below.
  buffer.grow(10);
  // Id a first-time object would get; it should be consistent with `#nextReadRefId`.
  final int candidateId = writtenObjects.size;
  // replaceReference is rare, just ignore it for profiling.
  final int knownId =
      ENABLE_FURY_REF_PROFILING
          ? writtenObjects.profilingPutOrGet(obj, candidateId)
          : writtenObjects.putOrGet(obj, candidateId);
  if (knownId < 0) {
    // First time this object is written: only the flag goes out, the value follows.
    buffer.unsafeWriteByte(Fury.REF_VALUE_FLAG);
    return true;
  }
  // Already written earlier: emit a back-reference to the id it received then.
  buffer.unsafeWriteByte(Fury.REF_FLAG);
  buffer.unsafeWritePositiveVarInt(knownId);
  return false;
}

/**
 * Writes {@link Fury#NULL_FLAG} when {@code obj} is null; writes nothing otherwise.
 *
 * @param buffer buffer the flag is written to
 * @param obj object to check, may be null
 * @return {@code true} if {@code obj} is null and the flag was written, {@code false} otherwise
 */
@Override
public boolean writeNullFlag(MemoryBuffer buffer, Object obj) {
  if (obj != null) {
    return false;
  }
  // The other write methods of this class call grow(...) before any unsafeWrite*; do the same
  // here so the unchecked one-byte write is never issued without that preceding call.
  buffer.grow(1);
  buffer.unsafeWriteByte(Fury.NULL_FLAG);
  return true;
}

/**
 * Registers {@code original} under the reference id already assigned to {@code newObject}.
 *
 * @param original object that should share the id of {@code newObject}
 * @param newObject object that is already present in the written-objects map
 * @throws IllegalArgumentException if {@code newObject} has no reference id yet
 */
@Override
public void replaceReference(Object original, Object newObject) {
  // -1 is the "not found" default handed to the lookup; it is the value rejected below.
  int newObjectId = writtenObjects.get(newObject, -1);
  Preconditions.checkArgument(
      newObjectId != -1,
      "newObject has no reference id; it must be written before it can replace a reference");
  writtenObjects.put(original, newObjectId);
}

/**
 * Reads the head flag of the next value and resolves a back-reference if there is one.
 *
 * <ul>
 *   <li>{@link Fury#NULL_FLAG}: the object is null; {@link #readObject} is set to null.
 *   <li>{@link Fury#NOT_NULL_VALUE_FLAG}: the object is not null, is not a referencable value
 *       and is read for the first time.
 *   <li>{@link Fury#REF_FLAG}: a reference to a previously read object was read; that object is
 *       stored in {@link #readObject}.
 *   <li>{@link Fury#REF_VALUE_FLAG}: the object is a non-null referencable value that is read
 *       for the first time.
 * </ul>
 *
 * @param buffer buffer positioned at the head flag
 * @return the head flag that was read
 */
@Override
public byte readReferenceOrNull(MemoryBuffer buffer) {
  final byte flag = buffer.readByte();
  // Only a REF_FLAG is followed by a reference id; every other flag clears readObject.
  readObject = flag == Fury.REF_FLAG ? getReadObject(buffer.readPositiveVarInt()) : null;
  return flag;
}

/**
 * Reserves the next reference id for an object that is about to be read.
 *
 * <p>A null placeholder is appended to the read-objects array and the id is pushed on the stack
 * of ids that still have to be bound.
 *
 * @return the reserved reference id
 */
@Override
public int preserveReferenceId() {
  final ObjectArray objects = readObjects;
  // The next id is the current number of read objects.
  final int reservedId = objects.size();
  objects.add(null);
  readReferenceIds.add(reservedId);
  return reservedId;
}

/**
 * Reads the head flag and, when it announces a first-time referencable value, reserves a
 * reference id for it.
 *
 * @param buffer buffer positioned at the head flag
 * @return the reserved id for {@link Fury#REF_VALUE_FLAG}; otherwise the head flag itself
 */
@Override
public int tryPreserveReferenceId(MemoryBuffer buffer) {
  final byte flag = buffer.readByte();
  if (flag == Fury.REF_FLAG) {
    // A back-reference: resolve the already-read object from the id that follows the flag.
    readObject = getReadObject(buffer.readPositiveVarInt());
    return flag;
  }
  readObject = null;
  // `flag` except `REF_FLAG` can be used as stub reference id because we use
  // `refId >= NOT_NULL_VALUE_FLAG` to read data.
  return flag == Fury.REF_VALUE_FLAG ? preserveReferenceId() : flag;
}

/** Returns the most recently preserved reference id without removing it from the stack. */
@Override
public int lastPreservedReferenceId() {
  final IntArray pendingIds = readReferenceIds;
  final int topIndex = pendingIds.size - 1;
  return pendingIds.get(topIndex);
}

/**
 * Binds {@code object} to the most recently preserved reference id, which is popped from the
 * stack of pending ids.
 *
 * @param object object that was just read
 */
@Override
public void reference(Object object) {
  setReadObject(readReferenceIds.pop(), object);
}

/** Returns the entry stored in the read-objects array at index {@code id}. */
@Override
public Object getReadObject(int id) {
return readObjects.get(id);
}

/**
 * Returns the object resolved by the most recent head-flag read, or null when that flag was not
 * {@link Fury#REF_FLAG}.
 */
@Override
public Object getReadObject() {
return readObject;
}

/**
 * Stores {@code object} under reference id {@code id}; negative ids are ignored.
 *
 * @param id reference id, or a negative stub id
 * @param object object to store
 */
@Override
public void setReadObject(int id, Object object) {
  if (id < 0) {
    // Negative ids have no slot in the read-objects array.
    return;
  }
  readObjects.set(id, object);
}

/** Resets the write-side state and then the read-side state of this resolver. */
@Override
public void reset() {
resetWrite();
resetRead();
}

/**
 * Clears the written-objects map, sizing it by the running average number of written objects
 * per reset (never below the default map capacity).
 */
@Override
public void resetWrite() {
  final IdentityObjectIntMap<Object> map = this.writtenObjects;
  // TODO handle outlier big size.
  long resets = this.writeCounter + 1;
  long accumulatedSize = this.writeTotalObjectSize + map.size;
  if (resets < 0 || accumulatedSize < 0) {
    // One of the running totals overflowed: restart the statistics from this round.
    resets = 1;
    accumulatedSize = map.size;
  }
  this.writeCounter = resets;
  this.writeTotalObjectSize = accumulatedSize;
  final int averageSize = (int) (accumulatedSize / resets);
  map.clearApproximate(Math.max(averageSize, DEFAULT_MAP_CAPACITY));
}

/**
 * Clears the read-side state: the read-objects array (sized by the running average number of
 * read objects per reset, never below the default array capacity), the stack of pending
 * reference ids and the last resolved object.
 */
@Override
public void resetRead() {
  final ObjectArray objects = this.readObjects;
  // TODO handle outlier big size.
  long resets = this.readCounter + 1;
  long accumulatedSize = this.readTotalObjectSize + objects.size();
  if (resets < 0 || accumulatedSize < 0) {
    // One of the running totals overflowed: restart the statistics from this round.
    resets = 1;
    accumulatedSize = objects.size();
  }
  this.readCounter = resets;
  this.readTotalObjectSize = accumulatedSize;
  final int averageSize = (int) (accumulatedSize / resets);
  objects.clearApproximate(Math.max(averageSize, DEFAULT_ARRAY_CAPACITY));
  readReferenceIds.clear();
  readObject = null;
}

/** Snapshot of reference-tracking statistics: per-type counts, their total, and map statistics. */
public static class ReferenceStatistics {
  LinkedHashMap<Class<?>, Integer> referenceTypeSummary;
  int referenceCount;
  MapStatistics mapStatistics;

  /**
   * Creates a snapshot and derives the total reference count from the per-type summary.
   *
   * @param referenceTypeSummary number of tracked objects per class
   * @param mapStatistics statistics of the underlying map
   */
  public ReferenceStatistics(
      LinkedHashMap<Class<?>, Integer> referenceTypeSummary, MapStatistics mapStatistics) {
    this.referenceTypeSummary = referenceTypeSummary;
    this.mapStatistics = mapStatistics;
    // Total of all per-type counters.
    int total = 0;
    for (int perTypeCount : referenceTypeSummary.values()) {
      total += perTypeCount;
    }
    this.referenceCount = total;
  }

  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("ReferenceStatistics{");
    text.append("referenceTypeSummary=").append(referenceTypeSummary);
    text.append(", referenceCount=").append(referenceCount);
    text.append(", mapProbeStatistics=").append(mapStatistics);
    text.append('}');
    return text.toString();
  }
}

/**
 * Builds a {@link ReferenceStatistics} from the current per-type summary of written objects and
 * the statistics returned by the written-objects map.
 *
 * <p>NOTE(review): judging by its name, {@code getAndResetStatistics} also resets the map's
 * statistics as a side effect — confirm.
 */
public ReferenceStatistics referenceStatistics() {
return new ReferenceStatistics(referenceTypeSummary(), writtenObjects.getAndResetStatistics());
}

/**
 * Returns a map which indicates the counter for each reference object type.
 *
 * <p>Entries are ordered by descending count; types with the same count are ordered by class
 * name.
 *
 * @return class of the tracked written objects mapped to the number of such objects
 */
public LinkedHashMap<Class<?>, Integer> referenceTypeSummary() {
  Map<Class<?>, Integer> typeCounter = new HashMap<>();
  // merge() starts a type at 1 and otherwise adds 1 to its existing count.
  writtenObjects.forEach((k, v) -> typeCounter.merge(k.getClass(), 1, Integer::sum));
  List<Map.Entry<Class<?>, Integer>> entries = new ArrayList<>(typeCounter.entrySet());
  entries.sort(
      (o1, o2) -> {
        // Integer.compare instead of subtraction: the idiomatic, overflow-free comparison.
        int byCountDescending = Integer.compare(o2.getValue(), o1.getValue());
        if (byCountDescending != 0) {
          return byCountDescending;
        }
        return o1.getKey().getName().compareTo(o2.getKey().getName());
      });
  // LinkedHashMap keeps the sorted order of the entries.
  LinkedHashMap<Class<?>, Integer> result = new LinkedHashMap<>(entries.size());
  for (Map.Entry<Class<?>, Integer> entry : entries) {
    result.put(entry.getKey(), entry.getValue());
  }
  return result;
}
}
Loading

0 comments on commit d99e829

Please sign in to comment.