Skip to content

Commit

Permalink
Save last_id to Mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
soutaro committed Dec 28, 2015
1 parent 15cb240 commit fa0a49f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 9 deletions.
21 changes: 18 additions & 3 deletions README.rdoc
Expand Up @@ -138,9 +138,6 @@ Use _mongo_tail_ type in source.

# Convert 'time'(BSON's time) to fluent time(Unix time).
time_key time

# You can store last ObjectId to tail over server's shutdown
id_store_file /Users/repeatedly/devel/fluent-plugin-mongo/last_id
</source>

You can also use _url_ to specify the database to connect.
Expand All @@ -154,6 +151,24 @@ You can also use _url_ to specify the database to connect.

This allows the plugin to read data from a replica set.

You can save last ObjectId to tail over server's shutdown to file.

<source>
...

id_store_file /Users/repeatedly/devel/fluent-plugin-mongo/last_id
</source>

Or Mongo collection can be used to keep last ObjectID.

<source>
...

id_store_collection last_id
</source>

Make sure the collection is capped. The plugin inserts records but does not remove at all.

= NOTE

== Broken data as a BSON
Expand Down
30 changes: 24 additions & 6 deletions lib/fluent/plugin/in_mongo_tail.rb
Expand Up @@ -19,6 +19,7 @@ class MongoTailInput < Input

# To store last ObjectID
config_param :id_store_file, :string, :default => nil
config_param :id_store_collection, :string, :default => nil

# SSL connection
config_param :ssl, :bool, :default => false
Expand Down Expand Up @@ -53,7 +54,7 @@ def configure(conf)
@last_id = get_last_id
@connection_options[:ssl] = @ssl

$log.debug "Setup mongo_tail configuration: mode = #{@id_store_file ? 'persistent' : 'non-persistent'}"
$log.debug "Setup mongo_tail configuration: mode = #{@id_store_file || @id_store_collection ? 'persistent' : 'non-persistent'}, last_id = #{@last_id}"
end

def start
Expand Down Expand Up @@ -176,27 +177,44 @@ def open_id_storage
@id_storage = File.open(@id_store_file, 'w')
@id_storege.sync
end

if @id_store_collection
@id_storage = get_database.collection(@id_store_collection)
end
end

def close_id_storage
if @id_storage
if @id_storage.is_a?(File)
@id_storage.close
end
end

def get_last_id
if @id_store_file && File.exist?(@id_store_file)
BSON::ObjectId(File.read(@id_store_file)).to_s rescue nil
else
begin
if @id_store_file && File.exist?(@id_store_file)
return BSON::ObjectId(File.read(@id_store_file)).to_s
end

if @id_store_collection
collection = get_database.collection(@id_store_collection)
count = collection.find.count
doc = collection.find.skip(count - 1).limit(1).first
return doc && doc["last_id"]
end
rescue
nil
end
end

def save_last_id(last_id)
if @id_storage
if @id_storage.is_a?(File)
@id_storage.pos = 0
@id_storage.write(last_id)
end

if @id_storage.is_a?(Mongo::Collection)
@id_storage.insert("last_id" => last_id)
end
end
end
end
2 changes: 2 additions & 0 deletions test/plugin/in_mongo_tail.rb
Expand Up @@ -13,6 +13,7 @@ def setup
tag_key tag
time_key time
id_store_file /tmp/fluent_mongo_last_id
id_store_collection test_last_id
]

def create_driver(conf = CONFIG)
Expand All @@ -28,6 +29,7 @@ def test_configure
assert_equal('tag', d.instance.tag_key)
assert_equal('time', d.instance.time_key)
assert_equal('/tmp/fluent_mongo_last_id', d.instance.id_store_file)
assert_equal('test_last_id', d.instance.id_store_collection)
end

def test_url_configration
Expand Down

0 comments on commit fa0a49f

Please sign in to comment.