diff --git a/lib/fluent/plugin/out_sql.rb b/lib/fluent/plugin/out_sql.rb index 23dfa79..c039166 100644 --- a/lib/fluent/plugin/out_sql.rb +++ b/lib/fluent/plugin/out_sql.rb @@ -90,23 +90,26 @@ def init(base_model) def import(chunk, output) tag = chunk.metadata.tag + columns = [] records = [] chunk.msgpack_each { |time, data| begin data = output.inject_values_to_record(tag, time, data) - records << @model.new(@format_proc.call(data)) + new_record = @format_proc.call(data) + columns |= new_record.keys + records << @model.new(new_record) rescue => e args = {error: e, table: @table, record: Yajl.dump(data)} @log.warn "Failed to create the model. Ignore a record:", args end } begin - @model.import(records) + @model.import(columns, records) rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e if @enable_fallback # ignore other exceptions to use Fluentd retry mechanizm @log.warn "Got deterministic error. Fallback to one-by-one import", error: e - one_by_one_import(records) + one_by_one_import(columns, records) else @log.warn "Got deterministic error. Fallback is disabled", error: e raise e @@ -114,11 +117,11 @@ def import(chunk, output) end end - def one_by_one_import(records) + def one_by_one_import(columns, records) records.each { |record| retries = 0 begin - @model.import([record]) + @model.import(columns, [record]) rescue ActiveRecord::StatementInvalid, ActiveRecord::Import::MissingColumnError => e @log.error "Got deterministic error again. Dump a record", error: e, record: record rescue => e diff --git a/test/fixtures/schema.rb b/test/fixtures/schema.rb index a4d8874..128d5e1 100644 --- a/test/fixtures/schema.rb +++ b/test/fixtures/schema.rb @@ -27,5 +27,9 @@ t.datetime "updated_at", null: false t.string "custom_time" end -end + create_table "messages_unmapped", force: :cascade do |t| + t.string "message" + t.string "status", default: 'default_status' + end +end diff --git a/test/plugin/test_out_sql.rb b/test/plugin/test_out_sql.rb index 3ccbb9e..b21f2e3 100644 --- a/test/plugin/test_out_sql.rb +++ b/test/plugin/test_out_sql.rb @@ -80,6 +80,41 @@ def test_emit assert_equal(["message1", "message2"], messages) end + def test_unmapped_column_uses_db_default + config = %[ + host localhost + port 5432 + adapter postgresql + + database fluentd_test + username fluentd + password fluentd + + schema_search_path public + + remove_tag_prefix db + + + table messages_unmapped + column_mapping message:message +
+ ] + + d = create_driver(config) + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + + d.run(default_tag: 'test') do + d.feed(time, {'message' => 'hello'}) + end + + records = ActiveRecord::Base.connection.select_all("SELECT * FROM messages_unmapped").to_a + + assert_equal 1, records.size + assert_equal 'hello', records.first['message'] + # The 'status' column is not mapped, so it should use the default value defined in the database schema + assert_equal 'default_status', records.first['status'] + end + class Fallback < self def test_simple d = create_driver @@ -91,10 +126,10 @@ def test_simple default_table = d.instance.instance_variable_get(:@default_table) model = default_table.instance_variable_get(:@model) - mock(model).import(anything).at_least(1) do + mock(model).import(anything, anything).at_least(1) do raise ActiveRecord::Import::MissingColumnError.new("dummy_table", "dummy_column") end - mock(default_table).one_by_one_import(anything) + mock(default_table).one_by_one_import(anything, anything) end end @@ -108,10 +143,10 @@ def test_limit default_table = d.instance.instance_variable_get(:@default_table) model = default_table.instance_variable_get(:@model) - mock(model).import([anything, anything]).once do + mock(model).import(anything, [anything, anything]).once do raise ActiveRecord::Import::MissingColumnError.new("dummy_table", "dummy_column") end - mock(model).import([anything]).times(12) do + mock(model).import(anything, [anything]).times(12) do raise StandardError end assert_equal(5, default_table.instance_variable_get(:@num_retries))