Skip to content

Commit

Permalink
perf: increase task throughput in Android using thread pool executor (i…
Browse files Browse the repository at this point in the history
…nvertase#4981)

* implement thread pool executor service
* add getTransactionalExecutor method
* use transactional executor during write action
* transactional executor with identifier
* execute database event in serial
* execute firestore event in serial
* absctract task excutor
* do not re-excute rejected task while shutdown
* add documentation
* tests configuration
* disable identified executor when maximum pool size is zero
* update document
* Avoid race condition in executors
  • Loading branch information
kmsbernard committed Apr 4, 2021
1 parent ba41d00 commit a439ed7
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 24 deletions.
@@ -0,0 +1,121 @@
package io.invertase.firebase.common;

/*
* Copyright (c) 2016-present Invertase Limited & Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this library except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import io.invertase.firebase.common.ReactNativeFirebaseJSON;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.SynchronousQueue;

public class TaskExecutorService {
private static final String MAXIMUM_POOL_SIZE_KEY = "android_task_executor_maximum_pool_size";
private static final String KEEP_ALIVE_SECONDS_KEY = "android_task_executor_keep_alive_seconds";

private final String name;
private final int maximumPoolSize;
private final int keepAliveSeconds;
private static Map<String, ExecutorService> executors = new HashMap<>();

TaskExecutorService(String name) {
this.name = name;
ReactNativeFirebaseJSON json = ReactNativeFirebaseJSON.getSharedInstance();
this.maximumPoolSize = json.getIntValue(MAXIMUM_POOL_SIZE_KEY, 1);
this.keepAliveSeconds = json.getIntValue(KEEP_ALIVE_SECONDS_KEY, 3);
}

public ExecutorService getExecutor() {
boolean isTransactional = maximumPoolSize <= 1;
return getExecutor(isTransactional, "");
}

public ExecutorService getTransactionalExecutor() {
return getExecutor(true, "");
}

public ExecutorService getTransactionalExecutor(String identifier) {
String executorIdentifier = maximumPoolSize != 0 ? identifier : "";
return getExecutor(true, executorIdentifier);
}

public ExecutorService getExecutor(boolean isTransactional, String identifier) {
String executorName = getExecutorName(isTransactional, identifier);
synchronized(executors) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor == null) {
ExecutorService newExecutor = getNewExecutor(isTransactional);
executors.put(executorName, newExecutor);
return newExecutor;
}
return existingExecutor;
}
}

private ExecutorService getNewExecutor(boolean isTransactional) {
if (isTransactional == true) {
return Executors.newSingleThreadExecutor();
} else {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
threadPoolExecutor.setRejectedExecutionHandler(executeInFallback);
return threadPoolExecutor;
}
}

private RejectedExecutionHandler executeInFallback = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
return;
}
ExecutorService fallbackExecutor = getTransactionalExecutor();
fallbackExecutor.execute(r);
};
};

public String getExecutorName(boolean isTransactional, String identifier) {
if (isTransactional == true) {
return name + "TransactionalExecutor" + identifier;
}
return name + "Executor" + identifier;
}

public void shutdown() {
Set<String> existingExecutorNames = executors.keySet();
existingExecutorNames.removeIf((executorName) -> {
return executorName.startsWith(name) == false;
});
existingExecutorNames.forEach((executorName) -> {
removeExecutor(executorName);
});
}

public void removeExecutor(String executorName) {
synchronized(executors) {
ExecutorService existingExecutor = executors.get(executorName);
if (existingExecutor != null) {
existingExecutor.shutdownNow();
executors.remove(executorName);
}
}
}
}
Expand Up @@ -18,23 +18,24 @@
*/

import android.content.Context;
import io.invertase.firebase.common.TaskExecutorService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.OverridingMethodsMustInvokeSuper;

public class UniversalFirebaseModule {
private static Map<String, ExecutorService> executors = new HashMap<>();
private final TaskExecutorService executorService;

private final Context context;
private final String serviceName;

protected UniversalFirebaseModule(Context context, String serviceName) {
this.context = context;
this.serviceName = serviceName;
this.executorService = new TaskExecutorService(getName());
}

public Context getContext() {
Expand All @@ -46,11 +47,7 @@ public Context getApplicationContext() {
}

protected ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return executorService.getExecutor();
}

public String getName() {
Expand All @@ -59,11 +56,7 @@ public String getName() {

@OverridingMethodsMustInvokeSuper
public void onTearDown() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
}
executorService.shutdown();
}

public Map<String, Object> getConstants() {
Expand Down
Expand Up @@ -53,6 +53,11 @@ public boolean getBooleanValue(String key, boolean defaultValue) {
return jsonObject.optBoolean(key, defaultValue);
}

public int getIntValue(String key, int defaultValue) {
if (jsonObject == null) return defaultValue;
return jsonObject.optInt(key, defaultValue);
}

public long getLongValue(String key, long defaultValue) {
if (jsonObject == null) return defaultValue;
return jsonObject.optLong(key, defaultValue);
Expand Down
Expand Up @@ -21,15 +21,16 @@
import android.content.Context;
import com.facebook.react.bridge.*;
import io.invertase.firebase.interfaces.ContextProvider;
import io.invertase.firebase.common.TaskExecutorService;

import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReactNativeFirebaseModule extends ReactContextBaseJavaModule implements ContextProvider {
private static Map<String, ExecutorService> executors = new HashMap<>();
private final TaskExecutorService executorService;

private String moduleName;

public ReactNativeFirebaseModule(
Expand All @@ -38,6 +39,7 @@ public ReactNativeFirebaseModule(
) {
super(reactContext);
this.moduleName = moduleName;
this.executorService = new TaskExecutorService(getName());
}

public static void rejectPromiseWithExceptionMap(Promise promise, Exception exception) {
Expand Down Expand Up @@ -74,20 +76,25 @@ public ReactContext getContext() {
}

public ExecutorService getExecutor() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) return existingSingleThreadExecutor;
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
executors.put(getName(), newSingleThreadExecutor);
return newSingleThreadExecutor;
return executorService.getExecutor();
}

public ExecutorService getTransactionalExecutor() {
return executorService.getTransactionalExecutor();
}

public ExecutorService getTransactionalExecutor(String identifier) {
return executorService.getTransactionalExecutor(identifier);
}

@Override
public void onCatalystInstanceDestroy() {
ExecutorService existingSingleThreadExecutor = executors.get(getName());
if (existingSingleThreadExecutor != null) {
existingSingleThreadExecutor.shutdownNow();
executors.remove(getName());
}
executorService.shutdown();
}

public void removeEventListeningExecutor(String identifier) {
String executorName = executorService.getExecutorName(true, identifier);
executorService.removeExecutor(executorName);
}

public Context getApplicationContext() {
Expand Down
8 changes: 8 additions & 0 deletions firebase-schema.json
Expand Up @@ -65,6 +65,14 @@
"perf_auto_collection_enabled": {
"description": "Disable or enable auto collection of performance monitoring data collection.\n This is useful for opt-in-first data flows, for example when dealing with GDPR compliance.\nThis can be overridden in JavaScript.",
"type": "boolean"
},
"android_task_executor_maximum_pool_size": {
"description": "Maximum pool size of ThreadPoolExecutor used by RNFirebase for Android. Defaults to `1`.\n Larger values typically improve performance when executing large numbers of asynchronous tasks, e.g. Firestore queries.",
"type": "number"
},
"android_task_executor_keep_alive_seconds": {
"description": "Keep-alive time of ThreadPoolExecutor used by RNFirebase for Android, in seconds. Defaults to `3`.\n Excess threads in the pool executor will be terminated if they have been idle for more than the keep-alive time.",
"type": "number"
}
}
}
Expand Down

0 comments on commit a439ed7

Please sign in to comment.