Skip to content

Commit

Permalink
Add restarting state and pipeline restart API (#356)
Browse files Browse the repository at this point in the history
Add restarting state and restart api
  • Loading branch information
mwylde committed Oct 13, 2023
1 parent 5f0575c commit 99f9f8d
Show file tree
Hide file tree
Showing 17 changed files with 319 additions and 28 deletions.
9 changes: 9 additions & 0 deletions arroyo-api/migrations/V16__restart_failed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TYPE restart_mode as ENUM (
'safe', 'force');

ALTER TABLE job_configs
ADD COLUMN restart_nonce int not null default 0,
ADD COLUMN restart_mode restart_mode not null default 'safe';

ALTER TABLE job_statuses
ADD COLUMN restart_nonce int not null default 0;
9 changes: 9 additions & 0 deletions arroyo-api/queries/api_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ SET
parallelism_overrides = COALESCE(:parallelism_overrides, parallelism_overrides)
WHERE id = :job_id AND organization_id = :organization_id;

--! restart_job(mode)
UPDATE job_configs
SET
updated_at = :updated_at,
updated_by = :updated_by,
restart_nonce = restart_nonce + 1,
restart_mode = :mode
WHERE id = :job_id AND organization_id = :organization_id;

--! create_job(ttl_micros?)
INSERT INTO job_configs
(id, organization_id, pipeline_name, created_by, pipeline_id, checkpoint_interval_micros, ttl_micros)
Expand Down
3 changes: 3 additions & 0 deletions arroyo-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ pub(crate) fn get_action(state: &str, running_desired: &bool) -> (String, Option
("Recovering", true) => ("Stop", Some(Checkpoint), InProgress),
("Recovering", false) => ("Stopping", Option::None, InProgress),

("Restarting", true) => ("Stop", Some(Checkpoint), InProgress),
("Restarting", false) => ("Stopping", Option::None, InProgress),

("Stopping", true) => ("Stopping", Some(Checkpoint), InProgress),
("Stopping", false) => ("Stopping", Option::None, InProgress),

Expand Down
11 changes: 7 additions & 4 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::pipelines::__path_get_pipelines;
use crate::pipelines::__path_post_pipeline;
use crate::pipelines::{
__path_delete_pipeline, __path_get_pipeline, __path_get_pipeline_jobs, __path_patch_pipeline,
__path_validate_query, __path_validate_udfs,
__path_restart_pipeline, __path_validate_query, __path_validate_udfs,
};
use crate::rest::__path_ping;
use crate::rest_utils::{bad_request, log_and_map, ErrorResp};
Expand All @@ -34,9 +34,10 @@ use arroyo_rpc::types::{
JobLogMessageCollection, Metric, MetricGroup, MetricNames, OperatorCheckpointGroup,
OperatorCheckpointGroupCollection, OperatorMetricGroup, OutputData, PaginationQueryParams,
Pipeline, PipelineCollection, PipelineEdge, PipelineGraph, PipelineNode, PipelinePatch,
PipelinePost, PrimitiveType, QueryValidationResult, SchemaDefinition, SourceField,
SourceFieldType, StopType as StopTypeRest, StructType, SubtaskCheckpointGroup, SubtaskMetrics,
TestSourceMessage, Udf, UdfLanguage, UdfValidationResult, ValidateQueryPost, ValidateUdfsPost,
PipelinePost, PipelineRestart, PrimitiveType, QueryValidationResult, SchemaDefinition,
SourceField, SourceFieldType, StopType as StopTypeRest, StructType, SubtaskCheckpointGroup,
SubtaskMetrics, TestSourceMessage, Udf, UdfLanguage, UdfValidationResult, ValidateQueryPost,
ValidateUdfsPost,
};

mod cloud;
Expand Down Expand Up @@ -142,6 +143,7 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 {
validate_udfs,
post_pipeline,
patch_pipeline,
restart_pipeline,
get_pipeline,
delete_pipeline,
get_pipelines,
Expand All @@ -165,6 +167,7 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 {
components(schemas(
PipelinePost,
PipelinePatch,
PipelineRestart,
Pipeline,
PipelineGraph,
PipelineNode,
Expand Down
74 changes: 69 additions & 5 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use arroyo_rpc::grpc::{CheckUdfsReq, ValidationResult};
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use arroyo_rpc::types::{
Job, JobCollection, PaginationQueryParams, Pipeline, PipelineCollection, PipelineEdge,
PipelineGraph, PipelineNode, PipelinePatch, PipelinePost, QueryValidationResult, StopType,
UdfValidationResult, ValidateQueryPost, ValidateUdfsPost,
PipelineGraph, PipelineNode, PipelinePatch, PipelinePost, PipelineRestart,
QueryValidationResult, StopType, UdfValidationResult, ValidateQueryPost, ValidateUdfsPost,
};
use arroyo_server_common::log_event;
use arroyo_sql::{ArroyoSchemaProvider, SqlConfig};
Expand All @@ -38,7 +38,7 @@ use crate::rest_utils::{
authenticate, bad_request, client, log_and_map, not_found, paginate_results, required_field,
unauthorized, validate_pagination_params, ApiError, BearerAuth, ErrorResp,
};
use crate::types::public::{PipelineType, StopMode};
use crate::types::public::{PipelineType, RestartMode, StopMode};
use crate::{connection_tables, to_micros};
use crate::{handle_db_error, optimizations, AuthData};
use create_pipeline_req::Config::Sql;
Expand Down Expand Up @@ -532,7 +532,8 @@ pub async fn post_pipeline(
"is_preview": preview,
"job_id": job_id,
"parallelism": pipeline_post.parallelism,
"has_udfs": !pipeline_post.udfs.map(|e| e.is_empty()).unwrap_or(false),
"has_udfs": pipeline_post.udfs.map(|e| !e.is_empty() && !e[0].definition.trim().is_empty())
.unwrap_or(false),
"features": program.features(),
}),
);
Expand Down Expand Up @@ -634,6 +635,60 @@ pub async fn patch_pipeline(
Ok(Json(pipeline))
}

/// Restart a pipeline
#[utoipa::path(
post,
path = "/v1/pipelines/{id}/restart",
tag = "pipelines",
params(
("id" = String, Path, description = "Pipeline id")
),
request_body = PipelineRestart,
responses(
(status = 200, description = "Updated pipeline", body = Pipeline)),
)]
pub async fn restart_pipeline(
State(state): State<AppState>,
bearer_auth: BearerAuth,
Path(id): Path<String>,
WithRejection(Json(req), _): WithRejection<Json<PipelineRestart>, ApiError>,
) -> Result<Json<Pipeline>, ErrorResp> {
let client = client(&state.pool).await?;
let auth_data = authenticate(&state.pool, bearer_auth).await?;

let job_id = api_queries::get_pipeline_jobs()
.bind(&client, &auth_data.organization_id, &id)
.one()
.await
.map_err(log_and_map)?
.id;

let mode = if req.force == Some(true) {
RestartMode::force
} else {
RestartMode::safe
};

let res = api_queries::restart_job()
.bind(
&client,
&OffsetDateTime::now_utc(),
&auth_data.user_id,
&mode,
&job_id,
&auth_data.organization_id,
)
.await
.map_err(log_and_map)?;

if res == 0 {
return Err(not_found("Pipeline".to_string()));
}

let pipeline = query_pipeline_by_pub_id(&id, &client, &auth_data).await?;
Ok(Json(pipeline))
}

/// List all pipelines
#[utoipa::path(
get,
Expand Down Expand Up @@ -676,7 +731,16 @@ pub async fn get_pipelines(
has_more,
data: pipelines
.into_iter()
.map(|p| p.try_into().unwrap())
.filter_map(|p| {
let id = p.pub_id.clone();
match p.try_into() {
Ok(p) => Some(p),
Err(e) => {
warn!("Failed to map pipeline {} from database: {:?}", id, e);
None
}
}
})
.collect(),
}))
}
Expand Down
3 changes: 2 additions & 1 deletion arroyo-api/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::jobs::{
use crate::metrics::get_operator_metric_groups;
use crate::pipelines::{
delete_pipeline, get_pipeline, get_pipeline_jobs, get_pipelines, patch_pipeline, post_pipeline,
validate_query, validate_udfs,
restart_pipeline, validate_query, validate_udfs,
};
use crate::rest_utils::not_found;
use crate::ApiDoc;
Expand Down Expand Up @@ -124,6 +124,7 @@ pub fn create_rest_app(pool: Pool, controller_addr: &str) -> Router {
.route("/pipelines/validate_udfs", post(validate_udfs))
.route("/pipelines/:id", patch(patch_pipeline))
.route("/pipelines/:id", get(get_pipeline))
.route("/pipelines/:id/restart", post(restart_pipeline))
.route("/pipelines/:id", delete(delete_pipeline))
.nest("/pipelines/:id/jobs", jobs_routes)
.fallback(api_fallback);
Expand Down
35 changes: 35 additions & 0 deletions arroyo-console/src/gen/api-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ export interface paths {
*/
get: operations["get_pipeline_jobs"];
};
"/v1/pipelines/{id}/restart": {
/**
* Restart a pipeline
* @description Restart a pipeline
*/
post: operations["restart_pipeline"];
};
"/v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints": {
/**
* List a job's checkpoints
Expand Down Expand Up @@ -427,6 +434,9 @@ export interface components {
query: string;
udfs?: (components["schemas"]["Udf"])[] | null;
};
PipelineRestart: {
force?: boolean | null;
};
/** @enum {string} */
PrimitiveType: "int32" | "int64" | "u_int32" | "u_int64" | "f32" | "f64" | "bool" | "string" | "bytes" | "unix_millis" | "unix_micros" | "unix_nanos" | "date_time" | "json";
QueryValidationResult: {
Expand Down Expand Up @@ -844,6 +854,31 @@ export interface operations {
};
};
};
/**
* Restart a pipeline
* @description Restart a pipeline
*/
restart_pipeline: {
parameters: {
path: {
/** @description Pipeline id */
id: string;
};
};
requestBody: {
content: {
"application/json": components["schemas"]["PipelineRestart"];
};
};
responses: {
/** @description Updated pipeline */
200: {
content: {
"application/json": components["schemas"]["Pipeline"];
};
};
};
};
/**
* List a job's checkpoints
* @description List a job's checkpoints
Expand Down
9 changes: 9 additions & 0 deletions arroyo-console/src/lib/data_fetching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,14 @@ export const usePipeline = (pipelineId?: string, refresh: boolean = false) => {
await mutate();
};

const restartPipeline = async () => {
await post('/v1/pipelines/{id}/restart', {
params: { path: { id: pipelineId! } },
body: {},
});
await mutate();
};

const deletePipeline = async () => {
const { error } = await del('/v1/pipelines/{id}', {
params: { path: { id: pipelineId! } },
Expand All @@ -455,6 +463,7 @@ export const usePipeline = (pipelineId?: string, refresh: boolean = false) => {
pipelineLoading: isLoading,
updatePipeline,
deletePipeline,
restartPipeline,
};
};

Expand Down
36 changes: 24 additions & 12 deletions arroyo-console/src/routes/pipelines/PipelineDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export function PipelineDetails() {

let { pipelineId: id } = useParams();

const { pipeline, pipelineError, updatePipeline } = usePipeline(id, true);
const { pipeline, pipelineError, updatePipeline, restartPipeline } = usePipeline(id, true);
const { jobs, jobsError } = usePipelineJobs(id, true);
const job = jobs?.length ? jobs[0] : undefined;
const { checkpoints } = useJobCheckpoints(id, job?.id);
Expand Down Expand Up @@ -256,17 +256,29 @@ export function PipelineDetails() {
let actionButton = <></>;
if (pipeline) {
editPipelineButton = <Button onClick={onConfigModalOpen}>Edit</Button>;
actionButton = (
<Button
isDisabled={pipeline.action == null}
onClick={async () => {
await updateJobState(pipeline.action!);
}}
>
{pipeline.actionInProgress ? <Spinner size="xs" mr={2} /> : null}
{pipeline.actionText}
</Button>
);
if (job.state == 'Failed') {
actionButton = (
<Button
onClick={async () => {
await restartPipeline();
}}
>
Restart
</Button>
);
} else {
actionButton = (
<Button
isDisabled={pipeline.action == null}
onClick={async () => {
await updateJobState(pipeline.action!);
}}
>
{pipeline.actionInProgress ? <Spinner size="xs" mr={2} /> : null}
{pipeline.actionText}
</Button>
);
}
}

const headerArea = (
Expand Down
8 changes: 6 additions & 2 deletions arroyo-controller/queries/controller_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ SELECT
restarts,
run_id,
pipeline_path,
wasm_path
wasm_path,
job_configs.restart_nonce as config_restart_nonce,
job_statuses.restart_nonce as status_restart_nonce,
restart_mode
FROM job_configs
LEFT JOIN job_statuses ON job_configs.id = job_statuses.id;

Expand All @@ -30,7 +33,8 @@ SET state = :state,
restarts = :restarts,
pipeline_path = :pipeline_path,
wasm_path = :wasm_path,
run_id = :run_id
run_id = :run_id,
restart_nonce = :restart_nonce
WHERE id = :job_id;

--! get_program
Expand Down

0 comments on commit 99f9f8d

Please sign in to comment.