Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Worker trait #236

Merged
merged 13 commits into from Sep 21, 2020
3 changes: 3 additions & 0 deletions bee-common-ext/Cargo.toml
Expand Up @@ -11,6 +11,9 @@ keywords = ["iota", "tangle", "bee", "framework", "common"]
homepage = "https://www.iota.org"

[dependencies]
bee-common = { git = "https://github.com/iotaledger/bee.git", branch = "dev" }

async-trait = "0.1.40"
dashmap = "3.11"
futures = "0.3.5"

Expand Down
1 change: 1 addition & 0 deletions bee-common-ext/src/lib.rs
Expand Up @@ -14,3 +14,4 @@

pub mod event;
pub mod wait_priority_queue;
pub mod worker;
27 changes: 27 additions & 0 deletions bee-common-ext/src/worker.rs
@@ -0,0 +1,27 @@
// Copyright 2020 IOTA Stiftung
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and limitations under the License.

use bee_common::worker::Error as WorkerError;

use async_trait::async_trait;
use futures::Stream;

use std::any::TypeId;

#[async_trait]
pub trait Worker {
const DEPS: &'static [TypeId] = &[];

type Event;
type Receiver: Stream<Item = Self::Event>;

async fn run(self, receiver: Self::Receiver) -> Result<(), WorkerError>;
}
1 change: 1 addition & 0 deletions bee-protocol/Cargo.toml
Expand Up @@ -21,6 +21,7 @@ bee-ternary = { git = "https://github.com/iotaledger/bee.git", branch = "dev" }
bee-transaction = { path = "../bee-transaction" }

async-std = "1.6.2"
async-trait = "0.1.40"
bitflags = "1.2.1"
bytemuck = "1.2.0"
dashmap = "3.11"
Expand Down
1 change: 1 addition & 0 deletions bee-protocol/src/lib.rs
Expand Up @@ -15,6 +15,7 @@ pub mod tangle;

mod message;
mod milestone;
mod node;
mod peer;
mod protocol;
mod worker;
Expand Down
89 changes: 89 additions & 0 deletions bee-protocol/src/node.rs
@@ -0,0 +1,89 @@
// Copyright 2020 IOTA Stiftung
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and limitations under the License.

use bee_common_ext::worker::Worker;

use std::{
any::TypeId,
collections::{HashMap, HashSet},
};

trait Node {
fn new() -> Self;
}

struct NodeBuilder<N: Node> {
deps: HashMap<TypeId, &'static [TypeId]>,
closures: HashMap<TypeId, Box<dyn FnOnce(&N)>>,
}

impl<N: Node> NodeBuilder<N> {
fn with_worker<W: Worker + 'static>(mut self) -> Self {
self.closures.insert(TypeId::of::<W>(), Box::new(|node| {}));
self.deps.insert(TypeId::of::<W>(), W::DEPS);
self
}

fn finish(mut self) {
let order = TopologicalOrder::sort(self.deps);
let node = N::new();
for id in order {
self.closures.remove(&id).unwrap()(&node);
}
}
}

struct TopologicalOrder {
graph: HashMap<TypeId, &'static [TypeId]>,
non_visited: Vec<TypeId>,
being_visited: HashSet<TypeId>,
order: Vec<TypeId>,
}

impl TopologicalOrder {
fn visit(&mut self, id: TypeId) {
if let Some(index) = self
.non_visited
.iter()
.enumerate()
.find_map(|(index, id2)| if id == *id2 { Some(index) } else { None })
{
if self.being_visited.insert(id) {
panic!("Cyclic dependency detected.");
}

for &id in self.graph[&id] {
self.visit(id);
}

self.being_visited.remove(&id);
self.non_visited.remove(index);
self.order.insert(0, id);
}
}

fn sort(graph: HashMap<TypeId, &'static [TypeId]>) -> Vec<TypeId> {
let non_visited = graph.keys().copied().collect();

let mut this = Self {
graph,
non_visited,
being_visited: HashSet::new(),
order: vec![],
};

while let Some(id) = this.non_visited.pop() {
this.visit(id);
}

this.order
}
}