Navigation Menu

Skip to content

Commit

Permalink
Add "join" serf event (experimental)
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jun 28, 2014
1 parent a94528c commit a020e40
Showing 1 changed file with 62 additions and 9 deletions.
71 changes: 62 additions & 9 deletions lib/droonga/command/serf_event_handler.rb
Expand Up @@ -70,6 +70,8 @@ def process_event
case @event_sub_name
when "change_role"
save_status(:role, @payload["role"])
when "join"
join
when "set_replicas"
set_replicas
when "add_replicas"
Expand All @@ -92,15 +94,62 @@ def given_hosts
hosts
end

def join
type = @payload["type"]
case type
when "replica"
join_as_replica
end
end

def join_as_replica
source = @payload["source"]
return unless soruce

generator = create_current_catalog_generator
dataset = generator.dataset_for_host(source)
return unless dataset

dataset_name = dataset.name
tag = dataset.tag
port = dataset.port
other_hosts = dataset.hosts

if @payload["copy"]
modify_catalog do |modifier|
modifier.datasets[dataset].replicas.hosts = [host]
end
sleep(1) # wait for restart

DataAbsorber.absorb(:dataset => dataset,
:source_host => source,
:destination_host => host,
:port => port,
:tag => tag)
sleep(1)
end

modify_catalog do |modifier|
modifier.datasets[dataset].replicas.hosts += other_hosts
modifier.datasets[dataset].replicas.hosts.uniq!
end
sleep(1) # wait for restart

source_node = "#{source}:#{port}/#{tag}"
Serf.send_event(source_node, "add_replicas",
"dataset" => dataset,
"hosts" => [host])
end

def set_replicas
dataset = @payload["dataset"]
return unless dataset

hosts = given_hosts
return unless hosts

modify_catalog do |generator|
generator.datasets[dataset].replicas.hosts = hosts
modify_catalog do |modifier|
modifier.datasets[dataset].replicas.hosts = hosts
end
end

Expand All @@ -111,9 +160,9 @@ def add_replicas
hosts = given_hosts
return unless hosts

modify_catalog do |generator|
generator.datasets[dataset].replicas.hosts += hosts
generator.datasets[dataset].replicas.hosts.uniq!
modify_catalog do |modifier|
modifier.datasets[dataset].replicas.hosts += hosts
modifier.datasets[dataset].replicas.hosts.uniq!
end
end

Expand All @@ -124,17 +173,21 @@ def remove_replica
hosts = given_hosts
return unless hosts

modify_catalog do |generator|
generator.datasets[dataset].replicas.hosts -= hosts
modify_catalog do |modifier|
modifier.datasets[dataset].replicas.hosts -= hosts
end
end

def modify_catalog
generator = create_current_catalog_generator
yield(generator)
SafeFileWriter.write(Path.catalog, JSON.pretty_generate(generator.catalog))
end

def create_current_catalog_generator
current_catalog = JSON.parse(Path.catalog.read)
generator = CatalogGenerator.new
generator.load(current_catalog)
yield(generator)
SafeFileWriter.write(Path.catalog, JSON.pretty_generate(generator.catalog))
end

def absorb_data
Expand Down

0 comments on commit a020e40

Please sign in to comment.