## Sender Example

The sender connects the pipeline from example 10 to the output connectors from examples 6 and 7. Its job is to send events in state `processed` to the default output and events in state `failed` to the error output, if one is configured. The sender works in batches: after each batch it flushes the output connectors and clears their backlogs, so every event should be in state `failed` or `delivered` when the batch cycle finishes.

Batches are processed in 4 simple steps:

1. Fill the batch from the `pipeline` iterator.
2. Send all events in state `processed` to the default output and flush the default output.
3. Send all events in state `failed` to the error output if defined and flush the error output.
4. If sending to the error output fails, write an error message that includes the event to stdout.
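
The four steps above can be sketched without logprep. This is a minimal illustration, not the actual `Sender` implementation: `ToyEvent`, `ToyOutput`, `send_batches` and the `batch_size` parameter are hypothetical names, and states are plain strings instead of `EventStateType` members.

```python
from itertools import islice


class ToyEvent:
    """Hypothetical stand-in for a LogEvent: just data and a state string."""

    def __init__(self, data, state="processed"):
        self.data = data
        self.state = state


class ToyOutput:
    """Hypothetical output connector that buffers events until flushed."""

    def __init__(self, reject=None):
        # `reject` decides per document whether storing it fails
        self.reject = reject or (lambda data: False)
        self.backlog = []
        self.stored = []

    def store(self, event):
        self.backlog.append(event)

    def flush(self):
        # Deliver everything in the backlog, then clear it
        for event in self.backlog:
            if self.reject(event.data):
                event.state = "failed"
            else:
                self.stored.append(event)
                event.state = "delivered"
        self.backlog.clear()


def send_batches(pipeline, default_output, error_output=None, batch_size=2):
    events = iter(pipeline)
    # Step 1: fill the batch from the pipeline iterator
    while batch := list(islice(events, batch_size)):
        # Step 2: send processed events to the default output and flush
        for event in batch:
            if event.state == "processed":
                default_output.store(event)
        default_output.flush()
        if error_output is not None:
            # Step 3: wrap failed events and send them to the error output
            error_events = [
                ToyEvent({"reason": "delivery failed", "event": event.data})
                for event in batch
                if event.state == "failed"
            ]
            for error_event in error_events:
                error_output.store(error_event)
            error_output.flush()
            # Step 4: report error events that could not be stored
            for error_event in error_events:
                if error_event.state == "failed":
                    print(f"could not store error event: {error_event.data}")
        yield from batch


events = [
    ToyEvent({"message": "Log message 1", "event": "concrete value"}),
    ToyEvent({"message": "Log message 2"}),
    ToyEvent({"user": {"name": "John Doe"}}),
]
# The default output rejects documents whose `event` field is not an object
default_output = ToyOutput(reject=lambda data: not isinstance(data.get("event", {}), dict))
error_output = ToyOutput()
states = [event.state for event in send_batches(events, default_output, error_output)]
print(states)
```

In this sketch the first event ends in state `failed` and is recorded once in the error output, while the other two end in state `delivered`.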

While they are processed, events can produce extra data such as SRE events and pseudonym events. These should be sent to the corresponding OpenSearch index, `sre` or `pseudonyms`.
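
As a rough illustration of that routing, the following sketch groups extra-data documents by output name and target index. The shape of `extra_data` is an assumption made for this example (pairs of a document and a list of one-entry dicts, mirroring the `"outputs": [{"opensearch": "pseudonyms"}]` processor setting used later); `route_extra_data` is a hypothetical helper, not a logprep function.

```python
from collections import defaultdict


def route_extra_data(extra_data):
    """Group extra-data documents by (output name, target index).

    `extra_data` is assumed to be an iterable of (document, targets) pairs,
    where `targets` is a sequence of one-entry dicts like {"opensearch": "sre"}.
    """
    routed = defaultdict(list)
    for document, targets in extra_data:
        for target in targets:
            for output_name, index in target.items():
                routed[(output_name, index)].append(document)
    return dict(routed)


# Illustrative documents only; real extra data carries different fields
extra_data = [
    ({"pseudonym": "abc123"}, [{"opensearch": "pseudonyms"}]),
    ({"sre": "example"}, [{"opensearch": "sre"}]),
]
routed = route_extra_data(extra_data)
print(sorted(routed))  # [('opensearch', 'pseudonyms'), ('opensearch', 'sre')]
```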

Let's begin by starting the OpenSearch instance we need.


In [17]:
%%bash
docker compose -f ../../../../../examples/compose/docker-compose.yml down -v  
docker compose -f ../../../../../examples/compose/docker-compose.yml up -d opensearch dashboards


DEBUG:asyncio:Using selector: EpollSelector


 Container dashboards  Stopping
 Container dashboards  Stopped
 Container dashboards  Removing
 Container dashboards  Removed
 Container opensearch  Stopping
 Container opensearch  Stopped
 Container opensearch  Removing
 Container opensearch  Removed
 Network compose_opensearch  Removing
 Volume compose_opensearch-data  Removing
 Volume compose_opensearch-data  Removed
 Network compose_opensearch  Removed
 Network compose_opensearch  Creating
 Network compose_opensearch  Created
 Volume "compose_opensearch-data"  Creating
 Volume "compose_opensearch-data"  Created
 Container opensearch  Creating
 Container opensearch  Created
 Container dashboards  Creating
 Container dashboards  Created
 Container opensearch  Starting
 Container opensearch  Started
 Container dashboards  Starting
 Container dashboards  Started


To keep things simple, we reuse the pipeline from example 10 to produce LogEvents with extra_data.

In [21]:
import logging
import sys

from logprep.factory import Factory
from logprep.ng.abc.processor import Processor
from logprep.ng.event.event_state import EventStateType
from logprep.ng.event.log_event import LogEvent
from logprep.ng.pipeline import Pipeline
from logprep.util.time import TimeParser

# Configure logging so that debug output is written to stdout
logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stdout
)


input_data =  [
        LogEvent({"message": "Log message 1", "@timestamp": str(TimeParser.now())}, original=b"{\"message\": \"Log message 1\", \"@timestamp\": \"" + str(TimeParser.now()).encode() + b"\"}", state=EventStateType.RECEIVED),
        LogEvent({"message": "Log message 2", "@timestamp": str(TimeParser.now())}, original=b"{\"message\": \"Log message 2\", \"@timestamp\": \"" + str(TimeParser.now()).encode() + b"\"}", state=EventStateType.RECEIVED),
        LogEvent({"user": {"name": "John Doe"}, "@timestamp": str(TimeParser.now())}, original=b"{\"user\": {\"name\": \"John Doe\"}, \"@timestamp\": \"" + str(TimeParser.now()).encode() + b"\"}", state=EventStateType.RECEIVED),
    ]

def input_connector():
    # Simple generator that stands in for a real input connector
    yield from input_data


def get_processors() -> list[Processor]:
    processors = [
        Factory.create(
            {
                "processor": {
                    "type": "ng_generic_adder",
                    "rules": [
                        {
                            "filter": "*",
                            "generic_adder": {"add": {"event.tags": "generic added tag"}},
                        }
                    ],
                }
            }
        ),
        Factory.create(
            {
                "pseudo_this": {
                    "type": "ng_pseudonymizer",
                    "pubkey_analyst": "../../../../../examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem",
                    "pubkey_depseudo": "../../../../../examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem",
                    "regex_mapping": "../../../../../examples/exampledata/rules/pseudonymizer/regex_mapping.yml",
                    "hash_salt": "a_secret_tasty_ingredient",
                    "outputs": [{"opensearch": "pseudonyms"}],
                    "rules": [
                        {
                            "filter": "user.name",
                            "pseudonymizer": {
                                "id": "pseudonymizer-1a3c69b2-5d54-4b6b-ab07-c7ddbea7917c",
                                "mapping": {"user.name": "RE_WHOLE_FIELD"},
                            },
                        }
                    ],
                    "max_cached_pseudonyms": 1000000,
                }
            }
        ),
    ]
    for processor in processors:
        processor.setup()
    return processors

processors = get_processors()
pipeline = Pipeline(input_connector(), processors)


DEBUG:Processor:GenericAdder (processor) loaded 1 rules
DEBUG:Processor:Pseudonymizer (pseudo_this) loaded 1 rules
DEBUG:Component:Checking health of processor
DEBUG:Component:Checking health of pseudo_this


Now let's create our `opensearch_output` and `error_output` connectors and instantiate the `Sender` class.

In [22]:

from logprep.ng.connector.opensearch.output import OpensearchOutput
from logprep.ng.sender import Sender

# creating default output
config = {
    "type": "ng_opensearch_output",
    "hosts": [
        "127.0.0.1:9200"
    ],
    "default_index": "processed",
    "default_op_type": "create",
    "message_backlog_size": 1000,
    "timeout": 10000,
    "flush_timeout": 60,
    "user": "admin",
    "secret": "admin",
    "desired_cluster_status": ["green", "yellow"]
}

opensearch_output: OpensearchOutput = Factory.create({"opensearch": config})
opensearch_output.setup()

# creating error output

error_config = {
    "type": "ng_opensearch_output",
    "hosts": [
        "127.0.0.1:9200"
    ],
    "default_index": "errors",
    "default_op_type": "create",
    "message_backlog_size": 1000,
    "timeout": 10000,
    "flush_timeout": 60,
    "user": "admin",
    "secret": "admin",
    "desired_cluster_status": ["green", "yellow"]
}

error_output: OpensearchOutput = Factory.create({"erroroutput": error_config})
error_output.setup()

sender = Sender(pipeline=pipeline, outputs=[opensearch_output], error_output=error_output)

DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): 127.0.0.1:9200
DEBUG:urllib3.connectionpool:http://127.0.0.1:9200 "GET /_cluster/health HTTP/1.1" 200 0
INFO:opensearch:GET http://127.0.0.1:9200/_cluster/health [status:200 request:0.009s]
DEBUG:opensearch:> None
DEBUG:opensearch:< {"cluster_name":"opensearch-cluster","status":"yellow","timed_out":false,"number_of_nodes":1,"number_of_data_nodes":1,"discovered_master":true,"discovered_cluster_manager":true,"active_primary_shards":6,"active_shards":6,"relocating_shards":0,"initializing_shards":0,"unassigned_shards":1,"delayed_unassigned_shards":0,"number_of_pending_tasks":0,"number_of_in_flight_fetch":0,"task_max_waiting_in_queue_millis":0,"active_shards_percent_as_number":85.71428571428571}
DEBUG:Component:Checking health of opensearch
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): 127.0.0.1:9200
DEBUG:urllib3.connectionpool:http://127.0.0.1:9200 "GET /_cluster/health HTTP/1.1" 200 0
INFO:opensearch:GET http

Now run the sender:

In [23]:
for event in sender:
    print(event)

DEBUG:Processor:GenericAdder (processor) processing event LogEvent(data={'message': 'Log message 1', '@timestamp': '2025-08-25 11:40:43.193610+00:00'}, state=processing)
DEBUG:Processor:Pseudonymizer (pseudo_this) processing event LogEvent(data={'message': 'Log message 1', '@timestamp': '2025-08-25 11:40:43.193610+00:00', 'event': {'tags': 'generic added tag'}}, state=processing)
DEBUG:Processor:GenericAdder (processor) processing event LogEvent(data={'message': 'Log message 2', '@timestamp': '2025-08-25 11:40:43.193661+00:00'}, state=processing)
DEBUG:Processor:Pseudonymizer (pseudo_this) processing event LogEvent(data={'message': 'Log message 2', '@timestamp': '2025-08-25 11:40:43.193661+00:00', 'event': {'tags': 'generic added tag'}}, state=processing)
DEBUG:Processor:GenericAdder (processor) processing event LogEvent(data={'user': {'name': 'John Doe'}, '@timestamp': '2025-08-25 11:40:43.193673+00:00'}, state=processing)
DEBUG:Processor:Pseudonymizer (pseudo_this) processing event L

To simulate a failure, we write a concrete value to the field `event`, where an object is expected. This should lead to a mapper parsing error.

In [24]:
input_data =  [
    LogEvent({"message": "Log message 1", "event": "concrete value", "@timestamp": str(TimeParser.now())}, original=b"{\"message\": \"Log message 1\", \"event\": \"concrete value\", \"@timestamp\": \"" + str(TimeParser.now()).encode() + b"\"}", state=EventStateType.RECEIVED),
    LogEvent({"message": "Log message 2", "@timestamp": str(TimeParser.now())}, original=b"{\"message\": \"Log message 2\", \"@timestamp\": \"" + str(TimeParser.now()).encode() + b"\"}", state=EventStateType.RECEIVED),
    LogEvent({"user": {"name": "John Doe"}, "@timestamp": str(TimeParser.now())}, original=b"{\"user\": {\"name\": \"John Doe\"}, \"@timestamp\": \"" + str(TimeParser.now()).encode() + b"\"}", state=EventStateType.RECEIVED),
    ]

def input_connector():
    # Redefined so the generator yields the new input_data
    yield from input_data

pipeline = Pipeline(input_connector(), processors)

sender = Sender(pipeline=pipeline, outputs=[opensearch_output], error_output=error_output)

Rerun the sender:

In [25]:
for event in sender:
    print(event)

DEBUG:Processor:GenericAdder (processor) processing event LogEvent(data={'message': 'Log message 1', 'event': 'concrete value', '@timestamp': '2025-08-25 11:40:57.501194+00:00'}, state=processing)
DEBUG:Processor:Pseudonymizer (pseudo_this) processing event LogEvent(data={'message': 'Log message 1', 'event': 'concrete value', '@timestamp': '2025-08-25 11:40:57.501194+00:00', 'tags': ['_generic_adder_failure']}, state=processing)
DEBUG:Processor:GenericAdder (processor) processing event LogEvent(data={'message': 'Log message 2', '@timestamp': '2025-08-25 11:40:57.501232+00:00'}, state=processing)
DEBUG:Processor:Pseudonymizer (pseudo_this) processing event LogEvent(data={'message': 'Log message 2', '@timestamp': '2025-08-25 11:40:57.501232+00:00', 'event': {'tags': 'generic added tag'}}, state=processing)
DEBUG:Processor:GenericAdder (processor) processing event LogEvent(data={'user': {'name': 'John Doe'}, '@timestamp': '2025-08-25 11:40:57.501244+00:00'}, state=processing)
DEBUG:Proces

Check whether we can build events from the documents in the OpenSearch error index, to ensure we can consume them with an opensearch_input connector in future iterations.
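
The escaped quotes in the search response at the end of this example suggest that the `event` and `original` fields may hold JSON that was serialized more than once. Assuming that is the case, a single `json.loads` call would return a string rather than a dictionary. The sketch below decodes such a value until it is no longer a string; `decode_nested_json` and `max_depth` are hypothetical names and not part of logprep.

```python
import json


def decode_nested_json(value, max_depth=3):
    """Call json.loads repeatedly while the result is still a string."""
    for _ in range(max_depth):
        if not isinstance(value, str):
            break
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            # A plain string that is not JSON: keep it as it is
            break
    return value


payload = {"message": "Log message 1", "event": "concrete value"}
stored = json.dumps(json.dumps(payload))  # serialized twice, as assumed above
once = json.loads(stored)                 # still a string after one pass
decoded = decode_nested_json(stored)      # a dictionary again
print(type(once).__name__, type(decoded).__name__)
```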

In [26]:
# get opensearch error event by reusing opensearch error_output connector client
import json
result = error_output._search_context.search(index="errors", body={"query": {"match_all": {}}})
last_error_event = result["hits"]["hits"][-1]["_source"]
data, original = json.loads(last_error_event["event"]), json.loads(last_error_event["original"])
log_event = LogEvent(data, original=original, state=EventStateType.RECEIVING)
print(log_event)

DEBUG:urllib3.connectionpool:http://127.0.0.1:9200 "POST /errors/_search HTTP/1.1" 200 0
INFO:opensearch:POST http://127.0.0.1:9200/errors/_search [status:200 request:0.008s]
DEBUG:opensearch:> {"query":{"match_all":{}}}
DEBUG:opensearch:< {"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"errors","_id":"dH8I4ZgBuKo0aJRmFG24","_score":1.0,"_source":{"@timestamp":"2025-08-25T11:40:59.881036+00:00","reason":"Error during processing: (BulkError({'type': 'mapper_parsing_exception', 'reason': 'object mapping for [event] tried to parse field [event] as object, but found a concrete value'}),)","original":"\"{\\\"message\\\": \\\"Log message 1\\\", \\\"event\\\": \\\"concrete value\\\", \\\"@timestamp\\\": \\\"2025-08-25 11:40:57.501204+00:00\\\"}\"","event":"\"{\\\"message\\\": \\\"Log message 1\\\", \\\"event\\\": \\\"concrete value\\\", \\\"@timestamp\\\": \\\"2025-08-25 11:40