
implement alpha ver

tagomoris committed May 20, 2012
1 parent 177409d commit 4d3c2aa46ad8472ee0d54b34b9698d8240bab2b7
Showing with 342 additions and 2 deletions.
  1. +13 −0 LICENSE.txt
  2. +4 −2 fluent-plugin-webhdfs.gemspec
  3. +207 −0 lib/fluent/plugin/ext_mixin.rb
  4. +118 −0 lib/fluent/plugin/out_webhdfs.rb
@@ -0,0 +1,13 @@
+Copyright (c) 2012- TAGOMORI Satoshi
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
@@ -4,8 +4,8 @@ Gem::Specification.new do |gem|
gem.version = "0.0.1"
gem.authors = ["TAGOMORI Satoshi"]
gem.email = ["tagomoris@gmail.com"]
- gem.summary = %q{TODO: Write a gem summary}
- gem.description = %q{TODO: Write a gem description}
+ gem.summary = %q{Fluentd plugin to write data on HDFS over WebHDFS, with flexible formatting}
+ gem.description = %q{Fluentd plugin to write data on HDFS over WebHDFS}
gem.homepage = "https://github.com/tagomoris/fluent-plugin-webhdfs"
gem.files = `git ls-files`.split($\)
@@ -14,5 +14,7 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.add_development_dependency "fluentd"
+ gem.add_development_dependency "webhdfs"
gem.add_runtime_dependency "fluentd"
+ gem.add_runtime_dependency "webhdfs"
end
@@ -0,0 +1,207 @@
+module FluentExt; end
+
+module FluentExt::PlainTextFormatterMixin
+ #TODO: tests!
+
+ # config_param :output_data_type, :string, :default => 'json' # or 'attr:field' or 'attr:field1,field2,field3(...)'
+
+ attr_accessor :output_include_time, :output_include_tag, :output_data_type
+ attr_accessor :add_newline, :field_separator
+ attr_accessor :remove_prefix, :default_tag
+
+ attr_accessor :f_separator
+
+ def configure(conf)
+ super
+
+ @output_include_time = Fluent::Config.bool_value(conf['output_include_time'])
+ @output_include_time = true if @output_include_time.nil?
+
+ @output_include_tag = Fluent::Config.bool_value(conf['output_include_tag'])
+ @output_include_tag = true if @output_include_tag.nil?
+
+ @output_data_type = conf['output_data_type']
+ @output_data_type = 'json' if @output_data_type.nil?
+
+ @f_separator = case conf['field_separator']
+ when 'SPACE' then ' '
+ when 'COMMA' then ','
+ else "\t"
+ end
+ @add_newline = Fluent::Config.bool_value(conf['add_newline'])
+ if @add_newline.nil?
+ @add_newline = true
+ end
+
+ @remove_prefix = conf['remove_prefix']
+ if @remove_prefix
+ @removed_prefix_string = @remove_prefix + '.'
+ @removed_length = @removed_prefix_string.length
+ end
+ if @output_include_tag and @remove_prefix and @remove_prefix.length > 0
+ @default_tag = conf['default_tag']
+ if @default_tag.nil? or @default_tag.length < 1
+ raise Fluent::ConfigError, "Missing 'default_tag' with output_include_tag and remove_prefix."
+ end
+ end
+
+ # default timezone: utc
+ if conf['localtime'].nil? and conf['utc'].nil?
+ @utc = true
+ @localtime = false
+ elsif not @localtime and not @utc
+ @utc = true
+ @localtime = false
+ end
+ # mix in the default time formatter (or override @timef in your own configure)
+ @timef = @output_include_time ? Fluent::TimeFormatter.new(@time_format, @localtime) : nil
+
+ @custom_attributes = []
+ if @output_data_type == 'json'
+ self.instance_eval {
+ def stringify_record(record)
+ record.to_json
+ end
+ }
+ elsif @output_data_type =~ /^attr:(.*)$/
+ @custom_attributes = $1.split(',')
+ if @custom_attributes.size > 1
+ self.instance_eval {
+ def stringify_record(record)
+ @custom_attributes.map{|attr| (record[attr] || 'NULL').to_s}.join(@f_separator)
+ end
+ }
+ elsif @custom_attributes.size == 1
+ self.instance_eval {
+ def stringify_record(record)
+ (record[@custom_attributes[0]] || 'NULL').to_s
+ end
+ }
+ else
+ raise Fluent::ConfigError, "Invalid attributes specification: '#{@output_data_type}', needs one or more attributes."
+ end
+ else
+ raise Fluent::ConfigError, "Invalid output_data_type: '#{@output_data_type}'. specify 'json' or 'attr:ATTRIBUTE_NAME' or 'attr:ATTR1,ATTR2,...'"
+ end
+
+ if @output_include_time and @output_include_tag
+ if @add_newline and @remove_prefix
+ self.instance_eval {
+ def format(tag,time,record)
+ if (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or
+ tag == @remove_prefix
+ tag = tag[@removed_length..-1] || @default_tag
+ end
+ @timef.format(time) + @f_separator + tag + @f_separator + stringify_record(record) + "\n"
+ end
+ }
+ elsif @add_newline
+ self.instance_eval {
+ def format(tag,time,record)
+ @timef.format(time) + @f_separator + tag + @f_separator + stringify_record(record) + "\n"
+ end
+ }
+ elsif @remove_prefix
+ self.instance_eval {
+ def format(tag,time,record)
+ if (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or
+ tag == @remove_prefix
+ tag = tag[@removed_length..-1] || @default_tag
+ end
+ @timef.format(time) + @f_separator + tag + @f_separator + stringify_record(record)
+ end
+ }
+ else
+ self.instance_eval {
+ def format(tag,time,record)
+ @timef.format(time) + @f_separator + tag + @f_separator + stringify_record(record)
+ end
+ }
+ end
+ elsif @output_include_time
+ if @add_newline
+ self.instance_eval {
+ def format(tag,time,record)
+ @timef.format(time) + @f_separator + stringify_record(record) + "\n"
+ end
+ }
+ else
+ self.instance_eval {
+ def format(tag,time,record)
+ @timef.format(time) + @f_separator + stringify_record(record)
+ end
+ }
+ end
+ elsif @output_include_tag
+ if @add_newline and @remove_prefix
+ self.instance_eval {
+ def format(tag,time,record)
+ if (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or
+ tag == @remove_prefix
+ tag = tag[@removed_length..-1] || @default_tag
+ end
+ tag + @f_separator + stringify_record(record) + "\n"
+ end
+ }
+ elsif @add_newline
+ self.instance_eval {
+ def format(tag,time,record)
+ tag + @f_separator + stringify_record(record) + "\n"
+ end
+ }
+ elsif @remove_prefix
+ self.instance_eval {
+ def format(tag,time,record)
+ if (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length) or
+ tag == @remove_prefix
+ tag = tag[@removed_length..-1] || @default_tag
+ end
+ tag + @f_separator + stringify_record(record)
+ end
+ }
+ else
+ self.instance_eval {
+ def format(tag,time,record)
+ tag + @f_separator + stringify_record(record)
+ end
+ }
+ end
+ else # without time, tag
+ if @add_newline
+ self.instance_eval {
+ def format(tag,time,record)
+ stringify_record(record) + "\n"
+ end
+ }
+ else
+ self.instance_eval {
+ def format(tag,time,record)
+ stringify_record(record)
+ end
+ }
+ end
+ end
+ end
+
+ def stringify_record(record)
+ record.to_json
+ end
+
+ def format(tag, time, record)
+ if @remove_prefix and (tag == @remove_prefix or (tag[0, @removed_length] == @removed_prefix_string and tag.length > @removed_length))
+ tag = tag[@removed_length..-1] || @default_tag
+ end
+ time_str = if @output_include_time
+ @timef.format(time) + @f_separator
+ else
+ ''
+ end
+ tag_str = if @output_include_tag
+ tag + @f_separator
+ else
+ ''
+ end
+ time_str + tag_str + stringify_record(record) + "\n"
+ end
+
+end
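
For reference, with the defaults set by out_webhdfs.rb below (time and tag included, 'json' output, tab separator, trailing newline), a record such as {"status":200,"bytes":512} with tag web.access emitted at 2012-05-20 07:00:00 UTC would be serialized roughly as the line below; <TAB> stands for the tab separator, and the timestamp assumes Fluent::TimeFormatter's default format (ISO 8601, UTC). This is an illustrative sketch, not captured output.

2012-05-20T07:00:00Z<TAB>web.access<TAB>{"status":200,"bytes":512}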
@@ -0,0 +1,118 @@
+require_relative 'ext_mixin'
+
+class Fluent::WebHDFSOutput < Fluent::TimeSlicedOutput
+ Fluent::Plugin.register_output('webhdfs', self)
+
+ WEBHDFS_VERSION = 'v1'
+
+ config_set_default :buffer_type, 'memory'
+ config_set_default :time_slice_format, '%Y%m%d'
+
+ config_param :namenode, :string # host:port
+ config_param :path, :string
+ config_param :username, :string, :default => nil
+
+ include FluentExt::PlainTextFormatterMixin
+ config_set_default :output_include_time, true
+ config_set_default :output_include_tag, true
+ config_set_default :output_data_type, 'json'
+ config_set_default :field_separator, "\t"
+ config_set_default :add_newline, true
+ config_set_default :remove_prefix, nil
+
+ def initialize
+ super
+ require 'net/http'
+ require 'time'
+ require 'webhdfs'
+ end
+
+ def configure(conf)
+ if conf['path']
+ if conf['path'].index('%S')
+ conf['time_slice_format'] = '%Y%m%d%H%M%S'
+ elsif conf['path'].index('%M')
+ conf['time_slice_format'] = '%Y%m%d%H%M'
+ elsif conf['path'].index('%H')
+ conf['time_slice_format'] = '%Y%m%d%H'
+ end
+ end
+
+ super
+
+ unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @namenode
+ raise Fluent::ConfigError, "Invalid config value about namenode: '#{@namenode}', needs NAMENODE_NAME:PORT"
+ end
+ @namenode_host = $1
+ @namenode_port = $2.to_i
+ unless @path.index('/') == 0
+ raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
+ end
+ @conn = nil
+
+ @f_separator = case @field_separator
+ when 'SPACE' then ' '
+ when 'COMMA' then ','
+ else "\t"
+ end
+
+ # path => cached_url
+ # @cached_datanode_urls = {}
+ @client = WebHDFS::Client.new(@namenode_host, @namenode_port, @username)
+ @mutex = Mutex.new
+ end
+
+ def start
+ super
+
+ noerror = false
+ begin
+ ary = @client.list('/')
+ noerror = true
+ rescue
+ $log.error "webdhfs check request failed!"
+ raise
+ end
+ $log.info "webhdfs connection confirmed: #{@namenode_host}:#{@namenode_port}"
+ end
+
+ def shutdown
+ super
+ end
+
+ def record_to_string(record)
+ record.to_json
+ end
+
+ def format(tag, time, record)
+ time_str = @timef.format(time)
+ time_str + @f_separator + tag + @f_separator + record_to_string(record) + "\n"
+ end
+
+ def path_format(chunk_key)
+ Time.strptime(chunk_key, @time_slice_format).strftime(@path)
+ end
+
+ # TODO datanode url caching?
+
+ # TODO check conflictions
+
+ def send_data(path, data)
+ begin
+ @client.append(path, data)
+ rescue WebHDFS::FileNotFoundError
+ @client.create(path, data)
+ end
+ end
+
+ def write(chunk)
+ hdfs_path = path_format(chunk.key)
+ begin
+ send_data(hdfs_path, chunk.read)
+ rescue
+ $log.error "failed to communicate hdfs cluster, path: #{hdfs_path}"
+ raise
+ end
+ hdfs_path
+ end
+end
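
For illustration, a hypothetical configuration for this alpha version, using only the parameters defined above; the host, path, tag, and attribute names are placeholders, not values taken from the commit:

<match access.**>
  type webhdfs
  namenode master.example.com:50070
  path /log/access/%Y%m%d/access.%H.log
  username hdfsuser
  output_data_type attr:host,path,status
  remove_prefix access
  default_tag unknown
</match>

Because the path contains '%H', configure rewrites time_slice_format to '%Y%m%d%H' before calling super, and path_format expands the strftime placeholders against each chunk key, so a chunk keyed '2012052007' would be appended to /log/access/20120520/access.07.log (created on the first write if the file does not yet exist).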
