Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-bernstein committed May 8, 2024
1 parent ca07581 commit df55514
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 14 deletions.
12 changes: 6 additions & 6 deletions adala/environments/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ async def message_receiver(self, consumer: AIOKafkaConsumer, timeout: int = 3):
try:
# Wait for the next message with a timeout
msg = await asyncio.wait_for(consumer.getone(), timeout=timeout)
# print_text(f"Received message: {msg.value}")
print_text(f"Received message: {msg.value}")
yield msg.value
except asyncio.TimeoutError:
# print_text(
# f"No message received within the timeout {timeout} seconds"
# )
print_text(
f"No message received within the timeout {timeout} seconds"
)
break
finally:
await consumer.stop()
Expand All @@ -78,10 +78,10 @@ async def message_sender(
try:
for record in data:
await producer.send_and_wait(topic, value=record)
# print_text(f"Sent message: {record} to {topic=}")
print_text(f"Sent message: {record} to {topic=}")
finally:
await producer.stop()
# print_text(f"No more messages for {topic=}")
print_text(f"No more messages for {topic=}")

async def get_next_batch(self, data_iterator, batch_size: int) -> List[Dict]:
batch = []
Expand Down
3 changes: 3 additions & 0 deletions server/tasks/process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ def process_file_streaming(self, agent: Agent, parent_job_id: str):
# Set input and output topics using parent job ID
agent.environment.kafka_input_topic = get_input_topic(parent_job_id)
agent.environment.kafka_output_topic = get_output_topic(parent_job_id)
import time

time.sleep(5)

# Run the agent
asyncio.run(agent.arun())
Expand Down
13 changes: 5 additions & 8 deletions server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ def get_input_topic(job_id: str):
retention_ms = settings.kafka_retention_ms

admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id='topic_creator'
bootstrap_servers=bootstrap_servers, client_id="topic_creator"
)

topic = NewTopic(
name=topic_name,
num_partitions=1,
replication_factor=1,
topic_configs={"retention.ms": str(retention_ms)}
topic_configs={"retention.ms": str(retention_ms)},
)

try:
Expand All @@ -59,15 +58,14 @@ def get_output_topic(job_id: str):
retention_ms = settings.kafka_retention_ms

admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id='topic_creator'
bootstrap_servers=bootstrap_servers, client_id="topic_creator"
)

topic = NewTopic(
name=topic_name,
num_partitions=1,
replication_factor=1,
topic_configs={"retention.ms": str(retention_ms)}
topic_configs={"retention.ms": str(retention_ms)},
)

try:
Expand All @@ -84,8 +82,7 @@ def delete_topic(topic_name: str):
bootstrap_servers = settings.kafka_bootstrap_servers

admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id='topic_deleter'
bootstrap_servers=bootstrap_servers, client_id="topic_deleter"
)

admin_client.delete_topics(topics=[topic_name])

0 comments on commit df55514

Please sign in to comment.