Skip to content

Improve performance of NewThreadWorker, disable search for setRemoveOnCancelPolicy() on Android API < 21 #3121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 86 additions & 18 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import rx.plugins.*;
import rx.subscriptions.*;

import static rx.internal.util.PlatformDependent.ANDROID_API_VERSION_IS_NOT_ANDROID;

/**
* @warn class description missing
*/
Expand All @@ -39,17 +41,25 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription {
/** Force the use of purge (true/false). */
private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
/** Forces the use of purge even if setRemoveOnCancelPolicy is available. */
private static final boolean PURGE_FORCE;
private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY;
/** The purge frequency in milliseconds. */
public static final int PURGE_FREQUENCY;
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
private static final AtomicReference<ScheduledExecutorService> PURGE;
static {
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
PURGE = new AtomicReference<ScheduledExecutorService>();
PURGE_FORCE = Boolean.getBoolean(PURGE_FORCE_KEY);
PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);

// Forces the use of purge even if setRemoveOnCancelPolicy is available
final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY);

final int androidApiVersion = PlatformDependent.getAndroidApiVersion();

// According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean)
// setRemoveOnCancelPolicy available since Android API 21
SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce
&& (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21);
}
/**
* Registers the given executor service and starts the purge thread if not already started.
Expand Down Expand Up @@ -85,6 +95,7 @@ public void run() {
public static void deregisterExecutor(ScheduledExecutorService service) {
EXECUTORS.remove(service);
}

/** Purges each registered executor and eagerly evicts shutdown executors. */
static void purgeExecutors() {
try {
Expand All @@ -102,32 +113,89 @@ static void purgeExecutors() {
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
}
}

/**

/**
* Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}.
* Also, it works even for inheritance: {@link Method} of base class can be invoked on the instance of child class.
*/
private static volatile Object cachedSetRemoveOnCancelPolicyMethod;

/**
* Possible value of {@link #cachedSetRemoveOnCancelPolicyMethod} which means that cancel policy is not supported.
*/
private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object();

/**
* Tries to enable the Java 7+ setRemoveOnCancelPolicy.
* <p>{@code public} visibility reason: called from other package(s) within RxJava.
* If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
* be called to enable the backup option of purging the executors.
* @param exec the executor to call setRemoveOnCaneclPolicy if available.
* @param executor the executor to call setRemoveOnCaneclPolicy if available.
* @return true if the policy was successfully enabled
*/
public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) {
if (!PURGE_FORCE) {
for (Method m : exec.getClass().getMethods()) {
if (m.getName().equals("setRemoveOnCancelPolicy")
&& m.getParameterTypes().length == 1
&& m.getParameterTypes()[0] == Boolean.TYPE) {
try {
m.invoke(exec, true);
return true;
} catch (Exception ex) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
}
public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
if (SHOULD_TRY_ENABLE_CANCEL_POLICY) {
final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor;

final Method methodToCall;

if (isInstanceOfScheduledThreadPoolExecutor) {
final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod;

if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) {
return false;
}

if (localSetRemoveOnCancelPolicyMethod == null) {
Method method = findSetRemoveOnCancelPolicyMethod(executor);

cachedSetRemoveOnCancelPolicyMethod = method != null
? method
: SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED;

methodToCall = method;
} else {
methodToCall = (Method) localSetRemoveOnCancelPolicyMethod;
}
} else {
methodToCall = findSetRemoveOnCancelPolicyMethod(executor);
}

if (methodToCall != null) {
try {
methodToCall.invoke(executor, true);
return true;
} catch (Exception e) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
}
}
}

return false;
}

/**
* Tries to find {@code "setRemoveOnCancelPolicy(boolean)"} method in the class of passed executor.
*
* @param executor whose class will be used to search for required method.
* @return {@code "setRemoveOnCancelPolicy(boolean)"} {@link Method}
* or {@code null} if required {@link Method} was not found.
*/
static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) {
// The reason for the loop is to avoid NoSuchMethodException being thrown on JDK 6
// which is more costly than looping through ~70 methods.
for (final Method method : executor.getClass().getMethods()) {
if (method.getName().equals("setRemoveOnCancelPolicy")) {
final Class<?>[] parameterTypes = method.getParameterTypes();

if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) {
return method;
}
}
}

return null;
}

/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
Expand Down
46 changes: 35 additions & 11 deletions src/main/java/rx/internal/util/PlatformDependent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,55 @@

/**
* Allow platform dependent logic such as checks for Android.
*
*
* Modeled after Netty with some code copy/pasted from: https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/PlatformDependent.java
*/
public final class PlatformDependent {

private static final boolean IS_ANDROID = isAndroid0();
/**
* Possible value of {@link #getAndroidApiVersion()} which means that the current platform is not Android.
*/
public static final int ANDROID_API_VERSION_IS_NOT_ANDROID = 0;

private static final int ANDROID_API_VERSION = resolveAndroidApiVersion();

private static final boolean IS_ANDROID = ANDROID_API_VERSION != ANDROID_API_VERSION_IS_NOT_ANDROID;

/**
* Returns {@code true} if and only if the current platform is Android
* Returns {@code true} if and only if the current platform is Android.
*/
public static boolean isAndroid() {
return IS_ANDROID;
}

private static boolean isAndroid0() {
boolean android;
/**
* Returns version of Android API.
*
* @return version of Android API or {@link #ANDROID_API_VERSION_IS_NOT_ANDROID } if version
* can not be resolved or if current platform is not Android.
*/
public static int getAndroidApiVersion() {
return ANDROID_API_VERSION;
}

/**
* Resolves version of Android API.
*
* @return version of Android API or {@link #ANDROID_API_VERSION_IS_NOT_ANDROID} if version can not be resolved
* or if the current platform is not Android.
* @see <a href="http://developer.android.com/reference/android/os/Build.VERSION.html#SDK_INT">Documentation</a>
*/
private static int resolveAndroidApiVersion() {
try {
Class.forName("android.app.Application", false, getSystemClassLoader());
android = true;
return (Integer) Class
.forName("android.os.Build$VERSION", true, getSystemClassLoader())
.getField("SDK_INT")
.get(null);
} catch (Exception e) {
// Failed to load the class uniquely available in Android.
android = false;
// Can not resolve version of Android API, maybe current platform is not Android
// or API of resolving current Version of Android API has changed in some release of Android
return ANDROID_API_VERSION_IS_NOT_ANDROID;
}

return android;
}

/**
Expand Down
65 changes: 65 additions & 0 deletions src/test/java/rx/internal/schedulers/NewThreadWorkerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rx.internal.schedulers;

import org.junit.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.lang.reflect.Modifier.FINAL;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class NewThreadWorkerTest {

@Test
public void findSetRemoveOnCancelPolicyMethodShouldFindMethod() {
ScheduledExecutorService executor = spy(new ScheduledThreadPoolExecutor(1));
Method setRemoveOnCancelPolicyMethod = NewThreadWorker.findSetRemoveOnCancelPolicyMethod(executor);

assertNotNull(setRemoveOnCancelPolicyMethod);
assertEquals("setRemoveOnCancelPolicy", setRemoveOnCancelPolicyMethod.getName());
assertEquals(1, setRemoveOnCancelPolicyMethod.getParameterTypes().length);
assertEquals(Boolean.TYPE, setRemoveOnCancelPolicyMethod.getParameterTypes()[0]);
verifyZeroInteractions(executor);
}

@Test
public void findSetRemoveOnCancelPolicyMethodShouldNotFindMethod() {
ScheduledExecutorService executor = mock(ScheduledExecutorService.class);

Method setRemoveOnCancelPolicyMethod = NewThreadWorker.findSetRemoveOnCancelPolicyMethod(executor);
assertNull(setRemoveOnCancelPolicyMethod);
verifyZeroInteractions(executor);
}

private static abstract class ScheduledExecutorServiceWithSetRemoveOnCancelPolicy implements ScheduledExecutorService {
// Just declaration of required method to allow run tests on JDK 6
public void setRemoveOnCancelPolicy(@SuppressWarnings("UnusedParameters") boolean value) {}
}

@Test
public void tryEnableCancelPolicyShouldInvokeMethodOnExecutor() {
ScheduledExecutorServiceWithSetRemoveOnCancelPolicy executor
= mock(ScheduledExecutorServiceWithSetRemoveOnCancelPolicy.class);

boolean result = NewThreadWorker.tryEnableCancelPolicy(executor);

assertTrue(result);
verify(executor).setRemoveOnCancelPolicy(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fail on a Java 6 build but I'm not sure if people actually build RxJava from source on Java 6. What I would do is subclass ScheduledThreadPoolExecutor and add the setRemoveOnCancelPolicy(boolean) method. For Java 7+, this will count as a simple override.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice workaround!

verifyNoMoreInteractions(executor);
}

@Test
public void tryEnableCancelPolicyShouldNotInvokeMethodOnExecutor() {
// This executor does not have setRemoveOnCancelPolicy method
ScheduledExecutorService executor = mock(ScheduledExecutorService.class);

boolean result = NewThreadWorker.tryEnableCancelPolicy(executor);

assertFalse(result);
verifyZeroInteractions(executor);
}
}