Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to v0.14 API #44

Merged
merged 21 commits into from
Jan 23, 2017
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fluent-plugin-webhdfs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Gem::Specification.new do |gem|
gem.add_development_dependency "test-unit"
gem.add_development_dependency "appraisal"
gem.add_development_dependency "snappy", '>= 0.0.13'
gem.add_runtime_dependency "fluentd", '>= 0.10.59'
gem.add_runtime_dependency "fluentd", '>= 0.14.2'
gem.add_runtime_dependency "fluent-mixin-plaintextformatter", '>= 0.2.1'
gem.add_runtime_dependency "fluent-mixin-config-placeholders", ">= 0.3.0"
gem.add_runtime_dependency "webhdfs", '>= 0.6.0'
Expand Down
91 changes: 40 additions & 51 deletions lib/fluent/plugin/out_webhdfs.rb
Original file line number Diff line number Diff line change
@@ -1,77 +1,76 @@
# -*- coding: utf-8 -*-

require 'net/http'
require 'time'
require 'webhdfs'
require 'tempfile'
require 'fluent/plugin/output'

require 'fluent/mixin/config_placeholders'
require 'fluent/mixin/plaintextformatter'

class Fluent::WebHDFSOutput < Fluent::TimeSlicedOutput
class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('webhdfs', self)

config_set_default :buffer_type, 'memory'
config_set_default :time_slice_format, '%Y%m%d'
helpers :compat_parameters

# For fluentd v0.12.16 or earlier
class << self
unless method_defined?(:desc)
def desc(description)
end
end
config_section :buffer do
config_set_default :@type, 'memory'
end

desc 'WebHDFS/HttpFs host'
config_param :host, :string, :default => nil
config_param :host, :string, default: nil
desc 'WebHDFS/HttpFs port'
config_param :port, :integer, :default => 50070
config_param :port, :integer, default: 50070
desc 'Namenode (host:port)'
config_param :namenode, :string, :default => nil # host:port
config_param :namenode, :string, default: nil # host:port
desc 'Standby namenode for Namenode HA (host:port)'
config_param :standby_namenode, :string, :default => nil # host:port
config_param :standby_namenode, :string, default: nil # host:port

desc 'Ignore errors on start up'
config_param :ignore_start_check_error, :bool, :default => false
config_param :ignore_start_check_error, :bool, default: false

include Fluent::Mixin::ConfigPlaceholders

desc 'Output file path on HDFS'
config_param :path, :string
desc 'User name for pseudo authentication'
config_param :username, :string, :default => nil
config_param :username, :string, default: nil

desc 'Store data over HttpFs instead of WebHDFS'
config_param :httpfs, :bool, :default => false
config_param :httpfs, :bool, default: false

desc 'Number of seconds to wait for the connection to open'
config_param :open_timeout, :integer, :default => 30 # from ruby net/http default
config_param :open_timeout, :integer, default: 30 # from ruby net/http default
desc 'Number of seconds to wait for one block to be read'
config_param :read_timeout, :integer, :default => 60 # from ruby net/http default
config_param :read_timeout, :integer, default: 60 # from ruby net/http default

desc 'Retry automatically when known errors of HDFS are occurred'
config_param :retry_known_errors, :bool, :default => false
config_param :retry_known_errors, :bool, default: false
desc 'Retry interval'
config_param :retry_interval, :integer, :default => nil
config_param :retry_interval, :integer, default: nil
desc 'The number of retries'
config_param :retry_times, :integer, :default => nil
config_param :retry_times, :integer, default: nil

# how many times of write failure before switch to standby namenode
# by default it's 11 times that costs 1023 seconds inside fluentd,
# which is considered enough to exclude the scenes that caused by temporary network fail or single datanode fail
desc 'How many times of write failure before switch to standby namenode'
config_param :failures_before_use_standby, :integer, :default => 11
config_param :failures_before_use_standby, :integer, default: 11

include Fluent::Mixin::PlainTextFormatter

config_param :default_tag, :string, :default => 'tag_missing'
config_param :default_tag, :string, default: 'tag_missing'

desc 'Append data or not'
config_param :append, :bool, :default => true
config_param :append, :bool, default: true

desc 'Use SSL or not'
config_param :ssl, :bool, :default => false
config_param :ssl, :bool, default: false
desc 'OpenSSL certificate authority file'
config_param :ssl_ca_file, :string, :default => nil
config_param :ssl_ca_file, :string, default: nil
desc 'OpenSSL verify mode (none,peer)'
config_param :ssl_verify_mode, :default => nil do |val|
config_param :ssl_verify_mode, default: nil do |val|
case val
when 'none'
:none
Expand All @@ -83,11 +82,11 @@ def desc(description)
end

desc 'Use kerberos authentication or not'
config_param :kerberos, :bool, :default => false
config_param :kerberos, :bool, default: false

SUPPORTED_COMPRESS = ['gzip', 'bzip2', 'snappy', 'lzo_command', 'text']
desc "Compress method (#{SUPPORTED_COMPRESS.join(',')})"
config_param :compress, :default => nil do |val|
config_param :compress, default: nil do |val|
unless SUPPORTED_COMPRESS.include? val
raise Fluent::ConfigError, "unsupported compress: #{val}"
end
Expand All @@ -100,28 +99,18 @@ def desc(description)

def initialize
super
require 'net/http'
require 'time'
require 'webhdfs'

@compressor = nil
end

# Define `log` method for v0.10.42 or earlier
unless method_defined?(:log)
define_method("log") { $log }
end

def configure(conf)
if conf['path']
if conf['path'].index('%S')
conf['time_slice_format'] = '%Y%m%d%H%M%S'
elsif conf['path'].index('%M')
conf['time_slice_format'] = '%Y%m%d%H%M'
elsif conf['path'].index('%H')
conf['time_slice_format'] = '%Y%m%d%H'
end
end
conf["time_slice_format"] = case conf["path"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the v0.14 API, the convention is to set `timekey` inside the `<buffer>` section when the `<buffer>` argument includes "time" and `timekey` is not already set.

when /%S/ then "%Y%m%d%H%M%S"
when /%M/ then "%Y%m%d%H%M"
when /%H/ then "%Y%m%d%H"
else "%Y%m%d"
end

compat_parameters_convert(conf, :buffer)

super

Expand Down Expand Up @@ -234,8 +223,8 @@ def shutdown
super
end

def path_format(chunk_key)
Time.strptime(chunk_key, @time_slice_format).strftime(@path)
def path_format(metadata)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method still needed? (instead of using extract_placeholders directly)

extract_placeholders(@path, metadata)
end

def is_standby_exception(e)
Expand Down Expand Up @@ -269,9 +258,9 @@ def send_data(path, data)

def generate_path(chunk)
hdfs_path = if @append
path_format(chunk.key)
path_format(chunk.metadata)
else
path_format(chunk.key).gsub(CHUNK_ID_PLACE_HOLDER, chunk_unique_id_to_str(chunk.unique_id))
path_format(chunk.metadata).gsub(CHUNK_ID_PLACE_HOLDER, chunk_unique_id_to_str(chunk.unique_id))
Copy link
Member

@tagomoris tagomoris Aug 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fluentd v0.14 provides a `dump_unique_id_hex` method in lib/fluent/unique_id.rb.
We can delete the `chunk_unique_id_to_str` method definition.

end
hdfs_path = "#{hdfs_path}#{@compressor.ext}"
hdfs_path
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_bzip2.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class Bzip2Compressor < Compressor
WebHDFSOutput.register_compressor('bzip2', self)

Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_gzip.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class GzipCompressor < Compressor
WebHDFSOutput.register_compressor('gzip', self)

Expand Down
6 changes: 3 additions & 3 deletions lib/fluent/plugin/webhdfs_compressor_lzo_command.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module Fluent
class WebHDFSOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class LZOCommandCompressor < Compressor
WebHDFSOutput.register_compressor('lzo_command', self)

config_param :command_parameter, :string, :default => '-qf1'
config_param :command_parameter, :string, default: '-qf1'

def configure(conf)
super
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_snappy.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class SnappyCompressor < Compressor
WebHDFSOutput.register_compressor('snappy', self)

Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_text.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class TextCompressor < Compressor
WebHDFSOutput.register_compressor('text', self)

Expand Down
4 changes: 4 additions & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
unless ENV.has_key?('VERBOSE')
nulllogger = Object.new
nulllogger.instance_eval {|obj|
Expand All @@ -22,6 +24,8 @@ def method_missing(method, *args)
$log = nulllogger
end

include Fluent::Test::Helpers

require 'fluent/plugin/out_webhdfs'

class Test::Unit::TestCase
Expand Down
6 changes: 3 additions & 3 deletions test/plugin/test_compressor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ class Snappy < self
def setup
omit unless Object.const_defined?(:Snappy)
Fluent::Test.setup
@compressor = Fluent::WebHDFSOutput::SnappyCompressor.new
@compressor = Fluent::Plugin::WebHDFSOutput::SnappyCompressor.new
end

def create_driver(conf=CONFIG,tag='test')
Fluent::Test::OutputTestDriver.new(Fluent::WebHDFSOutput, tag).configure(conf)
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::WebHDFSOutput).configure(conf)
end

def test_ext
Expand Down
26 changes: 12 additions & 14 deletions test/plugin/test_out_webhdfs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ def setup
Fluent::Test.setup
end

def create_driver(conf=CONFIG,tag='test')
Fluent::Test::OutputTestDriver.new(Fluent::WebHDFSOutput, tag).configure(conf)
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::WebHDFSOutput).configure(conf)
end

class ConfigureTest < self
Expand All @@ -19,7 +19,6 @@ def test_default
assert_equal 'namenode.local', d.instance.instance_eval{ @namenode_host }
assert_equal 50070, d.instance.instance_eval{ @namenode_port }
assert_equal '/hdfs/path/file.%Y%m%d.log', d.instance.path
assert_equal '%Y%m%d', d.instance.time_slice_format
assert_equal false, d.instance.httpfs
assert_nil d.instance.username
assert_equal false, d.instance.ignore_start_check_error
Expand All @@ -43,7 +42,6 @@ def test_httpfs
assert_equal 'server.local', d.instance.instance_eval{ @namenode_host }
assert_equal 14000, d.instance.instance_eval{ @namenode_port }
assert_equal '/hdfs/path/file.%Y%m%d.%H%M.log', d.instance.path
assert_equal '%Y%m%d%H%M', d.instance.time_slice_format
assert_equal true, d.instance.httpfs
assert_equal 'hdfs_user', d.instance.username
end
Expand All @@ -60,17 +58,16 @@ def test_ssl
assert_equal 'server.local', d.instance.instance_eval{ @namenode_host }
assert_equal 14000, d.instance.instance_eval{ @namenode_port }
assert_equal '/hdfs/path/file.%Y%m%d.%H%M.log', d.instance.path
assert_equal '%Y%m%d%H%M', d.instance.time_slice_format
assert_equal true, d.instance.ssl
assert_equal '/path/to/ca_file.pem', d.instance.ssl_ca_file
assert_equal :peer, d.instance.ssl_verify_mode
assert_equal true, d.instance.kerberos
end

data(gzip: ['gzip', Fluent::WebHDFSOutput::GzipCompressor],
bzip2: ['bzip2', Fluent::WebHDFSOutput::Bzip2Compressor],
snappy: ['snappy', Fluent::WebHDFSOutput::SnappyCompressor],
lzo: ['lzo_command', Fluent::WebHDFSOutput::LZOCommandCompressor])
data(gzip: ['gzip', Fluent::Plugin::WebHDFSOutput::GzipCompressor],
bzip2: ['bzip2', Fluent::Plugin::WebHDFSOutput::Bzip2Compressor],
snappy: ['snappy', Fluent::Plugin::WebHDFSOutput::SnappyCompressor],
lzo: ['lzo_command', Fluent::Plugin::WebHDFSOutput::LZOCommandCompressor])
def test_compress(data)
compress_type, compressor_class = data
begin
Expand All @@ -85,7 +82,6 @@ def test_compress(data)
assert_equal 'server.local', d.instance.instance_eval{ @namenode_host }
assert_equal 14000, d.instance.instance_eval{ @namenode_port }
assert_equal '/hdfs/path/file.%Y%m%d.%H%M.log', d.instance.path
assert_equal '%Y%m%d%H%M', d.instance.time_slice_format
assert_equal compress_type, d.instance.compress
assert_equal compressor_class, d.instance.compressor.class
end
Expand All @@ -102,19 +98,21 @@ def test_placeholders
class PathFormatTest < self
def test_default
d = create_driver
time = event_time("2012-07-18 15:03:00 +0900")
metadata = d.instance.metadata("test", time, {})
assert_equal '/hdfs/path/file.%Y%m%d.log', d.instance.path
assert_equal '%Y%m%d', d.instance.time_slice_format
assert_equal '/hdfs/path/file.20120718.log', d.instance.path_format('20120718')
assert_equal '/hdfs/path/file.20120718.log', d.instance.path_format(metadata)
end

def test_time_slice_format
d = create_driver %[
namenode server.local:14000
path /hdfs/path/file.%Y%m%d.%H%M.log
]
time = event_time("2012-07-18 15:03:00 +0900")
metadata = d.instance.metadata("test", time, {})
assert_equal '/hdfs/path/file.%Y%m%d.%H%M.log', d.instance.path
assert_equal '%Y%m%d%H%M', d.instance.time_slice_format
assert_equal '/hdfs/path/file.20120718.1503.log', d.instance.path_format('201207181503')
assert_equal '/hdfs/path/file.20120718.1503.log', d.instance.path_format(metadata)
end
end

Expand Down