Skip to content

Conversation

@ginobiliwang
Copy link
Contributor

Signed-off-by: Fenggang ginobiliwang@gmail.com

  1. This is initial commit for watermark filter feature.
  2. The idea is from a paper (The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing) from Google.
  3. From stream processing perspective current function is limited. It has a fixed watermark. It can deal with out of order data. It has a batch window. It has tolerance for data latency.
  4. Document PR for this filter watermark feature would be submitted soon.
  5. The little demo and valgrind output below are for your reference.

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

[root@localhost ~]# nc 127.0.0.1 5170
{"time":"2021-11-12 18:30:00"}
{"time":"2021-11-12 18:30:01"}
{"time":"2021-11-12 18:30:02"}
{"time":"2021-11-12 18:30:07"}
{"time":"2021-11-12 18:30:06"}
{"time":"2021-11-12 18:30:04"}
{"time":"2021-11-12 18:30:05"}
{"time":"2021-11-12 18:31:00"}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

[root@localhost build]# valgrind --leak-check=full bin/fluent-bit -i tcp -F watermark -p 'time_field=time' 'watermark=20' -m '*' -o stdout
==3805== Memcheck, a memory error detector
==3805== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==3805== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==3805== Command: bin/fluent-bit -i tcp -F watermark -p time_field=time watermark=20 -m * -o stdout
==3805==
Fluent Bit v1.7.4

  • Copyright (C) 2019-2021 The Fluent Bit Authors
  • Copyright (C) 2015-2018 Treasure Data
  • Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
  • https://fluentbit.io

[2021/05/08 22:12:14] [ info] [engine] started (pid=3805)
[2021/05/08 22:12:14] [ info] [storage] version=1.1.1, initializing...
[2021/05/08 22:12:14] [ info] [storage] in-memory
[2021/05/08 22:12:14] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/05/08 22:12:14] [ info] [input:tcp:tcp.0] listening on 0.0.0.0:5170
[2021/05/08 22:12:14] [ info] [sp] stream processor started
==3805== Warning: client switching stacks? SP change: 0x9bc2788 --> 0xa48a850
==3805== to suppress, use: --max-stackframe=9208008 or greater
==3805== Warning: client switching stacks? SP change: 0xa48a7b8 --> 0x9bc2788
==3805== to suppress, use: --max-stackframe=9207856 or greater
==3805== Warning: client switching stacks? SP change: 0x9bc2788 --> 0xa48a7b8
==3805== to suppress, use: --max-stackframe=9207856 or greater
==3805== further instances of this message will not be shown.
[0] tcp.0: [1620483146.702023972, {"time"=>"2021-11-12 18:30:00"}]
[1] tcp.0: [1620483150.354546260, {"time"=>"2021-11-12 18:30:01"}]
[2] tcp.0: [1620483153.771547131, {"time"=>"2021-11-12 18:30:02"}]
[3] tcp.0: [1620483176.174986006, {"time"=>"2021-11-12 18:30:04"}]
[4] tcp.0: [1620483180.376079757, {"time"=>"2021-11-12 18:30:05"}]
[5] tcp.0: [1620483169.903685948, {"time"=>"2021-11-12 18:30:06"}]
[6] tcp.0: [1620483163.781975807, {"time"=>"2021-11-12 18:30:07"}]
^C[2021/05/08 22:13:19] [engine] caught signal (SIGINT)
[2021/05/08 22:13:19] [ warn] [engine] service will stop in 5 seconds
[2021/05/08 22:13:24] [ info] [engine] service stopped
==3805==
==3805== HEAP SUMMARY:
==3805== in use at exit: 99,816 bytes in 3,380 blocks
==3805== total heap usage: 4,229 allocs, 849 frees, 5,092,444 bytes allocated
==3805==
==3805== LEAK SUMMARY:
==3805== definitely lost: 0 bytes in 0 blocks
==3805== indirectly lost: 0 bytes in 0 blocks
==3805== possibly lost: 0 bytes in 0 blocks
==3805== still reachable: 99,816 bytes in 3,380 blocks
==3805== suppressed: 0 bytes in 0 blocks
==3805== Reachable blocks (those to which a pointer was found) are not shown.
==3805== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==3805==
==3805== For lists of detected and suppressed errors, rerun with: -s
==3805== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
[root@localhost build]#

Signed-off-by: Fenggang <ginobiliwang@gmail.com>
@ginobiliwang
Copy link
Contributor Author

Hi @koleini & @edsiper, this is the feature i have discussed with you before. Now it is ready to be reviewed.

@ginobiliwang
Copy link
Contributor Author

REGISTER_OUT_PLUGIN("out_cloudwatch_logs")
REGISTER_OUT_PLUGIN("out_kinesis_firehose")
REGISTER_OUT_PLUGIN("out_kinesis_streams")
REGISTER_OUT_PLUGIN("out_prometheus_exporter")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be some merge issue, let me fix it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still looks wrong in the diff, does this need rebasing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i will rebase it in a whole after Masoud's initial review.

@nigels-com
Copy link
Contributor

Is there an well known and tested 3rd party heap implementation, as an alternative?

@ginobiliwang
Copy link
Contributor Author

Is there an well known and tested 3rd party heap implementation, as an alternative?

Most of implementations are customized versions. For this plugin, i have done customization as well.

@github-actions
Copy link
Contributor

github-actions bot commented Jul 1, 2021

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Jul 1, 2021
@ginobiliwang
Copy link
Contributor Author

@edsiper @koleini , could you please review this PR ?

@github-actions github-actions bot removed the Stale label Jul 11, 2021
@github-actions
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Aug 11, 2021
@edsiper
Copy link
Member

edsiper commented Dec 12, 2021

@koleini ping

@github-actions github-actions bot removed the Stale label Dec 15, 2021
@github-actions
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Mar 15, 2022
@ginobiliwang
Copy link
Contributor Author

@koleini ,can you help to review this PR ?

@github-actions github-actions bot removed the Stale label Jul 21, 2022
@koleini
Copy link
Collaborator

koleini commented Jul 29, 2022

This pull request requires documentation and unit tests to be included. In addition, there are indentation problems in the code that needs to be fixed.

Please see https://github.com/fluent/fluent-bit/blob/master/CONTRIBUTING.md for the guidelines.

@ginobiliwang
Copy link
Contributor Author

Hi @koleini , this is the doc PR fluent/fluent-bit-docs#530 .

Let me fix the format issue and fill up unit tests.

Signed-off-by: Fenggang <ginobiliwang@gmail.com>
@leonardo-albertovich
Copy link
Contributor

The binary tree we use is this well known red/black binary tree implementation https://github.com/fluent/fluent-bit/tree/master/lib/rbtree

@ginobiliwang
Copy link
Contributor Author

I did not notice we have rbtree in place to use. And definitely it is ok to use rbtree for current feature.

@leonardo-albertovich
Copy link
Contributor

I'm glad you find it fitting, I agree and would really appreciate it if at some point you updated your code to use it.

Thanks a lot!

@github-actions github-actions bot removed the Stale label May 4, 2023
@github-actions
Copy link
Contributor

github-actions bot commented Aug 3, 2023

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Aug 3, 2023
@ginobiliwang
Copy link
Contributor Author

will continue to finish this pr soon, tied up by some other affairs.

@github-actions github-actions bot removed the Stale label Aug 7, 2023
@github-actions
Copy link
Contributor

github-actions bot commented Nov 6, 2023

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Nov 6, 2023
@ginobiliwang
Copy link
Contributor Author

sorry for the delay, will catch up ASAP.

@github-actions github-actions bot removed the Stale label Nov 7, 2023
@ginobiliwang
Copy link
Contributor Author

ginobiliwang commented Dec 19, 2023

HI @leonardo-albertovich , i have tried to use rbtree in fluent bit repo to failed to apply. The main issue is that rbtree could not handle duplicate key. In this PR, it would deal with records with duplicate keys. So i have to revert back to use heap data structure to implement it, i think heap is the best data structure to do the job.

BTW, i have accidently closed this PR, would you please help to reopen it.

@ginobiliwang
Copy link
Contributor Author

Hi @leonardo-albertovich , @koleini , @edsiper any chance to reopen this PR ? i can not re-open it by myself.

@lecaros
Copy link
Contributor

lecaros commented May 15, 2024

ping @ginobiliwang

@ginobiliwang
Copy link
Contributor Author

ping @ginobiliwang

Thanks @lecaros for reopening the PR. The same question i have post last year, re-paste here.
=======>
i have tried to use rbtree in fluent bit repo , but failed to apply. The main issue is that rbtree could not handle duplicate key. In this PR, it would need to deal with records with duplicate keys. So i have to revert back to use heap data structure to implement it, i think heap is the best data structure to do the job.
=======>

Please review the above, thanks.

@lecaros
Copy link
Contributor

lecaros commented May 23, 2024

pinging @edsiper @leonardo-albertovich and @cosmo0920 to help you out with your question

@github-actions
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Nov 26, 2024
@ginobiliwang
Copy link
Contributor Author

ginobiliwang commented Nov 26, 2024

pinging @edsiper @leonardo-albertovich and @cosmo0920

@github-actions github-actions bot removed the Stale label Nov 28, 2024
@github-actions
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Feb 27, 2025
@github-actions github-actions bot removed the Stale label Mar 21, 2025
@github-actions
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@github-actions github-actions bot added the Stale label Aug 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants