Skip to content

Commit

Permalink
RabbitMQ: Add service to docker compose
Browse files Browse the repository at this point in the history
  • Loading branch information
Arthi-chaud committed Apr 7, 2024
1 parent 3f04b85 commit 9abccd2
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 40 deletions.
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ POSTGRES_USER=
POSTGRES_PASSWORD=
POSTGRES_DB=blee

# Message Queue. Fill in the missing fiels with secure values
RABBIT_USER=
RABBIT_PASS=

# Fill these values with random, secure strings
SCANNER_API_KEY=
# Where can Blee find the video files
Expand Down
85 changes: 85 additions & 0 deletions api/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ entity = { path = "../entity" }

async-stream = { version = "0.3" }
async-trait = { version = "0.1" }
amqprs = { version = "1.5.4", features = ["urispec"] }
deadpool-amqprs = "0.2.0"
rocket = { version = "0.5.0", features = ["uuid", "json"] }
serde = { version = "1.0", features = ["derive"] }
rocket_okapi = { version = "0.8.0", features = ["swagger", "uuid", "rocket_db_pools"] }
Expand Down
82 changes: 69 additions & 13 deletions api/api/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,79 @@
use amqprs::callbacks::{DefaultChannelCallback, DefaultConnectionCallback};
use amqprs::channel::{BasicPublishArguments, QueueBindArguments, QueueDeclareArguments};
use amqprs::BasicProperties;
use deadpool_amqprs::Pool;
use migration::EVENT_CHANNEL;
use rocket::tokio::spawn;
use rocket::{fairing, Build, Rocket};
use sea_orm_rocket::Database;
use serde::Deserialize;
use sqlx::postgres::PgNotification;
use sqlx::postgres::PgListener;

use crate::database::Db;

#[derive(Deserialize, Debug)]
pub enum ActionType {
INSERT,
UPDATE,
DELETE,
INSERT,
UPDATE,
DELETE,
}

#[derive(Deserialize, Debug)]
pub struct EventPayload {
pub table: String,
pub action_type: ActionType,
pub id: String,
pub name: String,
pub table: String,
pub action_type: ActionType,
pub id: String,
pub name: String,
}

pub fn handle_database_event(event: &PgNotification) {
let strr = event.payload().to_owned();
let payload = serde_json::from_str::<EventPayload>(&strr).unwrap();
println!("{}", payload.id);
}
pub async fn hook_psql_events(rocket: Rocket<Build>, rabbit_pool: Pool) -> fairing::Result {
let pool = Db::fetch(&rocket)
.unwrap()
.conn
.get_postgres_connection_pool();
let mut listener = PgListener::connect_with(&pool).await.unwrap();

let _ = listener.listen(EVENT_CHANNEL).await;
spawn(async move {
let rabbit_connection = rabbit_pool.get().await.unwrap();
rabbit_connection
.register_callback(DefaultConnectionCallback)
.await
.unwrap();

let rabbit_channel = rabbit_connection.open_channel(None).await.unwrap();
rabbit_channel
.register_callback(DefaultChannelCallback)
.await
.unwrap();
let (queue_name, _, _) = rabbit_channel
.queue_declare(QueueDeclareArguments::durable_client_named(
"amqprs.examples.basic",
))
.await
.unwrap()
.unwrap();
let routing_key = "amqprs.example";
let exchange_name = "amq.topic";
rabbit_channel
.queue_bind(QueueBindArguments::new(
&queue_name,
exchange_name,
routing_key,
))
.await
.unwrap();
loop {
while let Some(notification) = listener.try_recv().await.unwrap() {
let args = BasicPublishArguments::new(exchange_name, routing_key);
let strr = notification.payload().to_owned();
let payload = serde_json::from_str::<EventPayload>(&strr).unwrap();
rabbit_channel
.basic_publish(BasicProperties::default(), strr.into(), args)
.await
.unwrap();
}
}
});
Ok(rocket)
}
43 changes: 18 additions & 25 deletions api/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ use crate::controllers::index::index;
use crate::database::Db;
use crate::error_handling::{not_found, unprocessable_entity};
use crate::swagger::custom_openapi_spec;
use migration::{MigratorTrait, EVENT_CHANNEL};
use amqprs::connection::OpenConnectionArguments;
use migration::MigratorTrait;
use rocket::fairing::{self, AdHoc};
use rocket::figment::providers::Serialized;
use rocket::figment::Figment;
use rocket::tokio::spawn;
use rocket::{Build, Rocket};
use rocket_okapi::settings::OpenApiSettings;
use rocket_okapi::{mount_endpoints_and_merged_docs, swagger_ui::*};
use sea_orm_rocket::Database;
use sqlx::postgres::PgListener;
use std::env;

pub mod config;
Expand Down Expand Up @@ -41,24 +40,6 @@ async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
Ok(rocket)
}

async fn hook_psql_events(rocket: Rocket<Build>) -> fairing::Result {
let pool = Db::fetch(&rocket)
.unwrap()
.conn
.get_postgres_connection_pool();
let mut listener = PgListener::connect_with(&pool).await.unwrap();

let _ = listener.listen(EVENT_CHANNEL).await;
spawn(async move {
loop {
while let Some(notification) = listener.try_recv().await.unwrap() {
events::handle_database_event(&notification);
}
}
});
Ok(rocket)
}

fn create_server() -> Rocket<Build> {
let data_dir = env::var("CONFIG_DIR").expect("env variable `CONFIG_DIR` not set.");
let scanner_api_key =
Expand All @@ -70,15 +51,27 @@ fn create_server() -> Rocket<Build> {
data_folder: data_dir,
scanner_api_key: scanner_api_key,
}));
let rabbit_config = deadpool_amqprs::Config::new(
OpenConnectionArguments::new(
&env::var("RABBIT_HOST").unwrap(),
env::var("RABBIT_PORT")
.unwrap_or("5672".to_string())
.parse::<u16>()
.unwrap(),
&env::var("RABBIT_USER").unwrap(),
&env::var("RABBIT_PASS").unwrap(),
),
None,
);
let rabbit_pool = rabbit_config.create_pool();

let mut building_rocket = rocket::custom(figment)
.attach(Db::init())
.attach(AdHoc::config::<Config>())
.attach(AdHoc::try_on_ignite("Run migrations", run_migrations))
.attach(AdHoc::try_on_ignite(
"Hook Database Events",
hook_psql_events,
))
.attach(AdHoc::try_on_ignite("Hook Database Events", |r| {
events::hook_psql_events(r, rabbit_pool)
}))
.register("/", catchers![not_found, unprocessable_entity])
.mount(
"/swagger",
Expand Down
11 changes: 9 additions & 2 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,26 @@ services:
condition: service_healthy
healthcheck:
test: curl --fail http://localhost:8000/ || exit 1
interval: 5s
retries: 3
interval: 10s
retries: 10
environment:
- POSTGRES_HOST=db
- POSTGRES_PORT=5432
- CONFIG_DIR=/data
- RABBIT_HOST=mq
- RABBIT_PORT=5672
env_file:
- .env
volumes:
- ./api:/app
- ./data:/data
- api_target:/app/target
- api_cargo:/usr/local/cargo
mq:
image: rabbitmq:3.13-alpine
environment:
- RABBITMQ_DEFAULT_USER=${RABBIT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBIT_PASS}
db:
image: postgres:alpine3.16
healthcheck:
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ services:
environment:
- POSTGRES_HOST=db
- POSTGRES_PORT=5432
- RABBIT_HOST=mq
- RABBIT_PORT=5672
env_file:
- .env
healthcheck:
test: ["CMD-SHELL", "wget -qO- localhost:8000"]
interval: 5s
retries: 3
mq:
image: rabbitmq:3.13-alpine
environment:
- RABBITMQ_DEFAULT_USER=${RABBIT_USER}
- RABBITMQ_DEFAULT_PASS=${RABBIT_PASS}
scanner:
build:
context: ./scanner
Expand Down

0 comments on commit 9abccd2

Please sign in to comment.