diff --git a/README.md b/README.md index 557473f..5c49260 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,27 @@ # temporal-worker-python -This repository contains TogetherCrew's Temporal Python workflows. Follow the steps below to set up the project locally: +This repository contains TogetherCrew's Temporal Python workflows for data processing and analysis. It leverages the Temporal workflow engine to orchestrate ETL processes and data summarization tasks. + +## Project Components + +### Hivemind ETL + +- **Website Ingestion**: Extracts, transforms, and loads data from websites defined in the platform configuration. +- **MediaWiki Ingestion**: Processes content from MediaWiki instances, including extraction of pages, revisions, and content. + +### Hivemind Summarizer + +- **Telegram Summaries**: Retrieves and processes summaries from Telegram data stored in Qdrant, with options to fetch by date or date range. + +## Architecture + +The project uses Temporal for workflow orchestration with the following components: + +- **Temporal Server**: Manages workflow execution and task queues +- **MongoDB**: Stores platform and community configuration +- **Qdrant**: Vector database for storing and retrieving summary content +- **Redis**: Caching and state management +- **PostgreSQL**: Used by Temporal for workflow history and state ## Setup Instructions @@ -8,16 +29,104 @@ This repository contains TogetherCrew's Temporal Python workflows. Follow the st - Copy the example environment file: ```bash - cp .env.example .env + cp .env.example .env ``` Update the `.env` file with your own values, referencing the services defined in `docker-compose.dev.yml`. + Required variables: + - `TEMPORAL_TASK_QUEUE`: Queue name for the worker + - Database connection parameters for MongoDB, Qdrant, etc. + 2. Start Services - - Use the following command to set up and run the required services: + - Use the following command to set up and run the required services: + + ```bash + docker compose -f docker-compose.dev.yml up -d + ``` + +3. Open [localhost:8080](http://localhost:8080/) to access the Temporal dashboard. + +## Usage Examples + +### Running a Telegram Summary Workflow + +To fetch summaries for a specific community and date range: + +```python +from temporalio.client import Client +from hivemind_summarizer.workflows import TelegramSummariesWorkflow +from hivemind_summarizer.schema import TelegramFetchSummariesWorkflowInput + +async def run_telegram_workflow(): + client = await Client.connect("localhost:7233") + + # Create workflow input + input_data = TelegramFetchSummariesWorkflowInput( + platform_id="your_platform_id", + community_id="your_community_id", + start_date="2023-05-01", + end_date="2023-05-07", + extract_text_only=True + ) + + # Execute workflow + result = await client.execute_workflow( + TelegramSummariesWorkflow.run, + input_data, + id="telegram-summaries-workflow", + task_queue="your_task_queue" + ) + + return result +``` + +### Running a MediaWiki ETL Workflow + +To process MediaWiki content for all communities or a specific platform: + +```python +from temporalio.client import Client +from hivemind_etl.mediawiki.workflows import MediaWikiETLWorkflow + +async def run_mediawiki_workflow(platform_id=None): + client = await Client.connect("localhost:7233") + + # Execute workflow for all platforms or a specific one + await client.execute_workflow( + MediaWikiETLWorkflow.run, + platform_id, # Pass None to process all platforms + id="mediawiki-etl-workflow", + task_queue="your_task_queue" + ) +``` + +### Running a Website Ingestion Workflow + +To ingest content from websites: + +```python +from temporalio.client import Client +from hivemind_etl.website.workflows import WebsiteIngestionSchedulerWorkflow + +async def run_website_workflow(platform_id=None): + client = await Client.connect("localhost:7233") + + # Execute workflow for all communities or a specific one + await client.execute_workflow( + WebsiteIngestionSchedulerWorkflow.run, + platform_id, # Pass None to process all platforms + id="website-ingestion-workflow", + task_queue="your_task_queue" + ) +``` + +## Development + +To run the worker locally: - ```bash - docker compose -f docker-compose.dev.yml up -d - ``` +```bash +python worker.py +``` -3. Open [localhost:8080](http://localhost:8080/) and check temporal dashboard. +This will start a worker that connects to Temporal and listens for tasks on the configured task queue. diff --git a/hivemind_summarizer/__init__.py b/hivemind_summarizer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/hivemind_summarizer/activities.py b/hivemind_summarizer/activities.py new file mode 100644 index 0000000..4e0a72d --- /dev/null +++ b/hivemind_summarizer/activities.py @@ -0,0 +1,249 @@ +import json +import logging +from typing import Any +from datetime import datetime, timedelta + +from bson import ObjectId +from tc_hivemind_backend.db.qdrant import QdrantSingleton +from tc_hivemind_backend.db.mongo import MongoSingleton + +from temporalio import activity, workflow +from qdrant_client.models import Filter, FieldCondition, MatchValue + +with workflow.unsafe.imports_passed_through(): + from hivemind_summarizer.schema import ( + TelegramSummariesActivityInput, + TelegramSummariesRangeActivityInput, + TelegramGetCollectionNameInput, + ) + + +def extract_summary_text(node_content: dict[str, Any]) -> str: + """ + Extract the actual summary text from the node_content. + + Parameters + ---------- + node_content : dict[str, Any] + The parsed node_content object + + Returns + ------- + str + The extracted summary text + """ + # Based on the example provided, the text is in the "text" field + if isinstance(node_content, dict) and "text" in node_content: + return node_content["text"] + + return "Summary text not found" + + +@activity.defn +async def get_collection_name(input: TelegramGetCollectionNameInput) -> str: + """ + Activity that extracts collection name from MongoDB based on platform_id and community_id. + + Parameters + ---------- + input: TelegramGetCollectionNameInput + Input object containing platform_id and community_id + + Returns + ------- + str + The collection name in format [communityId]_[platformName]_summary + + Raises + ------ + Exception + If platform not found or error occurs during DB access + """ + platform_id = input.platform_id + community_id = input.community_id + + logging.info( + f"Getting collection name for platform_id: {platform_id}, community_id: {community_id}" + ) + + try: + # Get MongoDB client + mongo_client = MongoSingleton.get_instance().get_client() + + # Query the platform from Core database + platform = mongo_client["Core"]["platforms"].find_one( + {"_id": ObjectId(platform_id)} + ) + + if not platform: + raise Exception(f"Platform with ID {platform_id} not found") + + # Extract platform name + platform_name = platform.get("name") + if not platform_name: + raise Exception(f"Platform name not found for platform_id {platform_id}") + + # Construct collection name + collection_name = f"{community_id}_{platform_name}_summary" + + logging.info(f"Generated collection name: {collection_name}") + return collection_name + + except Exception as e: + logging.error(f"Error getting collection name: {str(e)}") + raise + + +@activity.defn +async def fetch_telegram_summaries_by_date( + input: TelegramSummariesActivityInput, +) -> list[dict[str, Any]] | str: + """ + Activity that fetches Telegram summaries for a specific date from Qdrant. + + Parameters + ---------- + input : TelegramSummariesActivityInput + Input object containing date, collection_name and extract_text_only + + Returns + ------- + list[dict[str, Any]] | str + A list of summary objects for the specified date or a string of summaries + """ + date = input.date + extract_text_only = input.extract_text_only + collection_name = input.collection_name + + logging.info("Started fetch_telegram_summaries_by_date!") + if not collection_name: + raise ValueError("Collection name is required but was not provided") + + logging.info( + f"Fetching summaries for date: {date} from collection: {collection_name}" + ) + + try: + # Get Qdrant client + qdrant_client = QdrantSingleton.get_instance().get_client() + + # Create filter for the specified date + filter_conditions = [FieldCondition(key="date", match=MatchValue(value=date))] + + date_filter = Filter(must=filter_conditions) + + # Query Qdrant for all summaries matching the date using the provided collection name + search_results = qdrant_client.search( + collection_name=collection_name, + query_vector=[0] * 1024, + query_filter=date_filter, + limit=100, + with_payload=True, + with_vectors=False, + ) + + summaries = [] + for point in search_results: + # Extract the summary data from each point + summary_data = point.payload + + # If _node_content is a JSON string, parse it + if "_node_content" in summary_data and isinstance( + summary_data["_node_content"], str + ): + try: + node_content = json.loads(summary_data["_node_content"]) + if extract_text_only: + summary_data = extract_summary_text(node_content) + else: + summary_data["parsed_content"] = node_content + summary_data["summary_text"] = extract_summary_text( + node_content + ) + except json.JSONDecodeError: + logging.warning( + f"Failed to parse _node_content as JSON for point with date {date}" + ) + + summaries.append(summary_data) + + logging.info( + f"Found {len(summaries)} summaries for date {date} in collection {collection_name}" + ) + return "\n".join(summaries) if extract_text_only else summaries + + except Exception as e: + logging.error( + f"Error fetching summaries for date {date} from collection {collection_name}: {str(e)}" + ) + raise + + +@activity.defn +async def fetch_telegram_summaries_by_date_range( + input: TelegramSummariesRangeActivityInput, +) -> dict[str, list[dict[str, Any] | str]]: + """ + Activity that fetches Telegram summaries for a range of dates from Qdrant. + + Parameters + ---------- + input : TelegramSummariesRangeActivityInput + Input object containing start_date, end_date, collection_name and extract_text_only + + Returns + ------- + dict[str, list[dict[str, Any] | str]] + A dictionary mapping dates to lists of summary objects or a string of summaries + + Raises + ------ + ValueError + If end_date is before start_date or collection_name is not provided + """ + start_date = input.start_date + end_date = input.end_date + extract_text_only = input.extract_text_only + collection_name = input.collection_name + + if not collection_name: + raise ValueError("Collection name is required but was not provided") + + logging.info( + f"Fetching summaries for date range: {start_date} to {end_date} from collection: {collection_name}" + ) + + try: + # Parse the date strings to datetime objects + start = datetime.strptime(start_date, "%Y-%m-%d").date() + end = datetime.strptime(end_date, "%Y-%m-%d").date() + + # Validate that end_date is not before start_date + if end < start: + raise ValueError("End date cannot be before start date") + + # Calculate all dates in the range + date_range = [] + current = start + while current <= end: + date_range.append(current.strftime("%Y-%m-%d")) + current += timedelta(days=1) + + # Fetch summaries for each date + result = {} + for date in date_range: + date_input = TelegramSummariesActivityInput( + date=date, + extract_text_only=extract_text_only, + collection_name=collection_name, + ) + summaries = await fetch_telegram_summaries_by_date(date_input) + result[date] = summaries + + return result + + except Exception as e: + logging.error( + f"Error fetching summaries for date range {start_date} to {end_date} from collection {collection_name}: {str(e)}" + ) + raise diff --git a/hivemind_summarizer/schema.py b/hivemind_summarizer/schema.py new file mode 100644 index 0000000..efe8ebf --- /dev/null +++ b/hivemind_summarizer/schema.py @@ -0,0 +1,27 @@ +from pydantic import BaseModel + + +class TelegramSummariesActivityInput(BaseModel): + date: str + extract_text_only: bool = True + collection_name: str | None = None + + +class TelegramSummariesRangeActivityInput(BaseModel): + start_date: str + end_date: str + extract_text_only: bool = True + collection_name: str | None = None + + +class TelegramGetCollectionNameInput(BaseModel): + platform_id: str + community_id: str + + +class TelegramFetchSummariesWorkflowInput(BaseModel): + platform_id: str + community_id: str + start_date: str + end_date: str | None = None + extract_text_only: bool = True diff --git a/hivemind_summarizer/workflows.py b/hivemind_summarizer/workflows.py new file mode 100644 index 0000000..b825789 --- /dev/null +++ b/hivemind_summarizer/workflows.py @@ -0,0 +1,93 @@ +import logging +from typing import Any +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy + +with workflow.unsafe.imports_passed_through(): + from .activities import ( + fetch_telegram_summaries_by_date, + fetch_telegram_summaries_by_date_range, + get_collection_name, + ) + from .schema import ( + TelegramSummariesActivityInput, + TelegramSummariesRangeActivityInput, + TelegramGetCollectionNameInput, + TelegramFetchSummariesWorkflowInput, + ) + + +@workflow.defn +class TelegramSummariesWorkflow: + """ + A Temporal workflow that fetches Telegram summaries for a specified date. + """ + + @workflow.run + async def run( + self, input: TelegramFetchSummariesWorkflowInput + ) -> list[dict[str, Any]]: + """ + Run the workflow to fetch Telegram summaries for the specified date. + + Parameters + ---------- + input : TelegramFetchSummariesWorkflowInput + Input containing platform_id, community_id, start_date, end_date, extract_text_only and collection_name + + Returns + ------- + list[dict[str, Any]] + A list of summary objects for the specified date + """ + logging.info("Started TelegramSummariesWorkflow!") + logging.info( + ( + f" Platform ID: {input.platform_id}. " + f" Community ID: {input.community_id}. " + f" Start Date: {input.start_date}. " + f" End Date: {input.end_date}" + ) + ) + + logging.info("Getting collection name!") + # First, get the collection name + collection_name = await workflow.execute_activity( + get_collection_name, + TelegramGetCollectionNameInput( + platform_id=input.platform_id, community_id=input.community_id + ), + schedule_to_close_timeout=timedelta(minutes=1), + retry_policy=RetryPolicy(maximum_attempts=3), + ) + + # if end_date is not provided, the workflow will fetch summaries just for the start_date + if input.end_date is None: + logging.info("Getting summaries by date!") + summaries = await workflow.execute_activity( + fetch_telegram_summaries_by_date, + TelegramSummariesActivityInput( + date=input.start_date, + collection_name=collection_name, + extract_text_only=input.extract_text_only, + ), + schedule_to_close_timeout=timedelta(minutes=2), + retry_policy=RetryPolicy(maximum_attempts=3), + ) + return summaries + else: + logging.info("Getting summaries by date range!") + summaries = await workflow.execute_activity( + fetch_telegram_summaries_by_date_range, + TelegramSummariesRangeActivityInput( + start_date=input.start_date, + end_date=input.end_date, + collection_name=collection_name, + extract_text_only=input.extract_text_only, + ), + schedule_to_close_timeout=timedelta(minutes=2), + retry_policy=RetryPolicy(maximum_attempts=3), + ) + return summaries diff --git a/registry.py b/registry.py index b3f03ef..200b50b 100644 --- a/registry.py +++ b/registry.py @@ -9,11 +9,17 @@ transform_mediawiki_data, load_mediawiki_data, ) +from hivemind_summarizer.activities import ( + fetch_telegram_summaries_by_date, + fetch_telegram_summaries_by_date_range, + get_collection_name, +) from workflows import ( CommunityWebsiteWorkflow, SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, + TelegramSummariesWorkflow, ) WORKFLOWS = [ @@ -21,6 +27,7 @@ SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, + TelegramSummariesWorkflow, ] ACTIVITIES = [ @@ -33,4 +40,7 @@ transform_mediawiki_data, load_mediawiki_data, say_hello, + fetch_telegram_summaries_by_date, + fetch_telegram_summaries_by_date_range, + get_collection_name, ] diff --git a/workflows.py b/workflows.py index f55748f..6b43a77 100644 --- a/workflows.py +++ b/workflows.py @@ -10,6 +10,7 @@ from hivemind_etl.mediawiki.workflows import ( MediaWikiETLWorkflow, ) +from hivemind_summarizer.workflows import TelegramSummariesWorkflow from temporalio import workflow