
Commit c9d0bcc

Merge pull request #39 from TogetherCrew/feat/37-real-time-summary
Feat: solving issues #37 & #38!
2 parents 525bae0 + a025f68

7 files changed: +251 -100 lines changed

README.md

Lines changed: 46 additions & 7 deletions
@@ -12,6 +12,7 @@ This repository contains TogetherCrew's Temporal Python workflows for data proce
 ### Hivemind Summarizer
 
 - **Platform Summaries**: Retrieves and processes summaries from Platform data stored in Qdrant, with options to fetch by date or date range.
+- **Real-Time Summaries**: Generates new summaries for recent data across platforms or specific communities.
 
 ## Architecture
 
@@ -51,23 +52,23 @@ The project uses Temporal for workflow orchestration with the following componen
 
 ### Running a Platform Summary Workflow
 
-To fetch summaries for a specific community and date range:
+To fetch existing summaries for a specific community and date range from Qdrant:
 
 ```python
 from temporalio.client import Client
 from hivemind_summarizer.workflows import PlatformSummariesWorkflow
 from hivemind_summarizer.schema import PlatformFetchSummariesWorkflowInput
 
-async def run_okatfirn_workflow():
+async def run_platform_summaries_workflow():
     client = await Client.connect("localhost:7233")
 
     # Create workflow input
     input_data = PlatformFetchSummariesWorkflowInput(
-        platform_id="your_platform_id",
-        community_id="your_community_id",
-        start_date="2023-05-01",
-        end_date="2023-05-07",
-        extract_text_only=True
+        platform_id="your_platform_id",    # Required: the platform to fetch summaries from
+        community_id="your_community_id",  # Required: the community to fetch summaries from
+        start_date="2023-05-01",           # Optional: fetch summaries from this date
+        end_date="2023-05-07",             # Optional: fetch summaries until this date
+        extract_text_only=True             # Optional: whether to extract only text content
     )
 
     # Execute workflow
@@ -78,9 +79,47 @@ async def run_okatfirn_workflow():
         task_queue="your_task_queue"
     )
 
+    # Returns a list of existing summaries from Qdrant
     return result
 ```
 
+Note: This workflow only retrieves existing summaries that have already been generated and stored in Qdrant; it does not generate new ones. Use it to access previously generated summaries for a specific platform and community.
+
+### Running a Real-Time Summary Workflow
+
+To generate new summaries for recent data:
+
+```python
+from temporalio.client import Client
+from hivemind_summarizer.workflows import RealTimeSummaryWorkflow
+from hivemind_summarizer.schema import RealTimeSummaryWorkflowInput
+
+async def run_realtime_summary_workflow():
+    client = await Client.connect("localhost:7233")
+
+    # Create workflow input
+    input_data = RealTimeSummaryWorkflowInput(
+        period="4h",                       # Optional: time period (e.g., "1h", "4h") or a date in %Y-%m-%d format
+        platform_id="your_platform_id",    # Optional: filter by platform
+        community_id="your_community_id",  # Optional: filter by community
+        collection_name="your_collection"  # Optional: filter by collection
+    )
+
+    # Execute workflow
+    result = await client.execute_workflow(
+        RealTimeSummaryWorkflow.run,
+        input_data,
+        id="realtime-summary-workflow",
+        task_queue="your_task_queue"
+    )
+
+    # Returns newly generated summary text
+    return result
+```
+
+Note: This workflow actively generates new summaries for recent data. Use it to create fresh summaries for the specified time period and filters.
+Note 2: Either collection_name, or both platform_id and community_id, must be provided (to identify the collection that holds the raw data).
+
 ### Running a MediaWiki ETL Workflow
 
 To process MediaWiki content for all communities or a specific platform:
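
Note 2 above corresponds to the collection-resolution logic in `fetch_and_summarize_realtime_data` (see the `hivemind_summarizer/activities.py` diff below). As a minimal sketch of that rule, assuming the naming convention shown in the diff; the helper name `resolve_collection_name` is hypothetical and not part of this commit:

```python
def resolve_collection_name(
    collection_name: str | None,
    platform_id: str | None,
    community_id: str | None,
) -> str:
    """Pick the Qdrant collection holding the raw data (hypothetical helper)."""
    if collection_name:
        # An explicit collection name takes precedence
        return collection_name
    if platform_id and community_id:
        # Raw-data collections follow the "<community_id>_<platform_id>" convention
        return f"{community_id}_{platform_id}"
    raise ValueError(
        "Either collection_name or both platform_id and community_id must be provided"
    )
```

For example, `resolve_collection_name(None, "platform123", "community456")` returns `"community456_platform123"`.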

hivemind_summarizer/activities.py

Lines changed: 142 additions & 68 deletions
@@ -1,22 +1,22 @@
 import json
 import logging
 from typing import Any
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 
-from bson import ObjectId
 from tc_hivemind_backend.db.qdrant import QdrantSingleton
-from tc_hivemind_backend.db.mongo import MongoSingleton
 from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline
 
 from temporalio import activity, workflow
 from qdrant_client.models import Filter, FieldCondition, MatchValue
 from qdrant_client.http import models
+from openai import AsyncOpenAI
+import re
 
 with workflow.unsafe.imports_passed_through():
     from hivemind_summarizer.schema import (
         PlatformSummariesActivityInput,
         PlatformSummariesRangeActivityInput,
-        PlatformGetCollectionNameInput,
+        RealTimeSummaryWorkflowInput,
     )
 
 
@@ -41,57 +41,6 @@ def extract_summary_text(node_content: dict[str, Any]) -> str:
     return "Summary text not found"
 
 
-@activity.defn
-async def get_platform_name(input: PlatformGetCollectionNameInput) -> str:
-    """
-    Activity that extracts collection name from MongoDB based on platform_id and community_id.
-
-    Parameters
-    ----------
-    input: PlatformGetCollectionNameInput
-        Input object containing platform_id and community_id
-
-    Returns
-    -------
-    str
-        The platform name
-
-    Raises
-    ------
-    Exception
-        If platform not found or error occurs during DB access
-    """
-    platform_id = input.platform_id
-    community_id = input.community_id
-
-    logging.info(
-        f"Getting collection name for platform_id: {platform_id}, community_id: {community_id}"
-    )
-
-    try:
-        # Get MongoDB client
-        mongo_client = MongoSingleton.get_instance().get_client()
-
-        # Query the platform from Core database
-        platform = mongo_client["Core"]["platforms"].find_one(
-            {"_id": ObjectId(platform_id)}
-        )
-
-        if not platform:
-            raise Exception(f"Platform with ID {platform_id} not found")
-
-        # Extract platform name
-        platform_name = platform.get("name")
-        if not platform_name:
-            raise Exception(f"Platform name not found for platform_id {platform_id}")
-
-        return platform_name
-
-    except Exception as e:
-        logging.error(f"Error getting collection name: {str(e)}")
-        raise
-
-
 @activity.defn
 async def fetch_platform_summaries_by_date(
     input: PlatformSummariesActivityInput,
@@ -111,13 +60,13 @@ async def fetch_platform_summaries_by_date(
     """
     date = input.date
     extract_text_only = input.extract_text_only
-    collection_name = f"{input.community_id}_{input.platform_name}_summary"
     community_id = input.community_id
+    collection_name = f"{community_id}_{input.platform_id}_summary"
 
     logging.info("Started fetch_platform_summaries_by_date!")
 
-    if not input.platform_name:
-        raise ValueError("Platform name is required but was not provided")
+    if not input.platform_id:
+        raise ValueError("Platform id is required but was not provided")
 
     logging.info(
         f"Fetching summaries for date: {date} from collection: {collection_name}"
@@ -147,7 +96,7 @@ async def fetch_platform_summaries_by_date(
         # pipeline requires a different format for the collection name
         pipeline = CustomIngestionPipeline(
             community_id=community_id,
-            collection_name=f"{input.platform_name}_summary",
+            collection_name=f"{input.platform_id}_summary",
         )
         # get the latest date from the collection
         latest_date = pipeline.get_latest_document_date(
@@ -216,7 +165,7 @@ async def fetch_platform_summaries_by_date_range(
     Parameters
     ----------
     input : PlatformSummariesRangeActivityInput
-        Input object containing start_date, end_date, platform_name and community_id
+        Input object containing start_date, end_date, platform_id and community_id
 
     Returns
     -------
@@ -226,19 +175,17 @@ async def fetch_platform_summaries_by_date_range(
     Raises
    ------
     ValueError
-        If end_date is before start_date or platform_name is not provided
+        If end_date is before start_date or platform_id is not provided
     """
     start_date = input.start_date
     end_date = input.end_date
     extract_text_only = input.extract_text_only
-    platform_name = input.platform_name
+    platform_id = input.platform_id
     community_id = input.community_id
-    if not platform_name:
+    if not platform_id:
         raise ValueError("Platform name is required but was not provided")
 
-    logging.info(
-        f"Fetching summaries for date range: {start_date} to {end_date} from collection: {collection_name}"
-    )
+    logging.info(f"Fetching summaries for date range: {start_date} to {end_date}.")
 
     try:
         # Parse the date strings to datetime objects
@@ -262,7 +209,7 @@ async def fetch_platform_summaries_by_date_range(
             date_input = PlatformSummariesActivityInput(
                 date=date,
                 extract_text_only=extract_text_only,
-                platform_name=input.platform_name,
+                platform_id=input.platform_id,
                 community_id=community_id,
             )
             summaries = await fetch_platform_summaries_by_date(date_input)
@@ -272,6 +219,133 @@ async def fetch_platform_summaries_by_date_range(
 
     except Exception as e:
         logging.error(
-            f"Error fetching summaries for date range {start_date} to {end_date} from collection {collection_name}: {str(e)}"
+            f"Error fetching summaries for date range {start_date} to {end_date}: {str(e)}"
+        )
+        raise
+
+
+@activity.defn
+async def fetch_and_summarize_realtime_data(
+    input: RealTimeSummaryWorkflowInput,
+) -> str:
+    """
+    Activity that fetches recent data from Qdrant and generates a real-time summary.
+
+    Parameters
+    ----------
+    input : RealTimeSummaryWorkflowInput
+        Input containing period, collection_name or platform_id/community_id, and extract_text_only
+
+    Returns
+    -------
+    str
+        A summarized text of the recent data
+    """
+    try:
+        # Get Qdrant client
+        qdrant_client = QdrantSingleton.get_instance().get_client()
+
+        # Determine collection name
+        collection_name = input.collection_name
+        if not collection_name and (input.platform_id and input.community_id):
+            collection_name = f"{input.community_id}_{input.platform_id}"
+        elif not collection_name:
+            raise ValueError(
+                "Either collection_name or both platform_id and community_id must be provided"
+            )
+
+        # Calculate time filter based on period
+        now = datetime.now(tz=timezone.utc)
+        if input.period:
+            if re.match(r"^\d+h$", input.period):
+                hours = int(input.period[:-1])
+                time_threshold = now - timedelta(hours=hours)
+            elif re.match(r"^\d{4}-\d{2}-\d{2}$", input.period):
+                time_threshold = datetime.strptime(input.period, "%Y-%m-%d").replace(
+                    tzinfo=timezone.utc
+                )
+            else:
+                raise ValueError(
+                    "Period must be in format 'Nh' (e.g., '1h', '4h') or 'YYYY-MM-DD'"
+                )
+        else:
+            # Default to the last hour if no period is specified
+            time_threshold = now - timedelta(hours=1)
+
+        # Create filter for the time period
+        filter_conditions = [
+            FieldCondition(
+                key="createdAt", range=models.Range(gt=time_threshold.timestamp())
+            )
+        ]
+        time_filter = Filter(must=filter_conditions)
+
+        # Query Qdrant for recent data
+        search_results = qdrant_client.search(
+            collection_name=collection_name,
+            query_vector=[0]
+            * 1024,  # Using a zero vector since we only care about the filter
+            query_filter=time_filter,
+            limit=500,  # hard limit in case there is a lot of data
+            with_payload=True,
+            with_vectors=False,
         )
+
+        if not search_results:
+            return "No recent data found for the specified period."
+
+        logging.info(f"Found {len(search_results)} raw data points!")
+
+        # Extract text content from the results
+        texts = []
+        for point in search_results:
+            if "_node_content" in point.payload:
+                content = point.payload["_node_content"]
+                if isinstance(content, str):
+                    try:
+                        content = json.loads(content)
+                    except json.JSONDecodeError:
+                        pass
+                if isinstance(content, dict) and "text" in content:
+                    if "author" in content["metadata"]:
+                        texts.append(
+                            content["metadata"]["author"] + ": " + content["text"]
+                        )
+                    else:
+                        texts.append(content["text"])
+
+        if not texts:
+            return "No text content found in the recent data."
+
+        # Combine all texts
+        combined_text = "\n".join(texts)
+
+        logging.info("Starting to summarize...")
+
+        # Initialize OpenAI client
+        client = AsyncOpenAI()
+
+        # Generate summary using OpenAI
+        prompt = (
+            "Please provide a concise summary of the following content, focusing on the key points and main themes:"
+            f"{combined_text}"
+        )
+
+        response = await client.chat.completions.create(
+            model="gpt-4o-mini",
+            messages=[
+                {
+                    "role": "system",
+                    "content": "You are a helpful assistant that summarizes content concisely and accurately.",
+                },
+                {"role": "user", "content": prompt},
+            ],
+            temperature=0.3,
+            n=1,
+        )
+
+        return response.choices[0].message.content
+
+    except Exception as e:
+        logging.error(f"Error in fetch_and_summarize_realtime_data: {str(e)}")
         raise
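
The period handling introduced in `fetch_and_summarize_realtime_data` accepts a relative window (`"Nh"`) or an absolute `YYYY-MM-DD` date. A standalone sketch of the same parsing, lifted out for illustration; the function name `period_to_threshold` is hypothetical and not part of this commit:

```python
import re
from datetime import datetime, timedelta, timezone


def period_to_threshold(period: str | None) -> datetime:
    """Convert a period string into the UTC cutoff used for the Qdrant time filter
    (hypothetical standalone version of the activity's logic)."""
    now = datetime.now(tz=timezone.utc)
    if not period:
        # Mirrors the activity's default: summarize the last hour
        return now - timedelta(hours=1)
    if re.match(r"^\d+h$", period):
        # Relative window, e.g. "4h" means "the last four hours"
        return now - timedelta(hours=int(period[:-1]))
    if re.match(r"^\d{4}-\d{2}-\d{2}$", period):
        # Absolute date: everything since midnight UTC of that day
        return datetime.strptime(period, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    raise ValueError("Period must be in format 'Nh' (e.g., '1h', '4h') or 'YYYY-MM-DD'")
```

The resulting threshold is compared against the `createdAt` payload field as a Unix timestamp via `models.Range(gt=time_threshold.timestamp())`, which assumes points store `createdAt` in epoch seconds. Since the query only needs the filter (hence the zero query vector), Qdrant's `scroll` API would be a filter-only alternative that avoids the dummy vector.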
