Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This repository contains TogetherCrew's Temporal Python workflows for data proce

### Hivemind Summarizer

- **Telegram Summaries**: Retrieves and processes summaries from Telegram data stored in Qdrant, with options to fetch by date or date range.
- **Platform Summaries**: Retrieves and processes summaries from platform data stored in Qdrant, with options to fetch by date or date range.

## Architecture

Expand Down Expand Up @@ -49,20 +49,20 @@ The project uses Temporal for workflow orchestration with the following componen

## Usage Examples

### Running a Telegram Summary Workflow
### Running a Platform Summary Workflow

To fetch summaries for a specific community and date range:

```python
from temporalio.client import Client
from hivemind_summarizer.workflows import TelegramSummariesWorkflow
from hivemind_summarizer.schema import TelegramFetchSummariesWorkflowInput
from hivemind_summarizer.workflows import PlatformSummariesWorkflow
from hivemind_summarizer.schema import PlatformFetchSummariesWorkflowInput

async def run_telegram_workflow():
async def run_platform_workflow():
client = await Client.connect("localhost:7233")

# Create workflow input
input_data = TelegramFetchSummariesWorkflowInput(
input_data = PlatformFetchSummariesWorkflowInput(
platform_id="your_platform_id",
community_id="your_community_id",
start_date="2023-05-01",
Expand All @@ -72,9 +72,9 @@ async def run_telegram_workflow():

# Execute workflow
result = await client.execute_workflow(
TelegramSummariesWorkflow.run,
PlatformSummariesWorkflow.run,
input_data,
id="telegram-summaries-workflow",
id="platform-summaries-workflow",
task_queue="your_task_queue"
)

Expand Down
32 changes: 16 additions & 16 deletions hivemind_summarizer/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

with workflow.unsafe.imports_passed_through():
from hivemind_summarizer.schema import (
TelegramSummariesActivityInput,
TelegramSummariesRangeActivityInput,
TelegramGetCollectionNameInput,
PlatformSummariesActivityInput,
PlatformSummariesRangeActivityInput,
PlatformGetCollectionNameInput,
)


Expand All @@ -42,13 +42,13 @@ def extract_summary_text(node_content: dict[str, Any]) -> str:


@activity.defn
async def get_platform_name(input: TelegramGetCollectionNameInput) -> str:
async def get_platform_name(input: PlatformGetCollectionNameInput) -> str:
"""
Activity that looks up the platform name in MongoDB based on platform_id and community_id.

Parameters
----------
input: TelegramGetCollectionNameInput
input: PlatformGetCollectionNameInput
Input object containing platform_id and community_id

Returns
Expand Down Expand Up @@ -93,15 +93,15 @@ async def get_platform_name(input: TelegramGetCollectionNameInput) -> str:


@activity.defn
async def fetch_telegram_summaries_by_date(
input: TelegramSummariesActivityInput,
async def fetch_platform_summaries_by_date(
input: PlatformSummariesActivityInput,
) -> list[dict[str, Any]] | str:
"""
Activity that fetches Telegram summaries for a specific date from Qdrant.
Activity that fetches platform summaries for a specific date from Qdrant.

Parameters
----------
input : TelegramSummariesActivityInput
input : PlatformSummariesActivityInput
Input object containing date, extract_text_only, platform_name and community_id

Returns
Expand All @@ -114,7 +114,7 @@ async def fetch_telegram_summaries_by_date(
collection_name = f"{input.community_id}_{input.platform_name}_summary"
community_id = input.community_id

logging.info("Started fetch_telegram_summaries_by_date!")
logging.info("Started fetch_platform_summaries_by_date!")

if not input.platform_name:
raise ValueError("Platform name is required but was not provided")
Expand Down Expand Up @@ -207,15 +207,15 @@ async def fetch_telegram_summaries_by_date(


@activity.defn
async def fetch_telegram_summaries_by_date_range(
input: TelegramSummariesRangeActivityInput,
async def fetch_platform_summaries_by_date_range(
input: PlatformSummariesRangeActivityInput,
) -> dict[str, list[dict[str, Any] | str]]:
"""
Activity that fetches Telegram summaries for a range of dates from Qdrant.
Activity that fetches summaries for a range of dates from Qdrant.

Parameters
----------
input : TelegramSummariesRangeActivityInput
input : PlatformSummariesRangeActivityInput
Input object containing start_date, end_date, platform_name and community_id

Returns
Expand Down Expand Up @@ -259,13 +259,13 @@ async def fetch_telegram_summaries_by_date_range(
# Fetch summaries for each date
result = {}
for date in date_range:
date_input = TelegramSummariesActivityInput(
date_input = PlatformSummariesActivityInput(
date=date,
extract_text_only=extract_text_only,
platform_name=input.platform_name,
community_id=community_id,
)
summaries = await fetch_telegram_summaries_by_date(date_input)
summaries = await fetch_platform_summaries_by_date(date_input)
result[date] = summaries

return result
Expand Down
8 changes: 4 additions & 4 deletions hivemind_summarizer/schema.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
from pydantic import BaseModel


class TelegramSummariesActivityInput(BaseModel):
class PlatformSummariesActivityInput(BaseModel):
date: str | None = None
extract_text_only: bool = True
platform_name: str | None = None
community_id: str | None = None


class TelegramSummariesRangeActivityInput(BaseModel):
class PlatformSummariesRangeActivityInput(BaseModel):
start_date: str
end_date: str
extract_text_only: bool = True
platform_name: str | None = None
community_id: str | None = None


class TelegramGetCollectionNameInput(BaseModel):
class PlatformGetCollectionNameInput(BaseModel):
platform_id: str
community_id: str


class TelegramFetchSummariesWorkflowInput(BaseModel):
class PlatformFetchSummariesWorkflowInput(BaseModel):
platform_id: str
community_id: str
start_date: str | None = None
Expand Down
36 changes: 18 additions & 18 deletions hivemind_summarizer/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,42 @@

with workflow.unsafe.imports_passed_through():
from .activities import (
fetch_telegram_summaries_by_date,
fetch_telegram_summaries_by_date_range,
fetch_platform_summaries_by_date,
fetch_platform_summaries_by_date_range,
get_platform_name,
)
from .schema import (
TelegramSummariesActivityInput,
TelegramSummariesRangeActivityInput,
TelegramGetCollectionNameInput,
TelegramFetchSummariesWorkflowInput,
PlatformSummariesActivityInput,
PlatformSummariesRangeActivityInput,
PlatformGetCollectionNameInput,
PlatformFetchSummariesWorkflowInput,
)


@workflow.defn
class TelegramSummariesWorkflow:
class PlatformSummariesWorkflow:
"""
A Temporal workflow that fetches Telegram summaries for a specified date.
A Temporal workflow that fetches summaries for a specified date.
"""

@workflow.run
async def run(
self, input: TelegramFetchSummariesWorkflowInput
self, input: PlatformFetchSummariesWorkflowInput
) -> list[dict[str, Any]]:
"""
Run the workflow to fetch Telegram summaries for the specified date.
Run the workflow to fetch summaries for the specified date.

Parameters
----------
input : TelegramFetchSummariesWorkflowInput
Input containing platform_id, community_id, start_date, end_date, extract_text_only and collection_name
input : PlatformFetchSummariesWorkflowInput
Input containing platform_id, community_id, start_date, end_date, extract_text_only and platform_name

Returns
-------
list[dict[str, Any]]
A list of summary objects for the specified date
"""
logging.info("Started TelegramSummariesWorkflow!")
logging.info("Started PlatformSummariesWorkflow!")
logging.info(
(
f" Platform ID: {input.platform_id}. "
Expand All @@ -56,7 +56,7 @@ async def run(
# First, get the platform name
platform_name = await workflow.execute_activity(
get_platform_name,
TelegramGetCollectionNameInput(
PlatformGetCollectionNameInput(
platform_id=input.platform_id, community_id=input.community_id
),
schedule_to_close_timeout=timedelta(minutes=1),
Expand All @@ -67,8 +67,8 @@ async def run(
if input.end_date is None:
logging.info("Getting summaries by date!")
summaries = await workflow.execute_activity(
fetch_telegram_summaries_by_date,
TelegramSummariesActivityInput(
fetch_platform_summaries_by_date,
PlatformSummariesActivityInput(
date=input.start_date,
platform_name=platform_name,
community_id=input.community_id,
Expand All @@ -81,8 +81,8 @@ async def run(
else:
logging.info("Getting summaries by date range!")
summaries = await workflow.execute_activity(
fetch_telegram_summaries_by_date_range,
TelegramSummariesRangeActivityInput(
fetch_platform_summaries_by_date_range,
PlatformSummariesRangeActivityInput(
start_date=input.start_date,
end_date=input.end_date,
platform_name=platform_name,
Expand Down
12 changes: 6 additions & 6 deletions registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@
load_mediawiki_data,
)
from hivemind_summarizer.activities import (
fetch_telegram_summaries_by_date,
fetch_telegram_summaries_by_date_range,
fetch_platform_summaries_by_date,
fetch_platform_summaries_by_date_range,
get_platform_name,
)
from workflows import (
CommunityWebsiteWorkflow,
SayHello,
WebsiteIngestionSchedulerWorkflow,
MediaWikiETLWorkflow,
TelegramSummariesWorkflow,
PlatformSummariesWorkflow,
)

WORKFLOWS = [
CommunityWebsiteWorkflow,
SayHello,
WebsiteIngestionSchedulerWorkflow,
MediaWikiETLWorkflow,
TelegramSummariesWorkflow,
PlatformSummariesWorkflow,
]

ACTIVITIES = [
Expand All @@ -40,7 +40,7 @@
transform_mediawiki_data,
load_mediawiki_data,
say_hello,
fetch_telegram_summaries_by_date,
fetch_telegram_summaries_by_date_range,
fetch_platform_summaries_by_date,
fetch_platform_summaries_by_date_range,
get_platform_name,
]
2 changes: 1 addition & 1 deletion workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from hivemind_etl.mediawiki.workflows import (
MediaWikiETLWorkflow,
)
from hivemind_summarizer.workflows import TelegramSummariesWorkflow
from hivemind_summarizer.workflows import PlatformSummariesWorkflow

from temporalio import workflow

Expand Down