123 changes: 116 additions & 7 deletions README.md
@@ -1,23 +1,132 @@
# temporal-worker-python

This repository contains TogetherCrew's Temporal Python workflows for data processing and analysis. It leverages the Temporal workflow engine to orchestrate ETL processes and data summarization tasks.

## Project Components

### Hivemind ETL

- **Website Ingestion**: Extracts, transforms, and loads data from websites defined in the platform configuration.
- **MediaWiki Ingestion**: Processes content from MediaWiki instances, including extraction of pages, revisions, and content.

### Hivemind Summarizer

- **Telegram Summaries**: Retrieves and processes summaries from Telegram data stored in Qdrant, with options to fetch by date or date range.

## Architecture

The project uses Temporal for workflow orchestration with the following components:

- **Temporal Server**: Manages workflow execution and task queues
- **MongoDB**: Stores platform and community configuration
- **Qdrant**: Vector database for storing and retrieving summary content
- **Redis**: Caching and state management
- **PostgreSQL**: Used by Temporal for workflow history and state
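As a rough sketch of how these services fit together in development (the service names, images, and ports below are assumptions; `docker-compose.dev.yml` in the repo is authoritative):

```yaml
# Illustrative fragment only; see docker-compose.dev.yml for the real definitions
services:
  temporal:
    image: temporalio/auto-setup   # workflow engine, gRPC on 7233
    ports: ["7233:7233"]
  temporal-ui:
    image: temporalio/ui           # dashboard served at localhost:8080
    ports: ["8080:8080"]
  mongodb:
    image: mongo                   # platform/community configuration
  qdrant:
    image: qdrant/qdrant           # vector store for summaries
  redis:
    image: redis                   # caching and state
  postgres:
    image: postgres                # Temporal history/state backend
```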

## Setup Instructions

1. Configure Environment Variables
- Copy the example environment file:

```bash
cp .env.example .env
```

Update the `.env` file with your own values, referencing the services defined in `docker-compose.dev.yml`.

Required variables:
- `TEMPORAL_TASK_QUEUE`: name of the Temporal task queue the worker polls
- Connection settings for MongoDB, Qdrant, Redis, and PostgreSQL (use the exact keys listed in `.env.example`)
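A minimal `.env` sketch, for illustration only; apart from `TEMPORAL_TASK_QUEUE`, the key names below are assumptions, so copy the exact keys from `.env.example`:

```env
# Illustrative values; the real key names live in .env.example
TEMPORAL_TASK_QUEUE=hivemind-python-queue
MONGODB_URI=mongodb://localhost:27017
QDRANT_URL=http://localhost:6333
REDIS_URL=redis://localhost:6379
```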

2. Start Services
- Use the following command to set up and run the required services:

```bash
docker compose -f docker-compose.dev.yml up -d
```

3. Open [localhost:8080](http://localhost:8080/) to access the Temporal dashboard.

## Usage Examples

### Running a Telegram Summary Workflow

To fetch summaries for a specific community and date range:

```python
from temporalio.client import Client
from hivemind_summarizer.workflows import TelegramSummariesWorkflow
from hivemind_summarizer.schema import TelegramFetchSummariesWorkflowInput

async def run_telegram_workflow():
    client = await Client.connect("localhost:7233")

    # Create workflow input
    input_data = TelegramFetchSummariesWorkflowInput(
        platform_id="your_platform_id",
        community_id="your_community_id",
        start_date="2023-05-01",
        end_date="2023-05-07",
        extract_text_only=True,
    )

    # Execute workflow
    result = await client.execute_workflow(
        TelegramSummariesWorkflow.run,
        input_data,
        id="telegram-summaries-workflow",
        task_queue="your_task_queue",
    )

    return result
```
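When a date range is supplied, it is expanded into one fetch per day. A minimal standalone sketch of that expansion, mirroring the logic in `hivemind_summarizer/activities.py`:

```python
from datetime import datetime, timedelta


def expand_date_range(start_date: str, end_date: str) -> list[str]:
    """Expand an inclusive YYYY-MM-DD range into one entry per day."""
    start = datetime.strptime(start_date, "%Y-%m-%d").date()
    end = datetime.strptime(end_date, "%Y-%m-%d").date()
    if end < start:
        raise ValueError("End date cannot be before start date")
    dates = []
    current = start
    while current <= end:
        dates.append(current.strftime("%Y-%m-%d"))
        current += timedelta(days=1)
    return dates


print(expand_date_range("2023-05-01", "2023-05-03"))
# ['2023-05-01', '2023-05-02', '2023-05-03']
```

The workflow then fetches summaries for each date in the returned list.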

### Running a MediaWiki ETL Workflow

To process MediaWiki content for all communities or a specific platform:

```python
from temporalio.client import Client
from hivemind_etl.mediawiki.workflows import MediaWikiETLWorkflow

async def run_mediawiki_workflow(platform_id=None):
    client = await Client.connect("localhost:7233")

    # Execute workflow for all platforms or a specific one
    await client.execute_workflow(
        MediaWikiETLWorkflow.run,
        platform_id,  # Pass None to process all platforms
        id="mediawiki-etl-workflow",
        task_queue="your_task_queue",
    )
```

### Running a Website Ingestion Workflow

To ingest content from websites:

```python
from temporalio.client import Client
from hivemind_etl.website.workflows import WebsiteIngestionSchedulerWorkflow

async def run_website_workflow(platform_id=None):
    client = await Client.connect("localhost:7233")

    # Execute workflow for all communities or a specific one
    await client.execute_workflow(
        WebsiteIngestionSchedulerWorkflow.run,
        platform_id,  # Pass None to process all platforms
        id="website-ingestion-workflow",
        task_queue="your_task_queue",
    )
```

## Development

To run the worker locally:

```bash
docker compose -f docker-compose.dev.yml up -d
```
```bash
python worker.py
```

This will start a worker that connects to Temporal and listens for tasks on the configured task queue.
Empty file added hivemind_summarizer/__init__.py
Empty file.
249 changes: 249 additions & 0 deletions hivemind_summarizer/activities.py
@@ -0,0 +1,249 @@
import json
import logging
from typing import Any
from datetime import datetime, timedelta

from bson import ObjectId
from tc_hivemind_backend.db.qdrant import QdrantSingleton
from tc_hivemind_backend.db.mongo import MongoSingleton

from temporalio import activity, workflow
from qdrant_client.models import Filter, FieldCondition, MatchValue

with workflow.unsafe.imports_passed_through():
    from hivemind_summarizer.schema import (
        TelegramSummariesActivityInput,
        TelegramSummariesRangeActivityInput,
        TelegramGetCollectionNameInput,
    )


def extract_summary_text(node_content: dict[str, Any]) -> str:
    """
    Extract the actual summary text from the node_content.

    Parameters
    ----------
    node_content : dict[str, Any]
        The parsed node_content object

    Returns
    -------
    str
        The extracted summary text
    """
    # Based on the example provided, the text is in the "text" field
    if isinstance(node_content, dict) and "text" in node_content:
        return node_content["text"]

    return "Summary text not found"
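

# Illustrative usage (not executed): a parsed node_content dict with a "text"
# field yields that field; anything else falls back to the sentinel string.
#   extract_summary_text({"text": "Daily recap"})  -> "Daily recap"
#   extract_summary_text({"metadata": {}})         -> "Summary text not found"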


@activity.defn
async def get_collection_name(input: TelegramGetCollectionNameInput) -> str:
    """
    Activity that extracts collection name from MongoDB based on platform_id and community_id.

    Parameters
    ----------
    input : TelegramGetCollectionNameInput
        Input object containing platform_id and community_id

    Returns
    -------
    str
        The collection name in format [communityId]_[platformName]_summary

    Raises
    ------
    Exception
        If platform not found or error occurs during DB access
    """
    platform_id = input.platform_id
    community_id = input.community_id

    logging.info(
        f"Getting collection name for platform_id: {platform_id}, community_id: {community_id}"
    )

    try:
        # Get MongoDB client
        mongo_client = MongoSingleton.get_instance().get_client()

        # Query the platform from the Core database
        platform = mongo_client["Core"]["platforms"].find_one(
            {"_id": ObjectId(platform_id)}
        )

        if not platform:
            raise Exception(f"Platform with ID {platform_id} not found")

        # Extract platform name
        platform_name = platform.get("name")
        if not platform_name:
            raise Exception(f"Platform name not found for platform_id {platform_id}")

        # Construct collection name
        collection_name = f"{community_id}_{platform_name}_summary"

        logging.info(f"Generated collection name: {collection_name}")
        return collection_name

    except Exception as e:
        logging.error(f"Error getting collection name: {str(e)}")
        raise


@activity.defn
async def fetch_telegram_summaries_by_date(
    input: TelegramSummariesActivityInput,
) -> list[dict[str, Any]] | str:
    """
    Activity that fetches Telegram summaries for a specific date from Qdrant.

    Parameters
    ----------
    input : TelegramSummariesActivityInput
        Input object containing date, collection_name and extract_text_only

    Returns
    -------
    list[dict[str, Any]] | str
        A list of summary objects for the specified date, or a single
        newline-joined string of summaries when extract_text_only is True
    """
    date = input.date
    extract_text_only = input.extract_text_only
    collection_name = input.collection_name

    logging.info("Started fetch_telegram_summaries_by_date!")
    if not collection_name:
        raise ValueError("Collection name is required but was not provided")

    logging.info(
        f"Fetching summaries for date: {date} from collection: {collection_name}"
    )

    try:
        # Get Qdrant client
        qdrant_client = QdrantSingleton.get_instance().get_client()

        # Create filter for the specified date
        filter_conditions = [FieldCondition(key="date", match=MatchValue(value=date))]

        date_filter = Filter(must=filter_conditions)

        # Query Qdrant for all summaries matching the date using the provided collection name
        search_results = qdrant_client.search(
            collection_name=collection_name,
            query_vector=[0] * 1024,
            query_filter=date_filter,
            limit=100,
            with_payload=True,
            with_vectors=False,
        )

        summaries = []
        for point in search_results:
            # Extract the summary data from each point
            summary_data = point.payload

            # If _node_content is a JSON string, parse it
            if "_node_content" in summary_data and isinstance(
                summary_data["_node_content"], str
            ):
                try:
                    node_content = json.loads(summary_data["_node_content"])
                    if extract_text_only:
                        summary_data = extract_summary_text(node_content)
                    else:
                        summary_data["parsed_content"] = node_content
                        summary_data["summary_text"] = extract_summary_text(
                            node_content
                        )
                except json.JSONDecodeError:
                    logging.warning(
                        f"Failed to parse _node_content as JSON for point with date {date}"
                    )

            summaries.append(summary_data)

        logging.info(
            f"Found {len(summaries)} summaries for date {date} in collection {collection_name}"
        )
        # str() guards against payloads left as dicts when _node_content failed
        # to parse; without it the join would raise a TypeError
        return "\n".join(str(s) for s in summaries) if extract_text_only else summaries

    except Exception as e:
        logging.error(
            f"Error fetching summaries for date {date} from collection {collection_name}: {str(e)}"
        )
        raise


@activity.defn
async def fetch_telegram_summaries_by_date_range(
    input: TelegramSummariesRangeActivityInput,
) -> dict[str, list[dict[str, Any]] | str]:
    """
    Activity that fetches Telegram summaries for a range of dates from Qdrant.

    Parameters
    ----------
    input : TelegramSummariesRangeActivityInput
        Input object containing start_date, end_date, collection_name and extract_text_only

    Returns
    -------
    dict[str, list[dict[str, Any]] | str]
        A dictionary mapping each date to its list of summary objects,
        or to a joined summary string when extract_text_only is True

    Raises
    ------
    ValueError
        If end_date is before start_date or collection_name is not provided
    """
    start_date = input.start_date
    end_date = input.end_date
    extract_text_only = input.extract_text_only
    collection_name = input.collection_name

    if not collection_name:
        raise ValueError("Collection name is required but was not provided")

    logging.info(
        f"Fetching summaries for date range: {start_date} to {end_date} from collection: {collection_name}"
    )

    try:
        # Parse the date strings to datetime objects
        start = datetime.strptime(start_date, "%Y-%m-%d").date()
        end = datetime.strptime(end_date, "%Y-%m-%d").date()

        # Validate that end_date is not before start_date
        if end < start:
            raise ValueError("End date cannot be before start date")

        # Calculate all dates in the range
        date_range = []
        current = start
        while current <= end:
            date_range.append(current.strftime("%Y-%m-%d"))
            current += timedelta(days=1)

        # Fetch summaries for each date
        result = {}
        for date in date_range:
            date_input = TelegramSummariesActivityInput(
                date=date,
                extract_text_only=extract_text_only,
                collection_name=collection_name,
            )
            summaries = await fetch_telegram_summaries_by_date(date_input)
            result[date] = summaries

        return result

    except Exception as e:
        logging.error(
            f"Error fetching summaries for date range {start_date} to {end_date} from collection {collection_name}: {str(e)}"
        )
        raise