### Chunks of code we will need

##### Utility code for credentials and configs

Install the needed libraries

In [None]:
# Install the Kafka client (confluent-kafka) and the table formatter (tabulate) imported later in this notebook.
!python -m pip install confluent-kafka tabulate

A bit of utility code to keep the credentials out of the GitHub repo. There is an easy startup guide for using Confluent Cloud at https://developer.confluent.io/get-started/python/.

In [None]:
# Read the config file
def read_ccloud_config(config_file='client.properties', producer_only=True):
    """Parse a ``key=value`` properties file into a dict of settings.

    Blank lines and lines starting with ``#`` are ignored. Only the first
    ``=`` splits a line, so values may themselves contain ``=``. Whitespace
    around both the key and the value is removed.

    Args:
        config_file: Path of the properties file to read.
        producer_only: When true, drop the schema-registry / basic-auth keys
            and every key starting with ``confluent``, leaving only the
            remaining settings.

    Returns:
        dict mapping each parameter name to its string value.

    Raises:
        ValueError: if a non-blank, non-comment line has no ``=`` separator.
    """
    omitted_fields = {'schema.registry.url', 'basic.auth.credentials.source', 'basic.auth.user.info'}
    omitted_prefix = 'confluent'
    conf = {}
    with open(config_file) as fh:
        for line_no, line in enumerate(fh, start=1):
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            parameter, separator, value = line.partition('=')
            if not separator:
                raise ValueError(f"{config_file}:{line_no}: expected 'key=value', got {line!r}")
            # Strip the key too: "key = value" must not yield the key "key ",
            # which would also slip past the producer_only filter below.
            parameter = parameter.strip()
            if producer_only and (parameter in omitted_fields or parameter.startswith(omitted_prefix)):
                continue
            conf[parameter] = value.strip()
    return conf

##### The message producer

A basic Kafka Producer with a simple approach to data rate, payload size, and keys.

Some documentation:
- The python kafka client library is described at https://docs.confluent.io/kafka-clients/python/current/overview.html
- The metrics delivered via the statistics callback are documented in librdkafka's STATISTICS.md: https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md
- Configuration parameters for the Producer are documented in librdkafka's CONFIGURATION.md: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
- Callbacks are at the python client documentation https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration

In [None]:
from confluent_kafka import Producer
from collections import defaultdict
from datetime import datetime, timezone
from random import randint
from time import sleep
import json


def get_stats_cb(results):
    """Build a statistics callback that files every report under its client name.

    Args:
        results: Mapping whose values are lists and that creates missing
            entries on first access (e.g. ``defaultdict(list)``).

    Returns:
        A one-argument callable taking the statistics JSON string. Each call
        decodes the string and appends the decoded dict to
        ``results[<report 'name'>]``, so the full history is kept and the
        newest report is always the last list element.
    """
    def stats_cb(stats_json):
        report = json.loads(stats_json)
        client_name = report['name']
        results[client_name].append(report)
    return stats_cb
        
def get_delivery_callback(latencies):
    """Build a delivery callback that records the latency of each delivered message.

    Args:
        latencies: List that receives ``msg.latency()`` for every message
            reported without an error.

    Returns:
        A callable ``(err, msg)``. A truthy ``err`` is printed and nothing is
        recorded; otherwise the message latency is appended to ``latencies``.
    """
    def delivery_callback(err, msg):
        if not err:
            latencies.append(msg.latency())
            return
        print('ERROR: Message failed delivery: {}'.format(err))
    return delivery_callback

def getMessages(numMessages, msgSize, num_partitions=6):
    """Yield ``numMessages`` key/value dicts with a payload of ``msgSize`` characters.

    The payload repeats a fixed 64-character sentence and is cut to exactly
    ``msgSize`` characters. Previously the size was rounded down to a
    multiple of 64, so any request below 64 produced an empty payload.

    Args:
        numMessages: Number of messages to generate.
        msgSize: Desired payload length in characters; negative values are
            treated as 0.
        num_partitions: Number of distinct keys to cycle through. Defaults
            to 6, the partition count the topic is configured with.

    Yields:
        A new dict ``{'key': 'mt_key_<i % num_partitions>', 'value': payload}``
        per message, so callers may mutate it freely.
    """
    # len is 64 for the below string
    base_msg = "Upon our honor, we will monitor our data streaming application. "
    repeats, remainder = divmod(max(msgSize, 0), len(base_msg))
    # The payload is identical for every message, so build it once.
    payload = base_msg * repeats + base_msg[:remainder]
    for i in range(numMessages):
        yield {'key': f"mt_key_{i % num_partitions}", 'value': payload}

def publishMessages(load_params):
    """Produce a paced stream of messages and collect latencies and client statistics.

    Args:
        load_params: dict with the required keys ``num_producers``,
            ``num_msgs``, ``msg_size_bytes`` and ``msg_rate_per_s``, plus the
            optional keys ``extra_producer_args`` (dict merged into the
            producer configuration) and ``topic`` (defaults to
            ``"sale_records"``).

    Returns:
        Tuple ``(startTime, produceEndTime, endTime, latencies, stats)``:
        UTC timestamps taken at entry, after the last ``produce()`` and after
        all producers were flushed; the list of per-message latencies filled
        by the delivery callback; and a dict of statistics reports keyed by
        client name.
    """
    startTime = datetime.now(timezone.utc)

    conf = read_ccloud_config()
    statistics_interval_ms = 250
    stats = defaultdict(list)
    conf['stats_cb'] = get_stats_cb(stats)
    conf['statistics.interval.ms'] = statistics_interval_ms
    conf.update(load_params.get('extra_producer_args', {}))

    # Simulating extra connections: one Producer instance per requested producer
    num_producers = load_params['num_producers']
    producers = [Producer(conf) for _ in range(num_producers)]

    topic = load_params.get('topic', 'sale_records')
    numMessages = load_params['num_msgs']
    msgSize = load_params['msg_size_bytes']
    msgRateSleepTimeSecs = 1 / load_params['msg_rate_per_s']

    latencies = []
    delivery_callback = get_delivery_callback(latencies)

    for msgSentCount, msg in enumerate(getMessages(numMessages, msgSize)):
        ts_str = datetime.now(timezone.utc).isoformat()
        value = '{ "payload": "' + msg['value'] + '", "ts": "' + ts_str + '" }'
        # Round-robin the messages over the producers.
        producers[msgSentCount % num_producers].produce(topic, key=msg['key'], value=value,
                                                        on_delivery=delivery_callback)
        if msgSentCount % 100 == 0:
            for producer in producers:
                # poll(0) serves pending callbacks without waiting. A bare
                # poll() blocks until an event arrives, which stalls the
                # send loop and distorts the configured message rate.
                producer.poll(0)
        sleep(msgRateSleepTimeSecs)

    produceEndTime = datetime.now(timezone.utc)

    # flush() waits for outstanding deliveries and serves their callbacks.
    for producer in producers:
        producer.flush()

    endTime = datetime.now(timezone.utc)

    return startTime, produceEndTime, endTime, latencies, stats

##### Getting cluster metrics using the Metrics API

The calls to the Confluent Metrics API to get the metrics we care about. 
- Documentation for it is at https://docs.confluent.io/cloud/current/monitoring/metrics-api.html. 
- A complete list of available cluster metrics is at https://api.telemetry.confluent.cloud/docs/descriptors/datasets/cloud

In [None]:
from datetime import datetime, timedelta
import json
import urllib.request

# One Metrics API query body per cluster metric, keyed by the short metric name.
# Every entry has the same shape and differs only in the metric suffix.
MetricsQueries = {
    metric_name: {
        'query': {'aggregations': [{'metric': f'io.confluent.kafka.server/{metric_name}'}]}
    }
    for metric_name in ('active_connection_count', 'request_count', 'received_records')
}
def getMetrics(startTime, endTime, cluster_id='lkc-v1jq15'):
    """Query the Metrics API for every entry in ``MetricsQueries``.

    Sleeps 60 seconds first and widens the requested window by 60 seconds on
    each side, because the Metrics API aggregates by the minute and clocks
    may be skewed.

    Args:
        startTime: datetime marking the start of the period of interest.
        endTime: datetime marking the end of the period of interest.
        cluster_id: Value matched against ``resource.kafka.id`` in the query
            filter. Defaults to the cluster this notebook was written for.

    Returns:
        dict mapping each ``MetricsQueries`` key to the decoded JSON response.
        A query answered with a status other than 200 is reported on stdout
        and left out of the dict.
    """
    # The Metrics API aggregates by the minute, and throw in clock skew
    sleep(60)
    startTime -= timedelta(seconds=60)
    endTime += timedelta(seconds=60)

    conf = read_ccloud_config(producer_only=False)
    url = conf['confluent.metrics.endpoint']
    headers = {
        # The token from the config file is sent verbatim.
        'Authorization': f"Basic {conf['confluent.cloud_api_token']}",
        'Content-Type': 'application/json'
    }
    common = {
        "filter": {"op": "OR", "filters": [{"field": "resource.kafka.id", "op": "EQ", "value": cluster_id}]},
        "granularity": "PT1M",
        "limit": 1000
    }
    interval = {
        "intervals": [f"{startTime.isoformat(timespec='seconds')}/{endTime.isoformat(timespec='seconds')}"],
    }

    responses = {}
    for name, spec in MetricsQueries.items():
        data = spec['query'] | common | interval

        req = urllib.request.Request(url, json.dumps(data).encode('utf-8'), headers)
        # Context manager so the HTTP response is always closed.
        with urllib.request.urlopen(req) as resp:
            status = resp.getcode()
            if status == 200:
                responses[name] = json.loads(resp.read())
            else:
                print(f"Error: {status}, Request was {json.dumps(data)}")

    return responses


##### Test Runner

The main code to execute a sample load.

In [None]:
from statistics import mean
def execute_test(load_params):
    """Run one load test and return a row of results for ``print_results``.

    Args:
        load_params: Parameters forwarded to ``publishMessages``. The optional
            flags ``include_client_metrics`` and ``include_cluster_metrics``
            select which extra columns are appended.

    Returns:
        list starting with ``[linger.ms, average latency]``. With client
        metrics it is followed by requests made, messages sent, batch count
        and average batch size. With cluster metrics it is followed by max
        active connections, total request count and total received records.
        ``linger.ms`` is ``None`` when it was not set in
        ``extra_producer_args``; averages over no samples are ``nan``.
    """
    start, _produce_end, end, latencies, stats = publishMessages(load_params)
    # total_seconds() covers the whole duration; .seconds wraps around after a day.
    print(f"Done in {int((end - start).total_seconds())} s Start:{start}, End:{end}")

    # publishMessages treats 'extra_producer_args' as optional, so do the same here.
    linger_ms = load_params.get('extra_producer_args', {}).get('linger.ms')
    # Ignore missing latencies and do not crash when nothing was delivered.
    measured = [latency for latency in latencies if latency is not None]
    result = [
        linger_ms,
        mean(measured) if measured else float('nan')
    ]
    if load_params.get('include_client_metrics', False):
        # Kafka client library metrics
        # Some are aggregated so we only need the last value
        # Some are per metric report, so we need to aggregate them ourselves
        last_reports = [reports[-1] for reports in stats.values()]
        topic_entries = [
            topic
            for reports in stats.values()
            for entry in reports
            for topic in entry['topics'].values()
        ]
        batch_sizes = [topic['batchsize']['avg'] for topic in topic_entries]
        client_metrics = {
            'num_requests_made': sum(report['tx'] for report in last_reports),
            'num_messages_sent': sum(report['txmsgs'] for report in last_reports),
            'num_batch_cnt': sum(topic['batchcnt']['cnt'] for topic in topic_entries),
            'avg_batch_size_bytes': mean(batch_sizes) if batch_sizes else float('nan')
        }
        result.extend([
            client_metrics['num_requests_made'],
            client_metrics['num_messages_sent'],
            client_metrics['num_batch_cnt'],
            client_metrics['avg_batch_size_bytes']
        ])

    if load_params.get('include_cluster_metrics', False):
        # Metrics from the cluster metrics API
        from_metrics_api = getMetrics(start, end)

        def data_points(metric_name):
            # Values of every data point returned for one metric.
            return [point['value'] for point in from_metrics_api[metric_name]['data']]

        cluster_metrics = {
            # default=0 keeps max() from raising when no data points came back.
            'active_connection_count': max(data_points('active_connection_count'), default=0),
            'request_count': sum(data_points('request_count')),
            'received_records': sum(data_points('received_records'))
        }
        result.extend([
            cluster_metrics['active_connection_count'],
            cluster_metrics['request_count'],
            cluster_metrics['received_records']
        ])

    return result

from tabulate import tabulate
def print_results(load_params, results):
    """Print the collected result rows as an org-mode style table.

    Args:
        load_params: Only the flags ``include_client_metrics`` and
            ``include_cluster_metrics`` are read, to pick the column headers
            matching the rows built by ``execute_test``.
        results: List of result rows, one per test run.
    """
    # Message.latency() reports float seconds, so the averaged column is in
    # seconds, not microseconds.
    headers = ['linger.ms', 'Avg Latency (s)']
    if load_params.get('include_client_metrics', False):
        headers.extend(['Requests', 'Messages Sent', 'Batches Sent', 'Avg Batch Size(bytes)'])
    if load_params.get('include_cluster_metrics', False):
        headers.extend(['connections', 'Requests', 'Messages'])
    print(tabulate(results, headers=headers, tablefmt='orgtbl'))

### Snippets

#### delivery callback

In [None]:
# Minimal delivery-callback demo: produce one message and report its latency.
producer = Producer(read_ccloud_config())
producer.produce(topic='sale_records',
                 key='msg_key', value='This Data is in Motion',
                 # Message.latency() returns float seconds; report failures instead of a latency.
                 on_delivery=lambda err, msg: print(
                     f"Delivery failed: {err}" if err
                     else f"Latency (in seconds): {msg.latency()}"))
# flush() waits for delivery and returns the number of messages still queued.
msgs_in_buffer = producer.flush()

#### stats callback

In [None]:
def stats_callback(json_string):
    """Print the message count and average batch count from one statistics report.

    Args:
        json_string: Statistics report as a JSON string. It must contain
            ``txmsgs``; the ``sale_records`` entry under ``topics`` is
            optional, and ``n/a`` is printed for the batch average when it
            is missing (instead of raising ``KeyError``).
    """
    stats = json.loads(json_string)
    topic_stats = stats.get('topics', {}).get('sale_records')
    avg_in_batch = topic_stats['batchcnt']['avg'] if topic_stats else 'n/a'
    print(f"Messages sent: {stats['txmsgs']}, Avg Messages in Batch: {avg_in_batch}")

# Statistics-callback demo: request a statistics report every 100 ms.
conf = read_ccloud_config()
conf['stats_cb'] = stats_callback
conf['statistics.interval.ms'] = 100

producer = Producer(conf)

for msg_cont in range(5):
    producer.produce(topic='sale_records',
                     key='msg_key', value='This Data is in Motion',
                     # Message.latency() returns float seconds; report failures instead of a latency.
                     on_delivery=lambda err, msg: print(
                         f"Delivery failed: {err}" if err
                         else f"Latency (in seconds): {msg.latency()}"))
    # NOTE(review): poll() without a timeout waits for an event per the client
    # docs; the demo appears to rely on that to let callbacks fire between sends.
    producer.poll()

msgs_in_buffer = producer.flush()

#### Metrics API

In [None]:
# Metrics API demo: received_records for the last hour at one-minute granularity.
conf = read_ccloud_config(producer_only=False)
url = conf['confluent.metrics.endpoint']
startTime = datetime.now(timezone.utc) - timedelta(hours=1)
endTime = datetime.now(timezone.utc)
headers = {
    # The token from the config file is sent verbatim.
    'Authorization': f"Basic {conf['confluent.cloud_api_token']}",
    'Content-Type': 'application/json'
}
qry = {
    'aggregations': [{ 'metric': 'io.confluent.kafka.server/received_records'}],
    'filter':{'op':'OR','filters':[{'field':'resource.kafka.id','op':'EQ','value':'lkc-v1jq15'}]},
    'granularity':'PT1M',
    'limit':1000,
    'intervals':[f"{startTime.isoformat(timespec='seconds')}/{endTime.isoformat(timespec='seconds')}"]
}
req = urllib.request.Request(url, json.dumps(qry).encode('utf-8'), headers)
# Context manager so the HTTP response is closed once it has been read.
with urllib.request.urlopen(req) as resp:
    print(json.dumps(json.loads(resp.read()), indent=2))

## Tests

#### Linger.ms with application, kafka client and cluster metrics

In [None]:
# Load generation: one producer, 10000 messages of 1 KiB at 200 msg/s,
# reporting both client-side and cluster-side metrics.
load_params = dict(
    num_producers=1,
    num_msgs=10000,
    msg_rate_per_s=200,
    msg_size_bytes=1 * 1024,
    extra_producer_args={'linger.ms': 0},
    include_client_metrics=True,
    include_cluster_metrics=True,
)

# linger.ms values to compare, one test run each.
linger_ms_times = [0, 1000]

results = []
for linger_ms in linger_ms_times:
    load_params['extra_producer_args'].update({'linger.ms': linger_ms})
    results.append(execute_test(load_params))
    # Cluster metrics are aggregated to the minute and we want to avoid overlap
    sleep(120)

print_results(load_params, results)

#### Requests

Still bothered that the client-side request count does not match the cluster's request_count. Let's turn debugging on for a small test.

In [None]:
import logging
# Send DEBUG-level log records to producer.log, overwriting the file on each run.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s %(message)s',
    filename='producer.log',
    filemode='w',
)


# Load generation with full client debugging, logged through the root logger.
load_params = dict(
    num_producers=1,
    num_msgs=100000,
    msg_rate_per_s=400,
    msg_size_bytes=1 * 1024,
    extra_producer_args={
        'linger.ms': 0,
        'debug': 'all',
        'logger': logging.getLogger(),
    },
    include_client_metrics=True,
    include_cluster_metrics=True,
)

# Add this run as another row to the existing results table.
results.append(execute_test(load_params))


print_results(load_params, results)