Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,37 @@ static int fw_unix_create(struct flb_in_fw_config *ctx)
static int in_fw_collect(struct flb_input_instance *ins,
struct flb_config *config, void *in_context)
{
int state_backup;
struct flb_connection *connection;
struct fw_conn *conn;
struct flb_in_fw_config *ctx;

ctx = in_context;

state_backup = ctx->state;
ctx->state = FW_INSTANCE_STATE_ACCEPTING_CLIENT;

connection = flb_downstream_conn_get(ctx->downstream);

if (connection == NULL) {
flb_plg_error(ctx->ins, "could not accept new connection");
ctx->state = state_backup;

return -1;
}

if (!config->is_ingestion_active) {
flb_downstream_conn_release(connection);
ctx->state = state_backup;

return -1;
}

if(ctx->is_paused) {
flb_downstream_conn_release(connection);
flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd);
ctx->state = state_backup;

return -1;
}

Expand All @@ -154,9 +163,17 @@ static int in_fw_collect(struct flb_input_instance *ins,
conn = fw_conn_add(connection, ctx);
if (!conn) {
flb_downstream_conn_release(connection);
ctx->state = state_backup;

return -1;
}

ctx->state = state_backup;

if (ctx->state == FW_INSTANCE_STATE_PAUSED) {
fw_conn_del_all(ctx);
}
Comment on lines +171 to +175

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve paused state when accepting connections

During connection accept, in_fw_collect unconditionally restores ctx->state to the prior value at exit and immediately checks that restored value to decide whether to purge connections. If in_fw_pause sets ctx->state to FW_INSTANCE_STATE_PAUSED while a client is being accepted, this restore overwrites the paused state with the old RUNNING value and the subsequent fw_conn_del_all guard never fires, so the paused transition is lost and the just-accepted connection keeps processing traffic even though ingestion was paused to relieve backpressure. The state reset here should only occur when the state was not changed externally.

Useful? React with 👍 / 👎.


return 0;
}

Expand Down Expand Up @@ -263,6 +280,7 @@ static int in_fw_init(struct flb_input_instance *ins,
return -1;
}

ctx->state = FW_INSTANCE_STATE_RUNNING;
ctx->coll_fd = -1;
ctx->ins = ins;
mk_list_init(&ctx->connections);
Expand Down Expand Up @@ -386,7 +404,10 @@ static void in_fw_pause(void *data, struct flb_config *config)
return;
}

fw_conn_del_all(ctx);
if (ctx->state == FW_INSTANCE_STATE_RUNNING) {
fw_conn_del_all(ctx);
}

ctx->is_paused = FLB_TRUE;
ret = pthread_mutex_unlock(&ctx->conn_mutex);
if (ret != 0) {
Expand All @@ -406,6 +427,8 @@ static void in_fw_pause(void *data, struct flb_config *config)
if (config->is_ingestion_active == FLB_FALSE) {
fw_conn_del_all(ctx);
}

ctx->state = FW_INSTANCE_STATE_PAUSED;
}

static void in_fw_resume(void *data, struct flb_config *config) {
Expand All @@ -427,6 +450,8 @@ static void in_fw_resume(void *data, struct flb_config *config) {
flb_plg_error(ctx->ins, "cannot unlock collector mutex");
return;
}

ctx->state = FW_INSTANCE_STATE_RUNNING;
}
}

Expand Down
8 changes: 8 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

#define FW_INSTANCE_STATE_RUNNING 0
#define FW_INSTANCE_STATE_ACCEPTING_CLIENT 1
#define FW_INSTANCE_STATE_PROCESSING_PACKET 2
#define FW_INSTANCE_STATE_PAUSED 3


enum {
FW_HANDSHAKE_HELO = 1,
FW_HANDSHAKE_PINGPONG = 2,
Expand Down Expand Up @@ -76,6 +82,8 @@ struct flb_in_fw_config {

pthread_mutex_t conn_mutex;

int state;

/* Plugin is paused */
int is_paused;
};
Expand Down
37 changes: 32 additions & 5 deletions plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
#include "fw_prot.h"
#include "fw_conn.h"

/* Callback invoked every time an event is triggered for a connection */
int fw_conn_event(void *data)
static int fw_conn_event_internal(struct flb_connection *connection)
{
int ret;
int bytes;
Expand All @@ -39,9 +38,6 @@ int fw_conn_event(void *data)
struct fw_conn *conn;
struct mk_event *event;
struct flb_in_fw_config *ctx;
struct flb_connection *connection;

connection = (struct flb_connection *) data;

conn = connection->user_data;

Expand Down Expand Up @@ -127,6 +123,37 @@ int fw_conn_event(void *data)
return 0;
}

/* Callback invoked every time an event is triggered for a connection */
int fw_conn_event(void *data)
{
struct flb_in_fw_config *ctx;
struct fw_conn *conn;
int result;
struct flb_connection *connection;
int state_backup;

connection = (struct flb_connection *) data;

conn = connection->user_data;

ctx = conn->ctx;

state_backup = ctx->state;

ctx->state = FW_INSTANCE_STATE_PROCESSING_PACKET;

result = fw_conn_event_internal(connection);

if (ctx->state == FW_INSTANCE_STATE_PROCESSING_PACKET) {
ctx->state = state_backup;
}
else if (ctx->state == FW_INSTANCE_STATE_PAUSED) {
fw_conn_del_all(ctx);
}

return result;
}

/* Create a new Forward request instance */
struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx)
{
Expand Down