feat: Added telegram fetch summaries workflow! #30
Walkthrough
This update introduces a new workflow and supporting activities for fetching Telegram summaries from Qdrant and MongoDB databases. It adds a set of Pydantic schemas for input validation, defines asynchronous Temporal activities to retrieve and process summary data by date or date range, and implements a workflow to orchestrate these activities. The registry is updated to include the new workflow and activities, and necessary imports are added to the main workflow module. The changes provide structured, validated, and orchestrated access to Telegram summary data.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Workflow
participant Activities
participant MongoDB
participant Qdrant
Client->>Workflow: Start TelegramSummariesWorkflow(input)
Workflow->>Activities: get_collection_name(platform_id, community_id)
Activities->>MongoDB: Query for platform name
MongoDB-->>Activities: Return platform name
Activities-->>Workflow: Return collection name
alt Single date
Workflow->>Activities: fetch_telegram_summaries_by_date(date, collection_name, extract_text_only)
Activities->>Qdrant: Query summaries by date
Qdrant-->>Activities: Return summaries
Activities-->>Workflow: Return summaries
else Date range
Workflow->>Activities: fetch_telegram_summaries_by_date_range(start_date, end_date, collection_name, extract_text_only)
loop For each date in range
Activities->>Qdrant: Query summaries for date
Qdrant-->>Activities: Return summaries
end
Activities-->>Workflow: Return aggregated summaries
end
Workflow-->>Client: Return summaries
Actionable comments posted: 5
🔭 Outside diff range comments (1)
hivemind_summarizer/schema.py (1)
22-28: 🛠️ Refactor suggestion
Add similar date validations to the workflow input model.
The main workflow input model should also validate date formats and the relationship between start_date and end_date (when provided) to ensure consistency.
 class TelegramFetchSummariesWorkflowInput(BaseModel):
     platform_id: str
     community_id: str
     start_date: str
     end_date: str | None = None
     extract_text_only: bool = True
+
+    @validator('start_date')
+    def validate_start_date(cls, v):
+        try:
+            datetime.strptime(v, '%Y-%m-%d')
+            return v
+        except ValueError:
+            raise ValueError('start_date must be in YYYY-MM-DD format')
+
+    @validator('end_date')
+    def validate_end_date(cls, v, values):
+        if v is None:
+            return v
+        try:
+            end_date = datetime.strptime(v, '%Y-%m-%d')
+        except ValueError:
+            raise ValueError('end_date must be in YYYY-MM-DD format')
+
+        if 'start_date' in values and v < values['start_date']:
+            raise ValueError('end_date must be after start_date')
+        return v
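The suggested validator compares the raw strings, which only orders correctly when both values are zero-padded; `strptime` with `%Y-%m-%d` also accepts inputs such as `2025-9-30`. A minimal standard-library sketch of the same checks on parsed dates follows. `DateWindow` and `parse_day` are hypothetical names used for illustration, not part of this PR, and the real model would keep this logic in its Pydantic validators.

```python
from dataclasses import dataclass
from datetime import date, datetime
from typing import Optional


def parse_day(value: str, field_name: str) -> date:
    """Parse a YYYY-MM-DD string into a date, with a field-specific error."""
    try:
        return datetime.strptime(value, "%Y-%m-%d").date()
    except ValueError:
        raise ValueError(f"{field_name} must be in YYYY-MM-DD format") from None


@dataclass(frozen=True)
class DateWindow:
    """Hypothetical stand-in for the date fields of the workflow input."""

    start: date
    end: Optional[date] = None

    @classmethod
    def from_strings(
        cls, start_date: str, end_date: Optional[str] = None
    ) -> "DateWindow":
        start = parse_day(start_date, "start_date")
        end = parse_day(end_date, "end_date") if end_date is not None else None
        # Compare parsed dates, not strings, so non-padded input is ordered correctly.
        if end is not None and end < start:
            raise ValueError("end_date must be on or after start_date")
        return cls(start=start, end=end)
```

With this shape, `DateWindow.from_strings("2025-10-01", "2025-9-30")` is rejected, whereas a plain string comparison would let it through because `"2025-9-30"` sorts after `"2025-10-01"`.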
🧹 Nitpick comments (7)
hivemind_summarizer/schema.py (1)
1-28: Add docstrings to describe the purpose of each model.
Adding class-level docstrings would improve code maintainability by clearly explaining the purpose and expected usage of each model.
 from pydantic import BaseModel

 class TelegramSummariesActivityInput(BaseModel):
+    """
+    Input model for the activity that fetches Telegram summaries for a specific date.
+
+    Attributes:
+        date: The date in YYYY-MM-DD format to fetch summaries for
+        extract_text_only: If True, return only the text of summaries
+        collection_name: Optional name of the collection to query
+    """
     date: str
     extract_text_only: bool = True
     collection_name: str | None = None
workflows.py (1)
13-13: Remove unused import.
The import of TelegramSummariesWorkflow is flagged as unused by static analysis. If it's not directly used in this file, you should remove it to avoid confusion and maintain clean imports.
-from hivemind_summarizer.workflows import TelegramSummariesWorkflow
🧰 Tools
🪛 Ruff (0.8.2)
13-13: hivemind_summarizer.workflows.TelegramSummariesWorkflow imported but unused
Remove unused import: hivemind_summarizer.workflows.TelegramSummariesWorkflow
(F401)
hivemind_summarizer/workflows.py (3)
57-64: Consider more robust error handling for activity execution.
The current implementation doesn't have explicit error handling for activity failures. While the retry policy helps, adding explicit try/except blocks would allow for more graceful failure handling and better logging.
 # First, get the collection name
+try:
     collection_name = await workflow.execute_activity(
         get_collection_name,
         TelegramGetCollectionNameInput(
             platform_id=input.platform_id, community_id=input.community_id
         ),
         schedule_to_close_timeout=timedelta(minutes=1),
         retry_policy=RetryPolicy(maximum_attempts=3),
     )
+except Exception as e:
+    logging.error(f"Failed to get collection name: {e}")
+    raise
76-78: Review timeout values and consider making them configurable.
The current timeout values are hardcoded. Consider making these values configurable either through environment variables or workflow parameters to accommodate varying data sizes and network conditions.
+# At the top of the file, add:
+from os import getenv
+
+# Define default timeout values that can be overridden
+DEFAULT_COLLECTION_TIMEOUT = timedelta(minutes=int(getenv('COLLECTION_TIMEOUT_MINUTES', '1')))
+DEFAULT_SUMMARIES_TIMEOUT = timedelta(minutes=int(getenv('SUMMARIES_TIMEOUT_MINUTES', '2')))
+DEFAULT_MAX_ATTEMPTS = int(getenv('MAX_RETRY_ATTEMPTS', '3'))
Then update the activity executions to use these configurable values.
Also applies to: 90-92
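One gap in the snippet above is that a malformed or non-positive environment value surfaces as a bare `int()` error or a nonsensical timeout. A minimal sketch of a stricter reader, assuming the same `*_TIMEOUT_MINUTES` variable names used in the suggestion; `minutes_from_env` is a hypothetical helper, not something defined in this repository.

```python
import os
from datetime import timedelta
from typing import Mapping


def minutes_from_env(
    name: str, default_minutes: int, env: Mapping[str, str] = os.environ
) -> timedelta:
    """Read a timeout in whole minutes from the environment, validating it."""
    raw = env.get(name)
    if raw is None:
        # Variable not set: fall back to the hardcoded default.
        return timedelta(minutes=default_minutes)
    try:
        minutes = int(raw)
    except ValueError:
        raise ValueError(
            f"{name} must be an integer number of minutes, got {raw!r}"
        ) from None
    if minutes <= 0:
        raise ValueError(f"{name} must be positive, got {minutes}")
    return timedelta(minutes=minutes)
```

Passing the mapping explicitly (instead of always reading `os.environ`) keeps the helper easy to unit-test.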
1-93: Add typing for the returned activities results.
The code correctly specifies the return type for the workflow method, but it would be beneficial to add type hints for what the activities return to improve code clarity and aid in future maintenance.
Consider creating type aliases or classes for the return types of each activity to make the code more self-documenting.
 # Add at the top of the file with other imports
+from typing import Any, TypedDict, List
+
+class TelegramSummary(TypedDict):
+    """Type definition for a Telegram summary object"""
+    id: str
+    text: str
+    # Add other fields as needed
hivemind_summarizer/activities.py (2)
81-88: Sanitise platform_name before using it in a collection identifier
platform_name may contain spaces, uppercase letters or special characters (e.g. “Telegram Channel”). These could be illegal or simply inconvenient as Qdrant collection names.
-platform_name = platform.get("name")
+raw_name = platform.get("name")
+platform_name = (
+    raw_name.lower().replace(" ", "_") if raw_name is not None else None
+)
Consider a dedicated “slugify” helper to enforce consistent naming.
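A possible shape for such a helper, as a sketch only. `slugify_collection_part` is a hypothetical name, and the rule applied here (lowercase ASCII letters, digits and underscores) is an assumed convention, not a documented Qdrant requirement.

```python
import re
from typing import Optional


def slugify_collection_part(raw_name: Optional[str]) -> Optional[str]:
    """Normalise a platform name into a conservative identifier fragment."""
    if raw_name is None:
        return None
    # Collapse every run of characters outside [a-z0-9] into one underscore,
    # then trim underscores left at either end.
    slug = re.sub(r"[^a-z0-9]+", "_", raw_name.strip().lower()).strip("_")
    # A name made only of punctuation yields nothing usable.
    return slug or None
```

Note that this drops non-ASCII letters as well, so names in other scripts would need a transliteration step or a looser pattern.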
183-242: Activities calling other activities bypass Temporal semantics
fetch_telegram_summaries_by_date_range awaits fetch_telegram_summaries_by_date directly. Because both functions are decorated with @activity.defn, the inner call is executed as a regular Python coroutine, not as a Temporal activity. This means:
• No activity‑level retries / timeouts
• Both run in the same process – heavy I/O in the inner call can block the outer activity.
Refactor by extracting the common Qdrant logic into a private helper and keep only one public activity; or convert the range function into a workflow and invoke the date activity with workflow.execute_activity.
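The first option (a shared private helper) could look roughly like the sketch below. It uses plain coroutines and an injected `query` callable standing in for the Qdrant lookup, so every name here is hypothetical; in the real module the two public functions would keep their `@activity.defn` decorators and call the helper rather than each other.

```python
import asyncio
from datetime import date, timedelta
from typing import Awaitable, Callable, Dict, Iterator, List

# Stand-in for the Qdrant query: (collection_name, iso_day) -> summaries.
QueryFn = Callable[[str, str], Awaitable[List[str]]]


def iter_days(start: date, end: date) -> Iterator[date]:
    """Yield each calendar day from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


async def _fetch_for_day(query: QueryFn, collection_name: str, day: date) -> List[str]:
    """Undecorated helper holding the shared per-day query logic."""
    return await query(collection_name, day.isoformat())


async def fetch_by_date(query: QueryFn, collection_name: str, day: date) -> List[str]:
    # Would be the single-date activity; delegates to the helper.
    return await _fetch_for_day(query, collection_name, day)


async def fetch_by_date_range(
    query: QueryFn, collection_name: str, start: date, end: date
) -> Dict[str, List[str]]:
    # Would be the range activity; calls the helper, never the other activity.
    results: Dict[str, List[str]] = {}
    for day in iter_days(start, end):
        results[day.isoformat()] = await _fetch_for_day(query, collection_name, day)
    return results
```

This keeps retries and timeouts at the granularity of the outer activity. If per-day retries matter, the second option from the comment (driving the loop from the workflow) is the one that provides them.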
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
hivemind_summarizer/activities.py (1 hunks)
hivemind_summarizer/schema.py (1 hunks)
hivemind_summarizer/workflows.py (1 hunks)
registry.py (2 hunks)
workflows.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
workflows.py (1)
hivemind_summarizer/workflows.py (1)
TelegramSummariesWorkflow(23-93)
hivemind_summarizer/activities.py (1)
hivemind_summarizer/schema.py (3)
TelegramSummariesActivityInput (4-7)
TelegramSummariesRangeActivityInput (10-14)
TelegramGetCollectionNameInput (17-19)
🪛 Ruff (0.8.2)
workflows.py
13-13: hivemind_summarizer.workflows.TelegramSummariesWorkflow imported but unused
Remove unused import: hivemind_summarizer.workflows.TelegramSummariesWorkflow
(F401)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: ci / test / Test
- GitHub Check: ci / lint / Lint
🔇 Additional comments (2)
registry.py (1)
12-16: LGTM! Proper registration of new workflow and activities.
The new Telegram summary workflow and activities are correctly imported and registered in the respective lists. This ensures they'll be properly available in the Temporal framework.
Also applies to: 22-22, 30-30, 43-45
hivemind_summarizer/workflows.py (1)
66-79: ❓ Verification inconclusive
Ensure proper validation of input dates before execution.
While the Pydantic models should handle basic validation, it would be good to have runtime validation in the workflow to catch and handle any date-related issues before executing the activities.
The workflow assumes the input dates are valid, but if they're not in the expected format, this could lead to issues in the activities. Consider verifying the input date formats at the workflow level.
🏁 Script executed:
#!/bin/bash
# Check if this workflow or related activities validate date formats anywhere

# Check for date validation in the activities
echo "Checking for date validation in activities..."
rg -A3 "date" --include "*.py" "hivemind_summarizer/activities.py"

# Check for date validations or parsing in the codebase
echo "Checking for date validations in the codebase..."
rg -A2 "strptime|datetime\.parse|validate.*date" --include "*.py"

Length of output: 1224
#!/bin/bash
set -e

echo "=== workflows.py (lines 1–200) ==="
sed -n '1,200p' hivemind_summarizer/workflows.py || true

echo
echo "=== Locate TelegramSummariesActivityInput definition ==="
rg -A5 "class TelegramSummariesActivityInput" -g '*.py' || true

echo
echo "=== Scan entire codebase for date‐format validation or parsing ==="
rg -e 'strptime|datetime.strptime|parse_date|validate.*date' -g '*.py' -A2 || true
#!/bin/bash
set -e

echo "=== Locate workflow definition and its input type ==="
rg "@workflow.defn" -n hivemind_summarizer/workflows.py || true

echo
echo "=== Occurrences of 'input.' in workflows.py ==="
rg "input." -n hivemind_summarizer/workflows.py || true

echo
echo "=== Find all Pydantic Input model classes ==="
grep -R "class .*Input" -n hivemind_summarizer || true

echo
echo "=== Inspect TelegramSummariesActivityInput fields ==="
grep -R "class TelegramSummariesActivityInput" -n hivemind_summarizer || true
grep -R "date" -n hivemind_summarizer/activities.py || true
#!/bin/bash
set -e

echo "== Locate all Pydantic input model classes =="
rg "class .*Input" --glob '*.py'

echo
echo "== Locate TelegramSummariesActivityInput definition =="
INPUT_FILE=$(rg -l "class TelegramSummariesActivityInput" --glob '*.py')
echo "Definition file: $INPUT_FILE"
sed -n '1,200p' "$INPUT_FILE"

echo
echo "== Search for any @validator usage in the repository =="
rg "@validator" --glob '*.py'

echo
echo "== Search for explicit datetime parsing or format validation =="
rg "datetime." --glob '*.py'
rg "strptime" --glob '*.py'
rg "parse(" --glob '*.py'
#!/bin/bash
set -e

echo "=== Locating TelegramSummariesActivityInput definition ==="
INPUT_FILE=$(rg -l "class TelegramSummariesActivityInput" -g '*.py')
echo "Found in: $INPUT_FILE"

echo
echo "=== Showing its definition (first 50 lines) ==="
sed -n '1,50p' "$INPUT_FILE"

echo
echo "=== Searching for Pydantic validators anywhere in repo ==="
rg -n "@validator" -g '*.py' || echo "No @validator usage found"

echo
echo "=== Searching for datetime/date field types in all Input models ==="
rg -n "class .*Input" -g '*.py' | while read -r file; do
  echo "---- $file ----"
  rg -n "date|datetime" -g '*.py' "$file"
done
#!/bin/bash
set -e

echo "=== Locating all Pydantic input models ==="
rg -l "class .*Input" -g '*.py'

echo
echo "=== Showing definitions of each Input model (first 100 lines) ==="
for f in $(rg -l "class .*Input" -g '*.py'); do
  echo
  echo "---- $f ----"
  sed -n '1,100p' "$f"
done

echo
echo "=== Searching for Pydantic @validator usage ==="
rg -n "@validator" -g '*.py' || echo "No @validator decorators found"

echo
echo "=== Inspecting date/datetime fields in Input models ==="
rg -n "start_date|end_date" -g '*.py' || echo "No explicit date fields found"
#!/bin/bash
set -e

echo "=== Locate TelegramSummariesActivityInput definition ==="
DEF_FILE=$(grep -R "class TelegramSummariesActivityInput" -n hivemind_summarizer | cut -d: -f1)
echo "Definition file: $DEF_FILE"

echo
echo "=== Show first 200 lines of the input model ==="
sed -n '1,200p' "$DEF_FILE"

echo
echo "=== Search for any Pydantic validators ==="
grep -R "@validator" -n hivemind_summarizer || echo "No @validator decorators found"

echo
echo "=== Inspect date/datetime fields across input models ==="
grep -R -n "start_date|end_date|date:" hivemind_summarizer || echo "No explicit date fields found"
Refocus date‐validation in the Pydantic model, not in the workflow
Pydantic already parses and validates fields typed as date (and will raise a ValidationError if an incoming string isn’t a recognized date format), so adding manual checks in the workflow is generally redundant. Instead, any cross‐field or business‐logic constraints (for example, ensuring end_date >= start_date) belong in a @validator on your input model:
from pydantic import BaseModel, validator
from datetime import date
from typing import Optional

class SummariesWorkflowInput(BaseModel):
    start_date: date
    end_date: Optional[date] = None
    extract_text_only: bool

    @validator("end_date")
    def check_date_order(cls, v, values):
        # start_date is absent from values if its own validation failed
        if v is not None and "start_date" in values and v < values["start_date"]:
            raise ValueError("end_date must be on or after start_date")
        return v
• Centralize all date‐format and inter‐date logic in the Pydantic model
• Rely on Pydantic’s built‑in parsing/validation instead of rechecking in the workflow
Actionable comments posted: 1
🧹 Nitpick comments (1)
README.md (1)
74-79: Consider using dynamic workflow IDs
A static id="telegram-summaries-workflow" can collide if run multiple times. To avoid WorkflowExecutionAlreadyStarted errors, consider appending a timestamp or UUID:
-    id="telegram-summaries-workflow",
+    id=f"telegram-summaries-workflow-{datetime.utcnow().isoformat()}",
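For the UUID variant mentioned above, a minimal sketch. `unique_workflow_id` is a hypothetical helper name; the prefix is the one from the README example. A random suffix also sidesteps `datetime.utcnow()`, which is deprecated as of Python 3.12.

```python
import uuid


def unique_workflow_id(prefix: str = "telegram-summaries-workflow") -> str:
    """Return the prefix followed by a random UUID4, one per call."""
    return f"{prefix}-{uuid.uuid4()}"
```

The result would be passed as the `id=` argument when starting the workflow. The trade-off is that accidental duplicate submissions are no longer rejected, so a deterministic ID (for example one derived from the platform and date range) may suit cases where de-duplication is wanted.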
📒 Files selected for processing (1)
README.md(1 hunks)
🧰 Additional context used
🪛 LanguageTool
README.md
[style] ~132-~132: Consider using “who” when you are referring to a person instead of an object.
Context: ...worker.py ``` This will start a worker that connects to Temporal and listens for ta...
(THAT_WHO)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: ci / build-push / Build + Push Image
🔇 Additional comments (7)
README.md (7)
3-3: Repository description clearly outlines scope
The introductory sentence succinctly conveys that this repo contains Temporal Python workflows for ETL and summarization.
32-33: Correct .env setup example
The cp .env.example .env command is accurate and aligns with common practices for environment configuration.
42-46: Setup command for services is accurate
The docker compose -f docker-compose.dev.yml up -d command correctly brings up all required services.
57-60: Imports reflect the new Telegram workflow
The example imports TelegramSummariesWorkflow and its input schema from the correct modules, making usage clear.
62-63: Client connection snippet is clear
Using Client.connect("localhost:7233") demonstrates the correct way to establish a Temporal connection.
65-71: Validate date format against the Pydantic schema
The example uses ISO‑formatted date strings for start_date and end_date. Please confirm that TelegramFetchSummariesWorkflowInput accepts str in this format (or adjust to datetime objects if required). Consider adding a footnote or example conversion.
132-132: Skip: static analysis false positive
The suggestion to change “that” to “who” here isn’t applicable since “worker” refers to a process, not a person.
| Required variables: | ||
| - `TEMPORAL_TASK_QUEUE`: Queue name for the worker | ||
| - Database connection parameters for MongoDB, Qdrant, etc. | ||
|
|
🛠️ Refactor suggestion
Explicitly enumerate required environment variables
Listing only TEMPORAL_TASK_QUEUE and a generic “Database connection parameters” may confuse new contributors. Please expand this section to include specific keys (e.g., MONGODB_URI, QDRANT_URL, REDIS_URL, etc.), or reference the variables defined in .env.example.
- Required variables:
- - `TEMPORAL_TASK_QUEUE`: Queue name for the worker
- - Database connection parameters for MongoDB, Qdrant, etc.
+ Required variables:
+ - `TEMPORAL_TASK_QUEUE`: Name of the Temporal task queue
+ - `MONGODB_URI`: MongoDB connection string
+ - `QDRANT_URL`: Qdrant endpoint URL
+ - `REDIS_URL`: Redis connection URL
+ - (Any other variables listed in `.env.example`)
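Beyond documenting the variables, the worker could fail fast when one is missing. A small sketch under stated assumptions: `missing_env_vars` is a hypothetical helper, and the variable names other than `TEMPORAL_TASK_QUEUE` are the examples from this comment, so the authoritative list should come from `.env.example`.

```python
import os
from typing import Iterable, List, Mapping

# Example names only; MONGODB_URI, QDRANT_URL and REDIS_URL are taken from the
# review comment and may not match the keys actually used in .env.example.
REQUIRED_VARS = ("TEMPORAL_TASK_QUEUE", "MONGODB_URI", "QDRANT_URL", "REDIS_URL")


def missing_env_vars(
    required: Iterable[str], env: Mapping[str, str] = os.environ
) -> List[str]:
    """Return the required names that are unset or blank, in the given order."""
    return [name for name in required if not env.get(name, "").strip()]
```

At worker startup, a non-empty result could be turned into a single error message listing every missing key, rather than failing later on the first connection attempt.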