Skip to content

Commit

Permalink
Add restarting state and restart api
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Oct 13, 2023
1 parent e5ec3f7 commit 250a676
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 20 deletions.
3 changes: 3 additions & 0 deletions arroyo-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ pub(crate) fn get_action(state: &str, running_desired: &bool) -> (String, Option
("Recovering", true) => ("Stop", Some(Checkpoint), InProgress),
("Recovering", false) => ("Stopping", Option::None, InProgress),

("Restarting", true) => ("Stop", Some(Checkpoint), InProgress),
("Restarting", false) => ("Stopping", Option::None, InProgress),

("Stopping", true) => ("Stopping", Some(Checkpoint), InProgress),
("Stopping", false) => ("Stopping", Option::None, InProgress),

Expand Down
2 changes: 2 additions & 0 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::pipelines::__path_post_pipeline;
use crate::pipelines::{
__path_delete_pipeline, __path_get_pipeline, __path_get_pipeline_jobs, __path_patch_pipeline,
__path_validate_query, __path_validate_udfs,
__path_restart_pipeline
};
use crate::rest::__path_ping;
use crate::rest_utils::{bad_request, log_and_map, ErrorResp};
Expand Down Expand Up @@ -142,6 +143,7 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 {
validate_udfs,
post_pipeline,
patch_pipeline,
restart_pipeline,
get_pipeline,
delete_pipeline,
get_pipelines,
Expand Down
14 changes: 12 additions & 2 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,8 @@ pub async fn post_pipeline(
"is_preview": preview,
"job_id": job_id,
"parallelism": pipeline_post.parallelism,
"has_udfs": !pipeline_post.udfs.map(|e| e.is_empty()).unwrap_or(false),
"has_udfs": pipeline_post.udfs.map(|e| !e.is_empty() && !e[0].definition.trim().is_empty())
.unwrap_or(false),
"features": program.features(),
}),
);
Expand Down Expand Up @@ -730,7 +731,16 @@ pub async fn get_pipelines(
has_more,
data: pipelines
.into_iter()
.map(|p| p.try_into().unwrap())
.filter_map(|p| {
let id = p.pub_id.clone();
match p.try_into() {
Ok(p) => Some(p),
Err(e) => {
warn!("Failed to map pipeline {} from database: {:?}", id, e);
None
}
}
})
.collect(),
}))
}
Expand Down
35 changes: 35 additions & 0 deletions arroyo-console/src/gen/api-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ export interface paths {
*/
get: operations["get_pipeline_jobs"];
};
"/v1/pipelines/{id}/restart": {
/**
* Restart a pipeline
* @description Restart a pipeline
*/
post: operations["restart_pipeline"];
};
"/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints": {
/**
* List a job's checkpoints
Expand Down Expand Up @@ -427,6 +434,9 @@ export interface components {
query: string;
udfs?: (components["schemas"]["Udf"])[] | null;
};
PipelineRestart: {
force?: boolean | null;
};
/** @enum {string} */
PrimitiveType: "int32" | "int64" | "u_int32" | "u_int64" | "f32" | "f64" | "bool" | "string" | "bytes" | "unix_millis" | "unix_micros" | "unix_nanos" | "date_time" | "json";
QueryValidationResult: {
Expand Down Expand Up @@ -844,6 +854,31 @@ export interface operations {
};
};
};
/**
* Restart a pipeline
* @description Restart a pipeline
*/
restart_pipeline: {
parameters: {
path: {
/** @description Pipeline id */
id: string;
};
};
requestBody: {
content: {
"application/json": components["schemas"]["PipelineRestart"];
};
};
responses: {
/** @description Updated pipeline */
200: {
content: {
"application/json": components["schemas"]["Pipeline"];
};
};
};
};
/**
* List a job's checkpoints
* @description List a job's checkpoints
Expand Down
9 changes: 9 additions & 0 deletions arroyo-console/src/lib/data_fetching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,14 @@ export const usePipeline = (pipelineId?: string, refresh: boolean = false) => {
await mutate();
};

const restartPipeline = async () => {
await post('/v1/pipelines/{id}/restart', {
params: { path: { id: pipelineId! } },
body: {},
});
await mutate();
};

const deletePipeline = async () => {
const { error } = await del('/v1/pipelines/{id}', {
params: { path: { id: pipelineId! } },
Expand All @@ -455,6 +463,7 @@ export const usePipeline = (pipelineId?: string, refresh: boolean = false) => {
pipelineLoading: isLoading,
updatePipeline,
deletePipeline,
restartPipeline,
};
};

Expand Down
36 changes: 24 additions & 12 deletions arroyo-console/src/routes/pipelines/PipelineDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export function PipelineDetails() {

let { pipelineId: id } = useParams();

const { pipeline, pipelineError, updatePipeline } = usePipeline(id, true);
const { pipeline, pipelineError, updatePipeline, restartPipeline } = usePipeline(id, true);
const { jobs, jobsError } = usePipelineJobs(id, true);
const job = jobs?.length ? jobs[0] : undefined;
const { checkpoints } = useJobCheckpoints(id, job?.id);
Expand Down Expand Up @@ -256,17 +256,29 @@ export function PipelineDetails() {
let actionButton = <></>;
if (pipeline) {
editPipelineButton = <Button onClick={onConfigModalOpen}>Edit</Button>;
actionButton = (
<Button
isDisabled={pipeline.action == null}
onClick={async () => {
await updateJobState(pipeline.action!);
}}
>
{pipeline.actionInProgress ? <Spinner size="xs" mr={2} /> : null}
{pipeline.actionText}
</Button>
);
if (job.state == 'Failed') {
actionButton = (
<Button
onClick={async () => {
await restartPipeline();
}}
>
Restart
</Button>
);
} else {
actionButton = (
<Button
isDisabled={pipeline.action == null}
onClick={async () => {
await updateJobState(pipeline.action!);
}}
>
{pipeline.actionInProgress ? <Spinner size="xs" mr={2} /> : null}
{pipeline.actionText}
</Button>
);
}
}

const headerArea = (
Expand Down
3 changes: 2 additions & 1 deletion arroyo-controller/queries/controller_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ SET state = :state,
restarts = :restarts,
pipeline_path = :pipeline_path,
wasm_path = :wasm_path,
run_id = :run_id
run_id = :run_id,
restart_nonce = :restart_nonce
WHERE id = :job_id;

--! get_program
Expand Down
5 changes: 3 additions & 2 deletions arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ include!(concat!(env!("OUT_DIR"), "/controller-sql.rs"));

use crate::schedulers::{nomad::NomadScheduler, NodeScheduler, ProcessScheduler, Scheduler};
use types::public::LogLevel;
use types::public::{StopMode, RestartMode};
use types::public::{RestartMode, StopMode};

pub const CHECKPOINTS_TO_KEEP: u32 = 5;

Expand Down Expand Up @@ -116,6 +116,7 @@ impl JobStatus {
&self.pipeline_path,
&self.wasm_path,
&self.run_id,
&self.restart_nonce,
&self.id,
)
.await
Expand Down Expand Up @@ -631,7 +632,7 @@ impl ControllerServer {
restarts: p.restarts,
pipeline_path: p.pipeline_path,
wasm_path: p.wasm_path,
restart_nonce: p.status_restart_nonce
restart_nonce: p.status_restart_nonce,
};

if let Some(sm) = jobs.get_mut(&config.id) {
Expand Down
36 changes: 35 additions & 1 deletion arroyo-controller/src/states/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod compiling;
mod finishing;
mod recovering;
mod rescaling;
mod restarting;
mod running;
mod scheduling;
mod stopping;
Expand Down Expand Up @@ -106,7 +107,12 @@ impl State for Failed {

async fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
handle_terminal(ctx).await;
Ok(Transition::Stop)
if ctx.config.restart_nonce != ctx.status.restart_nonce {
// the user has requested a restart
Ok(Transition::next(*self, Compiling {}))
} else {
Ok(Transition::Stop)
}
}

fn is_terminal(&self) -> bool {
Expand Down Expand Up @@ -209,6 +215,15 @@ impl TransitionTo<Scheduling> for Rescaling {
}

impl TransitionTo<Compiling> for Recovering {}
impl TransitionTo<Compiling> for Failed {
fn update_status(&self) -> TransitionFn {
Box::new(|ctx| {
ctx.status.restart_nonce = ctx.config.restart_nonce;
ctx.status.restarts = 0;
ctx.status.failure_message = None;
})
}
}

fn done_transition(ctx: &mut JobContext) {
ctx.status.finish_time = Some(OffsetDateTime::now_utc());
Expand All @@ -234,6 +249,24 @@ impl TransitionTo<Finished> for Finishing {
}
}

impl TransitionTo<Restarting> for Running {
fn update_status(&self) -> TransitionFn {
Box::new(|ctx| {
ctx.status.restart_nonce = ctx.config.restart_nonce;
})
}
}
impl TransitionTo<Restarting> for Restarting {}
impl TransitionTo<Scheduling> for Restarting {
fn update_status(&self) -> TransitionFn {
Box::new(|ctx| {
ctx.status.run_id += 1;
})
}
}
impl TransitionTo<Stopping> for Restarting {}
impl TransitionTo<CheckpointStopping> for Restarting {}

// Macro to handle stopping behavior from a running state, where we want to
// support checkpoint stopping
macro_rules! stop_if_desired_running {
Expand Down Expand Up @@ -311,6 +344,7 @@ macro_rules! stop_if_desired_non_running {
};
}

use crate::states::restarting::Restarting;
pub(crate) use stop_if_desired_non_running;
pub(crate) use stop_if_desired_running;

Expand Down
4 changes: 2 additions & 2 deletions arroyo-controller/src/states/recovering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct Recovering {}

impl Recovering {
// tries, with increasing levels of force, to tear down the existing cluster
async fn cleanup<'a>(&mut self, ctx: &mut JobContext<'a>) -> anyhow::Result<()> {
pub async fn cleanup<'a>(ctx: &mut JobContext<'a>) -> anyhow::Result<()> {
let job_controller = ctx.job_controller.as_mut().unwrap();

// first try to stop it gracefully
Expand Down Expand Up @@ -92,7 +92,7 @@ impl State for Recovering {

async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
// tear down the existing cluster
if let Err(e) = self.cleanup(ctx).await {
if let Err(e) = Self::cleanup(ctx).await {
return Err(ctx.retryable(self, "failed to tear down existing cluster", e, 10));
}

Expand Down
83 changes: 83 additions & 0 deletions arroyo-controller/src/states/restarting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use crate::states::recovering::Recovering;
use crate::states::scheduling::Scheduling;
use crate::states::stop_if_desired_non_running;
use crate::types::public::RestartMode;
use crate::JobMessage;

use super::{JobContext, State, StateError, Transition};

#[derive(Debug)]
pub struct Restarting {
pub mode: RestartMode,
}

#[async_trait::async_trait]
impl State for Restarting {
fn name(&self) -> &'static str {
"Restarting"
}

async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
let job_controller = ctx.job_controller.as_mut().unwrap();

match self.mode {
RestartMode::safe => {
if let Err(e) = job_controller.checkpoint(true).await {
return Err(ctx.retryable(self, "failed to initiate final checkpoint", e, 10));
}

loop {
match job_controller.checkpoint_finished().await {
Ok(done) => {
if done && job_controller.finished() {
return Ok(Transition::next(*self, Scheduling {}));
}
}
Err(e) => {
return Err(ctx.retryable(
self,
"failed while monitoring final checkpoint",
e,
10,
));
}
}

match ctx.rx.recv().await.expect("channel closed while receiving") {
JobMessage::RunningMessage(msg) => {
if let Err(e) = job_controller.handle_message(msg).await {
return Err(ctx.retryable(
self,
"failed while waiting for job finish",
e,
10,
));
}
}
JobMessage::ConfigUpdate(c) => {
if c.restart_mode == RestartMode::force {
return Ok(Transition::next(
*self,
Restarting {
mode: RestartMode::force,
},
));
}
stop_if_desired_non_running!(self, &c);
}
_ => {
// ignore other messages
}
}
}
}
RestartMode::force => {
if let Err(e) = Recovering::cleanup(ctx).await {
return Err(ctx.retryable(self, "failed to tear down existing cluster", e, 10));
}

Ok(Transition::next(*self, Scheduling {}))
}
}
}
}

0 comments on commit 250a676

Please sign in to comment.