first commit, does not work yet

commit 7899021a109ee4243ee67b08ebe3b267ad12fa56 0 parents
@awetzel authored
7 .gitignore
@@ -0,0 +1,7 @@
+/ebin
+/deps
+erl_crash.dump
+*.ez
+*_data
+*.config
+*.swp
20 LICENSE
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Arnaud Wetzel
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
31 Makefile
@@ -0,0 +1,31 @@
+.SECONDARY:
+
+all: deps supervisorring
+
+deps:
+ @mix do deps.get
+
+supervisorring:
+ @mix compile
+
+## single node dev
+start: supervisorring sys.config
+ @iex --erl "-config sys -name supervisorring@127.0.0.1" -S mix run
+
+start_%: %.config %_data
+ @iex --erl "-config $* -name supervisorring$*@127.0.0.1" -S mix run
+
+## multiple node dev
+NODES = dev1 dev2 dev3 dev4
+multi_start: supervisorring
+ @for n in $(NODES); do xterm -e "make start_$$n ; read" & done
+
+## Erlang configuration management using Mix
+## (name).config is built by merging the Mix configs sys_config and (name)_config
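+## e.g. "make dev1.config" merges sys_config with dev1_config (defined in mix.exs) into dev1.config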
+%.config: mix.exs
+ mix run -e 'File.write!("$@", :io_lib.format("~p.~n",[\
+ (Mix.project[:sys_config]||[]) |> ListDict.merge(Mix.project[:$*_config]||[],fn(_,conf1,conf2)->ListDict.merge(conf1,conf2) end)\
+ ]))'
+
+%_data:
+ mkdir "$@"
27 README.md
@@ -0,0 +1,27 @@
+supervisorring
+==============
+
+Supervisorring is a small Elixir library and behaviour to create a
+distributed `gen_server`.
+
+## TODO ##
+
+Make it work.
+
+## How does it work? ##
+
+`supervisorring` uses `nano_ring`
+(https://github.com/awetzel/nano_ring) to keep an up-to-date view of
+the cluster nodes.
+
+A distributed hash table distributes servers across the different
+nodes. The nodes are themselves placed on the same DHT using the hash
+of the node name, so that only a small number of servers are
+redistributed when a node is added or removed (consistent hashing).
+
+The different hashes are inserted into a dedicated binary search
+tree to optimize node lookups.
+
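+A minimal usage sketch (hypothetical call sequence; the `ConstHash`
+helpers come from `lib/supervisorring.ex` below):
+
+```elixir
+nodes = [:n1, :n2, :n3]
+ring  = Supervisorring.ConstHash.ring_for_nodes(nodes)
+key   = Supervisorring.ConstHash.key_as_int("my_server")
+Supervisorring.ConstHash.node_for_key(key, ring)
+#=> one of :n1, :n2 or :n3, stable across calls for the same key
+```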
+## How to test Supervisorring ##
+
+
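+Single node: `make start`. Multiple nodes: `make multi_start` spawns
+four xterm nodes (dev1 to dev4, see the Makefile). The consistent
+hashing properties are exercised by `mix test` (see
+`test/ring_test.exs`).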
70 lib/supervisorring.ex
@@ -0,0 +1,70 @@
+defmodule Supervisorring do
+ use GenEvent.Behaviour
+ defrecord State, ring: nil, servers: HashMap.new
+ import Enum
+ @doc "Small process distribution server"
+ def start_link, do: :gen_server.start_link({:local,__MODULE__},__MODULE__,[],[])
+ def init(_), do: {:ok,HashSet.new}
+
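+ # gen_event callback: rebuild the ring when nano_ring signals a change in the node set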
+ def handle_event({:new_ring,oldring,newring},state) do
+ case {oldring.up_set|>to_list,newring.up_set|>to_list} do
+ {unchange,unchange} -> {:ok,state}
+ {_,newnodes} -> {:ok,state.ring(Supervisorring.ConstHash.ring_for_nodes(newnodes))}
+ end
+ end
+
+ def handle_call({:new_proc,key,_args},_from,procset) do
+ # build a ring from the currently up nodes, then map the hashed key onto it
+ up_nodes = :gen_server.call(NanoRing,:get_up) |> Enum.to_list
+ node = Supervisorring.ConstHash.node_for_key(Supervisorring.ConstHash.key_as_int(key),Supervisorring.ConstHash.ring_for_nodes(up_nodes))
+ {:reply,node,procset}
+ end
+
+ defmodule ConstHash do
+ @moduledoc "consistent hashing key/node mapping"
+
+ @doc "Map a given key to a node of the ring in a consistent way (ring modifications move a minimum of keys)"
+ def node_for_key(key,ring), do: bfind(key,ring)
+
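+ # each node is placed at 300 points (vnodes) in the hash space: more vnodes smooth the key distribution at the cost of a bigger lookup tree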
+ @vnode_per_node 300
+ @doc "generate the node_for_key ring parameter according to a given node list"
+ def ring_for_nodes(nodes) do
+ # Place each node at @vnode_per_node dots in the hash space: {hash(node ++ vnode_idx), node},
+ # then build a bst adapted to consistent hashing traversal: for a given hash, find the next vnode dot on the ring
+ vnodes = nodes |> flat_map(fn n -> 1..@vnode_per_node |> map(&{key_as_int("#{n}#{&1}"),n}) end)
+ vnodes |> bsplit({0,trunc(:math.pow(2,160)-1)},vnodes|>first)
+ end
+
+ @doc "place each term into the 160-bit integer hash space: term -> sha1 -> integer"
+ def key_as_int(<<key::[size(160),integer]>>),do: key
+ def key_as_int(key),do: (key |> term_to_binary |> :crypto.sha |> key_as_int)
+
+ # dedicated binary search tree: middle-split each interval (easy tree
+ # balancing) except when only one vnode remains (split at the vnode hash)
+ defp bsplit([],{_,_},{_,next}), do: next # no vnode dot in this interval: take the next node on the ring
+ defp bsplit([{h,n}],{_,_},{_,next}), do: {h,n,next} # interval contains exactly one vnode: split at its hash
+ defp bsplit(list,{lbound,rbound},next) do # multiple vnodes: recursively middle-split, which keeps the tree balanced
+ center = lbound + (rbound - lbound)/2
+ {left,right} = list |> partition(fn {h,_n}->h<center end)
+ {center,bsplit(left,{lbound,center},(right|>first)||next),bsplit(right,{center,rbound},next)}
+ end
+ # bsplit is designed so that a standard btree traversal maps a hash to its node
+ defp bfind(_,node) when is_atom(node), do: node
+ defp bfind(k,{center,_,right}) when k > center, do: bfind(k,right)
+ defp bfind(k,{center,left,_}) when k <= center, do: bfind(k,left)
+ end
+end
+
+defmodule Supervisorring.App do
+ use Application.Behaviour
+ def start(_type,_args) do
+ :supervisor.start_link(Supervisorring.App.Sup,[])
+ end
+ defmodule Sup do
+ use Supervisor.Behaviour
+ def init([]) do
+ supervise([
+ worker(:gen_event,[{:local,NanoRing.Events}], id: NanoRing.Events),
+ worker(NanoRing,[])
+ ], strategy: :one_for_one)
+ end
+ end
+end
20 mix.exs
@@ -0,0 +1,20 @@
+defmodule Supervisorring.Mixfile do
+ use Mix.Project
+
+ def project do
+ [ app: :supervisorring,
+ version: "0.0.1",
+ elixir: "~> 0.11.0",
+ deps: [{:nano_ring,"0.0.1",git: "git://github.com/awetzel/nano_ring"}],
+ ## dev multi nodes configs
+ dev1_config: [nano_ring: [data_dir: "./dev1_data"]],
+ dev2_config: [nano_ring: [data_dir: "./dev2_data"]],
+ dev3_config: [nano_ring: [data_dir: "./dev3_data"]],
+ dev4_config: [nano_ring: [data_dir: "./dev4_data"]]
+ ]
+ end
+
+ def application do
+ [ applications: [:nano_ring,:iex], env: [ data_dir: "./data" ] ]
+ end
+end
91 test/ring_test.exs
@@ -0,0 +1,91 @@
+Code.require_file "test_helper.exs", __DIR__
+
+defmodule NanoProcDistTest do
+ use ExUnit.Case
+ import Enum
+ import Supervisorring.ConstHash
+
+ @nb_key 1000
+ @test_set 1..@nb_key |> Enum.map(fn _ -> :crypto.rand_bytes(100) end)
+ @nodes [:n1,:n2,:n3,:n4,:n5,:n6,:n7]
+
+ #test "every key should give a node" do
+ # assert(@test_set |> Enum.map(&NanoProcDist.node_for_key(&1,@nodes)) |> Enum.all?)
+ #end
+
+ #test "results should be the same as chash" do
+ # chash_ring = chash_ring(@nodes)
+ # IO.puts inspect chash_ring
+ # res_chash = @test_set |> Enum.map(fn k -> {k,:chash.successors(:chash.key_of(k),chash_ring,1) |> Enum.at(0) |> elem(1)} end) |> HashSet.new
+ # res_set = @test_set |> Enum.map(fn k -> {k,NanoProcDist.node_for_key(k,@nodes)} end) |> HashSet.new
+ # assert res_set == res_chash
+ #end
+
+ test "each node should be assigned to roughly the same number of keys" do
+ ring = ring_for_nodes(@nodes)
+ res_set = @test_set |> map(fn k -> node_for_key(key_as_int(k),ring) end)
+ counts = @nodes |> Enum.map(fn n -> res_set |> Enum.count(&(&1==n)) end)
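+ # with 1000 keys and 7 nodes, each count should be roughly 1000/7, i.e. about 143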
+ IO.puts inspect counts
+ end
+
+ test "if we add a node, only about nb_keys/nb_nodes keys have to move" do
+ ring = ring_for_nodes(@nodes)
+ nodes = @nodes ++ [:n8]
+ ring2 = ring_for_nodes(nodes)
+ res1_set = @test_set |> map(fn k -> {k,node_for_key(key_as_int(k),ring)} end)
+ res2_set = @test_set |> map(fn k -> {k,node_for_key(key_as_int(k),ring2)} end)
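+ # going from 7 to 8 nodes should move only about 1000/8 = 125 of the keys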
+ IO.puts inspect(Set.difference(res1_set|>HashSet.new,res2_set|>HashSet.new) |> Set.size)
+
+ # chash_ring = chash_ring(@nodes)
+ # IO.puts inspect chash_ring
+ # chash_res_set = @test_set |> map(fn k -> {k,:chash.successors(:chash.key_of(k),chash_ring,1) |> at(0) |> elem(1)} end) |> HashSet.new
+ # chash_ring2 = chash_ring(nodes)
+ # chash_res_set2 = @test_set |> Enum.map(fn k -> {k,:chash.successors(:chash.key_of(k),chash_ring2,1) |> at(0) |> elem(1)} end) |> HashSet.new
+ # IO.puts inspect Set.difference(chash_res_set,chash_res_set2) |> Set.size
+ end
+
+ #defp chash_ring2(nodes) do
+ # ring = :chash.fresh(64, :n1)
+ # nodes |> with_index
+ # |> map(fn {n,i}->{ring|>:chash.nodes|>slice(i,64)|>take_every(length(nodes)),i} end)
+ # |> reduce(ring,
+ # fn {partitions,node_index},ring1 ->
+ # partitions |> reduce(ring1,fn {idx,_},ring2 ->
+ # :chash.update(idx,at(nodes,node_index),ring2)
+ # end)
+ # end)
+ #end
+ #defp chash_ring3(nodes) do
+ # ring = :chash.fresh(64, :n1)
+ # ring |> :chash.nodes
+ # |> chunks(div(64,length(nodes)),div(64,length(nodes)),[])
+ # |> with_index
+ # |> reduce(ring,
+ # fn {partitions,node_index},ring1 ->
+ # partitions |> reduce(ring1,fn {idx,_},ring2 ->
+ # :chash.update(idx,at(nodes,node_index),ring2)
+ # end)
+ # end)
+ #end
+ #defp chash_ring(nodes) do
+ # nb_rep = 20
+ # ring = :chash.fresh((length(nodes))*nb_rep, :n1)
+ # nodes |> map(fn n -> {n,1..nb_rep |> map &"#{n}#{&1}"} end)
+ # |> reduce(ring,
+ # fn {node,replicates},ring1 ->
+ # replicates |> reduce(ring1,fn rep_name,ring2 ->
+ # :chash.update(:chash.successors(:chash.key_of(rep_name),ring2,1) |> at(0) |> elem(0),node,ring2)
+ # end)
+ # end)
+ #end
+ #defp chash_ring(nodes) do
+ # ring = :chash.fresh(64, :n1)
+ # nodes |> map(fn n->{n,:chash.successors(:chash.key_of(n),ring)|>take_every(length(nodes))} end)
+ # |> reduce(ring,
+ # fn {node,replicates},ring1 ->
+ # replicates |> reduce(ring1,fn {idx,_},ring2 ->
+ # :chash.update(idx,node,ring2)
+ # end)
+ # end)
+ #end
+end
1  test/test_helper.exs
@@ -0,0 +1 @@
+ExUnit.start