Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Rackspace Cloud Files input. #772

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 69 additions & 0 deletions lib/logstash/inputs/cloud_files.rb
@@ -0,0 +1,69 @@
require 'fog'
require 'stringio'
require 'zlib'
require 'logstash/inputs/base'
require 'logstash/namespace'

class LogStash::Inputs::CloudFiles < LogStash::Inputs::Base
milestone 1
config_name 'cloud_files'

default :codec, 'line'

config :username, :validate => :string, :required => true
config :api_key, :validate => :string, :required => true
config :region, :validate => :string, :required => true
config :container, :validate => :string, :required => true
config :interval, :validate => :number, :default => 60
config :sincedb_path, :validate => :string, :required => true

def register
@api = Fog::Storage.new(:provider => 'Rackspace', :rackspace_username => @username, :rackspace_api_key => @api_key, :rackspace_region => @region)
end

def run(queue)
loop do
process_log_files(queue)
sleep(@interval)
end

finished
end

private

def process_log_files(queue)
last_read = sincedb_read

container = @api.directories.get(@container)
container.files.each do |file|
process_file(queue, file) if file.last_modified > last_read
end
end

def process_file(queue, file)
log_stream = StringIO.new(file.body)
reader = Zlib::GzipReader.new(log_stream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lines codec will already split a byte stream up into events.

I also think it might be a good idea to omit the gzip assumptions here. This should be a codec as well so all can benefit regardless of if its compressed or not.

Thoughts?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nickethier Sorry for taking so long to get back to you. I'm about to push a change that uses the line codec to read the inflated GZip stream rather than defaulting to the plain codec.

I had planned to extract the GZip assumption to a codec but couldn't find much documentation on codecs. My first attempt was simply a wrapper around the line codec that inflated the GZip stream and passed it off the line codec. However, I wasn't quite sure what the implementation of the encode method should look like. Any suggestions?


@codec.decode(reader.read) do |event|
decorate(event)
queue << event
end

sincedb_write(file.last_modified)
end

def sincedb_read
if File.exists?(@sincedb_path)
since = Time.parse(File.read(@sincedb_path))
else
since = Time.new(0)
end

since
end

def sincedb_write(since)
File.open(@sincedb_path, 'w') { |f| f.write(since.to_s) }
end
end
5 changes: 3 additions & 2 deletions logstash.gemspec
Expand Up @@ -35,7 +35,7 @@ Gem::Specification.new do |gem|
# Input/Output/Filter dependencies
#TODO Can these be optional?
gem.add_runtime_dependency "awesome_print" #(MIT license)
gem.add_runtime_dependency "aws-sdk" #{Apache 2.0 license}
gem.add_runtime_dependency "aws-sdk" #{Apache 2.0 license}
gem.add_runtime_dependency "google-api-client" #{Apache 2.0 license}
gem.add_runtime_dependency "heroku" #(MIT license)
gem.add_runtime_dependency "addressable" #(Apache 2.0 license)
Expand Down Expand Up @@ -77,6 +77,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "twitter", "5.0.0.rc.1" #(MIT license)
gem.add_runtime_dependency "rsolr" #(Apache 2.0 license)
gem.add_runtime_dependency "edn" #(MIT license)
gem.add_runtime_dependency "fog" #(MIT License)

if RUBY_PLATFORM == 'java'
gem.platform = RUBY_PLATFORM
Expand Down Expand Up @@ -119,7 +120,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "rspec" #(MIT license)
gem.add_runtime_dependency "insist", "1.0.0" #(Apache 2.0 license)
gem.add_runtime_dependency "rumbster" # For faking smtp in email tests (Apache 2.0 license)

#Development Deps
gem.add_development_dependency "coveralls"

Expand Down