Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ option(FLB_FILTER_STDOUT "Enable stdout filter" Yes)
option(FLB_FILTER_PARSER "Enable parser filter" Yes)
option(FLB_FILTER_KUBERNETES "Enable kubernetes filter" Yes)
option(FLB_FILTER_REWRITE_TAG "Enable tag rewrite filter" Yes)
option(FLB_FILTER_WATERMARK "Enable watermark filter" Yes)
option(FLB_FILTER_THROTTLE "Enable throttle filter" Yes)
option(FLB_FILTER_THROTTLE_SIZE "Enable throttle size filter" No)
option(FLB_FILTER_TYPE_CONVERTER "Enable type converter filter" Yes)
Expand Down
1 change: 1 addition & 0 deletions plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ REGISTER_FILTER_PLUGIN("filter_alter_size")
REGISTER_FILTER_PLUGIN("filter_aws")
REGISTER_FILTER_PLUGIN("filter_checklist")
REGISTER_FILTER_PLUGIN("filter_record_modifier")
REGISTER_FILTER_PLUGIN("filter_watermark")
REGISTER_FILTER_PLUGIN("filter_throttle")
REGISTER_FILTER_PLUGIN("filter_throttle_size")
REGISTER_FILTER_PLUGIN("filter_tensorflow")
Expand Down
6 changes: 6 additions & 0 deletions plugins/filter_watermark/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
set(src
heap.c
watermark.c
)

FLB_PLUGIN(filter_watermark "${src}" "")
217 changes: 217 additions & 0 deletions plugins/filter_watermark/heap.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <stdio.h>
#include <sys/types.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>

#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <stddef.h>

#include "heap.h"
#include "watermark.h"

enum reheap_direction { DIR_UP, DIR_DOWN };

static void reheap(struct c_heap_t *h, size_t root, enum reheap_direction dir)
{
size_t left;
size_t right;
size_t min;
int status;
void *tmp;

/* Calculate the positions of the children */
left = (2 * root) + 1;
if (left >= h->array_len)
left = 0;

right = (2 * root) + 2;
if (right >= h->array_len)
right = 0;

/* Check which one of the children is smaller. */
if ((left == 0) && (right == 0))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still some of your conditional statements don't follow the guidelines for brace usage (please find and fix the rest).

return;
else if (left == 0)
min = right;
else if (right == 0)
min = left;
else {
status = h->compare(h->array[left], h->array[right]);
if (status > 0) {
min = right;
} else {
min = left;
}
}

status = h->compare(h->array[root], h->array[min]);
if (status <= 0) {
/* We didn't need to change anything, so the rest of the tree should be okay now. */
return;
} else {
/* if (status > 0) */
tmp = h->array[root];
h->array[root] = h->array[min];
h->array[min] = tmp;
}

if ((dir == DIR_UP) && (root == 0)) {
return;
}

if (dir == DIR_UP) {
reheap(h, (root - 1) / 2, dir);
} else if (dir == DIR_DOWN) {
reheap(h, min, dir);
}
}



struct c_heap_t *c_heap_create(int (*compare)(void *, void *), int (*deconstructor)(void *))
{
struct c_heap_t *h;

if (compare == NULL)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requires braces (please find the rest and fix).

return NULL;

if (deconstructor == NULL)
return NULL;

h = calloc(1, sizeof(*h));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use flb_calloc instead.

if (h == NULL)
return NULL;

h->compare = compare;
h->deconstructor = deconstructor;
h->array = NULL;
h->array_len = 0;
h->array_size = 0;

return h;
}

void c_heap_destroy(struct c_heap_t *h)
{
int i;
if (h == NULL)
return;

for(i=0; i< h->array_len; i++) {
h->deconstructor(h->array[i]);
}

h->array_len = 0;
h->array_size = 0;
free(h->array);
h->array = NULL;

free(h);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use flb_free (and the same for other places).

}

int c_heap_insert(struct c_heap_t *h, void *ptr)
{
size_t index;
void **tmp;

if ((h == NULL) || (ptr == NULL))
return -EINVAL;

assert(h->array_len <= h->array_size);
if (h->array_len == h->array_size) {

tmp = realloc(h->array, (h->array_size + 16) * sizeof(*h->array));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use flb_realloc.

if (tmp == NULL) {
return -ENOMEM;
}

h->array = tmp;
h->array_size += 16;
}

/* Insert the new node as a leaf. */
index = h->array_len;
h->array[index] = ptr;
h->array_len++;

/* Reorganize the heap from bottom up. */
reheap(h, /* parent of this node */ (index - 1) / 2, DIR_UP);

return 0;
}

void *c_heap_get_root(struct c_heap_t *h)
{
void *ret = NULL;
void **tmp;

if (h == NULL)
return NULL;

if (h->array_len == 0) {
return NULL;
} else if (h->array_len == 1) {
ret = h->array[0];
h->array[0] = NULL;
h->array_len = 0;
} else {
/* if (h->array_len > 1) */
ret = h->array[0];
h->array[0] = h->array[h->array_len - 1];
h->array[h->array_len - 1] = NULL;
h->array_len--;

reheap(h, /* root = */ 0, DIR_DOWN);
}

/* free some memory */
if ((h->array_len + 32) < h->array_size) {

tmp = realloc(h->array, (h->array_len + 16) * sizeof(*h->array));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use flb_realloc.

if (tmp != NULL) {
h->array = tmp;
h->array_size = h->array_len + 16;
}
}

return ret;
}

void *c_heap_read_root(struct c_heap_t *h)
{
void *ret = NULL;

if (h == NULL)
return NULL;

if (h->array_len == 0) {
return NULL;
} else {
/* if (h->array_len > 1) */
ret = h->array[0];
}
return ret;
}
36 changes: 36 additions & 0 deletions plugins/filter_watermark/heap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

struct c_heap_t {
int (*compare)(void *, void *);
int (*deconstructor)(void *);

void **array;
/* # entries used */
size_t array_len;
/* # entries allocated */
size_t array_size;
};

struct c_heap_t *c_heap_create(int (*compare)(void *, void *), int (*deconstructor)(void *));
void c_heap_destroy(struct c_heap_t *h);
int c_heap_insert(struct c_heap_t *h, void *ptr);
void *c_heap_get_root(struct c_heap_t *h);
void *c_heap_read_root(struct c_heap_t *h);
Loading