Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions packages/ak-common/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use sqlx::{
ConnectOptions as _, Executor as _, PgConnection, PgPool,
postgres::{PgConnectOptions, PgPoolOptions, PgSslMode},
};
use tokio::fs::read_to_string;
use tracing::{info, log::LevelFilter, trace};

use crate::{
Expand All @@ -15,7 +16,7 @@ use crate::{

static DB: OnceLock<PgPool> = OnceLock::new();

fn get_connect_opts() -> Result<PgConnectOptions> {
async fn get_connect_opts() -> Result<PgConnectOptions> {
let config = config::get();
let mut opts = PgConnectOptions::new()
.application_name(&format!(
Expand All @@ -30,13 +31,19 @@ fn get_connect_opts() -> Result<PgConnectOptions> {
.database(&config.postgresql.name)
.ssl_mode(PgSslMode::from_str(&config.postgresql.sslmode)?);
if let Some(sslrootcert) = &config.postgresql.sslrootcert {
opts = opts.ssl_root_cert_from_pem(sslrootcert.as_bytes().to_vec());
let from_fs = read_to_string(sslrootcert).await;
let data = from_fs.as_ref().unwrap_or(sslrootcert).as_bytes().to_vec();
opts = opts.ssl_root_cert_from_pem(data);
}
if let Some(sslcert) = &config.postgresql.sslcert {
opts = opts.ssl_client_cert_from_pem(sslcert.as_bytes());
let from_fs = read_to_string(sslcert).await;
let data = from_fs.as_ref().unwrap_or(sslcert).as_bytes();
opts = opts.ssl_client_cert_from_pem(data);
}
if let Some(sslkey) = &config.postgresql.sslkey {
opts = opts.ssl_client_key_from_pem(sslkey.as_bytes());
let from_fs = read_to_string(sslkey).await;
let data = from_fs.as_ref().unwrap_or(sslkey).as_bytes();
opts = opts.ssl_client_key_from_pem(data);
}
Ok(opts)
}
Expand All @@ -49,7 +56,7 @@ async fn update_connect_opts_on_config_change(arbiter: Arbiter) -> Result<()> {
Ok(Event::ConfigChanged) = events_rx.recv() => {
trace!("config change received, refreshing database connection options");
let db = get();
db.set_connect_options(get_connect_opts()?);
db.set_connect_options(get_connect_opts().await?);
},
() = arbiter.shutdown() => {
info!("stopping database watcher for config changes");
Expand All @@ -61,7 +68,7 @@ async fn update_connect_opts_on_config_change(arbiter: Arbiter) -> Result<()> {

pub async fn init(tasks: &mut Tasks) -> Result<()> {
info!("initializing database pool");
let options = get_connect_opts()?;
let options = get_connect_opts().await?;
let config = config::get();

let pool_options = PgPoolOptions::new()
Expand Down Expand Up @@ -106,7 +113,7 @@ pub fn get() -> &'static PgPool {
}

pub async fn create_conn() -> Result<PgConnection> {
let options = get_connect_opts()?;
let options = get_connect_opts().await?;
let conn = options.connect().await?;
Ok(conn)
}
Expand Down
Loading