From 86502e9ab8c1751ef5affa2d3ff0a833098b3581 Mon Sep 17 00:00:00 2001 From: Neon Date: Wed, 31 Jul 2024 19:46:52 +0800 Subject: [PATCH 01/13] implemented a basic message queue --- gateway/Cargo.toml | 1 + gateway/src/init.rs | 5 +++ gateway/src/lib.rs | 1 + gateway/src/mq/event.rs | 58 ++++++++++++++++++++++++++++++++ gateway/src/mq/mod.rs | 2 ++ gateway/src/mq/queue.rs | 74 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 141 insertions(+) create mode 100644 gateway/src/mq/event.rs create mode 100644 gateway/src/mq/mod.rs create mode 100644 gateway/src/mq/queue.rs diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 2ddb97e8d..73cf14cc1 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -43,3 +43,4 @@ reqwest = { workspace = true, features = ["json"] } uuid = { workspace = true, features = ["v4"] } regex = "1.10.4" ed25519-dalek = { version = "2.1.1", features = ["pkcs8"] } +crossbeam-channel = "0.5.10" diff --git a/gateway/src/init.rs b/gateway/src/init.rs index 1c5d133aa..b6ce8a987 100644 --- a/gateway/src/init.rs +++ b/gateway/src/init.rs @@ -7,3 +7,8 @@ pub async fn init_monorepo(config: Config) -> Result<(), Box Result<(), Box> { + crate::mq::queue::init_message_queue(12); + Ok(()) +} diff --git a/gateway/src/lib.rs b/gateway/src/lib.rs index d267a80d6..e48d526ac 100644 --- a/gateway/src/lib.rs +++ b/gateway/src/lib.rs @@ -6,6 +6,7 @@ pub mod init; mod lfs; pub mod relay_server; pub mod ssh_server; +pub mod mq; #[cfg(test)] mod tests {} diff --git a/gateway/src/mq/event.rs b/gateway/src/mq/event.rs new file mode 100644 index 000000000..26e81b7ec --- /dev/null +++ b/gateway/src/mq/event.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use serde::de::DeserializeOwned; +use async_trait::async_trait; + +use crate::api::ApiServiceState; + +pub(crate) type Message = Arc>>; + +pub(crate) enum EventType { + ApiRequestEvent, + +} + +#[async_trait] +pub trait Event: Send + Sync { + type Type: Into; + fn event_type(&self) -> Self::Type; + + async fn process(&self); + async fn done(&self); +} + +// A common event stores how to perform a async action. + +pub(crate) struct ApiRequestEvent + where T: DeserializeOwned +{ + state: ApiServiceState, + handler: fn() -> T, +} + +impl ApiRequestEvent + where T: DeserializeOwned +{ + fn new(state: ApiServiceState, handler: fn() -> T) -> Arc { + Arc::new(ApiRequestEvent { + state, + handler + }) + } +} + +#[async_trait] +impl Event for ApiRequestEvent + where T: DeserializeOwned +{ + type Type = EventType; + fn event_type(&self) -> Self::Type { EventType::ApiRequestEvent } + + async fn process(&self) { + (self.handler)(); + } + + async fn done(&self) { + todo!() + } +} diff --git a/gateway/src/mq/mod.rs b/gateway/src/mq/mod.rs new file mode 100644 index 000000000..816e33bbd --- /dev/null +++ b/gateway/src/mq/mod.rs @@ -0,0 +1,2 @@ +pub mod event; +pub mod queue; diff --git a/gateway/src/mq/queue.rs b/gateway/src/mq/queue.rs new file mode 100644 index 000000000..7a17df70e --- /dev/null +++ b/gateway/src/mq/queue.rs @@ -0,0 +1,74 @@ +use std::{borrow::Borrow, sync::Arc}; + +use crossbeam_channel::{unbounded, Receiver, Sender}; +use tokio::{ + runtime::{Builder, Runtime}, select, sync::Semaphore +}; + +use super::event::{EventType, Message}; + +pub(crate) struct MessageQueue { + sender: Sender, + receiver: Receiver, + sem: Arc, + runtime: Arc, +} + +unsafe impl Send for MessageQueue{} +unsafe impl Sync for MessageQueue{} + +impl MessageQueue { + // Should be singleton. + fn new(n_workers: usize) -> Self { + let (s, r) = unbounded::(); + let rt = Builder::new_multi_thread() + .worker_threads(n_workers) + .build() + .unwrap(); + + MessageQueue { + sender: s.to_owned(), + receiver: r.to_owned(), + sem: Arc::new(Semaphore::new(n_workers)), + runtime: Arc::new(rt), + } + } + + fn start(&self) { + let receiver = self.receiver.clone(); + // let sem = self.sem.clone(); + let rt = self.runtime.clone(); + + tokio::spawn(async move { + loop { + match receiver.recv() { + Ok(evt) => { + rt.spawn(async move { + let evt = evt.clone(); + evt.process().await; + evt.done().await; + }); + }, + Err(e) => { + // Should not error here. + panic!("Event Loop Panic: {e}"); + } + } + } + }); + } + + pub fn enqueue(&self, msg: Message) { + match self.sender.send(msg) { + Ok(()) => {} + Err(_) => {} + } + } +} + +pub(crate) fn init_message_queue(n_workers: usize) -> MessageQueue { + let mq = MessageQueue::new(n_workers); + mq.start(); + + mq +} From 4ef444e5ece6253a93c4cc7891b8cb204f70eed8 Mon Sep 17 00:00:00 2001 From: Neon Date: Thu, 1 Aug 2024 21:19:16 +0800 Subject: [PATCH 02/13] add rest api events --- gateway/src/api/api_router.rs | 8 +++- gateway/src/api/mr_router.rs | 8 +++- gateway/src/init.rs | 5 -- gateway/src/mq/event.rs | 86 +++++++++++++++++++---------------- gateway/src/mq/queue.rs | 45 +++++++++--------- 5 files changed, 84 insertions(+), 68 deletions(-) diff --git a/gateway/src/api/api_router.rs b/gateway/src/api/api_router.rs index cb16c26eb..afe423f5d 100644 --- a/gateway/src/api/api_router.rs +++ b/gateway/src/api/api_router.rs @@ -14,7 +14,7 @@ use ceres::model::{ }; use common::model::CommonResult; -use crate::api::mr_router; +use crate::{api::mr_router, mq::event::{ApiRequestEvent, ApiType}}; use crate::api::ApiServiceState; pub fn routers() -> Router { @@ -34,6 +34,7 @@ async fn get_blob_object( Query(query): Query, state: State, ) -> Result>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::Blob, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -75,6 +76,7 @@ async fn create_file( state: State, Json(json): Json, ) -> Result>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::CreateFile, &state); let res = state .api_handler(json.path.clone().into()) .await @@ -91,6 +93,7 @@ async fn get_latest_commit( Query(query): Query, state: State, ) -> Result, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::LastestCommit, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -104,6 +107,7 @@ async fn get_tree_info( Query(query): Query, state: State, ) -> Result>>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::TreeInfo, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -120,6 +124,7 @@ async fn get_tree_commit_info( Query(query): Query, state: State, ) -> Result>>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::CommitInfo, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -136,6 +141,7 @@ async fn publish_path_to_repo( state: State, Json(json): Json, ) -> Result>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::Publish, &state); let res = state .api_handler(json.path.clone().into()) .await diff --git a/gateway/src/api/mr_router.rs b/gateway/src/api/mr_router.rs index 35d8c9f26..ca9141452 100644 --- a/gateway/src/api/mr_router.rs +++ b/gateway/src/api/mr_router.rs @@ -10,7 +10,7 @@ use axum::{ use ceres::model::mr::{MRDetail, MrInfoItem}; use common::model::CommonResult; -use crate::api::ApiServiceState; +use crate::{api::ApiServiceState, mq::event::{ApiRequestEvent, ApiType}}; pub fn routers() -> Router { Router::new() @@ -24,11 +24,14 @@ async fn merge( Path(mr_id): Path, state: State, ) -> Result>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::MergeRequest, &state); + let res = state.monorepo().merge_mr(mr_id).await; let res = match res { Ok(_) => CommonResult::success(None), Err(err) => CommonResult::failed(&err.to_string()), }; + ApiRequestEvent::notice(ApiType::MergeDone, &state); Ok(Json(res)) } @@ -36,6 +39,7 @@ async fn get_mr_list( Query(query): Query>, state: State, ) -> Result>>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::MergeList, &state); let status = query.get("status").unwrap(); let res = state.monorepo().mr_list(status).await; let res = match res { @@ -49,6 +53,7 @@ async fn mr_detail( Path(mr_id): Path, state: State, ) -> Result>>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::MergeDetail, &state); let res = state.monorepo().mr_detail(mr_id).await; let res = match res { Ok(data) => CommonResult::success(Some(data)), @@ -61,6 +66,7 @@ async fn get_mr_files( Path(mr_id): Path, state: State, ) -> Result>>, (StatusCode, String)> { + ApiRequestEvent::notice(ApiType::MergeFiles, &state); let res = state.monorepo().mr_tree_files(mr_id).await; let res = match res { Ok(data) => CommonResult::success(Some(data)), diff --git a/gateway/src/init.rs b/gateway/src/init.rs index b6ce8a987..1c5d133aa 100644 --- a/gateway/src/init.rs +++ b/gateway/src/init.rs @@ -7,8 +7,3 @@ pub async fn init_monorepo(config: Config) -> Result<(), Box Result<(), Box> { - crate::mq::queue::init_message_queue(12); - Ok(()) -} diff --git a/gateway/src/mq/event.rs b/gateway/src/mq/event.rs index 26e81b7ec..3bca0a08b 100644 --- a/gateway/src/mq/event.rs +++ b/gateway/src/mq/event.rs @@ -1,58 +1,68 @@ -use std::sync::Arc; - -use serde::de::DeserializeOwned; -use async_trait::async_trait; +use axum::extract::State; use crate::api::ApiServiceState; -pub(crate) type Message = Arc>>; +use super::queue::get_mq; -pub(crate) enum EventType { - ApiRequestEvent, +pub(crate) type Message = Event; +pub enum Event { + Api(ApiRequestEvent), } -#[async_trait] -pub trait Event: Send + Sync { - type Type: Into; - fn event_type(&self) -> Self::Type; +#[derive(Debug)] +pub enum ApiType { + // Common Api enum for api_routers + CreateFile, + LastestCommit, + CommitInfo, + TreeInfo, + Blob, + Publish, - async fn process(&self); - async fn done(&self); + // Merge Api enum for mr_routers + MergeRequest, + MergeDone, + MergeList, + MergeDetail, + MergeFiles, } -// A common event stores how to perform a async action. +// pub trait EventBase: Send + Sync { +// type Type: Into; +// fn event_type(&self) -> Self::Type; -pub(crate) struct ApiRequestEvent - where T: DeserializeOwned -{ - state: ApiServiceState, - handler: fn() -> T, -} +// // async fn process(&self); +// } + +impl std::fmt::Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + crate::mq::event::Event::Api(evt) => write!(f, "{}", evt), -impl ApiRequestEvent - where T: DeserializeOwned -{ - fn new(state: ApiServiceState, handler: fn() -> T) -> Arc { - Arc::new(ApiRequestEvent { - state, - handler - }) + #[allow(unreachable_patterns)] + _ => write!(f, "Unknown Event Type") + } } } -#[async_trait] -impl Event for ApiRequestEvent - where T: DeserializeOwned -{ - type Type = EventType; - fn event_type(&self) -> Self::Type { EventType::ApiRequestEvent } +pub struct ApiRequestEvent { + pub api: ApiType, + pub state: State, +} - async fn process(&self) { - (self.handler)(); +impl std::fmt::Display for ApiRequestEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Api Request Event: {:?}", self.api) } +} - async fn done(&self) { - todo!() +impl ApiRequestEvent { + // Create and enqueue this event. + pub fn notice(api: ApiType, state: &State) { + get_mq().send(Event::Api(ApiRequestEvent { + api, + state: state.clone() + })); } } diff --git a/gateway/src/mq/queue.rs b/gateway/src/mq/queue.rs index 7a17df70e..9e77949f7 100644 --- a/gateway/src/mq/queue.rs +++ b/gateway/src/mq/queue.rs @@ -1,16 +1,27 @@ -use std::{borrow::Borrow, sync::Arc}; +use std::sync::{Arc, OnceLock}; -use crossbeam_channel::{unbounded, Receiver, Sender}; -use tokio::{ - runtime::{Builder, Runtime}, select, sync::Semaphore -}; +use crossbeam_channel::{unbounded, Sender}; +use crossbeam_channel::Receiver; +use tokio::runtime::{Builder, Runtime}; -use super::event::{EventType, Message}; +use super::event::Message; + +// Lazy initialized static MessageQueue instance. +pub(crate) fn get_mq() -> &'static MessageQueue { + static MQ: OnceLock = OnceLock::new(); + MQ.get_or_init(|| { + // FIXME: Temp value + let mq = MessageQueue::new(12); + mq.start(); + + mq + }) +} pub(crate) struct MessageQueue { sender: Sender, receiver: Receiver, - sem: Arc, + // sem: Arc, runtime: Arc, } @@ -29,7 +40,7 @@ impl MessageQueue { MessageQueue { sender: s.to_owned(), receiver: r.to_owned(), - sem: Arc::new(Semaphore::new(n_workers)), + // sem: Arc::new(Semaphore::new(n_workers)), runtime: Arc::new(rt), } } @@ -44,9 +55,7 @@ impl MessageQueue { match receiver.recv() { Ok(evt) => { rt.spawn(async move { - let evt = evt.clone(); - evt.process().await; - evt.done().await; + tracing::info!("{}", evt); }); }, Err(e) => { @@ -58,17 +67,7 @@ impl MessageQueue { }); } - pub fn enqueue(&self, msg: Message) { - match self.sender.send(msg) { - Ok(()) => {} - Err(_) => {} - } + pub fn send(&self, msg: Message) { + let _ = self.sender.send(msg); } } - -pub(crate) fn init_message_queue(n_workers: usize) -> MessageQueue { - let mq = MessageQueue::new(n_workers); - mq.start(); - - mq -} From 674c3addd4cb32e1187dbde9b4dc4777edd0250e Mon Sep 17 00:00:00 2001 From: Neon Date: Tue, 6 Aug 2024 09:55:12 +0800 Subject: [PATCH 03/13] make mq a new module --- Cargo.toml | 2 ++ gateway/Cargo.toml | 4 ++-- gateway/src/api/api_router.rs | 3 ++- gateway/src/{mq => api}/event.rs | 32 +++++--------------------- gateway/src/api/mod.rs | 1 + gateway/src/api/mr_router.rs | 3 ++- gateway/src/lib.rs | 1 - mq/Cargo.toml | 14 +++++++++++ mq/src/event.rs | 18 +++++++++++++++ gateway/src/mq/mod.rs => mq/src/lib.rs | 0 {gateway/src/mq => mq/src}/queue.rs | 4 ++-- 11 files changed, 49 insertions(+), 33 deletions(-) rename gateway/src/{mq => api}/event.rs (56%) create mode 100644 mq/Cargo.toml create mode 100644 mq/src/event.rs rename gateway/src/mq/mod.rs => mq/src/lib.rs (100%) rename {gateway/src/mq => mq/src}/queue.rs (95%) diff --git a/Cargo.toml b/Cargo.toml index 7ae589ac9..f27b0532c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ members = [ "vault", "neptune", "saturn", + "mq", "lunar/src-tauri", ] default-members = ["mega", "libra"] @@ -29,6 +30,7 @@ gemini = { path = "gemini" } vault = { path = "vault" } neptune = { path = "neptune" } saturn = { path = "saturn" } +mq = { path = "mq" } mega = { path = "mega" } anyhow = "1.0.86" serde = "1.0.203" diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 73cf14cc1..ffd776044 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -18,6 +18,7 @@ gemini = { workspace = true } vault = { workspace = true } venus = { workspace = true } mercury = { workspace = true } +mq = { workspace = true } anyhow = { workspace = true } axum = { workspace = true } axum-server = { version = "0.6", features = ["tls-rustls"] } @@ -37,10 +38,9 @@ tower-http = { workspace = true, features = [ "trace", "decompression-full", ] } -axum-extra = { workspace = true, features = ["typed-header"]} +axum-extra = { workspace = true, features = ["typed-header"] } tokio = { workspace = true, features = ["net"] } reqwest = { workspace = true, features = ["json"] } uuid = { workspace = true, features = ["v4"] } regex = "1.10.4" ed25519-dalek = { version = "2.1.1", features = ["pkcs8"] } -crossbeam-channel = "0.5.10" diff --git a/gateway/src/api/api_router.rs b/gateway/src/api/api_router.rs index afe423f5d..c4815b7e6 100644 --- a/gateway/src/api/api_router.rs +++ b/gateway/src/api/api_router.rs @@ -6,6 +6,7 @@ use axum::{ Json, Router, }; +use super::event::{ApiRequestEvent, ApiType}; use ceres::model::{ create_file::CreateFileInfo, publish_path::PublishPathInfo, @@ -14,7 +15,7 @@ use ceres::model::{ }; use common::model::CommonResult; -use crate::{api::mr_router, mq::event::{ApiRequestEvent, ApiType}}; +use crate::api::mr_router; use crate::api::ApiServiceState; pub fn routers() -> Router { diff --git a/gateway/src/mq/event.rs b/gateway/src/api/event.rs similarity index 56% rename from gateway/src/mq/event.rs rename to gateway/src/api/event.rs index 3bca0a08b..c1899487a 100644 --- a/gateway/src/mq/event.rs +++ b/gateway/src/api/event.rs @@ -2,13 +2,7 @@ use axum::extract::State; use crate::api::ApiServiceState; -use super::queue::get_mq; - -pub(crate) type Message = Event; - -pub enum Event { - Api(ApiRequestEvent), -} +use mq::{event::EventBase, queue::get_mq}; #[derive(Debug)] pub enum ApiType { @@ -28,24 +22,6 @@ pub enum ApiType { MergeFiles, } -// pub trait EventBase: Send + Sync { -// type Type: Into; -// fn event_type(&self) -> Self::Type; - -// // async fn process(&self); -// } - -impl std::fmt::Display for Event { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - crate::mq::event::Event::Api(evt) => write!(f, "{}", evt), - - #[allow(unreachable_patterns)] - _ => write!(f, "Unknown Event Type") - } - } -} - pub struct ApiRequestEvent { pub api: ApiType, pub state: State, @@ -57,10 +33,14 @@ impl std::fmt::Display for ApiRequestEvent { } } +impl EventBase for ApiRequestEvent { + +} + impl ApiRequestEvent { // Create and enqueue this event. pub fn notice(api: ApiType, state: &State) { - get_mq().send(Event::Api(ApiRequestEvent { + get_mq().send(Box::new(ApiRequestEvent { api, state: state.clone() })); diff --git a/gateway/src/api/mod.rs b/gateway/src/api/mod.rs index a47fa161e..e301d33f9 100644 --- a/gateway/src/api/mod.rs +++ b/gateway/src/api/mod.rs @@ -8,6 +8,7 @@ use venus::import_repo::repo::Repo; pub mod api_router; pub mod mr_router; +pub mod event; pub mod oauth; #[derive(Clone)] diff --git a/gateway/src/api/mr_router.rs b/gateway/src/api/mr_router.rs index ca9141452..4ddf379d6 100644 --- a/gateway/src/api/mr_router.rs +++ b/gateway/src/api/mr_router.rs @@ -10,7 +10,8 @@ use axum::{ use ceres::model::mr::{MRDetail, MrInfoItem}; use common::model::CommonResult; -use crate::{api::ApiServiceState, mq::event::{ApiRequestEvent, ApiType}}; +use super::event::{ApiRequestEvent, ApiType}; +use crate::api::ApiServiceState; pub fn routers() -> Router { Router::new() diff --git a/gateway/src/lib.rs b/gateway/src/lib.rs index e48d526ac..d267a80d6 100644 --- a/gateway/src/lib.rs +++ b/gateway/src/lib.rs @@ -6,7 +6,6 @@ pub mod init; mod lfs; pub mod relay_server; pub mod ssh_server; -pub mod mq; #[cfg(test)] mod tests {} diff --git a/mq/Cargo.toml b/mq/Cargo.toml new file mode 100644 index 000000000..cfd3f028e --- /dev/null +++ b/mq/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "mq" +version = "0.1.0" +edition = "2021" + +[lib] +name = "mq" +path = "src/lib.rs" + +[dependencies] +axum = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +crossbeam-channel = "0.5.10" diff --git a/mq/src/event.rs b/mq/src/event.rs new file mode 100644 index 000000000..01b3f8338 --- /dev/null +++ b/mq/src/event.rs @@ -0,0 +1,18 @@ +use std::any::Any; + +pub(crate) type Message = Box; + +pub trait EventBase: Send + Sync + Any + std::fmt::Display { + // async fn process(&self); +} + +// impl std::fmt::Display for Event { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// match self { +// Event::Api(evt) => write!(f, "{}", evt), + +// #[allow(unreachable_patterns)] +// _ => write!(f, "Unknown Event Type") +// } +// } +// } diff --git a/gateway/src/mq/mod.rs b/mq/src/lib.rs similarity index 100% rename from gateway/src/mq/mod.rs rename to mq/src/lib.rs diff --git a/gateway/src/mq/queue.rs b/mq/src/queue.rs similarity index 95% rename from gateway/src/mq/queue.rs rename to mq/src/queue.rs index 9e77949f7..df331f78e 100644 --- a/gateway/src/mq/queue.rs +++ b/mq/src/queue.rs @@ -7,7 +7,7 @@ use tokio::runtime::{Builder, Runtime}; use super::event::Message; // Lazy initialized static MessageQueue instance. -pub(crate) fn get_mq() -> &'static MessageQueue { +pub fn get_mq() -> &'static MessageQueue { static MQ: OnceLock = OnceLock::new(); MQ.get_or_init(|| { // FIXME: Temp value @@ -18,7 +18,7 @@ pub(crate) fn get_mq() -> &'static MessageQueue { }) } -pub(crate) struct MessageQueue { +pub struct MessageQueue { sender: Sender, receiver: Receiver, // sem: Arc, From c9ed18b69141f675d222cb08595ad417f83759fa Mon Sep 17 00:00:00 2001 From: Neon Date: Tue, 6 Aug 2024 20:59:40 +0800 Subject: [PATCH 04/13] basic database support for messages --- gateway/src/api/api_router.rs | 12 ++++++------ gateway/src/api/event.rs | 2 +- gateway/src/api/mr_router.rs | 10 +++++----- jupiter/Cargo.toml | 1 + jupiter/callisto/src/lib.rs | 1 + jupiter/callisto/src/mq_storage.rs | 19 +++++++++++++++++++ jupiter/src/storage/mod.rs | 1 + jupiter/src/storage/mq_storage.rs | 28 ++++++++++++++++++++++++++++ mq/src/cache.rs | 10 ++++++++++ mq/src/event.rs | 2 +- mq/src/lib.rs | 1 + sql/postgres/pg_20240205__init.sql | 9 ++++++++- sql/sqlite/sqlite_20240711_init.sql | 9 ++++++++- 13 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 jupiter/callisto/src/mq_storage.rs create mode 100644 jupiter/src/storage/mq_storage.rs create mode 100644 mq/src/cache.rs diff --git a/gateway/src/api/api_router.rs b/gateway/src/api/api_router.rs index c4815b7e6..5ea99e4e2 100644 --- a/gateway/src/api/api_router.rs +++ b/gateway/src/api/api_router.rs @@ -35,7 +35,7 @@ async fn get_blob_object( Query(query): Query, state: State, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::Blob, &state); + ApiRequestEvent::notify(ApiType::Blob, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -77,7 +77,7 @@ async fn create_file( state: State, Json(json): Json, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::CreateFile, &state); + ApiRequestEvent::notify(ApiType::CreateFile, &state); let res = state .api_handler(json.path.clone().into()) .await @@ -94,7 +94,7 @@ async fn get_latest_commit( Query(query): Query, state: State, ) -> Result, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::LastestCommit, &state); + ApiRequestEvent::notify(ApiType::LastestCommit, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -108,7 +108,7 @@ async fn get_tree_info( Query(query): Query, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::TreeInfo, &state); + ApiRequestEvent::notify(ApiType::TreeInfo, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -125,7 +125,7 @@ async fn get_tree_commit_info( Query(query): Query, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::CommitInfo, &state); + ApiRequestEvent::notify(ApiType::CommitInfo, &state); let res = state .api_handler(query.path.clone().into()) .await @@ -142,7 +142,7 @@ async fn publish_path_to_repo( state: State, Json(json): Json, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::Publish, &state); + ApiRequestEvent::notify(ApiType::Publish, &state); let res = state .api_handler(json.path.clone().into()) .await diff --git a/gateway/src/api/event.rs b/gateway/src/api/event.rs index c1899487a..66f7f6f24 100644 --- a/gateway/src/api/event.rs +++ b/gateway/src/api/event.rs @@ -39,7 +39,7 @@ impl EventBase for ApiRequestEvent { impl ApiRequestEvent { // Create and enqueue this event. - pub fn notice(api: ApiType, state: &State) { + pub fn notify(api: ApiType, state: &State) { get_mq().send(Box::new(ApiRequestEvent { api, state: state.clone() diff --git a/gateway/src/api/mr_router.rs b/gateway/src/api/mr_router.rs index 4ddf379d6..dc0e2c332 100644 --- a/gateway/src/api/mr_router.rs +++ b/gateway/src/api/mr_router.rs @@ -25,14 +25,14 @@ async fn merge( Path(mr_id): Path, state: State, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::MergeRequest, &state); + ApiRequestEvent::notify(ApiType::MergeRequest, &state); let res = state.monorepo().merge_mr(mr_id).await; let res = match res { Ok(_) => CommonResult::success(None), Err(err) => CommonResult::failed(&err.to_string()), }; - ApiRequestEvent::notice(ApiType::MergeDone, &state); + ApiRequestEvent::notify(ApiType::MergeDone, &state); Ok(Json(res)) } @@ -40,7 +40,7 @@ async fn get_mr_list( Query(query): Query>, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::MergeList, &state); + ApiRequestEvent::notify(ApiType::MergeList, &state); let status = query.get("status").unwrap(); let res = state.monorepo().mr_list(status).await; let res = match res { @@ -54,7 +54,7 @@ async fn mr_detail( Path(mr_id): Path, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::MergeDetail, &state); + ApiRequestEvent::notify(ApiType::MergeDetail, &state); let res = state.monorepo().mr_detail(mr_id).await; let res = match res { Ok(data) => CommonResult::success(Some(data)), @@ -67,7 +67,7 @@ async fn get_mr_files( Path(mr_id): Path, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notice(ApiType::MergeFiles, &state); + ApiRequestEvent::notify(ApiType::MergeFiles, &state); let res = state.monorepo().mr_tree_files(mr_id).await; let res = match res { Ok(data) => CommonResult::success(Some(data)), diff --git a/jupiter/Cargo.toml b/jupiter/Cargo.toml index 3449a4bf9..2147c3d92 100644 --- a/jupiter/Cargo.toml +++ b/jupiter/Cargo.toml @@ -15,6 +15,7 @@ callisto = { workspace = true } common = { workspace = true } venus = { workspace = true } mercury = { workspace = true } +mq = { workspace = true } sea-orm = { workspace = true, features = [ "sqlx-postgres", "sqlx-mysql", diff --git a/jupiter/callisto/src/lib.rs b/jupiter/callisto/src/lib.rs index 445fd8195..e79632766 100644 --- a/jupiter/callisto/src/lib.rs +++ b/jupiter/callisto/src/lib.rs @@ -26,3 +26,4 @@ pub mod mega_tree; pub mod raw_blob; pub mod ztm_node; pub mod ztm_repo_info; +pub mod mq_storage; diff --git a/jupiter/callisto/src/mq_storage.rs b/jupiter/callisto/src/mq_storage.rs new file mode 100644 index 000000000..26da9ced7 --- /dev/null +++ b/jupiter/callisto/src/mq_storage.rs @@ -0,0 +1,19 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "mq_storage")] +pub struct Model { + #[sea_orm(primary_key, auto_increment = false)] + pub id: i64, + pub category: String, + pub create_time: DateTime, + #[sea_orm(column_type = "Text", nullable)] + pub commit: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/jupiter/src/storage/mod.rs b/jupiter/src/storage/mod.rs index 204e4a9fa..59ac5d8f7 100644 --- a/jupiter/src/storage/mod.rs +++ b/jupiter/src/storage/mod.rs @@ -3,6 +3,7 @@ pub mod git_fs_storage; pub mod init; pub mod lfs_storage; pub mod mega_storage; +pub mod mq_storage; pub mod ztm_storage; use async_trait::async_trait; diff --git a/jupiter/src/storage/mq_storage.rs b/jupiter/src/storage/mq_storage.rs new file mode 100644 index 000000000..f9ed84a97 --- /dev/null +++ b/jupiter/src/storage/mq_storage.rs @@ -0,0 +1,28 @@ +use std::sync::Arc; + +use callisto::mq_storage; +use sea_orm::{DatabaseConnection, EntityTrait, InsertResult, IntoActiveModel, Set}; + + +#[derive(Clone)] +pub struct MQStorage { + pub connection: Arc, +} + +impl MQStorage { + pub fn get_connection(&self) -> &DatabaseConnection { + &self.connection + } + + pub async fn new(connection: Arc) -> Self { + MQStorage { connection } + } + + pub fn mock() -> Self { + MQStorage { + connection: Arc::new(DatabaseConnection::default()), + } + } + + +} diff --git a/mq/src/cache.rs b/mq/src/cache.rs new file mode 100644 index 000000000..c3e5fefa6 --- /dev/null +++ b/mq/src/cache.rs @@ -0,0 +1,10 @@ +use std::collections::{VecDeque}; + +use crate::event::Message; + +pub struct EventCache { + inner: VecDeque, + flushed: bool, + flusher_handle: i64, + +} diff --git a/mq/src/event.rs b/mq/src/event.rs index 01b3f8338..c2feef576 100644 --- a/mq/src/event.rs +++ b/mq/src/event.rs @@ -1,6 +1,6 @@ use std::any::Any; -pub(crate) type Message = Box; +pub type Message = Box; pub trait EventBase: Send + Sync + Any + std::fmt::Display { // async fn process(&self); diff --git a/mq/src/lib.rs b/mq/src/lib.rs index 816e33bbd..0a0833701 100644 --- a/mq/src/lib.rs +++ b/mq/src/lib.rs @@ -1,2 +1,3 @@ pub mod event; pub mod queue; +pub mod cache; diff --git a/sql/postgres/pg_20240205__init.sql b/sql/postgres/pg_20240205__init.sql index 0d73841cf..591646e67 100644 --- a/sql/postgres/pg_20240205__init.sql +++ b/sql/postgres/pg_20240205__init.sql @@ -243,4 +243,11 @@ CREATE TABLE IF NOT EXISTS "ztm_repo_info" ( "origin" VARCHAR(64), "update_time" BIGINT NOT NULL, "commit" VARCHAR(64) -); \ No newline at end of file +); + +CREATE TABLE IF NOT EXISTS "mq_storage" ( + "id" BIGINT PRIMARY KEY, + "category" VARCHAR(64), + "create_time" TIMESTAMP NOT NULL, + "content" TEXT +); diff --git a/sql/sqlite/sqlite_20240711_init.sql b/sql/sqlite/sqlite_20240711_init.sql index 106e88992..fa8df4d75 100644 --- a/sql/sqlite/sqlite_20240711_init.sql +++ b/sql/sqlite/sqlite_20240711_init.sql @@ -243,4 +243,11 @@ CREATE TABLE IF NOT EXISTS "ztm_repo_info" ( "origin" TEXT, "update_time" INTEGER NOT NULL, "commit" TEXT -); \ No newline at end of file +); + +CREATE TABLE IF NOT EXISTS "mq_storage" ( + "id" INTEGER PRIMARY KEY, + "category" TEXT, + "create_time" TIMESTAMP NOT NULL, + "content" TEXT +); From 675880646c46ed8e1050c782301864c1c4c71866 Mon Sep 17 00:00:00 2001 From: Neon Date: Wed, 7 Aug 2024 20:32:29 +0800 Subject: [PATCH 05/13] add type conversion and fix type errors --- gateway/src/api/api_router.rs | 14 +++--- gateway/src/api/event.rs | 48 ------------------ gateway/src/api/mod.rs | 1 - gateway/src/api/mr_router.rs | 12 ++--- jupiter/callisto/src/mq_storage.rs | 2 +- jupiter/src/storage/mq_storage.rs | 7 +++ mq/Cargo.toml | 6 +++ mq/src/cache.rs | 38 ++++++++++++-- mq/src/event.rs | 18 ------- mq/src/event/api_request.rs | 81 ++++++++++++++++++++++++++++++ mq/src/event/mod.rs | 45 +++++++++++++++++ mq/src/queue.rs | 11 ++-- 12 files changed, 194 insertions(+), 89 deletions(-) delete mode 100644 gateway/src/api/event.rs delete mode 100644 mq/src/event.rs create mode 100644 mq/src/event/api_request.rs create mode 100644 mq/src/event/mod.rs diff --git a/gateway/src/api/api_router.rs b/gateway/src/api/api_router.rs index 5ea99e4e2..d052b58ac 100644 --- a/gateway/src/api/api_router.rs +++ b/gateway/src/api/api_router.rs @@ -6,7 +6,7 @@ use axum::{ Json, Router, }; -use super::event::{ApiRequestEvent, ApiType}; +use mq::event::api_request::{ApiRequestEvent, ApiType}; use ceres::model::{ create_file::CreateFileInfo, publish_path::PublishPathInfo, @@ -35,7 +35,7 @@ async fn get_blob_object( Query(query): Query, state: State, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::Blob, &state); + ApiRequestEvent::notify(ApiType::Blob, &state.0.context.config); let res = state .api_handler(query.path.clone().into()) .await @@ -77,7 +77,7 @@ async fn create_file( state: State, Json(json): Json, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::CreateFile, &state); + ApiRequestEvent::notify(ApiType::CreateFile, &state.0.context.config); let res = state .api_handler(json.path.clone().into()) .await @@ -94,7 +94,7 @@ async fn get_latest_commit( Query(query): Query, state: State, ) -> Result, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::LastestCommit, &state); + ApiRequestEvent::notify(ApiType::LastestCommit, &state.0.context.config); let res = state .api_handler(query.path.clone().into()) .await @@ -108,7 +108,7 @@ async fn get_tree_info( Query(query): Query, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::TreeInfo, &state); + ApiRequestEvent::notify(ApiType::TreeInfo, &state.0.context.config); let res = state .api_handler(query.path.clone().into()) .await @@ -125,7 +125,7 @@ async fn get_tree_commit_info( Query(query): Query, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::CommitInfo, &state); + ApiRequestEvent::notify(ApiType::CommitInfo, &state.0.context.config); let res = state .api_handler(query.path.clone().into()) .await @@ -142,7 +142,7 @@ async fn publish_path_to_repo( state: State, Json(json): Json, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::Publish, &state); + ApiRequestEvent::notify(ApiType::Publish, &state.0.context.config); let res = state .api_handler(json.path.clone().into()) .await diff --git a/gateway/src/api/event.rs b/gateway/src/api/event.rs deleted file mode 100644 index 66f7f6f24..000000000 --- a/gateway/src/api/event.rs +++ /dev/null @@ -1,48 +0,0 @@ -use axum::extract::State; - -use crate::api::ApiServiceState; - -use mq::{event::EventBase, queue::get_mq}; - -#[derive(Debug)] -pub enum ApiType { - // Common Api enum for api_routers - CreateFile, - LastestCommit, - CommitInfo, - TreeInfo, - Blob, - Publish, - - // Merge Api enum for mr_routers - MergeRequest, - MergeDone, - MergeList, - MergeDetail, - MergeFiles, -} - -pub struct ApiRequestEvent { - pub api: ApiType, - pub state: State, -} - -impl std::fmt::Display for ApiRequestEvent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Api Request Event: {:?}", self.api) - } -} - -impl EventBase for ApiRequestEvent { - -} - -impl ApiRequestEvent { - // Create and enqueue this event. - pub fn notify(api: ApiType, state: &State) { - get_mq().send(Box::new(ApiRequestEvent { - api, - state: state.clone() - })); - } -} diff --git a/gateway/src/api/mod.rs b/gateway/src/api/mod.rs index e301d33f9..a47fa161e 100644 --- a/gateway/src/api/mod.rs +++ b/gateway/src/api/mod.rs @@ -8,7 +8,6 @@ use venus::import_repo::repo::Repo; pub mod api_router; pub mod mr_router; -pub mod event; pub mod oauth; #[derive(Clone)] diff --git a/gateway/src/api/mr_router.rs b/gateway/src/api/mr_router.rs index dc0e2c332..ffda2db57 100644 --- a/gateway/src/api/mr_router.rs +++ b/gateway/src/api/mr_router.rs @@ -10,7 +10,7 @@ use axum::{ use ceres::model::mr::{MRDetail, MrInfoItem}; use common::model::CommonResult; -use super::event::{ApiRequestEvent, ApiType}; +use mq::event::api_request::{ApiRequestEvent, ApiType}; use crate::api::ApiServiceState; pub fn routers() -> Router { @@ -25,14 +25,14 @@ async fn merge( Path(mr_id): Path, state: State, ) -> Result>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::MergeRequest, &state); + ApiRequestEvent::notify(ApiType::MergeRequest, &state.0.context.config); let res = state.monorepo().merge_mr(mr_id).await; let res = match res { Ok(_) => CommonResult::success(None), Err(err) => CommonResult::failed(&err.to_string()), }; - ApiRequestEvent::notify(ApiType::MergeDone, &state); + ApiRequestEvent::notify(ApiType::MergeDone, &state.0.context.config); Ok(Json(res)) } @@ -40,7 +40,7 @@ async fn get_mr_list( Query(query): Query>, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::MergeList, &state); + ApiRequestEvent::notify(ApiType::MergeList, &state.0.context.config); let status = query.get("status").unwrap(); let res = state.monorepo().mr_list(status).await; let res = match res { @@ -54,7 +54,7 @@ async fn mr_detail( Path(mr_id): Path, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::MergeDetail, &state); + ApiRequestEvent::notify(ApiType::MergeDetail, &state.0.context.config); let res = state.monorepo().mr_detail(mr_id).await; let res = match res { Ok(data) => CommonResult::success(Some(data)), @@ -67,7 +67,7 @@ async fn get_mr_files( Path(mr_id): Path, state: State, ) -> Result>>, (StatusCode, String)> { - ApiRequestEvent::notify(ApiType::MergeFiles, &state); + ApiRequestEvent::notify(ApiType::MergeFiles, &state.0.context.config); let res = state.monorepo().mr_tree_files(mr_id).await; let res = match res { Ok(data) => CommonResult::success(Some(data)), diff --git a/jupiter/callisto/src/mq_storage.rs b/jupiter/callisto/src/mq_storage.rs index 26da9ced7..148d731e2 100644 --- a/jupiter/callisto/src/mq_storage.rs +++ b/jupiter/callisto/src/mq_storage.rs @@ -10,7 +10,7 @@ pub struct Model { pub category: String, pub create_time: DateTime, #[sea_orm(column_type = "Text", nullable)] - pub commit: Option, + pub content: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/jupiter/src/storage/mq_storage.rs b/jupiter/src/storage/mq_storage.rs index f9ed84a97..81da39ce3 100644 --- a/jupiter/src/storage/mq_storage.rs +++ b/jupiter/src/storage/mq_storage.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use callisto::mq_storage; +use mq::event::Message; use sea_orm::{DatabaseConnection, EntityTrait, InsertResult, IntoActiveModel, Set}; @@ -24,5 +25,11 @@ impl MQStorage { } } + pub async fn save_messages(msgs: Vec) { + + } + + pub async fn load_latest_messages() { + } } diff --git a/mq/Cargo.toml b/mq/Cargo.toml index cfd3f028e..49312b93d 100644 --- a/mq/Cargo.toml +++ b/mq/Cargo.toml @@ -8,7 +8,13 @@ name = "mq" path = "src/lib.rs" [dependencies] +common = { workspace = true } + axum = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +thiserror = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +chrono = { workspace = true } crossbeam-channel = "0.5.10" diff --git a/mq/src/cache.rs b/mq/src/cache.rs index c3e5fefa6..3cc84359b 100644 --- a/mq/src/cache.rs +++ b/mq/src/cache.rs @@ -1,10 +1,38 @@ -use std::collections::{VecDeque}; +use std::{collections::VecDeque, sync::OnceLock}; -use crate::event::Message; +use crate::event::{Message, EventType}; +// Lazy initialized static EventCache instance. +pub fn get_mcache() -> &'static EventCache { + static MQ: OnceLock = OnceLock::new(); + MQ.get_or_init(|| { + // FIXME: Temp value + let mq = EventCache::new(); + + mq + }) +} + +// Automatically flush event cache into database +// eveny 10 seconds or 1024 message. pub struct EventCache { - inner: VecDeque, - flushed: bool, + inner: VecDeque, + last_flush: u64, flusher_handle: i64, - +} + +impl EventCache { + fn new() -> Self { + EventCache { + inner: VecDeque::new(), + last_flush: 0, + flusher_handle: -1 + } + } + + async fn flush(&self) { + let v = vec![1,2,3,4]; + + + } } diff --git a/mq/src/event.rs b/mq/src/event.rs deleted file mode 100644 index c2feef576..000000000 --- a/mq/src/event.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::any::Any; - -pub type Message = Box; - -pub trait EventBase: Send + Sync + Any + std::fmt::Display { - // async fn process(&self); -} - -// impl std::fmt::Display for Event { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// match self { -// Event::Api(evt) => write!(f, "{}", evt), - -// #[allow(unreachable_patterns)] -// _ => write!(f, "Unknown Event Type") -// } -// } -// } diff --git a/mq/src/event/api_request.rs b/mq/src/event/api_request.rs new file mode 100644 index 000000000..44976ab2c --- /dev/null +++ b/mq/src/event/api_request.rs @@ -0,0 +1,81 @@ +use common::config::Config; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +use crate::{event::EventBase, event::EventType, queue::get_mq}; + +/// # Api Request Event +/// --- +/// This is a example event definition for using message queue. \ +/// +/// Your customized event should implement `EventBase` trait. \ +/// Then the event can be put into message queue. \ +/// The event `id` and `create_time` will be attached to your event +/// and then wrapped as a `Message`. \ +/// You should also write some code in `mq::queue` to handle the event. (for now) +#[derive(Debug)] +pub struct ApiRequestEvent { + pub api: ApiType, + pub config: common::config::Config, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum ApiType { + // Common Api enum for api_routers + CreateFile, + LastestCommit, + CommitInfo, + TreeInfo, + Blob, + Publish, + + // Merge Api enum for mr_routers + MergeRequest, + MergeDone, + MergeList, + MergeDetail, + MergeFiles, +} + +impl std::fmt::Display for ApiRequestEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Api Request Event: {:?}", self.api) + } +} + +impl EventBase for ApiRequestEvent {} + +impl ApiRequestEvent { + // Create and enqueue this event. + pub fn notify(api: ApiType, config: &Config) { + get_mq().send(EventType::ApiRequest(ApiRequestEvent { + api, + config: config.clone(), + })); + } +} + +// For storing the data into database. +impl Into for ApiRequestEvent { + fn into(self) -> serde_json::Value { + json!({ + "api": self.api, + "config": self.config + }) + } +} + +impl TryFrom for ApiRequestEvent { + type Error = crate::event::Error; + + fn try_from(value: serde_json::Value) -> Result { + let api: ApiType = serde_json::from_value(value["api"].clone())?; + let config: common::config::Config = serde_json::from_value(value["config"].clone())?; + + Ok(ApiRequestEvent { + api, + config + }) + } + +} diff --git a/mq/src/event/mod.rs b/mq/src/event/mod.rs new file mode 100644 index 000000000..1e8fecfb2 --- /dev/null +++ b/mq/src/event/mod.rs @@ -0,0 +1,45 @@ +use std::{any::Any, fmt::Display}; + +use api_request::ApiRequestEvent; +use chrono::{DateTime, Utc}; +use thiserror::Error; + +pub mod api_request; + +#[derive(Debug)] +pub enum EventType { + ApiRequest(ApiRequestEvent), +} + +pub struct Message { + pub(crate) id: u64, + pub(crate) create_time: DateTime, + pub(crate) evt: EventType, +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Error converting from database")] + MismatchedData(#[from] serde_json::error::Error), +} + +pub trait EventBase: + Send + Sync + std::fmt::Display + Into + TryFrom +{ +} + +impl Display for EventType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + +impl Display for Message{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ID: {}, Created at: {}, conetent: [{}]", + self.id, self.create_time, self.evt + ) + } +} diff --git a/mq/src/queue.rs b/mq/src/queue.rs index df331f78e..75d2dd463 100644 --- a/mq/src/queue.rs +++ b/mq/src/queue.rs @@ -1,10 +1,11 @@ use std::sync::{Arc, OnceLock}; +use chrono::Utc; use crossbeam_channel::{unbounded, Sender}; use crossbeam_channel::Receiver; use tokio::runtime::{Builder, Runtime}; -use super::event::Message; +use crate::event::{EventBase, Message, EventType}; // Lazy initialized static MessageQueue instance. pub fn get_mq() -> &'static MessageQueue { @@ -67,7 +68,11 @@ impl MessageQueue { }); } - pub fn send(&self, msg: Message) { - let _ = self.sender.send(msg); + pub fn send(&self, evt: EventType) { + let _ = self.sender.send(Message { + id: 1, + create_time: Utc::now(), + evt + }); } } From 81a313df6e37463d6e1c693e5fcad1212bab1225 Mon Sep 17 00:00:00 2001 From: Neon Date: Wed, 7 Aug 2024 22:09:01 +0800 Subject: [PATCH 06/13] add message cache, async flush events into db --- jupiter/src/storage/mq_storage.rs | 4 +- mq/src/cache.rs | 68 ++++++++++++++++++++++--------- mq/src/queue.rs | 2 +- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/jupiter/src/storage/mq_storage.rs b/jupiter/src/storage/mq_storage.rs index 81da39ce3..62a07d232 100644 --- a/jupiter/src/storage/mq_storage.rs +++ b/jupiter/src/storage/mq_storage.rs @@ -26,10 +26,10 @@ impl MQStorage { } pub async fn save_messages(msgs: Vec) { - + } - pub async fn load_latest_messages() { + pub async fn get_latest_messages() { } } diff --git a/mq/src/cache.rs b/mq/src/cache.rs index 3cc84359b..051416548 100644 --- a/mq/src/cache.rs +++ b/mq/src/cache.rs @@ -1,38 +1,68 @@ -use std::{collections::VecDeque, sync::OnceLock}; +use std::{borrow::BorrowMut, cell::RefCell, collections::VecDeque, sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard, OnceLock}, time::Duration}; -use crate::event::{Message, EventType}; +use chrono::Utc; -// Lazy initialized static EventCache instance. -pub fn get_mcache() -> &'static EventCache { - static MQ: OnceLock = OnceLock::new(); +use crate::{event::Message, queue::{get_mq, MessageQueue}}; + +const FLUSH_INTERVAL: u64 = 10; + +// Lazy initialized static MessageCache instance. +pub fn get_mcache() -> &'static MessageCache { + static MQ: OnceLock = OnceLock::new(); MQ.get_or_init(|| { // FIXME: Temp value - let mq = EventCache::new(); + let mc = MessageCache::new(); + mc.start(); - mq + mc }) } -// Automatically flush event cache into database +// Automatically flush message cache into database // eveny 10 seconds or 1024 message. -pub struct EventCache { - inner: VecDeque, - last_flush: u64, - flusher_handle: i64, +pub struct MessageCache { + inner: Arc>>, + bound_mq: &'static MessageQueue, + last_flush: i64, + stop: Arc, } -impl EventCache { +impl MessageCache { fn new() -> Self { - EventCache { - inner: VecDeque::new(), - last_flush: 0, - flusher_handle: -1 + let now: chrono::DateTime = Utc::now(); + + MessageCache { + inner: Arc::new(Mutex::new(Vec::new())), + bound_mq: get_mq(), + last_flush: now.timestamp_millis(), + stop: Arc::new(AtomicBool::new(false)) } } - async fn flush(&self) { - let v = vec![1,2,3,4]; + fn start(&self) { + let stop = self.stop.clone(); + let _ = tokio::spawn(async move { + loop { + if !stop.load(std::sync::atomic::Ordering::Acquire) { + return + } + tokio::time::sleep(Duration::from_secs(FLUSH_INTERVAL)).await; + instant_flush().await; + } + }); + } + async fn add(&self, msg: Message) { + let inner = self.inner.clone(); + let mut locked = inner.lock().unwrap(); + if locked.len() >= 1024 { + instant_flush().await; + } + locked.push(msg); } } + +pub async fn instant_flush() { + let c = get_mcache(); +} diff --git a/mq/src/queue.rs b/mq/src/queue.rs index 75d2dd463..f328e3109 100644 --- a/mq/src/queue.rs +++ b/mq/src/queue.rs @@ -12,7 +12,7 @@ pub fn get_mq() -> &'static MessageQueue { static MQ: OnceLock = OnceLock::new(); MQ.get_or_init(|| { // FIXME: Temp value - let mq = MessageQueue::new(12); + let mut mq = MessageQueue::new(12); mq.start(); mq From 7e3ed7b846f3018324e548d7affc052313bb63ee Mon Sep 17 00:00:00 2001 From: Neon Date: Thu, 8 Aug 2024 18:31:49 +0800 Subject: [PATCH 07/13] support cached messages flush into database --- jupiter/Cargo.toml | 1 - jupiter/src/context.rs | 6 ++- jupiter/src/storage/mq_storage.rs | 14 ++++--- mega/Cargo.toml | 1 + mega/src/commands/service/mod.rs | 3 ++ mq/Cargo.toml | 2 + mq/src/cache.rs | 53 ++++++++++++++++++++----- mq/src/event/api_request.rs | 17 ++------ mq/src/event/mod.rs | 66 +++++++++++++++++++++++++++++-- mq/src/init.rs | 11 ++++++ mq/src/lib.rs | 1 + mq/src/queue.rs | 33 ++++++++++------ 12 files changed, 161 insertions(+), 47 deletions(-) create mode 100644 mq/src/init.rs diff --git a/jupiter/Cargo.toml b/jupiter/Cargo.toml index 2147c3d92..3449a4bf9 100644 --- a/jupiter/Cargo.toml +++ b/jupiter/Cargo.toml @@ -15,7 +15,6 @@ callisto = { workspace = true } common = { workspace = true } venus = { workspace = true } mercury = { workspace = true } -mq = { workspace = true } sea-orm = { workspace = true, features = [ "sqlx-postgres", "sqlx-mysql", diff --git a/jupiter/src/context.rs b/jupiter/src/context.rs index 8d98df037..27e2eff41 100644 --- a/jupiter/src/context.rs +++ b/jupiter/src/context.rs @@ -3,8 +3,7 @@ use std::sync::Arc; use common::config::Config; use crate::storage::{ - git_db_storage::GitDbStorage, init::database_connection, lfs_storage::LfsStorage, - mega_storage::MegaStorage, ztm_storage::ZTMStorage, + git_db_storage::GitDbStorage, init::database_connection, lfs_storage::LfsStorage, mega_storage::MegaStorage, mq_storage::MQStorage, ztm_storage::ZTMStorage }; #[derive(Clone)] @@ -34,6 +33,7 @@ pub struct Service { pub git_db_storage: Arc, pub lfs_storage: Arc, pub ztm_storage: Arc, + pub mq_storage: Arc, } impl Service { @@ -48,6 +48,7 @@ impl Service { ), lfs_storage: Arc::new(LfsStorage::new(connection.clone()).await), ztm_storage: Arc::new(ZTMStorage::new(connection.clone()).await), + mq_storage: Arc::new(MQStorage::new(connection.clone()).await), } } @@ -61,6 +62,7 @@ impl Service { git_db_storage: Arc::new(GitDbStorage::mock()), lfs_storage: Arc::new(LfsStorage::mock()), ztm_storage: Arc::new(ZTMStorage::mock()), + mq_storage: Arc::new(MQStorage::mock()), }) } } diff --git a/jupiter/src/storage/mq_storage.rs b/jupiter/src/storage/mq_storage.rs index 62a07d232..1e125f85b 100644 --- a/jupiter/src/storage/mq_storage.rs +++ b/jupiter/src/storage/mq_storage.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use callisto::mq_storage; -use mq::event::Message; -use sea_orm::{DatabaseConnection, EntityTrait, InsertResult, IntoActiveModel, Set}; +use sea_orm::DatabaseConnection; + +use super::batch_save_model; #[derive(Clone)] @@ -25,11 +26,12 @@ impl MQStorage { } } - pub async fn save_messages(msgs: Vec) { - + pub async fn save_messages(&self, msgs: Vec) { + let msgs: Vec = msgs.into_iter().map(|m| m.into()).collect(); + batch_save_model(self.get_connection(), msgs).await.unwrap(); } - pub async fn get_latest_messages() { - + pub async fn get_latest_messages() -> Vec { + todo!() } } diff --git a/mega/Cargo.toml b/mega/Cargo.toml index c0eeabc55..d370050fe 100644 --- a/mega/Cargo.toml +++ b/mega/Cargo.toml @@ -16,6 +16,7 @@ path = "src/main.rs" gateway = { workspace = true } common = { workspace = true } ceres = { workspace = true } +mq = { workspace = true } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["macros"] } clap = { workspace = true, features = ["derive"] } diff --git a/mega/src/commands/service/mod.rs b/mega/src/commands/service/mod.rs index 4d12d1fde..73502ebec 100644 --- a/mega/src/commands/service/mod.rs +++ b/mega/src/commands/service/mod.rs @@ -25,6 +25,9 @@ pub fn cli() -> Command { // It determines which subcommand was used and calls the appropriate function. #[tokio::main] pub(crate) async fn exec(config: Config, args: &ArgMatches) -> MegaResult { + use mq::init::init_mq; + init_mq(&config).await; + let (cmd, subcommand_args) = match args.subcommand() { Some((cmd, args)) => (cmd, args), _ => { diff --git a/mq/Cargo.toml b/mq/Cargo.toml index 49312b93d..00ac308b5 100644 --- a/mq/Cargo.toml +++ b/mq/Cargo.toml @@ -9,6 +9,8 @@ path = "src/lib.rs" [dependencies] common = { workspace = true } +jupiter = { workspace = true } +callisto = { workspace = true } axum = { workspace = true } tokio = { workspace = true } diff --git a/mq/src/cache.rs b/mq/src/cache.rs index 051416548..d0e51fc7f 100644 --- a/mq/src/cache.rs +++ b/mq/src/cache.rs @@ -1,4 +1,4 @@ -use std::{borrow::BorrowMut, cell::RefCell, collections::VecDeque, sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard, OnceLock}, time::Duration}; +use std::{mem::swap, sync::{atomic::{AtomicBool, AtomicI64}, Arc, Mutex, OnceLock}, time::Duration}; use chrono::Utc; @@ -10,7 +10,6 @@ const FLUSH_INTERVAL: u64 = 10; pub fn get_mcache() -> &'static MessageCache { static MQ: OnceLock = OnceLock::new(); MQ.get_or_init(|| { - // FIXME: Temp value let mc = MessageCache::new(); mc.start(); @@ -23,7 +22,7 @@ pub fn get_mcache() -> &'static MessageCache { pub struct MessageCache { inner: Arc>>, bound_mq: &'static MessageQueue, - last_flush: i64, + last_flush: Arc, stop: Arc, } @@ -34,7 +33,7 @@ impl MessageCache { MessageCache { inner: Arc::new(Mutex::new(Vec::new())), bound_mq: get_mq(), - last_flush: now.timestamp_millis(), + last_flush: Arc::new(AtomicI64::new(now.timestamp_millis())), stop: Arc::new(AtomicBool::new(false)) } } @@ -52,17 +51,53 @@ impl MessageCache { }); } - async fn add(&self, msg: Message) { + // fn clear_inner(&self) -> &Self { + // let inner = self.inner.clone(); + // { + // let mut locked = inner.lock().unwrap(); + // locked.clear(); + // } + // self + // } + + fn get_cache(&self) -> Vec { + let mut res = Vec::new(); let inner = self.inner.clone(); + let mut locked = inner.lock().unwrap(); + swap(locked.as_mut(), &mut res); + + res + } + + pub(crate) async fn add(&self, msg: Message) -> &Self { + let inner = self.inner.clone(); + let should_flush: bool; + { + let mut locked = inner.lock().unwrap(); + should_flush = locked.len() >= 1024; + locked.push(msg); + } - if locked.len() >= 1024 { - instant_flush().await; + if should_flush { + instant_flush().await } - locked.push(msg); + + self } } pub async fn instant_flush() { - let c = get_mcache(); + use callisto::mq_storage::Model; + + let mc = get_mcache(); + let st = mc.bound_mq.context.services.mq_storage.clone(); + let data = mc + .get_cache() + .into_iter().map(|d| Into::::into(d)) + .collect::>(); + st.save_messages(data).await; + + let now = Utc::now(); + mc.last_flush.to_owned().store(now.timestamp_millis(), std::sync::atomic::Ordering::Relaxed); } diff --git a/mq/src/event/api_request.rs b/mq/src/event/api_request.rs index 44976ab2c..09568d824 100644 --- a/mq/src/event/api_request.rs +++ b/mq/src/event/api_request.rs @@ -1,6 +1,5 @@ use common::config::Config; use serde::{Deserialize, Serialize}; -use serde_json::json; use crate::{event::EventBase, event::EventType, queue::get_mq}; @@ -13,7 +12,7 @@ use crate::{event::EventBase, event::EventType, queue::get_mq}; /// The event `id` and `create_time` will be attached to your event /// and then wrapped as a `Message`. \ /// You should also write some code in `mq::queue` to handle the event. (for now) -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ApiRequestEvent { pub api: ApiType, pub config: common::config::Config, @@ -58,10 +57,7 @@ impl ApiRequestEvent { // For storing the data into database. impl Into for ApiRequestEvent { fn into(self) -> serde_json::Value { - json!({ - "api": self.api, - "config": self.config - }) + serde_json::to_value(self).unwrap() } } @@ -69,13 +65,8 @@ impl TryFrom for ApiRequestEvent { type Error = crate::event::Error; fn try_from(value: serde_json::Value) -> Result { - let api: ApiType = serde_json::from_value(value["api"].clone())?; - let config: common::config::Config = serde_json::from_value(value["config"].clone())?; - - Ok(ApiRequestEvent { - api, - config - }) + let res: ApiRequestEvent = serde_json::from_value(value)?; + Ok(res) } } diff --git a/mq/src/event/mod.rs b/mq/src/event/mod.rs index 1e8fecfb2..66e3c05d8 100644 --- a/mq/src/event/mod.rs +++ b/mq/src/event/mod.rs @@ -1,18 +1,25 @@ -use std::{any::Any, fmt::Display}; +use std::fmt::Display; use api_request::ApiRequestEvent; + use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use thiserror::Error; pub mod api_request; -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum EventType { ApiRequest(ApiRequestEvent), + + // Reserved + ErrorEvent, } +#[derive(Debug, Clone)] pub struct Message { - pub(crate) id: u64, + pub(crate) id: i64, pub(crate) create_time: DateTime, pub(crate) evt: EventType, } @@ -34,7 +41,7 @@ impl Display for EventType { } } -impl Display for Message{ +impl Display for Message { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, @@ -43,3 +50,54 @@ impl Display for Message{ ) } } + +impl From for callisto::mq_storage::Model { + fn from(val: Message) -> Self { + use callisto::mq_storage::Model; + + let category = match val.evt { + EventType::ApiRequest(_) => "ApiRequestEvent".into(), + + #[allow(unreachable_patterns)] + _ => "Unknown".into(), + }; + + let content: Value = match val.evt { + EventType::ApiRequest(evt) => evt.into(), + + #[allow(unreachable_patterns)] + _ => Value::Null, + }; + + Model { + id: val.id, + category, + create_time: val.create_time.naive_utc(), + content: Some(content.to_string()), + } + } +} + +impl From for Message { + fn from(value: callisto::mq_storage::Model) -> Self { + let id = value.id; + let create_time = value.create_time.and_utc(); + let evt = match value.category.as_str() { + "ApiRequestEvent" => { + if let Some(s) = value.content { + let evt = serde_json::from_str(&s).unwrap(); + EventType::ApiRequest(evt) + } else { + EventType::ErrorEvent + } + }, + + _ => EventType::ErrorEvent + }; + + Self { id, create_time, evt } + } +} + +#[cfg(test)] +mod tests {} diff --git a/mq/src/init.rs b/mq/src/init.rs new file mode 100644 index 000000000..f234e25b4 --- /dev/null +++ b/mq/src/init.rs @@ -0,0 +1,11 @@ +use common::config::Config; +use jupiter::context::Context; +use crate::queue::{MessageQueue, MQ}; + +pub async fn init_mq(config: &Config) { + let ctx = Context::new(config.clone()).await; + let mq = MessageQueue::new(12, ctx); + mq.start(); + + MQ.set(mq).unwrap(); +} diff --git a/mq/src/lib.rs b/mq/src/lib.rs index 0a0833701..55f636af0 100644 --- a/mq/src/lib.rs +++ b/mq/src/lib.rs @@ -1,3 +1,4 @@ +pub mod init; pub mod event; pub mod queue; pub mod cache; diff --git a/mq/src/queue.rs b/mq/src/queue.rs index f328e3109..cdafce363 100644 --- a/mq/src/queue.rs +++ b/mq/src/queue.rs @@ -1,22 +1,19 @@ +use std::fmt::Debug; use std::sync::{Arc, OnceLock}; use chrono::Utc; use crossbeam_channel::{unbounded, Sender}; use crossbeam_channel::Receiver; +use jupiter::context::Context; use tokio::runtime::{Builder, Runtime}; -use crate::event::{EventBase, Message, EventType}; +use crate::cache::get_mcache; +use crate::event::{Message, EventType}; // Lazy initialized static MessageQueue instance. +pub(crate) static MQ: OnceLock = OnceLock::new(); pub fn get_mq() -> &'static MessageQueue { - static MQ: OnceLock = OnceLock::new(); - MQ.get_or_init(|| { - // FIXME: Temp value - let mut mq = MessageQueue::new(12); - mq.start(); - - mq - }) + MQ.get().unwrap() } pub struct MessageQueue { @@ -24,14 +21,22 @@ pub struct MessageQueue { receiver: Receiver, // sem: Arc, runtime: Arc, + pub(crate) context: Context, } unsafe impl Send for MessageQueue{} unsafe impl Sync for MessageQueue{} +impl Debug for MessageQueue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Just ignore context field. + f.debug_struct("MessageQueue").field("sender", &self.sender).field("receiver", &self.receiver).field("runtime", &self.runtime).finish() + } +} + impl MessageQueue { // Should be singleton. - fn new(n_workers: usize) -> Self { + pub(crate) fn new(n_workers: usize, ctx: Context) -> Self { let (s, r) = unbounded::(); let rt = Builder::new_multi_thread() .worker_threads(n_workers) @@ -43,18 +48,22 @@ impl MessageQueue { receiver: r.to_owned(), // sem: Arc::new(Semaphore::new(n_workers)), runtime: Arc::new(rt), + context: ctx, } } - fn start(&self) { + pub(crate) fn start(&self) { let receiver = self.receiver.clone(); // let sem = self.sem.clone(); let rt = self.runtime.clone(); tokio::spawn(async move { + let mc = get_mcache(); loop { match receiver.recv() { Ok(evt) => { + let stored = evt.clone(); + mc.add(stored).await; rt.spawn(async move { tracing::info!("{}", evt); }); @@ -68,7 +77,7 @@ impl MessageQueue { }); } - pub fn send(&self, evt: EventType) { + pub(crate) fn send(&self, evt: EventType) { let _ = self.sender.send(Message { id: 1, create_time: Utc::now(), From 4dc30bd6ae901490cec42567a7252f76dc7bb5ea Mon Sep 17 00:00:00 2001 From: Neon Date: Thu, 8 Aug 2024 21:08:51 +0800 Subject: [PATCH 08/13] fix bug worker thread not flush messages --- firedbg/target.json | 6 ++++++ firedbg/version.toml | 1 + jupiter/src/storage/mq_storage.rs | 21 +++++++++++++++------ mq/src/cache.rs | 23 +++++++++-------------- mq/src/event/mod.rs | 6 +++--- mq/src/init.rs | 7 ++++++- mq/src/queue.rs | 7 +++++-- 7 files changed, 45 insertions(+), 26 deletions(-) create mode 100644 firedbg/target.json create mode 100644 firedbg/version.toml diff --git a/firedbg/target.json b/firedbg/target.json new file mode 100644 index 000000000..636ddd0dd --- /dev/null +++ b/firedbg/target.json @@ -0,0 +1,6 @@ +{ + "binaries": [], + "examples": [], + "integration_tests": [], + "unit_tests": [] +} \ No newline at end of file diff --git a/firedbg/version.toml b/firedbg/version.toml new file mode 100644 index 000000000..c59a771e6 --- /dev/null +++ b/firedbg/version.toml @@ -0,0 +1 @@ +firedbg_cli = "1.74.0" diff --git a/jupiter/src/storage/mq_storage.rs b/jupiter/src/storage/mq_storage.rs index 1e125f85b..8ff1fd1e4 100644 --- a/jupiter/src/storage/mq_storage.rs +++ b/jupiter/src/storage/mq_storage.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use callisto::mq_storage; -use sea_orm::DatabaseConnection; +use callisto::mq_storage::*; +use sea_orm::{DatabaseConnection, EntityTrait, QueryOrder, QuerySelect}; use super::batch_save_model; @@ -26,12 +26,21 @@ impl MQStorage { } } - pub async fn save_messages(&self, msgs: Vec) { - let msgs: Vec = msgs.into_iter().map(|m| m.into()).collect(); + pub async fn save_messages(&self, msgs: Vec) { + if msgs.len() == 0 { + return; + } + + let msgs: Vec = msgs.into_iter().map(|m| m.into()).collect(); batch_save_model(self.get_connection(), msgs).await.unwrap(); } - pub async fn get_latest_messages() -> Vec { - todo!() + pub async fn get_latest_message(&self) -> Option { + Entity::find() + .order_by_desc(Column::Id) + .limit(1) + .one(self.get_connection()) + .await + .unwrap() } } diff --git a/mq/src/cache.rs b/mq/src/cache.rs index d0e51fc7f..f39fb08cc 100644 --- a/mq/src/cache.rs +++ b/mq/src/cache.rs @@ -8,8 +8,8 @@ const FLUSH_INTERVAL: u64 = 10; // Lazy initialized static MessageCache instance. pub fn get_mcache() -> &'static MessageCache { - static MQ: OnceLock = OnceLock::new(); - MQ.get_or_init(|| { + static MC: OnceLock = OnceLock::new(); + MC.get_or_init(|| { let mc = MessageCache::new(); mc.start(); @@ -42,30 +42,24 @@ impl MessageCache { let stop = self.stop.clone(); let _ = tokio::spawn(async move { loop { - if !stop.load(std::sync::atomic::Ordering::Acquire) { + if stop.load(std::sync::atomic::Ordering::Acquire) { return } tokio::time::sleep(Duration::from_secs(FLUSH_INTERVAL)).await; + instant_flush().await; } }); } - // fn clear_inner(&self) -> &Self { - // let inner = self.inner.clone(); - // { - // let mut locked = inner.lock().unwrap(); - // locked.clear(); - // } - // self - // } - fn get_cache(&self) -> Vec { let mut res = Vec::new(); let inner = self.inner.clone(); let mut locked = inner.lock().unwrap(); - swap(locked.as_mut(), &mut res); + if locked.len() != 0 { + swap(locked.as_mut(), &mut res); + } res } @@ -75,7 +69,8 @@ impl MessageCache { let should_flush: bool; { let mut locked = inner.lock().unwrap(); - should_flush = locked.len() >= 1024; + let l = locked.len(); + should_flush = l >= 1; locked.push(msg); } diff --git a/mq/src/event/mod.rs b/mq/src/event/mod.rs index 66e3c05d8..24783a736 100644 --- a/mq/src/event/mod.rs +++ b/mq/src/event/mod.rs @@ -37,7 +37,7 @@ pub trait EventBase: impl Display for EventType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self) + write!(f, "{:?}", self) } } @@ -45,8 +45,8 @@ impl Display for Message { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "ID: {}, Created at: {}, conetent: [{}]", - self.id, self.create_time, self.evt + "ID: {}, Created at: {}", + self.id, self.create_time ) } } diff --git a/mq/src/init.rs b/mq/src/init.rs index f234e25b4..9d7b3cf0a 100644 --- a/mq/src/init.rs +++ b/mq/src/init.rs @@ -4,7 +4,12 @@ use crate::queue::{MessageQueue, MQ}; pub async fn init_mq(config: &Config) { let ctx = Context::new(config.clone()).await; - let mq = MessageQueue::new(12, ctx); + let seq = match ctx.services.mq_storage.get_latest_message().await { + Some(model) => model.id + 1, + None => 1, + }; + + let mq = MessageQueue::new(12, seq, ctx); mq.start(); MQ.set(mq).unwrap(); diff --git a/mq/src/queue.rs b/mq/src/queue.rs index cdafce363..942c86186 100644 --- a/mq/src/queue.rs +++ b/mq/src/queue.rs @@ -1,4 +1,5 @@ use std::fmt::Debug; +use std::sync::atomic::AtomicI64; use std::sync::{Arc, OnceLock}; use chrono::Utc; @@ -21,6 +22,7 @@ pub struct MessageQueue { receiver: Receiver, // sem: Arc, runtime: Arc, + cur_id: Arc, pub(crate) context: Context, } @@ -36,7 +38,7 @@ impl Debug for MessageQueue { impl MessageQueue { // Should be singleton. - pub(crate) fn new(n_workers: usize, ctx: Context) -> Self { + pub(crate) fn new(n_workers: usize, seq: i64, ctx: Context) -> Self { let (s, r) = unbounded::(); let rt = Builder::new_multi_thread() .worker_threads(n_workers) @@ -48,6 +50,7 @@ impl MessageQueue { receiver: r.to_owned(), // sem: Arc::new(Semaphore::new(n_workers)), runtime: Arc::new(rt), + cur_id: Arc::new(AtomicI64::new(seq)), context: ctx, } } @@ -79,7 +82,7 @@ impl MessageQueue { pub(crate) fn send(&self, evt: EventType) { let _ = self.sender.send(Message { - id: 1, + id: self.cur_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed), create_time: Utc::now(), evt }); From 4beead665343c5e17b60eb4f9b67cfaf947be9cc Mon Sep 17 00:00:00 2001 From: Neon Date: Thu, 8 Aug 2024 21:49:18 +0800 Subject: [PATCH 09/13] dispatched event handlers --- mq/Cargo.toml | 1 + mq/README.md | 2 ++ mq/src/event/api_request.rs | 13 +++++++++---- mq/src/event/mod.rs | 20 ++++++++++++++++++++ mq/src/queue.rs | 6 +++--- 5 files changed, 35 insertions(+), 7 deletions(-) create mode 100644 mq/README.md diff --git a/mq/Cargo.toml b/mq/Cargo.toml index 00ac308b5..d093d6f01 100644 --- a/mq/Cargo.toml +++ b/mq/Cargo.toml @@ -13,6 +13,7 @@ jupiter = { workspace = true } callisto = { workspace = true } axum = { workspace = true } +async-trait = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } thiserror = { workspace = true } diff --git a/mq/README.md b/mq/README.md new file mode 100644 index 000000000..1af45fb61 --- /dev/null +++ b/mq/README.md @@ -0,0 +1,2 @@ +# Message Queue Module +This module offers mega the ability to send and handle specific events. diff --git a/mq/src/event/api_request.rs b/mq/src/event/api_request.rs index 09568d824..d74385f54 100644 --- a/mq/src/event/api_request.rs +++ b/mq/src/event/api_request.rs @@ -1,5 +1,6 @@ use common::config::Config; use serde::{Deserialize, Serialize}; +use async_trait::async_trait; use crate::{event::EventBase, event::EventType, queue::get_mq}; @@ -8,10 +9,9 @@ use crate::{event::EventBase, event::EventType, queue::get_mq}; /// This is a example event definition for using message queue. \ /// /// Your customized event should implement `EventBase` trait. \ -/// Then the event can be put into message queue. \ -/// The event `id` and `create_time` will be attached to your event +/// Then the event can be wrapped and put into message queue. \ +/// The message `id` and `create_time` will be attached to your event /// and then wrapped as a `Message`. \ -/// You should also write some code in `mq::queue` to handle the event. (for now) #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ApiRequestEvent { pub api: ApiType, @@ -42,7 +42,12 @@ impl std::fmt::Display for ApiRequestEvent { } } -impl EventBase for ApiRequestEvent {} +#[async_trait] +impl EventBase for ApiRequestEvent { + async fn process(&self) { + tracing::info!("Handling Api Request event: [{}]", &self); + } +} impl ApiRequestEvent { // Create and enqueue this event. diff --git a/mq/src/event/mod.rs b/mq/src/event/mod.rs index 24783a736..e97433399 100644 --- a/mq/src/event/mod.rs +++ b/mq/src/event/mod.rs @@ -2,6 +2,7 @@ use std::fmt::Display; use api_request::ApiRequestEvent; +use async_trait::async_trait; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -30,9 +31,12 @@ pub enum Error { MismatchedData(#[from] serde_json::error::Error), } +#[async_trait] pub trait EventBase: Send + Sync + std::fmt::Display + Into + TryFrom { + // defines the callback function for this event. + async fn process(&self); } impl Display for EventType { @@ -41,6 +45,22 @@ impl Display for EventType { } } +impl EventType { + pub(crate) async fn process(&self) { + match self { + // I can't easily add a trait bound for the enum members, + // so you have to manually add a process logic for your event here. + EventType::ApiRequest(evt) => evt.process().await, + // EventType::SomeOtherEvent(xxx) => xxx.process().await, + + // This won't happen unless failed to load events from database. + // And that's because of a event conversion error. + // You should recheck yout conversion code logic. + EventType::ErrorEvent => panic!("Got error event"), + } + } +} + impl Display for Message { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/mq/src/queue.rs b/mq/src/queue.rs index 942c86186..2e4149479 100644 --- a/mq/src/queue.rs +++ b/mq/src/queue.rs @@ -64,11 +64,11 @@ impl MessageQueue { let mc = get_mcache(); loop { match receiver.recv() { - Ok(evt) => { - let stored = evt.clone(); + Ok(msg) => { + let stored = msg.clone(); mc.add(stored).await; rt.spawn(async move { - tracing::info!("{}", evt); + msg.evt.process().await; }); }, Err(e) => { From 536646f8ac411c0d52a989ab646cc6d85993c9e0 Mon Sep 17 00:00:00 2001 From: Neon Date: Thu, 8 Aug 2024 22:53:20 +0800 Subject: [PATCH 10/13] unittest for api request event --- mq/Cargo.toml | 2 +- mq/src/event/api_request.rs | 28 +++++++++++++++++++++++++--- mq/src/event/mod.rs | 3 --- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/mq/Cargo.toml b/mq/Cargo.toml index d093d6f01..011e08d26 100644 --- a/mq/Cargo.toml +++ b/mq/Cargo.toml @@ -14,7 +14,7 @@ callisto = { workspace = true } axum = { workspace = true } async-trait = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread"]} tracing = { workspace = true } thiserror = { workspace = true } serde = { workspace = true } diff --git a/mq/src/event/api_request.rs b/mq/src/event/api_request.rs index d74385f54..d2323348d 100644 --- a/mq/src/event/api_request.rs +++ b/mq/src/event/api_request.rs @@ -60,9 +60,9 @@ impl ApiRequestEvent { } // For storing the data into database. -impl Into for ApiRequestEvent { - fn into(self) -> serde_json::Value { - serde_json::to_value(self).unwrap() +impl From for serde_json::Value { + fn from(value: ApiRequestEvent) -> Self { + serde_json::to_value(value).unwrap() } } @@ -75,3 +75,25 @@ impl TryFrom for ApiRequestEvent { } } + +#[cfg(test)] +mod tests { + use super::{ApiRequestEvent, ApiType}; + use common::config::Config; + use serde_json::Value; + + const SER: &str = + r#"{"api":"Blob","config":{"base_dir":"","database":{"db_path":"/tmp/.mega/mega.db","db_type":"sqlite","db_url":"postgres://mega:mega@localhost:5432/mega","max_connection":32,"min_connection":16,"sqlx_logging":false},"lfs":{"enable_split":true,"split_size":1073741824},"log":{"level":"info","log_path":"/tmp/.mega/logs","print_std":true},"monorepo":{"import_dir":"/third-part"},"oauth":{"github_client_id":"","github_client_secret":""},"pack":{"channel_message_size":1000000,"clean_cache_after_decode":true,"pack_decode_cache_path":"/tmp/.mega/cache","pack_decode_mem_size":4},"ssh":{"ssh_key_path":"/tmp/.mega/ssh"},"storage":{"big_obj_threshold":1024,"lfs_obj_local_path":"/tmp/.mega/lfs","obs_access_key":"","obs_endpoint":"https://obs.cn-east-3.myhuaweicloud.com","obs_region":"cn-east-3","obs_secret_key":"","raw_obj_local_path":"/tmp/.mega/objects","raw_obj_storage_type":"LOCAL"},"ztm":{"agent":"127.0.0.1:7777","ca":"127.0.0.1:9999","hub":"127.0.0.1:8888"}}}"#; + + #[test] + fn test_conversion() { + let evt = ApiRequestEvent {api: ApiType::Blob, config: Config::default()}; + + // Convert into value + let serialized: Value = Value::from(evt); + assert_eq!(serialized.to_string().as_str(), SER); + + // Convert from value + let _ = ApiRequestEvent::try_from(serialized).unwrap(); + } +} diff --git a/mq/src/event/mod.rs b/mq/src/event/mod.rs index e97433399..e3bb6deee 100644 --- a/mq/src/event/mod.rs +++ b/mq/src/event/mod.rs @@ -118,6 +118,3 @@ impl From for Message { Self { id, create_time, evt } } } - -#[cfg(test)] -mod tests {} From 2fe1dda9af39a6b4f263f6ad5617d2d6cda799a3 Mon Sep 17 00:00:00 2001 From: Neon Date: Fri, 9 Aug 2024 09:12:17 +0800 Subject: [PATCH 11/13] fix clippy --- jupiter/src/storage/mq_storage.rs | 2 +- mq/src/cache.rs | 4 ++-- mq/src/event/mod.rs | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/jupiter/src/storage/mq_storage.rs b/jupiter/src/storage/mq_storage.rs index 8ff1fd1e4..0bbbed5dc 100644 --- a/jupiter/src/storage/mq_storage.rs +++ b/jupiter/src/storage/mq_storage.rs @@ -27,7 +27,7 @@ impl MQStorage { } pub async fn save_messages(&self, msgs: Vec) { - if msgs.len() == 0 { + if msgs.is_empty() { return; } diff --git a/mq/src/cache.rs b/mq/src/cache.rs index f39fb08cc..cfbb1cf22 100644 --- a/mq/src/cache.rs +++ b/mq/src/cache.rs @@ -40,7 +40,7 @@ impl MessageCache { fn start(&self) { let stop = self.stop.clone(); - let _ = tokio::spawn(async move { + tokio::spawn(async move { loop { if stop.load(std::sync::atomic::Ordering::Acquire) { return @@ -89,7 +89,7 @@ pub async fn instant_flush() { let st = mc.bound_mq.context.services.mq_storage.clone(); let data = mc .get_cache() - .into_iter().map(|d| Into::::into(d)) + .into_iter().map(Into::::into) .collect::>(); st.save_messages(data).await; diff --git a/mq/src/event/mod.rs b/mq/src/event/mod.rs index e3bb6deee..455b555a7 100644 --- a/mq/src/event/mod.rs +++ b/mq/src/event/mod.rs @@ -10,6 +10,7 @@ use thiserror::Error; pub mod api_request; +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, Serialize, Deserialize)] pub enum EventType { ApiRequest(ApiRequestEvent), From d6eb231d624a4fbb8db284133e0b8610c0134eca Mon Sep 17 00:00:00 2001 From: Neon Date: Fri, 9 Aug 2024 09:46:42 +0800 Subject: [PATCH 12/13] change module name mq to taurus --- Cargo.toml | 4 ++-- gateway/Cargo.toml | 2 +- gateway/src/api/api_router.rs | 2 +- gateway/src/api/mr_router.rs | 2 +- mega/Cargo.toml | 2 +- mega/src/commands/service/mod.rs | 2 +- {mq => taurus}/Cargo.toml | 4 ++-- {mq => taurus}/README.md | 0 {mq => taurus}/src/cache.rs | 0 {mq => taurus}/src/event/api_request.rs | 0 {mq => taurus}/src/event/mod.rs | 0 {mq => taurus}/src/init.rs | 0 {mq => taurus}/src/lib.rs | 0 {mq => taurus}/src/queue.rs | 0 14 files changed, 9 insertions(+), 9 deletions(-) rename {mq => taurus}/Cargo.toml (93%) rename {mq => taurus}/README.md (100%) rename {mq => taurus}/src/cache.rs (100%) rename {mq => taurus}/src/event/api_request.rs (100%) rename {mq => taurus}/src/event/mod.rs (100%) rename {mq => taurus}/src/init.rs (100%) rename {mq => taurus}/src/lib.rs (100%) rename {mq => taurus}/src/queue.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index e216988a0..9de04213d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ "neptune", "aries", "saturn", - "mq", + "taurus", "lunar/src-tauri", "atlas", ] @@ -32,7 +32,7 @@ gemini = { path = "gemini" } vault = { path = "vault" } neptune = { path = "neptune" } saturn = { path = "saturn" } -mq = { path = "mq" } +taurus = { path = "taurus" } mega = { path = "mega" } anyhow = "1.0.86" serde = {version = "1.0.203", features = ["derive"]} diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 584038f04..8ebf9c7b7 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -18,7 +18,7 @@ gemini = { workspace = true } vault = { workspace = true } venus = { workspace = true } mercury = { workspace = true } -mq = { workspace = true } +taurus = { workspace = true } anyhow = { workspace = true } axum = { workspace = true } axum-server = { version = "0.7", features = ["tls-rustls"] } diff --git a/gateway/src/api/api_router.rs b/gateway/src/api/api_router.rs index efda40d23..3fff4acda 100644 --- a/gateway/src/api/api_router.rs +++ b/gateway/src/api/api_router.rs @@ -6,7 +6,7 @@ use axum::{ Json, Router, }; -use mq::event::api_request::{ApiRequestEvent, ApiType}; +use taurus::event::api_request::{ApiRequestEvent, ApiType}; use ceres::model::{ create_file::CreateFileInfo, publish_path::PublishPathInfo, diff --git a/gateway/src/api/mr_router.rs b/gateway/src/api/mr_router.rs index ffda2db57..18d843a81 100644 --- a/gateway/src/api/mr_router.rs +++ b/gateway/src/api/mr_router.rs @@ -10,7 +10,7 @@ use axum::{ use ceres::model::mr::{MRDetail, MrInfoItem}; use common::model::CommonResult; -use mq::event::api_request::{ApiRequestEvent, ApiType}; +use taurus::event::api_request::{ApiRequestEvent, ApiType}; use crate::api::ApiServiceState; pub fn routers() -> Router { diff --git a/mega/Cargo.toml b/mega/Cargo.toml index d370050fe..7ac1e1537 100644 --- a/mega/Cargo.toml +++ b/mega/Cargo.toml @@ -16,7 +16,7 @@ path = "src/main.rs" gateway = { workspace = true } common = { workspace = true } ceres = { workspace = true } -mq = { workspace = true } +taurus = { workspace = true } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["macros"] } clap = { workspace = true, features = ["derive"] } diff --git a/mega/src/commands/service/mod.rs b/mega/src/commands/service/mod.rs index 73502ebec..824e357ac 100644 --- a/mega/src/commands/service/mod.rs +++ b/mega/src/commands/service/mod.rs @@ -25,7 +25,7 @@ pub fn cli() -> Command { // It determines which subcommand was used and calls the appropriate function. #[tokio::main] pub(crate) async fn exec(config: Config, args: &ArgMatches) -> MegaResult { - use mq::init::init_mq; + use taurus::init::init_mq; init_mq(&config).await; let (cmd, subcommand_args) = match args.subcommand() { diff --git a/mq/Cargo.toml b/taurus/Cargo.toml similarity index 93% rename from mq/Cargo.toml rename to taurus/Cargo.toml index 011e08d26..5eb8d0725 100644 --- a/mq/Cargo.toml +++ b/taurus/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "mq" +name = "taurus" version = "0.1.0" edition = "2021" [lib] -name = "mq" +name = "taurus" path = "src/lib.rs" [dependencies] diff --git a/mq/README.md b/taurus/README.md similarity index 100% rename from mq/README.md rename to taurus/README.md diff --git a/mq/src/cache.rs b/taurus/src/cache.rs similarity index 100% rename from mq/src/cache.rs rename to taurus/src/cache.rs diff --git a/mq/src/event/api_request.rs b/taurus/src/event/api_request.rs similarity index 100% rename from mq/src/event/api_request.rs rename to taurus/src/event/api_request.rs diff --git a/mq/src/event/mod.rs b/taurus/src/event/mod.rs similarity index 100% rename from mq/src/event/mod.rs rename to taurus/src/event/mod.rs diff --git a/mq/src/init.rs b/taurus/src/init.rs similarity index 100% rename from mq/src/init.rs rename to taurus/src/init.rs diff --git a/mq/src/lib.rs b/taurus/src/lib.rs similarity index 100% rename from mq/src/lib.rs rename to taurus/src/lib.rs diff --git a/mq/src/queue.rs b/taurus/src/queue.rs similarity index 100% rename from mq/src/queue.rs rename to taurus/src/queue.rs From 22ab2049a6260e10adfb9615fc41dd86be8cb581 Mon Sep 17 00:00:00 2001 From: Neon Date: Fri, 9 Aug 2024 10:33:45 +0800 Subject: [PATCH 13/13] fix clippy for saturn dead code --- saturn/src/context.rs | 4 ++++ saturn/src/entitystore.rs | 2 ++ 2 files changed, 6 insertions(+) diff --git a/saturn/src/context.rs b/saturn/src/context.rs index cb010954d..49fbc6bea 100644 --- a/saturn/src/context.rs +++ b/saturn/src/context.rs @@ -16,6 +16,7 @@ pub struct AppContext { schema: Schema, } +#[allow(dead_code)] #[derive(Debug, Error)] pub enum ContextError { #[error("{0}")] @@ -34,6 +35,7 @@ pub enum ContextError { Json(#[from] serde_json::Error), } +#[allow(dead_code)] #[derive(Debug, Error)] pub enum Error { #[error("Authorization Denied")] @@ -43,6 +45,7 @@ pub enum Error { } impl AppContext { + #[allow(dead_code)] pub fn new( entities: EntityStore, schema_path: impl Into, @@ -78,6 +81,7 @@ impl AppContext { } } + #[allow(dead_code)] pub fn is_authorized( &self, principal: impl AsRef, diff --git a/saturn/src/entitystore.rs b/saturn/src/entitystore.rs index a27901281..8d8a0500b 100644 --- a/saturn/src/entitystore.rs +++ b/saturn/src/entitystore.rs @@ -18,6 +18,7 @@ pub struct EntityStore { } impl EntityStore { + #[allow(dead_code)] pub fn as_entities(&self, schema: &Schema) -> Entities { let users = self.users.values().map(|user| user.clone().into()); let repos = self.repos.values().map(|repo| repo.clone().into()); @@ -32,6 +33,7 @@ impl EntityStore { Entities::from_entities(all, Some(schema)).unwrap() } + #[allow(dead_code)] pub fn merge(&mut self, other: EntityStore) { self.users.extend(other.users); self.repos.extend(other.repos);