Permalink
Browse files

add tests and documents

  • Loading branch information...
tagomoris committed Jul 18, 2012
1 parent 01302c9 commit aea9469d10d811d5650e730353f26192e5e6f031
Showing with 112 additions and 13 deletions.
  1. +9 −0 Rakefile
  2. +17 −13 lib/fluent/plugin/out_webhdfs.rb
  3. +28 −0 test/helper.rb
  4. +58 −0 test/plugin/test_out_webhdfs.rb
View
@@ -1,2 +1,11 @@
#!/usr/bin/env rake
require "bundler/gem_tasks"
+
+require 'rake/testtask'
+Rake::TestTask.new(:test) do |test|
+ test.libs << 'lib' << 'test'
+ test.pattern = 'test/**/test_*.rb'
+ test.verbose = true
+end
+
+task :default => :test
@@ -8,14 +8,19 @@ class Fluent::WebHDFSOutput < Fluent::TimeSlicedOutput
config_set_default :buffer_type, 'memory'
config_set_default :time_slice_format, '%Y%m%d'
- config_param :namenode, :string # host:port
+ config_param :host, :string, :default => nil
+ config_param :port, :integer, :default => 50070
+ config_param :namenode, :string, :default => nil # host:port
+
config_param :path, :string
config_param :username, :string, :default => nil
config_param :httpfs, :bool, :default => false
include Fluent::Mixin::PlainTextFormatter
+ config_param :default_tag, :string, :default => 'tag_missing'
+
def initialize
super
require 'net/http'
@@ -36,11 +41,18 @@ def configure(conf)
super
- unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @namenode
- raise Fluent::ConfigError, "Invalid config value about namenode: '#{@namenode}', needs NAMENODE_NAME:PORT"
+ if @host
+ @namenode_host = @host
+ @namenode_port = @port
+ elsif @namenode
+ unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @namenode
+ raise Fluent::ConfigError, "Invalid config value about namenode: '#{@namenode}', needs NAMENODE_NAME:PORT"
+ end
+ @namenode_host = $1
+ @namenode_port = $2.to_i
+ else
+ raise Fluent::ConfigError, "WebHDFS host or namenode missing"
end
- @namenode_host = $1
- @namenode_port = $2.to_i
unless @path.index('/') == 0
raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
end
@@ -52,7 +64,6 @@ def configure(conf)
if @httpfs
@client.httpfs_mode = true
end
- @mutex = Mutex.new
end
def start
@@ -73,13 +84,6 @@ def shutdown
super
end
- def record_to_string(record)
- record.to_json
- end
-
- # def format(tag, time, record)
- # end
-
def path_format(chunk_key)
Time.strptime(chunk_key, @time_slice_format).strftime(@path)
end
View
@@ -0,0 +1,28 @@
+require 'rubygems'
+require 'bundler'
+begin
+ Bundler.setup(:default, :development)
+rescue Bundler::BundlerError => e
+ $stderr.puts e.message
+ $stderr.puts "Run `bundle install` to install missing gems"
+ exit e.status_code
+end
+require 'test/unit'
+
+$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
+$LOAD_PATH.unshift(File.dirname(__FILE__))
+require 'fluent/test'
+unless ENV.has_key?('VERBOSE')
+ nulllogger = Object.new
+ nulllogger.instance_eval {|obj|
+ def method_missing(method, *args)
+ # pass
+ end
+ }
+ $log = nulllogger
+end
+
+require 'fluent/plugin/out_webhdfs'
+
+class Test::Unit::TestCase
+end
@@ -0,0 +1,58 @@
+require 'helper'
+
+class WebHDFSOutputTest < Test::Unit::TestCase
+ CONFIG = %[
+host namenode.local
+path /hdfs/path/file.%Y%m%d.log
+ ]
+
+ def create_driver(conf=CONFIG,tag='test')
+ Fluent::Test::OutputTestDriver.new(Fluent::WebHDFSOutput, tag).configure(conf)
+ end
+
+ def test_configure
+ d = create_driver
+ assert_equal 'namenode.local', d.instance.instance_eval{ @namenode_host }
+ assert_equal 50070, d.instance.instance_eval{ @namenode_port }
+ assert_equal '/hdfs/path/file.%Y%m%d.log', d.instance.path
+ assert_equal '%Y%m%d', d.instance.time_slice_format
+ assert_equal false, d.instance.httpfs
+ assert_nil d.instance.username
+
+ assert_equal true, d.instance.output_include_time
+ assert_equal true, d.instance.output_include_tag
+ assert_equal 'json', d.instance.output_data_type
+ assert_nil d.instance.remove_prefix
+ assert_equal 'TAB', d.instance.field_separator
+ assert_equal true, d.instance.add_newline
+ assert_equal 'tag_missing', d.instance.default_tag
+
+ d = create_driver %[
+namenode server.local:14000
+path /hdfs/path/file.%Y%m%d.%H%M.log
+httpfs yes
+username hdfs_user
+]
+ assert_equal 'server.local', d.instance.instance_eval{ @namenode_host }
+ assert_equal 14000, d.instance.instance_eval{ @namenode_port }
+ assert_equal '/hdfs/path/file.%Y%m%d.%H%M.log', d.instance.path
+ assert_equal '%Y%m%d%H%M', d.instance.time_slice_format
+ assert_equal true, d.instance.httpfs
+ assert_equal 'hdfs_user', d.instance.username
+ end
+
+ def test_path_format
+ d = create_driver
+ assert_equal '/hdfs/path/file.%Y%m%d.log', d.instance.path
+ assert_equal '%Y%m%d', d.instance.time_slice_format
+ assert_equal '/hdfs/path/file.20120718.log', d.instance.path_format('20120718')
+
+ d = create_driver %[
+namenode server.local:14000
+path /hdfs/path/file.%Y%m%d.%H%M.log
+]
+ assert_equal '/hdfs/path/file.%Y%m%d.%H%M.log', d.instance.path
+ assert_equal '%Y%m%d%H%M', d.instance.time_slice_format
+ assert_equal '/hdfs/path/file.20120718.1503.log', d.instance.path_format('201207181503')
+ end
+end

0 comments on commit aea9469

Please sign in to comment.