
Remote series and dataframes and distributed GC #932

Merged: 21 commits merged into main from jv-remote on Jun 30, 2024
Conversation

@josevalim (Member) commented Jun 29, 2024

Automatically transfer data between nodes for remote series
and dataframes and perform distributed garbage collection.

The functions in Explorer.DataFrame and Explorer.Series
will automatically move operations on remote dataframes to
the nodes they belong to. The new Explorer.Remote module
provides additional conveniences for manual placement.

Implementation details

There is a new module called Explorer.Remote.
In order to understand what it does, we need
to understand the challenges in working with remote series
and dataframes.

Series and dataframes are actually NIF resources: they are
pointers to blobs of memory operated by low-level libraries.
Those are represented in Erlang/Elixir as references (the
same as the one returned by make_ref/0). Once the reference
is garbage collected (based on refcounting), those NIF
resources are garbage collected and the memory is reclaimed.

When using Distributed Erlang, you may write this code:

remote_series = :erpc.call(node, Explorer.Series, :from_list, [[1, 2, 3]])

However, the code above will not work, because the series
will be allocated in the remote node and the remote node
won't hold a reference to said series! This means the series
is garbage collected and if we attempt to read it later on,
from the caller node, it will no longer exist. Therefore,
we must explicitly place these resources on remote nodes
by spawning processes to hold these references. That's what
the place/2 function in this module does.
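
For intuition, here is a minimal sketch of that idea, assuming the module below is compiled and loaded on both nodes (the names are illustrative, not Explorer's actual internals):

defmodule PlacementSketch do
  # Build the series on `node` and spawn a process there whose only job is
  # to keep the series (and therefore its NIF resource) reachable after the
  # :erpc call returns.
  def place_series(node, list) do
    :erpc.call(node, fn ->
      series = Explorer.Series.from_list(list)

      holder =
        spawn(fn ->
          receive do
            :release -> :ok
          end

          # Referencing `series` here keeps it alive until :release arrives,
          # after which the remote NIF resource can be reclaimed.
          _ = series
        end)

      {series, holder}
    end)
  end
end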

We also need to guarantee these resources are not kept
forever by the remote nodes, so place/2 also creates a
local NIF resource that, once it is garbage collected,
notifies the remote holders that the data is no longer
referenced, effectively implementing a distributed
garbage collector.
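
For intuition, a minimal sketch of the holder side, with an explicit message standing in for the NIF-resource destructor notification used by the real implementation (all names here are illustrative):

defmodule HolderSketch do
  use GenServer

  # Started on the remote node with the resources it must keep alive.
  def start_link(resources), do: GenServer.start_link(__MODULE__, resources)

  @impl true
  def init(resources), do: {:ok, resources}

  # In the real implementation, a NIF resource on the caller node triggers
  # this notification from its destructor once it is garbage collected.
  @impl true
  def handle_cast(:local_reference_collected, resources) do
    # Stopping drops our reference, so the remote NIF memory can be reclaimed.
    {:stop, :normal, resources}
  end
end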

TODO

  • Make collect in dataframe transfer to the current node
  • Add collect to series
  • Add compute to dataframe
  • Handle dataframes (remove Shared.apply_impl)
  • Handle lazy series
  • Add node option to creation functions

@billylanchantin (Contributor) left a comment:

Very cool stuff! 🤯

So far I've only had one question. I'll probably have more after I read through again.

Comment on lines +45 to +51
Receives a data structure and traverses it looking
for remote dataframes and series.

If any is found, it spawns a process on the remote node
and sets up a distributed garbage collector. This function
only traverses maps, lists, and tuples, it does not support
arbitrary structs (such as map sets).
@billylanchantin (Contributor) commented Jun 30, 2024:

Why do a traversal? If place accepted only series and dataframes, I think the API would be simpler.

Are we worried the following is expensive or inconvenient?

resource_map = %{"foo" => resource1, "bar" => resource2}
pids_map = Map.new(resource_map, fn {k, v} -> {k, place(v)} end)

@josevalim (Member, Author) replied:

There are two reasons why we do a traversal:

  1. Convenience: it is easier if the rest of the Explorer Series/DataFrame API do not have to care about explicitly annotating what needs to be placed.

  2. FLAME support: we want to integrate this with FLAME, allowing developers to run custom code within a FLAME that may return arbitrarily nested series/dataframes.
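
As a hypothetical illustration of point 2 (assuming remote_df and remote_series already live on another node; the exact options accepted by place/2 are not shown in this thread, so they are omitted):

nested = %{
  "frame" => remote_df,
  "columns" => [remote_series],
  "meta" => {:ok, "plain terms pass through untouched"}
}

# One call walks the map, list, and tuple, spawning a holder on the owning
# node and setting up distributed GC for every remote resource it finds.
placed = Explorer.Remote.place(nested)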

@josevalim merged commit 067685f into main on Jun 30, 2024
4 checks passed
@josevalim deleted the jv-remote branch on Jun 30, 2024 at 19:52
@jonatanklosko (Contributor) commented:

Amazing!! 🔥🐈‍⬛

{:stop, reason, state}
end

def handle_info({:DOWN, _, _, pid, _}, state) do
Contributor:

Shouldn't we monitor on hold to receive this message?

@josevalim (Member, Author):

Yes, we should! I will try to add a test.
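
For reference, a hedged sketch of what monitoring on hold could look like, reusing the pids field visible in this diff (the message shape and state layout are assumptions):

# Hypothetical clause: when a process asks the holder to keep resources
# alive on its behalf, monitor it so that the {:DOWN, _, _, pid, _} clause
# above fires (and cleans up) when that process dies.
def handle_call({:hold, pid}, _from, %{pids: pids} = state) do
  ref = Process.monitor(pid)
  {:reply, :ok, %{state | pids: Map.put(pids, pid, ref)}}
end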

noreply_or_stop(%{state | pids: pids, refs: refs})
end

def handle_info({:DOWN, owner_ref, _, _, reason}, %{owner_ref: owner_ref} = state) do
Contributor:

If the idea is to allow multiple nodes to call hold, why do we terminate eagerly when the owner goes down, given that we already do cleanup in the other :DOWN handler?

@josevalim (Member, Author):

We need to do this if there are no pids or refs. I will change it accordingly.

Contributor:

Yeah, but it should be enough to monitor on init and rely on the other clause, which wouldn't pop anything and would cause a stop, no?

Contributor:

(The difference being that if the initial node goes down while other nodes still reference the data, we keep the holder alive.)
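
A hedged sketch of that behaviour, named after the noreply_or_stop/1 call in this diff and assuming pids is a map of holding processes (the real body may differ):

# Keep the holder alive while any process still references the data;
# only stop once the last reference is gone.
defp noreply_or_stop(%{pids: pids} = state) when map_size(pids) == 0 do
  {:stop, :normal, state}
end

defp noreply_or_stop(state), do: {:noreply, state}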
