feat: flow recreate on reboot (#4509)
* feat: flow reboot clean

* refactor: per review

* refactor: per review

* test: sqlness flow reboot
discord9 authored Aug 5, 2024
1 parent 8037800 commit e6cc4df
Showing 7 changed files with 110 additions and 5 deletions.
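In short, FlownodeBuilder now takes a FlowMetadataManagerRef and, while building the flownode, replays persisted flow metadata through FlowWorkerManager::create_flow. A condensed, non-compiling sketch of the recovery pass (distilled from the src/flow/src/server.rs hunk below; error handling and exact types elided):

    // Distributed mode (node_id is Some): recover only the flows assigned to this flownode.
    // Standalone mode (node_id is None): walk every catalog and recover all flows.
    let flow_ids = match self.opts.node_id {
        Some(id) => self.flow_metadata_manager.flownode_flow_manager().flows(id) /* collect ids */,
        None => /* for each catalog, collect ids via flow_name_manager().flow_names(&catalog) */,
    };
    for flow_id in flow_ids {
        let info = self.flow_metadata_manager.flow_info_manager().get(flow_id).await?;
        manager.create_flow(flow_id, /* sink table, sources, SQL, options from `info` */).await?;
    }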
3 changes: 3 additions & 0 deletions src/cmd/src/flownode.rs
@@ -24,6 +24,7 @@ use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
@@ -296,11 +297,13 @@ impl StartCommand {
            Arc::new(executor),
        );

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
        let flownode_builder = FlownodeBuilder::new(
            opts,
            Plugins::new(),
            table_metadata_manager,
            catalog_manager.clone(),
            flow_metadata_manager,
        )
        .with_heartbeat_task(heartbeat_task);

3 changes: 2 additions & 1 deletion src/cmd/src/standalone.rs
@@ -476,11 +476,13 @@ impl StartCommand {
            .await
            .context(StartDatanodeSnafu)?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flow_builder = FlownodeBuilder::new(
            Default::default(),
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
        );
        let flownode = Arc::new(
            flow_builder
@@ -511,7 +513,6 @@
            opts.wal.into(),
            kv_backend.clone(),
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
11 changes: 10 additions & 1 deletion src/flow/src/error.rs
@@ -83,6 +83,14 @@ pub enum Error {
        location: Location,
    },

    #[snafu(display("Failed to list flows in flownode={id:?}"))]
    ListFlows {
        id: Option<common_meta::FlownodeId>,
        source: common_meta::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Flow already exist, id={id}"))]
    FlowAlreadyExist {
        id: FlowId,
@@ -214,7 +222,8 @@ impl ErrorExt for Error {
            }
            Self::TableNotFound { .. }
            | Self::TableNotFoundMeta { .. }
            | Self::FlowNotFound { .. }
            | Self::ListFlows { .. } => StatusCode::TableNotFound,
            Self::InvalidQueryProst { .. }
            | &Self::InvalidQuery { .. }
            | &Self::Plan { .. }
95 changes: 92 additions & 3 deletions src/flow/src/server.rs
@@ -29,12 +29,13 @@ use common_meta::cache::{
};
use common_meta::ddl::{table_meta, ProcedureExecutorRef};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{self, Flownode, NodeManagerRef};
use common_query::Output;
use common_telemetry::tracing::info;
use futures::FutureExt;
use futures::{FutureExt, StreamExt, TryStreamExt};
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
use itertools::Itertools;
use meta_client::client::MetaClient;
@@ -47,7 +48,7 @@ use serde::de::Unexpected;
use servers::error::{AlreadyStartedSnafu, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::heartbeat_options::HeartbeatOptions;
use servers::server::Server;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, oneshot, Mutex};
@@ -57,7 +58,8 @@ use tonic::{Request, Response, Status};

use crate::adapter::FlowWorkerManagerRef;
use crate::error::{
    CacheRequiredSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
    CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, ParseAddrSnafu,
    ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::transform::register_function_to_query_engine;
@@ -240,6 +242,7 @@ pub struct FlownodeBuilder {
    plugins: Plugins,
    table_meta: TableMetadataManagerRef,
    catalog_manager: CatalogManagerRef,
    flow_metadata_manager: FlowMetadataManagerRef,
    heartbeat_task: Option<HeartbeatTask>,
}

@@ -250,12 +253,14 @@ impl FlownodeBuilder {
        plugins: Plugins,
        table_meta: TableMetadataManagerRef,
        catalog_manager: CatalogManagerRef,
        flow_metadata_manager: FlowMetadataManagerRef,
    ) -> Self {
        Self {
            opts,
            plugins,
            table_meta,
            catalog_manager,
            flow_metadata_manager,
            heartbeat_task: None,
        }
    }
@@ -283,6 +288,11 @@ impl FlownodeBuilder {
            self.build_manager(query_engine_factory.query_engine())
                .await?,
        );

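        // Recovery failures are logged and swallowed here, so the flownode still
        // starts even if some flows cannot be recreated.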
        if let Err(err) = self.recover_flows(&manager).await {
            common_telemetry::error!(err; "Failed to recover flows");
        }

        let server = FlownodeServer::new(FlowService::new(manager.clone()));

        let heartbeat_task = self.heartbeat_task;
@@ -296,6 +306,85 @@ impl FlownodeBuilder {
        Ok(instance)
}

    /// Recover all flow tasks assigned to this flownode in distributed mode (`node_id` is `Some(<num>)`),
    /// or recover every existing flow task when in standalone mode (`node_id` is `None`).
    ///
    /// TODO(discord9): persist flow tasks with internal state
    async fn recover_flows(&self, manager: &FlowWorkerManagerRef) -> Result<usize, Error> {
        let nodeid = self.opts.node_id;
        let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
            let to_be_recover = self
                .flow_metadata_manager
                .flownode_flow_manager()
                .flows(nodeid)
                .try_collect::<Vec<_>>()
                .await
                .context(ListFlowsSnafu { id: Some(nodeid) })?;
            to_be_recover.into_iter().map(|(id, _)| id).collect()
        } else {
            let all_catalogs = self
                .catalog_manager
                .catalog_names()
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?;
            let mut all_flow_ids = vec![];
            for catalog in all_catalogs {
                let flows = self
                    .flow_metadata_manager
                    .flow_name_manager()
                    .flow_names(&catalog)
                    .await
                    .try_collect::<Vec<_>>()
                    .await
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?;

                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
            }
            all_flow_ids
        };
        let cnt = to_be_recovered.len();

        // TODO(discord9): recover in parallel
        for flow_id in to_be_recovered {
            let info = self
                .flow_metadata_manager
                .flow_info_manager()
                .get(flow_id)
                .await
                .map_err(BoxedError::new)
                .context(ExternalSnafu)?
                .context(FlowNotFoundSnafu { id: flow_id })?;

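            // Reassemble the fully qualified sink table name
            // ([catalog, schema, table]) from the persisted flow info.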
            let sink_table_name = [
                info.sink_table_name().catalog_name.clone(),
                info.sink_table_name().schema_name.clone(),
                info.sink_table_name().table_name.clone(),
            ];
            manager
                .create_flow(
                    flow_id as _,
                    sink_table_name,
                    info.source_table_ids(),
                    true,
                    info.expire_after(),
                    Some(info.comment().clone()),
                    info.raw_sql().clone(),
                    info.options().clone(),
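                    // Run the recreated flow under its original catalog via a fresh query context.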
                    Some(
                        QueryContextBuilder::default()
                            .current_catalog(info.catalog_name().clone())
                            .build(),
                    ),
                )
                .await?;
        }

        Ok(cnt)
    }

    /// Build a [`FlowWorkerManager`]. Note that this doesn't take ownership of `self`,
    /// nor does it actually start the worker.
    async fn build_manager(
1 change: 1 addition & 0 deletions tests-integration/src/standalone.rs
@@ -156,6 +156,7 @@ impl GreptimeDbStandaloneBuilder {
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
        );
        let flownode = Arc::new(flow_builder.build().await.unwrap());

1 change: 1 addition & 0 deletions tests/cases/standalone/common/flow/flow_basic.result
@@ -24,6 +24,7 @@ select flush_flow('test_numbers_basic')<=1;
| true |
+----------------------------------------------------+

-- SQLNESS ARG restart=true
INSERT INTO numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
1 change: 1 addition & 0 deletions tests/cases/standalone/common/flow/flow_basic.sql
@@ -14,6 +14,7 @@ SELECT sum(number) FROM numbers_input_basic GROUP BY tumble(ts, '1 second', '202
-- because flush_flow result is at most 1
select flush_flow('test_numbers_basic')<=1;

-- SQLNESS ARG restart=true
INSERT INTO numbers_input_basic
VALUES
(20, "2021-07-01 00:00:00.200"),
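Note: the new -- SQLNESS ARG restart=true directive (mirrored in the .result file above) tells the sqlness test runner to restart the server before executing the statements that follow, so the INSERT and the flow output it feeds only behave correctly if the flow is recreated from persisted metadata on reboot.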
