diff --git a/Cargo.lock b/Cargo.lock index eb8d515..1c4dcca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -367,14 +367,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", "http", "http-body", "http-body-util", "itoa", - "matchit", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de45108900e1f9b9242f7f2e254aa3e2c029c921c258fe9e6b4217eeebd54288" +dependencies = [ + "axum-core 0.5.2", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -407,6 +433,25 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -537,17 +582,19 @@ dependencies = [ [[package]] name = "code0-flow" -version = "0.0.3" +version = "0.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3de77c35d0d017dd1d1d0a15dde722c5d0d25f348aecab9ca14ab54f7a6ab4bc" +checksum = "24dbc44b78c316d08a604ba077a2bd6c7cdf3d28919247161d08709159a2225b" dependencies = [ "async-trait", + "futures-lite 2.6.0", "lapin", "log", "redis", + "serde", "serde_json", "tokio", - "tucana", + "tucana 0.0.19", ] [[package]] @@ -1423,6 +1470,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.7.4" @@ -1926,9 +1979,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.28.2" +version = "0.29.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e37ec3fd44bea2ec947ba6cc7634d7999a6590aca7c35827c250bc0de502bda6" +checksum = "1bc42f3a12fd4408ce64d8efef67048a924e543bd35c6591c0447fda9054695f" dependencies = [ "arc-swap", "async-std", @@ -1940,6 +1993,8 @@ dependencies = [ "percent-encoding", "pin-project-lite", "ryu", + "serde", + "serde_json", "sha1_smol", "socket2 0.5.9", "tokio", @@ -2355,12 +2410,11 @@ name = "taurus" version = "0.1.0" dependencies = [ "code0-flow", - "futures-lite 2.6.0", "lapin", "serde", "serde_json", "tokio", - "tucana", + "tucana 0.0.17", ] [[package]] @@ -2508,7 +2562,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.7.9", "base64", "bytes", "h2", @@ -2530,6 +2584,35 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85839f0b32fd242bb3209262371d07feda6d780d16ee9d2bc88581b89da1549b" +dependencies = [ + "async-trait", + "axum 0.8.3", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2 0.5.9", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.12.3" @@ -2544,6 +2627,20 @@ dependencies = [ "syn", ] +[[package]] +name = "tonic-build" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d85f0383fadd15609306383a90e85eaed44169f931a5d2be1b42c76ceff1825e" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn", +] + [[package]] name = "tower" version = "0.4.13" @@ -2572,10 +2669,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.9.0", "pin-project-lite", + "slab", "sync_wrapper", + "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -2637,8 +2739,22 @@ dependencies = [ "prost-types", "serde", "serde_json", - "tonic", - "tonic-build", + "tonic 0.12.3", + "tonic-build 0.12.3", +] + +[[package]] +name = "tucana" +version = "0.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6c0791d9505b75b1405db08e0a2fd32d3dff98a4346c2d3ecf17ed55ea36de2" +dependencies = [ + "prost", + "prost-types", + "serde", + "serde_json", + "tonic 0.13.0", + "tonic-build 0.13.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 53a5b07..a48354e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,8 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -code0-flow = "0.0.3" -futures-lite = "2.6.0" +code0-flow = { version = "0.0.7", features = ["all"] } lapin = "2.5.3" serde = "1.0.219" serde_json = "1.0.140" diff --git a/src/main.rs b/src/main.rs index d71b935..fdc10d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,176 +1,14 @@ -use futures_lite::stream::StreamExt; -use lapin::{ - options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions}, - types::FieldTable, - Channel, Connection, -}; -use serde::{Deserialize, Serialize}; +use code0_flow::flow_queue::service::{Message, RabbitmqClient}; use std::sync::Arc; -use tokio::sync::Mutex; -#[derive(Serialize, Deserialize)] -enum MessageType { - ExecuteFlow, - TestExecuteFlow, -} - -#[derive(Serialize, Deserialize)] -struct Sender { - name: String, - protocol: String, - version: String, -} - -#[derive(Serialize, Deserialize)] -struct Message { - message_type: MessageType, - sender: Sender, - timestamp: i64, - telegram_id: String, - body: String, -} - -async fn build_connection(rabbitmq_url: &str) -> Connection { - match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await { - Ok(env) => env, - Err(error) => panic!( - "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}", - error - ), - } -} - -// Thread-safe wrapper for RabbitMQ channel -struct RabbitmqClient { - channel: Arc>, -} - -impl RabbitmqClient { - // Create a new RabbitMQ client with channel - async fn new(rabbitmq_url: &str) -> Self { - let connection = build_connection(rabbitmq_url).await; - let channel = connection.create_channel().await.unwrap(); - - // Declare the queue once during initialization - channel - .queue_declare( - "send_queue", - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); - - channel - .queue_declare( - "recieve_queue", - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await - .unwrap(); - - RabbitmqClient { - channel: Arc::new(Mutex::new(channel)), - } - } - - // Send message to the queue - async fn send_message(&self, message_json: String, queue_name: &str) { - let channel = self.channel.lock().await; - - channel - .basic_publish( - "", // exchange - queue_name, // routing key (queue name) - lapin::options::BasicPublishOptions::default(), - message_json.as_bytes(), - lapin::BasicProperties::default(), - ) - .await - .expect("TEST"); - } - - // Receive messages from a queue - async fn receive_messages(&self, queue_name: &str) -> Result<(), lapin::Error> { - let mut consumer = { - let channel = self.channel.lock().await; - - let consumer_res = channel - .basic_consume( - queue_name, - "consumer", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await; - - match consumer_res { - Ok(consumer) => consumer, - Err(err) => panic!("{}", err), - } - }; - - println!("Starting to consume from {}", queue_name); - - while let Some(delivery) = consumer.next().await { - let delivery = match delivery { - Ok(del) => del, - Err(err) => { - println!("Error receiving message: {}", err); - return Err(err); - } - }; - - let data = &delivery.data; - let message_str = match std::str::from_utf8(&data) { - Ok(str) => { - println!("Received message: {}", str); - str - } - Err(err) => { - println!("Error decoding message: {}", err); - return Ok(()); - } - }; - // Parse the message - let message = match serde_json::from_str::(message_str) { - Ok(mess) => { - println!("Parsed message with telegram_id: {}", mess.telegram_id); - mess - } - Err(err) => { - println!("Error parsing message: {}", err); - return Ok(()); - } - }; - - // Process the message here - let hello_world = Message { - telegram_id: message.telegram_id, - message_type: message.message_type, - timestamp: message.timestamp, - sender: message.sender, - body: "{ \"text\": \"Hello, World!\" }".to_string(), - }; - - let hello_world_json = serde_json::to_string(&hello_world).unwrap(); - - println!("{}", hello_world_json); - - { - self.send_message(hello_world_json, "recieve_queue").await; - } - - // Acknowledge the message - delivery - .ack(BasicAckOptions::default()) - .await - .expect("Failed to acknowledge message"); - } - - Ok(()) - } +fn handle_message(message: Message) -> Result { + Ok(Message { + message_id: message.message_id, + message_type: message.message_type, + timestamp: message.timestamp, + sender: message.sender, + body: "{ \"text\": \"Hihi, World!\" }".to_string(), + }) } #[tokio::main] @@ -178,7 +16,10 @@ async fn main() { let rabbitmq_client = Arc::new(RabbitmqClient::new("amqp://localhost:5672").await); // Receive messages from the send_queue - if let Err(e) = rabbitmq_client.receive_messages("send_queue").await { + if let Err(e) = rabbitmq_client + .receive_messages("send_queue", handle_message) + .await + { eprintln!("Failed to receive messages: {}", e); } }