Skip to content

Commit

Permalink
[Java] Optimize Collections.synchronized serialization performance (#188
Browse files Browse the repository at this point in the history
)

* optimize synchronized serializers

* fix doc
  • Loading branch information
chaokunyang committed May 10, 2023
1 parent 78549dc commit fd261dc
Show file tree
Hide file tree
Showing 6 changed files with 490 additions and 1 deletion.
4 changes: 3 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,6 @@ writeVarInt/writeVarLong in java/fury-core/src/main/java/io/fury/memory/MemoryBu
java/fury-core/src/main/java/io/fury/util/{FuryObjectMap/ObjectIntMap/IdentityMap/IdentityObjectIntMap/LongMap}.java are adapted from https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/util/{ObjectMap/ObjectIntMap/IdentityMap/IdentityObjectIntMap/IntMap}.java
{ParentClassLoader/ChildFirstURLClassLoader} in java/fury-core/src/main/java/io/fury/util/ClassLoaderUtils.java is copied from https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/{ParentClassLoader/ChildFirstURLClassLoader}.java
java/fury-core/src/main/java/io/fury/type/Generics.java is modified from https://github.com/EsotericSoftware/kryo/blob/master/src/com/esotericsoftware/kryo/util/DefaultGenerics.java
java/fury-core/src/main/java/io/fury/ClassLoaderObjectInputStream.java is copied from https://github.com/apache/commons-io/blob/master/src/main/java/org/apache/commons/io/input/ClassLoaderObjectInputStream.java
java/fury-core/src/main/java/io/fury/io/ClassLoaderObjectInputStream.java is copied from https://github.com/apache/commons-io/blob/master/src/main/java/org/apache/commons/io/input/ClassLoaderObjectInputStream.java
java/fury-core/src/main/java/io/fury/serializer/SynchronizedSerializers.java is modified from https://github.com/magro/kryo-serializers/blob/master/src/main/java/de/javakaffee/kryoserializers/SynchronizedCollectionsSerializer.java

Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.fury.serializer.SerializerFactory;
import io.fury.serializer.Serializers;
import io.fury.serializer.StringSerializer;
import io.fury.serializer.SynchronizedSerializers;
import io.fury.serializer.TimeSerializers;
import io.fury.type.Descriptor;
import io.fury.type.GenericType;
Expand Down Expand Up @@ -253,6 +254,7 @@ private void addDefaultSerializers() {
addDefaultSerializer(
JdkProxySerializer.ReplaceStub.class,
new JdkProxySerializer(fury, JdkProxySerializer.ReplaceStub.class));
SynchronizedSerializers.registerSerializers(fury);
}

private void addDefaultSerializer(Class<?> type, Class<? extends Serializer> serializerClass) {
Expand Down Expand Up @@ -615,6 +617,10 @@ public Class<? extends Serializer> getSerializerClass(Class<?> cls) {
} else if (Externalizable.class.isAssignableFrom(cls)) {
return ExternalizableSerializer.class;
}
if (useReplaceResolveSerializer(cls)) {
// TODO(chaokunyang) switch to ReplaceResolveSerializer
return getJavaSerializer(cls);
}
if (requireJavaSerialization(cls)) {
return getJavaSerializer(cls);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright 2010 Martin Grotzke
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.fury.serializer;

import io.fury.Fury;
import io.fury.exception.FuryException;
import io.fury.memory.MemoryBuffer;
import io.fury.serializer.CollectionSerializers.CollectionSerializer;
import io.fury.serializer.MapSerializers.MapSerializer;
import io.fury.util.LoggerFactory;
import io.fury.util.Platform;
import io.fury.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/** Serializer for synchronized Collections and Maps created via Collections. */
// modified from
// https://github.com/magro/kryo-serializers/blob/master/src/main/java/de/javakaffee/kryoserializers/SynchronizedCollectionsSerializer.java
// but faster because of using unsafe instead of reflection.
@SuppressWarnings({"rawtypes", "unchecked"})
public class SynchronizedSerializers {
private static final Logger LOG = LoggerFactory.getLogger(SynchronizedSerializers.class);
private static Field SOURCE_COLLECTION_FIELD;
private static Field SOURCE_MAP_FIELD;
private static final long SOURCE_COLLECTION_FIELD_OFFSET;
private static final long SOURCE_MAP_FIELD_OFFSET;

static {
try {
// SynchronizedList/Set/Etc.. extends SynchronizedCollection
SOURCE_COLLECTION_FIELD =
Collections.synchronizedCollection(Collections.emptyList())
.getClass()
.getDeclaredField("c");
// SynchronizedSortedMap/SynchronizedNavigableMap extends SynchronizedMap
SOURCE_MAP_FIELD =
Collections.synchronizedMap(Collections.emptyMap()).getClass().getDeclaredField("m");
} catch (Exception e) {
LOG.warn(
"Could not access source collection field in "
+ "java.util.Collections$SynchronizedCollection/SynchronizedMap {}.",
e.toString());
}
SOURCE_COLLECTION_FIELD_OFFSET = ReflectionUtils.getFieldOffset(SOURCE_COLLECTION_FIELD);
SOURCE_MAP_FIELD_OFFSET = ReflectionUtils.getFieldOffset(SOURCE_MAP_FIELD);
}

public static final class SynchronizedCollectionSerializer
extends CollectionSerializer<Collection> {
private final SynchronizedFactory synchronizedFactory;

public SynchronizedCollectionSerializer(
Fury fury, Class cls, SynchronizedFactory synchronizedFactory) {
super(fury, cls, false, false);
this.synchronizedFactory = synchronizedFactory;
}

@Override
public void write(MemoryBuffer buffer, Collection object) {
// the ordinal could be replaced by s.th. else (e.g. a explicitly managed "id")
Object unwrapped = Platform.getObject(object, synchronizedFactory.sourceFieldOffset);
fury.writeReferencableToJava(buffer, unwrapped);
}

@Override
public Collection read(MemoryBuffer buffer) {
final Object sourceCollection = fury.readReferencableFromJava(buffer);
return (Collection) synchronizedFactory.create(sourceCollection);
}
}

public static final class SynchronizedMapSerializer extends MapSerializer<Map> {
private final SynchronizedFactory synchronizedFactory;

public SynchronizedMapSerializer(
Fury fury, Class cls, SynchronizedFactory synchronizedFactory) {
super(fury, cls, false, false);
this.synchronizedFactory = synchronizedFactory;
}

@Override
public void write(MemoryBuffer buffer, Map object) {
// the ordinal could be replaced by s.th. else (e.g. a explicitly managed "id")
Object unwrapped = Platform.getObject(object, synchronizedFactory.sourceFieldOffset);
fury.writeReferencableToJava(buffer, unwrapped);
}

@Override
public Map read(MemoryBuffer buffer) {
final Object sourceCollection = fury.readReferencableFromJava(buffer);
return (Map) synchronizedFactory.create(sourceCollection);
}
}

enum SynchronizedFactory {
COLLECTION(
Collections.synchronizedCollection(Arrays.asList("")).getClass(),
SOURCE_COLLECTION_FIELD_OFFSET) {
@Override
public Object create(final Object sourceCollection) {
return Collections.synchronizedCollection((Collection<?>) sourceCollection);
}
},
RANDOM_ACCESS_LIST(
Collections.synchronizedList(new ArrayList<Void>()).getClass(),
SOURCE_COLLECTION_FIELD_OFFSET) {
@Override
public Object create(final Object sourceCollection) {
return Collections.synchronizedList((List<?>) sourceCollection);
}
},
LIST(
Collections.synchronizedList(new LinkedList<Void>()).getClass(),
SOURCE_COLLECTION_FIELD_OFFSET) {
@Override
public Object create(final Object sourceCollection) {
return Collections.synchronizedList((List<?>) sourceCollection);
}
},
SET(
Collections.synchronizedSet(new HashSet<Void>()).getClass(),
SOURCE_COLLECTION_FIELD_OFFSET) {
@Override
public Object create(final Object sourceCollection) {
return Collections.synchronizedSet((Set<?>) sourceCollection);
}
},
SORTED_SET(
Collections.synchronizedSortedSet(new TreeSet<>()).getClass(),
SOURCE_COLLECTION_FIELD_OFFSET) {
@Override
public Object create(final Object sourceCollection) {
return Collections.synchronizedSortedSet((SortedSet<?>) sourceCollection);
}
},
MAP(
Collections.synchronizedMap(new HashMap<Void, Void>()).getClass(),
SOURCE_MAP_FIELD_OFFSET) {
@Override
public Object create(final Object sourceCollection) {
return Collections.synchronizedMap((Map<?, ?>) sourceCollection);
}
},
SORTED_MAP(
Collections.synchronizedSortedMap(new TreeMap<>()).getClass(), SOURCE_MAP_FIELD_OFFSET) {
@Override
public Object create(final Object sourceCollection) {
return Collections.synchronizedSortedMap((SortedMap<?, ?>) sourceCollection);
}
};

private final Class<?> type;
private final long sourceFieldOffset;

SynchronizedFactory(final Class<?> type, long sourceFieldOffset) {
this.type = type;
this.sourceFieldOffset = sourceFieldOffset;
}

public boolean isCollection() {
return Collection.class.isAssignableFrom(type);
}

public abstract Object create(Object sourceCollection);

static SynchronizedFactory valueOfType(final Class<?> type) {
for (final SynchronizedFactory item : values()) {
if (item.type.equals(type)) {
return item;
}
}
throw new IllegalArgumentException("The type " + type + " is not supported.");
}
}

/**
* Registering serializers for synchronized Collections and Maps created via {@link Collections}.
*
* @see Collections#synchronizedCollection(Collection)
* @see Collections#synchronizedList(List)
* @see Collections#synchronizedSet(Set)
* @see Collections#synchronizedSortedSet(SortedSet)
* @see Collections#synchronizedMap(Map)
* @see Collections#synchronizedSortedMap(SortedMap)
*/
public static void registerSerializers(Fury fury) {
if (SOURCE_COLLECTION_FIELD != null && SOURCE_MAP_FIELD != null) {
Set<? extends Class<?>> classSet =
Arrays.stream(SynchronizedFactory.values()).map(c -> c.type).collect(Collectors.toSet());
if (classSet.size() != SynchronizedFactory.values().length) {
throw new FuryException(
String.format(
"Enum types %s duplicate.", Arrays.toString(SynchronizedFactory.values())));
}
for (SynchronizedFactory factory : SynchronizedFactory.values()) {
if (factory.isCollection()) {
fury.registerSerializer(
factory.type, new SynchronizedCollectionSerializer(fury, factory.type, factory));
} else {
fury.registerSerializer(
factory.type, new SynchronizedMapSerializer(fury, factory.type, factory));
}
}
}
}
}
Loading

0 comments on commit fd261dc

Please sign in to comment.