From c03e3aab9e5a4fe1d2dfcec41f5bab1bcb8e80b8 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Thu, 14 Sep 2023 21:45:54 +0100 Subject: [PATCH] Send periodic heartbeats to keep the connection alive --- hotfix/hotfix-core/src/actors/orchestrator.rs | 84 ++++++++++++++++--- hotfix/hotfix-core/src/config.rs | 1 + hotfix/hotfix-core/src/message.rs | 2 + hotfix/hotfix-core/src/message/common.rs | 5 ++ hotfix/hotfix-core/src/message/heartbeat.rs | 20 +++++ hotfix/hotfix-core/src/message/logon.rs | 14 ++-- hotfix/hotfix-core/src/session.rs | 9 +- 7 files changed, 106 insertions(+), 29 deletions(-) create mode 100644 hotfix/hotfix-core/src/message/common.rs create mode 100644 hotfix/hotfix-core/src/message/heartbeat.rs diff --git a/hotfix/hotfix-core/src/actors/orchestrator.rs b/hotfix/hotfix-core/src/actors/orchestrator.rs index 8fc0fc88..7988f9d4 100644 --- a/hotfix/hotfix-core/src/actors/orchestrator.rs +++ b/hotfix/hotfix-core/src/actors/orchestrator.rs @@ -1,11 +1,20 @@ +use std::time::Duration; +use tokio::select; use tokio::sync::mpsc; +use tokio::time::Instant; use tracing::debug; +use crate::actors::socket_writer::WriterHandle; +use crate::config::SessionConfig; +use crate::message::heartbeat::heartbeat_message; +use crate::message::logon::logon_message; use crate::message::parser::RawFixMessage; #[derive(Clone, Debug)] pub enum OrchestratorMessage { FixMessageReceived(RawFixMessage), + SendHeartbeat, + SendLogon, } #[derive(Clone)] @@ -14,9 +23,9 @@ pub struct OrchestratorHandle { } impl OrchestratorHandle { - pub fn new() -> Self { + pub fn new(config: SessionConfig, writer: WriterHandle) -> Self { let (sender, mailbox) = mpsc::channel(10); - let actor = OrchestratorActor::new(mailbox); + let actor = OrchestratorActor::new(mailbox, config, writer); tokio::spawn(run_orchestrator(actor)); Self { sender } @@ -30,32 +39,81 @@ impl OrchestratorHandle { } } -impl Default for OrchestratorHandle { - fn default() -> Self { - Self::new() - } -} - struct OrchestratorActor { mailbox: mpsc::Receiver, + config: SessionConfig, + writer: WriterHandle, + msg_seq_number: usize, } impl OrchestratorActor { - fn new(mailbox: mpsc::Receiver) -> OrchestratorActor { - Self { mailbox } + fn new( + mailbox: mpsc::Receiver, + config: SessionConfig, + writer: WriterHandle, + ) -> OrchestratorActor { + Self { + mailbox, + config, + writer, + msg_seq_number: 0, + } } - fn handle(&self, message: OrchestratorMessage) { + fn next_sequence_number(&mut self) -> usize { + self.msg_seq_number += 1; + self.msg_seq_number + } + + async fn handle(&mut self, message: OrchestratorMessage) { match message { OrchestratorMessage::FixMessageReceived(fix_message) => { debug!("received message: {}", fix_message); } + OrchestratorMessage::SendHeartbeat => { + let seq_num = self.next_sequence_number(); + let msg = heartbeat_message( + &self.config.sender_comp_id, + &self.config.target_comp_id, + seq_num, + ); + self.writer.send_raw_message(RawFixMessage::new(msg)).await; + } + OrchestratorMessage::SendLogon => { + let seq_num = self.next_sequence_number(); + let msg = logon_message( + &self.config.sender_comp_id, + &self.config.target_comp_id, + seq_num, + ); + self.writer.send_raw_message(RawFixMessage::new(msg)).await; + } } } } async fn run_orchestrator(mut actor: OrchestratorActor) { - while let Some(msg) = actor.mailbox.recv().await { - actor.handle(msg); + actor.handle(OrchestratorMessage::SendLogon).await; + + loop { + let next_message = actor.mailbox.recv(); + let next_heartbeat = + tokio::time::sleep(Duration::from_secs(actor.config.heartbeat_interval)); + tokio::pin!(next_heartbeat); + + select! { + next = next_message => { + match next { + Some(msg) => { + actor.handle(msg).await; + } + None => break, + } + } + () = &mut next_heartbeat => { + next_heartbeat.as_mut().reset(Instant::now() + Duration::from_secs(actor.config.heartbeat_interval)); + actor.handle(OrchestratorMessage::SendHeartbeat).await; + } + } } } diff --git a/hotfix/hotfix-core/src/config.rs b/hotfix/hotfix-core/src/config.rs index ace42373..57fe0a00 100644 --- a/hotfix/hotfix-core/src/config.rs +++ b/hotfix/hotfix-core/src/config.rs @@ -23,6 +23,7 @@ pub struct SessionConfig { pub ca_certificate_path: String, pub connection_host: String, pub connection_port: u16, + pub heartbeat_interval: u64, // in seconds } #[cfg(test)] diff --git a/hotfix/hotfix-core/src/message.rs b/hotfix/hotfix-core/src/message.rs index dd6447fc..527e3098 100644 --- a/hotfix/hotfix-core/src/message.rs +++ b/hotfix/hotfix-core/src/message.rs @@ -1,2 +1,4 @@ +mod common; +pub mod heartbeat; pub mod logon; pub mod parser; diff --git a/hotfix/hotfix-core/src/message/common.rs b/hotfix/hotfix-core/src/message/common.rs new file mode 100644 index 00000000..68eb91df --- /dev/null +++ b/hotfix/hotfix-core/src/message/common.rs @@ -0,0 +1,5 @@ +use fefix::TagU16; + +pub fn create_tag(t: u16) -> TagU16 { + TagU16::new(t).unwrap() +} diff --git a/hotfix/hotfix-core/src/message/heartbeat.rs b/hotfix/hotfix-core/src/message/heartbeat.rs new file mode 100644 index 00000000..091c808d --- /dev/null +++ b/hotfix/hotfix-core/src/message/heartbeat.rs @@ -0,0 +1,20 @@ +use fefix::fix_values::Timestamp; +use fefix::tagvalue::{Config, Encoder}; + +use crate::message::common::create_tag; + +pub fn heartbeat_message( + sender_comp_id: &str, + target_comp_id: &str, + msg_seq_num: usize, +) -> Vec { + let mut buffer = Vec::new(); + let mut encoder: Encoder = Encoder::default(); + let mut msg = encoder.start_message(b"FIX.4.4", &mut buffer, b"0"); + msg.set_any(create_tag(49), sender_comp_id.as_bytes()); // sender comp id + msg.set_any(create_tag(56), target_comp_id.as_bytes()); // target comp id + msg.set_any(create_tag(34), msg_seq_num); // msg sequence number + msg.set_any(create_tag(52), Timestamp::utc_now()); // sending time + + msg.wrap().to_vec() +} diff --git a/hotfix/hotfix-core/src/message/logon.rs b/hotfix/hotfix-core/src/message/logon.rs index c904fcc6..424566c8 100644 --- a/hotfix/hotfix-core/src/message/logon.rs +++ b/hotfix/hotfix-core/src/message/logon.rs @@ -1,21 +1,19 @@ +use fefix::fix_values::Timestamp; use fefix::tagvalue::{Config, Encoder}; -use fefix::TagU16; -pub fn create_login_message(sender_comp_id: &str, target_comp_id: &str) -> Vec { +use crate::message::common::create_tag; + +pub fn logon_message(sender_comp_id: &str, target_comp_id: &str, msg_seq_num: usize) -> Vec { let mut buffer = Vec::new(); let mut encoder: Encoder = Encoder::default(); let mut msg = encoder.start_message(b"FIX.4.4", &mut buffer, b"A"); msg.set_any(create_tag(49), sender_comp_id.as_bytes()); // sender comp id msg.set_any(create_tag(56), target_comp_id.as_bytes()); // target comp id - msg.set_any(create_tag(34), b"1"); // msg sequence number - msg.set_any(create_tag(52), b"20230912-08:24:56.574"); // sending time + msg.set_any(create_tag(34), msg_seq_num); // msg sequence number + msg.set_any(create_tag(52), Timestamp::utc_now()); // sending time msg.set_any(create_tag(98), b"0"); // encrypt method msg.set_any(create_tag(108), b"30"); // heartbeat interval msg.set_any(create_tag(141), b"Y"); // reset seq num flag msg.wrap().to_vec() } - -fn create_tag(t: u16) -> TagU16 { - TagU16::new(t).unwrap() -} diff --git a/hotfix/hotfix-core/src/session.rs b/hotfix/hotfix-core/src/session.rs index b2108d41..681c2c79 100644 --- a/hotfix/hotfix-core/src/session.rs +++ b/hotfix/hotfix-core/src/session.rs @@ -2,8 +2,6 @@ use crate::actors::orchestrator::OrchestratorHandle; use crate::actors::socket_reader::reader_loop; use crate::actors::socket_writer::WriterHandle; use crate::config::SessionConfig; -use crate::message::logon; -use crate::message::parser::RawFixMessage; use crate::tls_client::Client; pub struct Session { @@ -24,15 +22,10 @@ async fn establish_connection(config: SessionConfig) { let tls_client = Client::new(&config).await; let (reader, writer) = tls_client.split(); - let logon_message = logon::create_login_message(&config.sender_comp_id, &config.target_comp_id); let writer = WriterHandle::new(writer); - let orchestrator = OrchestratorHandle::new(); + let orchestrator = OrchestratorHandle::new(config, writer); let fut_reader = reader_loop(reader, orchestrator); - writer - .send_raw_message(RawFixMessage::new(logon_message)) - .await; - fut_reader.await; }