Skip to content

Commit

Permalink
split out into utility fn
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-bernstein committed May 8, 2024
1 parent df55514 commit ac7e310
Showing 1 changed file with 7 additions and 24 deletions.
31 changes: 7 additions & 24 deletions server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,20 @@ class Settings(BaseSettings):
def get_input_topic(job_id: str):
topic_name = f"adala-input-{job_id}"

# same logic as get_output_topic

settings = Settings()
bootstrap_servers = settings.kafka_bootstrap_servers
retention_ms = settings.kafka_retention_ms

admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers, client_id="topic_creator"
)

topic = NewTopic(
name=topic_name,
num_partitions=1,
replication_factor=1,
topic_configs={"retention.ms": str(retention_ms)},
)

try:
admin_client.create_topics(new_topics=[topic])
except TopicAlreadyExistsError:
pass
ensure_topic(topic_name)

return topic_name


def get_output_topic(job_id: str):
topic_name = f"adala-output-{job_id}"

# same logic as get_input_topic
ensure_topic(topic_name)

return topic_name


def ensure_topic(topic_name: str):
settings = Settings()
bootstrap_servers = settings.kafka_bootstrap_servers
retention_ms = settings.kafka_retention_ms
Expand All @@ -71,10 +55,9 @@ def get_output_topic(job_id: str):
try:
admin_client.create_topics(new_topics=[topic])
except TopicAlreadyExistsError:
# we shouldn't hit this case when KAFKA_CFG_AUTO_CREATE_TOPICS=false unless there is a legitimate name collision, so should raise here after testing
pass

return topic_name


def delete_topic(topic_name: str):
# unused for now
Expand Down

0 comments on commit ac7e310

Please sign in to comment.