Skip to content

Commit

Permalink
Add restarting state and restart api
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Oct 13, 2023
1 parent e5ec3f7 commit ad25712
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 27 deletions.
3 changes: 3 additions & 0 deletions arroyo-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ pub(crate) fn get_action(state: &str, running_desired: &bool) -> (String, Option
("Recovering", true) => ("Stop", Some(Checkpoint), InProgress),
("Recovering", false) => ("Stopping", Option::None, InProgress),

("Restarting", true) => ("Stop", Some(Checkpoint), InProgress),
("Restarting", false) => ("Stopping", Option::None, InProgress),

("Stopping", true) => ("Stopping", Some(Checkpoint), InProgress),
("Stopping", false) => ("Stopping", Option::None, InProgress),

Expand Down
10 changes: 6 additions & 4 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::pipelines::__path_get_pipelines;
use crate::pipelines::__path_post_pipeline;
use crate::pipelines::{
__path_delete_pipeline, __path_get_pipeline, __path_get_pipeline_jobs, __path_patch_pipeline,
__path_validate_query, __path_validate_udfs,
__path_restart_pipeline, __path_validate_query, __path_validate_udfs,
};
use crate::rest::__path_ping;
use crate::rest_utils::{bad_request, log_and_map, ErrorResp};
Expand All @@ -34,9 +34,10 @@ use arroyo_rpc::types::{
JobLogMessageCollection, Metric, MetricGroup, MetricNames, OperatorCheckpointGroup,
OperatorCheckpointGroupCollection, OperatorMetricGroup, OutputData, PaginationQueryParams,
Pipeline, PipelineCollection, PipelineEdge, PipelineGraph, PipelineNode, PipelinePatch,
PipelinePost, PrimitiveType, QueryValidationResult, PipelineRestart, SchemaDefinition, SourceField,
SourceFieldType, StopType as StopTypeRest, StructType, SubtaskCheckpointGroup, SubtaskMetrics,
TestSourceMessage, Udf, UdfLanguage, UdfValidationResult, ValidateQueryPost, ValidateUdfsPost,
PipelinePost, PipelineRestart, PrimitiveType, QueryValidationResult, SchemaDefinition,
SourceField, SourceFieldType, StopType as StopTypeRest, StructType, SubtaskCheckpointGroup,
SubtaskMetrics, TestSourceMessage, Udf, UdfLanguage, UdfValidationResult, ValidateQueryPost,
ValidateUdfsPost,
};

mod cloud;
Expand Down Expand Up @@ -142,6 +143,7 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 {
validate_udfs,
post_pipeline,
patch_pipeline,
restart_pipeline,
get_pipeline,
delete_pipeline,
get_pipelines,
Expand Down
18 changes: 14 additions & 4 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use arroyo_rpc::grpc::{CheckUdfsReq, ValidationResult};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::types::{
Job, JobCollection, PaginationQueryParams, Pipeline, PipelineCollection, PipelineEdge,
PipelineGraph, PipelineNode, PipelinePatch, PipelinePost, QueryValidationResult, StopType,
UdfValidationResult, ValidateQueryPost, ValidateUdfsPost, PipelineRestart
PipelineGraph, PipelineNode, PipelinePatch, PipelinePost, PipelineRestart,
QueryValidationResult, StopType, UdfValidationResult, ValidateQueryPost, ValidateUdfsPost,
};
use arroyo_server_common::log_event;
use arroyo_sql::{ArroyoSchemaProvider, SqlConfig};
Expand Down Expand Up @@ -532,7 +532,8 @@ pub async fn post_pipeline(
"is_preview": preview,
"job_id": job_id,
"parallelism": pipeline_post.parallelism,
"has_udfs": !pipeline_post.udfs.map(|e| e.is_empty()).unwrap_or(false),
"has_udfs": pipeline_post.udfs.map(|e| !e.is_empty() && !e[0].definition.trim().is_empty())
.unwrap_or(false),
"features": program.features(),
}),
);
Expand Down Expand Up @@ -730,7 +731,16 @@ pub async fn get_pipelines(
has_more,
data: pipelines
.into_iter()
.map(|p| p.try_into().unwrap())
.filter_map(|p| {
let id = p.pub_id.clone();
match p.try_into() {
Ok(p) => Some(p),
Err(e) => {
warn!("Failed to map pipeline {} from database: {:?}", id, e);
None
}
}
})
.collect(),
}))
}
Expand Down
2 changes: 1 addition & 1 deletion arroyo-api/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::jobs::{
use crate::metrics::get_operator_metric_groups;
use crate::pipelines::{
delete_pipeline, get_pipeline, get_pipeline_jobs, get_pipelines, patch_pipeline, post_pipeline,
validate_query, validate_udfs, restart_pipeline
restart_pipeline, validate_query, validate_udfs,
};
use crate::rest_utils::not_found;
use crate::ApiDoc;
Expand Down
35 changes: 35 additions & 0 deletions arroyo-console/src/gen/api-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ export interface paths {
*/
get: operations["get_pipeline_jobs"];
};
"/v1/pipelines/{id}/restart": {
/**
* Restart a pipeline
* @description Restart a pipeline
*/
post: operations["restart_pipeline"];
};
"/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints": {
/**
* List a job's checkpoints
Expand Down Expand Up @@ -427,6 +434,9 @@ export interface components {
query: string;
udfs?: (components["schemas"]["Udf"])[] | null;
};
PipelineRestart: {
force?: boolean | null;
};
/** @enum {string} */
PrimitiveType: "int32" | "int64" | "u_int32" | "u_int64" | "f32" | "f64" | "bool" | "string" | "bytes" | "unix_millis" | "unix_micros" | "unix_nanos" | "date_time" | "json";
QueryValidationResult: {
Expand Down Expand Up @@ -844,6 +854,31 @@ export interface operations {
};
};
};
/**
* Restart a pipeline
* @description Restart a pipeline
*/
restart_pipeline: {
parameters: {
path: {
/** @description Pipeline id */
id: string;
};
};
requestBody: {
content: {
"application/json": components["schemas"]["PipelineRestart"];
};
};
responses: {
/** @description Updated pipeline */
200: {
content: {
"application/json": components["schemas"]["Pipeline"];
};
};
};
};
/**
* List a job's checkpoints
* @description List a job's checkpoints
Expand Down
9 changes: 9 additions & 0 deletions arroyo-console/src/lib/data_fetching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,14 @@ export const usePipeline = (pipelineId?: string, refresh: boolean = false) => {
await mutate();
};

// Issues a POST to the pipeline's restart endpoint with an empty body, then
// calls `mutate()` once the request has settled.
// NOTE(review): the value returned by `post` is discarded, so a failed request
// is not surfaced to the caller here — confirm that is intended.
const restartPipeline = async () => {
  const params = { path: { id: pipelineId! } };
  await post('/v1/pipelines/{id}/restart', { params, body: {} });
  await mutate();
};

const deletePipeline = async () => {
const { error } = await del('/v1/pipelines/{id}', {
params: { path: { id: pipelineId! } },
Expand All @@ -455,6 +463,7 @@ export const usePipeline = (pipelineId?: string, refresh: boolean = false) => {
pipelineLoading: isLoading,
updatePipeline,
deletePipeline,
restartPipeline,
};
};

Expand Down
36 changes: 24 additions & 12 deletions arroyo-console/src/routes/pipelines/PipelineDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export function PipelineDetails() {

let { pipelineId: id } = useParams();

const { pipeline, pipelineError, updatePipeline } = usePipeline(id, true);
const { pipeline, pipelineError, updatePipeline, restartPipeline } = usePipeline(id, true);
const { jobs, jobsError } = usePipelineJobs(id, true);
const job = jobs?.length ? jobs[0] : undefined;
const { checkpoints } = useJobCheckpoints(id, job?.id);
Expand Down Expand Up @@ -256,17 +256,29 @@ export function PipelineDetails() {
let actionButton = <></>;
if (pipeline) {
editPipelineButton = <Button onClick={onConfigModalOpen}>Edit</Button>;
actionButton = (
<Button
isDisabled={pipeline.action == null}
onClick={async () => {
await updateJobState(pipeline.action!);
}}
>
{pipeline.actionInProgress ? <Spinner size="xs" mr={2} /> : null}
{pipeline.actionText}
</Button>
);
// Choose the header action button.
// `job` is derived from `jobs?.length ? jobs[0] : undefined`, so it can be
// undefined while the jobs list is empty or still loading; optional chaining
// keeps this render from throwing in that case and falls through to the
// regular action button.
if (job?.state == 'Failed') {
  // A failed job gets an explicit "Restart" button wired to the restart API.
  actionButton = (
    <Button
      onClick={async () => {
        await restartPipeline();
      }}
    >
      Restart
    </Button>
  );
} else {
  // Otherwise show the pipeline's own action; it is disabled when no action is available.
  actionButton = (
    <Button
      isDisabled={pipeline.action == null}
      onClick={async () => {
        await updateJobState(pipeline.action!);
      }}
    >
      {pipeline.actionInProgress ? <Spinner size="xs" mr={2} /> : null}
      {pipeline.actionText}
    </Button>
  );
}
}

const headerArea = (
Expand Down
3 changes: 2 additions & 1 deletion arroyo-controller/queries/controller_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ SET state = :state,
restarts = :restarts,
pipeline_path = :pipeline_path,
wasm_path = :wasm_path,
run_id = :run_id
run_id = :run_id,
restart_nonce = :restart_nonce
WHERE id = :job_id;

--! get_program
Expand Down
5 changes: 3 additions & 2 deletions arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ include!(concat!(env!("OUT_DIR"), "/controller-sql.rs"));

use crate::schedulers::{nomad::NomadScheduler, NodeScheduler, ProcessScheduler, Scheduler};
use types::public::LogLevel;
use types::public::{StopMode, RestartMode};
use types::public::{RestartMode, StopMode};

pub const CHECKPOINTS_TO_KEEP: u32 = 5;

Expand Down Expand Up @@ -116,6 +116,7 @@ impl JobStatus {
&self.pipeline_path,
&self.wasm_path,
&self.run_id,
&self.restart_nonce,
&self.id,
)
.await
Expand Down Expand Up @@ -631,7 +632,7 @@ impl ControllerServer {
restarts: p.restarts,
pipeline_path: p.pipeline_path,
wasm_path: p.wasm_path,
restart_nonce: p.status_restart_nonce
restart_nonce: p.status_restart_nonce,
};

if let Some(sm) = jobs.get_mut(&config.id) {
Expand Down
36 changes: 35 additions & 1 deletion arroyo-controller/src/states/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod compiling;
mod finishing;
mod recovering;
mod rescaling;
mod restarting;
mod running;
mod scheduling;
mod stopping;
Expand Down Expand Up @@ -106,7 +107,12 @@ impl State for Failed {

async fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
handle_terminal(ctx).await;
Ok(Transition::Stop)
// A configured restart nonce that differs from the one recorded in status is
// treated as a pending restart request. The Failed -> Compiling transition's
// status update copies the configured value into status, after which the two
// compare equal again.
if ctx.config.restart_nonce != ctx.status.restart_nonce {
// the user has requested a restart
Ok(Transition::next(*self, Compiling {}))
} else {
// no restart has been requested
Ok(Transition::Stop)
}
}

fn is_terminal(&self) -> bool {
Expand Down Expand Up @@ -209,6 +215,15 @@ impl TransitionTo<Scheduling> for Rescaling {
}

impl TransitionTo<Compiling> for Recovering {}
// Restart path: lets a job leave `Failed` and go back to `Compiling`.
impl TransitionTo<Compiling> for Failed {
    // Returns the status mutation applied when this transition is taken.
    fn update_status(&self) -> TransitionFn {
        Box::new(|job| {
            // Drop what was recorded for the failed attempt.
            job.status.failure_message = None;
            job.status.restarts = 0;
            // Record the configured nonce in status; `Failed::next` compares the
            // two values to decide whether a restart is pending.
            job.status.restart_nonce = job.config.restart_nonce;
        })
    }
}

fn done_transition(ctx: &mut JobContext) {
ctx.status.finish_time = Some(OffsetDateTime::now_utc());
Expand All @@ -234,6 +249,24 @@ impl TransitionTo<Finished> for Finishing {
}
}

// Allows a running job to enter the `Restarting` state.
impl TransitionTo<Restarting> for Running {
    // Returns the status mutation applied when this transition is taken.
    fn update_status(&self) -> TransitionFn {
        Box::new(|job| {
            // Record the configured restart nonce in status.
            // NOTE(review): presumably this marks the restart request as picked
            // up so it is not acted on twice — confirm against `Running::next`.
            job.status.restart_nonce = job.config.restart_nonce;
        })
    }
}
// `Restarting` may transition back into itself. Nothing is overridden here, so
// whatever defaults `TransitionTo` provides are used.
impl TransitionTo<Restarting> for Restarting {}
// Allows `Restarting` to hand the job over to `Scheduling`.
impl TransitionTo<Scheduling> for Restarting {
    // Returns the status mutation applied when this transition is taken.
    fn update_status(&self) -> TransitionFn {
        Box::new(|job| {
            // Advance the run counter by one for the run about to be scheduled.
            job.status.run_id += 1;
        })
    }
}
// `Restarting` can also move to either stopping state (`Stopping` or
// `CheckpointStopping`). No status update is customised for these transitions.
impl TransitionTo<Stopping> for Restarting {}
impl TransitionTo<CheckpointStopping> for Restarting {}

// Macro to handle stopping behavior from a running state, where we want to
// support checkpoint stopping
macro_rules! stop_if_desired_running {
Expand Down Expand Up @@ -311,6 +344,7 @@ macro_rules! stop_if_desired_non_running {
};
}

use crate::states::restarting::Restarting;
pub(crate) use stop_if_desired_non_running;
pub(crate) use stop_if_desired_running;

Expand Down
4 changes: 2 additions & 2 deletions arroyo-controller/src/states/recovering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct Recovering {}

impl Recovering {
// tries, with increasing levels of force, to tear down the existing cluster
async fn cleanup<'a>(&mut self, ctx: &mut JobContext<'a>) -> anyhow::Result<()> {
pub async fn cleanup<'a>(ctx: &mut JobContext<'a>) -> anyhow::Result<()> {
let job_controller = ctx.job_controller.as_mut().unwrap();

// first try to stop it gracefully
Expand Down Expand Up @@ -92,7 +92,7 @@ impl State for Recovering {

async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
// tear down the existing cluster
if let Err(e) = self.cleanup(ctx).await {
if let Err(e) = Self::cleanup(ctx).await {
return Err(ctx.retryable(self, "failed to tear down existing cluster", e, 10));
}

Expand Down

0 comments on commit ad25712

Please sign in to comment.