From 5268508792a3b4d070cc5e17620481277e0bb7a1 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Thu, 6 Jun 2024 12:19:35 +0800 Subject: [PATCH] fix(interactive): persist latest kafka queue offset before stopped (#3800) Co-authored-by: nengli.ln --- charts/graphscope-store/templates/configmap.yaml | 1 + .../pegasus/network/src/transport/block/mod.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 40eaa708e844..53f72855e4be 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -73,6 +73,7 @@ data: gaia.rpc.port=60000 gaia.engine.port=60001 + gaia.write.timeout.ms={{ .Values.pegasus.timeout }} ## Secondary config secondary.instance.enabled={{ .Values.secondary.enabled }} diff --git a/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs b/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs index 0f87123ca34f..936cd41080cd 100644 --- a/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs +++ b/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; +use crate::config::BlockMode; use crate::receive::start_net_receiver; use crate::send::start_net_sender; use crate::transport::ConnectionParams; @@ -55,6 +56,14 @@ pub fn listen_on( let remote = Server { id: remote_id, addr }; if params.is_nonblocking { stream.set_nonblocking(true).ok(); + } else { + if let BlockMode::Blocking(Some(write_timelout)) = + params.get_write_params().mode + { + stream + .set_write_timeout(Some(write_timelout)) + .ok(); + } } let recv_poisoned = Arc::new(AtomicBool::new(false)); start_net_sender( @@ -127,6 +136,11 @@ pub fn connect( let remote = Server { id: remote_id, addr }; if params.is_nonblocking { conn.set_nonblocking(true).ok(); + } else { + if let BlockMode::Blocking(Some(write_timelout)) = params.get_write_params().mode { + conn.set_write_timeout(Some(write_timelout)) + .ok(); + } } let read_half = conn .try_clone()