Fix/63 ingestion pipline remove caching #64
Conversation
…les! - Updated the CustomIngestionPipeline instantiation in etl.py, pipeline.py, website_etl.py, and activities.py to set use_cache=False, ensuring that caching is disabled during document ingestion.
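For reference, the resulting call pattern looks like the following minimal sketch. The import path is taken from the review comments below; `community_id` and `platform_id` are assumed to come from the surrounding ETL context.

```python
from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline

# community_id / platform_id are assumed to be provided by the surrounding ETL code
pipeline = CustomIngestionPipeline(
    community_id,
    collection_name=platform_id,  # summarizer call sites use f"{platform_id}_summary" instead
    use_cache=False,              # caching explicitly disabled during document ingestion
)
```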
Walkthrough
Adds explicit use_cache=False to the CustomIngestionPipeline instantiations in the MediaWiki ETL, the simple ingestion pipeline, the website ETL, and the summarizer activities, disabling caching during document ingestion.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Orchestrator
    participant MediaWikiETL as MediaWikiETL.load
    participant Pipeline as CustomIngestionPipeline
    Orchestrator->>MediaWikiETL: load(community_id, platform_id)
    MediaWikiETL->>Pipeline: new(community_id, collection_name=..., use_cache=false)
    Pipeline-->>MediaWikiETL: initialized
    MediaWikiETL->>Pipeline: ingest()
    Pipeline-->>MediaWikiETL: results
    MediaWikiETL-->>Orchestrator: load complete
```

```mermaid
sequenceDiagram
    autonumber
    actor Runner
    participant WebsiteETL as WebsiteETL.__init__
    participant Pipeline as CustomIngestionPipeline
    Runner->>WebsiteETL: construct(community_id, collection_name)
    WebsiteETL->>Pipeline: new(community_id, collection_name, use_cache=false)
    Pipeline-->>WebsiteETL: ready
```

```mermaid
sequenceDiagram
    autonumber
    actor Temporal
    participant Activities as hivemind_summarizer.activities
    participant Pipeline as CustomIngestionPipeline
    Temporal->>Activities: fetch_platform_summaries_by_date(_|range)
    Activities->>Activities: build PlatformSummariesActivityInput(..., use_cache=false)
    Activities->>Pipeline: new(..., use_cache=false)
    Pipeline-->>Activities: results
    Activities-->>Temporal: summaries
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 0
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
hivemind_summarizer/activities.py (2)
96-102: Don’t ignore input.use_cache and guard empty collections to avoid crashes
- Hardcoding use_cache=False here bypasses the new input.use_cache field. Use the provided flag so callers control behavior.
- If the collection is empty, get_latest_document_date may return None; calling .strftime on None will raise. Add a guard and return empty results.
Apply:
```diff
-    pipeline = CustomIngestionPipeline(
-        community_id=community_id,
-        collection_name=f"{input.platform_id}_summary",
-        use_cache=False,
-    )
+    pipeline = CustomIngestionPipeline(
+        community_id=community_id,
+        collection_name=f"{input.platform_id}_summary",
+        use_cache=getattr(input, "use_cache", False),
+    )
     # get the latest date from the collection
-    latest_date = pipeline.get_latest_document_date(
+    latest_date = pipeline.get_latest_document_date(
         field_name="date", field_schema=models.PayloadSchemaType.DATETIME
     )
+    if latest_date is None:
+        logging.info("No documents found in summary collection; returning empty result.")
+        return "" if extract_text_only else []
```
207-217: Type mismatch in date-range return when extract_text_only=True
fetch_platform_summaries_by_date may return a str, but this function is annotated to return dict[str, list[dict | str]]. You currently assign a str directly, violating the annotation. Wrap the string in a list (or change the annotation), keeping a consistent shape for consumers.
```diff
-        summaries = await fetch_platform_summaries_by_date(date_input)
-        result[date] = summaries
+        summaries = await fetch_platform_summaries_by_date(date_input)
+        if extract_text_only and isinstance(summaries, str):
+            result[date] = [summaries]
+        else:
+            result[date] = summaries
```
🧹 Nitpick comments (4)
hivemind_etl/simple_ingestion/pipeline.py (1)
189-193: Batch path also hardcodes cache off; align with a single source of truth
Same note as above. If you keep it per-call, consider threading a use_cache flag through BatchIngestionRequest to keep both paths consistent, as in the sketch below.
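A sketch of that threading, assuming BatchIngestionRequest is a Pydantic-style model. Its existing fields are not shown in this review, so the shape here is hypothetical apart from the new flag:

```python
from pydantic import BaseModel

from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline


class BatchIngestionRequest(BaseModel):
    # Existing fields are not visible in this review; only the new flag matters here.
    community_id: str
    collection_name: str
    use_cache: bool = False  # hypothetical new field, defaulting to this PR's behavior


def build_pipeline(request: BatchIngestionRequest) -> CustomIngestionPipeline:
    # Single and batch paths both read the flag from the request,
    # keeping one source of truth instead of hardcoding at each call site.
    return CustomIngestionPipeline(
        community_id=request.community_id,
        collection_name=request.collection_name,
        use_cache=request.use_cache,
    )
```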
hivemind_etl/website/website_etl.py (1)
32-34: Cache disabled in WebsiteETL; clarify docstring vs. pipeline convention
Change is fine. Minor: the class docstring says the collection name would be community_id_platform_id, while CustomIngestionPipeline expects collection_name without the community prefix (it reconstructs it internally). Consider updating the docstring to avoid confusion.
hivemind_summarizer/activities.py (2)
207-219: Optional: fetch per-date concurrently with bounded parallelism
To reduce latency over long ranges without overloading Qdrant, fetch dates concurrently with a small semaphore (e.g., 5).
```diff
-    result = {}
-    for date in date_range:
-        date_input = PlatformSummariesActivityInput(
-            date=date,
-            extract_text_only=extract_text_only,
-            platform_id=input.platform_id,
-            community_id=community_id,
-            use_cache=False,
-        )
-        summaries = await fetch_platform_summaries_by_date(date_input)
-        if extract_text_only and isinstance(summaries, str):
-            result[date] = [summaries]
-        else:
-            result[date] = summaries
+    result: dict[str, list[dict[str, Any]] | list[str]] = {}
+    sem = asyncio.Semaphore(5)
+    async def fetch_one(d: str):
+        async with sem:
+            date_input = PlatformSummariesActivityInput(
+                date=d,
+                extract_text_only=extract_text_only,
+                platform_id=input.platform_id,
+                community_id=community_id,
+                use_cache=getattr(input, "use_cache", False),
+            )
+            out = await fetch_platform_summaries_by_date(date_input)
+            result[d] = [out] if extract_text_only and isinstance(out, str) else out
+    await asyncio.gather(*(fetch_one(d) for d in date_range))
```
286-293: Heads-up: fixed 1024-dim zero vector may mismatch collection config
If the target summary collection uses a different vector size, search will fail. Consider deriving the dimension from the collection schema or centralizing this constant.
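One way to derive the dimension instead of hardcoding 1024, sketched against the qdrant-client API. It assumes the collection uses a single unnamed vector configuration:

```python
from qdrant_client import QdrantClient


def zero_vector_for(client: QdrantClient, collection_name: str) -> list[float]:
    # Read the vector size from the collection schema instead of assuming 1024.
    info = client.get_collection(collection_name)
    vectors = info.config.params.vectors
    # A single unnamed vector config is a VectorParams with a `size` attribute;
    # named-vector collections return a dict and would need a key lookup instead.
    return [0.0] * vectors.size
```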
📒 Files selected for processing (4)
- hivemind_etl/mediawiki/etl.py (1 hunks)
- hivemind_etl/simple_ingestion/pipeline.py (2 hunks)
- hivemind_etl/website/website_etl.py (1 hunks)
- hivemind_summarizer/activities.py (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-05-12T11:24:54.953Z
Learnt from: amindadgar
PR: TogetherCrew/temporal-worker-python#39
File: hivemind_summarizer/activities.py:97-100
Timestamp: 2025-05-12T11:24:54.953Z
Learning: In the temporal-worker-python project, `CustomIngestionPipeline` expects collection names WITHOUT the community_id prefix (e.g., `{platform_id}_summary`), while direct Qdrant queries use collection names WITH the community_id prefix (e.g., `{community_id}_{platform_id}_summary`).
Applied to files:
- hivemind_etl/website/website_etl.py
- hivemind_etl/simple_ingestion/pipeline.py
- hivemind_summarizer/activities.py
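To make that convention concrete, a small illustration (identifier values are made up):

```python
# Hypothetical identifiers, purely for illustration
community_id = "comm123"
platform_id = "wikipedia"

# CustomIngestionPipeline expects the name WITHOUT the community prefix;
# it reconstructs the fully qualified name internally.
pipeline_collection = f"{platform_id}_summary"               # "wikipedia_summary"

# Direct Qdrant queries address the collection WITH the prefix.
qdrant_collection = f"{community_id}_{platform_id}_summary"  # "comm123_wikipedia_summary"
```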
🔇 Additional comments (3)
hivemind_etl/simple_ingestion/pipeline.py (1)
147-151: Verify use_cache support in CustomIngestionPipeline and consider a central config toggle
Confirm that the external backend's tc_hivemind_backend.ingest_qdrant.CustomIngestionPipeline constructor actually accepts a use_cache parameter in your deployed dependency, to avoid a runtime TypeError. If cache control will be toggled system-wide, consolidate this into a single config/env flag rather than hardcoding at each call site.
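If the deployed backend version is uncertain, a defensive sketch (not the project's code) can probe the constructor signature before passing the kwarg; `community_id` and `collection_name` are assumed to exist in scope:

```python
import inspect

from tc_hivemind_backend.ingest_qdrant import CustomIngestionPipeline

kwargs = {"community_id": community_id, "collection_name": collection_name}
# Pass use_cache only if the installed release's constructor accepts it,
# avoiding a TypeError on older tc-hivemind-backend versions.
if "use_cache" in inspect.signature(CustomIngestionPipeline.__init__).parameters:
    kwargs["use_cache"] = False
pipeline = CustomIngestionPipeline(**kwargs)
```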
hivemind_etl/mediawiki/etl.py (1)
101-103: LGTM on disabling cache; confirm backend compatibility
Constructor now includes use_cache=False. Please confirm the backend's CustomIngestionPipeline supports it to avoid instantiation errors during load.
hivemind_summarizer/activities.py (1)
61-75: Minor: collection naming is consistent with convention
Using f"{community_id}_{input.platform_id}_summary" for direct Qdrant queries and f"{platform_id}_summary" for the pipeline aligns with the known convention. Nice.
Actionable comments posted: 1
🧹 Nitpick comments (1)
requirements.txt (1)
12-13: Keep boto3/botocore in lockstep to prevent runtime mismatches.
These two are tightly coupled; allowing unbounded minor drift can break at import/runtime.
Apply bounds that keep them on the same minor:
```diff
-boto3>=1.38.19
-botocore>=1.38.19
+boto3>=1.38.19,<1.39.0
+botocore>=1.38.19,<1.39.0
```
📒 Files selected for processing (1)
- requirements.txt (1 hunks)
```diff
@@ -1,5 +1,5 @@
 python-dotenv>=1.0.0, <2.0.0
-tc-hivemind-backend==1.4.3
+tc-hivemind-backend==1.4.6
```
💡 Verification agent
🧩 Analysis chain
Bump to tc-hivemind-backend 1.4.6 — verify API usage and transitive impacts.
If this bump is to consume the new use_cache plumbing, ensure all CustomIngestionPipeline instantiations include use_cache= to avoid runtime errors on older call sites.
Run:
Also re-run unit/integration tests for ingestion and summarizer paths that construct the pipeline.
🏁 Script executed:

```bash
#!/usr/bin/env bash
# 1) Show every instantiation
rg -nP --type=py -C2 'CustomIngestionPipeline\s*\('

# 2) Flag calls likely missing the explicit kwarg (handles multi-line)
rg --json -nUP --type=py '(?s)CustomIngestionPipeline\s*\((?:(?!\)).)*\)' \
  | jq -r 'select(.type=="match") | .data | "\(.path.text):\(.line_number):\(.submatches[0].match.text)"' \
  | rg -v 'use_cache\s*='
```

Length of output: 3179
Ensure all CustomIngestionPipeline instantiations include use_cache
Missing use_cache at:
- hivemind_summarizer/activities.py:97
- hivemind_etl/simple_ingestion/pipeline.py:147
- hivemind_etl/simple_ingestion/pipeline.py:189

Add use_cache=<bool> to these calls to avoid runtime errors.
🤖 Prompt for AI Agents
In requirements.txt line 2 and in hivemind_summarizer/activities.py around line
97 and hivemind_etl/simple_ingestion/pipeline.py around lines 147 and 189, each
instantiation of CustomIngestionPipeline is missing the required use_cache
parameter; update those constructor calls to include use_cache=<bool> (true or
false as appropriate for the use case) so the signature is satisfied and runtime
errors are avoided, keeping the parameter name exactly use_cache and ensuring
any callers/tests are updated if they relied on a default.
- Updated test assertions in TestMediawikiETL to include use_cache=False when instantiating the ingestion pipeline, ensuring consistency with the recent changes to disable caching during document ingestion.
Actionable comments posted: 0
🧹 Nitpick comments (2)
tests/unit/test_mediawiki_etl.py (2)
167-168: Nice: explicitly asserting use_cache=False. Consider making the mock assertion robust to positional-vs-keyword calls.
If the production code ever passes collection_name/use_cache positionally, this strict kwargs assertion will fail even though the semantics are correct. Suggest DRYing and tolerating either style while still requiring that use_cache be explicitly provided.
Apply this diff within this test to use a helper:
```diff
-        mock_ingestion_pipeline_class.assert_called_once_with(
-            self.community_id, collection_name=self.platform_id, use_cache=False
-        )
+        self._assert_pipeline_init(mock_ingestion_pipeline_class)
```

Add this helper method inside TestMediawikiETL (e.g., after setUp):
```python
def _assert_pipeline_init(self, mock_cls):
    mock_cls.assert_called_once()
    args, kwargs = mock_cls.call_args
    # community_id
    community_id = kwargs.get("community_id", args[0] if len(args) >= 1 else None)
    self.assertEqual(community_id, self.community_id)
    # collection_name
    collection_name = kwargs.get("collection_name", args[1] if len(args) >= 2 else None)
    self.assertEqual(collection_name, self.platform_id)
    # require explicit use_cache either as kwarg or third positional
    if "use_cache" in kwargs:
        self.assertFalse(kwargs["use_cache"])
    else:
        self.assertGreaterEqual(len(args), 3, "use_cache must be provided explicitly")
        self.assertFalse(args[2])
```
195-196: Same robustness/DRY suggestion for the second assertion.
Mirror the change here to avoid brittleness and duplication.
```diff
-        mock_ingestion_pipeline_class.assert_called_once_with(
-            self.community_id, collection_name=self.platform_id, use_cache=False
-        )
+        self._assert_pipeline_init(mock_ingestion_pipeline_class)
```
📒 Files selected for processing (1)
- tests/unit/test_mediawiki_etl.py (2 hunks)
Summary by CodeRabbit
- Bug Fixes
- Chores