diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..f0d5d10 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,10 @@ +root = true + +[*] +charset = utf-8 + +indent_style = tab +indent_size = 4 + +end_of_line = lf +insert_final_newline = true diff --git a/config/config.exs b/config/config.exs new file mode 100644 index 0000000..786ffc0 --- /dev/null +++ b/config/config.exs @@ -0,0 +1,30 @@ +# This file is responsible for configuring your application +# and its dependencies with the aid of the Mix.Config module. +use Mix.Config + +# This configuration is loaded before any dependency and is restricted +# to this project. If another project depends on this project, this +# file won't be loaded nor affect the parent project. For this reason, +# if you want to provide default values for your application for +# 3rd-party users, it should be done in your "mix.exs" file. + +# You can configure for your application as: +# +# config :distributed, key: :value +# +# And access this configuration in your application as: +# +# Application.get_env(:distributed, :key) +# +# Or configure a 3rd-party app: +# +# config :logger, level: :info +# + +# It is also possible to import configuration files, relative to this +# directory. For example, you can emulate configuration per environment +# by uncommenting the line below and defining dev.exs, test.exs and such. +# Configuration from the imported file will override the ones defined +# here (which is why it is important to import them last). +# +# import_config "#{Mix.env}.exs" diff --git a/lib/distributed.ex b/lib/distributed.ex new file mode 100644 index 0000000..de93a59 --- /dev/null +++ b/lib/distributed.ex @@ -0,0 +1,98 @@ +defmodule Distributed do + @moduledoc """ + Make your systems distributed, replicated, scaled well, easily. + + [![Hex Version](https://img.shields.io/hexpm/v/distributed.svg?style=flat-square)](https://hex.pm/packages/distributed) [![Docs](https://img.shields.io/badge/api-docs-orange.svg?style=flat-square)](https://hexdocs.pm/distributed) [![Hex downloads](https://img.shields.io/hexpm/dt/distributed.svg?style=flat-square)](https://hex.pm/packages/distributed) [![GitHub](https://img.shields.io/badge/vcs-GitHub-blue.svg?style=flat-square)](https://github.com/ertgl/distributed) [![MIT License](https://img.shields.io/hexpm/l/distributed.svg?style=flat-square)](LICENSE.txt) + --- + + ### Tutorial + + This is an example of a replicated `GenServer`. + + defmodule Storage.KV do + + use GenServer + + def start_link() do + GenServer.start_link(__MODULE__, [initial_state: %{}], name: __MODULE__.process_id()) + end + + def init(opts \\\\ []) do + {:ok, Keyword.get(opts, :initial_state, %{})} + end + + def process_id() do + Storage.KV + end + + def handle_cast({:set, key, value}, state) do + {:noreply, Map.put(state, key, value)} + end + + def handle_call({:get, key, default}, _from, state) do + {:reply, Map.get(state, key, default), state} + end + + def handle_call({:has, key}, _from, state) do + {:reply, Map.has_key?(state, key), state} + end + + def handle_call({:pop, key, default}, _from, state) do + {value, new_state} = Map.pop(state, key, default) + {:reply, value, new_state} + end + + def get(key, default \\\\ nil) do + Distributed.Scaler.GenServer.call(__MODULE__.process_id(), {:get, key, default}) + end + + def set(key, value) do + Distributed.Replicator.GenServer.cast(__MODULE__.process_id(), {:set, key, value}) + |> List.first() + end + + def has?(key) do + Distributed.Scaler.GenServer.call(__MODULE__.process_id(), {:has, key}) + end + + def pop(key, default \\\\ nil) do + Distributed.Replicator.GenServer.call(__MODULE__.process_id(), {:pop, key, default}) + |> List.first() + end + + end + + You can see the example as a small project on [GitHub](https://github.com/ertgl/storage). + + + ### Installation: + + If [you have Hex](https://hex.pm), the package can be installed + by adding `:distributed` to your list of dependencies in `mix.exs`: + + def application do + [ + extra_applications: [ + :distributed, + ], + ] + end + + def deps do + [ + {:distributed, "~> 0.1.0"}, + ] + end + """ + + defmacro __using__(opts \\ []) do + scaler_opts = Keyword.get(opts, :scaler, []) + replicator_opts = Keyword.get(opts, :replicator, []) + + quote do + use Distributed.Scaler, unquote(scaler_opts) + use Distributed.Replicator, unquote(replicator_opts) + end + end + +end diff --git a/lib/distributed/application.ex b/lib/distributed/application.ex new file mode 100644 index 0000000..01c5d71 --- /dev/null +++ b/lib/distributed/application.ex @@ -0,0 +1,26 @@ +defmodule Distributed.Application do + # See http://elixir-lang.org/docs/stable/elixir/Application.html + # for more information on OTP Applications + @moduledoc false + + use Application + + def start(_type, _args) do + import Supervisor.Spec, warn: false + # Define workers and child supervisors to be supervised + children = [ + # Starts a worker by calling: Distributed.Worker.start_link(arg1, arg2, arg3) + # worker(Distributed.Worker, [arg1, arg2, arg3]), + worker(Distributed.Node, []), + worker(Distributed.Node.Iterator, []), + ] + # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html + # for other strategies and supported options + opts = [ + name: Distributed.Supervisor, + strategy: :one_for_one, + ] + Supervisor.start_link(children, opts) + end + +end diff --git a/lib/distributed/node.ex b/lib/distributed/node.ex new file mode 100644 index 0000000..cc1ac64 --- /dev/null +++ b/lib/distributed/node.ex @@ -0,0 +1,81 @@ +defmodule Distributed.Node do + @moduledoc """ + Functions in `Distributed.Node` module help to know which node is the master and which are the slaves. + """ + + use GenServer + + @doc false + def start_link() do + GenServer.start_link(__MODULE__, [], name: __MODULE__.process_id()) + end + + @doc false + def init(_opts \\ []) do + {:ok, %{}} + end + + @doc false + def process_id() do + {:global, Distributed.Node} + end + + @doc false + def handle_call(:list, _from, state) do + {:reply, Node.list() ++ [Node.self()], state} + end + + @doc false + def handle_call(:master, _from, state) do + {:reply, Node.self(), state} + end + + @doc false + def handle_call(:slaves, _from, state) do + {:reply, Node.list(), state} + end + + @doc """ + Returns a list of names of all the nodes, including master. See also `Distributed.Node.Iterator`. + """ + @spec list(opts :: [any]) :: [atom] + def list(opts \\ []) do + timeout = Keyword.get(opts, :timeout, :infinity) + GenServer.call(__MODULE__.process_id(), :list, timeout) + end + + @doc """ + Returns name of the master node. + """ + @spec master(opts :: [any]) :: atom + def master(opts \\ []) do + timeout = Keyword.get(opts, :timeout, :infinity) + GenServer.call(__MODULE__.process_id(), :master, timeout) + end + + @doc """ + Returns a list of names of the slave nodes. + """ + @spec slaves(opts :: [any]) :: [atom] + def slaves(opts \\ []) do + timeout = Keyword.get(opts, :timeout, :infinity) + GenServer.call(__MODULE__.process_id(), :slaves, timeout) + end + + @doc """ + Returns true if the `node` is the master node, otherwise false. + """ + @spec is_master?(node :: atom, opts :: [any]) :: boolean + def is_master?(node, opts \\ []) do + node == __MODULE__.master(opts) + end + + @doc """ + Returns true if the `node` is a slave node, otherwise false. + """ + @spec is_slave?(node :: atom, opts :: [any]) :: boolean + def is_slave?(node, opts \\ []) do + node in __MODULE__.slaves(opts) + end + +end diff --git a/lib/distributed/node/iterator.ex b/lib/distributed/node/iterator.ex new file mode 100644 index 0000000..968fffe --- /dev/null +++ b/lib/distributed/node/iterator.ex @@ -0,0 +1,76 @@ +defmodule Distributed.Node.Iterator do + @moduledoc """ + Functions in `Distributed.Node.Iterator` module help to iterate through all the nodes connected. + """ + + use GenServer + + @doc false + def start_link() do + GenServer.start_link(__MODULE__, [], name: __MODULE__.process_id()) + end + + @doc false + def init(_opts \\ []) do + { + :ok, + %{ + previous_nodes: [], + }, + } + end + + @doc false + def process_id() do + {:global, __MODULE__} + end + + @doc false + def handle_call({:next, opts}, _from, %{previous_nodes: previous_nodes} = state) do + skip_master = Keyword.get(opts, :skip_master, false) + no_sweep = Keyword.get(opts, :no_sweep, false) + nodes = case skip_master do + true -> + Distributed.Node.list() -- [Distributed.Node.master()] -- previous_nodes + false -> + Distributed.Node.list() -- previous_nodes + end + {need_to_repeat?, available_nodes} = case length(nodes) > 0 do + true -> + {false, nodes} + false -> + {true, Distributed.Node.list()} + end + next_node = List.first(available_nodes) + timely_previous_nodes = case need_to_repeat? do + false -> + previous_nodes ++ [next_node] + true -> + [next_node] + end + case no_sweep do + true -> + {:reply, next_node, state} + false -> + {:reply, next_node, Map.put(state, :previous_nodes, timely_previous_nodes)} + end + end + + @doc """ + Returns the next node's name in the iteration. When the iteration ends, it will repeat itself lazily. + + You can skip the master node via: + skip_master: true + + Note: `skip_master` will not work if the master node is the only node in the network. + + You can also know which one the next node, without touching the order via: + no_sweep: true + """ + @spec next(opts :: [any]) :: atom + def next(opts \\ []) do + timeout = Keyword.get(opts, :timeout, :infinity) + GenServer.call(__MODULE__.process_id(), {:next, opts}, timeout) + end + +end diff --git a/lib/distributed/parallel.ex b/lib/distributed/parallel.ex new file mode 100644 index 0000000..7b51846 --- /dev/null +++ b/lib/distributed/parallel.ex @@ -0,0 +1,22 @@ +defmodule Distributed.Parallel do + @moduledoc """ + The functions in `Distributed.Parallel` are not about scaling processes through all the nodes. They + execute processes asynchronously on the node that uses the functions. + + If you want to execute processes through all nodes in parallel, please see `Distributed.Replication` module. + """ + + @doc """ + Returns the results of new processes started by the application of `fun` on CPU cores. + See https://en.wikipedia.org/wiki/Map_(parallel_pattern) + """ + @spec map(enumerable :: Enum.enumerable, fun :: (() -> any), opts :: [any]) :: [any] + def map(enumerable, fun, opts \\ []) + when is_function(fun) + do + timeout = Keyword.get(opts, :timeout, :infinity) + Enum.map(enumerable, &(Task.async(fn -> fun.(&1) end))) + |> Enum.map(&(Task.await(&1, timeout))) + end + +end diff --git a/lib/distributed/replicator.ex b/lib/distributed/replicator.ex new file mode 100644 index 0000000..bb99483 --- /dev/null +++ b/lib/distributed/replicator.ex @@ -0,0 +1,13 @@ +defmodule Distributed.Replicator do + + defmacro __using__(opts \\ []) do + node_opts = Keyword.get(opts, :node, []) + gen_server_opts = Keyword.get(opts, :gen_server, []) + + quote do + import(Distributed.Replicator.Node, unquote(node_opts)) + import(Distributed.Replicator.GenServer, unquote(gen_server_opts)) + end + end + +end diff --git a/lib/distributed/replicator/gen_server.ex b/lib/distributed/replicator/gen_server.ex new file mode 100644 index 0000000..3498311 --- /dev/null +++ b/lib/distributed/replicator/gen_server.ex @@ -0,0 +1,53 @@ +defmodule Distributed.Replicator.GenServer do + @moduledoc """ + The functions in `Distributed.Replicator.GenServer` module helps to replicate an event by processing it on the all nodes in the network. + In `Distributed.Replicator.GenServer`, functions execute processes in parallel. + + **Note**: Since this module is only a wrapper for `GenServer` module, there is no need to write a detailed documentation for this module. + Please check documentation of the `GenServer` module; you can basically think that the functions of the module run on every single node + without specifying nodes, and you will be replied with a list of results of the processes. + """ + + use GenServer + + @doc false + def start_link() do + GenServer.start_link(__MODULE__, [], name: __MODULE__.process_id()) + end + + @doc false + def init(_opts \\ []) do + {:ok, %{}} + end + + @doc false + def process_id() do + Distributed.Replicator.GenServer + end + + @doc """ + Sends messages to the given dest on nodes and returns the message. See `Kernel.send/2` + """ + @spec info(dest :: pid | port | atom, msg :: any, opts :: [any]) :: any + def info(dest, msg, opts \\ []) do + Distributed.Parallel.map(Distributed.Node.list(opts), &(send({dest, &1}, msg))) + end + + @doc """ + Makes synchronous calls to the servers on nodes and waits for their replies. See `GenServer.call/3` + """ + @spec call(server :: atom, term, opts :: [any]) :: [term] + def call(server, term, opts \\ []) do + timeout = Keyword.get(opts, :timeout, :infinity) + Distributed.Parallel.map(Distributed.Node.list(opts), &(GenServer.call({server, &1}, term, timeout))) + end + + @doc """ + Sends asynchronous requests to the servers on nodes. See `GenServer.cast/2` + """ + @spec cast(server :: atom, term :: term, opts :: [any]) :: [term] + def cast(server, term, opts \\ []) do + Distributed.Parallel.map(Distributed.Node.list(opts), &(GenServer.cast({server, &1}, term))) + end + +end diff --git a/lib/distributed/replicator/node.ex b/lib/distributed/replicator/node.ex new file mode 100644 index 0000000..b20cb27 --- /dev/null +++ b/lib/distributed/replicator/node.ex @@ -0,0 +1,94 @@ +defmodule Distributed.Replicator.Node do + @moduledoc """ + The functions in `Distributed.Replicator.Node` module helps to replicate an event by processing it on the all nodes in the network. + In `Distributed.Replicator.Node`, functions execute processes in parallel. + + **Note**: Since this module is only a wrapper for `Node` module, there is no need to write a detailed documentation for this module. + Please check documentation of the `Node` module; you can basically think that the functions of the module run on every single node + without specifying nodes, and you will be replied with a list of results of the processes. + """ + + use GenServer + + @doc false + def start_link() do + GenServer.start_link(__MODULE__, [], name: __MODULE__.process_id()) + end + + @doc false + def init(_opts \\ []) do + {:ok, %{}} + end + + @doc false + def process_id() do + Distributed.Replicator.Node + end + + @doc """ + Monitors the status of nodes. + If flag is true, monitoring is turned on. If flag is false, monitoring is turned off. + For more information, see `Node.monitor/2` and `Node.monitor/3`. + """ + @spec monitor(flag :: boolean, opts :: [any]) :: [true] + def monitor(flag, opts \\ []) do + Distributed.Parallel.map(Distributed.Node.list(opts), &(Node.monitor(&1, flag, opts))) + end + + @doc """ + Tries to set up connections to nodes. + """ + @spec ping(opts :: [any]) :: [:pong | :pang] + def ping(opts \\ []) do + Distributed.Parallel.map(Distributed.Node.list(opts), &(Node.ping(&1))) + end + + @doc """ + Returns the PIDs of new processes started by the application of `fun` on nodes. + See `Node.spawn/2` and `Node.spawn/3`. + """ + @spec spawn(fun :: (() -> any), opts :: [any]) :: [pid | {pid, reference}] + def spawn(fun, opts \\ []) + when is_function(fun) + do + spawn_opts = Keyword.get(opts, :spawn_opts, []) + Distributed.Parallel.map(Distributed.Node.list(opts), &(Node.spawn(&1, fun, spawn_opts))) + end + + @doc """ + Returns the PIDs of new processes started by the application of `module.fun(args)` on nodes. + See `Node.spawn/4` and `Node.spawn/5`. + """ + @spec spawn(module :: module, fun :: atom, args :: [any], opts :: [any]) :: [pid | {pid, reference}] + def spawn(module, fun, args, opts \\ []) + when is_atom(module) + when is_atom(fun) + do + spawn_opts = Keyword.get(opts, :spawn_opts, []) + Distributed.Parallel.map(Distributed.Node.list(opts), &(Node.spawn(&1, module, fun, args, spawn_opts))) + end + + @doc """ + Returns the PIDs of new linked processes started by the application of `fun` on nodes. + See `Node.spawn_link/2`. + """ + @spec spawn_link((() -> any)) :: [pid] + def spawn_link(fun, opts \\ []) + when is_function(fun) + do + Distributed.Parallel.map(Distributed.Node.list(opts), &(Node.spawn_link(&1, fun))) + end + + @doc """ + Returns the PIDs of new linked processes started by the application of `module.function(args)` on nodes. + See `Node.spawn_link/4`. + """ + @spec spawn_link(module :: module, fun :: atom, args :: [any], opts :: [any]) :: [pid] + def spawn_link(module, fun, args, opts \\ []) + when is_atom(module) + when is_atom(fun) + do + Distributed.Parallel.map(Distributed.Node.list(opts), &(Node.spawn_link(&1, module, fun, args))) + end + +end diff --git a/lib/distributed/scaler.ex b/lib/distributed/scaler.ex new file mode 100644 index 0000000..644853e --- /dev/null +++ b/lib/distributed/scaler.ex @@ -0,0 +1,13 @@ +defmodule Distributed.Scaler do + + defmacro __using__(opts \\ []) do + node_opts = Keyword.get(opts, :node, []) + gen_server_opts = Keyword.get(opts, :gen_server, []) + + quote do + import(Distributed.Scaler.Node, unquote(node_opts)) + import(Distributed.Scaler.GenServer, unquote(gen_server_opts)) + end + end + +end diff --git a/lib/distributed/scaler/gen_server.ex b/lib/distributed/scaler/gen_server.ex new file mode 100644 index 0000000..2b4fce0 --- /dev/null +++ b/lib/distributed/scaler/gen_server.ex @@ -0,0 +1,55 @@ +defmodule Distributed.Scaler.GenServer do + @moduledoc """ + The functions in `Distributed.Scaler.GenServer` module helps to scale projects by processing every event on the next node, in order. + + **Note**: Since this module is only a wrapper for `GenServer` module, there is no need to write a detailed documentation for this module. + Please check documentation of the `GenServer` module; you can basically think that the functions of the module run on the next node + without specifying the node, and you will be replied with the result of the process. + + You can use this module mostly for read operations, loop actions or background tasks. It is suitable when you do not need replication + of events. + """ + + use GenServer + + @doc false + def start_link() do + GenServer.start_link(__MODULE__, [], name: __MODULE__.process_id()) + end + + @doc false + def init(_opts \\ []) do + {:ok, %{}} + end + + @doc false + def process_id() do + Distributed.Scaler.GenServer + end + + @doc """ + Sends messages to the given dest on nodes and returns the message. See `Kernel.send/2` + """ + @spec info(dest :: pid | port | atom, msg :: any) :: any + def info(dest, msg) do + send({dest, Distributed.Node.Iterator.next()}, msg) + end + + @doc """ + Makes synchronous calls to the servers on nodes and waits for their replies. See `GenServer.call/3` + """ + @spec call(server :: atom, term, opts :: [any]) :: term + def call(server, term, opts \\ []) do + timeout = Keyword.get(opts, :timeout, :infinity) + GenServer.call({server, Distributed.Node.Iterator.next()}, term, timeout) + end + + @doc """ + Sends asynchronous requests to the servers on nodes. See `GenServer.cast/2` + """ + @spec cast(server :: atom, term :: term) :: term + def cast(server, term) do + GenServer.cast({server, Distributed.Node.Iterator.next()}, term) + end + +end diff --git a/lib/distributed/scaler/node.ex b/lib/distributed/scaler/node.ex new file mode 100644 index 0000000..ee9716a --- /dev/null +++ b/lib/distributed/scaler/node.ex @@ -0,0 +1,78 @@ +defmodule Distributed.Scaler.Node do + @moduledoc """ + The functions in `Distributed.Scaler.Node` module helps to scale projects by processing every event on the next node, in order. + + **Note**: Since this module is only a wrapper for `Node` module, there is no need to write a detailed documentation for this module. + Please check documentation of the `Node` module; you can basically think that the functions of the module run on the next node + without specifying the node, and you will be replied with the result of the process. + + You can use this module mostly for read operations, loop actions or background tasks. It is suitable when you do not need replication + of events. + """ + + use GenServer + + @doc false + def start_link() do + GenServer.start_link(__MODULE__, [], name: __MODULE__.process_id()) + end + + @doc false + def init(_opts \\ []) do + {:ok, %{}} + end + + @doc false + def process_id() do + Distributed.Scaler.Node + end + + @doc """ + Returns the PID of a new process started by the application of `fun` on the next node. + See `Node.spawn/2` and `Node.spawn/3`. + """ + @spec spawn(fun :: (() -> any), opts :: [any]) :: pid | {pid, reference} + def spawn(fun, opts \\ []) + when is_function(fun) + do + spawn_opts = Keyword.get(opts, :spawn_opts, []) + Node.spawn(Distributed.Node.Iterator.next(), fun, spawn_opts) + end + + @doc """ + Returns the PID of a new process started by the application of `module.fun(args)` on the next node. + See `Node.spawn/4` and `Node.spawn/5`. + """ + @spec spawn(module :: module, fun :: atom, args :: [any], opts :: [any]) :: pid | {pid, reference} + def spawn(module, fun, args, opts \\ []) + when is_atom(module) + when is_atom(fun) + do + spawn_opts = Keyword.get(opts, :spawn_opts, []) + Node.spawn(Distributed.Node.Iterator.next(), module, fun, args, spawn_opts) + end + + @doc """ + Returns the PID of a new linked process started by the application of `fun` on the next node. + See `Node.spawn_link/2`. + """ + @spec spawn_link((() -> any)) :: pid + def spawn_link(fun) + when is_function(fun) + do + Node.spawn_link(Distributed.Node.Iterator.next(), fun) + end + + @doc """ + Returns the PID of a new linked process started by the application of `module.function(args)` on the next node. + See `Node.spawn_link/4`. + """ + @spec spawn_link(module :: module, fun :: atom, args :: [any]) :: pid + def spawn_link(module, fun, args) + when is_atom(module) + when is_atom(fun) + do + Node.spawn_link(Distributed.Node.Iterator.next(), module, fun, args) + end + +end diff --git a/mix.exs b/mix.exs new file mode 100644 index 0000000..cab4763 --- /dev/null +++ b/mix.exs @@ -0,0 +1,78 @@ +defmodule Distributed.Mixfile do + + use Mix.Project + + def project() do + [ + name: "Distributed", + source_url: "https://github.com/ertgl/distributed", + description: description(), + package: package(), + app: :distributed, + version: "0.1.0", + elixir: "~> 1.4", + build_embedded: Mix.env() == :prod, + start_permanent: Mix.env() == :prod, + deps: deps(), + docs: [ + main: "Distributed", + ], + ] + end + + defp description() do + """ + Distributed is a wrapper module that helps developers to make distributed, scaled, replicated and fault-tolerant (with takeover ability) master-slave systems. + """ + end + + defp package() do + [ + name: :distributed, + files: [ + "lib", + "mix.exs", + "README.md", + "LICENSE.txt", + ], + maintainers: [ + "Ertugrul Keremoglu " + ], + licenses: [ + "MIT", + ], + links: %{ + "GitHub" => "https://github.com/ertgl/distributed", + }, + ] + end + + # Configuration for the OTP application + # + # Type "mix help compile.app" for more information + def application() do + # Specify extra applications you'll use from Erlang/Elixir + [ + mod: {Distributed.Application, []}, + extra_applications: [ + :logger, + ], + ] + end + + # Dependencies can be Hex packages: + # + # {:my_dep, "~> 0.3.0"} + # + # Or git/path repositories: + # + # {:my_dep, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"} + # + # Type "mix help deps" for more examples and options + defp deps() do + [ + {:ex_doc, "~> 0.16.1", only: :dev}, + ] + end + +end diff --git a/test/distributed_test.exs b/test/distributed_test.exs new file mode 100644 index 0000000..35f027a --- /dev/null +++ b/test/distributed_test.exs @@ -0,0 +1,7 @@ +defmodule DistributedTest do + + use ExUnit.Case + + doctest Distributed + +end diff --git a/test/test_helper.exs b/test/test_helper.exs new file mode 100644 index 0000000..869559e --- /dev/null +++ b/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start()