README.md (53 changes: 46 additions & 7 deletions)

@@ -12,6 +12,7 @@ This repository contains TogetherCrew's Temporal Python workflows for data processing
### Hivemind Summarizer

- **Platform Summaries**: Retrieves and processes summaries from Platform data stored in Qdrant, with options to fetch by date or date range.
- **Real-Time Summaries**: Generates new summaries for recent data across platforms or specific communities.

## Architecture

@@ -51,23 +52,23 @@ The project uses Temporal for workflow orchestration with the following components:

### Running a Platform Summary Workflow

To fetch existing summaries for a specific community and date range from Qdrant:

```python
from temporalio.client import Client
from hivemind_summarizer.workflows import PlatformSummariesWorkflow
from hivemind_summarizer.schema import PlatformFetchSummariesWorkflowInput

async def run_platform_summaries_workflow():
    client = await Client.connect("localhost:7233")

    # Create workflow input
    input_data = PlatformFetchSummariesWorkflowInput(
        platform_id="your_platform_id",  # Required: the platform to fetch summaries from
        community_id="your_community_id",  # Required: the community to fetch summaries from
        start_date="2023-05-01",  # Optional: fetch summaries from this date
        end_date="2023-05-07",  # Optional: fetch summaries until this date
        extract_text_only=True,  # Optional: whether to extract only text content
    )

    # Execute workflow
    result = await client.execute_workflow(
        PlatformSummariesWorkflow.run,
        input_data,
        id="platform-summaries-workflow",
        task_queue="your_task_queue",
    )

    # Returns a list of existing summaries from Qdrant
    return result
```

Note: This workflow only retrieves existing summaries that have already been generated and stored in Qdrant. It does not generate new summaries. Use this when you want to access previously generated summaries for a specific platform and community.
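The returned value can then be consumed directly once the workflow completes. A minimal sketch, assuming that with `extract_text_only=True` the result is a list of plain-text summaries (the exact return shape is defined by the workflow's schema):

```python
import asyncio

async def main():
    summaries = await run_platform_summaries_workflow()
    # Assumed shape: a list of plain-text summary strings
    for summary in summaries:
        print(summary)

asyncio.run(main())
```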

### Running a Real-Time Summary Workflow

To generate new summaries for recent data:

```python
from temporalio.client import Client
from hivemind_summarizer.workflows import RealTimeSummaryWorkflow
from hivemind_summarizer.schema import RealTimeSummaryWorkflowInput

async def run_realtime_summary_workflow():
    client = await Client.connect("localhost:7233")

    # Create workflow input
    input_data = RealTimeSummaryWorkflowInput(
        period="4h",  # Optional: time period (e.g., "1h", "4h") or a date in %Y-%m-%d format
        platform_id="your_platform_id",  # Optional: filter by platform
        community_id="your_community_id",  # Optional: filter by community
        collection_name="your_collection",  # Optional: filter by collection
    )

    # Execute workflow
    result = await client.execute_workflow(
        RealTimeSummaryWorkflow.run,
        input_data,
        id="realtime-summary-workflow",
        task_queue="your_task_queue",
    )

    # Returns newly generated summary text
    return result
```

Note: This workflow actively generates new summaries for recent data. Use it when you want fresh summaries for the specified time period and filters.
Note 2: Provide either `collection_name`, or both `platform_id` and `community_id`, so the workflow can identify the collection that holds the raw data.
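Both examples assume a worker is already polling the task queue. Below is a minimal worker sketch; the imports match the modules used in this repository, but the exact registration list and queue name are assumptions:

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from hivemind_summarizer.workflows import (
    PlatformSummariesWorkflow,
    RealTimeSummaryWorkflow,
)
from hivemind_summarizer.activities import (
    fetch_platform_summaries_by_date,
    fetch_platform_summaries_by_date_range,
    fetch_and_summarize_realtime_data,
)

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="your_task_queue",
        workflows=[PlatformSummariesWorkflow, RealTimeSummaryWorkflow],
        activities=[
            fetch_platform_summaries_by_date,
            fetch_platform_summaries_by_date_range,
            fetch_and_summarize_realtime_data,
        ],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```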

### Running a MediaWiki ETL Workflow

To process MediaWiki content for all communities or a specific platform:
hivemind_summarizer/activities.py (210 changes: 142 additions & 68 deletions)
@@ -1,22 +1,22 @@
import json
import logging
import re
from typing import Any
from datetime import datetime, timedelta, timezone

from tc_hivemind_backend.db.qdrant import QdrantSingleton
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline

from temporalio import activity, workflow
from qdrant_client.models import Filter, FieldCondition, MatchValue
from qdrant_client.http import models
from openai import AsyncOpenAI

with workflow.unsafe.imports_passed_through():
    from hivemind_summarizer.schema import (
        PlatformSummariesActivityInput,
        PlatformSummariesRangeActivityInput,
        RealTimeSummaryWorkflowInput,
    )


@@ -41,57 +41,6 @@ def extract_summary_text(node_content: dict[str, Any]) -> str:
    return "Summary text not found"


@activity.defn
async def fetch_platform_summaries_by_date(
    input: PlatformSummariesActivityInput,
@@ -111,13 +60,13 @@ async def fetch_platform_summaries_by_date(
    """
    date = input.date
    extract_text_only = input.extract_text_only
    community_id = input.community_id
    collection_name = f"{community_id}_{input.platform_id}_summary"

    logging.info("Started fetch_platform_summaries_by_date!")

    if not input.platform_id:
        raise ValueError("Platform id is required but was not provided")

    logging.info(
        f"Fetching summaries for date: {date} from collection: {collection_name}"
    )
@@ -147,7 +96,7 @@ async def fetch_platform_summaries_by_date(
        # pipeline requires a different format for the collection name
        pipeline = CustomIngestionPipeline(
            community_id=community_id,
            collection_name=f"{input.platform_id}_summary",
        )
        # get the latest date from the collection
        latest_date = pipeline.get_latest_document_date(
@@ -216,7 +165,7 @@ async def fetch_platform_summaries_by_date_range(
    Parameters
    ----------
    input : PlatformSummariesRangeActivityInput
        Input object containing start_date, end_date, platform_id and community_id

    Returns
    -------
@@ -226,19 +175,17 @@ async def fetch_platform_summaries_by_date_range(
    Raises
    ------
    ValueError
        If end_date is before start_date or platform_id is not provided
    """
    start_date = input.start_date
    end_date = input.end_date
    extract_text_only = input.extract_text_only
    platform_id = input.platform_id
    community_id = input.community_id
    if not platform_id:
        raise ValueError("Platform id is required but was not provided")

    logging.info(f"Fetching summaries for date range: {start_date} to {end_date}.")

    try:
        # Parse the date strings to datetime objects
@@ -262,7 +209,7 @@ async def fetch_platform_summaries_by_date_range(
            date_input = PlatformSummariesActivityInput(
                date=date,
                extract_text_only=extract_text_only,
                platform_id=input.platform_id,
                community_id=community_id,
            )
            summaries = await fetch_platform_summaries_by_date(date_input)
@@ -272,6 +219,133 @@

    except Exception as e:
        logging.error(
            f"Error fetching summaries for date range {start_date} to {end_date}: {str(e)}"
        )
        raise


@activity.defn
async def fetch_and_summarize_realtime_data(
    input: RealTimeSummaryWorkflowInput,
) -> str:
    """
    Activity that fetches recent data from Qdrant and generates a real-time summary.

    Parameters
    ----------
    input : RealTimeSummaryWorkflowInput
        Input containing period, collection_name or platform_id/community_id, and extract_text_only

    Returns
    -------
    str
        A summarized text of the recent data
    """
    try:
        # Get Qdrant client
        qdrant_client = QdrantSingleton.get_instance().get_client()

        # Determine collection name
        collection_name = input.collection_name
        if not collection_name and (input.platform_id and input.community_id):
            collection_name = f"{input.community_id}_{input.platform_id}"
        elif not collection_name:
            raise ValueError(
                "Either collection_name or both platform_id and community_id must be provided"
            )

        # Calculate time filter based on period
        now = datetime.now(tz=timezone.utc)
        if input.period:
            if re.match(r"^\d+h$", input.period):
                hours = int(input.period[:-1])
                time_threshold = now - timedelta(hours=hours)
            elif re.match(r"^\d{4}-\d{2}-\d{2}$", input.period):
                time_threshold = datetime.strptime(input.period, "%Y-%m-%d").replace(
                    tzinfo=timezone.utc
                )
            else:
                raise ValueError(
                    "Period must be in format 'Nh' (e.g., '1h', '4h') or 'YYYY-MM-DD'"
                )
        else:
            # Default to the last hour if no period is specified
            time_threshold = now - timedelta(hours=1)

        # Create filter for the time period
        filter_conditions = [
            FieldCondition(
                key="createdAt", range=models.Range(gt=time_threshold.timestamp())
            )
        ]
        time_filter = Filter(must=filter_conditions)

        # Query Qdrant for recent data
        search_results = qdrant_client.search(
            collection_name=collection_name,
            # A zero vector is used because only the payload filter matters here
            query_vector=[0] * 1024,
            query_filter=time_filter,
            limit=500,  # hard cap in case there is a lot of data
            with_payload=True,
            with_vectors=False,
        )

        if not search_results:
            return "No recent data found for the specified period."

        logging.info(f"Found {len(search_results)} raw data points!")

        # Extract text content from the results
        texts = []
        for point in search_results:
            if "_node_content" in point.payload:
                content = point.payload["_node_content"]
                if isinstance(content, str):
                    try:
                        content = json.loads(content)
                    except json.JSONDecodeError:
                        pass
                if isinstance(content, dict) and "text" in content:
                    # Prefix the author name when the metadata provides one
                    metadata = content.get("metadata") or {}
                    if "author" in metadata:
                        texts.append(metadata["author"] + ": " + content["text"])
                    else:
                        texts.append(content["text"])

        if not texts:
            return "No text content found in the recent data."

        # Combine all texts
        combined_text = "\n".join(texts)

logging.info("Starting to summarize...")

# Initialize OpenAI client
client = AsyncOpenAI()

# Generate summary using OpenAI
prompt = (
"Please provide a concise summary of the following content, focusing on the key points and main themes:"
f"{combined_text}"
)

response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are a helpful assistant that summarizes content concisely and accurately.",
},
{"role": "user", "content": prompt},
],
temperature=0.3,
n=1,
)

return response.choices[0].message.content

    except Exception as e:
        logging.error(f"Error in fetch_and_summarize_realtime_data: {str(e)}")
        raise
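
# ---------------------------------------------------------------------------
# Illustrative sketch: how a Temporal workflow would typically invoke the
# activity above. The real RealTimeSummaryWorkflow is defined in
# hivemind_summarizer/workflows.py and may differ; the timeout below is an
# assumed, illustrative value.
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from hivemind_summarizer.activities import fetch_and_summarize_realtime_data
    from hivemind_summarizer.schema import RealTimeSummaryWorkflowInput


@workflow.defn
class RealTimeSummaryWorkflow:
    @workflow.run
    async def run(self, input: RealTimeSummaryWorkflowInput) -> str:
        # Delegate the Qdrant query and OpenAI summarization to the activity
        return await workflow.execute_activity(
            fetch_and_summarize_realtime_data,
            input,
            start_to_close_timeout=timedelta(minutes=5),
        )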