Skip to content

Commit

Permalink
feat(java): support meta compression by Deflater (#1663)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR support meta compression and add Deflater as default compressor.

In our test, it can compress meta by reduce size of **243** without
introducing any performance cost:

```
before:
Fury | STRUCT | false | array | 1227 |

after
STRUCT | false | array | 984 |

```

## Related issues
#1660 


## Does this PR introduce any user-facing change?

<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/incubator-fury/issues/new/choose)
describing the need to do so and update the document if necessary.
-->

- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?


## Benchmark

<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have impact on performance, the code reviewer will explain
it), be sure to attach a benchmark data here.
-->
  • Loading branch information
chaokunyang authored Jun 1, 2024
1 parent 062bd76 commit da5f847
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 42 deletions.
4 changes: 3 additions & 1 deletion docs/guide/java_serialization_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public class Example {
| `registerGuavaTypes` | Whether to pre-register Guava types such as `RegularImmutableMap`/`RegularImmutableList`. These types are not public API, but seem pretty stable. | `true` |
| `requireClassRegistration` | Disabling may allow unknown classes to be deserialized, potentially causing security risks. | `true` |
| `suppressClassRegistrationWarnings` | Whether to suppress class registration warnings. The warnings can be used for security audit, but may be annoying, this suppression will be enabled by default. | `true` |
| `shareMetaContext` | Enables or disables meta share mode. | `false` |
| `metaShareEnabled` | Enables or disables meta share mode. | `false` |
| `scopedMetaShareEnabled` | Scoped meta share focuses on a single serialization process. Metadata created or identified during this process is exclusive to it and is not shared with by other serializations. | `false` |
| `metaCompressor` | Set a compressor for meta compression. Note that the passed MetaCompressor should be thread-safe. By default, a `Deflater` based compressor `DeflaterMetaCompressor` will be used. Users can pass other compressor such as `zstd` for better compression rate. | `DeflaterMetaCompressor` |
| `deserializeNonexistentClass` | Enables or disables deserialization/skipping of data for non-existent classes. | `true` if `CompatibleMode.Compatible` is set, otherwise false. |
| `codeGenEnabled` | Disabling may result in faster initial serialization but slower subsequent serializations. | `true` |
| `asyncCompilationEnabled` | If enabled, serialization uses interpreter mode first and switches to JIT serialization after async serializer JIT for a class is finished. | `false` |
Expand Down
13 changes: 13 additions & 0 deletions java/fury-core/src/main/java/org/apache/fury/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.fury.Fury;
import org.apache.fury.meta.MetaCompressor;
import org.apache.fury.serializer.Serializer;
import org.apache.fury.serializer.TimeSerializers;
import org.apache.fury.util.Preconditions;
Expand All @@ -51,6 +52,7 @@ public class Config implements Serializable {
private final boolean registerGuavaTypes;
private final boolean metaShareEnabled;
private final boolean scopedMetaShareEnabled;
private final MetaCompressor metaCompressor;
private final boolean asyncCompilationEnabled;
private final boolean deserializeNonexistentClass;
private final boolean scalaOptimizationEnabled;
Expand All @@ -77,6 +79,7 @@ public Config(FuryBuilder builder) {
defaultJDKStreamSerializerType = builder.defaultJDKStreamSerializerType;
metaShareEnabled = builder.metaShareEnabled;
scopedMetaShareEnabled = builder.scopedMetaShareEnabled;
metaCompressor = builder.metaCompressor;
deserializeNonexistentClass = builder.deserializeNonexistentClass;
if (deserializeNonexistentClass) {
// Only in meta share mode or compatibleMode, fury knows how to deserialize
Expand Down Expand Up @@ -192,6 +195,14 @@ public boolean isScopedMetaShareEnabled() {
return scopedMetaShareEnabled;
}

/**
* Returns a {@link MetaCompressor} to compress class metadata such as field names and types. The
* returned {@link MetaCompressor} should be thread safe.
*/
public MetaCompressor getMetaCompressor() {
return metaCompressor;
}

/**
* Whether deserialize/skip data of un-existed class. If not enabled, an exception will be thrown
* if class not exist.
Expand Down Expand Up @@ -247,6 +258,7 @@ public boolean equals(Object o) {
&& registerGuavaTypes == config.registerGuavaTypes
&& metaShareEnabled == config.metaShareEnabled
&& scopedMetaShareEnabled == config.scopedMetaShareEnabled
&& Objects.equals(metaCompressor, config.metaCompressor)
&& asyncCompilationEnabled == config.asyncCompilationEnabled
&& deserializeNonexistentClass == config.deserializeNonexistentClass
&& scalaOptimizationEnabled == config.scalaOptimizationEnabled
Expand Down Expand Up @@ -278,6 +290,7 @@ public int hashCode() {
registerGuavaTypes,
metaShareEnabled,
scopedMetaShareEnabled,
metaCompressor,
asyncCompilationEnabled,
deserializeNonexistentClass,
scalaOptimizationEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.fury.logging.Logger;
import org.apache.fury.logging.LoggerFactory;
import org.apache.fury.memory.Platform;
import org.apache.fury.meta.DeflaterMetaCompressor;
import org.apache.fury.meta.MetaCompressor;
import org.apache.fury.pool.ThreadPoolFury;
import org.apache.fury.resolver.ClassResolver;
import org.apache.fury.serializer.JavaSerializer;
Expand Down Expand Up @@ -77,6 +79,7 @@ public final class FuryBuilder {
boolean scalaOptimizationEnabled = false;
boolean suppressClassRegistrationWarnings = true;
boolean deserializeNonexistentEnumValueAsNull = false;
MetaCompressor metaCompressor = new DeflaterMetaCompressor();

public FuryBuilder() {}

Expand Down Expand Up @@ -251,6 +254,16 @@ public FuryBuilder withScopedMetaShare(boolean scoped) {
return this;
}

/**
* Set a compressor for meta compression. Note that the passed {@link MetaCompressor} should be
* thread-safe. By default, a `Deflater` based compressor {@link DeflaterMetaCompressor} will be
* used. Users can pass other compressor such as `zstd` for better compression rate.
*/
public FuryBuilder withMetaCompressor(MetaCompressor metaCompressor) {
this.metaCompressor = MetaCompressor.checkMetaCompressor(metaCompressor);
return this;
}

/**
* Whether deserialize/skip data of un-existed class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class ClassDef implements Serializable {
static final int SCHEMA_COMPATIBLE_FLAG = 0b10000;
public static final int SIZE_TWO_BYTES_FLAG = 0b100000;
static final int OBJECT_TYPE_FLAG = 0b1000000;
static final int COMPRESSION_FLAG = 0b10000000;
// TODO use field offset to sort field, which will hit l1-cache more. Since
// `objectFieldOffset` is not part of jvm-specification, it may change between different jdk
// vendor. But the deserialization peer use the class definition to create deserializer, it's OK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.fury.meta;

import static org.apache.fury.meta.ClassDef.COMPRESSION_FLAG;
import static org.apache.fury.meta.ClassDef.SIZE_TWO_BYTES_FLAG;
import static org.apache.fury.meta.Encoders.fieldNameEncodings;
import static org.apache.fury.meta.Encoders.pkgEncodings;
Expand Down Expand Up @@ -50,33 +51,42 @@ public static ClassDef decodeClassDef(ClassResolver classResolver, MemoryBuffer
size = buffer.readByte() & 0xff;
encoded.writeByte(size);
}
buffer.checkReadableBytes(size);
encoded.writeBytes(buffer.getBytes(buffer.readerIndex(), size));
byte[] encodedClassDef = buffer.readBytes(size);
encoded.writeBytes(encodedClassDef);
if ((id & COMPRESSION_FLAG) != 0) {
encodedClassDef =
classResolver
.getFury()
.getConfig()
.getMetaCompressor()
.decompress(encodedClassDef, 0, size);
}
MemoryBuffer classDefBuf = MemoryBuffer.fromByteArray(encodedClassDef);
long header = id & 0xff;
int numClasses = (int) (header & 0b1111);
if (numClasses == 0b1111) {
numClasses += buffer.readVarUint32Small7();
numClasses += classDefBuf.readVarUint32Small7();
}
numClasses += 1;
String className = null;
String className;
List<ClassDef.FieldInfo> classFields = new ArrayList<>();
ClassSpec classSpec = null;
for (int i = 0; i < numClasses; i++) {
// | num fields + register flag | header + package name | header + class name
// | header + type id + field name | next field info | ... |
int currentClassHeader = buffer.readVarUint32Small7();
int currentClassHeader = classDefBuf.readVarUint32Small7();
boolean isRegistered = (currentClassHeader & 0b1) != 0;
int numFields = currentClassHeader >>> 1;
if (isRegistered) {
int registeredId = buffer.readVarUint32Small7();
int registeredId = classDefBuf.readVarUint32Small7();
className = classResolver.getClassInfo((short) registeredId).getCls().getName();
} else {
String pkg = readPkgName(buffer);
String typeName = readTypeName(buffer);
String pkg = readPkgName(classDefBuf);
String typeName = readTypeName(classDefBuf);
classSpec = Encoders.decodePkgAndClass(pkg, typeName);
className = classSpec.entireClassName;
}
List<ClassDef.FieldInfo> fieldInfos = readFieldsInfo(buffer, className, numFields);
List<ClassDef.FieldInfo> fieldInfos = readFieldsInfo(classDefBuf, className, numFields);
classFields.addAll(fieldInfos);
}
Preconditions.checkNotNull(classSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

package org.apache.fury.meta;

import static org.apache.fury.meta.ClassDef.COMPRESSION_FLAG;
import static org.apache.fury.meta.ClassDef.OBJECT_TYPE_FLAG;
import static org.apache.fury.meta.ClassDef.SCHEMA_COMPATIBLE_FLAG;
import static org.apache.fury.meta.ClassDef.SIZE_TWO_BYTES_FLAG;
import static org.apache.fury.meta.Encoders.fieldNameEncodingsList;
import static org.apache.fury.meta.Encoders.pkgEncodingsList;
import static org.apache.fury.meta.Encoders.typeNameEncodingsList;
import static org.apache.fury.util.MathUtils.toInt;

import java.lang.reflect.Field;
import java.util.ArrayList;
Expand Down Expand Up @@ -124,20 +126,7 @@ static MemoryBuffer encodeClassDef(
Class<?> type,
Map<String, List<FieldInfo>> classLayers,
boolean isObjectType) {
MemoryBuffer buffer = MemoryUtils.buffer(32);
buffer.increaseWriterIndex(9); // header + one byte size
long header;
int encodedSize = classLayers.size() - 1; // num class must be greater than 0
if (encodedSize > 0b1110) {
header = 0b1111;
buffer.writeVarUint32Small7(encodedSize - 0b1110);
} else {
header = encodedSize;
}
header |= SCHEMA_COMPATIBLE_FLAG;
if (isObjectType) {
header |= OBJECT_TYPE_FLAG;
}
MemoryBuffer classDefBuf = MemoryBuffer.newHeapBuffer(128);
for (Map.Entry<String, List<FieldInfo>> entry : classLayers.entrySet()) {
String className = entry.getKey();
List<FieldInfo> fields = entry.getValue();
Expand All @@ -146,36 +135,63 @@ static MemoryBuffer encodeClassDef(
int currentClassHeader = (fields.size() << 1);
if (classResolver.isRegistered(type)) {
currentClassHeader |= 1;
buffer.writeVarUint32Small7(currentClassHeader);
buffer.writeVarUint32Small7(classResolver.getRegisteredClassId(type));
classDefBuf.writeVarUint32Small7(currentClassHeader);
classDefBuf.writeVarUint32Small7(classResolver.getRegisteredClassId(type));
} else {
buffer.writeVarUint32Small7(currentClassHeader);
classDefBuf.writeVarUint32Small7(currentClassHeader);
Class<?> currentType = getType(type, className);
Tuple2<String, String> encoded = Encoders.encodePkgAndClass(currentType);
writePkgName(buffer, encoded.f0);
writeTypeName(buffer, encoded.f1);
writePkgName(classDefBuf, encoded.f0);
writeTypeName(classDefBuf, encoded.f1);
}
writeFieldsInfo(buffer, fields);
writeFieldsInfo(classDefBuf, fields);
}
byte[] compressed =
classResolver
.getFury()
.getConfig()
.getMetaCompressor()
.compress(classDefBuf.getHeapMemory(), 0, classDefBuf.writerIndex());
boolean isCompressed = false;
if (compressed.length < classDefBuf.writerIndex()) {
isCompressed = true;
classDefBuf = MemoryBuffer.fromByteArray(compressed);
classDefBuf.writerIndex(compressed.length);
}
long hash =
MurmurHash3.murmurhash3_x64_128(
classDefBuf.getHeapMemory(), 0, classDefBuf.writerIndex(), 47)[0];
long header;
int numClasses = classLayers.size() - 1; // num class must be greater than 0
if (numClasses > 0b1110) {
header = 0b1111;
} else {
header = numClasses;
}
header |= SCHEMA_COMPATIBLE_FLAG;
if (isObjectType) {
header |= OBJECT_TYPE_FLAG;
}
if (isCompressed) {
header |= COMPRESSION_FLAG;
}
byte[] encodedClassDef = buffer.getBytes(0, buffer.writerIndex());
long hash = MurmurHash3.murmurhash3_x64_128(encodedClassDef, 0, encodedClassDef.length, 47)[0];
// this id will be part of generated codec, a negative number won't be allowed in class name.
hash <<= 8;
header |= Math.abs(hash);
int len = buffer.writerIndex() - 9;
MemoryBuffer buffer = MemoryUtils.buffer(classDefBuf.writerIndex() + 10);
int len = classDefBuf.writerIndex() + toInt(numClasses > 0b1110);
if (len > 255) {
header |= SIZE_TWO_BYTES_FLAG;
}
buffer.putInt64(0, header);
if (len > 255) {
MemoryBuffer buf = MemoryBuffer.newHeapBuffer(len + 1);
buf.writeInt64(header);
buf.writeInt16((short) len);
buf.writeBytes(buffer.getBytes(9, len));
buffer = buf;
buffer.writeInt64(header);
buffer.writeInt16((short) len);
} else {
buffer.putByte(8, (byte) len);
buffer.writeInt64(header);
buffer.writeByte(len);
}
if (numClasses > 0b1110) {
buffer.writeVarUint32Small7(numClasses - 0b1110);
}
buffer.writeBytes(classDefBuf.getHeapMemory(), 0, classDefBuf.writerIndex());
return buffer;
}

Expand Down
Loading

0 comments on commit da5f847

Please sign in to comment.