Skip to content

Latest commit

 

History

History
249 lines (208 loc) · 20.7 KB

kafka-logger.md

File metadata and controls

249 lines (208 loc) · 20.7 KB
title keywords description
kafka-logger
Apache APISIX
API Gateway
Plugin
Kafka Logger
This document contains information about the Apache APISIX kafka-logger Plugin.

Description

The kafka-logger Plugin is used to push logs as JSON objects to Apache Kafka clusters. It works as a Kafka client driver for the ngx_lua Nginx module.

It might take some time to receive the log data. It will be automatically sent after the timer function in the batch processor expires.

Attributes

Name Type Required Default Valid values Description
broker_list object True Deprecated, use brokers instead. List of Kafka brokers. (nodes).
brokers array True List of Kafka brokers (nodes).
brokers.host string True The host of Kafka broker, e.g, 192.168.1.1.
brokers.port integer True [0, 65535] The port of Kafka broker
brokers.sasl_config object False The sasl config of Kafka broker
brokers.sasl_config.mechanism string False "PLAIN" ["PLAIN"] The mechaism of sasl config
brokers.sasl_config.user string True The user of sasl_config. If sasl_config exists, it's required.
brokers.sasl_config.password string True The password of sasl_config. If sasl_config exists, it's required.
kafka_topic string True Target topic to push the logs for organisation.
producer_type string False async ["async", "sync"] Message sending mode of the producer.
required_acks integer False 1 [1, -1] Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka acks attribute. required_acks cannot be 0. See Apache Kafka documentation for more.
key string False Key used for allocating partitions for messages.
timeout integer False 3 [1,...] Timeout for the upstream to send data.
name string False "kafka logger" Unique identifier for the batch processor. If you use Prometheus to monitor APISIX metrics, the name is exported in apisix_batch_process_entries.
meta_format enum False "default" ["default","origin"] Format to collect the request information. Setting to default collects the information in JSON format and origin collects the information with the original HTTP request. See examples below.
log_format object False Log format declared as key value pairs in JSON format. Values only support strings. APISIX or Nginx variables can be used by prefixing the string with $.
include_req_body boolean False false [false, true] When set to true includes the request body in the log. If the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitations.
include_req_body_expr array False Filter for when the include_req_body attribute is set to true. Request body is only logged when the expression set here evaluates to true. See lua-resty-expr for more.
max_req_body_bytes integer False 524288 >=1 Request bodies within this size will be pushed to kafka, if the size exceeds the configured value it will be truncated before pushing to Kafka.
include_resp_body boolean False false [false, true] When set to true includes the response body in the log.
include_resp_body_expr array False Filter for when the include_resp_body attribute is set to true. Response body is only logged when the expression set here evaluates to true. See lua-resty-expr for more.
max_resp_body_bytes integer False 524288 >=1 Request bodies within this size will be pushed to kafka, if the size exceeds the configured value it will be truncated before pushing to Kafka.
cluster_name integer False 1 [0,...] Name of the cluster. Used when there are two or more Kafka clusters. Only works if the producer_type attribute is set to async.
producer_batch_num integer optional 200 [1,...] batch_num parameter in lua-resty-kafka. The merge message and batch is send to the server. Unit is message count.
producer_batch_size integer optional 1048576 [0,...] batch_size parameter in lua-resty-kafka in bytes.
producer_max_buffering integer optional 50000 [1,...] max_buffering parameter in lua-resty-kafka representing maximum buffer size. Unit is message count.
producer_time_linger integer optional 1 [1,...] flush_time parameter in lua-resty-kafka in seconds.
meta_refresh_interval integer optional 30 [1,...] refresh_interval parameter in lua-resty-kafka specifies the time to auto refresh the metadata, in seconds.

This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every 5 seconds or when the data in the queue reaches 1000. See Batch Processor for more information or setting your custom configuration.

:::info IMPORTANT

The data is first written to a buffer. When the buffer exceeds the batch_max_size or buffer_duration attribute, the data is sent to the Kafka server and the buffer is flushed.

If the process is successful, it will return true and if it fails, returns nil with a string with the "buffer overflow" error.

:::

meta_format example

  • default:

    {
      "upstream": "127.0.0.1:1980",
      "start_time": 1619414294760,
      "client_ip": "127.0.0.1",
      "service_id": "",
      "route_id": "1",
      "request": {
        "querystring": {
          "ab": "cd"
        },
        "size": 90,
        "uri": "/hello?ab=cd",
        "url": "http://localhost:1984/hello?ab=cd",
        "headers": {
          "host": "localhost",
          "content-length": "6",
          "connection": "close"
        },
        "body": "abcdef",
        "method": "GET"
      },
      "response": {
        "headers": {
          "connection": "close",
          "content-type": "text/plain; charset=utf-8",
          "date": "Mon, 26 Apr 2021 05:18:14 GMT",
          "server": "APISIX/2.5",
          "transfer-encoding": "chunked"
        },
        "size": 190,
        "status": 200
      },
      "server": {
        "hostname": "localhost",
        "version": "2.5"
      },
      "latency": 0
    }
  • origin:

    GET /hello?ab=cd HTTP/1.1
    host: localhost
    content-length: 6
    connection: close
    
    abcdef

Metadata

You can also set the format of the logs by configuring the Plugin metadata. The following configurations are available:

Name Type Required Default Description
log_format object False Log format declared as key value pairs in JSON format. Values only support strings. APISIX or Nginx variables can be used by prefixing the string with $.

:::info IMPORTANT

Configuring the Plugin metadata is global in scope. This means that it will take effect on all Routes and Services which use the kafka-logger Plugin.

:::

The example below shows how you can configure through the Admin API:

:::note You can fetch the admin_key from config.yaml and save to an environment variable with the following command:

admin_key=$(yq '.deployment.admin.admin_key[0].key' conf/config.yaml | sed 's/"//g')

:::

curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/kafka-logger -H "X-API-KEY: $admin_key" -X PUT -d '
{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
    }
}'

With this configuration, your logs would be formatted as shown below:

{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}

Enable Plugin

The example below shows how you can enable the kafka-logger Plugin on a specific Route:

curl http://127.0.0.1:9180/apisix/admin/routes/5 -H "X-API-KEY: $admin_key" -X PUT -d '
{
    "plugins": {
       "kafka-logger": {
           "brokers" : [
             {
               "host" :"127.0.0.1",
               "port" : 9092
             }
            ],
           "kafka_topic" : "test2",
           "key" : "key1",
           "batch_max_size": 1,
           "name": "kafka logger"
       }
    },
    "upstream": {
       "nodes": {
           "127.0.0.1:1980": 1
       },
       "type": "roundrobin"
    },
    "uri": "/hello"
}'

This Plugin also supports pushing to more than one broker at a time. You can specify multiple brokers in the Plugin configuration as shown below:

 "brokers" : [
    {
      "host" :"127.0.0.1",
      "port" : 9092
    },
    {
      "host" :"127.0.0.1",
      "port" : 9093
    }
],

Example usage

Now, if you make a request to APISIX, it will be logged in your Kafka server:

curl -i http://127.0.0.1:9080/hello

Delete Plugin

To remove the kafka-logger Plugin, you can delete the corresponding JSON configuration from the Plugin configuration. APISIX will automatically reload and you do not have to restart for this to take effect.

curl http://127.0.0.1:9180/apisix/admin/routes/1  -H "X-API-KEY: $admin_key" -X PUT -d '
{
    "methods": ["GET"],
    "uri": "/hello",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    }
}'