
Implement Scheduler task with dual-channel coordination (Task ID: task-2.1-implement-scheduler) #356

Draft
eric-wang-1990 wants to merge 6 commits into main from stack/pr-phase2-core-pipeline

Conversation

@eric-wang-1990
Collaborator

@eric-wang-1990 eric-wang-1990 commented Mar 18, 2026

🥞 Stacked PR

Use this link to review incremental changes.


What's Changed

Please fill in a description of the changes here.

This contains breaking changes.

Closes #NNN.

@eric-wang-1990
Collaborator Author

[Critical] String-based 403/401 detection is fragile

worker.rs detects auth/expiry errors by checking error_str.contains("401") / contains("403"). This can false-positive on error messages that happen to contain those digit sequences (e.g. "error code 4035"), and will break silently if the error format ever changes.

Prefer a structured check. Since DatabricksErrorHelper::io() produces errors with the message "HTTP 403 - ...", at minimum scope the match more tightly:

let is_auth_error = error_str.contains("HTTP 401")
    || error_str.contains("HTTP 403");

Long-term, add a typed error variant (e.g. ErrorKind::HttpStatus(u16)) so callers can match on status code directly.
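The typed-variant idea can be sketched roughly as follows. This is a hypothetical shape, not the crate's actual API: `ErrorKind`, its variants, and `is_auth_error` are all placeholder names for illustration.

```rust
// Hypothetical sketch of a typed error kind; the real error type used by
// DatabricksErrorHelper may differ, so treat these names as placeholders.
#[derive(Debug, PartialEq)]
enum ErrorKind {
    HttpStatus(u16),
    Io(String),
}

// Callers match on the status code directly instead of substring-searching
// the rendered message.
fn is_auth_error(kind: &ErrorKind) -> bool {
    matches!(kind, ErrorKind::HttpStatus(401 | 403))
}

fn main() {
    assert!(is_auth_error(&ErrorKind::HttpStatus(403)));
    // A message containing "4035" can no longer false-positive,
    // because the status travels as structured data, not text.
    assert!(!is_auth_error(&ErrorKind::Io("error code 4035".to_string())));
    println!("ok");
}
```

With this in place, the `contains("HTTP 401")` checks become unnecessary and the worker is insulated from message-format changes.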


This comment was generated with GitHub MCP.

@eric-wang-1990
Collaborator Author

[High] Scheduler's bounded result_channel send has no timeout or cancellation escape

If the consumer stops reading (e.g. drops or panics mid-stream without cancelling), result_channel.send(handle).await in the scheduler will block forever since the channel is bounded. The scheduler never checks the cancellation token while blocked on this send.

Wrap the send in a tokio::select! with the cancellation token:

tokio::select! {
    res = result_channel.send(handle) => {
        if res.is_err() { /* consumer dropped */ break; }
    }
    _ = cancel_token.cancelled() => break,
}

This comment was generated with GitHub MCP.

@eric-wang-1990
Collaborator Author

[High] Empty-batch chunks cause an infinite loop in the consumer

In consumer.rs, when a chunk returns an empty batches vec the code logs a warning and continues the outer loop. But chunk_index is not advanced, so the consumer re-fetches the same chunk handle repeatedly, looping forever on CPU.

Either treat empty batches as an error, or advance chunk_index before continuing:

if batches.is_empty() {
    warn!("Chunk {} returned empty batches", chunk_index);
    return Err(DatabricksErrorHelper::invalid_state()
        .message(format!("Chunk {} returned no Arrow batches", chunk_index)));
}
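For the other option mentioned above, advancing `chunk_index` before continuing, here is a minimal stand-in sketch. It uses plain vectors in place of the real chunk handles, and `drain_chunks` is a hypothetical name, so this only illustrates the loop shape, not the actual consumer.

```rust
// Stand-in for the consumer loop: each inner Vec plays the role of one
// chunk's Arrow batches.
fn drain_chunks(chunks: &[Vec<u64>]) -> usize {
    let mut chunk_index = 0;
    let mut batches_seen = 0;
    while chunk_index < chunks.len() {
        let batches = &chunks[chunk_index];
        if batches.is_empty() {
            // The fix: advance past the empty chunk so the loop cannot
            // re-fetch the same handle forever.
            chunk_index += 1;
            continue;
        }
        batches_seen += batches.len();
        chunk_index += 1;
    }
    batches_seen
}

fn main() {
    // The middle chunk is empty; the loop still terminates and counts
    // the three non-empty batches.
    let chunks = vec![vec![1, 2], vec![], vec![3]];
    assert_eq!(drain_chunks(&chunks), 3);
    println!("ok");
}
```

Either way, the invariant to enforce is that every iteration of the outer loop makes progress on `chunk_index`.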

This comment was generated with GitHub MCP.

