diff --git a/adala/environments/kafka.py b/adala/environments/kafka.py index 2f3881c..e863fc8 100644 --- a/adala/environments/kafka.py +++ b/adala/environments/kafka.py @@ -61,12 +61,12 @@ async def message_receiver(self, consumer: AIOKafkaConsumer, timeout: int = 3): try: # Wait for the next message with a timeout msg = await asyncio.wait_for(consumer.getone(), timeout=timeout) - # print_text(f"Received message: {msg.value}") + print_text(f"Received message: {msg.value}") yield msg.value except asyncio.TimeoutError: - # print_text( - # f"No message received within the timeout {timeout} seconds" - # ) + print_text( + f"No message received within the timeout {timeout} seconds" + ) break finally: await consumer.stop() @@ -78,10 +78,10 @@ async def message_sender( try: for record in data: await producer.send_and_wait(topic, value=record) - # print_text(f"Sent message: {record} to {topic=}") + print_text(f"Sent message: {record} to {topic=}") finally: await producer.stop() - # print_text(f"No more messages for {topic=}") + print_text(f"No more messages for {topic=}") async def get_next_batch(self, data_iterator, batch_size: int) -> List[Dict]: batch = [] diff --git a/server/tasks/process_file.py b/server/tasks/process_file.py index 4c0d51e..643b5c5 100644 --- a/server/tasks/process_file.py +++ b/server/tasks/process_file.py @@ -116,6 +116,9 @@ def process_file_streaming(self, agent: Agent, parent_job_id: str): # Set input and output topics using parent job ID agent.environment.kafka_input_topic = get_input_topic(parent_job_id) agent.environment.kafka_output_topic = get_output_topic(parent_job_id) + import time + + time.sleep(5) # Run the agent asyncio.run(agent.arun()) diff --git a/server/utils.py b/server/utils.py index 5b72a73..2857f74 100644 --- a/server/utils.py +++ b/server/utils.py @@ -30,15 +30,14 @@ def get_input_topic(job_id: str): retention_ms = settings.kafka_retention_ms admin_client = KafkaAdminClient( - bootstrap_servers=bootstrap_servers, - client_id='topic_creator' + bootstrap_servers=bootstrap_servers, client_id="topic_creator" ) topic = NewTopic( name=topic_name, num_partitions=1, replication_factor=1, - topic_configs={"retention.ms": str(retention_ms)} + topic_configs={"retention.ms": str(retention_ms)}, ) try: @@ -59,15 +58,14 @@ def get_output_topic(job_id: str): retention_ms = settings.kafka_retention_ms admin_client = KafkaAdminClient( - bootstrap_servers=bootstrap_servers, - client_id='topic_creator' + bootstrap_servers=bootstrap_servers, client_id="topic_creator" ) topic = NewTopic( name=topic_name, num_partitions=1, replication_factor=1, - topic_configs={"retention.ms": str(retention_ms)} + topic_configs={"retention.ms": str(retention_ms)}, ) try: @@ -84,8 +82,7 @@ def delete_topic(topic_name: str): bootstrap_servers = settings.kafka_bootstrap_servers admin_client = KafkaAdminClient( - bootstrap_servers=bootstrap_servers, - client_id='topic_deleter' + bootstrap_servers=bootstrap_servers, client_id="topic_deleter" ) admin_client.delete_topics(topics=[topic_name])