Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CI] fix CI #439

Merged
merged 3 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/run_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ case $1 in
# check naming and others
mvn -T16 checkstyle:check
set +e
mvn -T16 test -pl '!fury-format,!fury-testsuite'
mvn -T16 test -pl '!fury-format,!fury-testsuite,!fury-benchmark'
testcode=$?
if [[ $testcode -ne 0 ]]; then
exit $testcode
Expand Down
20 changes: 10 additions & 10 deletions java/fury-core/src/main/java/io/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -1500,23 +1500,23 @@ public ThreadSafeFury buildThreadSafeFuryPool(int minPoolSize, int maxPoolSize)
* @return ThreadSafeFuryPool
*/
public ThreadSafeFury buildThreadSafeFuryPool(
int minPoolSize, int maxPoolSize, long expireTime, TimeUnit timeUnit) {
int minPoolSize, int maxPoolSize, long expireTime, TimeUnit timeUnit) {
if (minPoolSize < 0 || maxPoolSize < 0 || minPoolSize > maxPoolSize) {
throw new IllegalArgumentException(
String.format(
"thread safe fury pool's init pool size error, please check it, min:[%s], max:[%s]",
minPoolSize, maxPoolSize));
String.format(
"thread safe fury pool's init pool size error, please check it, min:[%s], max:[%s]",
minPoolSize, maxPoolSize));
}
finish();
ClassLoader loader = this.classLoader;
this.classLoader = null;
ThreadSafeFury threadSafeFury =
new ThreadPoolFury(
classLoader -> new Fury(FuryBuilder.this, classLoader),
minPoolSize,
maxPoolSize,
expireTime,
timeUnit);
new ThreadPoolFury(
classLoader -> new Fury(FuryBuilder.this, classLoader),
minPoolSize,
maxPoolSize,
expireTime,
timeUnit);
threadSafeFury.setClassLoader(loader);
return threadSafeFury;
}
Expand Down
155 changes: 86 additions & 69 deletions java/fury-core/src/main/java/io/fury/pool/ClassLoaderFuryPooled.java
Original file line number Diff line number Diff line change
@@ -1,99 +1,116 @@
/*
* Copyright 2023 The Fury authors
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fury.pool;

import io.fury.Fury;
import io.fury.util.LoggerFactory;
import org.slf4j.Logger;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.slf4j.Logger;

/** A thread-safe object pool of {@link Fury}. */
public class ClassLoaderFuryPooled {

private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderFuryPooled.class);
private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderFuryPooled.class);

private final Function<ClassLoader, Fury> furyFactory;
private final Function<ClassLoader, Fury> furyFactory;

private final ClassLoader classLoader;
private final ClassLoader classLoader;

/**
* idle Fury cache change. by : 1. getLoaderBind() 2. returnObject(LoaderBinding) 3.
* addObjAndWarp()
*/
private final Queue<Fury> idleCacheQueue;
/**
* idle Fury cache change. by : 1. getLoaderBind() 2. returnObject(LoaderBinding) 3.
* addObjAndWarp()
*/
private final Queue<Fury> idleCacheQueue;

/** active cache size's number change by : 1. getLoaderBind() 2. returnObject(LoaderBinding). */
private final AtomicInteger activeCacheNumber = new AtomicInteger(0);
/** active cache size's number change by : 1. getLoaderBind() 2. returnObject(LoaderBinding). */
private final AtomicInteger activeCacheNumber = new AtomicInteger(0);

/**
* Dynamic capacity expansion and contraction The user sets the maximum number of object pools.
* Math.max(maxPoolSize, CPU * 2)
*/
private final int maxPoolSize;
/**
* Dynamic capacity expansion and contraction The user sets the maximum number of object pools.
* Math.max(maxPoolSize, CPU * 2)
*/
private final int maxPoolSize;

private final Lock lock = new ReentrantLock();
private final Condition furyCondition = lock.newCondition();
private final Lock lock = new ReentrantLock();
private final Condition furyCondition = lock.newCondition();

public ClassLoaderFuryPooled(
ClassLoader classLoader,
Function<ClassLoader, Fury> furyFactory,
int minPoolSize,
int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
this.furyFactory = furyFactory;
this.classLoader = classLoader;
idleCacheQueue = new ConcurrentLinkedQueue<>();
while (idleCacheQueue.size() < minPoolSize) {
addFury();
}
public ClassLoaderFuryPooled(
ClassLoader classLoader,
Function<ClassLoader, Fury> furyFactory,
int minPoolSize,
int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
this.furyFactory = furyFactory;
this.classLoader = classLoader;
idleCacheQueue = new ConcurrentLinkedQueue<>();
while (idleCacheQueue.size() < minPoolSize) {
addFury();
}
}

public Fury getFury() {
try {
lock.lock();
Fury fury = idleCacheQueue.poll();
while (fury == null) {
if (activeCacheNumber.get() < maxPoolSize) {
addFury();
} else {
furyCondition.await();
}
fury = idleCacheQueue.poll();
if (fury == null) {
continue;
}
break;
}
activeCacheNumber.incrementAndGet();
return fury;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
lock.unlock();
public Fury getFury() {
try {
lock.lock();
Fury fury = idleCacheQueue.poll();
while (fury == null) {
if (activeCacheNumber.get() < maxPoolSize) {
addFury();
} else {
furyCondition.await();
}
}

public void returnFury(Fury fury) {
try {
lock.lock();
idleCacheQueue.add(fury);
activeCacheNumber.decrementAndGet();
furyCondition.signalAll();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
lock.unlock();
fury = idleCacheQueue.poll();
if (fury == null) {
continue;
}
break;
}
activeCacheNumber.incrementAndGet();
return fury;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

private void addFury() {
Fury fury = furyFactory.apply(classLoader);
idleCacheQueue.add(fury);
public void returnFury(Fury fury) {
try {
lock.lock();
idleCacheQueue.add(fury);
activeCacheNumber.decrementAndGet();
furyCondition.signalAll();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
lock.unlock();
}
}

private void addFury() {
Fury fury = furyFactory.apply(classLoader);
idleCacheQueue.add(fury);
}
}
Loading