Skip to content

Commit

Permalink
Handle GitLab job cancellation
Browse files Browse the repository at this point in the history
When a job is cancelled on GitLab, the corresponding handler futures
will be dropped. If the handler was already created, the cleanup()
method will still be called, giving the opportunity to clean up after
anything that may have been interrupted.

Signed-off-by: Ryan Gonzalez <ryan.gonzalez@collabora.com>
  • Loading branch information
refi64 committed Sep 9, 2022
1 parent df9fff9 commit bdba64c
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 43 deletions.
14 changes: 9 additions & 5 deletions gitlab-runner-mock/src/api/trace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use wiremock::ResponseTemplate;
use wiremock::{Request, Respond};

use crate::GitlabRunnerMock;
use crate::{GitlabRunnerMock, MockJobState};

pub(crate) struct JobTraceResponder {
mock: GitlabRunnerMock,
Expand Down Expand Up @@ -43,12 +43,16 @@ impl Respond for JobTraceResponder {
if let Some(job) = self.mock.find_job(id) {
if token != job.token() {
ResponseTemplate::new(403)
} else if job.state() != MockJobState::Running {
ResponseTemplate::new(403).insert_header("Job-Status", &*job.state().to_string())
} else {
match job.append_log(request.body.clone(), start, end) {
Ok(()) => ResponseTemplate::new(202).insert_header(
"X-GitLab-Trace-Update-Interval",
&*self.mock.update_interval().to_string(),
),
Ok(()) => ResponseTemplate::new(202)
.insert_header(
"X-GitLab-Trace-Update-Interval",
&*self.mock.update_interval().to_string(),
)
.insert_header("Job-Status", &*job.state().to_string()),
Err(e) => ResponseTemplate::new(416).set_body_string(format!("{:?}", e)),
}
}
Expand Down
9 changes: 7 additions & 2 deletions gitlab-runner-mock/src/api/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Respond for JobUpdateResponder {
if r.token != job.token() {
ResponseTemplate::new(403)
} else {
match (job.state(), r.state) {
let r = match (job.state(), r.state) {
(MockJobState::Running, MockJobState::Success) => {
job.update_state(r.state);
ResponseTemplate::new(200)
Expand All @@ -50,8 +50,13 @@ impl Respond for JobUpdateResponder {
job.update_state(r.state);
ResponseTemplate::new(200)
}
(current_state, _) if current_state != MockJobState::Running => {
ResponseTemplate::new(403)
}
_ => panic!("Invalid state change"),
}
};

r.append_header("Job-Status", &*job.state().to_string())
}
} else {
ResponseTemplate::new(404)
Expand Down
25 changes: 25 additions & 0 deletions gitlab-runner-mock/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum MockJobState {
Running,
Success,
Failed,
Cancelled,
}

impl MockJobState {
Expand All @@ -22,6 +23,23 @@ impl MockJobState {
}
}

/// Renders the state as the lowercase status string that the mock places in
/// its `Job-Status` response header.
///
/// `Display` is implemented instead of `ToString` so that `to_string()` keeps
/// working through the standard blanket impl while the state also becomes
/// usable in `format!`-style macros.
impl std::fmt::Display for MockJobState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let status = match *self {
            MockJobState::Pending => "pending",
            MockJobState::Running => "running",
            MockJobState::Success => "success",
            MockJobState::Failed => "failed",
            // The variant name uses the "cancelled" spelling already used by
            // this crate and by tokio_util. The emitted status string
            // deliberately keeps the single-"l" "canceled" spelling of the
            // GitLab job status, so the two are intentionally not identical.
            MockJobState::Cancelled => "canceled",
        };
        f.write_str(status)
    }
}

#[derive(Debug, Error)]
pub enum LogError {
#[error("Incorrect range start")]
Expand Down Expand Up @@ -179,6 +197,13 @@ impl MockJob {
inner.artifact.clone()
}

/// Marks the job as cancelled.
///
/// Switches the job state to `MockJobState::Cancelled` and bumps the
/// state-update counter, both while holding the lock on `inner`.
///
/// # Panics
///
/// Panics with "Job is already finished" if the current state reports
/// itself as finished, or if acquiring the lock on `inner` fails.
pub fn cancel(&self) {
    let mut guard = self.inner.lock().unwrap();
    // Cancelling only makes sense for a job that has not completed yet.
    assert!(!guard.state.finished(), "Job is already finished");
    guard.state = MockJobState::Cancelled;
    guard.state_updates += 1;
}

pub(crate) fn update_state(&self, state: MockJobState) {
let mut inner = self.inner.lock().unwrap();
inner.state_updates += 1;
Expand Down
20 changes: 20 additions & 0 deletions gitlab-runner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ where
}

const GITLAB_TRACE_UPDATE_INTERVAL: &str = "X-GitLab-Trace-Update-Interval";
const JOB_STATUS: &str = "Job-Status";

#[derive(Debug, Clone, Serialize)]
struct FeaturesInfo {
Expand Down Expand Up @@ -193,6 +194,8 @@ impl JobResponse {
pub enum Error {
#[error("Unexpected reply code {0}")]
UnexpectedStatus(StatusCode),
#[error("Job cancelled")]
JobCancelled,
#[error("Request failure {0}")]
Request(#[from] reqwest::Error),
#[error("Failed to write to destination {0}")]
Expand Down Expand Up @@ -248,6 +251,18 @@ impl Client {
}
}

fn check_for_job_cancellation(&self, response: &reqwest::Response) -> Result<(), Error> {
let job_status = response
.headers()
.get(JOB_STATUS)
.and_then(|v| v.to_str().ok());
if let Some("canceled") = job_status {
Err(Error::JobCancelled)
} else {
Ok(())
}
}

pub async fn update_job(
&self,
id: u64,
Expand All @@ -263,6 +278,9 @@ impl Client {
let update = JobUpdate { token, state };

let r = self.client.put(url).json(&update).send().await?;

self.check_for_job_cancellation(&r)?;

let trace_update_interval = r
.headers()
.get(GITLAB_TRACE_UPDATE_INTERVAL)
Expand Down Expand Up @@ -307,6 +325,8 @@ impl Client {
.send()
.await?;

self.check_for_job_cancellation(&r)?;

let trace_update_interval = r
.headers()
.get(GITLAB_TRACE_UPDATE_INTERVAL)
Expand Down
127 changes: 91 additions & 36 deletions gitlab-runner/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use bytes::Bytes;
use std::future::Future;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::time::{interval_at, Duration, Instant, Interval, MissedTickBehavior};
use tokio_util::sync::CancellationToken;
use tracing::instrument::WithSubscriber;
use tracing::warn;
use tracing::Instrument;
Expand All @@ -13,42 +15,42 @@ use crate::runlist::{RunList, RunListEntry};
use crate::uploader::Uploader;
use crate::{JobHandler, JobResult, Phase};

async fn run<F, J, Ret>(
job: Job,
/// Drives `future` to completion unless `cancel_token` is cancelled first.
///
/// Resolves to the future's own result when the future branch completes,
/// and to `Err(())` when the cancellation branch completes instead.
async fn cancellable_wait<T, Fut>(future: Fut, cancel_token: &CancellationToken) -> Result<T, ()>
where
    Fut: Future<Output = Result<T, ()>>,
{
    let cancelled = cancel_token.cancelled();
    tokio::select! {
        outcome = future => outcome,
        () = cancelled => Err(()),
    }
}

async fn run_steps_and_upload<J>(
client: Client,
response: Arc<JobResponse>,
process: F,
build_dir: PathBuf,
handler: &mut J,
build_dir: &Path,
) -> JobResult
where
F: FnOnce(Job) -> Ret,
J: JobHandler,
Ret: Future<Output = Result<J, ()>>,
{
if let Err(e) = tokio::fs::create_dir(&build_dir).await {
job.trace(format!("Failed to remove build dir: {}", e));
return Err(());
}
let mut handler = process(job).await?;

let script = response.step(Phase::Script).ok_or(())?;
// TODO handle timeout
let script_result = handler.step(&script.script, Phase::Script).await;

if let Some(after) = response.step(Phase::AfterScript) {
/* gitlab ignores the after_script result; so do the same */
// gitlab ignores the after_script result; so do the same
let _ = handler.step(&after.script, Phase::AfterScript).await;
}

//let upload = match response.
let upload = response.artifacts.get(0).map_or(false, |a| match a.when {
ArtifactWhen::Always => true,
ArtifactWhen::OnSuccess => script_result.is_ok(),
ArtifactWhen::OnFailure => script_result.is_err(),
});

let r = if upload {
if let Ok(mut uploader) = Uploader::new(client, &build_dir, response) {
if upload {
if let Ok(mut uploader) = Uploader::new(client, build_dir, response) {
let r = handler.upload_artifacts(&mut uploader).await;
if r.is_ok() {
uploader.upload().await.and(script_result)
Expand All @@ -61,7 +63,33 @@ where
}
} else {
script_result
};
}
}

async fn run<F, J, Ret>(
job: Job,
client: Client,
response: Arc<JobResponse>,
process: F,
build_dir: PathBuf,
cancel_token: CancellationToken,
) -> JobResult
where
F: FnOnce(Job) -> Ret,
J: JobHandler,
Ret: Future<Output = Result<J, ()>>,
{
if let Err(e) = tokio::fs::create_dir(&build_dir).await {
job.trace(format!("Failed to remove build dir: {}", e));
return Err(());
}
let mut handler = cancellable_wait(process(job), &cancel_token).await?;

let r = cancellable_wait(
run_steps_and_upload(client, response, &mut handler, &build_dir),
&cancel_token,
)
.await;

handler.cleanup().await;

Expand Down Expand Up @@ -109,18 +137,27 @@ impl Run {
interval
}

async fn update(&self, state: JobState) -> Result<(), crate::client::Error> {
self.client
async fn update(&self, state: JobState, cancel_token: &CancellationToken) {
match self
.client
.update_job(self.response.id, &self.response.token, state)
.await?;
Ok(())
.await
{
Ok(_reply) => (),
Err(crate::client::Error::JobCancelled) => cancel_token.cancel(),
Err(err) => warn!("Failed to update job status: {:?}", err),
}
}

async fn send_trace(&mut self, buf: Bytes) -> Result<Option<Duration>, crate::client::Error> {
async fn send_trace(
&mut self,
buf: Bytes,
cancel_token: &CancellationToken,
) -> Option<Duration> {
assert!(!buf.is_empty());
let len = buf.len();

let reply = self
match self
.client
.trace(
self.response.id,
Expand All @@ -129,9 +166,21 @@ impl Run {
self.log_offset,
len,
)
.await?;
self.log_offset += len;
Ok(reply.trace_update_interval)
.await
{
Ok(reply) => {
self.log_offset += len;
reply.trace_update_interval
}
Err(crate::client::Error::JobCancelled) => {
cancel_token.cancel();
None
}
Err(err) => {
warn!("Failed to send job trace: {:?}", err);
None
}
}
}

#[tracing::instrument(skip(self, process,build_dir),fields(gitlab.job=self.response.id))]
Expand All @@ -141,6 +190,8 @@ impl Run {
J: JobHandler + 'static,
Ret: Future<Output = Result<J, ()>> + Send + 'static,
{
let cancel_token = CancellationToken::new();

let job = Job::new(
self.client.clone(),
self.response.clone(),
Expand All @@ -154,6 +205,7 @@ impl Run {
self.response.clone(),
process,
build_dir,
cancel_token.clone(),
)
.in_current_span()
.with_current_subscriber(),
Expand All @@ -168,15 +220,15 @@ impl Run {
let now = Instant::now();
if let Some(buf) = self.jobdata.split_trace() {
// TODO be resilient against send errors
if let Ok(Some(interval)) = self.send_trace(buf).await {
if let Some(interval) = self.send_trace(buf, &cancel_token).await {
if interval != self.interval.period() {
self.interval = Self::create_interval(now, interval);
}
}
self.last_alive = now;
} else if now - self.last_alive > KEEPALIVE_INTERVAL {
} else if now - self.last_alive > KEEPALIVE_INTERVAL {
// In case of errors another update will be sent at the next tick
let _ = self.update(JobState::Running).await;
self.update(JobState::Running, &cancel_token).await;
self.last_alive = now;
}
},
Expand All @@ -186,14 +238,17 @@ impl Run {

// Send the remaining trace buffer back to gitlab.
if let Some(buf) = self.jobdata.split_trace() {
let _ = self.send_trace(buf).await.ok();
self.send_trace(buf, &cancel_token).await;
}

let state = match result {
Ok(Ok(_)) => JobState::Success,
Ok(Err(_)) => JobState::Failed,
Err(_) => JobState::Failed,
};
self.update(state).await.expect("Failed to update");
// Don't bother updating the status if cancelled, since it will just fail.
if !cancel_token.is_cancelled() {
let state = match result {
Ok(Ok(_)) => JobState::Success,
Ok(Err(_)) => JobState::Failed,
Err(_) => JobState::Failed,
};
self.update(state, &cancel_token).await;
}
}
}

0 comments on commit bdba64c

Please sign in to comment.