diff --git a/frameworks/Rust/salvo/Cargo.toml b/frameworks/Rust/salvo/Cargo.toml index aa0a2e980322..49a744d906dd 100644 --- a/frameworks/Rust/salvo/Cargo.toml +++ b/frameworks/Rust/salvo/Cargo.toml @@ -15,22 +15,26 @@ path = "src/main_diesel.rs" name = "main-pg" path = "src/main_pg.rs" +[[bin]] +name = "main-moka" +path = "src/main_moka.rs" + [dependencies] anyhow = "1" async-trait = "0.1" bytes = "1" -diesel = { version = "1.4", features = ["postgres", "r2d2"] } -futures = "0.3" +diesel = { version = "2", features = ["postgres", "r2d2"] } +futures-util = "0.3" +moka = "0.10" markup = "0.13" -# mimalloc = { version = "0.1", default-features = false } +mimalloc = { version = "0.1", default-features = false } once_cell = "1" rand = { version = "0.8", features = ["min_const_gen", "small_rng"] } -random-fast-rng = "0.1" -salvo = { version = "0.34", default-features = false, features = ["anyhow"] } +salvo = { version = "0.38", default-features = false, features = ["anyhow", "http1"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -smallvec = "1" -snmalloc-rs = { version = "0.3", features = ["native-cpu"] } +# smallvec = "1" +# snmalloc-rs = { version = "0.3", features = ["native-cpu"] } tokio = { version = "1", features = ["macros", "rt"] } tokio-postgres = "0.7" v_htmlescape = "0.15" @@ -40,5 +44,6 @@ lto = true opt-level = 3 codegen-units = 1 panic = "abort" -debug = false -incremental = false + +[patch.crates-io] +salvo = { git = "https://github.com/salvo-rs/salvo.git" } diff --git a/frameworks/Rust/salvo/benchmark_config.json b/frameworks/Rust/salvo/benchmark_config.json index aaecef6b8f4b..b9d7a02f0bbb 100644 --- a/frameworks/Rust/salvo/benchmark_config.json +++ b/frameworks/Rust/salvo/benchmark_config.json @@ -26,7 +26,6 @@ "fortune_url": "/fortunes", "query_url": "/queries?q=", "update_url": "/updates?q=", - "cached_query_url": "/cached_queries?q=", "port": 8080, "approach": "Realistic", "classification": "Micro", @@ -38,7 +37,7 @@ "webserver": "Hyper", "os": "Linux", "database_os": "Linux", - "display_name": "Salvo [Diesel]", + "display_name": "Salvo [diesel]", "notes": "", "versus": "" }, @@ -47,6 +46,22 @@ "fortune_url": "/fortunes", "query_url": "/queries?q=", "update_url": "/updates?q=", + "port": 8080, + "approach": "Realistic", + "classification": "Micro", + "database": "Postgres", + "framework": "salvo", + "language": "Rust", + "orm": "Raw", + "platform": "Rust", + "webserver": "Hyper", + "os": "Linux", + "database_os": "Linux", + "display_name": "Salvo [pg]", + "notes": "", + "versus": "" + }, + "moka": { "cached_query_url": "/cached_queries?q=", "port": 8080, "approach": "Realistic", @@ -59,7 +74,7 @@ "webserver": "Hyper", "os": "Linux", "database_os": "Linux", - "display_name": "Salvo [PG]", + "display_name": "Salvo [moka]", "notes": "", "versus": "" } diff --git a/frameworks/Rust/salvo/salvo-diesel.dockerfile b/frameworks/Rust/salvo/salvo-diesel.dockerfile index 1736b0194b71..89299924f48a 100644 --- a/frameworks/Rust/salvo/salvo-diesel.dockerfile +++ b/frameworks/Rust/salvo/salvo-diesel.dockerfile @@ -1,12 +1,10 @@ -FROM rust:1.62.1 - -RUN apt-get update -yqq && apt-get install -yqq cmake g++ +FROM rust:1.68.2 ADD ./ /salvo WORKDIR /salvo -RUN cargo clean -RUN RUSTFLAGS="-C target-cpu=native" cargo build --release +ENV RUSTFLAGS "-C target-cpu=native" +RUN cargo build --release EXPOSE 8080 diff --git a/frameworks/Rust/salvo/salvo-moka.dockerfile b/frameworks/Rust/salvo/salvo-moka.dockerfile new file mode 100644 index 000000000000..20e522ff744a --- /dev/null +++ b/frameworks/Rust/salvo/salvo-moka.dockerfile @@ -0,0 +1,11 @@ +FROM rust:1.68.2 + +ADD ./ /salvo +WORKDIR /salvo + +ENV RUSTFLAGS "-C target-cpu=native" +RUN cargo build --release + +EXPOSE 8080 + +CMD ./target/release/main-moka diff --git a/frameworks/Rust/salvo/salvo-pg.dockerfile b/frameworks/Rust/salvo/salvo-pg.dockerfile index c696a92741a6..40c3f1a71d58 100644 --- a/frameworks/Rust/salvo/salvo-pg.dockerfile +++ b/frameworks/Rust/salvo/salvo-pg.dockerfile @@ -1,12 +1,10 @@ -FROM rust:1.62.1 - -RUN apt-get update -yqq && apt-get install -yqq cmake g++ +FROM rust:1.68.2 ADD ./ /salvo WORKDIR /salvo -RUN cargo clean -RUN RUSTFLAGS="-C target-cpu=native" cargo build --release +ENV RUSTFLAGS "-C target-cpu=native" +RUN cargo build --release EXPOSE 8080 diff --git a/frameworks/Rust/salvo/salvo.dockerfile b/frameworks/Rust/salvo/salvo.dockerfile index b58afb1817d5..7095deb73455 100644 --- a/frameworks/Rust/salvo/salvo.dockerfile +++ b/frameworks/Rust/salvo/salvo.dockerfile @@ -1,15 +1,10 @@ -FROM rust:1.62.1 - -# Disable simd at jsonescape -ENV CARGO_CFG_JSONESCAPE_DISABLE_AUTO_SIMD= - -RUN apt-get update -yqq && apt-get install -yqq cmake g++ +FROM rust:1.68.2 ADD ./ /salvo WORKDIR /salvo -RUN cargo clean -RUN RUSTFLAGS="-C target-cpu=native" cargo build --release +ENV RUSTFLAGS "-C target-cpu=native" +RUN cargo build --release EXPOSE 8080 diff --git a/frameworks/Rust/salvo/src/main.rs b/frameworks/Rust/salvo/src/main.rs index 7231c340ddf6..8a4b8f8fd73a 100644 --- a/frameworks/Rust/salvo/src/main.rs +++ b/frameworks/Rust/salvo/src/main.rs @@ -1,15 +1,21 @@ -// #[global_allocator] -// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use std::sync::Arc; +use std::thread::available_parallelism; use bytes::Bytes; +use salvo::conn::tcp::TcpAcceptor; +use salvo::http::body::ResBody; use salvo::http::header::{self, HeaderValue}; -use salvo::http::response::Body; use salvo::prelude::*; use serde::Serialize; -mod server; +mod utils; + +static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo"); +static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json"); +static PLAIN_HEADER: HeaderValue = HeaderValue::from_static("text/plain"); #[derive(Serialize)] pub struct Message { @@ -19,37 +25,50 @@ pub struct Message { #[handler] fn json(res: &mut Response) { let headers = res.headers_mut(); - headers.insert(header::SERVER, HeaderValue::from_static("S")); - headers.insert( - header::CONTENT_TYPE, - HeaderValue::from_static("application/json"), - ); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); let data = serde_json::to_vec(&Message { message: "Hello, World!", }) .unwrap(); - res.set_body(Body::Once(Bytes::from(data))); + res.set_body(ResBody::Once(Bytes::from(data))); } #[handler] fn plaintext(res: &mut Response) { let headers = res.headers_mut(); - headers.insert(header::SERVER, HeaderValue::from_static("S")); - headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain")); - res.set_body(Body::Once(Bytes::from_static(b"Hello, world!"))); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, PLAIN_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from_static(b"Hello, world!"))); } -#[tokio::main] -async fn main() { +fn main() { + let size = available_parallelism().map(|n| n.get()).unwrap_or(16); let router = Arc::new( Router::new() .push(Router::with_path("plaintext").get(plaintext)) .push(Router::with_path("json").get(json)), ); - server::builder() - .http1_pipeline_flush(true) - .serve(Service::new(router)) - .await + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() .unwrap(); + for _ in 1..size { + let router = router.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(serve(router)); + }); + } + println!("Started http server: 127.0.0.1:8080"); + rt.block_on(serve(router)); +} + +async fn serve(router: Arc) { + let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap(); + Server::new(acceptor).serve(router).await } diff --git a/frameworks/Rust/salvo/src/main_diesel.rs b/frameworks/Rust/salvo/src/main_diesel.rs index 91117613e999..f34461aa1394 100644 --- a/frameworks/Rust/salvo/src/main_diesel.rs +++ b/frameworks/Rust/salvo/src/main_diesel.rs @@ -1,11 +1,12 @@ -#[global_allocator] -static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; // #[global_allocator] -// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; #[macro_use] extern crate diesel; +use bytes::Bytes; use std::cmp; use std::fmt::Write; use std::sync::Arc; @@ -17,12 +18,14 @@ use diesel::r2d2::{ConnectionManager, Pool, PoolError, PooledConnection}; use once_cell::sync::OnceCell; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; +use salvo::conn::tcp::TcpAcceptor; use salvo::http::header::{self, HeaderValue}; +use salvo::http::ResBody; use salvo::prelude::*; mod models; mod schema; -mod server; +mod utils; use models::*; use schema::*; @@ -30,7 +33,9 @@ const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/he type PgPool = Pool>; static DB_POOL: OnceCell = OnceCell::new(); -static CACHED_WORLDS: OnceCell> = OnceCell::new(); +static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo"); +static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json"); +static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8"); fn connect() -> Result>, PoolError> { unsafe { DB_POOL.get_unchecked().get() } @@ -50,92 +55,87 @@ fn build_pool(database_url: &str, size: u32) -> Result { async fn world_row(res: &mut Response) -> Result<(), Error> { let mut rng = SmallRng::from_entropy(); let random_id = rng.gen_range(1..10_001); - let conn = connect()?; - let row = world::table.find(random_id).first::(&conn)?; - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); - res.render(Json(row)); + let mut conn = connect()?; + let world = world::table.find(random_id).first::(&mut conn)?; + + let data = serde_json::to_vec(&world).unwrap(); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); Ok(()) } #[handler] async fn queries(req: &mut Request, res: &mut Response) -> Result<(), Error> { - let count = req.query::("q").unwrap_or(1); + let count = req.query::("q").unwrap_or(1); let count = cmp::min(500, cmp::max(1, count)); - let mut worlds = Vec::with_capacity(count); + let mut worlds = Vec::with_capacity(count as usize); let mut rng = SmallRng::from_entropy(); - let conn = connect()?; + let mut conn = connect()?; for _ in 0..count { let id = rng.gen_range(1..10_001); - let w = world::table.find(id).get_result::(&conn)?; + let w = world::table.find(id).get_result::(&mut conn)?; worlds.push(w); } - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); - res.render(Json(worlds)); - Ok(()) -} -#[handler] -async fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> { - let count = req.query::("q").unwrap_or(1); - let count = cmp::min(500, cmp::max(1, count)); - let mut worlds = Vec::with_capacity(count); - let mut rng = SmallRng::from_entropy(); - for _ in 0..count { - let idx = rng.gen_range(0..10_000); - unsafe { - let w = CACHED_WORLDS.get_unchecked().get(idx).unwrap(); - worlds.push(w); - } - } - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); - res.render(Json(worlds)); + let data = serde_json::to_vec(&worlds).unwrap(); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); Ok(()) } #[handler] async fn updates(req: &mut Request, res: &mut Response) -> Result<(), Error> { - let count = req.query::("q").unwrap_or(1); + let count = req.query::("q").unwrap_or(1); let count = cmp::min(500, cmp::max(1, count)); - let conn = connect()?; - let mut worlds = Vec::with_capacity(count); + let mut conn = connect()?; + let mut worlds = Vec::with_capacity(count as usize); let mut rng = SmallRng::from_entropy(); for _ in 0..count { let w_id: i32 = rng.gen_range(1..10_001); - let mut w = world::table.find(w_id).first::(&conn)?; + let mut w = world::table.find(w_id).first::(&mut conn)?; w.randomnumber = rng.gen_range(1..10_001); worlds.push(w); } worlds.sort_by_key(|w| w.id); - conn.transaction::<(), Error, _>(|| { + conn.transaction::<(), Error, _>(|conn| { for w in &worlds { diesel::update(world::table) .filter(world::id.eq(w.id)) .set(world::randomnumber.eq(w.randomnumber)) - .execute(&conn)?; + .execute(conn)?; } Ok(()) })?; - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); - res.render(Json(worlds)); + let data = serde_json::to_vec(&worlds).unwrap(); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); Ok(()) } #[handler] async fn fortunes(res: &mut Response) -> Result<(), Error> { - let conn = connect()?; - let mut items = fortune::table.get_results::(&conn)?; + let mut conn = connect()?; + let mut items = fortune::table.get_results::(&mut conn)?; items.push(Fortune { id: 0, message: "Additional fortune added at request time.".to_string(), }); items.sort_by(|it, next| it.message.cmp(&next.message)); - let mut body = String::new(); - write!(&mut body, "{}", FortunesTemplate { items }).unwrap(); + let mut data = String::new(); + write!(&mut data, "{}", FortunesTemplate { items }).unwrap(); - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); - res.render(Text::Html(body)); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, HTML_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); Ok(()) } @@ -161,27 +161,26 @@ markup::define! { } } -fn populate_cache() -> Result<(), Error> { - let conn = connect()?; - let worlds = world::table.limit(10_000).get_results::(&conn)?; - CACHED_WORLDS.set(worlds).unwrap(); - Ok(()) -} - fn main() { + let size = available_parallelism().map(|n| n.get()).unwrap_or(16); + DB_POOL + .set( + build_pool(DB_URL, size as u32) + .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)), + ) + .ok(); + let router = Arc::new( Router::new() .push(Router::with_path("db").get(world_row)) .push(Router::with_path("fortunes").get(fortunes)) .push(Router::with_path("queries").get(queries)) - .push(Router::with_path("cached_queries").get(cached_queries)) .push(Router::with_path("updates").get(updates)), ); - let size = available_parallelism().map(|n| n.get()).unwrap_or(16); - DB_POOL - .set(build_pool(DB_URL, size as u32).unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL))) - .ok(); - populate_cache().expect("error cache worlds"); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); for _ in 1..size { let router = router.clone(); std::thread::spawn(move || { @@ -192,14 +191,11 @@ fn main() { rt.block_on(serve(router)); }); } - println!("Starting http server: 127.0.0.1:8080"); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + println!("Started http server: 127.0.0.1:8080"); rt.block_on(serve(router)); } async fn serve(router: Arc) { - server::builder().serve(Service::new(router)).await.unwrap(); + let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap(); + Server::new(acceptor).serve(router).await } diff --git a/frameworks/Rust/salvo/src/main_moka.rs b/frameworks/Rust/salvo/src/main_moka.rs new file mode 100644 index 000000000000..62cd73ca11b3 --- /dev/null +++ b/frameworks/Rust/salvo/src/main_moka.rs @@ -0,0 +1,97 @@ +// #[global_allocator] +// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; + +use std::cmp; +use std::sync::Arc; +use std::thread::available_parallelism; + +use anyhow::Error; +use bytes::Bytes; +use moka::sync::Cache as MokaCache; +use once_cell::sync::OnceCell; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use salvo::conn::tcp::TcpAcceptor; +use salvo::http::header::{self, HeaderValue}; +use salvo::http::ResBody; +use salvo::prelude::*; + +mod models; +mod utils; +use models::*; +mod pg_conn; +use pg_conn::PgConnection; + +const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; +static CACHED_WORLDS: OnceCell> = OnceCell::new(); + +static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo"); +static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json"); + +#[handler] +fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> { + let count = req.query::("q").unwrap_or(1); + let count = cmp::min(500, cmp::max(1, count)); + let mut worlds = Vec::with_capacity(count as usize); + let mut rng = SmallRng::from_entropy(); + for _ in 0..count { + let idx = rng.gen_range(0..10_000); + unsafe { + let w = CACHED_WORLDS.get_unchecked().get(&idx).unwrap(); + worlds.push(w); + } + } + let data = serde_json::to_vec(&worlds).unwrap(); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); + Ok(()) +} + +async fn populate_cache() -> Result<(), Error> { + let conn = PgConnection::create(DB_URL).await?; + let worlds = conn.get_worlds(10_000).await?; + let cache = MokaCache::new(10_000); + for (i, word) in worlds.into_iter().enumerate() { + cache.insert(i, word); + } + CACHED_WORLDS.set(cache).unwrap(); + Ok(()) +} + +fn main() { + let size = available_parallelism().map(|n| n.get()).unwrap_or(16); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + populate_cache().await.expect("error cache worlds"); + }); + + let router = Arc::new(Router::with_path("cached_queries").get(cached_queries)); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + for _ in 1..size{ + let router = router.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(serve(router)); + }); + } + println!("Started http server: 127.0.0.1:8080"); + rt.block_on(serve(router)); +} + +async fn serve(router: Arc) { + let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap(); + Server::new(acceptor).serve(router).await +} diff --git a/frameworks/Rust/salvo/src/main_pg.rs b/frameworks/Rust/salvo/src/main_pg.rs index 4bab1015a4e6..173d08b2f2e8 100644 --- a/frameworks/Rust/salvo/src/main_pg.rs +++ b/frameworks/Rust/salvo/src/main_pg.rs @@ -1,183 +1,32 @@ -#[global_allocator] -static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; // #[global_allocator] -// static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +// static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; +#[global_allocator] +static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; use std::cmp; -use std::collections::HashMap; use std::fmt::Write; -use std::io; +use std::sync::Arc; use std::thread::available_parallelism; -use anyhow::Error; use async_trait::async_trait; -use futures::stream::futures_unordered::FuturesUnordered; -use futures::TryStreamExt; -use once_cell::sync::OnceCell; -use rand::distributions::{Distribution, Uniform}; -use rand::rngs::SmallRng; -use rand::{Rng, SeedableRng}; +use bytes::Bytes; +use salvo::conn::tcp::TcpAcceptor; use salvo::http::header::{self, HeaderValue}; +use salvo::http::ResBody; use salvo::prelude::*; use salvo::routing::FlowCtrl; -use tokio_postgres::types::ToSql; -use tokio_postgres::{self, Client, NoTls, Statement}; mod models; -mod server; -use models::*; +mod pg_conn; +mod utils; +use pg_conn::PgConnection; const DB_URL: &str = "postgres://benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world"; -static CACHED_WORLDS: OnceCell> = OnceCell::new(); -type DbResult = Result; - -struct PgConnection { - client: Client, - fortune: Statement, - world: Statement, - updates: HashMap, -} - -impl PgConnection { - pub async fn create(db_url: &str) -> Result { - let (client, conn) = tokio_postgres::connect(db_url, NoTls) - .await - .expect("can not connect to postgresql"); - tokio::spawn(async move { - if let Err(e) = conn.await { - eprintln!("connection error: {}", e); - } - }); - - let fortune = client.prepare("SELECT id, message FROM fortune").await.unwrap(); - let world = client.prepare("SELECT * FROM world WHERE id=$1").await.unwrap(); - let mut updates = HashMap::new(); - for num in 1..=500u16 { - let mut pl: u16 = 1; - let mut q = String::new(); - q.push_str("UPDATE world SET randomnumber = CASE id "); - for _ in 1..=num { - let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1); - pl += 2; - } - q.push_str("ELSE randomnumber END WHERE id IN ("); - for _ in 1..=num { - let _ = write!(&mut q, "${},", pl); - pl += 1; - } - q.pop(); - q.push(')'); - updates.insert(num, client.prepare(&q).await.unwrap()); - } - - Ok(PgConnection { - client, - fortune, - world, - updates, - }) - } - - async fn query_one_world(&self, w_id: i32) -> DbResult { - self.client.query_one(&self.world, &[&w_id]).await.map(|row| World { - id: row.get(0), - randomnumber: row.get(1), - }) - } - - pub async fn get_world(&self) -> DbResult { - let mut rng = SmallRng::from_entropy(); - let id: i32 = rng.gen_range(1..10_001); - self.query_one_world(id).await - } - pub async fn get_worlds(&self, count: u16) -> DbResult> { - let worlds = { - let mut rng = SmallRng::from_entropy(); - let between = Uniform::from(1..10_001); - (0..count) - .map(|_| { - let id: i32 = between.sample(&mut rng); - self.query_one_world(id) - }) - .collect::>() - }; - - worlds.try_collect().await - } - - pub async fn update(&self, count: u16) -> DbResult> { - let worlds = { - let mut rng = SmallRng::from_entropy(); - let between = Uniform::from(1..10_001); - (0..count) - .map(|_| { - let id: i32 = between.sample(&mut rng); - let w_id: i32 = between.sample(&mut rng); - async move { - let mut world = self.query_one_world(w_id).await?; - world.randomnumber = id; - Ok(world) - } - }) - .collect::>() - }; - - let worlds = worlds.try_collect::>().await?; - let mut params = Vec::<&(dyn ToSql + Sync)>::with_capacity(count as usize * 3); - for w in &worlds { - params.push(&w.id); - params.push(&w.randomnumber); - } - for w in &worlds { - params.push(&w.id); - } - - let st = self.updates.get(&count).unwrap(); - self.client.query(st, params.as_slice()).await?; - Ok(worlds) - } - pub async fn tell_fortune(&self) -> DbResult { - let mut items = self - .client - .query(&self.fortune, &[]) - .await? - .iter() - .map(|row| Fortune { - id: row.get(0), - message: row.get(1), - }) - .collect::>(); - items.push(Fortune { - id: 0, - message: "Additional fortune added at request time.".to_string(), - }); - items.sort_by(|it, next| it.message.cmp(&next.message)); - Ok(FortunesTemplate { items }) - } -} +static SERVER_HEADER: HeaderValue = HeaderValue::from_static("salvo"); +static JSON_HEADER: HeaderValue = HeaderValue::from_static("application/json"); +static HTML_HEADER: HeaderValue = HeaderValue::from_static("text/html; charset=utf-8"); -markup::define! { - FortunesTemplate(items: Vec) { - {markup::doctype()} - html { - head { - title { "Fortunes" } - } - body { - table { - tr { th { "id" } th { "message" } } - @for item in items { - tr { - td { {item.id} } - td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} } - } - } - } - } - } - } -} struct WorldHandler { conn: PgConnection, } @@ -185,16 +34,26 @@ impl WorldHandler { async fn new() -> Self { Self { conn: PgConnection::create(DB_URL) - .await.unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)), + .await + .unwrap_or_else(|_| panic!("Error connecting to {}", &DB_URL)), } } } #[async_trait] impl Handler for WorldHandler { - async fn handle(&self, _req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) { - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); + async fn handle( + &self, + _req: &mut Request, + _depot: &mut Depot, + res: &mut Response, + _ctrl: &mut FlowCtrl, + ) { let world = self.conn.get_world().await.unwrap(); - res.render(Json(world)); + let data = serde_json::to_vec(&world).unwrap(); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); } } struct WorldsHandler { @@ -211,12 +70,22 @@ impl WorldsHandler { } #[async_trait] impl Handler for WorldsHandler { - async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) { + async fn handle( + &self, + req: &mut Request, + _depot: &mut Depot, + res: &mut Response, + _ctrl: &mut FlowCtrl, + ) { let count = req.query::("q").unwrap_or(1); let count = cmp::min(500, cmp::max(1, count)); - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); let worlds = self.conn.get_worlds(count).await.unwrap(); - res.render(Json(worlds)); + + let data = serde_json::to_vec(&worlds).unwrap(); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); } } struct UpdatesHandler { @@ -233,12 +102,24 @@ impl UpdatesHandler { } #[async_trait] impl Handler for UpdatesHandler { - async fn handle(&self, req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) { + async fn handle( + &self, + req: &mut Request, + _depot: &mut Depot, + res: &mut Response, + _ctrl: &mut FlowCtrl, + ) { let count = req.query::("q").unwrap_or(1); let count = cmp::min(500, cmp::max(1, count)); - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); + res.headers_mut() + .insert(header::SERVER, SERVER_HEADER.clone()); let worlds = self.conn.update(count).await.unwrap(); - res.render(Json(worlds)); + + let data = serde_json::to_vec(&worlds).unwrap(); + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, JSON_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); } } struct FortunesHandler { @@ -255,48 +136,30 @@ impl FortunesHandler { } #[async_trait] impl Handler for FortunesHandler { - async fn handle(&self, _req: &mut Request, _depot: &mut Depot, res: &mut Response, _ctrl: &mut FlowCtrl) { - let mut body = String::new(); - write!(&mut body, "{}", self.conn.tell_fortune().await.unwrap()).unwrap(); - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); - res.render(Text::Html(body)); - } -} + async fn handle( + &self, + _req: &mut Request, + _depot: &mut Depot, + res: &mut Response, + _ctrl: &mut FlowCtrl, + ) { + let mut data = String::new(); + write!(&mut data, "{}", self.conn.tell_fortune().await.unwrap()).unwrap(); -#[handler] -async fn cached_queries(req: &mut Request, res: &mut Response) -> Result<(), Error> { - let count = req.query::("q").unwrap_or(1); - let count = cmp::min(500, cmp::max(1, count)); - let mut worlds = Vec::with_capacity(count); - let mut rng = SmallRng::from_entropy(); - for _ in 0..count { - let idx = rng.gen_range(0..10_000); - unsafe { - let w = CACHED_WORLDS.get_unchecked().get(idx).unwrap(); - worlds.push(w); - } + let headers = res.headers_mut(); + headers.insert(header::SERVER, SERVER_HEADER.clone()); + headers.insert(header::CONTENT_TYPE, HTML_HEADER.clone()); + res.set_body(ResBody::Once(Bytes::from(data))); } - res.headers_mut().insert(header::SERVER, HeaderValue::from_static("S")); - res.render(Json(worlds)); - Ok(()) -} - -async fn populate_cache() -> Result<(), Error> { - let conn = PgConnection::create(DB_URL).await?; - let worlds = conn.get_worlds(10_000).await?; - CACHED_WORLDS.set(worlds).unwrap(); - Ok(()) } fn main() { + let size = available_parallelism().map(|n| n.get()).unwrap_or(16); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - rt.block_on(async { - populate_cache().await.expect("error cache worlds"); - }); - for _ in 1..available_parallelism().map(|n| n.get()).unwrap_or(16) { + for _ in 1..size { std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -314,8 +177,7 @@ async fn serve() { .push(Router::with_path("db").get(WorldHandler::new().await)) .push(Router::with_path("fortunes").get(FortunesHandler::new().await)) .push(Router::with_path("queries").get(WorldsHandler::new().await)) - .push(Router::with_path("cached_queries").get(cached_queries)) .push(Router::with_path("updates").get(UpdatesHandler::new().await)); - - server::builder().serve(Service::new(router)).await.unwrap(); + let acceptor: TcpAcceptor = utils::reuse_listener().unwrap().try_into().unwrap(); + Server::new(acceptor).serve(router).await } diff --git a/frameworks/Rust/salvo/src/models.rs b/frameworks/Rust/salvo/src/models.rs index 54abd1570655..2efe9ebb3b58 100644 --- a/frameworks/Rust/salvo/src/models.rs +++ b/frameworks/Rust/salvo/src/models.rs @@ -7,7 +7,7 @@ pub struct Message { } #[allow(non_snake_case)] -#[derive(Serialize, Queryable, Debug)] +#[derive(Serialize, Queryable, Clone, Debug)] pub struct World { pub id: i32, pub randomnumber: i32, diff --git a/frameworks/Rust/salvo/src/pg_conn.rs b/frameworks/Rust/salvo/src/pg_conn.rs new file mode 100644 index 000000000000..c11199a99d09 --- /dev/null +++ b/frameworks/Rust/salvo/src/pg_conn.rs @@ -0,0 +1,167 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::io; + +use futures_util::stream::{FuturesUnordered, TryStreamExt}; +use rand::distributions::{Distribution, Uniform}; +use rand::rngs::SmallRng; +use rand::{Rng, SeedableRng}; +use tokio_postgres::types::ToSql; +use tokio_postgres::{self, Client, NoTls, Statement}; + +use crate::models::*; + +type DbResult = Result; + +pub struct PgConnection { + client: Client, + #[allow(dead_code)] + fortune: Statement, + world: Statement, + #[allow(dead_code)] + updates: HashMap, +} + +impl PgConnection { + pub async fn create(db_url: &str) -> Result { + let (client, conn) = tokio_postgres::connect(db_url, NoTls) + .await + .expect("can not connect to postgresql"); + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("connection error: {}", e); + } + }); + + let fortune = client.prepare("SELECT id, message FROM fortune").await.unwrap(); + let world = client.prepare("SELECT * FROM world WHERE id=$1").await.unwrap(); + let mut updates = HashMap::new(); + for num in 1..=500u16 { + let mut pl: u16 = 1; + let mut q = String::new(); + q.push_str("UPDATE world SET randomnumber = CASE id "); + for _ in 1..=num { + let _ = write!(&mut q, "when ${} then ${} ", pl, pl + 1); + pl += 2; + } + q.push_str("ELSE randomnumber END WHERE id IN ("); + for _ in 1..=num { + let _ = write!(&mut q, "${},", pl); + pl += 1; + } + q.pop(); + q.push(')'); + updates.insert(num, client.prepare(&q).await.unwrap()); + } + + Ok(PgConnection { + client, + fortune, + world, + updates, + }) + } + + async fn query_one_world(&self, w_id: i32) -> DbResult { + self.client.query_one(&self.world, &[&w_id]).await.map(|row| World { + id: row.get(0), + randomnumber: row.get(1), + }) + } + + #[allow(dead_code)] + pub async fn get_world(&self) -> DbResult { + let mut rng = SmallRng::from_entropy(); + let id: i32 = rng.gen_range(1..10_001); + self.query_one_world(id).await + } + pub async fn get_worlds(&self, count: u16) -> DbResult> { + let worlds = { + let mut rng = SmallRng::from_entropy(); + let between = Uniform::from(1..10_001); + (0..count) + .map(|_| { + let id = between.sample(&mut rng); + self.query_one_world(id) + }) + .collect::>() + }; + + worlds.try_collect().await + } + + #[allow(dead_code)] + pub async fn update(&self, count: u16) -> DbResult> { + let worlds = { + let mut rng = SmallRng::from_entropy(); + let between = Uniform::from(1..10_001); + (0..count) + .map(|_| { + let id: i32 = between.sample(&mut rng); + let w_id: i32 = between.sample(&mut rng); + async move { + let mut world = self.query_one_world(w_id).await?; + world.randomnumber = id; + Ok(world) + } + }) + .collect::>() + }; + + let worlds = worlds.try_collect::>().await?; + let mut params = Vec::<&(dyn ToSql + Sync)>::with_capacity(count as usize * 3); + for w in &worlds { + params.push(&w.id); + params.push(&w.randomnumber); + } + for w in &worlds { + params.push(&w.id); + } + + let st = self.updates.get(&count).unwrap(); + self.client.query(st, params.as_slice()).await?; + Ok(worlds) + } + + #[allow(dead_code)] + pub async fn tell_fortune(&self) -> DbResult { + let mut items = self + .client + .query(&self.fortune, &[]) + .await? + .iter() + .map(|row| Fortune { + id: row.get(0), + message: row.get(1), + }) + .collect::>(); + items.push(Fortune { + id: 0, + message: "Additional fortune added at request time.".to_string(), + }); + items.sort_by(|it, next| it.message.cmp(&next.message)); + Ok(FortunesTemplate { items }) + } +} + +markup::define! { + FortunesTemplate(items: Vec) { + {markup::doctype()} + html { + head { + title { "Fortunes" } + } + body { + table { + tr { th { "id" } th { "message" } } + @for item in items { + tr { + td { {item.id} } + td { {markup::raw(v_htmlescape::escape(&item.message).to_string())} } + } + } + } + } + } + } +} diff --git a/frameworks/Rust/salvo/src/server.rs b/frameworks/Rust/salvo/src/server.rs deleted file mode 100644 index a56501c24d60..000000000000 --- a/frameworks/Rust/salvo/src/server.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::io; -use std::net::{Ipv4Addr, SocketAddr}; - -use salvo::hyper; -use salvo::hyper::server::conn::AddrIncoming; -use tokio::net::{TcpListener, TcpSocket}; - -pub fn builder() -> hyper::server::Builder { - let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080)); - let listener = reuse_listener(addr).expect("couldn't bind to addr"); - let incoming = AddrIncoming::from_listener(listener).unwrap(); - hyper::Server::builder(incoming) - .http1_only(true) - .tcp_nodelay(true) -} - -fn reuse_listener(addr: SocketAddr) -> io::Result { - let socket = match addr { - SocketAddr::V4(_) => TcpSocket::new_v4()?, - SocketAddr::V6(_) => TcpSocket::new_v6()?, - }; - - #[cfg(unix)] - { - if let Err(e) = socket.set_reuseport(true) { - eprintln!("error setting SO_REUSEPORT: {}", e); - } - } - - socket.set_reuseaddr(true)?; - socket.bind(addr)?; - socket.listen(1024) -} diff --git a/frameworks/Rust/salvo/src/utils.rs b/frameworks/Rust/salvo/src/utils.rs new file mode 100644 index 000000000000..f68ca4b53bc7 --- /dev/null +++ b/frameworks/Rust/salvo/src/utils.rs @@ -0,0 +1,24 @@ +use std::io; +use std::net::{Ipv4Addr, SocketAddr}; + +use tokio::net::{TcpListener, TcpSocket}; + +#[allow(dead_code)] +pub fn reuse_listener() -> io::Result { + let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8080)); + let socket = match addr { + SocketAddr::V4(_) => TcpSocket::new_v4()?, + SocketAddr::V6(_) => TcpSocket::new_v6()?, + }; + + #[cfg(unix)] + { + if let Err(e) = socket.set_reuseport(true) { + eprintln!("error setting SO_REUSEPORT: {e}"); + } + } + + socket.set_reuseaddr(true)?; + socket.bind(addr)?; + socket.listen(1024) +}