Navigation Menu

Skip to content

Commit

Permalink
Separate source and destination information completely
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 15, 2015
1 parent 2605bb7 commit 188fe47
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions lib/droonga/data_absorber_client.rb
Expand Up @@ -14,6 +14,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/loggable"
require "droonga/node_name"
require "droonga/catalog/dataset"
require "droonga/client"
require "droonga/catalog_generator"
require "droonga/catalog_fetcher"
Expand All @@ -26,9 +28,9 @@ class DataAbsorberClient
DEFAULT_PROGRESS_INTERVAL_SECONDS = 3

attr_reader :params
attr_reader :dataset, :port, :tag
attr_reader :host, :port, :tag, :dataset
attr_reader :messages_per_second, :progress_interval_seconds
attr_reader :source_host, :destination_host
attr_reader :source_host, :source_port, :source_tag, :source_dataset,
attr_reader :error_message

def initialize(params)
Expand All @@ -39,15 +41,18 @@ def initialize(params)
@progress_interval_seconds = @params[:progress_interval_seconds] ||
DEFAULT_PROGRESS_INTERVAL_SECONDS

@dataset = @params[:dataset] || CatalogGenerator::DEFAULT_DATASET
@port = @params[:port] || CatalogGenerator::DEFAULT_PORT
@tag = @params[:tag] || CatalogGenerator::DEFAULT_TAG
@host = @params[:host] || NodeName::DEFAULT_HOST
@port = @params[:port] || NodeName::DEFAULT_PORT
@tag = @params[:tag] || NodeName::DEFAULT_TAG
@dataset = @params[:dataset] || Catalog::Dataset::DEFAULT_NAME

@source_host = @params[:source_host]
@destination_host = @params[:destination_host]
@receiver_host = @params[:receiver_host] || @destination_host
@source_host = @params[:source_host] || @host
@source_port = @params[:source_port] || @port || NodeName::DEFAULT_PORT
@source_tag = @params[:source_tag] || @tag || NodeName::DEFAULT_TAG
@source_dataset = @params[:source_dataset] || @dataset || Catalog::Dataset::DEFAULT_NAME

@receiver_port = @params[:receiver_port]
@receiver_host = @params[:receiver_host] || @host
@receiver_port = @params[:receiver_port] || 0

@client_options = @params[:client_options] || {}

Expand All @@ -59,11 +64,12 @@ def run

absorb_message = {
"type" => "system.absorb-data",
"dataset" => @dataset,
"body" => {
"host" => @source_host,
"port" => @port,
"tag" => @tag,
"dataset" => @dataset,
"port" => @source_port,
"tag" => @source_tag,
"dataset" => @source_dataset,
"messagesPerSecond" => @messages_per_second,
"progressIntervalSeconds" => @progress_interval_seconds,
},
Expand Down Expand Up @@ -98,12 +104,12 @@ def run

def destination_client
options = {
:host => @destination_host,
:host => @host,
:port => @port,
:tag => @tag,
:protocol => :droonga,
:receiver_host => @receiver_host,
:receiver_port => 0,
:receiver_port => @receiver_port,
}.merge(@client_options)
@destination_client ||= Droonga::Client.new(options)
end
Expand Down Expand Up @@ -131,10 +137,10 @@ def source_catalog

def fetch_source_catalog
fetcher = CatalogFetcher.new(:host => @source_host,
:port => @port,
:tag => @tag,
:port => @source_port,
:tag => @source_tag,
:receiver_host => @receiver_host)
fetcher.fetch(:dataset => @dataset)
fetcher.fetch(:dataset => @source_dataset)
end

def log_tag
Expand Down

0 comments on commit 188fe47

Please sign in to comment.