From 0fb0bcb51bfb7d62ca2453bc64a2442ff8a77346 Mon Sep 17 00:00:00 2001 From: Elliott Davis Date: Thu, 4 Oct 2018 12:14:55 -0500 Subject: [PATCH] Make all calls to the db from the api direct and synchronous Signed-off-by: Elliott Davis --- components/builder-api/src/server/db.rs | 24 ++-- .../src/server/handlers/account.rs | 52 -------- .../src/server/handlers/channel.rs | 43 ------- .../builder-api/src/server/handlers/mod.rs | 2 - components/builder-api/src/server/mod.rs | 12 +- .../builder-api/src/server/models/account.rs | 21 --- .../builder-api/src/server/models/channel.rs | 19 +-- .../src/server/resources/channels.rs | 120 ++++++++++-------- .../src/server/resources/profile.rs | 72 +++++------ 9 files changed, 121 insertions(+), 244 deletions(-) delete mode 100644 components/builder-api/src/server/handlers/account.rs delete mode 100644 components/builder-api/src/server/handlers/channel.rs delete mode 100644 components/builder-api/src/server/handlers/mod.rs diff --git a/components/builder-api/src/server/db.rs b/components/builder-api/src/server/db.rs index 594bb574f5..b43a94aa13 100644 --- a/components/builder-api/src/server/db.rs +++ b/components/builder-api/src/server/db.rs @@ -1,4 +1,3 @@ -use actix_web::actix::{Actor, Addr, SyncArbiter, SyncContext}; use actix_web::{error, Error}; use db::config::DataStoreCfg; use diesel::pg::PgConnection; @@ -7,22 +6,19 @@ use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; type PgPool = Pool>; type PgPooledConnection = PooledConnection>; -pub struct DbExecutor(pub PgPool); +#[derive(Clone)] +pub struct DbPool(pub PgPool); -impl Actor for DbExecutor { - type Context = SyncContext; +pub fn init(config: DataStoreCfg) -> DbPool { + DbPool( + Pool::builder() + .max_size(config.pool_size) + .build(ConnectionManager::::new(config.to_string())) + .expect("Failed to create pool."), + ) } -pub fn init(config: DataStoreCfg) -> Addr { - let manager = ConnectionManager::::new(config.to_string()); - let pool = Pool::builder() - .max_size(config.pool_size) - .build(manager) - .expect("Failed to create pool."); - SyncArbiter::start(4, move || DbExecutor(pool.clone())) -} - -impl DbExecutor { +impl DbPool { pub fn get_conn(&self) -> Result { self.0.get().map_err(|e| error::ErrorInternalServerError(e)) } diff --git a/components/builder-api/src/server/handlers/account.rs b/components/builder-api/src/server/handlers/account.rs deleted file mode 100644 index 365503a39d..0000000000 --- a/components/builder-api/src/server/handlers/account.rs +++ /dev/null @@ -1,52 +0,0 @@ -use actix_web::{actix::Handler, error, Error}; -use server::db::DbExecutor; -use server::models::account::{ - Account, CreateAccount, FindOrCreateAccount, GetAccount, GetAccountById, UpdateAccount, -}; -use std::ops::Deref; - -impl Handler for DbExecutor { - type Result = Result; - - fn handle(&mut self, account: GetAccount, _: &mut Self::Context) -> Self::Result { - Account::get(account, self.get_conn()?.deref()) - .map_err(|_| error::ErrorInternalServerError("Error fetching account")) - } -} - -impl Handler for DbExecutor { - type Result = Result; - - fn handle(&mut self, id: GetAccountById, _: &mut Self::Context) -> Self::Result { - Account::get_by_id(id, self.get_conn()?.deref()) - .map_err(|_| error::ErrorInternalServerError("Error fetching account by ID")) - } -} - -impl Handler for DbExecutor { - type Result = Result; - - fn handle(&mut self, account: CreateAccount, _: &mut Self::Context) -> Self::Result { - Account::create(account, self.get_conn()?.deref()) - .map_err(|_| error::ErrorInternalServerError("Error creating account")) - } -} - -impl Handler for DbExecutor { - type Result = Result<(), Error>; - - fn handle(&mut self, account: UpdateAccount, _: &mut Self::Context) -> Self::Result { - Account::update(account, self.get_conn()?.deref()) - .map(|_| ()) - .map_err(|_| error::ErrorInternalServerError("Error updating account")) - } -} - -impl Handler for DbExecutor { - type Result = Result; - - fn handle(&mut self, account: FindOrCreateAccount, _: &mut Self::Context) -> Self::Result { - Account::find_or_create(account, self.get_conn()?.deref()) - .map_err(|_| error::ErrorInternalServerError("Error on FetchOrCreate account")) - } -} diff --git a/components/builder-api/src/server/handlers/channel.rs b/components/builder-api/src/server/handlers/channel.rs deleted file mode 100644 index b4c91eb308..0000000000 --- a/components/builder-api/src/server/handlers/channel.rs +++ /dev/null @@ -1,43 +0,0 @@ -use actix_web::{actix::Handler, error, Error}; -use diesel::result::{DatabaseErrorKind, Error::DatabaseError}; -use server::db::DbExecutor; -use server::models::channel::{Channel, CreateChannel, DeleteChannel, GetChannel, ListChannels}; -use std::ops::Deref; - -impl Handler for DbExecutor { - type Result = Result, Error>; - fn handle(&mut self, channel: ListChannels, _: &mut Self::Context) -> Self::Result { - Channel::list(channel, self.get_conn()?.deref()) - .map_err(|_| error::ErrorInternalServerError("Error listing channels")) - } -} - -impl Handler for DbExecutor { - type Result = Result; - fn handle(&mut self, channel: GetChannel, _: &mut Self::Context) -> Self::Result { - Channel::get(channel, self.get_conn()?.deref()) - .map_err(|_| error::ErrorInternalServerError("Error fetching channel")) - } -} - -impl Handler for DbExecutor { - type Result = Result; - fn handle(&mut self, channel: CreateChannel, _: &mut Self::Context) -> Self::Result { - match Channel::create(channel, self.get_conn()?.deref()) { - Ok(channel) => Ok(channel), - Err(DatabaseError(DatabaseErrorKind::UniqueViolation, _)) => { - Err(error::ErrorConflict("channel already exists")) - } - Err(_) => Err(error::ErrorInternalServerError("Error creating channel")), - } - } -} - -impl Handler for DbExecutor { - type Result = Result<(), Error>; - fn handle(&mut self, channel: DeleteChannel, _: &mut Self::Context) -> Self::Result { - Channel::delete(channel, self.get_conn()?.deref()) - .map(|_| ()) - .map_err(|_| error::ErrorInternalServerError("Error deleting channel")) - } -} diff --git a/components/builder-api/src/server/handlers/mod.rs b/components/builder-api/src/server/handlers/mod.rs deleted file mode 100644 index 6381af5c52..0000000000 --- a/components/builder-api/src/server/handlers/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod account; -pub mod channel; diff --git a/components/builder-api/src/server/mod.rs b/components/builder-api/src/server/mod.rs index 5cec8fdaeb..56fcb8141d 100644 --- a/components/builder-api/src/server/mod.rs +++ b/components/builder-api/src/server/mod.rs @@ -16,7 +16,6 @@ pub mod authorize; pub mod db; pub mod error; pub mod framework; -pub mod handlers; pub mod helpers; pub mod models; pub mod resources; @@ -30,7 +29,6 @@ use std::sync::Arc; use std::thread; use actix; -use actix_web::actix::Addr; use actix_web::http::StatusCode; use actix_web::middleware::Logger; use actix_web::server::{self, KeepAlive}; @@ -42,7 +40,7 @@ use hab_net::socket; use oauth_client::client::OAuth2Client; use segment_api_client::SegmentClient; -use self::db::{init, DbExecutor}; +use self::db::{init, DbPool}; use self::error::Error; use self::framework::middleware::{Authentication, XRouteClient}; @@ -81,11 +79,11 @@ pub struct AppState { segment: SegmentClient, upstream: UpstreamClient, memcache: RefCell, - db: Addr, + db: DbPool, } impl AppState { - pub fn new(config: &Config, db: Addr) -> AppState { + pub fn new(config: &Config, db: DbPool) -> AppState { AppState { config: config.clone(), packages: S3Handler::new(config.s3.clone()), @@ -154,10 +152,10 @@ pub fn run(config: Config) -> Result<()> { // TED TODO: When originsrv gets removed we need to do the migrations here - let addr = init(config.datastore.clone()); + let db_pool = init(config.datastore.clone()); server::new(move || { - let app_state = AppState::new(&config, addr.clone()); + let app_state = AppState::new(&config, db_pool.clone()); App::with_state(app_state) .middleware(Logger::default().exclude("/v1/status")) diff --git a/components/builder-api/src/server/models/account.rs b/components/builder-api/src/server/models/account.rs index 87b66aaca7..15f0c76291 100644 --- a/components/builder-api/src/server/models/account.rs +++ b/components/builder-api/src/server/models/account.rs @@ -1,4 +1,3 @@ -use actix_web::{actix::Message, Error}; use diesel; use diesel::pg::PgConnection; use diesel::result::QueryResult; @@ -38,26 +37,6 @@ pub struct FindOrCreateAccount { email: String, } -impl Message for GetAccount { - type Result = Result; -} - -impl Message for GetAccountById { - type Result = Result; -} - -impl Message for CreateAccount { - type Result = Result; -} - -impl Message for UpdateAccount { - type Result = Result<(), Error>; -} - -impl Message for FindOrCreateAccount { - type Result = Result; -} - impl Account { pub fn get(account: GetAccount, conn: &PgConnection) -> QueryResult { diesel::sql_query("SELECT * FROM get_account_by_name_v1($1)") diff --git a/components/builder-api/src/server/models/channel.rs b/components/builder-api/src/server/models/channel.rs index fc20708b26..a06d6c8146 100644 --- a/components/builder-api/src/server/models/channel.rs +++ b/components/builder-api/src/server/models/channel.rs @@ -1,5 +1,4 @@ use super::db_id_format; -use actix_web::{actix::Message, Error}; use chrono::NaiveDateTime; use diesel; use diesel::pg::PgConnection; @@ -8,7 +7,7 @@ use diesel::sql_types::{BigInt, Bool, Text}; use diesel::RunQueryDsl; use server::schema::channel::*; -#[derive(Debug, Serialize, QueryableByName)] +#[derive(Debug, Serialize, Deserialize, QueryableByName)] #[table_name = "origin_channels"] pub struct Channel { #[serde(with = "db_id_format")] @@ -43,22 +42,6 @@ pub struct DeleteChannel { pub channel: String, } -impl Message for CreateChannel { - type Result = Result; -} - -impl Message for ListChannels { - type Result = Result, Error>; -} - -impl Message for GetChannel { - type Result = Result; -} - -impl Message for DeleteChannel { - type Result = Result<(), Error>; -} - impl Channel { pub fn list(channel: ListChannels, conn: &PgConnection) -> QueryResult> { diesel::sql_query("select * from get_origin_channels_for_origin_v3($1, $2)") diff --git a/components/builder-api/src/server/resources/channels.rs b/components/builder-api/src/server/resources/channels.rs index 5974245c79..52839b7ff5 100644 --- a/components/builder-api/src/server/resources/channels.rs +++ b/components/builder-api/src/server/resources/channels.rs @@ -15,9 +15,11 @@ use std::str::FromStr; use actix_web::http::{self, Method, StatusCode}; -use actix_web::{error, App, HttpRequest, HttpResponse, Path, Query}; -use actix_web::{AsyncResponder, FromRequest, FutureResponse}; -use futures::{future, Future}; +use actix_web::FromRequest; +use actix_web::{App, HttpRequest, HttpResponse, Path, Query}; +use diesel::result::{DatabaseErrorKind, Error::DatabaseError}; + +use std::ops::Deref; use bldr_core::metrics::CounterMetric; use hab_core::package::{Identifiable, PackageTarget}; @@ -31,7 +33,7 @@ use server::error::{Error, Result}; use server::framework::headers; use server::framework::middleware::route_message; use server::helpers::{self, Pagination, Target}; -use server::models::channel::{CreateChannel, DeleteChannel, ListChannels}; +use server::models::channel::{Channel, CreateChannel, DeleteChannel, ListChannels}; use server::services::metrics::Counter; use server::AppState; @@ -97,89 +99,105 @@ impl Channels { // // Route handlers - these functions can return any Responder trait // -fn get_channels( - (req, sandbox): (HttpRequest, Query), -) -> FutureResponse { +fn get_channels((req, sandbox): (HttpRequest, Query)) -> HttpResponse { let origin = Path::<(String)>::extract(&req).unwrap().into_inner(); - req.state() - .db - .send(ListChannels { + let conn = match req.state().db.get_conn() { + Ok(conn_ref) => conn_ref, + Err(_) => return HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + }; + + match Channel::list( + ListChannels { origin: origin, include_sandbox_channels: sandbox.is_set, - }).from_err() - .and_then(|res| match res { - Ok(list) => { - // TED: This is to maintain backwards API compat while killing some proto definitions - // currently the output looks like [{"name": "foo"}] when it probably should be ["foo"] - #[derive(Serialize)] - struct Temp { - name: String, - } - let ident_list: Vec = list - .iter() - .map(|channel| Temp { - name: channel.name.clone(), - }).collect(); - Ok(HttpResponse::Ok() - .header(http::header::CACHE_CONTROL, headers::NO_CACHE) - .json(ident_list)) + }, + conn.deref(), + ) { + Ok(list) => { + // TED: This is to maintain backwards API compat while killing some proto definitions + // currently the output looks like [{"name": "foo"}] when it probably should be ["foo"] + #[derive(Serialize)] + struct Temp { + name: String, } - Err(_err) => Ok(HttpResponse::InternalServerError().into()), - }).responder() + let ident_list: Vec = list + .iter() + .map(|channel| Temp { + name: channel.name.clone(), + }).collect(); + HttpResponse::Ok() + .header(http::header::CACHE_CONTROL, headers::NO_CACHE) + .json(ident_list) + } + Err(_err) => HttpResponse::InternalServerError().into(), + } } -fn create_channel(req: HttpRequest) -> FutureResponse { +fn create_channel(req: HttpRequest) -> HttpResponse { let (origin, channel) = Path::<(String, String)>::extract(&req) .unwrap() .into_inner(); let session_id = match authorize_session(&req, Some(&origin)) { Ok(session_id) => session_id as i64, - Err(e) => return future::err(error::ErrorUnauthorized(e)).responder(), + Err(_) => return HttpResponse::new(StatusCode::UNAUTHORIZED), }; - req.state() - .db - .send(CreateChannel { + let conn = match req.state().db.get_conn() { + Ok(conn_ref) => conn_ref, + Err(_) => return HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + }; + + match Channel::create( + CreateChannel { channel: channel, origin: origin, owner_id: session_id, - }).from_err() - .and_then(|res| match res { - Ok(channel) => Ok(HttpResponse::Created().json(channel)), - Err(e) => Err(e), - }).responder() + }, + conn.deref(), + ) { + Ok(channel) => HttpResponse::Created().json(channel), + Err(DatabaseError(DatabaseErrorKind::UniqueViolation, _)) => { + HttpResponse::Conflict().into() + } + Err(_) => HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + } } -fn delete_channel(req: HttpRequest) -> FutureResponse { +fn delete_channel(req: HttpRequest) -> HttpResponse { let (origin, channel) = Path::<(String, String)>::extract(&req) .unwrap() .into_inner(); - if let Err(err) = authorize_session(&req, Some(&origin)) { - return future::err(error::ErrorUnauthorized(err)).responder(); + if let Err(_err) = authorize_session(&req, Some(&origin)) { + return HttpResponse::new(StatusCode::UNAUTHORIZED); } if channel == "stable" || channel == "unstable" { - return future::err(error::ErrorForbidden(format!("{} is protected", channel))).responder(); + return HttpResponse::new(StatusCode::FORBIDDEN); } + let conn = match req.state().db.get_conn() { + Ok(conn_ref) => conn_ref, + Err(_) => return HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + }; + req.state() .memcache .borrow_mut() .clear_cache_for_channel(&origin, &channel); - req.state() - .db - .send(DeleteChannel { + match Channel::delete( + DeleteChannel { origin: origin, channel: channel, - }).from_err() - .and_then(|res| match res { - Ok(_) => Ok(HttpResponse::new(StatusCode::OK)), - Err(e) => Err(e), - }).responder() + }, + conn.deref(), + ) { + Ok(_) => HttpResponse::new(StatusCode::OK), + Err(_) => HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + } } fn promote_package(req: HttpRequest) -> HttpResponse { diff --git a/components/builder-api/src/server/resources/profile.rs b/components/builder-api/src/server/resources/profile.rs index a611c6f77c..f5b6444440 100644 --- a/components/builder-api/src/server/resources/profile.rs +++ b/components/builder-api/src/server/resources/profile.rs @@ -13,10 +13,9 @@ // limitations under the License. // use actix_web::http::{Method, StatusCode}; -use actix_web::{ - error, App, AsyncResponder, FromRequest, FutureResponse, HttpRequest, HttpResponse, Json, Path, -}; -use futures::{future, Future}; +use actix_web::{App, FromRequest, HttpRequest, HttpResponse, Json, Path}; + +use std::ops::Deref; use bldr_core; use hab_net::NetOk; @@ -25,7 +24,7 @@ use protocol::originsrv::*; use server::authorize::authorize_session; use server::error::Result; use server::framework::middleware::route_message; -use server::models::account::{GetAccountById, UpdateAccount}; +use server::models::account::{Account as AccountModel, GetAccountById, UpdateAccount}; use server::AppState; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -48,8 +47,7 @@ impl Profile { "/profile/access-tokens", Method::POST, generate_access_token, - ) - .route( + ).route( "/profile/access-tokens/{id}", Method::DELETE, revoke_access_token, @@ -68,23 +66,26 @@ pub fn do_get_access_tokens(req: &HttpRequest, account_id: u64) -> Res // // Route handlers - these functions can return any Responder trait // -fn get_account(req: HttpRequest) -> FutureResponse { +fn get_account(req: HttpRequest) -> HttpResponse { let session_id = match authorize_session(&req, None) { Ok(session_id) => session_id as i64, - Err(err) => return future::err(error::ErrorUnauthorized(err)).responder(), + Err(_err) => return HttpResponse::new(StatusCode::UNAUTHORIZED), + }; + + let conn = match req.state().db.get_conn() { + Ok(conn_ref) => conn_ref, + Err(_) => return HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), }; - req.state() - .db - .send(GetAccountById { + match AccountModel::get_by_id( + GetAccountById { id: session_id.clone(), - }) - .from_err() - .and_then(move |res| match res { - Ok(account) => Ok(HttpResponse::Ok().json(account)), - Err(e) => Err(e), - }) - .responder() + }, + conn.deref(), + ) { + Ok(account) => HttpResponse::Ok().json(account), + Err(_e) => HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + } } fn get_access_tokens(req: HttpRequest) -> HttpResponse { @@ -181,30 +182,29 @@ fn revoke_access_token(req: HttpRequest) -> HttpResponse { } } -fn update_account( - (req, body): (HttpRequest, Json), -) -> FutureResponse { +fn update_account((req, body): (HttpRequest, Json)) -> HttpResponse { let session_id = match authorize_session(&req, None) { Ok(session_id) => session_id as i64, - Err(err) => return future::err(error::ErrorUnauthorized(err)).responder(), + Err(_err) => return HttpResponse::new(StatusCode::UNAUTHORIZED), }; if body.email.len() <= 0 { - return future::err(error::ErrorBadRequest( - "No email address provided with request", - )).responder(); + return HttpResponse::new(StatusCode::BAD_REQUEST); } - req.state() - .db - .send(UpdateAccount { + let conn = match req.state().db.get_conn() { + Ok(conn_ref) => conn_ref, + Err(_) => return HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + }; + + match AccountModel::update( + UpdateAccount { id: session_id.clone(), email: body.email.to_owned(), - }) - .from_err() - .and_then(move |res| match res { - Ok(_) => Ok(HttpResponse::new(StatusCode::OK)), - Err(e) => Err(e), - }) - .responder() + }, + conn.deref(), + ) { + Ok(_) => HttpResponse::new(StatusCode::OK), + Err(_e) => HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR), + } }