Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions lib/fluent/plugin/out_sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,38 @@ def init(base_model)

def import(chunk, output)
tag = chunk.metadata.tag
columns = []
records = []
chunk.msgpack_each { |time, data|
begin
data = output.inject_values_to_record(tag, time, data)
records << @model.new(@format_proc.call(data))
new_record = @format_proc.call(data)
columns |= new_record.keys
records << @model.new(new_record)
rescue => e
args = {error: e, table: @table, record: Yajl.dump(data)}
@log.warn "Failed to create the model. Ignore a record:", args
end
}
begin
@model.import(records)
@model.import(columns, records)
rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
if @enable_fallback
# ignore other exceptions to use Fluentd retry mechanizm
@log.warn "Got deterministic error. Fallback to one-by-one import", error: e
one_by_one_import(records)
one_by_one_import(columns, records)
else
@log.warn "Got deterministic error. Fallback is disabled", error: e
raise e
end
end
end

def one_by_one_import(records)
def one_by_one_import(columns, records)
records.each { |record|
retries = 0
begin
@model.import([record])
@model.import(columns, [record])
rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e
@log.error "Got deterministic error again. Dump a record", error: e, record: record
rescue => e
Expand Down
6 changes: 5 additions & 1 deletion test/fixtures/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,9 @@
t.datetime "updated_at", null: false
t.string "custom_time"
end
end

create_table "messages_unmapped", force: :cascade do |t|
t.string "message"
t.string "status", default: 'default_status'
end
end
43 changes: 39 additions & 4 deletions test/plugin/test_out_sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,41 @@ def test_emit
assert_equal(["message1", "message2"], messages)
end

def test_unmapped_column_uses_db_default
config = %[
host localhost
port 5432
adapter postgresql

database fluentd_test
username fluentd
password fluentd

schema_search_path public

remove_tag_prefix db

<table>
table messages_unmapped
column_mapping message:message
</table>
]

d = create_driver(config)
time = Time.parse("2011-01-02 13:14:15 UTC").to_i

d.run(default_tag: 'test') do
d.feed(time, {'message' => 'hello'})
end

records = ActiveRecord::Base.connection.select_all("SELECT * FROM messages_unmapped").to_a

assert_equal 1, records.size
assert_equal 'hello', records.first['message']
# The 'status' column is not mapped, so it should use the default value defined in the database schema
assert_equal 'default_status', records.first['status']
end

class Fallback < self
def test_simple
d = create_driver
Expand All @@ -91,10 +126,10 @@ def test_simple

default_table = d.instance.instance_variable_get(:@default_table)
model = default_table.instance_variable_get(:@model)
mock(model).import(anything).at_least(1) do
mock(model).import(anything, anything).at_least(1) do
raise ActiveRecord::Import::MissingColumnError.new("dummy_table", "dummy_column")
end
mock(default_table).one_by_one_import(anything)
mock(default_table).one_by_one_import(anything, anything)
end
end

Expand All @@ -108,10 +143,10 @@ def test_limit

default_table = d.instance.instance_variable_get(:@default_table)
model = default_table.instance_variable_get(:@model)
mock(model).import([anything, anything]).once do
mock(model).import(anything, [anything, anything]).once do
raise ActiveRecord::Import::MissingColumnError.new("dummy_table", "dummy_column")
end
mock(model).import([anything]).times(12) do
mock(model).import(anything, [anything]).times(12) do
raise StandardError
end
assert_equal(5, default_table.instance_variable_get(:@num_retries))
Expand Down