Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fix etc

  • Loading branch information...
commit 4775a4feefa98b67a923556cc3f36fa161c4487d 1 parent 61cf7b0
@winebarrel winebarrel authored
View
10 lib/rgossip2.rb
@@ -1,4 +1,5 @@
require 'rgossip2/context'
+require 'rgossip2/context_helper'
require 'rgossip2/client'
require 'rgossip2/node'
require 'rgossip2/node_list'
@@ -10,11 +11,14 @@ module RGossip2
# Clientの生成
# 直接、Client#newは実行しない
- def client(initial_nodes = [], address = nil, data = nil, options = {})
+ def client(options = {})
+ initial_nodes = options.delete(:initial_nodes) || []
+ address = options.delete(:address)
+ data = options.delete(:data)
+
context = Context.new(options)
- context.create(Client, initial_nodes, address, data)
+ Client.new(context, initial_nodes, address, data)
end
module_function :client
end # RGossip2
-
View
36 lib/rgossip2/client.rb
@@ -18,46 +18,50 @@ module RGossip2
#
class Client
include Enumerable
+ include ContextHelper
attr_reader :node_list
attr_reader :dead_list
attr_reader :self_node
- attr_accessor :context
+ attr_reader :context
+
+ def initialize(context, initial_nodes = [], address = nil, data = nil)
+ @context = context
- def initialize(initial_nodes = [], address = nil, data = nil)
# データがバッファサイズを超える場合はエラー
- raise 'Data is too large' if data && data.length > @context.buffer_size
+ if data and data.length > @context.buffer_size
+ raise 'Data is too large'
+ end
# IPアドレスを取得。デフォルトはローカルホストアドレス
@address = name2addr(address || IPSocket.getaddress(Socket.gethostname))
-
- @context.info("Client is initialized: initial_nodes=#{initial_nodes.inspect}, address=#{@address}, data=#{data.inspect}")
+ info("Client is initialized: initial_nodes=#{initial_nodes.inspect}, address=#{@address}, data=#{data.inspect}")
# NodeListを生成
- @node_list = @context.create(Nodes)
- @dead_list = @context.create(Nodes)
+ @node_list = create(NodeList)
+ @dead_list = create(NodeList)
# Nodeを生成
- @self_node = @context.create(Node, @node_list, @dead_list, @address, data, nil)
+ @self_node = create(Node, @node_list, @dead_list, @address, data, nil)
@self_node.update_timestamp
@node_list << @self_node
# 初期ノードを追加
initial_nodes.uniq.each do |i|
- @node_list << @context.create(Node, @node_list, @dead_list, name2addr(i), nil, nil)
+ @node_list << create(Node, @node_list, @dead_list, name2addr(i), nil, nil)
end
# Gossiper、Receiverを生成
- @gossiper = @context.create(Gossiper, @self_node, @node_list)
- @receiver = @context.create(Receiver, @self_node, @node_list, @dead_list)
+ @gossiper = create(Gossiper, @self_node, @node_list)
+ @receiver = create(Receiver, @self_node, @node_list, @dead_list)
end
def start
# 開始している場合はスキップ
return if @running
- @context.info("Client is started: address=#{@address}")
+ info("Client is started: address=#{@address}")
# NodoのTimerをスタート
@node_list.each do |node|
@@ -76,7 +80,7 @@ def stop
# 停止している場合はスキップ
return unless @running
- @context.info("Client is stopped")
+ info("Client is stopped")
@gossiper.stop
@receiver.stop
@@ -121,7 +125,7 @@ def add_node(address)
# すでに存在する場合はエラー
raise 'The node already exists' if @node_list.any? {|i| i.address == address }
- node = @context.create(Node, @node_list, @dead_list, address, nil, nil)
+ node = create(Node, @node_list, @dead_list, address, nil, nil)
@node_list << node
# デッドリストからは追加したノードを削除
@@ -131,7 +135,7 @@ def add_node(address)
node.start_timer if @running
- @context.callback(:add, address, nil, nil)
+ callback(:add, address, nil, nil)
}
}
end
@@ -158,7 +162,7 @@ def delete_node(address)
i.address == address
end
- @context.callback(:delete, address, nil, nil)
+ callback(:delete, address, nil, nil)
}
}
end
View
50 lib/rgossip2/context.rb
@@ -16,7 +16,7 @@ class Context
# バッファサイズと遊び
# 「buffer_size * allowance + digest_length」が 65515bytes 以下になるようにする
attr_accessor :buffer_size
- attr_accessor :attr_accessor
+ attr_accessor :allowance
# ハッシュ関数のアルゴリズムと長さ
attr_accessor :digest_algorithm
@@ -42,13 +42,9 @@ class Context
attr_accessor :error_handler
def initialize(options = {})
- # ハッシュのキーをシンボルに変換
- tmp = {}
- options.each {|k, v| tmp[k.to_sym] = v }
- options = tmp
-
- unless options.has_key?(:auth_key)
+ unless @auth_key = options[:auth_key]
raise ':auth_key is required'
+
end
defaults = {
@@ -79,46 +75,6 @@ def initialize(options = {})
end
end # initialize
- # 他のクラスのインスタンスを生成して自分自身をセットする
- def create(klass, *args)
- obj = klass.new(*args)
- obj.context = self
- return obj
- end
-
- # 各種ハンドラプロキシメソッド
- def callback(action, address, timestamp, data)
- if self.callback_handler
- self.callback_handler.call([action, address, timestamp, data])
- end
- end
-
- def handle_error(e)
- if self.error_handler
- self.error_handler.call(e)
- else
- raise e
- end
- end
-
- # ノード情報群からハッシュ値とメッセージを生成する
- def digest_and_message(nodes)
- message = nodes.map {|i| i.to_a }.to_msgpack
- hash = OpenSSL::HMAC::digest(self.digest_algorithm.new, self.auth_key, message)
- [hash, message]
- end
-
- # ロギングプロキシメソッド
- [:fatal, :error, :worn, :info, :debug].each do |name|
- define_method(name) do |message|
- if self.logger
- self.logger.send(name, message)
- else
- $stderr.puts("#{name}: #{message}")
- end
- end
- end
-
end # Context
end # RGossip2
View
55 lib/rgossip2/context_helper.rb
@@ -0,0 +1,55 @@
+module RGossip2
+
+ #
+ # module ContextHelper
+ # レシーバなしでコンテキストを操作するためのモジュール
+ #
+ module ContextHelper
+
+ private
+
+ def create(*args)
+ @context.create(*args)
+ end
+
+ # 他のクラスのインスタンスを生成して自分自身をセットする
+ def create(klass, *args)
+ klass.new(@context, *args)
+ end
+
+ # 各種ハンドラプロキシメソッド
+ def callback(action, address, timestamp, data)
+ if @context.callback_handler
+ @context.callback_handler.call([action, address, timestamp, data])
+ end
+ end
+
+ def handle_error(e)
+ if @context.error_handler
+ @context.error_handler.call(e)
+ else
+ raise e
+ end
+ end
+
+ # ノード情報群からハッシュ値とメッセージを生成する
+ def digest_and_message(nodes)
+ message = nodes.map {|i| i.to_a }.to_msgpack
+ hash = OpenSSL::HMAC::digest(@context.digest_algorithm.new, @context.auth_key, message)
+ [hash, message]
+ end
+
+ # ロギングプロキシメソッド
+ [:fatal, :error, :worn, :info, :debug].each do |name|
+ define_method(name) do |message|
+ if @context.logger
+ @context.logger.send(name, message)
+ else
+ $stderr.puts("#{name}: #{message}")
+ end
+ end
+ end
+
+ end # ContextHelper
+
+end # RGossip2
View
14 lib/rgossip2/gossipper.rb
@@ -12,16 +12,16 @@ module RGossip2
# +-----------------------+
#
class Gossiper
+ include ContextHelper
- attr_writer :context
-
- def initialize(self_node, node_list)
+ def initialize(context, self_node, node_list)
+ @context = context
@self_node = self_node
@node_list = node_list
end
def start
- @context.info("Transmission was started: interval=#{@context.gossip_interval}, port=#{@context.port}")
+ info("Transmission was started: interval=#{@context.gossip_interval}, port=#{@context.port}")
@running = true
@@ -34,7 +34,7 @@ def start
begin
@node_list.synchronize { gossip(sock) }
rescue Exception => e
- @context.handle_error(e)
+ handle_error(e)
end
sleep(@context.gossip_interval)
@@ -46,7 +46,7 @@ def start
end # start
def stop
- @context.info("Transmission was stopped")
+ info("Transmission was stopped")
# フラグをfalseにしてスレッドを終了させる
@running = false
@@ -67,7 +67,7 @@ def gossip(sock)
dest = @node_list.choose_except(@self_node)
return unless dest # ないとは思うけど…
- @context.debug("Data is transmitted: address=#{dest.address}")
+ debug("Data is transmitted: address=#{dest.address}")
# チャンクに分けてデータを送信
@node_list.serialize.each do |chunk|
View
18 lib/rgossip2/node.rb
@@ -18,15 +18,17 @@ module RGossip2
# +------------+ +---------+
#
class Node
+ include ContextHelper
+
attr_reader :address
attr_accessor :timestamp
attr_accessor :data
- attr_writer :context
-
# クラスの生成・初期化はContextクラスからのみ行う
# addressはユニークであること
- def initialize(node_list, dead_list, address, data, timestamp)
+ def initialize(context, node_list, dead_list, address, data, timestamp)
+ @context = context
+
@node_list = node_list
@dead_list = dead_list
@address = address
@@ -36,7 +38,7 @@ def initialize(node_list, dead_list, address, data, timestamp)
# node_lifetimeの時間内に更新されない場合
# TimerがNodeを破棄する
@timer = Timer.new(@context.node_lifetime) do
- @context.debug("Node timed out: address=#{@address}")
+ debug("Node timed out: address=#{@address}")
# ノードリストからNodeを削除
@node_list.synchronize {
@@ -57,7 +59,7 @@ def initialize(node_list, dead_list, address, data, timestamp)
}
# 破棄時の処理をコールバック
- @context.callback(:delete, @address, @timestamp, @data)
+ callback(:delete, @address, @timestamp, @data)
end
end
@@ -74,17 +76,17 @@ def to_a
alias to_ary to_a
def start_timer
- @context.debug("Node timer is started: address=#{@address}")
+ debug("Node timer is started: address=#{@address}")
@timer.start
end
def reset_timer
- @context.debug("Node timer is reset: address=#{@address}")
+ debug("Node timer is reset: address=#{@address}")
@timer.reset
end
def stop_timer
- @context.debug("Node timer is suspended: address=#{@address}")
+ debug("Node timer is suspended: address=#{@address}")
@timer.stop
end
View
11 lib/rgossip2/node_list.rb
@@ -15,13 +15,12 @@ module RGossip2
# +------------+
#
class NodeList < Array
+ include ContextHelper
include Mutex_m
- attr_writer :context
-
- # クラスの生成・初期化はContextクラスからのみ行う
- def initialize(ary = [])
+ def initialize(context, ary = [])
super(ary)
+ @context = context
end
# 指定したNode以外のNodeをリストからランダムに選択する
@@ -49,7 +48,7 @@ def serialize
nodes << node
datasum << packed
else
- chunks << @context.digest_and_message(nodes).join
+ chunks << digest_and_message(nodes).join
nodes.clear
datasum.replace('')
@@ -60,7 +59,7 @@ def serialize
# 残りのNodeをチャンクに追加
unless nodes.empty?
- chunks << @context.digest_and_message(nodes).join
+ chunks << digest_and_message(nodes).join
end
return chunks
View
34 lib/rgossip2/receiver.rb
@@ -18,17 +18,17 @@ module RGossip2
# +-----------------------+
#
class Receiver
+ include ContextHelper
- attr_writer :context
-
- def initialize(self_node, node_list, dead_list)
+ def initialize(context, self_node, node_list, dead_list)
+ @context = context
@self_node = self_node
@node_list = node_list
@dead_list = dead_list
end
def start
- @context.info("Reception is started: port=#{@context.port}")
+ info("Reception is started: port=#{@context.port}")
@running = true
@@ -48,7 +48,7 @@ def start
end # start
def stop
- @context.info("Reception is stopped")
+ info("Reception is stopped")
# フラグをfalseにしてスレッドを終了させる
@running = false
@@ -62,10 +62,10 @@ def join
# 受信処理の本体
def receive(sock)
- return unless select([sock], [], [], @@timeout)
+ return unless select([sock], [], [], @context.receive_timeout)
message, (afam, port, host, ip) = sock.recvfrom(@context.buffer_size * @context.allowance)
- @context.debug("Data was received: from=#{ip}")
+ debug("Data was received: from=#{ip}")
recv_nodes = unpack_message(message)
@@ -75,17 +75,17 @@ def receive(sock)
}
else
# データが取得できなかった場合は無効なデータとして処理
- @context.debug("Invalid data was received: from=#{ip}")
+ debug("Invalid data was received: from=#{ip}")
end
rescue Exception => e
- @context.handle_error(e)
+ handle_error(e)
end
# ハッシュ値をチェックしてメッセージをデシリアライズ
def unpack_message(message)
recv_hash = message.slice!(0, @context.digest_length)
recv_nodes = MessagePack.unpack(message)
- hash, xxx = @context.digest_and_message(recv_nodes)
+ hash, xxx = digest_and_message(recv_nodes)
(recv_hash == hash) ? recv_nodes : nil
rescue MessagePack::UnpackError => e
return nil
@@ -104,13 +104,13 @@ def merge_lists(recv_nodes)
# 受信したNodeのタイムスタンプが新しければ
# 持っているNodeを更新
if timestamp > node.timestamp
- @context.debug("The node was updated: address=#{address} timestamp=#{timestamp}")
+ debug("The node was updated: address=#{address} timestamp=#{timestamp}")
node.timestamp = timestamp
node.data = data
node.reset_timer
- @context.callback(:update, address, timestamp, data)
+ callback(:update, address, timestamp, data)
end
elsif (index = @dead_list.synchronize { @dead_list.index {|i| i.address == address } })
# デッドリストに見つかった場合
@@ -120,24 +120,24 @@ def merge_lists(recv_nodes)
# 受信したNodeのタイムスタンプが新しければ
# デッドリストのノードを復活させる
if timestamp > node.timestamp
- @context.debug("Node revived: address=#{address} timestamp=#{timestamp}")
+ debug("Node revived: address=#{address} timestamp=#{timestamp}")
@dead_list.delete_at(index)
@node_list << node
node.start_timer
- @context.callback(:comeback, address, timestamp, data)
+ callback(:comeback, address, timestamp, data)
end
}
else
# リストにない場合はNodeを追加
- @context.debug("Node was added: address=#{address} timestamp=#{timestamp}")
+ debug("Node was added: address=#{address} timestamp=#{timestamp}")
- node = @context.create(Node, @node_list, @dead_list, address, data, timestamp)
+ node = create(Node, @node_list, @dead_list, address, data, timestamp)
@node_list << node
node.start_timer
- @context.callback(:add, address, timestamp, data)
+ callback(:add, address, timestamp, data)
end
end
end # merge_lists
Please sign in to comment.
Something went wrong with that request. Please try again.