[es_output] Generate_ID sometimes corrupts message structure during /_bulk requests.(possible cause: incorrect memory buffer size) #4311

Closed
sanicheev opened this issue Nov 11, 2021 · 6 comments · Fixed by #4361

Comments

@sanicheev

sanicheev commented Nov 11, 2021

Bug Report

Describe the bug
After migrating to Fluent Bit 1.8.x, we started to see many errors like this:
an id must be provided if version or value are set
A quick investigation showed that this is probably caused by the code being adjusted to work with the Elasticsearch 7.5+ API.
This is exactly the same issue as #3909.

To make it work we had to enable the Generate_ID setting, because we are using Elasticsearch 6.4 and cannot afford to migrate to a newer version at the moment.
That solved the original issue.
However, we then started to notice many 500 errors on the load balancer that sits in front of the Elasticsearch coordinator nodes.
Capturing the requests with strace revealed that the messages are corrupted.
In the following examples I removed several fields for the sake of simplicity.
Please check the "_id" field: it is corrupted in both examples.

Example 1 of corrupted message:

Formatted message:
POST /_bulk HTTP/1.1
Host: elk.example.internal
X-Real-IP: 192.168.8.189
X-Forwarded-For: 192.168.10.141, 192.168.8.189
X-Forwarded-Proto: https
Content-Length: 1991
X-Forwarded-Port: 443
Content-Type: application/x-ndjson
User-Agent: Fluent-Bit

{"create":{"_index":"k8s-v1-abcdef-website-production-abcdef-website-production-abcdef-event-bus-handler-2021.11.10","_type":"_doc","_id":"ef8da3c7-14d0-bdc6-940a-e10d3c441487{"@timestamp":"2021-11-10T05:22:09.508Z","stream":"stderr","kubernetes":{"host":"ip-192-168-10-26.eu-central-1.compute.internal","labels":{"app_kubernetes_io/name":"website-production-web","pod-template-hash":"745f6cc4bc","app_kubernetes_io/instance":"website-production"}}}}}


Raw message in strace:
POST /_bulk HTTP/1.1\r\nHost: elk.exmaple.internal\r\nX-Real-IP: 192.168.8.189\r\nX-Forwarded-For: 192.168.10.141, 192.168.8.189\r\nX-Forwarded-Proto: https\r\nContent-Length: 1991\r\nX-Forwarded-Port: 443\r\nContent-Type: application/x-ndjsonUser-Agent: Fluent-Bit\r\n\r\n{\"create\":{\"_index\":\"k8s-v1-abcdef-website-production-abcdef-website-production-abcdef-event-bus-handler-2021.11.10\",\"_type\":\"_doc\",\"_id\":\"ef8da3c7-14d0-bdc6-940a-e10d3c441487\0\0\0\0{\"@timestamp\":\"2021-11-10T05:22:09.508Z\",\"stream\":\"stderr\",\"kubernetes\":{\"host\":\"ip-192-168-10-26.eu-central-1.compute.internal\",\"labels\":{\"app_kubernetes_io/name\":\"website-production-web\",\"pod-template-hash\":\"745f6cc4bc\",\"app_kubernetes_io/instance\":\"website-production\"}}}}}

Example 2 of corrupted message:

Formatted message:
POST /_bulk HTTP/1.1
Host: elk.example.internal
X-Real-IP: 192.168.8.189
X-Forwarded-For: 192.168.7.202, 192.168.8.189
X-Forwarded-Proto: https
Content-Length: 5536
X-Forwarded-Port: 443
Content-Type: application/x-ndjson
User-Agent: Fluent-Bit

{"create":{"_index":"k8s-v1-abcdef-website-production-abcdef-website-production-abcdef-event-bus-handler-2021.11.10","_type":"_doc","_id":"307e59d7-0cf8-8b65-d839-bû  9Vk8s{"@timestamp":"2021-11-10T05:18:59.183Z"}}}

Raw message in strace:
POST /_bulk HTTP/1.1\r\nHost: elk.example.internal\r\nX-Real-IP: 192.168.8.189\r\nX-Forwarded-For: 192.168.7.202, 192.168.8.189\r\nX-Forwarded-Proto: https\r\nContent-Length: 5536\r\nX-Forwarded-Port: 443\r\nContent-Type: application/x-ndjson\r\nUser-Agent: Fluent-Bit\r\n\r\n{\"create\":{\"_index\":\"k8s-v1-abcdef-website-production-abcdef-website-production-abcdef-event-bus-handler-2021.11.10\",\"_type\":\"_doc\",\"_id\":\"307e59d7-0cf8-8b65-d839-b\0\177\0\0\373\t\2279\5V\0\0k8s{\"@timestamp\":\"2021-11-10T05:18:59.183Z\"}}}\n

It is unclear what happens to the message when the code block responsible for Generate_ID processes it:
https://github.com/fluent/fluent-bit/blob/master/plugins/out_es/es.c#L489-L507

I suspect the issue is in either the 'MurmurHash3_x64_128' or the 'snprintf' call, and is caused by an incorrect buffer size.

Exception in elasticsearch:

{"error":{"root_cause":[{"type":"json_parse_exception","reason":"Illegal unquoted character ((CTRL-CHAR, code 0)): has to be escaped using backslash to be included in string value\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@46755a1a; line: 1, column: 166]"}],"type":"json_parse_exception","reason":"Illegal unquoted character ((CTRL-CHAR, code 0)): has to be escaped using backslash to be included in string value\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@46755a1a; line: 1, column: 166]"},"status":500}

The corruption always happens at column 166.
At the same time, the code contains this preprocessor definition:

#define ES_BULK_HEADER      165  /* ES Bulk API prefix line  */

Your Environment
Fluent Bit versions tried: 1.8.0 through the latest (1.8.9)
Elasticsearch cluster version: 6.4.0
Fluent Bit runs primarily on EKS (the AWS managed Kubernetes service)
Fluent Bit configuration:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
  labels:
    k8s-app: fluent-bit
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
        Flush           1
        Log_Level       error
        Daemon          Off
        Parsers_File    parsers.conf
        HTTP_Server     On
        HTTP_Listen     0.0.0.0
        HTTP_Port       2020
        Health_Check    On
        storage.metrics On

    @INCLUDE input-systemd.conf
    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE filter-api-logs.conf
    @INCLUDE filter-systemd.conf
    @INCLUDE output-elasticsearch.conf

  input-systemd.conf: |
    [INPUT]
      Name             systemd
      Path             /journal
      Tag              systemd.*
      Read_From_Tail   On
      Skip_Empty_Lines On
      Skip_Long_Lines  On


  input-kubernetes.conf: |
    [INPUT]
        Name             tail
        Tag              kube.*
        Path             /var/log/containers/*.log
        DB               /var/log/flb_kube.db
        Mem_Buf_Limit    5MB
        Skip_Empty_Lines On
        Skip_Long_Lines  On
        Refresh_Interval 10
        Docker_mode      On
        Parser           docker

  filter-kubernetes.conf: |
    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Keep_Log            Off
        Merge_Log           On
        Merge_Log_Key       app_log
        K8S-Logging.Parser  On
        K8S-Logging.Exclude Off

    [FILTER]
        Name                lua
        Match               kube.*
        script              /etc/fluent-bit/fluentbit.lua
        call                set_index

    # remove field from gatekeeper logs which generates too many indices
    [FILTER]
        Name                modify
        Match               kube.*
        Remove              app_log.updateConstraints


  filter-api-logs.conf: |
    [FILTER]
        Name                rewrite_tag
        Match               kube.*
        Rule                $app_log['__routing_key'] ^api-logs$ api-logs.$TAG[4].log true

    [FILTER]
        Name                nest
        Match               api-logs.*
        Operation           lift
        Nested_under        app_log

  filter-systemd.conf: |
    [FILTER]
        Name parser
        Match systemd.*
        Key_Name MESSAGE
        Parser systemd-json
        Reserve_Data True

    [FILTER]
        Name modify
        Match systemd.*
        Rename _SYSTEMD_UNIT systemd_unit

    # Append/Remove keys from log records
    [FILTER]
        Name aws
        Match *
        imds_version v1
        az true
        ec2_instance_id true
        ec2_instance_type true
        private_ip true
        ami_id true
        account_id true
        hostname true
        vpc_id true

    [FILTER]
        Name modify
        Match *

        Rename account_id aws_account_id
        Rename ami_id aws_ami_id
        Rename az aws_az
        Rename ec2_instance_id aws_instance_id
        Rename ec2_instance_type aws_instance_type
        Rename private_ip aws_private_ip
        Rename vpc_id aws_vpc_id

        Rename MESSAGE message
        Rename msg message
        Rename LOG message
        Rename log message

    [FILTER]
        Name record_modifier
        Match *

        Record environment ${ENVIRONMENT}

        Remove_Key _CURSOR
        Remove_Key _REALTIME_TIMESTAMP
        Remove_Key _MONOTONIC_TIMESTAMP
        Remove_Key _SOURCE_MONOTONIC_TIMESTAMP
        Remove_Key _BOOT_ID
        Remove_Key _MACHINE_ID
        Remove_Key PRIORITY
        Remove_Key SYSLOG_FACILITY
        Remove_Key SYSLOG_IDENTIFIER
        Remove_Key _CAP_EFFECTIVE
        Remove_Key _EXE
        Remove_Key _GID
        Remove_Key _PID
        Remove_Key _STREAM_ID
        Remove_Key _SYSTEMD_CGROUP
        Remove_Key _SYSTEMD_SLICE
        Remove_Key _TRANSPORT
        Remove_Key _UID
        Remove_Key _COMM
        Remove_Key CODE_FILE
        Remove_Key CODE_FUNC
        Remove_Key CODE_LINE
        Remove_Key TLOG_ID
        Remove_Key TLOG_REC
        Remove_Key TLOG_SESSION
        Remove_Key _AUDIT_LOGINUID
        Remove_Key _AUDIT_SESSION
        Remove_Key _CMDLINE
        Remove_Key _HOSTNAME
        Remove_Key _SOURCE_REALTIME_TIMESTAMP
        Remove_Key _SYSTEMD_OWNER_UID
        Remove_Key _SYSTEMD_SESSION


  output-elasticsearch.conf: |
    [OUTPUT]
        Name            es
        Match           systemd.*
        Host            ${FLUENT_ELASTICSEARCH_HOST}
        Port            ${FLUENT_ELASTICSEARCH_PORT}
        HTTP_User       ${FLUENT_ELASTICSEARCH_USER}
        HTTP_Passwd     ${FLUENT_ELASTICSEARCH_PASSWORD}
        tls on
        tls.verify off
        workers 1

        Generate_ID On
        Logstash_Format On
        Replace_Dots    On
        Logstash_Prefix fluent-bit-k8s-v4
        Retry_Limit     False

    [OUTPUT]
        Name            es
        Match           kube.*
        Host            ${FLUENT_ELASTICSEARCH_HOST}
        Port            ${FLUENT_ELASTICSEARCH_PORT}
        HTTP_User       ${FLUENT_ELASTICSEARCH_USER}
        HTTP_Passwd     ${FLUENT_ELASTICSEARCH_PASSWORD}
        tls on
        tls.verify off
        workers 1

        Generate_ID On
        Logstash_Format     On
        Logstash_Prefix_Key $es_index
        Replace_Dots        On
        Retry_Limit         False

    [OUTPUT]
        Name            es
        Match           api-logs.*
        Type            fluentd
        Host            ${FLUENT_ELASTICSEARCH_HOST}
        Port            ${FLUENT_ELASTICSEARCH_PORT}
        HTTP_User       ${FLUENT_ELASTICSEARCH_USER}
        HTTP_Passwd     ${FLUENT_ELASTICSEARCH_PASSWORD}
        tls on
        tls.verify off
        workers 1

        Generate_ID On
        Logstash_Format On
        Logstash_Prefix api.application
        Replace_Dots    On
        Retry_Limit     False

  parsers.conf: |
    [PARSER]
        Name        systemd-json
        Format      json
        Time_Key    time_local

    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   On


  fluentbit.lua: |
    function set_index(tag, timestamp, record)
        prefix = "k8s-v1"
        if record["kubernetes"] ~= nil then
            if record["kubernetes"]["namespace_name"] ~= nil then
                if record["kubernetes"]["container_name"] ~= nil then
                    record["es_index"] = prefix
                        .. "-"
                        .. record["kubernetes"]["namespace_name"]
                        .. "-"
                        .. record["kubernetes"]["container_name"]
                    return 1, timestamp, record
                end
                record["es_index"] = prefix
                    .. "-"
                    .. record["kubernetes"]["namespace_name"]
                return 1, timestamp, record
            end
        end
        return 1, timestamp, record
    end

Please let me know if you need any other information.

@sanicheev changed the title from "[es_output] Generate_ID sometimes corrupts message structure during /_bulk requests.(possible cause: memory corruption)" to "[es_output] Generate_ID sometimes corrupts message structure during /_bulk requests.(possible cause: incorrect memory buffer size)" on Nov 11, 2021
@sanicheev
Author

My impression is that if the bulk header is too long (which looks like our case), it gets cut off and we end up with unexpected behaviour.

@sanicheev
Author

I can confirm that the issue is related to the bulk header size.
If the header exceeds 165 characters, it gets truncated.
The space that the rest of the string should have occupied is filled with random data, which breaks the JSON format.

It looks like we need a mechanism to trim the bulk header string when it is longer than 165 characters.
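
A minimal sketch of how truncation could be detected instead of being sent silently, assuming the header is built with snprintf into a fixed buffer of ES_BULK_HEADER bytes (as suspected in the report). The helper name format_bulk_header, the BULK_HEADER_SIZE macro and the format string are illustrative assumptions modelled on the header visible in the strace output; this is not the actual fluent-bit code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Same value as the ES_BULK_HEADER define quoted in this issue. */
#define BULK_HEADER_SIZE 165

/*
 * Hypothetical helper (not part of fluent-bit): format a bulk "create"
 * header into buf. Returns the header length on success, or -1 when the
 * header does not fit and snprintf() had to truncate it.
 */
static int format_bulk_header(char *buf, size_t size,
                              const char *index, const char *id)
{
    int n;

    assert(buf != NULL && size > 0);

    n = snprintf(buf, size,
                 "{\"create\":{\"_index\":\"%s\",\"_type\":\"_doc\",\"_id\":\"%s\"}}\n",
                 index, id);

    /* snprintf() returns the length it wanted to write, excluding the NUL,
     * so any value >= size means the output was cut short. */
    if (n < 0 || (size_t) n >= size) {
        return -1;
    }
    return n;
}
```

A caller could treat -1 as a signal to skip the record, log an error, or fall back to a dynamically sized buffer, rather than passing a partially filled buffer on to the request body.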

@sanicheev
Author

sanicheev commented Nov 22, 2021

We are currently using a workaround: truncating long index names with a custom Lua script.
This tip might be helpful to people who face the same issue.
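
The Lua script used for this workaround was not shared in the thread. The idea can be sketched in C (the language of the plugin) with a hypothetical helper, truncate_index, that cuts an index name to a caller-chosen maximum length. The limit is left as a parameter because the safe value depends on the other header fields, and with Logstash_Format the date suffix counts toward the final index name as well.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (illustration only): copy at most max_len bytes of
 * index into out and NUL-terminate the result. The cut is byte-wise, so it
 * is only safe as-is for single-byte (ASCII) index names.
 * Returns the length of the string stored in out.
 */
static size_t truncate_index(const char *index, size_t max_len,
                             char *out, size_t out_size)
{
    size_t len;

    assert(index != NULL && out != NULL);

    if (out_size == 0) {
        return 0;
    }

    len = strlen(index);
    if (len > max_len) {
        len = max_len;          /* enforce the requested limit */
    }
    if (len > out_size - 1) {
        len = out_size - 1;     /* never overflow the destination */
    }

    memcpy(out, index, len);
    out[len] = '\0';
    return len;
}
```

Note that truncation can make two different long names collide on the same index, so the limit should be as generous as the header budget allows.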

@nokute78
Collaborator

Thank you for reporting.
If the length of _index is greater than 80, snprintf truncates the output and generates broken JSON, because the bulk header is written into a fixed-size buffer.

The index name in the example is 94 characters long, which triggers this issue.

$ irb
irb(main):001:0> "k8s-v1-abcdef-website-production-abcdef-website-production-abcdef-event-bus-handler-2021.11.10".length
=> 94

We can reproduce this issue with the command below.

fluent-bit -i cpu -o es -p generate_id=on -p index=123456789012345678901234567890123456789012345678901234567890123456789012345678901
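
The effect can also be illustrated in isolation. The sketch below assumes that the header is formatted with snprintf into a buffer of ES_BULK_HEADER (165) bytes and that the return value is later used as the number of bytes to send; that is an assumption based on this thread, not a copy of the plugin code. The function name, the BULK_HEADER_SIZE macro and the format string are hypothetical. To keep the example well defined, a larger array filled with '#' stands in for whatever memory follows the real buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Same value as the ES_BULK_HEADER define quoted in this issue. */
#define BULK_HEADER_SIZE 165

/*
 * Illustration only, not fluent-bit code. The first BULK_HEADER_SIZE bytes
 * of storage play the role of the fixed header buffer; the '#' bytes after
 * them play the role of unrelated data that happens to follow the buffer.
 * Returns the length reported by snprintf().
 */
static int format_into_fixed_header(char *storage, size_t storage_size,
                                    const char *index, const char *id)
{
    assert(storage != NULL && storage_size > BULK_HEADER_SIZE);

    memset(storage, '#', storage_size);

    /* snprintf() writes at most BULK_HEADER_SIZE bytes (including the NUL)
     * but still returns the full length the header would have needed. */
    return snprintf(storage, BULK_HEADER_SIZE,
                    "{\"create\":{\"_index\":\"%s\",\"_type\":\"_doc\",\"_id\":\"%s\"}}\n",
                    index, id);
}
```

With the 94-character index from the report and a 36-character id, the returned length is larger than 165, the byte at offset 164 is the terminating NUL, and everything after it is untouched filler. Sending that many bytes from the start of the buffer would therefore emit a NUL followed by unrelated data in the middle of the "_id" value, which is consistent with the shape of the second strace capture.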

nokute78 added a commit to nokute78/fluent-bit that referenced this issue Nov 23, 2021
Signed-off-by: Takahiro Yamashita <nokute78@gmail.com>
@nokute78
Collaborator

I sent a patch: #4361.

@sanicheev
Author

Thank you @nokute78

edsiper pushed a commit that referenced this issue Dec 18, 2021
Signed-off-by: Takahiro Yamashita <nokute78@gmail.com>
0Delta pushed a commit to 0Delta/fluent-bit that referenced this issue Jan 20, 2022
Signed-off-by: Takahiro Yamashita <nokute78@gmail.com>