Skip to content

Commit

Permalink
fix: use registry for runtime messaging (#121)
Browse files Browse the repository at this point in the history
This uses a registry to track the runtimes, so that if one
crashes and is restarted, it will register itself with the
registry and the LSP process doesn't need to track them itself.

This also streamlines the logic for waiting for the the runtimes
to be ready.
  • Loading branch information
mhanberg committed Jul 21, 2023
1 parent e9c317e commit 639493c
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 158 deletions.
260 changes: 128 additions & 132 deletions lib/next_ls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule NextLS do
:runtime_task_supervisor,
:dynamic_supervisor,
:extensions,
:extension_registry,
:registry,
:symbol_table
])

Expand All @@ -55,7 +55,8 @@ defmodule NextLS do
task_supervisor = Keyword.fetch!(args, :task_supervisor)
runtime_task_supervisor = Keyword.fetch!(args, :runtime_task_supervisor)
dynamic_supervisor = Keyword.fetch!(args, :dynamic_supervisor)
extension_registry = Keyword.fetch!(args, :extension_registry)

registry = Keyword.fetch!(args, :registry)
extensions = Keyword.get(args, :extensions, [NextLS.ElixirExtension])
cache = Keyword.fetch!(args, :cache)
symbol_table = Keyword.fetch!(args, :symbol_table)
Expand All @@ -72,9 +73,8 @@ defmodule NextLS do
task_supervisor: task_supervisor,
runtime_task_supervisor: runtime_task_supervisor,
dynamic_supervisor: dynamic_supervisor,
extension_registry: extension_registry,
registry: registry,
extensions: extensions,
runtime_tasks: nil,
ready: false,
client_capabilities: nil
)}
Expand Down Expand Up @@ -208,39 +208,44 @@ defmodule NextLS do
def handle_request(%TextDocumentFormatting{params: %{text_document: %{uri: uri}}}, lsp) do
document = lsp.assigns.documents[uri]

{_, %{runtime: runtime}} =
Enum.find(lsp.assigns.runtimes, fn {_name, %{uri: wuri}} -> String.starts_with?(uri, wuri) end)

with {:ok, {formatter, _}} <- Runtime.call(runtime, {Mix.Tasks.Format, :formatter_for_file, [".formatter.exs"]}),
{:ok, response} when is_binary(response) or is_list(response) <-
Runtime.call(runtime, {Kernel, :apply, [formatter, [Enum.join(document, "\n")]]}) do
{:reply,
[
%TextEdit{
new_text: IO.iodata_to_binary(response),
range: %Range{
start: %Position{line: 0, character: 0},
end: %Position{
line: length(document),
character: document |> List.last() |> String.length() |> Kernel.-(1) |> max(0)
}
}
}
], lsp}
else
{:error, :not_ready} ->
GenLSP.notify(lsp, %GenLSP.Notifications.WindowShowMessage{
params: %GenLSP.Structures.ShowMessageParams{
type: GenLSP.Enumerations.MessageType.info(),
message: "The NextLS runtime is still initializing!"
}
})

{:reply, nil, lsp}
[resp] =
dispatch(lsp.assigns.registry, :runtimes, fn entries ->
for {runtime, %{uri: wuri}} <- entries, String.starts_with?(uri, wuri) do
with {:ok, {formatter, _}} <-
Runtime.call(runtime, {Mix.Tasks.Format, :formatter_for_file, [".formatter.exs"]}),
{:ok, response} when is_binary(response) or is_list(response) <-
Runtime.call(runtime, {Kernel, :apply, [formatter, [Enum.join(document, "\n")]]}) do
{:reply,
[
%TextEdit{
new_text: IO.iodata_to_binary(response),
range: %Range{
start: %Position{line: 0, character: 0},
end: %Position{
line: length(document),
character: document |> List.last() |> String.length() |> Kernel.-(1) |> max(0)
}
}
}
], lsp}
else
{:error, :not_ready} ->
GenLSP.notify(lsp, %GenLSP.Notifications.WindowShowMessage{
params: %GenLSP.Structures.ShowMessageParams{
type: GenLSP.Enumerations.MessageType.info(),
message: "The NextLS runtime is still initializing!"
}
})

{:reply, nil, lsp}

_ ->
{:reply, nil, lsp}
end
end
end)

_ ->
{:reply, nil, lsp}
end
resp
end

def handle_request(%Shutdown{}, lsp) do
Expand All @@ -267,87 +272,79 @@ defmodule NextLS do
{:ok, _} =
DynamicSupervisor.start_child(
lsp.assigns.dynamic_supervisor,
{extension, cache: lsp.assigns.cache, registry: lsp.assigns.extension_registry, publisher: self()}
{extension, cache: lsp.assigns.cache, registry: lsp.assigns.registry, publisher: self()}
)
end

GenLSP.log(lsp, "[NextLS] Booting runtime...")

runtimes =
for %{uri: uri, name: name} <- lsp.assigns.workspace_folders do
token = token()
progress_start(lsp, token, "Initializing NextLS runtime for folder #{name}...")
GenLSP.log(lsp, "[NextLS] Booting runtimes...")

{:ok, runtime} =
DynamicSupervisor.start_child(
lsp.assigns.dynamic_supervisor,
{NextLS.Runtime,
task_supervisor: lsp.assigns.runtime_task_supervisor,
extension_registry: lsp.assigns.extension_registry,
working_dir: URI.parse(uri).path,
parent: self(),
logger: lsp.assigns.logger}
)
for %{uri: uri, name: name} <- lsp.assigns.workspace_folders do
token = token()
progress_start(lsp, token, "Initializing NextLS runtime for folder #{name}...")
parent = self()

Process.monitor(runtime)

{name,
%{uri: uri, runtime: runtime, refresh_ref: {token, "NextLS runtime for folder #{name} has initialized!"}}}
end

lsp = assign(lsp, runtimes: Map.new(runtimes))

tasks =
for {name, workspace} <- runtimes do
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
with false <- wait_until(fn -> NextLS.Runtime.ready?(workspace.runtime) end) do
GenLSP.error(lsp, "[NextLS] Failed to start runtime for folder #{name}")
raise "Failed to boot runtime"
end
{:ok, runtime} =
DynamicSupervisor.start_child(
lsp.assigns.dynamic_supervisor,
{NextLS.Runtime,
name: name,
task_supervisor: lsp.assigns.runtime_task_supervisor,
registry: lsp.assigns.registry,
working_dir: URI.parse(uri).path,
uri: uri,
parent: self(),
on_initialized: fn status ->
if status == :ready do
progress_end(lsp, token, "NextLS runtime for folder #{name} has initialized!")
GenLSP.log(lsp, "[NextLS] Runtime for folder #{name} is ready...")
send(parent, {:runtime_ready, name, self()})
else
progress_end(lsp, token)
GenLSP.error(lsp, "[NextLS] Runtime for folder #{name} failed to initialize")
end
end,
logger: lsp.assigns.logger}
)

GenLSP.log(lsp, "[NextLS] Runtime for folder #{name} is ready...")
ref = Process.monitor(runtime)

{name, :ready}
end)
end
Process.put(ref, name)

refresh_refs =
tasks |> Enum.zip_with(runtimes, fn task, {_name, runtime} -> {task.ref, runtime.refresh_ref} end) |> Map.new()
{name, %{uri: uri, runtime: runtime}}
end

{:noreply,
assign(lsp,
refresh_refs: Map.merge(lsp.assigns.refresh_refs, refresh_refs),
runtime_tasks: tasks
)}
{:noreply, lsp}
end

def handle_notification(%TextDocumentDidSave{}, %{assigns: %{ready: false}} = lsp) do
{:noreply, lsp}
end

# TODO: add some test cases for saving files in multiple workspaces
def handle_notification(
%TextDocumentDidSave{
params: %GenLSP.Structures.DidSaveTextDocumentParams{text: text, text_document: %{uri: uri}}
},
%{assigns: %{ready: true}} = lsp
) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor),
task not in for(t <- lsp.assigns.runtime_tasks, do: t.pid) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor) do
Process.exit(task, :kill)
end

token = token()
refresh_refs =
dispatch(lsp.assigns.registry, :runtimes, fn entries ->
for {pid, %{name: name, uri: wuri}} <- entries, String.starts_with?(uri, wuri), into: %{} do
token = token()
progress_start(lsp, token, "Compiling...")

progress_start(lsp, token, "Compiling...")
runtimes = Enum.to_list(lsp.assigns.runtimes)
task =
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
{name, Runtime.compile(pid)}
end)

tasks =
for {name, r} <- runtimes do
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn -> {name, Runtime.compile(r.runtime)} end)
end

refresh_refs =
tasks |> Enum.zip_with(runtimes, fn task, {_name, runtime} -> {task.ref, runtime.refresh_ref} end) |> Map.new()
{task.ref, {token, "Compiled!"}}
end
end)

{:noreply,
lsp
Expand All @@ -363,8 +360,7 @@ defmodule NextLS do
%TextDocumentDidChange{params: %{text_document: %{uri: uri}, content_changes: [%{text: text}]}},
lsp
) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor),
task not in for(t <- lsp.assigns.runtime_tasks, do: t.pid) do
for task <- Task.Supervisor.children(lsp.assigns.task_supervisor) do
Process.exit(task, :kill)
end

Expand Down Expand Up @@ -420,30 +416,27 @@ defmodule NextLS do
{:noreply, lsp}
end

def handle_info({ref, resp}, %{assigns: %{refresh_refs: refs}} = lsp) when is_map_key(refs, ref) do
Process.demonitor(ref, [:flush])
{{token, msg}, refs} = Map.pop(refs, ref)
def handle_info({:runtime_ready, name, runtime_pid}, lsp) do
token = token()
progress_start(lsp, token, "Compiling...")

progress_end(lsp, token, msg)
task =
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
{name, Runtime.compile(runtime_pid)}
end)

lsp =
case resp do
{name, :ready} ->
token = token()
progress_start(lsp, token, "Compiling...")
refresh_refs = Map.put(lsp.assigns.refresh_refs, task.ref, {token, "Compiled!"})

task =
Task.Supervisor.async_nolink(lsp.assigns.task_supervisor, fn ->
{name, Runtime.compile(lsp.assigns.runtimes[name].runtime)}
end)
{:noreply, assign(lsp, ready: true, refresh_refs: refresh_refs)}
end

assign(lsp, ready: true, refresh_refs: Map.put(refs, task.ref, {token, "Compiled!"}))
def handle_info({ref, _resp}, %{assigns: %{refresh_refs: refs}} = lsp) when is_map_key(refs, ref) do
Process.demonitor(ref, [:flush])
{{token, msg}, refs} = Map.pop(refs, ref)

_ ->
assign(lsp, refresh_refs: refs)
end
progress_end(lsp, token, msg)

{:noreply, lsp}
{:noreply, assign(lsp, refresh_refs: refs)}
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, %{assigns: %{refresh_refs: refs}} = lsp)
Expand All @@ -455,35 +448,20 @@ defmodule NextLS do
{:noreply, assign(lsp, refresh_refs: refs)}
end

def handle_info({:DOWN, _ref, :process, runtime, _reason}, %{assigns: %{runtimes: runtimes}} = lsp) do
{name, _} = Enum.find(runtimes, fn {_name, %{runtime: r}} -> r == runtime end)
def handle_info({:DOWN, ref, :process, _runtime, _reason}, lsp) do
name = Process.get(ref)
Process.delete(ref)

GenLSP.error(lsp, "[NextLS] The runtime for #{name} has crashed")

{:noreply, assign(lsp, runtimes: Map.drop(runtimes, name))}
{:noreply, lsp}
end

def handle_info(message, lsp) do
GenLSP.log(lsp, "[NextLS] Unhanded message: #{inspect(message)}")
GenLSP.log(lsp, "[NextLS] Unhandled message: #{inspect(message)}")
{:noreply, lsp}
end

defp wait_until(cb) do
wait_until(120, cb)
end

defp wait_until(0, _cb) do
false
end

defp wait_until(n, cb) do
if cb.() do
true
else
Process.sleep(1000)
wait_until(n - 1, cb)
end
end

defp progress_start(lsp, token, msg) do
GenLSP.notify(lsp, %GenLSP.Notifications.DollarProgress{
params: %GenLSP.Structures.ProgressParams{
Expand Down Expand Up @@ -527,4 +505,22 @@ defmodule NextLS do

defp elixir_kind_to_lsp_kind(kind) when kind in [:def, :defp, :defmacro, :defmacrop],
do: GenLSP.Enumerations.SymbolKind.function()

# NOTE: this is only possible because the registry is not partitioned
# if it is partitioned, then the callback is called multiple times
# and this method of extracting the result doesn't really make sense
defp dispatch(registry, key, callback) do
ref = make_ref()
me = self()

Registry.dispatch(registry, key, fn entries ->
result = callback.(entries)

send(me, {ref, result})
end)

receive do
{^ref, result} -> result
end
end
end
2 changes: 1 addition & 1 deletion lib/next_ls/extensions/elixir_extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule NextLS.ElixirExtension do
registry = Keyword.fetch!(args, :registry)
publisher = Keyword.fetch!(args, :publisher)

Registry.register(registry, :extension, :elixir)
Registry.register(registry, :extensions, :elixir)

{:ok, %{cache: cache, registry: registry, publisher: publisher}}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/next_ls/lsp_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ defmodule NextLS.LSPSupervisor do
{GenLSP.Buffer, buffer_opts},
{NextLS.DiagnosticCache, name: :diagnostic_cache},
{NextLS.SymbolTable, name: :symbol_table, path: hidden_folder},
{Registry, name: NextLS.ExtensionRegistry, keys: :duplicate},
{Registry, name: NextLS.Registry, keys: :duplicate},
{NextLS,
cache: :diagnostic_cache,
symbol_table: :symbol_table,
task_supervisor: NextLS.TaskSupervisor,
runtime_task_supervisor: :runtime_task_supervisor,
dynamic_supervisor: NextLS.DynamicSupervisor,
extension_registry: NextLS.ExtensionRegistry}
registry: NextLS.Registry}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down
Loading

0 comments on commit 639493c

Please sign in to comment.