diff --git a/airflow_provider_kafka/operators/produce_to_topic.py b/airflow_provider_kafka/operators/produce_to_topic.py index 1287448..f0edb02 100644 --- a/airflow_provider_kafka/operators/produce_to_topic.py +++ b/airflow_provider_kafka/operators/produce_to_topic.py @@ -117,7 +117,7 @@ def execute(self, context) -> Any: # For each returned k/v in the callable : publish and flush if needed. for k, v in producer_callable(): producer.produce( - self.topic, key=k, value=v, on_delivery=self.delivery_callback + self.topic, key=k, value=v, headers=self.producer_function_kwargs.get('headers'), on_delivery=self.delivery_callback ) producer.poll(self.poll_timeout) if self.synchronous: