Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion concepts/key-concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ Source events can have or not have a structure. A structure defines a set of _ke
At a low level both are just an array of bytes, but the Structured message defines _keys_ and _values_, having a structure helps to implement faster operations on data modifications.

{% hint style="info" %}
Fluent Bit **always** handles every Event message as a structured message. For performance reasons, we use a binary serialization data format called [MessagePack](https://msgpack.org/).
Fluent Bit **always** handles every Event message as a structured message.
For performance reasons, we use a binary serialization data format called [MessagePack](https://msgpack.org/).

Consider [MessagePack](https://msgpack.org/) as a binary version of JSON on steroids.
{% endhint %}
Expand Down
54 changes: 54 additions & 0 deletions development/msgpack-format.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Msgpack format

Fluent Bit **always** handles every Event message as a structured message using a binary serialization data format called [MessagePack](https://msgpack.org/).

## Fluent Bit usage

MessagePack is a standard and well-defined format, refer to the official documentation for full details.
This section provides an overview of the specific types used by Fluent Bit within the format to help anyone consuming it.

* The data structure used by Fluent Bit is a 2-length [`fixarray`](https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family) of the timestamp and the data.
* The timestamp comes from [`flb_time_append_to_msgpack`])(https://github.com/fluent/fluent-bit/blob/2138cee8f4878733956d42d82f6dcf95f0aa9339/src/flb_time.c#L197), so it’s either a `uint64`, a `float64`, or a [`fixext`](https://github.com/msgpack/msgpack/blob/master/spec.md#ext-format-family) where the 4 MSBs are the seconds (big-endian `uint32`) and 4 LSBs are nanoseconds.
* The data itself is just a [`msgpack` map](https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family) with the keys as strings.

## Example

Set up Fluent Bit to send in `msgpack` format to a specific port.

```bash
$ docker run --rm -it --network=host fluent/fluent-bit /fluent-bit/bin/fluent-bit -i cpu -o tcp://127.0.0.1:5170 -p format=msgpack -v

```

We could send this to stdout but as it is a serialized format you would end up with strange output.
As an example we use the [Python msgpack library](https://msgpack.org/#languages) to deal with it:

```python
#Python3
import socket
import msgpack

unpacker = msgpack.Unpacker(use_list=False, raw=False)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5170))
s.listen(1)
connection, address = s.accept()

while True:
data = connection.recv(1024)
if not data:
break
unpacker.feed(data)
for unpacked in unpacker:
print(unpacked)
```

Now make sure to install `msgpack` for Python (3) and then run up your test code:

```bash
$ pip install msgpack
$ python3 test.py
(ExtType(code=0, data=b'b\n5\xc65\x05\x14\xac'), {'cpu_p': 0.1875, 'user_p': 0.125, 'system_p': 0.0625, 'cpu0.p_cpu': 0.0, 'cpu0.p_user': 0.0, 'cpu0.p_system': 0.0, 'cpu1.p_cpu': 0.0, 'cpu1.p_user': 0.0, 'cpu1.p_system': 0.0, 'cpu2.p_cpu': 1.0, 'cpu2.p_user': 0.0, 'cpu2.p_system': 1.0, 'cpu3.p_cpu': 0.0, 'cpu3.p_user': 0.0, 'cpu3.p_system': 0.0, 'cpu4.p_cpu': 0.0, 'cpu4.p_user': 0.0, 'cpu4.p_system': 0.0, 'cpu5.p_cpu': 0.0, 'cpu5.p_user': 0.0, 'cpu5.p_system': 0.0, 'cpu6.p_cpu': 0.0, 'cpu6.p_user': 0.0, 'cpu6.p_system': 0.0, 'cpu7.p_cpu': 0.0, 'cpu7.p_user': 0.0, 'cpu7.p_system': 0.0, 'cpu8.p_cpu': 0.0, 'cpu8.p_user': 0.0, 'cpu8.p_system': 0.0, 'cpu9.p_cpu': 1.0, 'cpu9.p_user': 1.0, 'cpu9.p_system': 0.0, 'cpu10.p_cpu': 0.0, 'cpu10.p_user': 0.0, 'cpu10.p_system': 0.0, 'cpu11.p_cpu': 0.0, 'cpu11.p_user': 0.0, 'cpu11.p_system': 0.0, 'cpu12.p_cpu': 0.0, 'cpu12.p_user': 0.0, 'cpu12.p_system': 0.0, 'cpu13.p_cpu': 0.0, 'cpu13.p_user': 0.0, 'cpu13.p_system': 0.0, 'cpu14.p_cpu': 0.0, 'cpu14.p_user': 0.0, 'cpu14.p_system': 0.0, 'cpu15.p_cpu': 0.0, 'cpu15.p_user': 0.0, 'cpu15.p_system': 0.0})

```
61 changes: 42 additions & 19 deletions pipeline/outputs/tcp-and-tls.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,59 @@ The following parameters are available to configure a secure channel connection

### Command Line

#### JSON format

```bash
$ bin/fluent-bit -i cpu -o tcp://127.0.0.1:5170 -p format=json_lines -v
```

We have specified to gather [CPU](https://github.com/fluent/fluent-bit-docs/tree/16f30161dc4c79d407cd9c586a0c6839d0969d97/pipeline/input/cpu.md) usage metrics and send them in JSON lines mode to a remote end-point using netcat service, e.g:

#### Start the TCP listener
We have specified to gather [CPU](https://github.com/fluent/fluent-bit-docs/tree/16f30161dc4c79d407cd9c586a0c6839d0969d97/pipeline/input/cpu.md) usage metrics and send them in JSON lines mode to a remote end-point using netcat service.

Run the following in a separate terminal, netcat will start listening for messages on TCP port 5170
Run the following in a separate terminal, `netcat` will start listening for messages on TCP port 5170.
Once it connects to Fluent Bit ou should see the output as above in JSON format:

```text
```bash
$ nc -l 5170
{"date":1644834856.905985,"cpu_p":1.1875,"user_p":0.5625,"system_p":0.625,"cpu0.p_cpu":0.0,"cpu0.p_user":0.0,"cpu0.p_system":0.0,"cpu1.p_cpu":1.0,"cpu1.p_user":1.0,"cpu1.p_system":0.0,"cpu2.p_cpu":4.0,"cpu2.p_user":2.0,"cpu2.p_system":2.0,"cpu3.p_cpu":1.0,"cpu3.p_user":0.0,"cpu3.p_system":1.0,"cpu4.p_cpu":1.0,"cpu4.p_user":0.0,"cpu4.p_system":1.0,"cpu5.p_cpu":1.0,"cpu5.p_user":1.0,"cpu5.p_system":0.0,"cpu6.p_cpu":0.0,"cpu6.p_user":0.0,"cpu6.p_system":0.0,"cpu7.p_cpu":3.0,"cpu7.p_user":1.0,"cpu7.p_system":2.0,"cpu8.p_cpu":0.0,"cpu8.p_user":0.0,"cpu8.p_system":0.0,"cpu9.p_cpu":1.0,"cpu9.p_user":0.0,"cpu9.p_system":1.0,"cpu10.p_cpu":1.0,"cpu10.p_user":0.0,"cpu10.p_system":1.0,"cpu11.p_cpu":0.0,"cpu11.p_user":0.0,"cpu11.p_system":0.0,"cpu12.p_cpu":0.0,"cpu12.p_user":0.0,"cpu12.p_system":0.0,"cpu13.p_cpu":3.0,"cpu13.p_user":2.0,"cpu13.p_system":1.0,"cpu14.p_cpu":1.0,"cpu14.p_user":1.0,"cpu14.p_system":0.0,"cpu15.p_cpu":0.0,"cpu15.p_user":0.0,"cpu15.p_system":0.0}
```

Start Fluent Bit
#### Msgpack format

Repeat the JSON approach but using the `msgpack` output format.

```bash
$ bin/fluent-bit -i cpu -o stdout -p format=msgpack -v
Fluent Bit v1.x.x
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2016/10/07 21:52:01] [ info] [engine] started
[0] cpu.0: [1475898721, {"cpu_p"=>0.500000, "user_p"=>0.250000, "system_p"=>0.250000, "cpu0.p_cpu"=>0.000000, "cpu0.p_user"=>0.000000, "cpu0.p_system"=>0.000000, "cpu1.p_cpu"=>0.000000, "cpu1.p_user"=>0.000000, "cpu1.p_system"=>0.000000, "cpu2.p_cpu"=>0.000000, "cpu2.p_user"=>0.000000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>0.000000, "cpu3.p_system"=>1.000000}]
[1] cpu.0: [1475898722, {"cpu_p"=>0.250000, "user_p"=>0.250000, "system_p"=>0.000000, "cpu0.p_cpu"=>0.000000, "cpu0.p_user"=>0.000000, "cpu0.p_system"=>0.000000, "cpu1.p_cpu"=>1.000000, "cpu1.p_user"=>1.000000, "cpu1.p_system"=>0.000000, "cpu2.p_cpu"=>0.000000, "cpu2.p_user"=>0.000000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>0.000000, "cpu3.p_user"=>0.000000, "cpu3.p_system"=>0.000000}]
[2] cpu.0: [1475898723, {"cpu_p"=>0.750000, "user_p"=>0.250000, "system_p"=>0.500000, "cpu0.p_cpu"=>2.000000, "cpu0.p_user"=>1.000000, "cpu0.p_system"=>1.000000, "cpu1.p_cpu"=>0.000000, "cpu1.p_user"=>0.000000, "cpu1.p_system"=>0.000000, "cpu2.p_cpu"=>1.000000, "cpu2.p_user"=>0.000000, "cpu2.p_system"=>1.000000, "cpu3.p_cpu"=>0.000000, "cpu3.p_user"=>0.000000, "cpu3.p_system"=>0.000000}]
[3] cpu.0: [1475898724, {"cpu_p"=>1.000000, "user_p"=>0.750000, "system_p"=>0.250000, "cpu0.p_cpu"=>1.000000, "cpu0.p_user"=>1.000000, "cpu0.p_system"=>0.000000, "cpu1.p_cpu"=>2.000000, "cpu1.p_user"=>1.000000, "cpu1.p_system"=>1.000000, "cpu2.p_cpu"=>1.000000, "cpu2.p_user"=>1.000000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>1.000000, "cpu3.p_system"=>0.000000}]
$ bin/fluent-bit -i cpu -o tcp://127.0.0.1:5170 -p format=msgpack -v

```

No more, no less, it just works.
We could send this to stdout but as it is a serialized format you would end up with strange output.
This should really be handled by a msgpack receiver to unpack as per the details in the developer documentation [here](../../development/msgpack-format.md).
As an example we use the [Python msgpack library](https://msgpack.org/#languages) to deal with it:

```python
#Python3
import socket
import msgpack

unpacker = msgpack.Unpacker(use_list=False, raw=False)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("127.0.0.1", 5170))
s.listen(1)
connection, address = s.accept()

while True:
data = connection.recv(1024)
if not data:
break
unpacker.feed(data)
for unpacked in unpacker:
print(unpacked)
```

```bash
$ pip install msgpack
$ python3 test.py
(ExtType(code=0, data=b'b\n5\xc65\x05\x14\xac'), {'cpu_p': 0.1875, 'user_p': 0.125, 'system_p': 0.0625, 'cpu0.p_cpu': 0.0, 'cpu0.p_user': 0.0, 'cpu0.p_system': 0.0, 'cpu1.p_cpu': 0.0, 'cpu1.p_user': 0.0, 'cpu1.p_system': 0.0, 'cpu2.p_cpu': 1.0, 'cpu2.p_user': 0.0, 'cpu2.p_system': 1.0, 'cpu3.p_cpu': 0.0, 'cpu3.p_user': 0.0, 'cpu3.p_system': 0.0, 'cpu4.p_cpu': 0.0, 'cpu4.p_user': 0.0, 'cpu4.p_system': 0.0, 'cpu5.p_cpu': 0.0, 'cpu5.p_user': 0.0, 'cpu5.p_system': 0.0, 'cpu6.p_cpu': 0.0, 'cpu6.p_user': 0.0, 'cpu6.p_system': 0.0, 'cpu7.p_cpu': 0.0, 'cpu7.p_user': 0.0, 'cpu7.p_system': 0.0, 'cpu8.p_cpu': 0.0, 'cpu8.p_user': 0.0, 'cpu8.p_system': 0.0, 'cpu9.p_cpu': 1.0, 'cpu9.p_user': 1.0, 'cpu9.p_system': 0.0, 'cpu10.p_cpu': 0.0, 'cpu10.p_user': 0.0, 'cpu10.p_system': 0.0, 'cpu11.p_cpu': 0.0, 'cpu11.p_user': 0.0, 'cpu11.p_system': 0.0, 'cpu12.p_cpu': 0.0, 'cpu12.p_user': 0.0, 'cpu12.p_system': 0.0, 'cpu13.p_cpu': 0.0, 'cpu13.p_user': 0.0, 'cpu13.p_system': 0.0, 'cpu14.p_cpu': 0.0, 'cpu14.p_user': 0.0, 'cpu14.p_system': 0.0, 'cpu15.p_cpu': 0.0, 'cpu15.p_user': 0.0, 'cpu15.p_system': 0.0})

```