Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment in lock-free module invalidation #9639

Merged
merged 11 commits into from
Apr 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
return Future.apply(
() -> {
TruffleLogger logger = ctx.executionService().getLogger();
long writeCompilationLockTimestamp = ctx.locking().acquireWriteCompilationLock();
try {
logger.log(Level.FINE, "Invalidating modules, cancelling background jobs");
ctx.jobControlPlane().stopBackgroundJobs();
ctx.jobControlPlane().abortBackgroundJobs(DeserializeLibrarySuggestionsJob.class);

EnsoContext context = ctx.executionService().getContext();
context.getTopScope().getModules().forEach(module -> module.setIndexed(false));
context
.getTopScope()
.getModules()
.forEach(module -> ctx.state().suggestions().markIndexAsDirty(module));

context
.getPackageRepository()
Expand All @@ -47,19 +50,9 @@ public Future<BoxedUnit> executeAsynchronously(RuntimeContext ctx, ExecutionCont
.runBackground(new DeserializeLibrarySuggestionsJob(pkg.libraryName()));
return BoxedUnit.UNIT;
});

reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx);
} finally {
ctx.locking().releaseWriteCompilationLock();
logger.log(
Level.FINEST,
"Kept write compilation lock [{0}] for {1} milliseconds.",
new Object[] {
this.getClass().getSimpleName(),
System.currentTimeMillis() - writeCompilationLockTimestamp
});
reply(new Runtime$Api$InvalidateModulesIndexResponse(), ctx);
}

return BoxedUnit.UNIT;
},
ec);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.enso.interpreter.instrument.execution;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.enso.compiler.core.IR;
import org.enso.interpreter.runtime.Module;

public final class ModuleIndexing {

/**
* State of indexing encapsulating for a given IR
*
* @param isIndexed true, if IR has been already indexed. False otherwise
* @param ir IR of a module that has been/needs to be indexed
*/
public record IndexState(boolean isIndexed, IR ir) {
private IndexState toIndexed() {
assert !isIndexed;
return new IndexState(true, ir);
}

private IndexState withIr(IR ir) {
assert isIndexed;
return new IndexState(true, ir);
}
}

private final ConcurrentMap<Module, IndexState> modules;

private ModuleIndexing() {
this.modules = new ConcurrentHashMap<>();
}

public static ModuleIndexing createInstance() {
return new ModuleIndexing();
hubertp marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @return true, if module has been isIndexed. False otherwise.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value isn't true anymore.

*/
public IndexState find(Module module) {
return modules.get(module);
}

/**
* Get index state for a module or assigns a new one.
*
* @param module module for which lookup is performed
* @param ir IR for which index is calculated, if new
* @return index state assigned to the module, or a new one if absent
*/
public IndexState getOrCreateFresh(Module module, IR ir) {
return modules.computeIfAbsent(module, m -> new IndexState(false, ir));
}

/**
* Attempts to update the index state for a module. If the provided state does not match the one
* currently assigned to the module, no update is performed.
*
* @param state reference index state to be updated
* @return true if the operation of updating the state was successful, false if the reference
* state was not up-to-date.
*/
public boolean markAsIndexed(Module module, IndexState state) {
AtomicBoolean updated = new AtomicBoolean(false);
modules.compute(
module,
(k, v) -> {
if (v == state) {
updated.set(true);
return state.toIndexed();
} else {
return v;
}
});
return updated.get();
}

/**
* Attempts to update the index state for a module with a given IR. If the provided state does not
* match the one currently assigned to the module, no update is performed.
*
* @param state reference index state to be updated
* @param ir IR for which the index has been calculated
* @return true if the operation of updating the state was successful, false if the reference
* state was not up-to-date.
*/
public boolean updateState(Module module, IndexState state, IR ir) {
AtomicBoolean updated = new AtomicBoolean(false);
modules.compute(
module,
(k, v) -> {
if (v == state) {
updated.set(true);
return state.withIr(ir);
} else {
return v;
}
});
return updated.get();
}

/** Clear index state for a provided module. */
public void markIndexAsDirty(Module module) {
modules.compute(module, (k, v) -> null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class DeserializeLibrarySuggestionsCmd(
ctx: RuntimeContext,
ec: ExecutionContext
): Future[Unit] = {
ctx.jobProcessor.runBackground(
new DeserializeLibrarySuggestionsJob(request.libraryName)
)
Future.successful(())
Future {
ctx.jobProcessor.runBackground(
new DeserializeLibrarySuggestionsJob(request.libraryName)
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class RenameProjectCmd(
)

projectModules.foreach { module =>
Module.fromCompilerModule(module).setIndexed(false)
ctx.state.suggestions.markIndexAsDirty(
Module.fromCompilerModule(module)
)
ctx.endpoint.sendToClient(
Api.Response(
Api.SuggestionsDatabaseModuleUpdateNotification(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class CommandExecutionEngine(interpreterContext: InterpreterContext)
"Executing commands in a separate command pool"
)
interpreterContext.executionService.getContext
.newCachedThreadPool("command-pool", false)
.newCachedThreadPool("command-pool", 2, 10, 50, false)
}

private val sequentialExecutionService =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.enso.interpreter.instrument.execution

/** The state of the runtime.
*
* @param pendingEdits the storage for pending file edits
*/
final class ExecutionState(
/** The state of the runtime */
final class ExecutionState {

/** The storage for pending file edits */
val pendingEdits: PendingEdits = new PendingFileEdits()
)

val suggestions: ModuleIndexing = ModuleIndexing.createInstance()
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ final class JobExecutionEngine(
context.newCachedThreadPool(
"prioritized-job-pool",
2,
Integer.MAX_VALUE,
4,
JaroslavTulach marked this conversation as resolved.
Show resolved Hide resolved
50,
false
)

private val backgroundJobExecutor: ExecutorService =
context.newFixedThreadPool(1, "background-job-pool", false)
context.newCachedThreadPool("background-job-pool", 1, 4, 50, false)

private val runtimeContext =
RuntimeContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.enso.compiler.context.{
SuggestionBuilder,
SuggestionDiff
}
import org.enso.interpreter.instrument.execution.ModuleIndexing.IndexState
import org.enso.interpreter.instrument.execution.RuntimeContext
import org.enso.interpreter.runtime.Module
import org.enso.polyglot.data.Tree
Expand All @@ -15,7 +16,7 @@ import org.enso.polyglot.{ModuleExports, Suggestion}
import java.util.logging.Level

final class AnalyzeModuleInScopeJob(
modules: Iterable[Module]
modules: Iterable[(Module, IndexState, Boolean)]
) extends BackgroundJob[Unit](AnalyzeModuleInScopeJob.Priority) {

private val exportsBuilder = new ExportsBuilder
Expand All @@ -27,7 +28,7 @@ final class AnalyzeModuleInScopeJob(
// disable the suggestion updates and reduce the number of messages that
// runtime sends.
if (ctx.executionService.getContext.isProjectSuggestionsEnabled) {
modules.foreach(analyzeModuleInScope)
modules.foreach((analyzeModuleInScope _).tupled)
ctx.endpoint.sendToClient(
Api.Response(Api.AnalyzeModuleInScopeJobFinished())
)
Expand All @@ -37,31 +38,43 @@ final class AnalyzeModuleInScopeJob(
override def toString: String =
s"AnalyzeModuleInScopeJob($modules)"

private def analyzeModuleInScope(module: Module)(implicit
private def analyzeModuleInScope(
module: Module,
state: IndexState,
hasSource: Boolean
)(implicit
ctx: RuntimeContext
): Unit = {
if (!module.isIndexed && module.getSource != null) {
if (!state.isIndexed && hasSource) {
ctx.executionService.getLogger
.log(Level.FINEST, s"Analyzing module in scope ${module.getName}")
.log(Level.FINEST, s"Analyzing module in scope {0}", module.getName)
val moduleName = module.getName
val newSuggestions =
SuggestionBuilder(
module.asCompilerModule(),
ctx.executionService.getContext.getCompiler
)
.build(moduleName, module.getIr)
.build(moduleName, state.ir)
.filter(Suggestion.isGlobal)
val prevExports = ModuleExports(moduleName.toString, Set())
val newExports = exportsBuilder.build(module.getName, module.getIr)
val newExports = exportsBuilder.build(module.getName, state.ir)
val notification = Api.SuggestionsDatabaseModuleUpdateNotification(
module = moduleName.toString,
actions =
Vector(Api.SuggestionsDatabaseAction.Clean(moduleName.toString)),
exports = ModuleExportsDiff.compute(prevExports, newExports),
updates = SuggestionDiff.compute(Tree.empty, newSuggestions)
)
sendModuleUpdate(notification)
module.setIndexed(true)
if (ctx.state.suggestions.markAsIndexed(module, state)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, this looks race condition free.

sendModuleUpdate(notification)
} else {
ctx.executionService.getLogger
.log(
Level.FINEST,
s"Calculated index for module in scope {0} is not up-to-date. Discarding",
module.getName
)
}
}
}

Expand Down Expand Up @@ -89,7 +102,9 @@ object AnalyzeModuleInScopeJob {
* @param modules the list of modules to analyze
* @return the [[AnalyzeModuleInScopeJob]]
*/
def apply(modules: Iterable[Module]): AnalyzeModuleInScopeJob =
def apply(
modules: Iterable[(Module, IndexState, Boolean)]
): AnalyzeModuleInScopeJob =
new AnalyzeModuleInScopeJob(modules)

private val Priority = 11
Expand Down
Loading
Loading