From d8926b8a8bc01e56c829518bccf9f50a2c06d5ab Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Sun, 9 Apr 2023 01:47:25 +0200
Subject: [PATCH] feat(sink): introduce log store trait and adopt log reader
 and writer in sink executor (#8558)

---
 src/stream/src/common/log_store/mod.rs | 403 +++++++++++++++++++++++++
 src/stream/src/common/mod.rs           |   1 +
 src/stream/src/executor/error.rs       |  11 +
 src/stream/src/executor/sink.rs        | 182 ++++++++---
 src/stream/src/from_proto/sink.rs      |  26 +-
 5 files changed, 567 insertions(+), 56 deletions(-)
 create mode 100644 src/stream/src/common/log_store/mod.rs

diff --git a/src/stream/src/common/log_store/mod.rs b/src/stream/src/common/log_store/mod.rs
new file mode 100644
index 000000000000..13af6722e554
--- /dev/null
+++ b/src/stream/src/common/log_store/mod.rs
@@ -0,0 +1,403 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Debug;
+use std::future::Future;
+use std::sync::Arc;
+
+use risingwave_common::array::StreamChunk;
+use risingwave_common::buffer::Bitmap;
+use risingwave_common::util::epoch::INVALID_EPOCH;
+use tokio::sync::mpsc::{
+    channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
+};
+use tokio::sync::oneshot;
+
+use crate::common::log_store::LogReaderEpochProgress::{AwaitingTruncate, Consuming};
+
+#[derive(thiserror::Error, Debug)]
+pub enum LogStoreError {
+    #[error("EndOfLogStream")]
+    EndOfLogStream,
+}
+
+pub type LogStoreResult<T> = Result<T, LogStoreError>;
+
+#[derive(Debug)]
+pub enum LogStoreReadItem {
+    StreamChunk(StreamChunk),
+    Barrier {
+        next_epoch: u64,
+        is_checkpoint: bool,
+    },
+}
+
+pub trait LogWriter {
+    type InitFuture<'a>: Future<Output = LogStoreResult<()>> + Send + 'a
+    where
+        Self: 'a;
+    type WriteChunkFuture<'a>: Future<Output = LogStoreResult<()>> + Send + 'a
+    where
+        Self: 'a;
+    type FlushCurrentEpoch<'a>: Future<Output = LogStoreResult<()>> + Send + 'a
+    where
+        Self: 'a;
+
+    /// Initialize the log writer with an epoch.
+    fn init(&mut self, epoch: u64) -> Self::InitFuture<'_>;
+
+    /// Write a stream chunk to the log writer.
+    fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_>;
+
+    /// Mark the current epoch as finished and sealed, and flush the unconsumed log data.
+    fn flush_current_epoch(
+        &mut self,
+        next_epoch: u64,
+        is_checkpoint: bool,
+    ) -> Self::FlushCurrentEpoch<'_>;
+
+    /// Update the vnode bitmap of the log writer.
+    fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>);
+}
+
+pub trait LogReader {
+    type InitFuture<'a>: Future<Output = LogStoreResult<u64>> + Send + 'a
+    where
+        Self: 'a;
+    type NextItemFuture<'a>: Future<Output = LogStoreResult<LogStoreReadItem>> + Send + 'a
+    where
+        Self: 'a;
+    type TruncateFuture<'a>: Future<Output = LogStoreResult<()>> + Send + 'a
+    where
+        Self: 'a;
+
+    /// Initialize the log reader. Usually this amounts to waiting for the log writer to be
+    /// initialized.
+    fn init(&mut self) -> Self::InitFuture<'_>;
+
+    /// Emit the next item.
+    fn next_item(&mut self) -> Self::NextItemFuture<'_>;
+
+    /// Mark that all items emitted so far have been consumed and that it is safe to truncate
+    /// the log from the current offset.
+    fn truncate(&mut self) -> Self::TruncateFuture<'_>;
+}
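Usage sketch (editorial, not part of the patch): a writer-side caller is expected to call `init` once with the first epoch, write any number of chunks, and then seal each epoch. The helper name below is hypothetical, and the code assumes the trait and types above are in scope (i.e., it lives in this module):

    async fn write_first_epoch<W: LogWriter>(
        writer: &mut W,
        first_epoch: u64,
        next_epoch: u64,
        chunk: StreamChunk,
    ) -> LogStoreResult<()> {
        writer.init(first_epoch).await?; // called exactly once per writer
        writer.write_chunk(chunk).await?; // any number of chunks per epoch
        // Sealing with `is_checkpoint = true` may not resolve until the
        // paired reader calls `truncate()` for this epoch.
        writer.flush_current_epoch(next_epoch, true).await
    }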
+
+pub trait LogStoreFactory: 'static {
+    type Reader: LogReader + Send + 'static;
+    type Writer: LogWriter + Send + 'static;
+
+    type BuildFuture: Future<Output = (Self::Reader, Self::Writer)> + Send;
+
+    fn build(self) -> Self::BuildFuture;
+}
+
+/// An in-memory log store that can buffer a bounded number of stream chunks in memory via a
+/// bounded mpsc channel.
+///
+/// Since it is in-memory, when `flush_current_epoch` is called with a checkpoint epoch, it
+/// waits for the reader to finish consuming all the data of the current checkpoint epoch.
+pub struct BoundedInMemLogStoreWriter {
+    /// Current epoch. Should be `Some` after `init`.
+    curr_epoch: Option<u64>,
+
+    /// Holder of the oneshot channel to send the initial epoch to the associated log reader.
+    init_epoch_tx: Option<oneshot::Sender<u64>>,
+
+    /// Sends log store items to the log reader.
+    item_tx: Sender<LogStoreReadItem>,
+
+    /// Receives the epochs consumed by the log reader.
+    truncated_epoch_rx: UnboundedReceiver<u64>,
+}
+
+#[derive(Eq, PartialEq, Debug)]
+enum LogReaderEpochProgress {
+    /// In progress of consuming data in the current epoch.
+    Consuming(u64),
+    /// Finished emitting the data in the checkpoint epoch, and waiting for a call on `truncate`.
+    AwaitingTruncate { sealed_epoch: u64, next_epoch: u64 },
+}
+
+const UNINITIALIZED: LogReaderEpochProgress = LogReaderEpochProgress::Consuming(INVALID_EPOCH);
+
+pub struct BoundedInMemLogStoreReader {
+    /// Current progress of the log reader: either consuming an epoch, or finished consuming
+    /// an epoch and waiting to be truncated.
+    epoch_progress: LogReaderEpochProgress,
+
+    /// Holder of the oneshot channel to receive the initial epoch.
+    init_epoch_rx: Option<oneshot::Receiver<u64>>,
+
+    /// Receives log store items.
+    item_rx: Receiver<LogStoreReadItem>,
+
+    /// Sends consumed epochs to the log writer.
+    truncated_epoch_tx: UnboundedSender<u64>,
+}
+
+pub struct BoundedInMemLogStoreFactory {
+    bound: usize,
+}
+
+impl BoundedInMemLogStoreFactory {
+    pub fn new(bound: usize) -> Self {
+        Self { bound }
+    }
+}
+
+impl LogStoreFactory for BoundedInMemLogStoreFactory {
+    type Reader = BoundedInMemLogStoreReader;
+    type Writer = BoundedInMemLogStoreWriter;
+
+    type BuildFuture = impl Future<Output = (Self::Reader, Self::Writer)>;
+
+    fn build(self) -> Self::BuildFuture {
+        async move {
+            let (init_epoch_tx, init_epoch_rx) = oneshot::channel();
+            let (item_tx, item_rx) = channel(self.bound);
+            let (truncated_epoch_tx, truncated_epoch_rx) = unbounded_channel();
+            let reader = BoundedInMemLogStoreReader {
+                epoch_progress: UNINITIALIZED,
+                init_epoch_rx: Some(init_epoch_rx),
+                item_rx,
+                truncated_epoch_tx,
+            };
+            let writer = BoundedInMemLogStoreWriter {
+                curr_epoch: None,
+                init_epoch_tx: Some(init_epoch_tx),
+                item_tx,
+                truncated_epoch_rx,
+            };
+            (reader, writer)
+        }
+    }
+}
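The factory wires three plain tokio channels: a oneshot for the initial-epoch handshake, a bounded mpsc channel that carries items and supplies the backpressure implied by `bound`, and an unbounded channel for truncation acknowledgements. A standalone toy of the same channel pattern (editorial sketch, made-up values, not part of the patch):

    use tokio::sync::{mpsc, oneshot};

    #[tokio::main]
    async fn main() {
        // Oneshot carries the initial epoch; bounded mpsc carries items.
        let (init_tx, init_rx) = oneshot::channel::<u64>();
        let (item_tx, mut item_rx) = mpsc::channel::<u64>(2);

        let writer = tokio::spawn(async move {
            init_tx.send(233).unwrap();
            for item in 0..4u64 {
                // With a bound of 2, the third send waits until the reader recvs.
                item_tx.send(item).await.unwrap();
            }
        });

        assert_eq!(init_rx.await.unwrap(), 233);
        while let Some(item) = item_rx.recv().await {
            println!("got {item}");
        }
        writer.await.unwrap();
    }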
+
+impl LogReader for BoundedInMemLogStoreReader {
+    type InitFuture<'a> = impl Future<Output = LogStoreResult<u64>> + 'a;
+    type NextItemFuture<'a> = impl Future<Output = LogStoreResult<LogStoreReadItem>> + 'a;
+    type TruncateFuture<'a> = impl Future<Output = LogStoreResult<()>> + 'a;
+
+    fn init(&mut self) -> Self::InitFuture<'_> {
+        async {
+            let init_epoch_rx = self
+                .init_epoch_rx
+                .take()
+                .expect("should not be initialized twice");
+            // TODO: should return the error
+            let epoch = init_epoch_rx.await.expect("should be able to init");
+            assert_eq!(self.epoch_progress, UNINITIALIZED);
+            self.epoch_progress = LogReaderEpochProgress::Consuming(epoch);
+            Ok(epoch)
+        }
+    }
+
+    fn next_item(&mut self) -> Self::NextItemFuture<'_> {
+        async {
+            match self.item_rx.recv().await {
+                Some(item) => {
+                    if let LogStoreReadItem::Barrier {
+                        next_epoch,
+                        is_checkpoint,
+                    } = &item
+                    {
+                        match self.epoch_progress {
+                            LogReaderEpochProgress::Consuming(current_epoch) => {
+                                if *is_checkpoint {
+                                    self.epoch_progress = AwaitingTruncate {
+                                        next_epoch: *next_epoch,
+                                        sealed_epoch: current_epoch,
+                                    };
+                                } else {
+                                    self.epoch_progress = Consuming(*next_epoch);
+                                }
+                            }
+                            LogReaderEpochProgress::AwaitingTruncate { .. } => {
+                                unreachable!("should not be awaiting truncate when a barrier comes")
+                            }
+                        }
+                    }
+                    Ok(item)
+                }
+                None => Err(LogStoreError::EndOfLogStream),
+            }
+        }
+    }
+
+    fn truncate(&mut self) -> Self::TruncateFuture<'_> {
+        async move {
+            let sealed_epoch = match self.epoch_progress {
+                Consuming(_) => unreachable!("should be awaiting truncate"),
+                LogReaderEpochProgress::AwaitingTruncate {
+                    sealed_epoch,
+                    next_epoch,
+                } => {
+                    self.epoch_progress = Consuming(next_epoch);
+                    sealed_epoch
+                }
+            };
+            // TODO: should return the error
+            self.truncated_epoch_tx
+                .send(sealed_epoch)
+                .expect("should not error");
+            Ok(())
+        }
+    }
+}
+
+impl LogWriter for BoundedInMemLogStoreWriter {
+    type FlushCurrentEpoch<'a> = impl Future<Output = LogStoreResult<()>> + 'a;
+    type InitFuture<'a> = impl Future<Output = LogStoreResult<()>> + 'a;
+    type WriteChunkFuture<'a> = impl Future<Output = LogStoreResult<()>> + 'a;
+
+    fn init(&mut self, epoch: u64) -> Self::InitFuture<'_> {
+        let init_epoch_tx = self
+            .init_epoch_tx
+            .take()
+            .expect("should not be initialized twice");
+        // TODO: return the error
+        init_epoch_tx.send(epoch).unwrap();
+        self.curr_epoch = Some(epoch);
+        async { Ok(()) }
+    }
+
+    fn write_chunk(&mut self, chunk: StreamChunk) -> Self::WriteChunkFuture<'_> {
+        async {
+            // TODO: return the sender error
+            self.item_tx
+                .send(LogStoreReadItem::StreamChunk(chunk))
+                .await
+                .expect("should be able to send");
+            Ok(())
+        }
+    }
+
+    fn flush_current_epoch(
+        &mut self,
+        next_epoch: u64,
+        is_checkpoint: bool,
+    ) -> Self::FlushCurrentEpoch<'_> {
+        async move {
+            // TODO: return the sender error
+            self.item_tx
+                .send(LogStoreReadItem::Barrier {
+                    next_epoch,
+                    is_checkpoint,
+                })
+                .await
+                .expect("should be able to send");
+
+            let prev_epoch = self
+                .curr_epoch
+                .replace(next_epoch)
+                .expect("should have an epoch");
+
+            if is_checkpoint {
+                // TODO: return an error on `None`
+                let truncated_epoch = self.truncated_epoch_rx.recv().await.unwrap();
+                assert_eq!(truncated_epoch, prev_epoch);
+            }
+
+            Ok(())
+        }
+    }
+
+    fn update_vnode_bitmap(&mut self, _new_vnodes: Arc<Bitmap>) {
+        // Since this log store is in-memory, we do not need to handle the vnode bitmap.
+    }
+}
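Reading the two impls together: `flush_current_epoch(_, true)` blocks on `truncated_epoch_rx.recv()`, so the writer cannot finish a checkpoint until the reader reaches the matching barrier and calls `truncate`. A reader driven like the hypothetical loop below keeps that rendezvous deadlock-free (editorial sketch, assuming the items above are in scope):

    async fn drain_log<R: LogReader>(reader: &mut R) -> LogStoreResult<()> {
        let mut _epoch = reader.init().await?;
        loop {
            match reader.next_item().await? {
                LogStoreReadItem::StreamChunk(_chunk) => {
                    // Deliver the chunk to the external system here.
                }
                LogStoreReadItem::Barrier {
                    next_epoch,
                    is_checkpoint,
                } => {
                    if is_checkpoint {
                        // This acknowledgement is what unblocks a writer
                        // waiting inside `flush_current_epoch`.
                        reader.truncate().await?;
                    }
                    _epoch = next_epoch;
                }
            }
        }
    }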
+
+#[cfg(test)]
+mod tests {
+    use risingwave_common::array::{Op, StreamChunk};
+    use risingwave_common::row::OwnedRow;
+    use risingwave_common::types::{DataType, ScalarImpl};
+    use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
+
+    use crate::common::log_store::{
+        BoundedInMemLogStoreFactory, LogReader, LogStoreFactory, LogStoreReadItem, LogWriter,
+    };
+
+    #[tokio::test]
+    async fn test_in_memory_log_store() {
+        let factory = BoundedInMemLogStoreFactory::new(4);
+        let (mut reader, mut writer) = factory.build().await;
+
+        let init_epoch = 233;
+        let epoch1 = init_epoch + 1;
+        let epoch2 = init_epoch + 2;
+
+        let ops = vec![Op::Insert, Op::Delete, Op::UpdateInsert, Op::UpdateDelete];
+        let mut builder = DataChunkBuilder::new(vec![DataType::Int64, DataType::Varchar], 10000);
+        for i in 0..ops.len() {
+            assert!(builder
+                .append_one_row(OwnedRow::new(vec![
+                    Some(ScalarImpl::Int64(i as i64)),
+                    Some(ScalarImpl::Utf8(format!("name_{}", i).into_boxed_str()))
+                ]))
+                .is_none());
+        }
+        let data_chunk = builder.consume_all().unwrap();
+        let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
+        let stream_chunk_clone = stream_chunk.clone();
+
+        let join_handle = tokio::spawn(async move {
+            writer.init(init_epoch).await.unwrap();
+            writer
+                .write_chunk(stream_chunk_clone.clone())
+                .await
+                .unwrap();
+            writer.flush_current_epoch(epoch1, false).await.unwrap();
+            writer.write_chunk(stream_chunk_clone).await.unwrap();
+            writer.flush_current_epoch(epoch2, true).await.unwrap();
+        });
+
+        assert_eq!(init_epoch, reader.init().await.unwrap());
+        match reader.next_item().await.unwrap() {
+            LogStoreReadItem::StreamChunk(chunk) => {
+                assert_eq!(&chunk, &stream_chunk);
+            }
+            LogStoreReadItem::Barrier { .. } => unreachable!(),
+        }
+
+        match reader.next_item().await.unwrap() {
+            LogStoreReadItem::StreamChunk(_) => unreachable!(),
+            LogStoreReadItem::Barrier {
+                is_checkpoint,
+                next_epoch,
+            } => {
+                assert!(!is_checkpoint);
+                assert_eq!(next_epoch, epoch1);
+            }
+        }
+
+        match reader.next_item().await.unwrap() {
+            LogStoreReadItem::StreamChunk(chunk) => {
+                assert_eq!(&chunk, &stream_chunk);
+            }
+            LogStoreReadItem::Barrier { .. } => unreachable!(),
+        }
+
+        match reader.next_item().await.unwrap() {
+            LogStoreReadItem::StreamChunk(_) => unreachable!(),
+            LogStoreReadItem::Barrier {
+                is_checkpoint,
+                next_epoch,
+            } => {
+                assert!(is_checkpoint);
+                assert_eq!(next_epoch, epoch2);
+            }
+        }
+
+        reader.truncate().await.unwrap();
+        join_handle.await.unwrap();
+    }
+}
diff --git a/src/stream/src/common/mod.rs b/src/stream/src/common/mod.rs
index 2789fb88d409..9cba182b530a 100644
--- a/src/stream/src/common/mod.rs
+++ b/src/stream/src/common/mod.rs
@@ -18,4 +18,5 @@ pub use column_mapping::*;
 mod builder;
 pub mod cache;
 mod column_mapping;
+pub mod log_store;
 pub mod table;
diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs
index 88c09bd7d043..e0b1827794a2 100644
--- a/src/stream/src/executor/error.rs
+++ b/src/stream/src/executor/error.rs
@@ -26,6 +26,7 @@ use risingwave_rpc_client::error::RpcError;
 use risingwave_storage::error::StorageError;
 
 use super::Barrier;
+use crate::common::log_store::LogStoreError;
 
 #[derive(thiserror::Error, Debug)]
 enum Inner {
@@ -36,6 +37,9 @@ enum Inner {
         StorageError,
     ),
 
+    #[error("Log store error: {0}")]
+    LogStoreError(LogStoreError),
+
     #[error("Chunk operation error: {0}")]
     EvalError(Either<ArrayError, ExprError>),
 
@@ -122,6 +126,13 @@ impl From<StorageError> for StreamExecutorError {
     }
 }
 
+/// Log store error.
+impl From<LogStoreError> for StreamExecutorError {
+    fn from(e: LogStoreError) -> Self {
+        Inner::LogStoreError(e).into()
+    }
+}
+
 /// Chunk operation error.
 impl From<ArrayError> for StreamExecutorError {
     fn from(e: ArrayError) -> Self {
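With the `From` impl above, executor code can use `?` directly on log store calls. A hypothetical helper showing the conversion (editorial sketch, assuming `LogReader`, `LogStoreReadItem`, and `StreamExecutorResult` are in scope):

    async fn next_log_item<R: LogReader>(
        reader: &mut R,
    ) -> StreamExecutorResult<LogStoreReadItem> {
        // `?` converts `LogStoreError` into `StreamExecutorError`
        // via `Inner::LogStoreError`.
        Ok(reader.next_item().await?)
    }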
diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs
index c42a0f56af1d..5ffd8fb1317b 100644
--- a/src/stream/src/executor/sink.rs
+++ b/src/stream/src/executor/sink.rs
@@ -15,8 +15,10 @@
 use std::sync::Arc;
 use std::time::Instant;
 
-use futures::StreamExt;
+use futures::stream::select;
+use futures::{FutureExt, Stream, StreamExt};
 use futures_async_stream::try_stream;
+use prometheus::Histogram;
 use risingwave_common::array::{Op, StreamChunk};
 use risingwave_common::catalog::Schema;
 use risingwave_common::row::Row;
@@ -28,10 +30,11 @@ use risingwave_connector::ConnectorParams;
 
 use super::error::{StreamExecutorError, StreamExecutorResult};
 use super::{BoxedExecutor, Executor, Message};
+use crate::common::log_store::{LogReader, LogStoreFactory, LogStoreReadItem, LogWriter};
 use crate::executor::monitor::StreamingMetrics;
-use crate::executor::PkIndices;
+use crate::executor::{expect_first_barrier, ActorContextRef, PkIndices};
 
-pub struct SinkExecutor {
+pub struct SinkExecutor<F: LogStoreFactory> {
     input: BoxedExecutor,
     metrics: Arc<StreamingMetrics>,
     config: SinkConfig,
@@ -40,6 +43,9 @@ pub struct SinkExecutor {
     schema: Schema,
     pk_indices: Vec<usize>,
     sink_type: SinkType,
+    actor_context: ActorContextRef,
+    log_reader: F::Reader,
+    log_writer: F::Writer,
 }
 
 async fn build_sink(
@@ -48,10 +54,8 @@ async fn build_sink(
     pk_indices: PkIndices,
     connector_params: ConnectorParams,
     sink_type: SinkType,
-) -> StreamExecutorResult<Box<SinkImpl>> {
-    Ok(Box::new(
-        SinkImpl::new(config, schema, pk_indices, connector_params, sink_type).await?,
-    ))
+) -> StreamExecutorResult<SinkImpl> {
+    Ok(SinkImpl::new(config, schema, pk_indices, connector_params, sink_type).await?)
 }
 
 // Drop all the DELETE messages in this chunk and convert UPDATE INSERT into INSERT.
@@ -69,9 +73,9 @@ fn force_append_only(chunk: StreamChunk, data_types: Vec<DataType>) -> Option<StreamChunk
 }
 
-impl SinkExecutor {
+impl<F: LogStoreFactory> SinkExecutor<F> {
     #[allow(clippy::too_many_arguments)]
-    pub fn new(
+    pub async fn new(
         materialize_executor: BoxedExecutor,
         metrics: Arc<StreamingMetrics>,
         config: SinkConfig,
@@ -80,7 +84,10 @@ impl SinkExecutor {
         schema: Schema,
         pk_indices: Vec<usize>,
         sink_type: SinkType,
+        actor_context: ActorContextRef,
+        log_store_factory: F,
     ) -> Self {
+        let (log_reader, log_writer) = log_store_factory.build().await;
         Self {
             input: materialize_executor,
             metrics,
@@ -90,35 +97,67 @@ impl SinkExecutor {
             schema,
             connector_params,
             sink_type,
+            actor_context,
+            log_reader,
+            log_writer,
         }
     }
 
-    #[try_stream(ok = Message, error = StreamExecutorError)]
-    async fn execute_inner(self) {
-        // the flag is required because kafka transaction requires at least one
-        // message, so we should abort the transaction if the flag is true.
-        let mut empty_checkpoint_flag = true;
-        let mut in_transaction = false;
-        let mut epoch = 0;
-        let data_types = self.schema.data_types();
+    fn execute_inner(self) -> impl Stream<Item = StreamExecutorResult<Message>> {
+        let config = self.config.clone();
+        let schema = self.schema.clone();
 
-        let mut sink = build_sink(
-            self.config.clone(),
-            self.schema,
+        let metrics = self
+            .metrics
+            .sink_commit_duration
+            .with_label_values(&[self.identity.as_str(), self.config.get_connector()]);
+        let consume_log_stream = Self::execute_consume_log(
+            config,
+            schema,
             self.pk_indices,
             self.connector_params,
             self.sink_type,
-        )
-        .await?;
+            self.log_reader,
+            metrics,
+        );
 
-        let input = self.input.execute();
+        let write_log_stream = Self::execute_write_log(
+            self.input,
+            self.log_writer,
+            self.schema,
+            self.sink_type,
+            self.actor_context,
+        );
+
+        select(consume_log_stream.into_stream(), write_log_stream)
+    }
+
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    async fn execute_write_log(
+        input: BoxedExecutor,
+        mut log_writer: impl LogWriter,
+        schema: Schema,
+        sink_type: SinkType,
+        actor_context: ActorContextRef,
+    ) {
+        let data_types = schema.data_types();
+        let mut input = input.execute();
+
+        let barrier = expect_first_barrier(&mut input).await?;
+
+        let epoch_pair = barrier.epoch;
+
+        log_writer.init(epoch_pair.curr).await?;
+
+        // Propagate the first barrier.
+        yield Message::Barrier(barrier);
 
         #[for_await]
         for msg in input {
             match msg? {
                 Message::Watermark(w) => yield Message::Watermark(w),
                 Message::Chunk(chunk) => {
-                    let visible_chunk = if self.sink_type == SinkType::ForceAppendOnly {
+                    let visible_chunk = if sink_type == SinkType::ForceAppendOnly {
                         // Force append-only by dropping UPDATE/DELETE messages. We do this when the
                         // user forces the sink to be append-only while it is actually not based on
                         // the frontend derivation result.
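`execute_inner` merges two halves: `execute_consume_log` is a future that normally never resolves, and `FutureExt::into_stream` adapts it into a one-item stream so `futures::stream::select` can poll it alongside the write-side message stream. A self-contained toy of the same combinator shape (editorial sketch, made-up values, not part of the patch):

    use futures::future::FutureExt;
    use futures::pin_mut;
    use futures::stream::{select, StreamExt};

    #[tokio::main]
    async fn main() {
        // Stands in for `execute_consume_log`: a future yielding one final
        // item (in the executor it would only resolve on error).
        let consumer = async { 0u32 };
        // Stands in for `execute_write_log`: an ordinary stream of items.
        let producer = futures::stream::iter([1u32, 2, 3]);

        // `into_stream` turns the future into a one-item stream so `select`
        // can poll both sides concurrently.
        let merged = select(consumer.into_stream(), producer);
        pin_mut!(merged);
        while let Some(v) = merged.next().await {
            println!("{v}");
        }
    }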
@@ -133,22 +172,68 @@ impl SinkExecutor {
                         // At this point (instead of the point above when we receive the upstream
                         // data chunk), we make sure that we do have data to send out, and we can
                         // thus mark the txn as started.
-                        if !in_transaction {
-                            sink.begin_epoch(epoch).await?;
-                            in_transaction = true;
-                        }
-
-                        if let Err(e) = sink.write_batch(chunk.clone()).await {
-                            sink.abort().await?;
-                            return Err(e.into());
-                        }
-                        empty_checkpoint_flag = false;
+                        log_writer.write_chunk(chunk.clone()).await?;
 
                         yield Message::Chunk(chunk);
                     }
                 }
                 Message::Barrier(barrier) => {
-                    if barrier.checkpoint {
+                    log_writer
+                        .flush_current_epoch(barrier.epoch.curr, barrier.checkpoint)
+                        .await?;
+                    if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(actor_context.id) {
+                        log_writer.update_vnode_bitmap(vnode_bitmap);
+                    }
+                    yield Message::Barrier(barrier);
+                }
+            }
+        }
+    }
+
+    async fn execute_consume_log<R: LogReader>(
+        config: SinkConfig,
+        schema: Schema,
+        pk_indices: Vec<usize>,
+        connector_params: ConnectorParams,
+        sink_type: SinkType,
+        mut log_reader: R,
+        sink_commit_duration_metrics: Histogram,
+    ) -> StreamExecutorResult<Message> {
+        let mut sink = build_sink(config, schema, pk_indices, connector_params, sink_type).await?;
+
+        let mut epoch = log_reader.init().await?;
+
+        // the flag is required because kafka transaction requires at least one
+        // message, so we should abort the transaction if the flag is true.
+        let mut empty_checkpoint_flag = true;
+        let mut in_transaction = false;
+
+        loop {
+            let item: LogStoreReadItem = log_reader.next_item().await?;
+            match item {
+                LogStoreReadItem::StreamChunk(chunk) => {
+                    // NOTE: We start the txn here because a force-append-only sink might
+                    // receive a data chunk full of DELETE messages and then drop all of them.
+                    // At this point (instead of the point above when we receive the upstream
+                    // data chunk), we make sure that we do have data to send out, and we can
+                    // thus mark the txn as started.
+                    if !in_transaction {
+                        sink.begin_epoch(epoch).await?;
+                        in_transaction = true;
+                    }
+
+                    if let Err(e) = sink.write_batch(chunk.clone()).await {
+                        sink.abort().await?;
+                        return Err(e.into());
+                    }
+                    empty_checkpoint_flag = false;
+                }
+                LogStoreReadItem::Barrier {
+                    next_epoch,
+                    is_checkpoint,
+                } => {
+                    assert!(next_epoch > epoch);
+                    if is_checkpoint {
                         if in_transaction {
                             if empty_checkpoint_flag {
                                 sink.abort().await?;
@@ -159,28 +244,24 @@ impl SinkExecutor {
                             } else {
                                 let start_time = Instant::now();
                                 sink.commit().await?;
-                                self.metrics
-                                    .sink_commit_duration
-                                    .with_label_values(&[
-                                        self.identity.as_str(),
-                                        self.config.get_connector(),
-                                    ])
+                                sink_commit_duration_metrics
                                     .observe(start_time.elapsed().as_millis() as f64);
                             }
                         }
+                        log_reader.truncate().await?;
                         in_transaction = false;
                         empty_checkpoint_flag = true;
                     }
-                    epoch = barrier.epoch.curr;
-                    yield Message::Barrier(barrier);
+                    epoch = next_epoch;
                 }
             }
         }
     }
 }
 
-impl Executor for SinkExecutor {
+impl<F: LogStoreFactory> Executor for SinkExecutor<F> {
     fn execute(self: Box<Self>) -> super::BoxedMessageStream {
+        // TODO: dispatch in enum
         self.execute_inner().boxed()
     }
 
@@ -200,7 +281,9 @@ impl Executor for SinkExecutor {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::common::log_store::BoundedInMemLogStoreFactory;
     use crate::executor::test_utils::*;
+    use crate::executor::ActorContext;
 
     #[tokio::test]
     async fn test_force_append_only_sink() {
@@ -226,11 +309,12 @@ mod test {
             schema.clone(),
             pk.clone(),
             vec![
+                Message::Barrier(Barrier::new_test_barrier(1)),
                 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                     " I I
                     + 3 2",
                 ))),
-                Message::Barrier(Barrier::new_test_barrier(1)),
+                Message::Barrier(Barrier::new_test_barrier(2)),
                 Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty(
                     " I I
                     U- 3 2
@@ -254,10 +338,16 @@ mod test {
             schema.clone(),
             pk.clone(),
             SinkType::ForceAppendOnly,
-        );
+            ActorContext::create(0),
+            BoundedInMemLogStoreFactory::new(1),
+        )
+        .await;
 
         let mut executor = SinkExecutor::execute(Box::new(sink_executor));
 
+        // Barrier message.
+        executor.next().await.unwrap().unwrap();
+
         let chunk_msg = executor.next().await.unwrap().unwrap();
         assert_eq!(
             chunk_msg.into_chunk().unwrap(),
diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs
index a35580999c9b..399ad389a21e 100644
--- a/src/stream/src/from_proto/sink.rs
+++ b/src/stream/src/from_proto/sink.rs
@@ -18,6 +18,7 @@ use risingwave_connector::sink::{SinkConfig, DOWNSTREAM_SINK_KEY};
 use risingwave_pb::stream_plan::SinkNode;
 
 use super::*;
+use crate::common::log_store::BoundedInMemLogStoreFactory;
 use crate::executor::{SinkExecutor, StreamExecutorError};
 
 pub struct SinkExecutorBuilder;
@@ -53,15 +54,20 @@ impl ExecutorBuilder for SinkExecutorBuilder {
         }
 
         let config = SinkConfig::from_hashmap(properties).map_err(StreamExecutorError::from)?;
-        Ok(Box::new(SinkExecutor::new(
-            materialize_executor,
-            stream.streaming_metrics.clone(),
-            config,
-            params.executor_id,
-            params.env.connector_params(),
-            schema,
-            pk_indices,
-            sink_type,
-        )))
+        Ok(Box::new(
+            SinkExecutor::new(
+                materialize_executor,
+                stream.streaming_metrics.clone(),
+                config,
+                params.executor_id,
+                params.env.connector_params(),
+                schema,
+                pk_indices,
+                sink_type,
+                params.actor_context,
+                BoundedInMemLogStoreFactory::new(1),
+            )
+            .await,
+        ))
     }
 }
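The builder hard-codes `BoundedInMemLogStoreFactory::new(1)`, so each checkpoint epoch is fully consumed before the barrier completes. A durable log store would plug in behind the same trait; the skeleton below is a hypothetical sketch only (it assumes the nightly `type_alias_impl_trait` feature this crate already uses, and it delegates to the in-memory store rather than implementing real persistence):

    struct MyDurableLogStoreFactory;

    impl LogStoreFactory for MyDurableLogStoreFactory {
        type Reader = BoundedInMemLogStoreReader;
        type Writer = BoundedInMemLogStoreWriter;

        type BuildFuture = impl Future<Output = (Self::Reader, Self::Writer)> + Send;

        fn build(self) -> Self::BuildFuture {
            // A real implementation would construct a reader/writer pair backed
            // by persistent storage; this stub reuses the in-memory pair.
            BoundedInMemLogStoreFactory::new(16).build()
        }
    }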