Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
Extract parts to functions
Rename
Add some comments
`es_message::ServerMessage` to `ClientResponse`
  • Loading branch information
konsumlamm committed May 23, 2024
1 parent 763c94e commit db1f180
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 46 deletions.
106 changes: 61 additions & 45 deletions robusta/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::fs;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;

use axum::{
Expand All @@ -15,7 +16,6 @@ use axum::{
Json, Router,
};
use futures_util::SinkExt;
use kvv::LineDepartures;
use reqwest::StatusCode;
use tokio::sync::mpsc::{Receiver, Sender};
use tower::util::ServiceExt;
Expand All @@ -25,10 +25,10 @@ use tower_http::{
};
use tracing::{error, info, warn, Level};
use tracing_appender::rolling::{self, Rotation};
use unique_id::UniqueIdGen;
use ws_message::{ClientMessage, GameState, Team, TeamState};

use crate::ws_message::TeamKind;
use crate::kvv::LineDepartures;
use crate::unique_id::UniqueIdGen;
use crate::ws_message::{ClientMessage, ClientResponse, GameState, Team, TeamKind, TeamState};

mod kvv;
mod point;
Expand All @@ -54,7 +54,7 @@ enum ServerMessage {

#[derive(Debug)]
struct Client {
recv: Receiver<ws_message::ServerMessage>,
recv: Receiver<ClientResponse>,
send: Sender<InputMessage>,
id: u32,
}
Expand All @@ -63,7 +63,7 @@ struct Client {
struct ClientConnection {
id: u32,
team_id: u32,
send: Sender<ws_message::ServerMessage>,
send: Sender<ClientResponse>,
}

#[derive(Debug)]
Expand All @@ -89,17 +89,19 @@ impl AppState {
fn client(&self, id: u32) -> Option<&ClientConnection> {
self.connections.iter().find(|x| x.id == id)
}

fn client_mut(&mut self, id: u32) -> Option<&mut ClientConnection> {
self.connections.iter_mut().find(|x| x.id == id)
}

fn team_mut_by_client_id(&mut self, id: u32) -> Option<&mut TeamState> {
self.client(id)
.map(|x| x.team_id)
.and_then(|team_id| self.teams.iter_mut().find(|ts| ts.team.id == team_id))
}
}

type SharedState = std::sync::Arc<tokio::sync::Mutex<AppState>>;
type SharedState = Arc<tokio::sync::Mutex<AppState>>;

async fn handler(ws: WebSocketUpgrade, State(state): State<SharedState>) -> Response {
let (send, rec) = tokio::sync::mpsc::channel(100);
Expand Down Expand Up @@ -148,7 +150,7 @@ async fn handle_socket(socket: WebSocket, mut client: Client) {
};

if let Some(msg) = opt_msg {
if let Ok(client_msg) = serde_json::from_str::<ws_message::ClientMessage>(&msg) {
if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&msg) {
client
.send
.send(InputMessage::Client(client_msg, client.id))
Expand Down Expand Up @@ -256,47 +258,13 @@ async fn main() {
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE)
.init();

const BINDINGS: &str = "../liberica/src/lib/bindings.ts";
const TEMP_BINDINGS: &str = "../target/bindings.ts.tmp";

specta::export::ts(TEMP_BINDINGS).unwrap();
let old = fs::read_to_string(BINDINGS).unwrap_or_default();
let new = fs::read_to_string(TEMP_BINDINGS).unwrap();

// Only update bindings if they changed to avoid triggering a recompile of the frontend
if old != new {
info!("Updating bindings");
fs::write(BINDINGS, new).unwrap();
}
update_bindings();

info!("Starting server");
kvv::init().await;

let (send, recv) = tokio::sync::mpsc::channel(100);

let mut teams = fs::read_to_string(TEAMS_FILE)
.ok()
.and_then(|x| serde_json::from_str::<Vec<TeamState>>(&x).ok())
.unwrap_or_default();

let mut state = AppState::new(send.clone());
let max_id = teams.iter().map(|ts| ts.team.id).max().unwrap_or(0);
state.team_id_gen.set_min(max_id + 1);
if !teams.iter().any(|ts| ts.team.kind == TeamKind::MrX) {
// no Mr. X present
teams.push(TeamState {
team: Team {
id: state.team_id_gen.next(),
name: MRX.to_owned(),
color: "#000000".to_owned(),
kind: TeamKind::MrX,
},
..Default::default()
});
}
state.teams = teams;

let state = std::sync::Arc::new(tokio::sync::Mutex::new(state));
let state = load_state(send.clone());

// fetch departures every 60 seconds and send them to the game logic queue
tokio::spawn(async move {
Expand Down Expand Up @@ -347,6 +315,47 @@ async fn main() {
.unwrap();
}

fn update_bindings() {
const BINDINGS: &str = "../liberica/src/lib/bindings.ts";
const TEMP_BINDINGS: &str = "../target/bindings.ts.tmp";

specta::export::ts(TEMP_BINDINGS).unwrap();
let old = fs::read_to_string(BINDINGS).unwrap_or_default();
let new = fs::read_to_string(TEMP_BINDINGS).unwrap();

// Only update bindings if they changed to avoid triggering a recompile of the frontend
if old != new {
info!("Updating bindings");
fs::write(BINDINGS, new).unwrap();
}
}

fn load_state(send: Sender<InputMessage>) -> SharedState {
let mut teams = fs::read_to_string(TEAMS_FILE)
.ok()
.and_then(|x| serde_json::from_str::<Vec<TeamState>>(&x).ok())
.unwrap_or_default();

let mut state = AppState::new(send.clone());
let max_id = teams.iter().map(|ts| ts.team.id).max().unwrap_or(0);
state.team_id_gen.set_min(max_id + 1);
if !teams.iter().any(|ts| ts.team.kind == TeamKind::MrX) {
// no Mr. X present
teams.push(TeamState {
team: Team {
id: state.team_id_gen.next(),
name: MRX.to_owned(),
color: "#000000".to_owned(),
kind: TeamKind::MrX,
},
..Default::default()
});
}
state.teams = teams;

Arc::new(tokio::sync::Mutex::new(state))
}

async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
let mut departures = HashMap::new();
let mut log_file = rolling::Builder::new()
Expand All @@ -356,10 +365,14 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
.max_log_files(1)
.build("logs")
.expect("failed to initialize rolling file appender");

// the time for a single frame
let mut interval = tokio::time::interval(Duration::from_millis(500));

loop {
interval.tick().await;

// handle messages
let mut state = state.lock().await;
while let Ok(msg) = recv.try_recv() {
match msg {
Expand Down Expand Up @@ -410,6 +423,7 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
}
}

// compute train positions
let time = chrono::Utc::now();
let mut trains = kvv::train_positions(&departures, time);
trains.retain(|x| !x.line_id.contains("bus"));
Expand All @@ -424,6 +438,7 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
}
}

// log game state
let game_state = GameState {
teams: state.teams.clone(),
trains,
Expand All @@ -437,6 +452,7 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
.unwrap();
fs::write(TEAMS_FILE, serde_json::to_string_pretty(&game_state.teams).unwrap()).unwrap();

// send game state to clients
for connection in state.connections.iter_mut() {
let game_state = GameState {
teams: game_state
Expand All @@ -449,7 +465,7 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
};
if let Err(err) = connection
.send
.send(ws_message::ServerMessage::GameState(game_state.clone()))
.send(ClientResponse::GameState(game_state.clone()))
.await
{
error!("failed to send game state to client {}: {}", connection.id, err);
Expand Down
2 changes: 1 addition & 1 deletion robusta/src/ws_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub enum ClientMessage {
}

#[derive(specta::Type, Clone, Serialize, Deserialize, Debug)]
pub enum ServerMessage {
pub enum ClientResponse {
GameState(GameState),
}

Expand Down

0 comments on commit db1f180

Please sign in to comment.