Skip to content

Commit

Permalink
Initial Worker implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
hesiod committed Aug 20, 2021
1 parent 24e5b45 commit 4556921
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 137 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

members = [
"micro",
"micro-common",
"worker-blender"
]
15 changes: 15 additions & 0 deletions micro-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "fabseal-micro-common"
version = "0.1.0"
authors = [ "Tobias Markus <tobias@miglix.eu>" ]
edition = "2018"
license = "GPL-3.0-or-later"
repository = "https://github.com/Siegler-von-Catan/fabseal-micro/"
homepage = "https://fabseal.de"
categories = [ "computer-vision", "web-programming::http-server" ]
publish = false

[dependencies]
serde = "1.0"
serde_json = "1.0"
mime = "0.3"
42 changes: 42 additions & 0 deletions micro-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use serde::{Deserialize, Serialize};


#[derive(Serialize, Deserialize, Debug)]
pub enum ImageType {
PNG,
JPEG
}

#[derive(Serialize, Deserialize, Debug)]
pub struct StoredImage {
pub image_type: ImageType,
pub image: Vec<u8>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ImageRequest {
pub request_id: u32,
pub image: StoredImage,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct RequestSettings {
pub start_x: i32,
pub end_x: i32,
pub start_y: i32,
pub end_y: i32,
pub is_inverted: bool,
pub is_low_quality: bool,
}

pub const FABSEAL_QUEUE: &'static str = "fabseal";

pub const FABSEAL_EXCHANGE: &'static str = "";

pub const RESULT_EXPIRATION_SECONDS: usize = 10*60;

pub fn result_key(
request_id: u32
) -> String {
format!("rs_{}", request_id)
}
32 changes: 24 additions & 8 deletions micro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,29 @@ categories = [ "computer-vision", "web-programming::http-server" ]
publish = false

[dependencies]
actix-web = "4.0.0-beta"
actix-http = "3.0.0-beta"
fabseal-micro-common = { path = "../micro-common" }

bytes = "1.0"
actix-web = "3"
actix-http = "2"
# actix-session = "0.5-beta.2"
actix-session = { git = 'https://github.com/actix/actix-extras.git' }
tokio = { version = "1.7", features = ["rt"] }
actix-multipart = "0.3"
# actix-session = { git = 'https://github.com/actix/actix-extras.git' }
actix-session = "0.4"
actix-redis = "0.9"
actix-storage = "0.2"
actix-storage-redis = "0.2"
# actix-storage-sled = { version = "0.2", features = ["actor"] }

# tokio = { version = "1.7", features = ["rt"] }
# tokio-postgres = "0.7"
tokio-amqp = "1.0"
bb8 = "0.7"
# tokio-amqp = "1.0"
tokio = "0.2"
bb8 = "0.4.2"
# bb8-postgres = "0.7"
bb8-lapin = { git = 'https://github.com/adrianbenavides/bb8-lapin.git' }
bb8-redis = "0.9"
# bb8-lapin = { git = 'https://github.com/adrianbenavides/bb8-lapin.git' }
bb8-lapin = "0.1"
# bb8-redis = "0.9"
env_logger = "0.8"
lapin = "1.7"
amq-protocol-types = "6.0"
Expand All @@ -30,3 +42,7 @@ serde_json = "1.0"
lazy_static = "1.4"
rand = "0.8"
time = "0.2"
mime = "0.3"
futures-util = "0.3"

config = "0.11"
76 changes: 61 additions & 15 deletions micro/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
#[macro_use]
extern crate lazy_static;
// #[macro_use]
// extern crate lazy_static;

use actix_redis::RedisSession;
use actix_web::cookie::SameSite;
use actix_web::web::Data;
use time::Duration;

use rand::Rng;

use lapin::{Connection, ConnectionProperties, Result as LAResult};
use lapin::{ConnectionProperties};

// use actix_storage_sled::{actor::ToActorExt, SledConfig};
use actix_storage::Storage;
use actix_storage_redis::{RedisBackend, ConnectionInfo, ConnectionAddr};

use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use actix_session::CookieSession;

use log::{info, debug};
use log::info;

mod site;
use site::create::{create_service, rmq_declare};
use tokio_amqp::LapinTokioExt;
// use tokio_amqp::LapinTokioExt;

/*
async fn create_pg_pool(
Expand All @@ -41,7 +46,8 @@ async fn create_rmq_pool(
// Create RabbitMQ pool
let manager = bb8_lapin::LapinConnectionManager::new(
db_url,
ConnectionProperties::default().with_tokio()
ConnectionProperties::default()
// .with_tokio()
);
bb8::Pool::builder()
.max_size(15)
Expand All @@ -52,11 +58,31 @@ async fn create_rmq_pool(

const COOKIE_DURATION: Duration = Duration::hour();

const THREADS_NUMBER: usize = 4;

fn create_redis_session(
redis_endpoint: &str,
key: &[u8],
dev_env: bool
) -> RedisSession {
RedisSession::new(redis_endpoint, key)
// .domain("localhost")
.cookie_name("fabseal_session")
.cookie_path("/api/v1")
.cookie_secure(!dev_env)
// .expires_in_time(COOKIE_DURATION)
.cookie_http_only(true)
.cookie_max_age(COOKIE_DURATION)
.cookie_same_site(SameSite::Strict)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
// access logs are printed with the INFO level so ensure it is enabled by default
env_logger::init_from_env(env_logger::Env::new().default_filter_or("debug"));

let dev_env: bool = std::env::var("DEV").is_ok();

let rmq_db_url = std::env::var("AMQP_ADDR")
.unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let rmq_pool = create_rmq_pool(rmq_db_url.as_str()).await;
Expand All @@ -69,24 +95,44 @@ async fn main() -> std::io::Result<()> {
let http_endpoint = std::env::var("HTTP_ADDR")
.unwrap_or_else(|_| "127.0.0.1:8080".into());

let redis_endpoint = std::env::var("REDIS_ADDR")
.unwrap_or_else(|_| "127.0.0.1:6379".into());

let rr = rmq_declare(rmq_pool.clone()).await.unwrap();
info!("{:?}", rr);

let mut rng = rand::thread_rng();
let key: [u8; 32] = rng.gen();
/*
// Refer to sled's documentation for more options
let sled_db = SledConfig::default().temporary(true);
// Open the database and make an actor(not started yet)
let actor = sled_db.to_actor()?;
let store = actor
// If you want to scan the database on start for expiration
.scan_db_on_start(true)
// If you want the expiration thread to perform deletion instead of soft deleting items
.perform_deletion(true)
// Finally start the actor
.start(THREADS_NUMBER);
let storage = Storage::build().expiry_store(store).finish();
*/


let store = RedisBackend::connect_default().await.unwrap();

let storage = Storage::build().expiry_store(store).finish();

let key: [u8; 32] = rand::thread_rng().gen();

HttpServer::new(move || {
App::new()
.app_data(storage.clone())
.app_data(Data::new(rmq_pool.clone()))
.wrap(actix_web::middleware::Compress::default())
.wrap(Logger::default())
.wrap(CookieSession::signed(&key)
.domain("fabseal.de")
.name("fabseal_session")
.secure(true)
.expires_in_time(COOKIE_DURATION)
.same_site(SameSite::Strict)
)
.wrap(create_redis_session(&redis_endpoint, &key, dev_env))
.service(web::scope("/api/v1").configure(create_service))
})
.bind(http_endpoint)?
Expand Down
Loading

0 comments on commit 4556921

Please sign in to comment.