diff --git a/README.md b/README.md
index 4172b63..034789d 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ This repository contains TogetherCrew's Temporal Python workflows for data proce
 ### Hivemind Summarizer
 
 - **Platform Summaries**: Retrieves and processes summaries from Platform data stored in Qdrant, with options to fetch by date or date range.
+- **Real-Time Summaries**: Generates new summaries for recent data across platforms or specific communities.
 
 ## Architecture
 
@@ -51,23 +52,23 @@ The project uses Temporal for workflow orchestration with the following componen
 
 ### Running a Platform Summary Workflow
 
-To fetch summaries for a specific community and date range:
+To fetch existing summaries for a specific community and date range from Qdrant:
 
 ```python
 from temporalio.client import Client
-from hivemind_summarizer.workflows import PlatformSummariesWorkflow
+from hivemind_summarizer.summarizer_workflow import PlatformSummariesWorkflow
 from hivemind_summarizer.schema import PlatformFetchSummariesWorkflowInput
 
-async def run_okatfirn_workflow():
+async def run_platform_summaries_workflow():
     client = await Client.connect("localhost:7233")
 
     # Create workflow input
     input_data = PlatformFetchSummariesWorkflowInput(
-        platform_id="your_platform_id",
-        community_id="your_community_id",
-        start_date="2023-05-01",
-        end_date="2023-05-07",
-        extract_text_only=True
+        platform_id="your_platform_id",  # Required: the platform to fetch summaries from
+        community_id="your_community_id",  # Required: the community to fetch summaries from
+        start_date="2023-05-01",  # Optional: fetch summaries from this date
+        end_date="2023-05-07",  # Optional: fetch summaries until this date
+        extract_text_only=True  # Optional: whether to extract only text content
     )
 
     # Execute workflow
@@ -78,9 +79,47 @@ async def run_okatfirn_workflow():
         task_queue="your_task_queue"
     )
 
+    # Returns a list of existing summaries from Qdrant
    return result
 ```
 
+Note: This workflow only retrieves existing summaries that have already been generated and stored in Qdrant. It does not generate new summaries. Use this when you want to access previously generated summaries for a specific platform and community.
+
+### Running a Real-Time Summary Workflow
+
+To generate new summaries for recent data:
+
+```python
+from temporalio.client import Client
+from hivemind_summarizer.real_time_summary_workflow import RealTimeSummaryWorkflow
+from hivemind_summarizer.schema import RealTimeSummaryWorkflowInput
+
+async def run_realtime_summary_workflow():
+    client = await Client.connect("localhost:7233")
+
+    # Create workflow input
+    input_data = RealTimeSummaryWorkflowInput(
+        period="4h",  # Optional: time period (e.g., "1h", "4h") or a date in %Y-%m-%d format
+        platform_id="your_platform_id",  # Optional: filter by platform
+        community_id="your_community_id",  # Optional: filter by community
+        collection_name="your_collection"  # Optional: read raw data from this collection directly
+    )
+
+    # Execute workflow
+    result = await client.execute_workflow(
+        RealTimeSummaryWorkflow.run,
+        input_data,
+        id="realtime-summary-workflow",
+        task_queue="your_task_queue"
+    )
+
+    # Returns newly generated summary text
+    return result
+```
+
+Note: This workflow actively generates new summaries for recent data. Use it when you want a fresh summary for the specified time period and filters.
+Note 2: Either `collection_name`, or both `platform_id` and `community_id`, must be provided so the workflow can identify the collection that holds the raw data.
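+
+For example, a minimal sketch that summarizes the last hour of a single collection (the collection name is a placeholder):
+
+```python
+input_data = RealTimeSummaryWorkflowInput(
+    period="1h",  # summarize the last hour
+    collection_name="your_collection"  # identifies the raw-data collection directly
+)
+```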
+
 ### Running a MediaWiki ETL Workflow
 
 To process MediaWiki content for all communities or a specific platform:
diff --git a/hivemind_summarizer/activities.py b/hivemind_summarizer/activities.py
index 0ac5865..78a4805 100644
--- a/hivemind_summarizer/activities.py
+++ b/hivemind_summarizer/activities.py
@@ -1,22 +1,22 @@
 import json
 import logging
 from typing import Any
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 
-from bson import ObjectId
 from tc_hivemind_backend.db.qdrant import QdrantSingleton
-from tc_hivemind_backend.db.mongo import MongoSingleton
 from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
 from temporalio import activity, workflow
 from qdrant_client.models import Filter, FieldCondition, MatchValue
 from qdrant_client.http import models
+from openai import AsyncOpenAI
+import re
 
 with workflow.unsafe.imports_passed_through():
     from hivemind_summarizer.schema import (
         PlatformSummariesActivityInput,
         PlatformSummariesRangeActivityInput,
-        PlatformGetCollectionNameInput,
+        RealTimeSummaryWorkflowInput,
     )
 
@@ -41,57 +41,6 @@ def extract_summary_text(node_content: dict[str, Any]) -> str:
     return "Summary text not found"
 
 
-@activity.defn
-async def get_platform_name(input: PlatformGetCollectionNameInput) -> str:
-    """
-    Activity that extracts collection name from MongoDB based on platform_id and community_id.
-
-    Parameters
-    ----------
-    input: PlatformGetCollectionNameInput
-        Input object containing platform_id and community_id
-
-    Returns
-    -------
-    str
-        The platform name
-
-    Raises
-    ------
-    Exception
-        If platform not found or error occurs during DB access
-    """
-    platform_id = input.platform_id
-    community_id = input.community_id
-
-    logging.info(
-        f"Getting collection name for platform_id: {platform_id}, community_id: {community_id}"
-    )
-
-    try:
-        # Get MongoDB client
-        mongo_client = MongoSingleton.get_instance().get_client()
-
-        # Query the platform from Core database
-        platform = mongo_client["Core"]["platforms"].find_one(
-            {"_id": ObjectId(platform_id)}
-        )
-
-        if not platform:
-            raise Exception(f"Platform with ID {platform_id} not found")
-
-        # Extract platform name
-        platform_name = platform.get("name")
-        if not platform_name:
-            raise Exception(f"Platform name not found for platform_id {platform_id}")
-
-        return platform_name
-
-    except Exception as e:
-        logging.error(f"Error getting collection name: {str(e)}")
-        raise
-
-
 @activity.defn
 async def fetch_platform_summaries_by_date(
     input: PlatformSummariesActivityInput,
@@ -111,13 +60,13 @@ async def fetch_platform_summaries_by_date(
     """
     date = input.date
     extract_text_only = input.extract_text_only
-    collection_name = f"{input.community_id}_{input.platform_name}_summary"
     community_id = input.community_id
+    collection_name = f"{community_id}_{input.platform_id}_summary"
 
     logging.info("Started fetch_platform_summaries_by_date!")
 
-    if not input.platform_name:
-        raise ValueError("Platform name is required but was not provided")
+    if not input.platform_id:
+        raise ValueError("Platform id is required but was not provided")
 
     logging.info(
         f"Fetching summaries for date: {date} from collection: {collection_name}"
     )
@@ -147,7 +96,7 @@
         # pipeline requires a different format for the collection name
         pipeline = CustomIngestionPipeline(
             community_id=community_id,
-            collection_name=f"{input.platform_name}_summary",
+            collection_name=f"{input.platform_id}_summary",
         )
         # get the latest date from the collection
         latest_date = pipeline.get_latest_document_date(
@@ -216,7 +165,7 @@ async def fetch_platform_summaries_by_date_range(
     Parameters
     ----------
     input : PlatformSummariesRangeActivityInput
-        Input object containing start_date, end_date, platform_name and community_id
+        Input object containing start_date, end_date, platform_id and community_id
 
     Returns
     -------
@@ -226,19 +175,17 @@
     Raises
     ------
     ValueError
-        If end_date is before start_date or platform_name is not provided
+        If end_date is before start_date or platform_id is not provided
     """
     start_date = input.start_date
     end_date = input.end_date
     extract_text_only = input.extract_text_only
-    platform_name = input.platform_name
+    platform_id = input.platform_id
     community_id = input.community_id
 
-    if not platform_name:
-        raise ValueError("Platform name is required but was not provided")
+    if not platform_id:
+        raise ValueError("Platform id is required but was not provided")
 
-    logging.info(
-        f"Fetching summaries for date range: {start_date} to {end_date} from collection: {collection_name}"
-    )
+    logging.info(f"Fetching summaries for date range: {start_date} to {end_date}.")
 
     try:
         # Parse the date strings to datetime objects
@@ -262,7 +209,7 @@
             date_input = PlatformSummariesActivityInput(
                 date=date,
                 extract_text_only=extract_text_only,
-                platform_name=input.platform_name,
+                platform_id=input.platform_id,
                 community_id=community_id,
             )
             summaries = await fetch_platform_summaries_by_date(date_input)
@@ -272,6 +219,133 @@
 
     except Exception as e:
         logging.error(
-            f"Error fetching summaries for date range {start_date} to {end_date} from collection {collection_name}: {str(e)}"
+            f"Error fetching summaries for date range {start_date} to {end_date}: {str(e)}"
         )
         raise
+
+
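+# Flow of the activity below: resolve the target Qdrant collection from the
+# input, keep only points created after the requested time threshold, extract
+# each point's text, then summarize the combined text with OpenAI.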
+@activity.defn
+async def fetch_and_summarize_realtime_data(
+    input: RealTimeSummaryWorkflowInput,
+) -> str:
+    """
+    Activity that fetches recent data from Qdrant and generates a real-time summary.
+
+    Parameters
+    ----------
+    input : RealTimeSummaryWorkflowInput
+        Input containing period, and either collection_name or platform_id/community_id
+
+    Returns
+    -------
+    str
+        A summarized text of the recent data
+    """
+    try:
+        # Get Qdrant client
+        qdrant_client = QdrantSingleton.get_instance().get_client()
+
+        # Determine collection name
+        collection_name = input.collection_name
+        if not collection_name and (input.platform_id and input.community_id):
+            collection_name = f"{input.community_id}_{input.platform_id}"
+        elif not collection_name:
+            raise ValueError(
+                "Either collection_name or both platform_id and community_id must be provided"
+            )
+
+        # Calculate time filter based on period
+        now = datetime.now(tz=timezone.utc)
+        if input.period:
+            if re.match(r"^\d+h$", input.period):
+                hours = int(input.period[:-1])
+                time_threshold = now - timedelta(hours=hours)
+            elif re.match(r"^\d{4}-\d{2}-\d{2}$", input.period):
+                time_threshold = datetime.strptime(input.period, "%Y-%m-%d").replace(
+                    tzinfo=timezone.utc
+                )
+            else:
+                raise ValueError(
+                    "Period must be in format 'Nh' (e.g., '1h', '4h') or 'YYYY-MM-DD'"
+                )
+        else:
+            # Default to the last hour if no period is specified
+            time_threshold = now - timedelta(hours=1)
+
+        # Create filter for the time period
+        filter_conditions = [
+            FieldCondition(
+                key="createdAt", range=models.Range(gt=time_threshold.timestamp())
+            )
+        ]
+        time_filter = Filter(must=filter_conditions)
+
+        # Query Qdrant for recent data
+        search_results = qdrant_client.search(
+            collection_name=collection_name,
+            query_vector=[0]
+            * 1024,  # Using a zero vector since we only care about the filter
+            query_filter=time_filter,
+            limit=500,  # hard limit in case there is a lot of data
+            with_payload=True,
+            with_vectors=False,
+        )
+
+        if not search_results:
+            return "No recent data found for the specified period."
+
+        logging.info(f"found {len(search_results)} raw data points!")
+
+        # Extract text content from the results
+        texts = []
+        for point in search_results:
+            if "_node_content" in point.payload:
+                content = point.payload["_node_content"]
+                if isinstance(content, str):
+                    try:
+                        content = json.loads(content)
+                    except json.JSONDecodeError:
+                        pass
+                if isinstance(content, dict) and "text" in content:
+                    # guard against nodes that carry no metadata at all
+                    metadata = content.get("metadata") or {}
+                    if "author" in metadata:
+                        texts.append(metadata["author"] + ": " + content["text"])
+                    else:
+                        texts.append(content["text"])
+
+        if not texts:
+            return "No text content found in the recent data."
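+
+        # At most 500 texts (the search limit above) are joined into the
+        # summarization prompt below.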
+
+        # Combine all texts
+        combined_text = "\n".join(texts)
+
+        logging.info("Starting to summarize...")
+
+        # Initialize OpenAI client
+        client = AsyncOpenAI()
+
+        # Generate summary using OpenAI
+        prompt = (
+            "Please provide a concise summary of the following content, "
+            "focusing on the key points and main themes:\n\n"
+            f"{combined_text}"
+        )
+
+        response = await client.chat.completions.create(
+            model="gpt-4o-mini",
+            messages=[
+                {
+                    "role": "system",
+                    "content": "You are a helpful assistant that summarizes content concisely and accurately.",
+                },
+                {"role": "user", "content": prompt},
+            ],
+            temperature=0.3,
+            n=1,
+        )
+
+        return response.choices[0].message.content
+
+    except Exception as e:
+        logging.error(f"Error in fetch_and_summarize_realtime_data: {str(e)}")
+        raise
diff --git a/hivemind_summarizer/real_time_summary_workflow.py b/hivemind_summarizer/real_time_summary_workflow.py
new file mode 100644
index 0000000..551fb58
--- /dev/null
+++ b/hivemind_summarizer/real_time_summary_workflow.py
@@ -0,0 +1,44 @@
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+from datetime import timedelta
+
+with workflow.unsafe.imports_passed_through():
+    from .activities import fetch_and_summarize_realtime_data
+    from .schema import RealTimeSummaryWorkflowInput
+
+
+@workflow.defn
+class RealTimeSummaryWorkflow:
+    """
+    A Temporal workflow that fetches and summarizes recent data in real-time.
+    """
+
+    @workflow.run
+    async def run(self, input: RealTimeSummaryWorkflowInput) -> str:
+        """
+        Run the workflow to fetch and summarize recent data.
+
+        Parameters
+        ----------
+        input : RealTimeSummaryWorkflowInput
+            Input containing period, and either collection_name or platform_id/community_id
+
+        Returns
+        -------
+        str
+            A summarized text of the recent data
+        """
+        # Execute the activity with retry policy
+        summary = await workflow.execute_activity(
+            fetch_and_summarize_realtime_data,
+            input,
+            schedule_to_close_timeout=timedelta(minutes=5),
+            retry_policy=RetryPolicy(
+                maximum_attempts=3,
+                initial_interval=timedelta(seconds=1),
+                maximum_interval=timedelta(minutes=1),
+                backoff_coefficient=2.0,
+            ),
+        )
+
+        return summary
diff --git a/hivemind_summarizer/schema.py b/hivemind_summarizer/schema.py
index 2cea026..01f51e3 100644
--- a/hivemind_summarizer/schema.py
+++ b/hivemind_summarizer/schema.py
@@ -4,19 +4,14 @@
 class PlatformSummariesActivityInput(BaseModel):
     date: str | None = None
     extract_text_only: bool = True
-    platform_name: str | None = None
-    community_id: str | None = None
+    platform_id: str
+    community_id: str
 
 
 class PlatformSummariesRangeActivityInput(BaseModel):
     start_date: str
     end_date: str
     extract_text_only: bool = True
-    platform_name: str | None = None
-    community_id: str | None = None
-
-
-class PlatformGetCollectionNameInput(BaseModel):
     platform_id: str
     community_id: str
 
@@ -27,3 +22,12 @@
     start_date: str | None = None
     end_date: str | None = None
     extract_text_only: bool = True
+
+
+class RealTimeSummaryWorkflowInput(BaseModel):
+    period: str | None = (
+        None  # either a period in hours ("1h", "4h", ...) or a date in %Y-%m-%d format
+    )
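+    # Either collection_name, or both platform_id and community_id, must be
+    # set so the activity can resolve the Qdrant collection of raw data.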
+    platform_id: str | None = None
+    community_id: str | None = None
+    collection_name: str | None = None
diff --git a/hivemind_summarizer/workflows.py b/hivemind_summarizer/summarizer_workflow.py
similarity index 82%
rename from hivemind_summarizer/workflows.py
rename to hivemind_summarizer/summarizer_workflow.py
index 8f82b50..7c86ef5 100644
--- a/hivemind_summarizer/workflows.py
+++ b/hivemind_summarizer/summarizer_workflow.py
@@ -9,12 +9,10 @@
 from .activities import (
     fetch_platform_summaries_by_date,
     fetch_platform_summaries_by_date_range,
-    get_platform_name,
 )
 from .schema import (
     PlatformSummariesActivityInput,
     PlatformSummariesRangeActivityInput,
-    PlatformGetCollectionNameInput,
     PlatformFetchSummariesWorkflowInput,
 )
 
@@ -52,17 +50,6 @@ async def run(
         )
     )
 
-    logging.info("Getting collection name!")
-    # First, get the collection name
-    platform_name = await workflow.execute_activity(
-        get_platform_name,
-        PlatformGetCollectionNameInput(
-            platform_id=input.platform_id, community_id=input.community_id
-        ),
-        schedule_to_close_timeout=timedelta(minutes=1),
-        retry_policy=RetryPolicy(maximum_attempts=3),
-    )
-
     # if end_date is not provided, the workflow will fetch summaries just for the start_date
     if input.end_date is None:
         logging.info("Getting summaries by date!")
@@ -70,7 +57,7 @@
             fetch_platform_summaries_by_date,
             PlatformSummariesActivityInput(
                 date=input.start_date,
-                platform_name=platform_name,
+                platform_id=input.platform_id,
                 community_id=input.community_id,
                 extract_text_only=input.extract_text_only,
             ),
@@ -85,7 +72,7 @@
             PlatformSummariesRangeActivityInput(
                 start_date=input.start_date,
                 end_date=input.end_date,
-                platform_name=platform_name,
+                platform_id=input.platform_id,
                 community_id=input.community_id,
                 extract_text_only=input.extract_text_only,
             ),
diff --git a/registry.py b/registry.py
index 4e22c70..5a7a412 100644
--- a/registry.py
+++ b/registry.py
@@ -13,7 +13,7 @@
 from hivemind_summarizer.activities import (
     fetch_platform_summaries_by_date,
     fetch_platform_summaries_by_date_range,
-    get_platform_name,
+    fetch_and_summarize_realtime_data,
 )
 from workflows import (
     CommunityWebsiteWorkflow,
@@ -21,6 +21,7 @@
     MediaWikiETLWorkflow,
     PlatformSummariesWorkflow,
     IngestionWorkflow,
+    RealTimeSummaryWorkflow,
 )
 
 WORKFLOWS = [
@@ -29,6 +30,7 @@
     MediaWikiETLWorkflow,
     PlatformSummariesWorkflow,
     IngestionWorkflow,
+    RealTimeSummaryWorkflow,
 ]
 
 ACTIVITIES = [
@@ -43,6 +45,6 @@
     say_hello,
     fetch_platform_summaries_by_date,
     fetch_platform_summaries_by_date_range,
-    get_platform_name,
     process_document,
+    fetch_and_summarize_realtime_data,
 ]
diff --git a/workflows.py b/workflows.py
index fa1ab2b..dd53b85 100644
--- a/workflows.py
+++ b/workflows.py
@@ -13,7 +13,8 @@
 from hivemind_etl.simple_ingestion.pipeline import (
     IngestionWorkflow,
 )
-from hivemind_summarizer.workflows import PlatformSummariesWorkflow
+from hivemind_summarizer.summarizer_workflow import PlatformSummariesWorkflow
+from hivemind_summarizer.real_time_summary_workflow import RealTimeSummaryWorkflow
 from temporalio import workflow