Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter_multiline: implemented buffered mode using in_emitter #4383

Merged
merged 22 commits into from
Jan 22, 2022

Conversation

PettitWesley
Copy link
Contributor

@PettitWesley PettitWesley commented Dec 1, 2021

This is the first part of the multiline filter re-design #4309

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Example config:

[FILTER]
     name                  multiline
     match                 *
     multiline.key_content log
     multiline.parser      go, multiline-regex-test
     Buffer                 On

Debug logs:

TODO

Valgrind:

Buffered (new) mode:

[2021/12/01 08:11:08] [ info] [input] pausing forward.0
[2021/12/01 08:11:08] [ warn] [engine] service will shutdown in max 1 seconds
[2021/12/01 08:11:09] [ info] [engine] service has stopped (0 pending tasks)
==6961==
==6961== HEAP SUMMARY:
==6961==     in use at exit: 0 bytes in 0 blocks
==6961==   total heap usage: 3,151 allocs, 3,151 frees, 8,405,333 bytes allocated
==6961==
==6961== All heap blocks were freed -- no leaks are possible
==6961==
==6961== For counts of detected and suppressed errors, rerun with: -v
==6961== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Un-buffered (old) mode:

==6811== HEAP SUMMARY:
==6811==     in use at exit: 0 bytes in 0 blocks
==6811==   total heap usage: 3,653 allocs, 3,653 frees, 24,229,263 bytes allocated
==6811==
==6811== All heap blocks were freed -- no leaks are possible
==6811==
==6811== For counts of detected and suppressed errors, rerun with: -v
==6811== Use --track-origins=yes to see where uninitialised values come from
==6811== ERROR SUMMARY: 5 errors from 5 contexts (suppressed: 0 from 0)
[ec2-user@ip-10-192-11-149 build]$ valgrind --leak-check=full ./bin/fluent-bit -c ~/ecs-mutliline/unbuffered.conf
==6961== Memcheck, a memory error detector
==6961== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==6961== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==6961== Command: ./bin/fluent-bit -c /home/ec2-user/ecs-mutliline/unbuffered.conf

Documentation

TODO


Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
@PettitWesley PettitWesley changed the title Multiline filter buffered mode using in_emitter filter_multiline: implemented buffered mode using in_emitter Dec 1, 2021
@PettitWesley
Copy link
Contributor Author

I used a container image to test; attached is the project files for that. I run it with the fluentd docker log driver:

docker run -d --log-driver fluentd multiline-app

And configure Fluent Bit to listen on forward port 24224
app.zip

Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Copy link
Collaborator

@nokute78 nokute78 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments.

Could you attach debug logs?

/* Create the emitter context */
ret = emitter_create(ctx);
if (ret == -1) {
return -1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flb_free(ctx) is needed.

* user must explicitly set buffer to false to turn it off
*/
tmp = (char *) flb_filter_get_property("buffer", ins);
if (tmp && (strcasecmp(tmp, "Off") == 0 || strcasecmp(tmp, "false") == 0)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flb_utils_bool is to check bool value.

flb_free(ctx);
return -1;
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it needs a log to output emitter_name for user.

@@ -375,6 +375,7 @@ static int cb_geoip2_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_size,
struct flb_filter_instance *f_ins,
struct flb_input_instance *i_ins,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(void) i_ins; is missing.

Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Wesley Pettit <wppttt@amazon.com>
@PettitWesley
Copy link
Contributor Author

@nokute78 Addressed feedback and here are the debug logs:

Fluent Bit v1.8.110
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2022/01/11 07:54:02] [ info] Configuration:
[2022/01/11 07:54:02] [ info]  flush time     | 15.000000 seconds
[2022/01/11 07:54:02] [ info]  grace          | 1 seconds
[2022/01/11 07:54:02] [ info]  daemon         | 0
[2022/01/11 07:54:02] [ info] ___________
[2022/01/11 07:54:02] [ info]  inputs:
[2022/01/11 07:54:02] [ info]      forward
[2022/01/11 07:54:02] [ info] ___________
[2022/01/11 07:54:02] [ info]  filters:
[2022/01/11 07:54:02] [ info]      modify.0
[2022/01/11 07:54:02] [ info]      multiline.1
[2022/01/11 07:54:02] [ info] ___________
[2022/01/11 07:54:02] [ info]  outputs:
[2022/01/11 07:54:02] [ info]      stdout.0
[2022/01/11 07:54:02] [ info] ___________
[2022/01/11 07:54:02] [ info]  collectors:
[2022/01/11 07:54:02] [ info] [engine] started (pid=1691)
[2022/01/11 07:54:02] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2022/01/11 07:54:02] [debug] [storage] [cio stream] new stream registered: forward.0
[2022/01/11 07:54:02] [ info] [storage] version=1.1.5, initializing...
[2022/01/11 07:54:02] [ info] [storage] in-memory
[2022/01/11 07:54:02] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2022/01/11 07:54:02] [ info] [cmetrics] version=0.2.2
[2022/01/11 07:54:02] [debug] [in_fw] Listen='0.0.0.0' TCP_Port=24224
[2022/01/11 07:54:02] [ info] [input:forward:forward.0] listening on 0.0.0.0:24224
[2022/01/11 07:54:02] [debug] [filter:modify:modify.0] Initialized modify filter with 0 conditions and 3 rules
[2022/01/11 07:54:02] [ info] [filter:multiline:multiline.1] created emitter: emitter_for_multiline.1
[2022/01/11 07:54:02] [debug] [storage] [cio stream] new stream registered: emitter.1
[2022/01/11 07:54:02] [debug] [stdout:stdout.0] created event channels: read=21 write=22
[2022/01/11 07:54:02] [debug] [router] match rule forward.0:stdout.0
[2022/01/11 07:54:02] [debug] [router] match rule emitter.1:stdout.0
[2022/01/11 07:54:02] [ info] [sp] stream processor started
[2022/01/11 07:54:08] [ info] [filter:multiline:multiline.1] created new multiline stream for forward.0_c44f79fe8a81
[2022/01/11 07:54:08] [debug] [filter:multiline:multiline.1] Created new ML stream for forward.0_c44f79fe8a81
[2022/01/11 07:54:17] [debug] [task] created task=0x7f550801c8e0 id=0 OK
[0] c44f79fe8a81: [1641887648.000000000, {"log"=>"single line..."}]
[1] c44f79fe8a81: [1641887648.000000000, {"log"=>"Dec 14 06:41:08 Exception in thread "main" java.lang.RuntimeException: Something has gone wrong, aborting!"}]
[2] c44f79fe8a81: [1641887648.000000000, {"log"=>"    at com.myproject.module.MyProject.badMethod(MyProject.java:22)"}]
[3] c44f79fe8a81: [1641887648.000000000, {"log"=>"    at com.myproject.module.MyProject.oneMoreMethod(MyProject.java:18)"}]
[4] c44f79fe8a81: [1641887648.000000000, {"log"=>"    at com.myproject.module.MyProject.anotherMethod(MyProject.java:14)"}]
[5] c44f79fe8a81: [1641887648.000000000, {"log"=>"    at com.myproject.module.MyProject.someMethod(MyProject.java:10)"}]
[6] c44f79fe8a81: [1641887648.000000000, {"log"=>"    at com.myproject.module.MyProject.main(MyProject.java:6)"}]
[7] c44f79fe8a81: [1641887648.000000000, {"log"=>"another line..."}]
[8] c44f79fe8a81: [1641887648.000000000, {"log"=>"panic: my panic

goroutine 4 [running]:
panic(0x45cb40, 0x47ad70)
  /usr/local/go/src/runtime/panic.go:542 +0x46c fp=0xc42003f7b8 sp=0xc42003f710 pc=0x422f7c
main.main.func1(0xc420024120)
  foo.go:6 +0x39 fp=0xc42003f7d8 sp=0xc42003f7b8 pc=0x451339
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003f7e0 sp=0xc42003f7d8 pc=0x44b4d1
created by main.main
  foo.go:5 +0x58

goroutine 1 [chan receive]:
runtime.gopark(0x4739b8, 0xc420024178, 0x46fcd7, 0xc, 0xc420028e17, 0x3)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc420053e30 sp=0xc420053e00 pc=0x42503c
runtime.goparkunlock(0xc420024178, 0x46fcd7, 0xc, 0x1000f010040c217, 0x3)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc420053e70 sp=0xc420053e30 pc=0x42512e
runtime.chanrecv(0xc420024120, 0x0, 0xc420053f01, 0x4512d8)
  /usr/local/go/src/runtime/chan.go:506 +0x304 fp=0xc420053f20 sp=0xc420053e70 pc=0x4046b4
runtime.chanrecv1(0xc420024120, 0x0)
  /usr/local/go/src/runtime/chan.go:388 +0x2b fp=0xc420053f50 sp=0xc420053f20 pc=0x40439b
main.main()
  foo.go:9 +0x6f fp=0xc420053f80 sp=0xc420053f50 pc=0x4512ef
runtime.main()
  /usr/local/go/src/runtime/proc.go:185 +0x20d fp=0xc420053fe0 sp=0xc420053f80 pc=0x424bad
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc420053fe8 sp=0xc420053fe0 pc=0x44b4d1

goroutine 2 [force gc (idle)]:
runtime.gopark(0x4739b8, 0x4ad720, 0x47001e, 0xf, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003e768 sp=0xc42003e738 pc=0x42503c
runtime.goparkunlock(0x4ad720, 0x47001e, 0xf, 0xc420000114, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003e7a8 sp=0xc42003e768 pc=0x42512e
runtime.forcegchelper()
  /usr/local/go/src/runtime/proc.go:238 +0xcc fp=0xc42003e7e0 sp=0xc42003e7a8 pc=0x424e5c
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003e7e8 sp=0xc42003e7e0 pc=0x44b4d1
created by runtime.init.4
  /usr/local/go/src/runtime/proc.go:227 +0x35

goroutine 3 [GC sweep wait]:
runtime.gopark(0x4739b8, 0x4ad7e0, 0x46fdd2, 0xd, 0x419914, 0x1)
  /usr/local/go/src/runtime/proc.go:280 +0x12c fp=0xc42003ef60 sp=0xc42003ef30 pc=0x42503c
runtime.goparkunlock(0x4ad7e0, 0x46fdd2, 0xd, 0x14, 0x1)
  /usr/local/go/src/runtime/proc.go:286 +0x5e fp=0xc42003efa0 sp=0xc42003ef60 pc=0x42512e
runtime.bgsweep(0xc42001e150)
  /usr/local/go/src/runtime/mgcsweep.go:52 +0xa3 fp=0xc42003efd8 sp=0xc42003efa0 pc=0x419973
runtime.goexit()
  /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42003efe0 sp=0xc42003efd8 pc=0x44b4d1
created by runtime.gcenable
  /usr/local/go/src/runtime/mgc.go:216 +0x58"}]
[9] c44f79fe8a81: [1641887648.000000000, {"log"=>"one more line, no multiline"}]
[10] c44f79fe8a81: [1641887648.000000000, {"log"=>"60"}]
[2022/01/11 07:54:17] [debug] [out coro] cb_destroy coro_id=0
[2022/01/11 07:54:17] [debug] [task] destroy task=0x7f550801c8e0 (task_id=0)
^C[2022/01/11 07:54:20] [engine] caught signal (SIGINT)
[2022/01/11 07:54:20] [ info] [input] pausing forward.0
[2022/01/11 07:54:20] [ warn] [engine] service will shutdown in max 1 seconds
[2022/01/11 07:54:21] [ info] [engine] service has stopped (0 pending tasks)

@edsiper
Copy link
Member

edsiper commented Jan 21, 2022

@PettitWesley can you submit a PR for GIT master first ?, so then we can merge this one

@PettitWesley
Copy link
Contributor Author

@edsiper ack- will put up a PR against master and test it by EOD

@edsiper
Copy link
Member

edsiper commented Jan 22, 2022

let's put this in now (but not forget about GIT master... )

thanks for the hard work on this one!

@PettitWesley
Copy link
Contributor Author

@edsiper Sorry, I just got it done now: #4671

Also here are docs: fluent/fluent-bit-docs#675

Also, I am blocked for testing until this is resolved: #4670

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants