Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ce-rust/cerk into feat/te…
Browse files Browse the repository at this point in the history
…st-reliable-mqtt

# Conflicts:
#	cerk_port_mqtt/Cargo.toml
#	cerk_port_mqtt/README.md
#	cerk_port_mqtt/src/lib.rs
#	cerk_runtime_threading/Cargo.toml
  • Loading branch information
linuxbasic committed Dec 12, 2020
2 parents 20d0442 + d64acbb commit 0bf8bdc
Show file tree
Hide file tree
Showing 55 changed files with 830 additions and 334 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# CERK

[![Build status](https://badge.buildkite.com/4494e29d5f2c47e3fe998af46dff78a447800a76a68024e392.svg?branch=master)](https://buildkite.com/ce-rust/cerk)
[![Crates.io](https://img.shields.io/crates/v/cerk)](https://docs.rs/cerk/*/cerk/)
[![Docs status](https://docs.rs/cerk/badge.svg)](https://docs.rs/cerk/)
![Docker Cloud Build Status](https://img.shields.io/docker/cloud/build/cloudeventsrouter/cerk)

[CERK](https://github.com/ce-rust/cerk) is an open-source [CloudEvents](https://github.com/cloudevents/spec) Router written in Rust with a MicroKernel architecture.

Expand All @@ -10,6 +13,16 @@ CERK lets you route your [CloudEvents](https://github.com/cloudevents/spec) betw
Ports are transport layer bindings over which CloudEvents can be exchanged.
It is built with modularity and portability in mind.

The project was initially created during the student research project [CloudEvents Router](https://eprints.hsr.ch/832/).


## Get Started

The easiest way to use the router is to use the [docker image](https://hub.docker.com/repository/docker/cloudeventsrouter/cerk) from Docker Hub.

If you like to build the router by yourself, start with an [example](#examples). E.g. the [hello world example](https://github.com/ce-rust/cerk/tree/master/examples/src/hello_world).


## Components

CERK comes with a couple of prefabricated components, but implementing custom components is easy.
Expand Down Expand Up @@ -123,6 +136,17 @@ cargo doc --no-deps --open
2. `cd <crate>`
3. `cargo readme > README.md`

## Release Management

Release management is organized by cargo-workspaces.

pre-requirement: `cargo install cargo-workspaces`

1. check out master and create a new branch `release`
2. `cargo workspaces version --allow-branch="release"`
3. `cargo workspaces publish --allow-branch="release" --from-git --skip-published --no-git-push --no-git-tag`
4. merge it back into the master with a pull request

## License

Apache-2.0
2 changes: 1 addition & 1 deletion cerk/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cerk"
version = "0.2.3"
version = "0.2.6"
authors = [
"Linus Basig <linus@basig.me>",
"Fabrizio Lazzaretti <fabrizio@lazzaretti.me>"
Expand Down
2 changes: 1 addition & 1 deletion cerk/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# cerk 0.2.3
# cerk

[![Build status](https://badge.buildkite.com/4494e29d5f2c47e3fe998af46dff78a447800a76a68024e392.svg?branch=master)](https://buildkite.com/ce-rust/cerk)
[![Crates.io](https://img.shields.io/crates/v/cerk)](https://docs.rs/cerk/*/cerk/)
Expand Down
2 changes: 1 addition & 1 deletion cerk/README.tpl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# {{crate}} {{version}}
# {{crate}}

[![Build status](https://badge.buildkite.com/4494e29d5f2c47e3fe998af46dff78a447800a76a68024e392.svg?branch=master)](https://buildkite.com/ce-rust/cerk)
[![Crates.io](https://img.shields.io/crates/v/cerk)](https://docs.rs/cerk/*/cerk/)
Expand Down
2 changes: 2 additions & 0 deletions cerk/src/kernel/broker_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ pub struct RoutingResult {
pub routing: Vec<OutgoingCloudEvent>,
/// routing arguments to define how a CloudEvent should be routed - this config is used by the kernel; the args for the ports are inside the `Vec<OutgoingCloudEvent>`
pub args: CloudEventRoutingArgs,
/// outcome of the routing, was it successful?
pub result: ProcessingResult,
}

/// Struct for `BrokerEvent::OutgoingCloudEvent`
Expand Down
12 changes: 6 additions & 6 deletions cerk/src/kernel/delivery_guarantees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::convert::TryFrom;

/// Message delivery guarantees for the routing (defined per port channel)
#[repr(u8)]
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum DeliveryGuarantee {
/// unspecified behaviour, the default
Unspecified = 0,
/// best effort: there is no guarantee that a message will be routed, the default
BestEffort = 0,
/// At Least Once the message should be received at the destination
AtLeastOnce = 2,
}
Expand All @@ -15,15 +15,15 @@ impl DeliveryGuarantee {
/// Does the selected delivery guarantee requires an acknowledgment?
pub fn requires_acknowledgment(&self) -> bool {
match self {
DeliveryGuarantee::Unspecified => false,
DeliveryGuarantee::BestEffort => false,
_ => true,
}
}
}

impl Default for DeliveryGuarantee {
fn default() -> Self {
DeliveryGuarantee::Unspecified
DeliveryGuarantee::BestEffort
}
}

Expand All @@ -39,7 +39,7 @@ impl TryFrom<&Config> for DeliveryGuarantee {
fn try_from(value: &Config) -> Result<Self, Self::Error> {
if let Config::U8(number) = value {
match number {
0 => Ok(DeliveryGuarantee::Unspecified),
0 => Ok(DeliveryGuarantee::BestEffort),
2 => Ok(DeliveryGuarantee::AtLeastOnce),
_ => bail!("number out of range"),
}
Expand Down
128 changes: 94 additions & 34 deletions cerk/src/kernel/kernel_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,49 @@ use crate::kernel::{CloudEventMessageRoutingId, ProcessingResult};
use crate::runtime::channel::{BoxedReceiver, BoxedSender};
use crate::runtime::InternalServerId;
use std::collections::HashMap;
use std::ops::Add;
use std::time::{Duration, SystemTime};

const ROUTER_ID: &str = "router";
const CONFIG_LOADER_ID: &str = "config_loader";
const ROUTING_TTL_MS: u64 = 100;

struct PendingDelivery {
sender: InternalServerId,
missing_receivers: Vec<InternalServerId>,
ttl: SystemTime,
}

type Outboxes = HashMap<InternalServerId, BoxedSender>;
type PendingDeliveries = HashMap<CloudEventMessageRoutingId, PendingDelivery>;

fn clean_pending_deliveries(outboxes: &Outboxes, pending_deliveries: &mut PendingDeliveries) {
let now = SystemTime::now();
if pending_deliveries.len() > 0 {
let to_remove: Vec<CloudEventMessageRoutingId> = {
let dead_messages: HashMap<&CloudEventMessageRoutingId, &PendingDelivery> =
pending_deliveries
.iter()
.filter(|(_, v)| v.ttl > now)
.collect();
for (routing_id, data) in dead_messages.iter() {
warn!("ttl exceeded for routing_id={}, will send back to receiver={} with ProcessingResult::Timeout", routing_id, data.sender);
outboxes
.get(&data.sender)
.unwrap()
.send(BrokerEvent::IncomingCloudEventProcessed(
(*routing_id).clone(),
ProcessingResult::Timeout,
));
}
dead_messages.iter().map(|(k, _)| *k).cloned().collect()
};
for routing_id in to_remove {
pending_deliveries.remove_entry(&routing_id);
}
}
}

fn process_routing_result(
event: RoutingResult,
outboxes: &mut Outboxes,
Expand All @@ -30,44 +61,72 @@ fn process_routing_result(
routing,
incoming_id: receiver_id,
args,
result,
} = event;
debug!("received RoutingResult for event_id={}", &routing_id);
debug!(
"received RoutingResult status={} for event_id={}",
result, &routing_id
);

if routing.is_empty() {
debug!("routing is empty - nothing to do");
} else {
if args.delivery_guarantee.requires_acknowledgment() {
let missing_receivers: Vec<_> = routing
.iter()
.map(|event| event.destination_id.clone())
.collect();

if pending_deliveries
.insert(
routing_id.clone(),
PendingDelivery {
sender: receiver_id,
missing_receivers,
},
)
.is_some()
{
error!(
"a routing for event_id={} already existed, the old one was overwritten",
&routing_id
);
match result {
ProcessingResult::Successful => {
if routing.is_empty() {
debug!("routing is empty - nothing to do; ack if needed");
if args.delivery_guarantee.requires_acknowledgment() {
outboxes.get(&receiver_id).unwrap().send(
BrokerEvent::IncomingCloudEventProcessed(
routing_id,
ProcessingResult::Successful,
),
);
}
} else {
if args.delivery_guarantee.requires_acknowledgment() {
let missing_receivers: Vec<_> = routing
.iter()
.map(|event| event.destination_id.clone())
.collect();

clean_pending_deliveries(outboxes, pending_deliveries);
if pending_deliveries
.insert(
routing_id.clone(),
PendingDelivery {
sender: receiver_id,
missing_receivers,
ttl: SystemTime::now().add(Duration::from_millis(ROUTING_TTL_MS)),
},
)
.is_some()
{
error!(
"a routing for event_id={} already existed, the old one was overwritten",
&routing_id
);
}
} else {
debug!("no acknowledgments needed for event_id={}", &routing_id)
}

for subevent in routing {
outboxes
.get(&subevent.destination_id)
.unwrap()
.send(BrokerEvent::OutgoingCloudEvent(subevent));
}
debug!("all routings sent for event_id={}", routing_id);
}
} else {
debug!("no acknowledgments needed for event_id={}", &routing_id)
}

for subevent in routing {
outboxes
.get(&subevent.destination_id)
.unwrap()
.send(BrokerEvent::OutgoingCloudEvent(subevent));
s @ ProcessingResult::PermanentError
| s @ ProcessingResult::TransientError
| s @ ProcessingResult::Timeout => {
if args.delivery_guarantee.requires_acknowledgment() {
outboxes
.get(&receiver_id)
.unwrap()
.send(BrokerEvent::IncomingCloudEventProcessed(routing_id, s));
}
}
debug!("all routings sent for event_id={}", routing_id);
}
}

Expand Down Expand Up @@ -204,7 +263,8 @@ pub fn kernel_start(
sender_to_scheduler: BoxedSender,
) {
let mut outboxes = Outboxes::new();
// todo this list could grow and entries could potentially be there for ever - TTL at kernel level?
// old entries are deleted with clean_pending_deliveries() before new are inserted.
// At the moment this is only done before a new event is created, if this should change with e.g. a job add a lock! as in 24bb886a37c187936d906a0df90a9e90a3cf4255
let mut pending_deliveries = PendingDeliveries::new();

sender_to_scheduler.send(BrokerEvent::ScheduleInternalServer(
Expand Down
5 changes: 5 additions & 0 deletions cerk/src/kernel/outgoing_processing_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ pub enum ProcessingResult {
/// The send action was not successful.
/// However, the error is permanent (e.g., parsing or config error) and should not be retried.
PermanentError,

/// The send action was not responded by all components in the given time.
/// The kernel canceled the routing.
/// At the moment there is no guarantee that the timout will be sent after a certain time, it is only to prevent the tracking table to grow.
Timeout,
}

/// from result to ProcessingResult; for every error a `ProcessingResult::PermanentError` is used
Expand Down
2 changes: 1 addition & 1 deletion cerk_config_loader_file/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cerk_config_loader_file"
version = "0.2.0"
version = "0.2.6"
authors = [
"Linus Basig <linus@basig.me>",
"Fabrizio Lazzaretti <fabrizio@lazzaretti.me>"
Expand Down
2 changes: 1 addition & 1 deletion cerk_config_loader_file/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# cerk_config_loader_file 0.2.0
# cerk_config_loader_file

[![Build status](https://badge.buildkite.com/4494e29d5f2c47e3fe998af46dff78a447800a76a68024e392.svg?branch=master)](https://buildkite.com/ce-rust/cerk)
[![Crates.io](https://img.shields.io/crates/v/cerk)](https://docs.rs/cerk_config_loader_file/*/cerk_config_loader_file/)
Expand Down
2 changes: 1 addition & 1 deletion cerk_config_loader_file/README.tpl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# {{crate}} {{version}}
# {{crate}}

[![Build status](https://badge.buildkite.com/4494e29d5f2c47e3fe998af46dff78a447800a76a68024e392.svg?branch=master)](https://buildkite.com/ce-rust/cerk)
[![Crates.io](https://img.shields.io/crates/v/cerk)](https://docs.rs/cerk_config_loader_file/*/cerk_config_loader_file/)
Expand Down
2 changes: 1 addition & 1 deletion cerk_loader_file/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "cerk_loader_file"
version = "0.2.0"
version = "0.2.6"
authors = [
"Linus Basig <linus@basig.me>",
"Fabrizio Lazzaretti <fabrizio@lazzaretti.me>"
Expand Down
28 changes: 20 additions & 8 deletions cerk_loader_file/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# cerk_loader_file 0.2.0
# cerk_loader_file

[![Build status](https://badge.buildkite.com/4494e29d5f2c47e3fe998af46dff78a447800a76a68024e392.svg?branch=master)](https://buildkite.com/ce-rust/cerk)

Expand All @@ -23,7 +23,7 @@ A good overview is provided on [GitHub](https://github.com/ce-rust/cerk/).
The cerk_loader_file link the different modules together and pass it to the `bootstrap` function.

It uses a `ComponentStartLinks` file with all links to the start functions and a configuration file.
The configuration file could be passed by the env variable `$CONFIG_PATH` or just use the path `./init.json`.
The configuration file could be passed by the env variable `$INIT_PATH` or just use the path `./init.json`.


#### Example Config
Expand All @@ -46,12 +46,24 @@ The configuration file could be passed by the env variable `$CONFIG_PATH` or jus
extern crate cerk_loader_file;
use cerk_loader_file::{start, ComponentStartLinks};

#
#
#
#
#
#
use cerk::runtime::{InternalServerId, InternalServerFn, InternalServerFnRefStatic, ScheduleFn, ScheduleFnRefStatic};
use cerk::runtime::channel::{BoxedReceiver, BoxedSender};
use cerk::kernel::{StartOptions, KernelFn};

fn dummy_scheduler(_: StartOptions, _: KernelFn) {}

fn dummy_router(_: InternalServerId, _: BoxedReceiver, _: BoxedSender) {}

fn dummy_config_loader(_: InternalServerId, _: BoxedReceiver, _: BoxedSender) {}

fn dummy_port(_: InternalServerId, _: BoxedReceiver, _: BoxedSender) {}

fn dummy_port_other(_: InternalServerId, _: BoxedReceiver, _: BoxedSender) {}

const SCHEDULER: ScheduleFnRefStatic = &(dummy_scheduler as ScheduleFn);
const ROUTER: InternalServerFnRefStatic = &(dummy_router as InternalServerFn);
const CONFIG_LOADER: InternalServerFnRefStatic = &(dummy_config_loader as InternalServerFn);
const PORT: InternalServerFnRefStatic = &(dummy_port as InternalServerFn);

fn main() {
let link = ComponentStartLinks {
Expand Down
2 changes: 1 addition & 1 deletion cerk_loader_file/README.tpl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# {{crate}} {{version}}
# {{crate}}

[![Build status](https://badge.buildkite.com/4494e29d5f2c47e3fe998af46dff78a447800a76a68024e392.svg?branch=master)](https://buildkite.com/ce-rust/cerk)

Expand Down
2 changes: 1 addition & 1 deletion cerk_loader_file/src/cerk_loader_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub fn load_by_path<'a>(path: String, links: ComponentStartLinks<'static>) -> Re
}

fn load<'a>(links: ComponentStartLinks<'static>) -> Result<StartOptions> {
let path = env::var("CONFIG_PATH").unwrap_or(String::from("./init.json"));
let path = env::var("INIT_PATH").unwrap_or(String::from("./init.json"));
load_by_path(path, links)
}

Expand Down
Loading

0 comments on commit 0bf8bdc

Please sign in to comment.