Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 71 additions & 13 deletions hotfix/hotfix-core/src/actors/orchestrator.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tracing::debug;

use crate::actors::socket_writer::WriterHandle;
use crate::config::SessionConfig;
use crate::message::heartbeat::heartbeat_message;
use crate::message::logon::logon_message;
use crate::message::parser::RawFixMessage;

#[derive(Clone, Debug)]
pub enum OrchestratorMessage {
FixMessageReceived(RawFixMessage),
SendHeartbeat,
SendLogon,
}

#[derive(Clone)]
Expand All @@ -14,9 +23,9 @@ pub struct OrchestratorHandle {
}

impl OrchestratorHandle {
pub fn new() -> Self {
pub fn new(config: SessionConfig, writer: WriterHandle) -> Self {
let (sender, mailbox) = mpsc::channel(10);
let actor = OrchestratorActor::new(mailbox);
let actor = OrchestratorActor::new(mailbox, config, writer);
tokio::spawn(run_orchestrator(actor));

Self { sender }
Expand All @@ -30,32 +39,81 @@ impl OrchestratorHandle {
}
}

impl Default for OrchestratorHandle {
fn default() -> Self {
Self::new()
}
}

struct OrchestratorActor {
mailbox: mpsc::Receiver<OrchestratorMessage>,
config: SessionConfig,
writer: WriterHandle,
msg_seq_number: usize,
}

impl OrchestratorActor {
fn new(mailbox: mpsc::Receiver<OrchestratorMessage>) -> OrchestratorActor {
Self { mailbox }
fn new(
mailbox: mpsc::Receiver<OrchestratorMessage>,
config: SessionConfig,
writer: WriterHandle,
) -> OrchestratorActor {
Self {
mailbox,
config,
writer,
msg_seq_number: 0,
}
}

fn handle(&self, message: OrchestratorMessage) {
fn next_sequence_number(&mut self) -> usize {
self.msg_seq_number += 1;
self.msg_seq_number
}

async fn handle(&mut self, message: OrchestratorMessage) {
match message {
OrchestratorMessage::FixMessageReceived(fix_message) => {
debug!("received message: {}", fix_message);
}
OrchestratorMessage::SendHeartbeat => {
let seq_num = self.next_sequence_number();
let msg = heartbeat_message(
&self.config.sender_comp_id,
&self.config.target_comp_id,
seq_num,
);
self.writer.send_raw_message(RawFixMessage::new(msg)).await;
}
OrchestratorMessage::SendLogon => {
let seq_num = self.next_sequence_number();
let msg = logon_message(
&self.config.sender_comp_id,
&self.config.target_comp_id,
seq_num,
);
self.writer.send_raw_message(RawFixMessage::new(msg)).await;
}
}
}
}

async fn run_orchestrator(mut actor: OrchestratorActor) {
while let Some(msg) = actor.mailbox.recv().await {
actor.handle(msg);
actor.handle(OrchestratorMessage::SendLogon).await;

loop {
let next_message = actor.mailbox.recv();
let next_heartbeat =
tokio::time::sleep(Duration::from_secs(actor.config.heartbeat_interval));
tokio::pin!(next_heartbeat);

select! {
next = next_message => {
match next {
Some(msg) => {
actor.handle(msg).await;
}
None => break,
}
}
() = &mut next_heartbeat => {
next_heartbeat.as_mut().reset(Instant::now() + Duration::from_secs(actor.config.heartbeat_interval));
actor.handle(OrchestratorMessage::SendHeartbeat).await;
}
}
}
}
1 change: 1 addition & 0 deletions hotfix/hotfix-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct SessionConfig {
pub ca_certificate_path: String,
pub connection_host: String,
pub connection_port: u16,
pub heartbeat_interval: u64, // in seconds
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions hotfix/hotfix-core/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
mod common;
pub mod heartbeat;
pub mod logon;
pub mod parser;
5 changes: 5 additions & 0 deletions hotfix/hotfix-core/src/message/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use fefix::TagU16;

pub fn create_tag(t: u16) -> TagU16 {
TagU16::new(t).unwrap()
}
20 changes: 20 additions & 0 deletions hotfix/hotfix-core/src/message/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use fefix::fix_values::Timestamp;
use fefix::tagvalue::{Config, Encoder};

use crate::message::common::create_tag;

pub fn heartbeat_message(
sender_comp_id: &str,
target_comp_id: &str,
msg_seq_num: usize,
) -> Vec<u8> {
let mut buffer = Vec::new();
let mut encoder: Encoder<Config> = Encoder::default();
let mut msg = encoder.start_message(b"FIX.4.4", &mut buffer, b"0");
msg.set_any(create_tag(49), sender_comp_id.as_bytes()); // sender comp id
msg.set_any(create_tag(56), target_comp_id.as_bytes()); // target comp id
msg.set_any(create_tag(34), msg_seq_num); // msg sequence number
msg.set_any(create_tag(52), Timestamp::utc_now()); // sending time

msg.wrap().to_vec()
}
14 changes: 6 additions & 8 deletions hotfix/hotfix-core/src/message/logon.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use fefix::fix_values::Timestamp;
use fefix::tagvalue::{Config, Encoder};
use fefix::TagU16;

pub fn create_login_message(sender_comp_id: &str, target_comp_id: &str) -> Vec<u8> {
use crate::message::common::create_tag;

pub fn logon_message(sender_comp_id: &str, target_comp_id: &str, msg_seq_num: usize) -> Vec<u8> {
let mut buffer = Vec::new();
let mut encoder: Encoder<Config> = Encoder::default();
let mut msg = encoder.start_message(b"FIX.4.4", &mut buffer, b"A");
msg.set_any(create_tag(49), sender_comp_id.as_bytes()); // sender comp id
msg.set_any(create_tag(56), target_comp_id.as_bytes()); // target comp id
msg.set_any(create_tag(34), b"1"); // msg sequence number
msg.set_any(create_tag(52), b"20230912-08:24:56.574"); // sending time
msg.set_any(create_tag(34), msg_seq_num); // msg sequence number
msg.set_any(create_tag(52), Timestamp::utc_now()); // sending time
msg.set_any(create_tag(98), b"0"); // encrypt method
msg.set_any(create_tag(108), b"30"); // heartbeat interval
msg.set_any(create_tag(141), b"Y"); // reset seq num flag

msg.wrap().to_vec()
}

fn create_tag(t: u16) -> TagU16 {
TagU16::new(t).unwrap()
}
9 changes: 1 addition & 8 deletions hotfix/hotfix-core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use crate::actors::orchestrator::OrchestratorHandle;
use crate::actors::socket_reader::reader_loop;
use crate::actors::socket_writer::WriterHandle;
use crate::config::SessionConfig;
use crate::message::logon;
use crate::message::parser::RawFixMessage;
use crate::tls_client::Client;

pub struct Session {
Expand All @@ -24,15 +22,10 @@ async fn establish_connection(config: SessionConfig) {
let tls_client = Client::new(&config).await;

let (reader, writer) = tls_client.split();
let logon_message = logon::create_login_message(&config.sender_comp_id, &config.target_comp_id);

let writer = WriterHandle::new(writer);
let orchestrator = OrchestratorHandle::new();
let orchestrator = OrchestratorHandle::new(config, writer);
let fut_reader = reader_loop(reader, orchestrator);

writer
.send_raw_message(RawFixMessage::new(logon_message))
.await;

fut_reader.await;
}