feat: implement async batch processing for classification pipeline #11
kunalbhardwaj2006 wants to merge 1 commit into AOSSIE-Org:main
Conversation
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@generator/src/knowledge_utils.py`:
- Around line 50-54: The async function is performing blocking file I/O with
open() (see the block that reads into prompts and the similar code at lines
marked 67-68); replace these synchronous reads with aiofiles: import aiofiles,
use async with aiofiles.open(filepath, 'r') and await f.read() (and preserve the
same exception handling to log errors with logger.error and append "" on
failure) so the function remains non-blocking and compatible with asyncio-based
concurrency.
- Around line 291-295: The code sets stream_code, subject_name, subtopic_name to
None when meta is None but continues and still performs the DB insert (using
those values) while only skipping frontend export; change the flow so that when
meta is None for a given subtopic_id you skip all further processing for that
subtopic (e.g., early continue/return) and do not perform the DB insert. Locate
the block that unpacks meta and the subsequent DB insertion code (references:
meta, subtopic_id, stream_code, subject_name, subtopic_name and the DB insert
logic) and make the missing-metadata branch bail out before any insert or
downstream work.
- Line 39: Remove the redundant fallback in knowledge_utils.py by changing the
batch_size assignment so it uses the validated configuration value directly
(i.e., set batch_size = CLASSIFICATION_BATCH_SIZE) instead of
"CLASSIFICATION_BATCH_SIZE or 3"; this avoids masking misconfiguration that is
already asserted as positive in config.py and keeps behavior consistent with the
validation in config.
- Around line 60-68: The loop uses zip(batch_files, results) which can silently
truncate if lengths differ; add an explicit length check before iterating (e.g.,
if len(batch_files) != len(results): raise ValueError or log and handle the
mismatch) or switch to zip(batch_files, results, strict=True) if your runtime
supports it, so you never write responses to the wrong response_file; update the
code around the loop that iterates over filepath, response to validate lengths
and fail/handle early instead of silently truncating.
- Around line 82-86: The slice logic can produce an empty string when there is a
'{' but no closing '}', so change the checks around start/end: call end_index =
text.rfind('}'), ensure start != -1 and end_index != -1 and end_index > start
before setting end = end_index + 1 and json_str = text[start:end]; if that
condition fails, avoid creating json_str (or explicitly set it to
None/raise/skip) so malformed input is handled safely; update the code around
the variables start, end_index/end, and json_str accordingly.
- Around line 301-303: The call to the synchronous generate_text(prompt) inside
the async block blocks the event loop; either (A) wrap the call in the event
loop's executor and await it (e.g., content = await
asyncio.get_running_loop().run_in_executor(None, generate_text, prompt)) or (B,
preferred) refactor generate_text to be async (use aiohttp instead of requests)
and then await generate_text(prompt) directly; update the code where content =
generate_text(prompt) is used and any callers of generate_text to match the
chosen approach.
- Around line 177-182: The loop currently reassigns loop variables subject_name
and subtopic_name after checking special cases, which is confusing and flagged
by linters; instead, create new variables (e.g., norm_subject_name and
norm_subtopic_name) to hold the normalized values from
normalize_subtopic(subject_name) and normalize_subtopic(subtopic_name), handle
the "other"/"unclassifiable" mapping into those new variables (e.g., set
norm_subject_name="general aptitude", norm_subtopic_name="miscellaneous" when
matched), and update any subsequent code in this block to use norm_subject_name
and norm_subtopic_name rather than mutating the original loop variables.
- Line 16: The import of process_batch from generator.src.llm_utils is failing
because process_batch is not defined in llm_utils.py; locate the call results =
await process_batch(prompts) in knowledge_utils.py and either (A) implement an
async process_batch(prompts) function inside generator/src/llm_utils.py that
matches the call signature and uses existing helpers (e.g., generate_text) to
process prompt lists, or (B) if an equivalent batch helper already exists under
a different name in llm_utils.py, change the import and the call in
knowledge_utils.py to that existing function name (ensuring it is async and
returns the expected results structure). Ensure the symbol names process_batch
(or the replaced function name) and generate_text are consistent between the two
modules so the import and await call succeed.
- Around line 403-414: The current loop parsing question IDs (variables q_id,
parts) assumes a prefix_year_qno_... format and emits a generic logger.warning
when malformed; update the logger.warning in the for loop that builds q_path
(q_path, q_list) to include the actual malformed q_id and the expected format
(e.g., "expected 'prefix_year_qno[_...]'") and optionally an example, so the
message becomes descriptive enough to aid debugging; keep skipping behavior but
make the warning text include q_id and the expected pattern using the same
logger.warning call.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 644b91ce-5b74-49dd-830b-c16c4a5e12b0
📒 Files selected for processing (1)
generator/src/knowledge_utils.py
```python
    CLASSIFICATION_BATCH_SIZE,
)

from generator.src.llm_utils import generate_text, process_batch
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Search for process_batch definition
rg -n "def process_batch" --type py
```
Repository: AOSSIE-Org/LibrEd
Length of output: 43
🏁 Script executed:
```shell
# First, let me check the exact import statement in knowledge_utils.py
head -30 generator/src/knowledge_utils.py | cat -n
```
Repository: AOSSIE-Org/LibrEd
Length of output: 1019
🏁 Script executed:
```shell
# Check what functions are actually defined in llm_utils.py
cat generator/src/llm_utils.py
```
Repository: AOSSIE-Org/LibrEd
Length of output: 1604
🏁 Script executed:
```shell
# Search for any usage of process_batch in the codebase to see if it's actually called
rg "process_batch" --type py -n
```
Repository: AOSSIE-Org/LibrEd
Length of output: 244
Critical: process_batch is imported but does not exist in llm_utils.py.
Line 16 imports process_batch from generator.src.llm_utils, but this function is not defined in that module. The function is actively called on line 57 (results = await process_batch(prompts)), so this import error will cause an ImportError at runtime and break the module.
Either implement process_batch in llm_utils.py or update the import to use an existing function.
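One way option (A) could look: a minimal sketch of an async `process_batch` for llm_utils.py, with the blocking `generate_text` stubbed here for illustration (the real one makes an HTTP call). This is an assumption about the intended call signature, not the project's actual implementation:

```python
import asyncio

def generate_text(prompt):
    # Stand-in for the real blocking LLM call in llm_utils.py
    return f"response to: {prompt}"

async def process_batch(prompts):
    # Run each blocking generate_text call in a worker thread so the event
    # loop stays free; gather preserves input order, so results line up with
    # prompts (and with batch_files in the caller).
    tasks = [asyncio.to_thread(generate_text, p) for p in prompts]
    return await asyncio.gather(*tasks)

results = asyncio.run(process_batch(["q1", "q2"]))
print(results)  # ['response to: q1', 'response to: q2']
```

Because `asyncio.gather` returns results in submission order, the caller's `zip(batch_files, results)` pairing stays correct as long as every prompt yields a result.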
```python
if limit:
    files = files[:limit]

batch_size = CLASSIFICATION_BATCH_SIZE or 3
```
🧹 Nitpick | 🔵 Trivial
Redundant fallback value.
CLASSIFICATION_BATCH_SIZE is already validated to be positive in config.py via an assertion. The or 3 fallback is unnecessary and could mask configuration issues.
♻️ Remove redundant fallback
```diff
-    batch_size = CLASSIFICATION_BATCH_SIZE or 3
+    batch_size = CLASSIFICATION_BATCH_SIZE
```
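Python's `or` falls back whenever the left operand is falsy, so a misconfigured value of 0 would be silently replaced with 3 instead of surfacing the assertion in config.py. A minimal illustration:

```python
CLASSIFICATION_BATCH_SIZE = 0  # simulated misconfiguration

# `0 or 3` evaluates to 3 — the bad config value is masked, not reported
batch_size = CLASSIFICATION_BATCH_SIZE or 3
print(batch_size)  # 3
```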
```python
        with open(filepath, 'r') as f:
            prompts.append(f.read())
    except Exception as e:
        logger.error(f"Failed to read {filepath}: {e}")
        prompts.append("")
```
🧹 Nitpick | 🔵 Trivial
Blocking file I/O in async function.
Using open() in an async function blocks the event loop. For batch processing where multiple batches run sequentially, this may not be critical, but it prevents proper concurrency if callers expect non-blocking behavior.
♻️ Consider using aiofiles for async file operations
```diff
+import aiofiles

 # In process_classification_prompts:
-    with open(filepath, 'r') as f:
-        prompts.append(f.read())
+    async with aiofiles.open(filepath, 'r') as f:
+        prompts.append(await f.read())

-    with open(response_file, 'w') as f:
-        f.write(content)
+    async with aiofiles.open(response_file, 'w') as f:
+        await f.write(content)
```
Also applies to: lines 67-68
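If adding an aiofiles dependency is undesirable, a similar non-blocking effect is possible with the standard library alone by pushing the blocking read into a worker thread. A sketch under that assumption (`read_prompts` and the file names are illustrative, not the project's actual API):

```python
import asyncio
import logging
import os
import tempfile

logger = logging.getLogger(__name__)

def _read_file(filepath):
    with open(filepath) as f:
        return f.read()

async def read_prompts(filepaths):
    prompts = []
    for filepath in filepaths:
        try:
            # to_thread offloads the blocking open()/read() to a thread,
            # keeping the event loop responsive for other tasks
            prompts.append(await asyncio.to_thread(_read_file, filepath))
        except Exception as e:
            # same fallback behavior as the reviewed code: log and append ""
            logger.error(f"Failed to read {filepath}: {e}")
            prompts.append("")
    return prompts

# demo with one real file and one missing file
tmp = tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False)
tmp.write("classify this question")
tmp.close()
prompts = asyncio.run(read_prompts([tmp.name, "/nonexistent/file.txt"]))
print(prompts)  # ['classify this question', '']
os.remove(tmp.name)
```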
🧰 Tools
🪛 Ruff (0.15.6)
[warning] 50-50: Async functions should not open files with blocking methods like open
(ASYNC230)
[warning] 50-50: Unnecessary mode argument
Remove mode argument
(UP015)
[warning] 52-54: try-except within a loop incurs performance overhead
(PERF203)
[warning] 52-52: Do not catch blind exception: Exception
(BLE001)
[warning] 53-53: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
[warning] 53-53: Logging statement uses f-string
(G004)
```python
for filepath, response in zip(batch_files, results):
    try:
        content = response or ""

        base_name = os.path.basename(filepath).replace('.txt', '')
        response_file = os.path.join(RESPONSE_DIR, f"{base_name}_response.json")

        with open(response_file, 'w') as f:
            f.write(content)
```
Risk of silent data corruption: zip() without strict=True.
If process_batch returns fewer results than the number of prompts submitted (e.g., due to partial failures), the zip() will silently truncate, causing responses to be written to the wrong files or some files to be skipped entirely without any error.
🛡️ Proposed fix to enforce length matching
```diff
-    for filepath, response in zip(batch_files, results):
+    for filepath, response in zip(batch_files, results, strict=True):
```
Alternatively, add an explicit length check before the loop if you need to handle mismatches gracefully.
🧰 Tools
🪛 Ruff (0.15.6)
[warning] 60-60: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
[warning] 67-67: Async functions should not open files with blocking methods like open
(ASYNC230)
```python
start = text.find('{')
end = text.rfind('}') + 1

if start != -1 and end != -1:
    json_str = text[start:end]
```
Edge case: malformed JSON with { but no }.
If the text contains { but no }, rfind('}') returns -1, making end = 0. The condition start != -1 and end != -1 passes (since end is 0, not -1), but text[start:0] returns an empty string, which may not be the intended behavior.
🛡️ Proposed fix to handle edge case
```diff
     start = text.find('{')
     end = text.rfind('}') + 1
-    if start != -1 and end != -1:
+    if start != -1 and end > start:
         json_str = text[start:end]
```
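The failure mode and a guarded version can be compared directly (`extract_json` is a name used here only for illustration):

```python
def extract_json(text):
    # Guarded extraction: only slice when a '}' actually closes after '{'
    start = text.find('{')
    end_index = text.rfind('}')
    if start != -1 and end_index > start:
        return text[start:end_index + 1]
    return None

# Buggy pattern for comparison: rfind misses -> end == 0 -> empty slice
text = 'prefix { "a": 1 with no closing brace'
end = text.rfind('}') + 1               # -1 + 1 == 0
print(repr(text[text.find('{'):end]))   # '' — silently empty

print(extract_json(text))                       # None
print(extract_json('noise {"a": 1} trailer'))   # {"a": 1}
```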
```python
if subject_name.lower() == "other" or subtopic_name.lower() == "unclassifiable":
    subject_name = "general aptitude"
    subtopic_name = "miscellaneous"
else:
    subject_name = normalize_subtopic(subject_name)
    subtopic_name = normalize_subtopic(subtopic_name)
```
🧹 Nitpick | 🔵 Trivial
Loop variables overwritten inside loop body.
Reassigning subject_name and subtopic_name within the loop that iterates over them can be confusing and is flagged by linters (PLW2901). Consider using distinct variable names for the normalized values.
♻️ Use distinct variable names for clarity
```diff
-    if subject_name.lower() == "other" or subtopic_name.lower() == "unclassifiable":
-        subject_name = "general aptitude"
-        subtopic_name = "miscellaneous"
+    if subject_name.lower() == "other" or subtopic_name.lower() == "unclassifiable":
+        norm_subject = "general aptitude"
+        norm_subtopic = "miscellaneous"
     else:
-        subject_name = normalize_subtopic(subject_name)
-        subtopic_name = normalize_subtopic(subtopic_name)
+        norm_subject = normalize_subtopic(subject_name)
+        norm_subtopic = normalize_subtopic(subtopic_name)
+
+    # Use norm_subject and norm_subtopic in the rest of the block
```
🧰 Tools
🪛 Ruff (0.15.6)
[warning] 178-178: for loop variable subject_name overwritten by assignment target
(PLW2901)
[warning] 179-179: for loop variable subtopic_name overwritten by assignment target
(PLW2901)
[warning] 181-181: for loop variable subject_name overwritten by assignment target
(PLW2901)
[warning] 182-182: for loop variable subtopic_name overwritten by assignment target
(PLW2901)
```python
if meta:
    stream_code, subject_name, subtopic_name = meta
else:
    logger.warning(f"No metadata found for {subtopic_id}")
    stream_code, subject_name, subtopic_name = (None, None, None)
```
Metadata query may return None, but subsequent code assumes valid values.
When meta is None, you set stream_code, subject_name, subtopic_name = (None, None, None) and continue processing. This leads to the conditional on line 332 skipping the frontend export, but the database insert on lines 324-327 still occurs with potentially invalid data. Consider whether to skip processing entirely when metadata is missing.
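A minimal sketch of the suggested bail-out, with the metadata lookup and DB insert stubbed out for illustration (the dict, `insert_row`, and the IDs are hypothetical stand-ins for the real query and insert logic):

```python
import logging

logging.basicConfig()
logger = logging.getLogger(__name__)

# Illustrative stand-ins for the real metadata query and DB insert
METADATA = {"st_001": ("CS", "algorithms", "sorting")}
inserted = []

def insert_row(subtopic_id, stream_code, subject_name, subtopic_name):
    inserted.append((subtopic_id, stream_code, subject_name, subtopic_name))

for subtopic_id in ["st_001", "st_missing"]:
    meta = METADATA.get(subtopic_id)
    if meta is None:
        # Bail out before any DB work — never insert with None values
        logger.warning("No metadata found for %s; skipping", subtopic_id)
        continue
    stream_code, subject_name, subtopic_name = meta
    insert_row(subtopic_id, stream_code, subject_name, subtopic_name)

print(inserted)  # only st_001 was inserted
```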
🧰 Tools
🪛 Ruff (0.15.6)
[warning] 294-294: Logging statement uses f-string
(G004)
````python
try:
    content = generate_text(prompt)

    if content.startswith('```markdown'):
        content = content.replace('```markdown', '', 1)
    if content.startswith('```'):
        content = content.replace('```', '', 1)
    if content.endswith('```'):
        # 🤖 Generate theory
        content = generate_text(prompt) or ""
````
Synchronous generate_text() blocks the async event loop.
generate_text() is a synchronous function (makes blocking HTTP calls via requests.post). Calling it directly in an async function blocks the event loop, negating any async benefits and preventing concurrent processing.
♻️ Wrap in run_in_executor or make generate_text async
Option 1: Run synchronous function in thread pool:
```diff
-    content = generate_text(prompt) or ""
+    loop = asyncio.get_event_loop()
+    content = await loop.run_in_executor(None, generate_text, prompt) or ""
```
Option 2 (preferred): Refactor generate_text to use aiohttp instead of requests, then await it directly.
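Option 1 can be exercised end to end with a blocking stub standing in for `generate_text` (the sleep simulates the blocking HTTP call; `generate_theory` is an illustrative name, not the project's function):

```python
import asyncio
import time

def generate_text(prompt):
    # Stand-in for the blocking requests.post call
    time.sleep(0.05)
    return f"theory for {prompt}"

async def generate_theory(prompts):
    loop = asyncio.get_running_loop()
    # Each blocking call runs in the default thread-pool executor; launching
    # them all before awaiting lets them overlap instead of running serially.
    futures = [loop.run_in_executor(None, generate_text, p) for p in prompts]
    return await asyncio.gather(*futures)

start = time.perf_counter()
out = asyncio.run(generate_theory(["sorting", "graphs", "dp"]))
elapsed = time.perf_counter() - start
print(out)
print(f"{elapsed:.2f}s")  # typically well under 3 × 0.05s due to overlap
```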
```python
for (q_id,) in questions:
    try:
        parts = q_id.split('_')

        if len(parts) >= 3:
            path_stream = parts[0]
            path_year = parts[1]
            path_qno = parts[2]

            q_path = f"questions/{path_year}/{path_qno}/"
            q_list.append(q_path)
        else:
            logger.warning(f"[Manifest] Skipping malformed ID: {q_id}")
```
🧹 Nitpick | 🔵 Trivial
Question ID parsing assumes specific format.
The code assumes question IDs follow a prefix_year_qno_... format. While error handling exists, the warning message could be more descriptive about the expected format to aid debugging.
♻️ Improve warning message clarity
```diff
         else:
-            logger.warning(f"[Manifest] Skipping malformed ID: {q_id}")
+            logger.warning(f"[Manifest] Skipping malformed ID: {q_id} (expected format: prefix_year_qno_...)")
```
🧰 Tools
🪛 Ruff (0.15.6)
[warning] 414-414: Logging statement uses f-string
(G004)
Hi @maintainers, it looks like the workflow requires approval to run. Could you please approve it so the CI checks can complete? Thanks!
🚀 Overview
This PR introduces async batch processing for the classification pipeline to improve performance and scalability.
🔧 Changes Made
⚡ Impact
🧪 Testing
📌 Notes
This PR focuses on classification pipeline improvements. LLM optimization is handled in a separate PR.
Summary by CodeRabbit