Skip to content

Commit

Permalink
[Java] add thread-safe fury serializer (#278)
Browse files Browse the repository at this point in the history
add thread -safe fury serializer
  • Loading branch information
chaokunyang committed May 19, 2023
1 parent d971d29 commit 7860e56
Show file tree
Hide file tree
Showing 4 changed files with 459 additions and 0 deletions.
18 changes: 18 additions & 0 deletions java/fury-core/src/main/java/io/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -1430,5 +1430,23 @@ public Fury build() {
this.classLoader = null;
return new Fury(this, loader);
}

/** Build thread safe fury. */
public ThreadSafeFury buildThreadSafeFury() {
return buildThreadLocalFury();
}

/** Build thread safe fury backed by {@link ThreadLocalFury}. */
public ThreadLocalFury buildThreadLocalFury() {
finish();
ClassLoader loader = this.classLoader;
// clear classLoader to avoid `LoaderBinding#furyFactory` lambda capture classLoader by
// capturing `FuryBuilder`, which make `classLoader` not able to be gc.
this.classLoader = null;
ThreadLocalFury threadSafeFury =
new ThreadLocalFury(classLoader -> new Fury(FuryBuilder.this, classLoader));
threadSafeFury.setClassLoader(loader);
return threadSafeFury;
}
}
}
109 changes: 109 additions & 0 deletions java/fury-core/src/main/java/io/fury/ThreadLocalFury.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 The Fury authors
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fury;

import io.fury.memory.MemoryBuffer;
import io.fury.memory.MemoryUtils;
import io.fury.util.LoaderBinding;
import io.fury.util.LoaderBinding.StagingType;
import java.nio.ByteBuffer;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;

/**
* A thread safe serialization entrance for {@link Fury} by binding a {@link Fury} for every thread.
* Note that the thread shouldn't be created and destroyed frequently, otherwise the {@link Fury}
* will be created and destroyed frequently, which is slow.
*
* @author chaokunyang
*/
@ThreadSafe
public class ThreadLocalFury implements ThreadSafeFury {

private final ThreadLocal<MemoryBuffer> bufferLocal =
ThreadLocal.withInitial(() -> MemoryUtils.buffer(32));

private final ThreadLocal<LoaderBinding> bindingThreadLocal;

public ThreadLocalFury(Function<ClassLoader, Fury> furyFactory) {
bindingThreadLocal =
ThreadLocal.withInitial(
() -> {
LoaderBinding binding = new LoaderBinding(furyFactory);
binding.setClassLoader(Thread.currentThread().getContextClassLoader());
return binding;
});
// init and warm for current thread.
// Fury creation took about 1~2 ms, but first creation
// in a process load some classes which is not cheap.
bindingThreadLocal.get().get();
}

public <R> R execute(Function<Fury, R> action) {
Fury fury = bindingThreadLocal.get().get();
return action.apply(fury);
}

public byte[] serialize(Object obj) {
MemoryBuffer buffer = bufferLocal.get();
buffer.writerIndex(0);
bindingThreadLocal.get().get().serialize(buffer, obj);
return buffer.getBytes(0, buffer.writerIndex());
}

public MemoryBuffer serialize(MemoryBuffer buffer, Object obj) {
return bindingThreadLocal.get().get().serialize(buffer, obj);
}

public Object deserialize(byte[] bytes) {
return bindingThreadLocal.get().get().deserialize(bytes);
}

public Object deserialize(long address, int size) {
return bindingThreadLocal.get().get().deserialize(address, size);
}

public Object deserialize(MemoryBuffer buffer) {
return bindingThreadLocal.get().get().deserialize(buffer);
}

public Object deserialize(ByteBuffer byteBuffer) {
return bindingThreadLocal.get().get().deserialize(MemoryUtils.wrap(byteBuffer));
}

public void setClassLoader(ClassLoader classLoader) {
setClassLoader(classLoader, StagingType.SOFT_STAGING);
}

public void setClassLoader(ClassLoader classLoader, StagingType stagingType) {
bindingThreadLocal.get().setClassLoader(classLoader, stagingType);
}

public ClassLoader getClassLoader() {
return bindingThreadLocal.get().getClassLoader();
}

public void clearClassLoader(ClassLoader loader) {
bindingThreadLocal.get().clearClassLoader(loader);
}

public Fury getCurrentFury() {
return bindingThreadLocal.get().get();
}
}
89 changes: 89 additions & 0 deletions java/fury-core/src/main/java/io/fury/ThreadSafeFury.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2023 The Fury authors
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fury;

import io.fury.memory.MemoryBuffer;
import io.fury.util.LoaderBinding;
import java.nio.ByteBuffer;
import java.util.function.Function;

/**
* Thread safe serializer interface. {@link Fury} is not thread-safe, the implementation of this
* interface will be thread-safe. And support switch classloader dynamically.
*
* @author chaokunyang
*/
public interface ThreadSafeFury {

/**
* Provide a context to execution operations on {@link Fury} directly and return the executed
* result.
*/
<R> R execute(Function<Fury, R> action);

byte[] serialize(Object obj);

MemoryBuffer serialize(MemoryBuffer buffer, Object obj);

Object deserialize(byte[] bytes);

Object deserialize(long address, int size);

Object deserialize(MemoryBuffer buffer);

Object deserialize(ByteBuffer byteBuffer);

/**
* Set classLoader of serializer for current thread only.
*
* @see LoaderBinding#setClassLoader(ClassLoader)
*/
void setClassLoader(ClassLoader classLoader);

/**
* Set classLoader of serializer for current thread only.
*
* <p>If <code>staging</code> is true, a cached {@link Fury} instance will be returned if not
* null, and the previous classloader and associated {@link Fury} instance won't be gc unless
* {@link #clearClassLoader} is called explicitly. If false, and the passed <code>classLoader
* </code> is different, a new {@link Fury} instance will be created, previous classLoader and
* associated {@link Fury} instance will be cleared.
*
* @param classLoader {@link ClassLoader} for resolving unregistered class name to class
* @param stagingType Whether cache previous classloader and associated {@link Fury} instance.
* @see LoaderBinding#setClassLoader(ClassLoader, LoaderBinding.StagingType)
*/
void setClassLoader(ClassLoader classLoader, LoaderBinding.StagingType stagingType);

/** Returns classLoader of serializer for current thread. */
ClassLoader getClassLoader();

/**
* Clean up classloader set by {@link #setClassLoader(ClassLoader, LoaderBinding.StagingType)},
* <code>
* classLoader
* </code> won't be referenced by {@link Fury} after this call and can be gc if it's not
* referenced by other objects.
*
* @see LoaderBinding#clearClassLoader(ClassLoader)
*/
void clearClassLoader(ClassLoader loader);

Fury getCurrentFury();
}

0 comments on commit 7860e56

Please sign in to comment.