Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to v0.14 API #44

Merged
merged 21 commits into from
Jan 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ sudo: false
language: ruby

rvm:
- 2.0.0
- 2.1
- 2.2
- 2.3.0
- 2.3.1

branches:
only:
Expand All @@ -23,12 +22,4 @@ script: bundle exec rake test

gemfile:
- Gemfile
- gemfiles/fluentd_v0.12.gemfile
- gemfiles/fluentd_v0.14.gemfile

matrix:
exclude:
- rvm: 2.0.0
gemfile: Gemfile
- rvm: 2.0.0
gemfile: gemfiles/fluentd_v0.14.gemfile
3 changes: 2 additions & 1 deletion fluent-plugin-webhdfs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ Gem::Specification.new do |gem|

gem.add_development_dependency "rake"
gem.add_development_dependency "test-unit"
gem.add_development_dependency "test-unit-rr"
gem.add_development_dependency "appraisal"
gem.add_development_dependency "snappy", '>= 0.0.13'
gem.add_runtime_dependency "fluentd", '>= 0.10.59'
gem.add_runtime_dependency "fluentd", '>= 0.14.2'
gem.add_runtime_dependency "fluent-mixin-plaintextformatter", '>= 0.2.1'
gem.add_runtime_dependency "fluent-mixin-config-placeholders", ">= 0.3.0"
gem.add_runtime_dependency "webhdfs", '>= 0.6.0'
Expand Down
108 changes: 44 additions & 64 deletions lib/fluent/plugin/out_webhdfs.rb
Original file line number Diff line number Diff line change
@@ -1,77 +1,73 @@
# -*- coding: utf-8 -*-

require 'net/http'
require 'time'
require 'webhdfs'
require 'tempfile'
require 'fluent/config/element'
require 'fluent/plugin/output'

require 'fluent/mixin/config_placeholders'
require 'fluent/mixin/plaintextformatter'

class Fluent::WebHDFSOutput < Fluent::TimeSlicedOutput
class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('webhdfs', self)

config_set_default :buffer_type, 'memory'
config_set_default :time_slice_format, '%Y%m%d'

# For fluentd v0.12.16 or earlier
class << self
unless method_defined?(:desc)
def desc(description)
end
end
end
helpers :compat_parameters

desc 'WebHDFS/HttpFs host'
config_param :host, :string, :default => nil
config_param :host, :string, default: nil
desc 'WebHDFS/HttpFs port'
config_param :port, :integer, :default => 50070
config_param :port, :integer, default: 50070
desc 'Namenode (host:port)'
config_param :namenode, :string, :default => nil # host:port
config_param :namenode, :string, default: nil # host:port
desc 'Standby namenode for Namenode HA (host:port)'
config_param :standby_namenode, :string, :default => nil # host:port
config_param :standby_namenode, :string, default: nil # host:port

desc 'Ignore errors on start up'
config_param :ignore_start_check_error, :bool, :default => false
config_param :ignore_start_check_error, :bool, default: false

include Fluent::Mixin::ConfigPlaceholders

desc 'Output file path on HDFS'
config_param :path, :string
desc 'User name for pseudo authentication'
config_param :username, :string, :default => nil
config_param :username, :string, default: nil

desc 'Store data over HttpFs instead of WebHDFS'
config_param :httpfs, :bool, :default => false
config_param :httpfs, :bool, default: false

desc 'Number of seconds to wait for the connection to open'
config_param :open_timeout, :integer, :default => 30 # from ruby net/http default
config_param :open_timeout, :integer, default: 30 # from ruby net/http default
desc 'Number of seconds to wait for one block to be read'
config_param :read_timeout, :integer, :default => 60 # from ruby net/http default
config_param :read_timeout, :integer, default: 60 # from ruby net/http default

desc 'Retry automatically when known errors of HDFS are occurred'
config_param :retry_known_errors, :bool, :default => false
config_param :retry_known_errors, :bool, default: false
desc 'Retry interval'
config_param :retry_interval, :integer, :default => nil
config_param :retry_interval, :integer, default: nil
desc 'The number of retries'
config_param :retry_times, :integer, :default => nil
config_param :retry_times, :integer, default: nil

# how many times of write failure before switch to standby namenode
# by default it's 11 times that costs 1023 seconds inside fluentd,
# which is considered enough to exclude the scenes that caused by temporary network fail or single datanode fail
desc 'How many times of write failure before switch to standby namenode'
config_param :failures_before_use_standby, :integer, :default => 11
config_param :failures_before_use_standby, :integer, default: 11

include Fluent::Mixin::PlainTextFormatter

config_param :default_tag, :string, :default => 'tag_missing'
config_param :default_tag, :string, default: 'tag_missing'

desc 'Append data or not'
config_param :append, :bool, :default => true
config_param :append, :bool, default: true

desc 'Use SSL or not'
config_param :ssl, :bool, :default => false
config_param :ssl, :bool, default: false
desc 'OpenSSL certificate authority file'
config_param :ssl_ca_file, :string, :default => nil
config_param :ssl_ca_file, :string, default: nil
desc 'OpenSSL verify mode (none,peer)'
config_param :ssl_verify_mode, :default => nil do |val|
config_param :ssl_verify_mode, default: nil do |val|
case val
when 'none'
:none
Expand All @@ -83,11 +79,11 @@ def desc(description)
end

desc 'Use kerberos authentication or not'
config_param :kerberos, :bool, :default => false
config_param :kerberos, :bool, default: false

SUPPORTED_COMPRESS = ['gzip', 'bzip2', 'snappy', 'lzo_command', 'text']
desc "Compress method (#{SUPPORTED_COMPRESS.join(',')})"
config_param :compress, :default => nil do |val|
config_param :compress, default: nil do |val|
unless SUPPORTED_COMPRESS.include? val
raise Fluent::ConfigError, "unsupported compress: #{val}"
end
Expand All @@ -100,28 +96,24 @@ def desc(description)

def initialize
super
require 'net/http'
require 'time'
require 'webhdfs'

@compressor = nil
end

# Define `log` method for v0.10.42 or earlier
unless method_defined?(:log)
define_method("log") { $log }
end

def configure(conf)
if conf['path']
if conf['path'].index('%S')
conf['time_slice_format'] = '%Y%m%d%H%M%S'
elsif conf['path'].index('%M')
conf['time_slice_format'] = '%Y%m%d%H%M'
elsif conf['path'].index('%H')
conf['time_slice_format'] = '%Y%m%d%H'
end
compat_parameters_convert(conf, :buffer, default_chunk_key: "time")

timekey = case conf["path"]
when /%S/ then 1
when /%M/ then 60
when /%H/ then 3600
else 86400
end
if conf.elements(name: "buffer", arg: "time").empty?
e = Fluent::Config::Element.new("buffer", "time", {}, [])
conf.elements << e
end
buffer_config = conf.elements(name: "buffer", arg: "time").first
buffer_config["timekey"] = timekey unless buffer_config["timekey"]

super

Expand All @@ -130,7 +122,7 @@ def configure(conf)
rescue Fluent::ConfigError
raise
rescue
$log.warn "#{@comress} not found. Use 'text' instead"
log.warn "#{@comress} not found. Use 'text' instead"
@compressor = COMPRESSOR_REGISTRY.lookup('text').new
end

Expand Down Expand Up @@ -167,7 +159,7 @@ def configure(conf)
@client_standby = nil
end

if not @append
unless @append
if @path.index(CHUNK_ID_PLACE_HOLDER).nil?
raise Fluent::ConfigError, "path must contain ${chunk_id}, which is the placeholder for chunk_id, when append is set to false."
end
Expand Down Expand Up @@ -230,14 +222,6 @@ def start
end
end

def shutdown
super
end

def path_format(chunk_key)
Time.strptime(chunk_key, @time_slice_format).strftime(@path)
end

def is_standby_exception(e)
e.is_a?(WebHDFS::IOError) && e.message.match(/org\.apache\.hadoop\.ipc\.StandbyException/)
end
Expand All @@ -249,10 +233,6 @@ def namenode_failover
end
end

def chunk_unique_id_to_str(unique_id)
unique_id.unpack('C*').map{|x| x.to_s(16).rjust(2,'0')}.join('')
end

# TODO check conflictions

def send_data(path, data)
Expand All @@ -269,9 +249,9 @@ def send_data(path, data)

def generate_path(chunk)
hdfs_path = if @append
path_format(chunk.key)
extract_placeholders(@path, chunk.metadata)
else
path_format(chunk.key).gsub(CHUNK_ID_PLACE_HOLDER, chunk_unique_id_to_str(chunk.unique_id))
extract_placeholders(@path, chunk.metadata).gsub(CHUNK_ID_PLACE_HOLDER, dump_unique_id(chunk.unique_id))
end
hdfs_path = "#{hdfs_path}#{@compressor.ext}"
hdfs_path
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_bzip2.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class Bzip2Compressor < Compressor
WebHDFSOutput.register_compressor('bzip2', self)

Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_gzip.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class GzipCompressor < Compressor
WebHDFSOutput.register_compressor('gzip', self)

Expand Down
6 changes: 3 additions & 3 deletions lib/fluent/plugin/webhdfs_compressor_lzo_command.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module Fluent
class WebHDFSOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class LZOCommandCompressor < Compressor
WebHDFSOutput.register_compressor('lzo_command', self)

config_param :command_parameter, :string, :default => '-qf1'
config_param :command_parameter, :string, default: '-qf1'

def configure(conf)
super
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_snappy.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class SnappyCompressor < Compressor
WebHDFSOutput.register_compressor('snappy', self)

Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/webhdfs_compressor_text.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Fluent
class WebHDFSOutput < Fluent::TimeSlicedOutput
module Fluent::Plugin
class WebHDFSOutput < Output
class TextCompressor < Compressor
WebHDFSOutput.register_compressor('text', self)

Expand Down
5 changes: 5 additions & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
exit e.status_code
end
require 'test/unit'
require 'test/unit/rr'

$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift(File.dirname(__FILE__))
require 'fluent/test'
require 'fluent/test/helpers'
require 'fluent/test/driver/output'
unless ENV.has_key?('VERBOSE')
nulllogger = Object.new
nulllogger.instance_eval {|obj|
Expand All @@ -22,6 +25,8 @@ def method_missing(method, *args)
$log = nulllogger
end

include Fluent::Test::Helpers

require 'fluent/plugin/out_webhdfs'

class Test::Unit::TestCase
Expand Down
6 changes: 3 additions & 3 deletions test/plugin/test_compressor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ class Snappy < self
def setup
omit unless Object.const_defined?(:Snappy)
Fluent::Test.setup
@compressor = Fluent::WebHDFSOutput::SnappyCompressor.new
@compressor = Fluent::Plugin::WebHDFSOutput::SnappyCompressor.new
end

def create_driver(conf=CONFIG,tag='test')
Fluent::Test::OutputTestDriver.new(Fluent::WebHDFSOutput, tag).configure(conf)
def create_driver(conf = CONFIG)
Fluent::Test::Driver::Output.new(Fluent::Plugin::WebHDFSOutput).configure(conf)
end

def test_ext
Expand Down
Loading