Skip to content

Commit

Permalink
improve readability of stanby-namenode support code
Browse files Browse the repository at this point in the history
  • Loading branch information
y-lan committed May 9, 2013
1 parent e3f71c7 commit d0a9125
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions lib/fluent/plugin/out_webhdfs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def configure(conf)
unless @path.index('/') == 0
raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
end

@client = prepare_client(@namenode_host, @namenode_port, @username)
if @standby_namenode_host
@client_standby = prepare_client(@standby_namenode_host, @standby_namenode_port, @username)
Expand All @@ -94,6 +94,19 @@ def prepare_client(host, port, username)
client
end

def namenode_available(client)
if client
available = true
begin
client.list('/')
rescue => e
$log.warn "webhdfs check request failed. (host: #{client.host}:#{client.port}, error: #{e.message})"
available = false
end
available
end
end

def start
super

Expand All @@ -114,48 +127,48 @@ def path_format(chunk_key)
Time.strptime(chunk_key, @time_slice_format).strftime(@path)
end

def is_standby_exception(e)
e.is_a?(WebHDFS::IOError) && e.message.match(/org\.apache\.hadoop\.ipc\.StandbyException/)
end

def namenode_failover
if @standby_namenode
@client, @client_standby = @client_standby, @client
@namenode_failovered = true
$log.warn "Namenode failovered, now use #{@client.host}:#{@client.port} ."
$log.warn "Namenode failovered, now use #{@client.host}:#{@client.port}."
end
end

# TODO check conflictions

def send_data(path, data)
try_cnt = 0
begin
@client.append(path, data)
rescue WebHDFS::FileNotFoundError
@client.create(path, data)
rescue WebHDFS::IOError => e
if e.message.match(/org\.apache\.hadoop\.ipc\.StandbyException/) && @client_standby
$log.warn "Seems the connected host is a standy namenode (Maybe due to an auto-failover). Gonna try the standby namenode."
namenode_failover
try_cnt += 1
retry if try_cnt <= 1
end
raise
end
end

def write(chunk)
hdfs_path = path_format(chunk.key)
failovered = false
begin
send_data(hdfs_path, chunk.read)

## reset failover status after one success send
@namenode_failovered = false
rescue
$log.error "failed to communicate hdfs cluster, path: #{hdfs_path}"
if ((@error_history.size + 1) >= @failures_before_use_standby) && @client_standby && !@namenode_failovered
rescue => e
$log.warn "failed to communicate hdfs cluster, path: #{hdfs_path}"
raise e if failovered
if is_standby_exception(e) && namenode_available(@client_standby)
$log.warn "Seems the connected host is a standy namenode (Maybe due to an auto-failover). Gonna try the standby namenode."
namenode_failover
failovered = true
retry
end
if ((@error_history.size + 1) >= @failures_before_use_standby) && namenode_available(@client_standby)
$log.warn "Too many failures. Try to use the standby namenode instead."
namenode_failover
failovered = true
retry
end
raise
raise e
end
hdfs_path
end
Expand Down

0 comments on commit d0a9125

Please sign in to comment.