Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment in lock-free module invalidation #9639

Merged
merged 11 commits into from
Apr 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
return Future.apply(
() -> {
TruffleLogger logger = ctx.executionService().getLogger();
long writeCompilationLockTimestamp = ctx.locking().acquireWriteCompilationLock();
try {
logger.log(Level.FINE, "Invalidating modules, cancelling background jobs");
ctx.jobControlPlane().stopBackgroundJobs();
ctx.jobControlPlane().abortBackgroundJobs(DeserializeLibrarySuggestionsJob.class);

EnsoContext context = ctx.executionService().getContext();
context.getTopScope().getModules().forEach(module -> module.setIndexed(false));
context
.getTopScope()
.getModules()
.forEach(module -> ctx.state().suggestions().markIndexAsDirty(module));

context
.getPackageRepository()
Expand All @@ -47,19 +50,9 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
.runBackground(new DeserializeLibrarySuggestionsJob(pkg.libraryName()));
return BoxedUnit.UNIT;
});

reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx);
} finally {
ctx.locking().releaseWriteCompilationLock();
logger.log(
Level.FINEST,
"Kept write compilation lock [{0}] for {1} milliseconds.",
new Object[] {
this.getClass().getSimpleName(),
System.currentTimeMillis() - writeCompilationLockTimestamp
});
reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx);
}

return BoxedUnit.UNIT;
},
ec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class DeserializeLibrarySuggestionsCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
ctx.jobProcessor.runBackground(
new DeserializeLibrarySuggestionsJob(request.libraryName)
)
Future.successful(())
Future {
ctx.jobProcessor.runBackground(
new DeserializeLibrarySuggestionsJob(request.libraryName)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class RenameProjectCmd(
)

projectModules.foreach { module =>
Module.fromCompilerModule(module).setIndexed(false)
ctx.state.suggestions.markIndexAsDirty(
Module.fromCompilerModule(module)
)
ctx.endpoint.sendToClient(
Api.Response(
Api.SuggestionsDatabaseModuleUpdateNotification(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class CommandExecutionEngine(interpreterContext: InterpreterContext)
"Executing commands in a separate command pool"
)
interpreterContext.executionService.getContext
.newCachedThreadPool("command-pool", false)
.newCachedThreadPool("command-pool", 2, 10, 50, false)
}

private val sequentialExecutionService =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.enso.interpreter.instrument.execution

/** The state of the runtime.
*
* @param pendingEdits the storage for pending file edits
*/
final class ExecutionState(
/** The state of the runtime */
final class ExecutionState {

/** The storage for pending file edits */
val pendingEdits: PendingEdits = new PendingFileEdits()
)

val suggestions: ModuleIndexing = new ModuleIndexing()
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ final class JobExecutionEngine(
context.newCachedThreadPool(
"prioritized-job-pool",
2,
Integer.MAX_VALUE,
4,
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
50,
false
)

private val backgroundJobExecutor: ExecutorService =
context.newFixedThreadPool(1, "background-job-pool", false)
context.newCachedThreadPool("background-job-pool", 1, 4, 50, false)

private val runtimeContext =
RuntimeContext(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.enso.interpreter.instrument.execution

import org.enso.interpreter.runtime.Module

import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap};

/** Modules need to be indexed for building suggestions DB. */
final class ModuleIndexing {

object IndexState extends Enumeration {
val NotIndexed, NeedsIndexing, Indexed = Value
}

private val modules: ConcurrentMap[Module, IndexState.Value] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is now rewritten into a new ModuleIndexing Scala class that uses ConcurrentHashMap. Good encapsulation, but I still miss the answer to the fundamental question: What is the invariant to be consistent with?

new ConcurrentHashMap()

/** @return true, if module has been indexed. False otherwise.
*/
def isIndexed(module: Module): Boolean = {
modules.getOrDefault(module, IndexState.NotIndexed) == IndexState.Indexed;
}

/** Marks the module as fully indexed.
*
* Prior to indexing the module, it should be set as not index.
* If during the indexing operation, module is invalidated, it will have to be re-indexed until successful.
*/
def markAsIndexed(module: Module): Boolean = {
val v = modules.compute(
module,
{ case (_, v) =>
if (v == IndexState.NotIndexed) IndexState.Indexed else v
}
)
v == IndexState.Indexed
}

/** Marks the module as requiring indexing. */
def markIndexAsDirty(module: Module): Unit = {
modules.compute(module, { case _ => IndexState.NeedsIndexing });
}

/** Marks the module as not indexed */
def markAsNotIndexed(module: Module): Unit = {
modules.compute(module, { case _ => IndexState.NotIndexed })
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ final class AnalyzeModuleInScopeJob(
private def analyzeModuleInScope(module: Module)(implicit
ctx: RuntimeContext
): Unit = {
if (!module.isIndexed && module.getSource != null) {
if (!ctx.state.suggestions.isIndexed(module) && module.getSource != null) {
ctx.state.suggestions.markAsNotIndexed(module)
ctx.executionService.getLogger
.log(Level.FINEST, s"Analyzing module in scope ${module.getName}")
val moduleName = module.getName
Expand All @@ -60,8 +61,13 @@ final class AnalyzeModuleInScopeJob(
exports = ModuleExportsDiff.compute(prevExports, newExports),
updates = SuggestionDiff.compute(Tree.empty, newSuggestions)
)
sendModuleUpdate(notification)
module.setIndexed(true)
if (!ctx.state.suggestions.markAsIndexed(module)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem I see continues to be the same. Line 44 deals with state.suggestions and line 64 does that as well. But meanwhile anything can happen. What if a new request to markAsNotIndexed comes in another thread?

there was a reason why logic was added to Analyze*Job to deal with this scenario

My experience with Enso code base and its readiness for concurrent execution doesn't support claims that advocate reason and/or logic. Even if they were present, it is questionable they were used in a sound way. Except using the most advanced concurrent building blocks, I only see attempts to avoid answers to the basic question: how that is supposed to work at all?

ctx.executionService.getLogger
.log(Level.FINEST, s"Analyzing module in scope ${module.getName}")
analyzeModuleInScope(module);
} else {
sendModuleUpdate(notification)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object AnalyzeModuleJob {
)(implicit ctx: RuntimeContext): Unit = {
val moduleName = module.getName
val compiler = ctx.executionService.getContext.getCompiler
if (module.isIndexed) {
if (ctx.state.suggestions.isIndexed(module)) {
ctx.executionService.getLogger
.log(Level.FINEST, s"Analyzing indexed module $moduleName")
val prevSuggestions =
Expand All @@ -74,6 +74,7 @@ object AnalyzeModuleJob {
)
sendModuleUpdate(notification)
} else {
ctx.state.suggestions.markAsNotIndexed(module)
ctx.executionService.getLogger
.log(Level.FINEST, s"Analyzing not-indexed module ${module.getName}")
val newSuggestions =
Expand All @@ -88,8 +89,11 @@ object AnalyzeModuleJob {
exports = ModuleExportsDiff.compute(prevExports, newExports),
updates = SuggestionDiff.compute(Tree.empty, newSuggestions)
)
sendModuleUpdate(notification)
module.setIndexed(true)
if (ctx.state.suggestions.markAsIndexed(module)) {
sendModuleUpdate(notification)
} else {
doAnalyzeModule(module, changeset)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ final class EnsureCompiledJob(
// Side-effect: ensures that module's source is correctly initialized.
module.getSource()
invalidateCaches(module, changeset)
if (module.isIndexed) {
if (ctx.state.suggestions.isIndexed(module)) {
ctx.jobProcessor.runBackground(
AnalyzeModuleJob(module, changeset)
)
Expand All @@ -145,7 +145,8 @@ final class EnsureCompiledJob(
private def ensureCompiledScope(modulesInScope: Iterable[Module])(implicit
ctx: RuntimeContext
): Iterable[CompilationStatus] = {
val notIndexedModulesInScope = modulesInScope.filter(!_.isIndexed)
val notIndexedModulesInScope =
modulesInScope.filter(m => !ctx.state.suggestions.isIndexed(m))
val (modulesToAnalyzeBuilder, compilationStatusesBuilder) =
notIndexedModulesInScope.foldLeft(
(Set.newBuilder[Module], Vector.newBuilder[CompilationStatus])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.oracle.truffle.api.TruffleLogger
import org.enso.compiler.core.Implicits.AsMetadata
import org.enso.compiler.core.ir.Function
import org.enso.compiler.core.ir.Name
import org.enso.compiler.core.ir.module.scope.definition
import org.enso.compiler.core.ir.module.scope.{definition, Definition}
import org.enso.compiler.pass.analyse.{
CachePreferenceAnalysis,
DataflowAnalysis
Expand All @@ -27,6 +27,7 @@ import org.enso.interpreter.runtime.control.ThreadInterruptedException
import org.enso.pkg.QualifiedName
import org.enso.polyglot.runtime.Runtime.Api

import java.util.UUID
import java.util.logging.Level
import scala.annotation.unused

Expand Down Expand Up @@ -536,26 +537,36 @@ object UpsertVisualizationJob {
visualizationExpression match {
case Api.VisualizationExpression.ModuleMethod(methodPointer, _) =>
module.getIr.bindings
.collect { case method: definition.Method =>
val methodReference = method.methodReference
val methodReferenceName = methodReference.methodName.name
val methodReferenceTypeOpt = methodReference.typePointer.map(_.name)

val externalIdOpt = method.body match {
case fun: Function => fun.body.getExternalId
case _ => method.getExternalId
}
externalIdOpt.filter { _ =>
methodReferenceName == methodPointer.name &&
methodReferenceTypeOpt.isEmpty
}
.collectFirst {
case ExternalIdOfMethod(externalId, methodReference)
if methodReference.methodName.name == methodPointer.name =>
externalId
}
.flatten
.headOption

case _: Api.VisualizationExpression.Text => None
}

private object ExternalIdOfMethod {
def unapply(d: Definition): Option[(UUID, Name.MethodReference)] = {
d match {
case method: definition.Method =>
val methodReference = method.methodReference
val methodReferenceTypeOpt = methodReference.typePointer.map(_.name)

Option
.when(methodReferenceTypeOpt.isEmpty)(
method.body match {
case fun: Function => fun.body.getExternalId
case _ => method.getExternalId
}
)
.flatten
.map((_, methodReference))
case _ =>
None
}
}
}

/** Update the caches. */
private def invalidateCaches(
visualization: Visualization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,15 +723,15 @@ public int getJobParallelism() {

/**
* @param name human-readable name of the pool
* @param min minimal number of threads kept-alive in the pool
* @param max maximal number of available threads
* @param maxQueueSize maximal number of pending tasks
* @param systemThreads use system threads or polyglot threads
* @return new execution service for this context
*/
public ExecutorService newCachedThreadPool(String name, boolean systemThreads) {
return threadExecutors.newCachedThreadPool(name, systemThreads);
}

public ExecutorService newCachedThreadPool(String name, int min, int max, boolean systemThreads) {
return threadExecutors.newCachedThreadPool(name, systemThreads, min, max);
public ExecutorService newCachedThreadPool(
String name, int min, int max, int maxQueueSize, boolean systemThreads) {
return threadExecutors.newCachedThreadPool(name, systemThreads, min, max, maxQueueSize);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public final class Module implements EnsoObject {
private final Map<Source, Module> allSources = new WeakHashMap<>();
private final Package<TruffleFile> pkg;
private CompilationStage compilationStage = CompilationStage.INITIAL;
private boolean isIndexed = false;
private org.enso.compiler.core.ir.Module ir;
private Map<UUID, IR> uuidsMap;
private QualifiedName name;
Expand Down Expand Up @@ -494,18 +493,6 @@ public void renameProject(String newName) {
this.name = name.renameProject(newName);
}

/**
* @return the indexed flag.
*/
public boolean isIndexed() {
return isIndexed;
}

/** Set the indexed flag. */
public void setIndexed(boolean indexed) {
isIndexed = indexed;
}

/**
* @return the source file of this module.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@ final class ThreadExecutors {
this.context = context;
}

ExecutorService newCachedThreadPool(String name, boolean systemThread) {
var s = Executors.newCachedThreadPool(new Factory(name, systemThread));
pools.put(s, name);
return s;
}

ExecutorService newCachedThreadPool(String name, boolean systemThread, int min, int max) {
assert min >= 0;
assert max <= Integer.MAX_VALUE;
ExecutorService newCachedThreadPool(
String name, boolean systemThread, int min, int max, int maxQueueSize) {
var s =
new ThreadPoolExecutor(
min,
max,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new LinkedBlockingQueue<>(maxQueueSize),
new Factory(name, systemThread));
pools.put(s, name);
return s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ private scala.Option<SuggestionsCache.CachedSuggestions> deserializeSuggestionsI
return scala.Option.apply(loaded.get());
} else {
logSerializationManager(
Level.FINE, "Unable to load suggestions for library [{0}].", libraryName);
Level.WARNING, "Unable to load suggestions for library [{0}].", libraryName);
return scala.Option.empty();
}
}
Expand Down
Loading