Skip to content

Commit

Permalink
feat: work on commands
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Aug 17, 2022
1 parent e3aa525 commit 1aaaa3e
Show file tree
Hide file tree
Showing 39 changed files with 1,394 additions and 670 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 32 additions & 26 deletions backend/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use actix_web::{web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use chrono::Utc;
use drogue_doppelgaenger_core::{
command::CommandSink,
listener::KafkaSource,
model::{Reconciliation, SyntheticType, Thing},
notifier::Notifier,
Expand All @@ -25,26 +26,26 @@ const OPTS: UpdateOptions = UpdateOptions {
ignore_unclean_inbox: true,
};

pub async fn things_get<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_get<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<Id>,
) -> Result<HttpResponse, actix_web::Error> {
let result = service.get(&path.into_inner()).await?;

Ok(HttpResponse::Ok().json(result))
}

pub async fn things_create<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_create<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
payload: web::Json<Thing>,
) -> Result<HttpResponse, actix_web::Error> {
service.create(payload.into_inner()).await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_update<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
payload: web::Json<Thing>,
) -> Result<HttpResponse, actix_web::Error> {
let application = payload.metadata.application.clone();
Expand All @@ -58,8 +59,8 @@ pub async fn things_update<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_patch<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_patch<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<Id>,
payload: web::Json<Patch>,
) -> Result<HttpResponse, actix_web::Error> {
Expand All @@ -72,8 +73,8 @@ pub async fn things_patch<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_merge<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_merge<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<Id>,
payload: web::Json<Value>,
) -> Result<HttpResponse, actix_web::Error> {
Expand All @@ -86,8 +87,8 @@ pub async fn things_merge<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_reported_state<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_update_reported_state<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<Id>,
payload: web::Json<BTreeMap<String, Value>>,
) -> Result<HttpResponse, actix_web::Error> {
Expand All @@ -104,8 +105,8 @@ pub async fn things_update_reported_state<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_synthetic_state<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_update_synthetic_state<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<(String, String, String)>,
payload: web::Json<SyntheticType>,
) -> Result<HttpResponse, actix_web::Error> {
Expand All @@ -123,8 +124,8 @@ pub async fn things_update_synthetic_state<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_desired_state<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_update_desired_state<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<(String, String, String)>,
payload: web::Json<DesiredStateUpdate>,
) -> Result<HttpResponse, actix_web::Error> {
Expand All @@ -142,9 +143,14 @@ pub async fn things_update_desired_state<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_desired_state_value<S: Storage, N: Notifier, Si: Sink>(
pub async fn things_update_desired_state_value<
S: Storage,
N: Notifier,
Si: Sink,
Cmd: CommandSink,
>(
request: HttpRequest,
service: web::Data<Service<S, N, Si>>,
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<(String, String, String)>,
payload: web::Json<Value>,
) -> Result<HttpResponse, actix_web::Error> {
Expand Down Expand Up @@ -178,8 +184,8 @@ pub async fn things_update_desired_state_value<S: Storage, N: Notifier, Si: Sink
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_reconciliation<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_update_reconciliation<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<Id>,
payload: web::Json<Reconciliation>,
) -> Result<HttpResponse, actix_web::Error> {
Expand All @@ -190,21 +196,21 @@ pub async fn things_update_reconciliation<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_delete<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
pub async fn things_delete<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
service: web::Data<Service<S, N, Si, Cmd>>,
path: web::Path<Id>,
) -> Result<HttpResponse, actix_web::Error> {
service.delete(&path.into_inner()).await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_notifications<S: Storage, N: Notifier, Si: Sink>(
pub async fn things_notifications<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
req: HttpRequest,
path: web::Path<String>,
stream: web::Payload,
source: web::Data<KafkaSource>,
service: web::Data<Service<S, N, Si>>,
service: web::Data<Service<S, N, Si, Cmd>>,
instance: web::Data<Instance>,
) -> Result<HttpResponse, actix_web::Error> {
let application = path.into_inner();
Expand All @@ -219,12 +225,12 @@ pub async fn things_notifications<S: Storage, N: Notifier, Si: Sink>(
ws::start(handler, &req, stream)
}

pub async fn things_notifications_single<S: Storage, N: Notifier, Si: Sink>(
pub async fn things_notifications_single<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
req: HttpRequest,
path: web::Path<(String, String)>,
stream: web::Payload,
source: web::Data<KafkaSource>,
service: web::Data<Service<S, N, Si>>,
service: web::Data<Service<S, N, Si, Cmd>>,
instance: web::Data<Instance>,
) -> Result<HttpResponse, actix_web::Error> {
let (application, thing) = path.into_inner();
Expand Down
168 changes: 81 additions & 87 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@ mod utils;
use actix_web::{guard, web, App, HttpServer};
use anyhow::anyhow;
use drogue_doppelgaenger_core::{
app::run_main,
app::{run_main, Spawner},
command::{mqtt, CommandSink},
listener::KafkaSource,
notifier::{kafka, Notifier},
processor::sink::{self, Sink},
service::{self, Service},
storage::{postgres, Storage},
};
use futures::{future::LocalBoxFuture, FutureExt, TryFutureExt};
use futures::{FutureExt, TryFutureExt};

#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config<S: Storage, N: Notifier, Si: Sink> {
pub struct Config<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink> {
pub application: Option<String>,
// serde(bound) required as S isn't serializable: https://github.com/serde-rs/serde/issues/1296
#[serde(bound = "")]
pub service: service::Config<S, N, Si>,
pub service: service::Config<S, N, Si, Cmd>,

pub listener: kafka::Config,
}
Expand All @@ -29,107 +30,100 @@ pub struct Instance {
pub application: Option<String>,
}

pub fn configure<S: Storage, N: Notifier, Si: Sink>(
config: Config<S, N, Si>,
) -> anyhow::Result<(
impl Fn(&mut web::ServiceConfig) + Send + Sync + Clone,
LocalBoxFuture<'static, anyhow::Result<()>>,
)> {
let service = Service::from_config(config.service)?;
pub fn configure<S: Storage, N: Notifier, Si: Sink, Cmd: CommandSink>(
spawner: &mut dyn Spawner,
config: Config<S, N, Si, Cmd>,
) -> anyhow::Result<impl Fn(&mut web::ServiceConfig) + Send + Sync + Clone> {
let service = Service::from_config(spawner, config.service)?;
let service = web::Data::new(service);

let (source, runner) = KafkaSource::new(config.listener)?;
let source = KafkaSource::new(spawner, config.listener)?;
let source = web::Data::new(source);

let instance = web::Data::new(Instance {
application: config.application,
});

Ok((
move |ctx: &mut web::ServiceConfig| {
ctx.app_data(service.clone());
ctx.app_data(instance.clone());
ctx.app_data(source.clone());
ctx.service(
web::resource("/api/v1alpha1/things")
.route(web::post().to(endpoints::things_create::<S, N, Si>))
.route(web::put().to(endpoints::things_update::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}")
.route(web::get().to(endpoints::things_get::<S, N, Si>))
.route(web::delete().to(endpoints::things_delete::<S, N, Si>))
.route(
web::patch()
.guard(guard::Header("content-type", "application/json-patch+json"))
.to(endpoints::things_patch::<S, N, Si>),
)
.route(
web::patch()
.guard(guard::Header(
"content-type",
"application/merge-patch+json",
))
.to(endpoints::things_merge::<S, N, Si>),
),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedStates")
.route(web::put().to(endpoints::things_update_reported_state::<S, N, Si>)),
);
ctx.service(
web::resource(
"/api/v1alpha1/things/{application}/things/{thing}/syntheticStates/{name}",
Ok(move |ctx: &mut web::ServiceConfig| {
ctx.app_data(service.clone());
ctx.app_data(instance.clone());
ctx.app_data(source.clone());
ctx.service(
web::resource("/api/v1alpha1/things")
.route(web::post().to(endpoints::things_create::<S, N, Si, Cmd>))
.route(web::put().to(endpoints::things_update::<S, N, Si, Cmd>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}")
.route(web::get().to(endpoints::things_get::<S, N, Si, Cmd>))
.route(web::delete().to(endpoints::things_delete::<S, N, Si, Cmd>))
.route(
web::patch()
.guard(guard::Header("content-type", "application/json-patch+json"))
.to(endpoints::things_patch::<S, N, Si, Cmd>),
)
.route(web::put().to(endpoints::things_update_synthetic_state::<
S,
N,
Si,
>)),
);
ctx.service(
web::resource(
"/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}",
)
.route(web::put().to(endpoints::things_update_desired_state::<
S,
N,
Si,
>)),
);
ctx.service(
web::resource(
"/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}/value",
)
.route(web::put().to(endpoints::things_update_desired_state_value::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliations")
.route(web::put().to(endpoints::things_update_reconciliation::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/notifications")
.route(web::get().to(endpoints::things_notifications::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/notifications")
.route(web::get().to(endpoints::things_notifications_single::<S, N, Si>)),
);
},
async move { runner.run().await }.boxed_local(),
))
.route(
web::patch()
.guard(guard::Header(
"content-type",
"application/merge-patch+json",
))
.to(endpoints::things_merge::<S, N, Si, Cmd>),
),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedStates")
.route(web::put().to(endpoints::things_update_reported_state::<S, N, Si, Cmd>)),
);
ctx.service(
web::resource(
"/api/v1alpha1/things/{application}/things/{thing}/syntheticStates/{name}",
)
.route(web::put().to(endpoints::things_update_synthetic_state::<
S,
N,
Si,
Cmd,
>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}")
.route(web::put().to(endpoints::things_update_desired_state::<S, N, Si, Cmd>)),
);
ctx.service(
web::resource(
"/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}/value",
)
.route(web::put().to(endpoints::things_update_desired_state_value::<S, N, Si, Cmd>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliations")
.route(web::put().to(endpoints::things_update_reconciliation::<S, N, Si, Cmd>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/notifications")
.route(web::get().to(endpoints::things_notifications::<S, N, Si, Cmd>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/notifications")
.route(web::get().to(endpoints::things_notifications_single::<S, N, Si, Cmd>)),
);
})
}

pub async fn run(
config: Config<postgres::Storage, kafka::Notifier, sink::kafka::Sink>,
config: Config<postgres::Storage, kafka::Notifier, sink::kafka::Sink, mqtt::CommandSink>,
) -> anyhow::Result<()> {
let (configurator, runner) = configure::<_, _, _>(config)?;
let mut spawner = vec![];
let configurator = configure::<_, _, _, _>(&mut spawner, config)?;

let http = HttpServer::new(move || App::new().configure(|ctx| configurator(ctx)))
.bind("[::]:8080")?
.run()
.map_err(|err| anyhow!(err))
.boxed_local();

run_main([http, runner]).await
spawner.spawn(http);

run_main(spawner).await
}
Loading

0 comments on commit 1aaaa3e

Please sign in to comment.