Skip to content

Commit

Permalink
feat: add inter thing communication
Browse files Browse the repository at this point in the history
* allow sending messages to other things
* add json merge support
* allow logging in user code
  • Loading branch information
ctron committed Aug 17, 2022
1 parent a58ba9b commit 1e8e87d
Show file tree
Hide file tree
Showing 32 changed files with 1,071 additions and 336 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 40 additions & 25 deletions backend/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,106 +2,121 @@ use crate::notifier::actix::WebSocketHandler;
use crate::Instance;
use actix_web::{web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use drogue_doppelgaenger_core::listener::KafkaSource;
use drogue_doppelgaenger_core::service::{JsonPatchUpdater, UpdateMode};
use drogue_doppelgaenger_core::processor::source::Sink;
use drogue_doppelgaenger_core::service::JsonMergeUpdater;
use drogue_doppelgaenger_core::{
listener::KafkaSource,
model::{Reconciliation, Thing},
notifier::Notifier,
service::{Id, Patch, ReportedStateUpdater, Service},
service::{Id, JsonPatchUpdater, Patch, ReportedStateUpdater, Service, UpdateMode},
storage::Storage,
};
use serde_json::{json, Value};
use std::collections::BTreeMap;

pub async fn things_get<S: Storage, N: Notifier>(
service: web::Data<Service<S, N>>,
pub async fn things_get<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
) -> Result<HttpResponse, actix_web::Error> {
let result = service.get(path.into_inner()).await?;

Ok(HttpResponse::Ok().json(result))
}

pub async fn things_create<S: Storage, N: Notifier>(
service: web::Data<Service<S, N>>,
pub async fn things_create<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
payload: web::Json<Thing>,
) -> Result<HttpResponse, actix_web::Error> {
service.create(payload.into_inner()).await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update<S: Storage, N: Notifier>(
service: web::Data<Service<S, N>>,
pub async fn things_update<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
payload: web::Json<Thing>,
) -> Result<HttpResponse, actix_web::Error> {
let application = payload.metadata.application.clone();
let thing = payload.metadata.name.clone();
let payload = payload.into_inner();

service.update(Id { application, thing }, payload).await?;
service.update(&Id { application, thing }, payload).await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_patch<S: Storage, N: Notifier>(
service: web::Data<Service<S, N>>,
pub async fn things_patch<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
payload: web::Json<Patch>,
) -> Result<HttpResponse, actix_web::Error> {
let payload = payload.into_inner();

service
.update(path.into_inner(), JsonPatchUpdater(payload))
.update(&path.into_inner(), JsonPatchUpdater(payload))
.await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_reported_state<S: Storage, N: Notifier>(
service: web::Data<Service<S, N>>,
pub async fn things_merge<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
payload: web::Json<Value>,
) -> Result<HttpResponse, actix_web::Error> {
let payload = payload.into_inner();

service
.update(&path.into_inner(), JsonMergeUpdater(payload))
.await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_reported_state<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
payload: web::Json<BTreeMap<String, Value>>,
) -> Result<HttpResponse, actix_web::Error> {
let payload = payload.into_inner();

service
.update(
path.into_inner(),
&path.into_inner(),
ReportedStateUpdater(payload, UpdateMode::Merge),
)
.await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_reconciliation<S: Storage, N: Notifier>(
service: web::Data<Service<S, N>>,
pub async fn things_update_reconciliation<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
payload: web::Json<Reconciliation>,
) -> Result<HttpResponse, actix_web::Error> {
let payload = payload.into_inner();

service.update(path.into_inner(), payload).await?;
service.update(&path.into_inner(), payload).await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_delete<S: Storage, N: Notifier>(
service: web::Data<Service<S, N>>,
pub async fn things_delete<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
) -> Result<HttpResponse, actix_web::Error> {
service.delete(path.into_inner()).await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_notifications<S: Storage, N: Notifier>(
pub async fn things_notifications<S: Storage, N: Notifier, Si: Sink>(
req: HttpRequest,
path: web::Path<String>,
stream: web::Payload,
source: web::Data<KafkaSource>,
service: web::Data<Service<S, N>>,
service: web::Data<Service<S, N, Si>>,
instance: web::Data<Instance>,
) -> Result<HttpResponse, actix_web::Error> {
let application = path.into_inner();
Expand All @@ -116,12 +131,12 @@ pub async fn things_notifications<S: Storage, N: Notifier>(
ws::start(handler, &req, stream)
}

pub async fn things_notifications_single<S: Storage, N: Notifier>(
pub async fn things_notifications_single<S: Storage, N: Notifier, Si: Sink>(
req: HttpRequest,
path: web::Path<(String, String)>,
stream: web::Payload,
source: web::Data<KafkaSource>,
service: web::Data<Service<S, N>>,
service: web::Data<Service<S, N, Si>>,
instance: web::Data<Instance>,
) -> Result<HttpResponse, actix_web::Error> {
let (application, thing) = path.into_inner();
Expand Down
105 changes: 65 additions & 40 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
mod endpoints;
mod notifier;

use actix_web::{web, App, HttpServer};
use actix_web::{guard, web, App, HttpServer};
use anyhow::anyhow;
use drogue_doppelgaenger_core::listener::KafkaSource;
use drogue_doppelgaenger_core::notifier::{kafka, Notifier};
use drogue_doppelgaenger_core::processor::source::{EventStream, Sink};
use drogue_doppelgaenger_core::service::Service;
use drogue_doppelgaenger_core::storage::postgres;
use drogue_doppelgaenger_core::{app::run_main, service, storage::Storage};
use drogue_doppelgaenger_core::{app::run_main, processor, service, storage::Storage};
use futures::future::LocalBoxFuture;
use futures::{FutureExt, TryFutureExt};

#[derive(Clone, Debug, serde::Deserialize)]
Expand All @@ -18,68 +20,91 @@ pub struct Config<S: Storage, N: Notifier> {
pub service: service::Config<S, N>,

pub listener: kafka::Config,

// FIXME: fix up sink configuration
pub sink: processor::source::kafka::Config,
}

#[derive(Clone, Debug)]
pub struct Instance {
pub application: Option<String>,
}

pub fn configure<S: Storage, N: Notifier>(
pub fn configure<S: Storage, N: Notifier, Si: Sink>(
config: Config<S, N>,
) -> anyhow::Result<impl Fn(&mut web::ServiceConfig) + Send + Sync + Clone> {
let service = Service::new(config.service)?;
) -> anyhow::Result<(
impl Fn(&mut web::ServiceConfig) + Send + Sync + Clone,
LocalBoxFuture<'static, anyhow::Result<()>>,
)> {
let (_, sink) = processor::source::kafka::EventStream::new(config.sink)?;

let service = Service::new(config.service, sink)?;
let service = web::Data::new(service);

let source = KafkaSource::new(config.listener)?;
let (source, runner) = KafkaSource::new(config.listener)?;
let source = web::Data::new(source);

let instance = web::Data::new(Instance {
application: config.application,
});

Ok(move |ctx: &mut web::ServiceConfig| {
ctx.app_data(service.clone());
ctx.app_data(instance.clone());
ctx.app_data(source.clone());
ctx.service(
web::resource("/api/v1alpha1/things")
.route(web::post().to(endpoints::things_create::<S, N>))
.route(web::put().to(endpoints::things_update::<S, N>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}")
.route(web::get().to(endpoints::things_get::<S, N>))
.route(web::delete().to(endpoints::things_delete::<S, N>))
.route(web::patch().to(endpoints::things_patch::<S, N>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedState")
.route(web::put().to(endpoints::things_update_reported_state::<S, N>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliation")
.route(web::put().to(endpoints::things_update_reconciliation::<S, N>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/notifications")
.route(web::get().to(endpoints::things_notifications::<S, N>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/notifications")
.route(web::get().to(endpoints::things_notifications_single::<S, N>)),
);
})
Ok((
move |ctx: &mut web::ServiceConfig| {
ctx.app_data(service.clone());
ctx.app_data(instance.clone());
ctx.app_data(source.clone());
ctx.service(
web::resource("/api/v1alpha1/things")
.route(web::post().to(endpoints::things_create::<S, N, Si>))
.route(web::put().to(endpoints::things_update::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}")
.route(web::get().to(endpoints::things_get::<S, N, Si>))
.route(web::delete().to(endpoints::things_delete::<S, N, Si>))
.route(
web::patch()
.guard(guard::Header("content-type", "application/json-patch+json"))
.to(endpoints::things_patch::<S, N, Si>),
)
.route(
web::patch()
.guard(guard::Header(
"content-type",
"application/merge-patch+json",
))
.to(endpoints::things_merge::<S, N, Si>),
),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedState")
.route(web::put().to(endpoints::things_update_reported_state::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliation")
.route(web::put().to(endpoints::things_update_reconciliation::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/notifications")
.route(web::get().to(endpoints::things_notifications::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/notifications")
.route(web::get().to(endpoints::things_notifications_single::<S, N, Si>)),
);
},
async move { runner.run().await }.boxed_local(),
))
}

pub async fn run(config: Config<postgres::Storage, kafka::Notifier>) -> anyhow::Result<()> {
let configurator = configure(config)?;
let (configurator, runner) = configure::<_, _, processor::source::kafka::Sink>(config)?;

let http = HttpServer::new(move || App::new().configure(|ctx| configurator(ctx)))
.bind("[::]:8080")?
.run()
.map_err(|err| anyhow!(err))
.boxed_local();

run_main([http]).await
run_main([http, runner]).await
}
Loading

0 comments on commit 1e8e87d

Please sign in to comment.