From 4017d31e0a8df3bce8ba206543b957369249429b Mon Sep 17 00:00:00 2001 From: Mohammad Amin Date: Wed, 30 Apr 2025 09:44:49 +0330 Subject: [PATCH] feat: rename summarizer to be platform agnostic! --- README.md | 16 +++++++------- hivemind_summarizer/activities.py | 32 +++++++++++++-------------- hivemind_summarizer/schema.py | 8 +++---- hivemind_summarizer/workflows.py | 36 +++++++++++++++---------------- registry.py | 12 +++++------ workflows.py | 2 +- 6 files changed, 53 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index 5c49260..4172b63 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ This repository contains TogetherCrew's Temporal Python workflows for data proce ### Hivemind Summarizer -- **Telegram Summaries**: Retrieves and processes summaries from Telegram data stored in Qdrant, with options to fetch by date or date range. +- **Platform Summaries**: Retrieves and processes summaries from Platform data stored in Qdrant, with options to fetch by date or date range. 
## Architecture @@ -49,20 +49,20 @@ The project uses Temporal for workflow orchestration with the following componen ## Usage Examples -### Running a Telegram Summary Workflow +### Running a Platform Summary Workflow To fetch summaries for a specific community and date range: ```python from temporalio.client import Client -from hivemind_summarizer.workflows import TelegramSummariesWorkflow -from hivemind_summarizer.schema import TelegramFetchSummariesWorkflowInput +from hivemind_summarizer.workflows import PlatformSummariesWorkflow +from hivemind_summarizer.schema import PlatformFetchSummariesWorkflowInput -async def run_telegram_workflow(): +async def run_platform_workflow(): client = await Client.connect("localhost:7233") # Create workflow input - input_data = TelegramFetchSummariesWorkflowInput( + input_data = PlatformFetchSummariesWorkflowInput( platform_id="your_platform_id", community_id="your_community_id", start_date="2023-05-01", @@ -72,9 +72,9 @@ async def run_telegram_workflow(): # Execute workflow result = await client.execute_workflow( - TelegramSummariesWorkflow.run, + PlatformSummariesWorkflow.run, input_data, - id="telegram-summaries-workflow", + id="platform-summaries-workflow", task_queue="your_task_queue" ) diff --git a/hivemind_summarizer/activities.py b/hivemind_summarizer/activities.py index 7350a90..0ac5865 100644 --- a/hivemind_summarizer/activities.py +++ b/hivemind_summarizer/activities.py @@ -14,9 +14,9 @@ with workflow.unsafe.imports_passed_through(): from hivemind_summarizer.schema import ( - TelegramSummariesActivityInput, - TelegramSummariesRangeActivityInput, - TelegramGetCollectionNameInput, + PlatformSummariesActivityInput, + PlatformSummariesRangeActivityInput, + PlatformGetCollectionNameInput, ) @@ -42,13 +42,13 @@ def extract_summary_text(node_content: dict[str, Any]) -> str: @activity.defn -async def get_platform_name(input: TelegramGetCollectionNameInput) -> str: +async def get_platform_name(input: 
PlatformGetCollectionNameInput) -> str: """ Activity that extracts collection name from MongoDB based on platform_id and community_id. Parameters ---------- - input: TelegramGetCollectionNameInput + input: PlatformGetCollectionNameInput Input object containing platform_id and community_id Returns @@ -93,15 +93,15 @@ async def get_platform_name(input: TelegramGetCollectionNameInput) -> str: @activity.defn -async def fetch_telegram_summaries_by_date( - input: TelegramSummariesActivityInput, +async def fetch_platform_summaries_by_date( + input: PlatformSummariesActivityInput, ) -> list[dict[str, Any]] | str: """ - Activity that fetches Telegram summaries for a specific date from Qdrant. + Activity that fetches Platform summaries for a specific date from Qdrant. Parameters ---------- - input : TelegramSummariesActivityInput + input : PlatformSummariesActivityInput Input object containing date, collection_name and extract_text_only Returns @@ -114,7 +114,7 @@ async def fetch_telegram_summaries_by_date( collection_name = f"{input.community_id}_{input.platform_name}_summary" community_id = input.community_id - logging.info("Started fetch_telegram_summaries_by_date!") + logging.info("Started fetch_platform_summaries_by_date!") if not input.platform_name: raise ValueError("Platform name is required but was not provided") @@ -207,15 +207,15 @@ async def fetch_telegram_summaries_by_date( @activity.defn -async def fetch_telegram_summaries_by_date_range( - input: TelegramSummariesRangeActivityInput, +async def fetch_platform_summaries_by_date_range( + input: PlatformSummariesRangeActivityInput, ) -> dict[str, list[dict[str, Any] | str]]: """ - Activity that fetches Telegram summaries for a range of dates from Qdrant. + Activity that fetches summaries for a range of dates from Qdrant. 
Parameters ---------- - input : TelegramSummariesRangeActivityInput + input : PlatformSummariesRangeActivityInput Input object containing start_date, end_date, platform_name and community_id Returns @@ -259,13 +259,13 @@ async def fetch_telegram_summaries_by_date_range( # Fetch summaries for each date result = {} for date in date_range: - date_input = TelegramSummariesActivityInput( + date_input = PlatformSummariesActivityInput( date=date, extract_text_only=extract_text_only, platform_name=input.platform_name, community_id=community_id, ) - summaries = await fetch_telegram_summaries_by_date(date_input) + summaries = await fetch_platform_summaries_by_date(date_input) result[date] = summaries return result diff --git a/hivemind_summarizer/schema.py b/hivemind_summarizer/schema.py index 47388bf..2cea026 100644 --- a/hivemind_summarizer/schema.py +++ b/hivemind_summarizer/schema.py @@ -1,14 +1,14 @@ from pydantic import BaseModel -class TelegramSummariesActivityInput(BaseModel): +class PlatformSummariesActivityInput(BaseModel): date: str | None = None extract_text_only: bool = True platform_name: str | None = None community_id: str | None = None -class TelegramSummariesRangeActivityInput(BaseModel): +class PlatformSummariesRangeActivityInput(BaseModel): start_date: str end_date: str extract_text_only: bool = True @@ -16,12 +16,12 @@ class TelegramSummariesRangeActivityInput(BaseModel): community_id: str | None = None -class TelegramGetCollectionNameInput(BaseModel): +class PlatformGetCollectionNameInput(BaseModel): platform_id: str community_id: str -class TelegramFetchSummariesWorkflowInput(BaseModel): +class PlatformFetchSummariesWorkflowInput(BaseModel): platform_id: str community_id: str start_date: str | None = None diff --git a/hivemind_summarizer/workflows.py b/hivemind_summarizer/workflows.py index 3331db5..8f82b50 100644 --- a/hivemind_summarizer/workflows.py +++ b/hivemind_summarizer/workflows.py @@ -7,42 +7,42 @@ with 
workflow.unsafe.imports_passed_through(): from .activities import ( - fetch_telegram_summaries_by_date, - fetch_telegram_summaries_by_date_range, + fetch_platform_summaries_by_date, + fetch_platform_summaries_by_date_range, get_platform_name, ) from .schema import ( - TelegramSummariesActivityInput, - TelegramSummariesRangeActivityInput, - TelegramGetCollectionNameInput, - TelegramFetchSummariesWorkflowInput, + PlatformSummariesActivityInput, + PlatformSummariesRangeActivityInput, + PlatformGetCollectionNameInput, + PlatformFetchSummariesWorkflowInput, ) @workflow.defn -class TelegramSummariesWorkflow: +class PlatformSummariesWorkflow: """ - A Temporal workflow that fetches Telegram summaries for a specified date. + A Temporal workflow that fetches summaries for a specified date. """ @workflow.run async def run( - self, input: TelegramFetchSummariesWorkflowInput + self, input: PlatformFetchSummariesWorkflowInput ) -> list[dict[str, Any]]: """ - Run the workflow to fetch Telegram summaries for the specified date. + Run the workflow to fetch summaries for the specified date. Parameters ---------- - input : TelegramFetchSummariesWorkflowInput - Input containing platform_id, community_id, start_date, end_date, extract_text_only and collection_name + input : PlatformFetchSummariesWorkflowInput + Input containing platform_id, community_id, start_date, end_date, extract_text_only and platform_name Returns ------- list[dict[str, Any]] A list of summary objects for the specified date """ - logging.info("Started TelegramSummariesWorkflow!") + logging.info("Started PlatformSummariesWorkflow!") logging.info( ( f" Platform ID: {input.platform_id}. 
" @@ -56,7 +56,7 @@ async def run( # First, get the collection name platform_name = await workflow.execute_activity( get_platform_name, - TelegramGetCollectionNameInput( + PlatformGetCollectionNameInput( platform_id=input.platform_id, community_id=input.community_id ), schedule_to_close_timeout=timedelta(minutes=1), @@ -67,8 +67,8 @@ async def run( if input.end_date is None: logging.info("Getting summaries by date!") summaries = await workflow.execute_activity( - fetch_telegram_summaries_by_date, - TelegramSummariesActivityInput( + fetch_platform_summaries_by_date, + PlatformSummariesActivityInput( date=input.start_date, platform_name=platform_name, community_id=input.community_id, @@ -81,8 +81,8 @@ async def run( else: logging.info("Getting summaries by date range!") summaries = await workflow.execute_activity( - fetch_telegram_summaries_by_date_range, - TelegramSummariesRangeActivityInput( + fetch_platform_summaries_by_date_range, + PlatformSummariesRangeActivityInput( start_date=input.start_date, end_date=input.end_date, platform_name=platform_name, diff --git a/registry.py b/registry.py index 1225ceb..0856ac2 100644 --- a/registry.py +++ b/registry.py @@ -10,8 +10,8 @@ load_mediawiki_data, ) from hivemind_summarizer.activities import ( - fetch_telegram_summaries_by_date, - fetch_telegram_summaries_by_date_range, + fetch_platform_summaries_by_date, + fetch_platform_summaries_by_date_range, get_platform_name, ) from workflows import ( @@ -19,7 +19,7 @@ SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, - TelegramSummariesWorkflow, + PlatformSummariesWorkflow, ) WORKFLOWS = [ @@ -27,7 +27,7 @@ SayHello, WebsiteIngestionSchedulerWorkflow, MediaWikiETLWorkflow, - TelegramSummariesWorkflow, + PlatformSummariesWorkflow, ] ACTIVITIES = [ @@ -40,7 +40,7 @@ transform_mediawiki_data, load_mediawiki_data, say_hello, - fetch_telegram_summaries_by_date, - fetch_telegram_summaries_by_date_range, + fetch_platform_summaries_by_date, + 
fetch_platform_summaries_by_date_range, get_platform_name, ] diff --git a/workflows.py b/workflows.py index 6b43a77..8f60276 100644 --- a/workflows.py +++ b/workflows.py @@ -10,7 +10,7 @@ from hivemind_etl.mediawiki.workflows import ( MediaWikiETLWorkflow, ) -from hivemind_summarizer.workflows import TelegramSummariesWorkflow +from hivemind_summarizer.workflows import PlatformSummariesWorkflow from temporalio import workflow