From 7e0d827b98ca274ff101b2806888032beb02478e Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Wed, 23 Jul 2025 11:25:52 -0700 Subject: [PATCH 1/2] include arroyo_strict_offset_reset in kafka config --- src/launchpad/kafka.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/launchpad/kafka.py b/src/launchpad/kafka.py index ca09a6ad..5b7e0a82 100644 --- a/src/launchpad/kafka.py +++ b/src/launchpad/kafka.py @@ -44,6 +44,7 @@ def create_kafka_consumer( "bootstrap.servers": config["bootstrap_servers"], "group.id": config["group_id"], "auto.offset.reset": config["auto_offset_reset"], + "arroyo.strict.offset.reset": config["arroyo_strict_offset_reset"], "enable.auto.commit": False, "enable.auto.offset.store": False, } @@ -137,4 +138,5 @@ def get_kafka_config() -> Dict[str, Any]: "max_pending_futures": int(os.getenv("KAFKA_MAX_PENDING_FUTURES", "100")), "healthcheck_file": os.getenv("KAFKA_HEALTHCHECK_FILE"), "auto_offset_reset": os.getenv("KAFKA_AUTO_OFFSET_RESET", "latest"), # latest = skip old messages + "arroyo_strict_offset_reset": os.getenv("ARROYO_STRICT_OFFSET_RESET"), } From 9ae98df51e40e512e42a258906bac184d52c9643 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Wed, 23 Jul 2025 11:32:10 -0700 Subject: [PATCH 2/2] update --- src/launchpad/kafka.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/launchpad/kafka.py b/src/launchpad/kafka.py index 5b7e0a82..f7a6637e 100644 --- a/src/launchpad/kafka.py +++ b/src/launchpad/kafka.py @@ -129,6 +129,9 @@ def get_kafka_config() -> Dict[str, Any]: if not topics_env: raise ValueError("KAFKA_TOPICS env var is required") + # Parse arroyo_strict_offset_reset as boolean, default to None if invalid + arroyo_strict_offset_reset = {"true": True, "false": False}.get(os.getenv("ARROYO_STRICT_OFFSET_RESET", "").lower()) + # Optional configuration with defaults return { "bootstrap_servers": bootstrap_servers, @@ -138,5 +141,5 @@ def get_kafka_config() -> Dict[str, Any]: "max_pending_futures": int(os.getenv("KAFKA_MAX_PENDING_FUTURES", "100")), "healthcheck_file": os.getenv("KAFKA_HEALTHCHECK_FILE"), "auto_offset_reset": os.getenv("KAFKA_AUTO_OFFSET_RESET", "latest"), # latest = skip old messages - "arroyo_strict_offset_reset": os.getenv("ARROYO_STRICT_OFFSET_RESET"), + "arroyo_strict_offset_reset": arroyo_strict_offset_reset, }