Description
Hi, I've been using this library to send the result of an AWS Lambda to our Kafka cluster with an Avro schema.
The serialisation of the schema is fine, except for the first field, "creationTime", which is invariably set to something like 1970-01-20T13:34:12.378Z when received by Kafka (as visualised in AKHQ).
The timestamp of the message is fine as long as I leave it set to the default; if I try to use the same timestamp as in the schema, the message is shown in AKHQ as sent 58 years ago.
I have the problem with the Kafka clusters in all our environments, and I can reproduce it on my local dev environment.
I tried to debug the code, but I can't get any information after the serialisation. Here's what I'm sure of:

- Timestamp variable content just before serialisation (float): 1690451888.45323
- Time received in the AKHQ message: 1970-01-20T13:34:11.888Z
- After conversion, this time gives 1686851 as a timestamp.

I initially thought it was somehow truncated before serialisation, but it doesn't look like it.
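For what it's worth, the received date is consistent with the seconds-based epoch value being written as-is and then read back as milliseconds. A minimal sketch of that arithmetic (my assumption about the cause, not something I've confirmed inside the library):

```python
from datetime import datetime, timezone

# Value produced by datetime.now().timestamp() -- epoch *seconds* as a float
seconds_value = 1690451888.45323

# If the long is stored unconverted and later interpreted as timestamp-millis,
# reading it back effectively divides by 1000, landing ~19.5 days after the epoch:
as_millis = datetime.fromtimestamp(int(seconds_value) / 1000, tz=timezone.utc)
print(as_millis.isoformat())  # 1970-01-20T13:34:11.888000+00:00
```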
How to reproduce
Here's how I get my timestamp in the values:
```python
timestamp = datetime.now(tz=tz.gettz(self.config.timezone)).timestamp()
values = {
    "creationTime": timestamp,
    "description": message,
    "eventTypeId": self.config.metric_name,
    "pgd": [],
    "contracts": [],
    "points": [],
    "objects": [],
}
```
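For reference, the Avro spec defines timestamp-millis as a long counting milliseconds from the Unix epoch, so I would expect something like the following to be needed; a sketch of that assumption, not a confirmed fix:

```python
from datetime import datetime, timezone

# Avro timestamp-millis is a long counting milliseconds since the Unix epoch,
# so the seconds-based float from .timestamp() has to be scaled and truncated:
timestamp_ms = int(datetime.now(tz=timezone.utc).timestamp() * 1000)

values = {
    "creationTime": timestamp_ms,  # milliseconds, not seconds
    # ... other fields as above ...
}
```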
"""This module contains everything necessary to send messages to kafka"""
import logging
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from param_store_models import KafkaInfo
LOGGER = logging.getLogger(__name__)
class KafkaProducer:
"""Class used to send messages to kafka"""
def __init__(self, schema: str, kafka_info: KafkaInfo):
producer_ssm_conf = {
"bootstrap.servers": kafka_info.bootstrap_servers,
"security.protocol": kafka_info.security_protocol,
"sasl.mechanism": kafka_info.sasl_mecanism,
"sasl.username": kafka_info.sasl_username,
"sasl.password": kafka_info.sasl_password,
}
registry_ssm_conf = {"url": kafka_info.schema_registry_url}
serializer = AvroSerializer(
SchemaRegistryClient(registry_ssm_conf), schema, conf={"auto.register.schemas": False}
)
producer_default_conf = {
"value.serializer": serializer,
"key.serializer": StringSerializer(),
"enable.idempotence": "true",
"max.in.flight.requests.per.connection": 1,
"retries": 5,
"acks": "all",
"retry.backoff.ms": 500,
"queue.buffering.max.ms": 500,
"error_cb": self.delivery_report,
}
self.__serializing_producer = SerializingProducer({**producer_default_conf, **producer_ssm_conf})
def produce(self, topic: str, key=None, value=None, timestamp=0):
"""Asynchronously produce message to a topic"""
LOGGER.info(f"Produce message {value} to topic {topic}")
self.__serializing_producer.produce(topic, key, value, on_delivery=self.delivery_report, timestamp=timestamp)
def flush(self):
"""
Flush messages and trigger callbacks
:return: Number of messages still in queue.
"""
LOGGER.debug("Flushing messages to kafka")
return self.__serializing_producer.flush()
@staticmethod
def delivery_report(err, msg):
"""
Called once for each message produced to indicate delivery result.
Triggered by poll() or flush().
"""
if err:
LOGGER.error(f"Kafka message delivery failed: {err}")
else:
LOGGER.info(f"Kafka message delivered to {msg.topic()} [{msg.partition()}]")My avro schema (partially redacted to hide the customer info)
My avro schema (partially redacted to hide the customer info)
```json
{
  "type": "record",
  "name": "EventRecord",
  "namespace": "com.event",
  "doc": "Schema of a raw supervision event",
  "fields": [
    {
      "name": "creationTime",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "eventTypeId",
      "type": "string"
    },
    {
      "name": "internalProductId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "description",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "contracts",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "points",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "objects",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}
```
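Possibly relevant: fastavro, which the AvroSerializer uses under the hood, documents logical-type support for timestamp-millis and converts `datetime` objects to the underlying long itself. So passing a `datetime` instead of a float might sidestep the issue; a sketch of that assumption:

```python
from datetime import datetime, timezone

# fastavro's logical-type handling converts datetime objects to the underlying
# long for timestamp-millis fields; a raw float may be written unconverted.
values = {
    "creationTime": datetime.now(tz=timezone.utc),  # datetime, not a float
    "eventTypeId": "test",
    "description": "Test",
    "contracts": [],
    "points": [],
    "objects": [],
}
```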
{"creationTime":"1970-01-20T13:34:12.378Z","eventTypeId":"test","description":"Test","pgd":[],"contracts":[],"points":[],"objects":[]}Checklist
Please provide the following information:

- confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): ('2.1.1', 33620224), ('2.1.1', 33620479)
- Apache Kafka broker version: confluentinc/cp-kafka:latest
- Client configuration: {...}
- Operating system: Windows (stock Kafka on Docker) / AWS ECS (Docker containers as well)
- Provide client logs (with `'debug': '..'` as necessary): provided above
- Provide broker log excerpts: no special logs in the broker for this
- Critical issue