Merge pull request #3 from Firobe/upgrade-to-13
Prepare for OCaml 4.14
mrmr1993 committed May 16, 2022
2 parents 672547b + 17dd4be commit e35ee20
Showing 35 changed files with 468 additions and 213 deletions.
2 changes: 1 addition & 1 deletion LICENSE.md
@@ -1,6 +1,6 @@
The MIT License

Copyright (c) 2014--2019 Jane Street Group, LLC <opensource@janestreet.com>
Copyright (c) 2014--2020 Jane Street Group, LLC <opensource@janestreet.com>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
56 changes: 0 additions & 56 deletions README.org

This file was deleted.

Empty file added doc/dune
Empty file.
74 changes: 74 additions & 0 deletions doc/main.org
@@ -0,0 +1,74 @@
#+TITLE: Rpc\_parallel

=Rpc_parallel= is a library that uses processes to achieve
parallelism. Because of the garbage collector and Async's locks,
thread-level parallelism is not achievable in OCaml.

The library works by spawning processes that start rpc servers. The
spawned process runs =/proc/self/exe= (i.e. the same executable as
the running process). Communication between a "master" and a "worker"
involves sending rpc queries and receiving rpc responses. The "worker"
already has the code to do the computation because it is running the same
binary!

* Mental Model

- =Worker.t= identifies a worker rpc server
- =spawn= starts a worker rpc server in another process; =serve= starts one
  in the same process
- =client= connects to a worker rpc server
- =run= dispatches on a connection to a worker rpc server
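
A minimal sketch of these four pieces working together, assuming a worker
module =Sum_worker= that was built with =Rpc_parallel.Make= and exposes a
single =sum= rpc (the module and function names are illustrative, not part of
the library):

#+begin_src ocaml
  open! Core
  open! Async

  let sum_on_a_worker () =
    (* [spawn_exn] starts a worker rpc server in another process and returns a
       [Sum_worker.t] identifying it. *)
    let%bind worker =
      Sum_worker.spawn_exn
        ~on_failure:Error.raise
        ~shutdown_on:Heartbeater_connection_timeout
        ~redirect_stdout:`Dev_null
        ~redirect_stderr:`Dev_null
        ()
    in
    (* [client] connects to the worker's rpc server ... *)
    match%bind Sum_worker.Connection.client worker () with
    | Error err -> Error.raise err
    | Ok conn ->
      (* ... and [run] dispatches an rpc query on that connection. *)
      Sum_worker.Connection.run_exn conn ~f:Sum_worker.functions.sum ~arg:42
#+end_src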

* Top-level

It is highly recommended that calls to =Rpc_parallel.start_app= and
=Rpc_parallel.Make= be top-level; a sketch of this layout follows the
list below. The real requirements are:

1) The master's state must be initialized before any calls to =spawn=. This is
achieved by either =Rpc_parallel.start_app= or
=Rpc_parallel.Expert.start_master_server_exn=.

2) Spawned workers (runs of your executable with a certain environment variable
set) must start running as workers. This is achieved by either
=Rpc_parallel.start_app= or =Rpc_parallel.Expert.worker_command=.

3) Spawned workers must be able to find their function implementations when they
start running as workers. These implementations are gathered when the
=Rpc_parallel.Make= functor is applied.

4) The worker implementations must be defined completely and in the same order,
regardless of whether the master or the worker code path is taken. This is
necessary for masters and workers to agree on certain generated ids.
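
A minimal sketch of this layout, with the =Rpc_parallel.Make= application and
=Rpc_parallel.start_app= at the top level of the executable so they run
identically on the master and worker code paths. The =Doubler= worker, its
=double= rpc, and the command are illustrative only:

#+begin_src ocaml
  open! Core
  open! Async

  module Doubler = struct
    module T = struct
      (* The rpc interface this worker exposes. *)
      type 'worker functions =
        { double : ('worker, int, int) Rpc_parallel.Function.t }

      module Worker_state = struct
        type init_arg = unit [@@deriving bin_io]
        type t = unit
      end

      module Connection_state = struct
        type init_arg = unit [@@deriving bin_io]
        type t = unit
      end

      module Functions
          (C : Rpc_parallel.Creator
           with type worker_state := Worker_state.t
            and type connection_state := Connection_state.t) =
      struct
        let double =
          C.create_rpc
            ~f:(fun ~worker_state:() ~conn_state:() n -> return (n * 2))
            ~bin_input:Int.bin_t
            ~bin_output:Int.bin_t
            ()
        ;;

        let functions = { double }
        let init_worker_state () = Deferred.unit
        let init_connection_state ~connection:_ ~worker_state:_ () = Deferred.unit
      end
    end

    (* Top level: applying the functor registers the implementations, so a
       spawned worker can find them when it starts running as a worker. *)
    include Rpc_parallel.Make (T)
  end

  let command =
    Command.async
      ~summary:"spawn a worker and double a number"
      (Command.Param.return (fun () ->
         let%bind conn =
           Doubler.spawn_exn
             ~on_failure:Error.raise
             ~shutdown_on:Connection_closed
             ~connection_state_init_arg:()
             ~redirect_stdout:`Dev_null
             ~redirect_stderr:`Dev_null
             ()
         in
         let%map result =
           Doubler.Connection.run_exn conn ~f:Doubler.functions.double ~arg:21
         in
         printf "%d\n" result))
  ;;

  (* Top level: [start_app] initializes the master's state before any [spawn]
     and starts the process as a worker when this binary is re-run as one. *)
  let () = Rpc_parallel.start_app command
#+end_src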

* Monitoring your workers

Uncaught exceptions in workers always result in the worker calling
=Shutdown.shutdown=. The master can be notified of these exceptions in
multiple ways (a sketch of the =on_failure= case follows this list):

- If the exception occurred in a function implementation =f= before the result
  of =f= was determined, the exception is returned to the caller, e.g. the
  caller of =spawn= or =run= will get an =Error.t= describing the exception.

- If the exception occurred after the result of =f= was determined,
  =on_failure exn= will be called in the spawning process (within the monitor
  that was =Monitor.current ()= at the time of =spawn=).

- If =redirect_stderr= specifies a file, the worker will also write its
exception to that file before shutting down.
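
A sketch of the =on_failure= case, assuming the =Doubler= worker from the
sketch above. Because =on_failure= runs in the monitor that was current at
=spawn= time, one way to observe late worker exceptions is to spawn from a
dedicated monitor:

#+begin_src ocaml
  let spawn_with_error_logging () =
    let monitor = Monitor.create ~name:"doubler-worker" () in
    (* Exceptions raised by [on_failure] below land in this monitor. *)
    Monitor.detach_and_iter_errors monitor ~f:(fun exn ->
      Log.Global.error "worker failed after spawn: %s" (Exn.to_string exn));
    Scheduler.within' ~monitor (fun () ->
      Doubler.spawn_exn
        ~on_failure:Error.raise
        ~shutdown_on:Connection_closed
        ~connection_state_init_arg:()
        ~redirect_stdout:`Dev_null
        ~redirect_stderr:`Dev_null
        ())
#+end_src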

* Dealing with long async cycles

Long Async cycles can cause the connections to your workers to close.
If you are using =~shutdown_on:Connection_closed= (which is recommended!),
a closed connection will result in your worker shutting down.

You can bump the =max_message_size=, =heartbeat_config=, and
=handshake_timeout= settings that are used for all rpc communication.
These settings are determined by (in descending order of preference):

1) The environment variable =RPC_PARALLEL_RPC_SETTINGS= (see
=Rpc_settings= in =lib/rpc_parallel/src/parallel.ml= for how to
construct a value)
2) Arguments supplied to =start_app= or =Expert.start_master_server_exn=
3) The defaults supplied by the =Rpc= library
55 changes: 55 additions & 0 deletions doc/migrating-off-managed.org
@@ -0,0 +1,55 @@
#+TITLE: Migrating off Rpc\_parallel.Managed

* What is Rpc\_parallel.Managed?

This module exposes a =Make= functor that is built on top of
=Rpc_parallel.Make=. The primary difference is that the interface
doesn't expose a =Connection.t=. Under the hood, =spawn= spawns a
worker, makes a connection to it, and stores that connection in a
hash table; when the connection is closed, it is removed from the
table. =run= looks for a cached connection, reconnecting if there is
none.

* Why you shouldn't use this

The reconnect and error-reporting semantics are not well-defined.

Regarding reconnect: the library will attempt to reconnect, but it
won't attempt to respawn. Unless your worker is intentionally closing
connections, a closed connection most likely indicates a problem that
would require a respawn (e.g. the worker actually exited).

Regarding error reporting: the exposed =on_failure= callback is passed
through as the =on_failure= argument for the underlying unmanaged
worker. In addition, there is an
=on_connection_to_worker_closed= callback used to report when the
first connection is closed. Subsequent connection closures don't
trigger the callback. Some classes of errors might result in
=on_failure= and =on_connection_to_worker_closed= both being called
while others result in just one of them being called.

This module was created primarily for backwards compatibility with
code that used earlier versions of =Rpc_parallel=. New code should
use =Rpc_parallel.Make=.

* How to migrate

If your code never reconnects to a spawned worker, you can safely use
=Rpc_parallel.Make= and pass =~shutdown_on:Connection_closed= to
=spawn=, as in the sketch below. This gives you back a =Connection.t=
directly, so it isn't even possible for you to reconnect.
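
A minimal sketch of that call, assuming an unmanaged worker module =Worker=
built with =Rpc_parallel.Make= whose worker-state and connection-state init
args are both =unit=, and whose =functions= record has a =process= rpc (the
names are illustrative):

#+begin_src ocaml
  let spawn_and_run () =
    (* With [~shutdown_on:Connection_closed], [spawn_exn] returns the
       [Connection.t] itself; when that connection closes, the worker shuts
       down. *)
    let%bind conn =
      Worker.spawn_exn
        ~on_failure:Error.raise
        ~shutdown_on:Connection_closed
        ~connection_state_init_arg:()
        ~redirect_stdout:`Dev_null
        ~redirect_stderr:`Dev_null
        ()
    in
    Worker.Connection.run_exn conn ~f:Worker.functions.process ~arg:()
#+end_src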

If you don't know whether you are reconnecting, you can add some logging
to the =on_error= callback that you supply to the managed worker's
=spawn= function. In general, if you aren't storing your =Worker.t=
anywhere and are immediately calling =run= after =spawn=, you are
almost certainly not reconnecting.

If you are relying on reconnect, we don't have a great alternative
right now. We recommend you think carefully about the semantics you
want for your reconnect logic and code something up. If there is
enough desire, we could add some library support. From looking
through our code base, very few users (if any!) of
=Rpc_parallel.Managed= actually reconnect.
2 changes: 1 addition & 1 deletion dune-project
@@ -1 +1 @@
(lang dune 1.5)
(lang dune 1.10)
2 changes: 1 addition & 1 deletion example/alternative_init.ml
@@ -52,7 +52,7 @@ let main_command =
let%map (_connection : Worker.Connection.t) =
Worker.spawn_exn
~on_failure:Error.raise
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~connection_state_init_arg:()
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
2 changes: 1 addition & 1 deletion example/async_log.ml
@@ -47,7 +47,7 @@ let main () =
let%bind worker =
Worker.spawn_exn
~on_failure:Error.raise
~shutdown_on:Heartbeater_timeout
~shutdown_on:Heartbeater_connection_timeout
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
()
2 changes: 1 addition & 1 deletion example/reverse_direct_pipe.ml
@@ -49,7 +49,7 @@ let main () =
let%bind connections =
Array.init shards ~f:(fun id ->
Shard.spawn_exn
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
~on_failure:Error.raise
2 changes: 1 addition & 1 deletion example/reverse_pipe.ml
@@ -46,7 +46,7 @@ let main () =
let%bind connections =
Array.init shards ~f:(fun id ->
Shard.spawn_exn
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
~on_failure:Error.raise
2 changes: 1 addition & 1 deletion example/rpc_direct_pipe.ml
@@ -60,7 +60,7 @@ let main max log_dir () =
Sum_worker.spawn
~on_failure:Error.raise
?cd:log_dir
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~redirect_stdout
~redirect_stderr
~connection_state_init_arg:()
2 changes: 1 addition & 1 deletion example/spawn_in_foreground.ml
@@ -40,7 +40,7 @@ end

let main () =
Worker.spawn_in_foreground
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~connection_state_init_arg:()
~on_failure:Error.raise
()
6 changes: 3 additions & 3 deletions example/stream_workers.ml
@@ -144,7 +144,7 @@ module Worker = struct
end

let handle_error worker err =
failwiths (sprintf "error in %s" worker) err Error.sexp_of_t
failwiths ~here:[%here] (sprintf "error in %s" worker) err Error.sexp_of_t
;;

let command =
@@ -160,7 +160,7 @@ let command =
(fun num_workers num_elements () ->
(* Spawn a stream worker *)
Stream_worker.spawn
~shutdown_on:Heartbeater_timeout
~shutdown_on:Heartbeater_connection_timeout
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
num_elements
@@ -169,7 +169,7 @@ let command =
(* Spawn workers and tell them about the stream worker *)
Deferred.Or_error.List.init num_workers ~f:(fun i ->
Worker.spawn
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~connection_state_init_arg:()
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
4 changes: 2 additions & 2 deletions example/worker_binprot.ml
@@ -93,7 +93,7 @@ let command =
Command.Spec.(empty)
(fun () ->
Worker.spawn
~shutdown_on:Heartbeater_timeout
~shutdown_on:Heartbeater_connection_timeout
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
~on_failure:Error.raise
@@ -102,7 +102,7 @@ let command =
Worker.Connection.client worker ()
>>=? fun worker_conn ->
Dispatcher.spawn
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
~on_failure:Error.raise
7 changes: 4 additions & 3 deletions example/workers_as_masters.ml
@@ -70,7 +70,7 @@ module Primary_worker = struct
Deferred.List.init ~how:`Parallel num_workers ~f:(fun _i ->
let%map secondary_worker =
Secondary_worker.spawn_exn
~shutdown_on:Heartbeater_timeout
~shutdown_on:Heartbeater_connection_timeout
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
~on_failure:Error.raise
@@ -87,7 +87,8 @@ module Primary_worker = struct
let ping_impl ~worker_state:() ~conn_state:() () =
Deferred.List.map ~how:`Parallel (Bag.to_list workers) ~f:(fun (name, worker) ->
match%bind Secondary_worker.Connection.client worker () with
| Error e -> failwiths "failed connecting to worker" e [%sexp_of: Error.t]
| Error e ->
failwiths ~here:[%here] "failed connecting to worker" e [%sexp_of: Error.t]
| Ok conn ->
(match%map
Secondary_worker.Connection.run
@@ -131,7 +132,7 @@ let command =
(fun primary secondary () ->
Deferred.Or_error.List.init ~how:`Parallel primary ~f:(fun worker_id ->
Primary_worker.spawn
~shutdown_on:Disconnect
~shutdown_on:Connection_closed
~redirect_stdout:`Dev_null
~redirect_stderr:`Dev_null
~on_failure:Error.raise
3 changes: 2 additions & 1 deletion expect_test/dune
@@ -1,2 +1,3 @@
(library (name rpc_parallel_expect_test) (libraries async core rpc_parallel)
(library (name rpc_parallel_expect_test)
(libraries async core re rpc_parallel transaction core_kernel.uuid)
(preprocess (pps ppx_jane)))