Skip to content

Commit

Permalink
Split source files
Browse files Browse the repository at this point in the history
  • Loading branch information
hesiod committed Aug 20, 2021
1 parent 1ebc01f commit 46f4932
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 104 deletions.
18 changes: 2 additions & 16 deletions micro/src/main.rs
Expand Up @@ -21,27 +21,13 @@ use actix_web::{web, App, HttpServer};
use log::{debug, info};

mod site;
use site::create::{create_service, rmq_declare};
use site::create::create_service;
use site::rmq_ops::rmq_declare;

mod settings;

use settings::Settings;

/*
async fn create_pg_pool(
db_url: &str,
) -> bb8::Pool<bb8_postgres::PostgresConnectionManager<tokio_postgres::NoTls>> {
let pg_mgr =
bb8_postgres::PostgresConnectionManager::new_from_stringlike(db_url, tokio_postgres::NoTls)
.unwrap();
bb8::Pool::builder()
.max_size(15)
.build(pg_mgr)
.await
.unwrap()
}
*/

async fn create_rmq_pool(
db_url: &str,
Expand Down
91 changes: 4 additions & 87 deletions micro/src/site/create.rs
Expand Up @@ -7,97 +7,14 @@ use actix_multipart as mp;

use futures_util::{TryStreamExt, stream::{StreamExt}};


use rand::Rng;
use serde::{Deserialize};

use lapin::{Result as LAResult, protocol::basic::AMQPProperties};
use amq_protocol_types::{FieldTable};

use log::{debug, info};

use fabseal_micro_common::{ImageRequest, ImageType, StoredImage};

use fabseal_micro_common::{FABSEAL_EXCHANGE, FABSEAL_QUEUE, ImageRequest, ImageType, StoredImage};


#[derive(Deserialize, Debug)]
struct RequestInfo {
request_id: i32,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all(deserialize = "lowercase"))]
pub enum ResultType {
Model,
Heightmap
}


#[derive(Deserialize, Debug)]
struct ResultRequestInfo {
request_id: i32,
#[serde(rename = "type")]
result_type: ResultType,
}


pub async fn rmq_declare(
pool: bb8::Pool<bb8_lapin::LapinConnectionManager>
) -> LAResult<()> {
let conn = pool.get().await.unwrap();
let channel = conn.create_channel()
.await
.unwrap();

info!("CONNECTED");

let queue = channel
.queue_declare(
FABSEAL_QUEUE,
lapin::options::QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;

info!("Declared queue {:?}", queue);

Ok(())
}


lazy_static::lazy_static! {
static ref PROPS: AMQPProperties = {
lapin::BasicProperties::default()
.with_content_type("application/json".into())
};
static ref PUBLISH_OPTIONS: lapin::options::BasicPublishOptions = {
lapin::options::BasicPublishOptions::default()
};
}

async fn rmq_publish(
conn: bb8::PooledConnection<'_, bb8_lapin::LapinConnectionManager>,
payload: Vec<u8>,
) -> LAResult<()> {
let channel = conn.create_channel()
.await
.unwrap();

let confirm = channel.basic_publish(
FABSEAL_EXCHANGE,
FABSEAL_QUEUE,
*PUBLISH_OPTIONS,
payload,
PROPS.clone()
)
.await?
.await?;

debug_assert_eq!(confirm, lapin::publisher_confirm::Confirmation::NotRequested);


Ok(())
}
use crate::site::rmq_ops::rmq_publish;
use crate::site::types::*;

/*
Expand Down Expand Up @@ -288,7 +205,7 @@ async fn create_finish(
Ok(HttpResponse::MethodNotAllowed().finish())
}

pub fn create_service(cfg: &mut web::ServiceConfig) {
pub(crate) fn create_service(cfg: &mut web::ServiceConfig) {
cfg.service(
web::scope("/userupload")
.service(fetch_model)
Expand Down
4 changes: 3 additions & 1 deletion micro/src/site/mod.rs
@@ -1,2 +1,4 @@

pub mod create;

pub(crate) mod types;
pub(crate) mod rmq_ops;
64 changes: 64 additions & 0 deletions micro/src/site/rmq_ops.rs
@@ -0,0 +1,64 @@
use lapin::{Result as LAResult, protocol::basic::AMQPProperties};
use amq_protocol_types::{FieldTable};

use log::info;

use fabseal_micro_common::{FABSEAL_EXCHANGE, FABSEAL_QUEUE};

pub(crate) async fn rmq_declare(
pool: bb8::Pool<bb8_lapin::LapinConnectionManager>
) -> LAResult<()> {
let conn = pool.get().await.unwrap();
let channel = conn.create_channel()
.await
.unwrap();

info!("CONNECTED");

let queue = channel
.queue_declare(
FABSEAL_QUEUE,
lapin::options::QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;

info!("Declared queue {:?}", queue);

Ok(())
}


lazy_static::lazy_static! {
static ref PROPS: AMQPProperties = {
lapin::BasicProperties::default()
.with_content_type("application/json".into())
};
static ref PUBLISH_OPTIONS: lapin::options::BasicPublishOptions = {
lapin::options::BasicPublishOptions::default()
};
}

pub(crate) async fn rmq_publish(
conn: bb8::PooledConnection<'_, bb8_lapin::LapinConnectionManager>,
payload: Vec<u8>,
) -> LAResult<()> {
let channel = conn.create_channel()
.await
.unwrap();

let confirm = channel.basic_publish(
FABSEAL_EXCHANGE,
FABSEAL_QUEUE,
*PUBLISH_OPTIONS,
payload,
PROPS.clone()
)
.await?
.await?;

debug_assert_eq!(confirm, lapin::publisher_confirm::Confirmation::NotRequested);


Ok(())
}
22 changes: 22 additions & 0 deletions micro/src/site/types.rs
@@ -0,0 +1,22 @@
use serde::{Deserialize};

#[derive(Deserialize, Debug)]
pub(crate) struct RequestInfo {
request_id: i32,
}

#[derive(Deserialize, Debug)]
#[serde(rename_all(deserialize = "lowercase"))]
pub(crate) enum ResultType {
Model,
Heightmap
}


#[derive(Deserialize, Debug)]
pub(crate) struct ResultRequestInfo {
request_id: i32,
#[serde(rename = "type")]
pub(crate) result_type: ResultType,
}

0 comments on commit 46f4932

Please sign in to comment.