-
Notifications
You must be signed in to change notification settings - Fork 271
/
generic.rs
134 lines (124 loc) · 5.17 KB
/
generic.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
//! A generic allocator, wrapping known implementors of `Allocate`.
//!
//! This type is useful in settings where it is difficult to write code generic in `A: Allocate`,
//! for example closures whose type arguments must be specified.
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::VecDeque;
use crate::allocator::thread::ThreadBuilder;
use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
use crate::allocator::{Allocate, AllocateBuilder, Event, Thread, Process};
use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};
use crate::{Push, Pull, Data, Message};
/// Enumerates known implementors of `Allocate`.
/// Passes trait method calls on to members.
pub enum Generic {
/// Intra-thread allocator.
Thread(Thread),
/// Inter-thread, intra-process allocator.
Process(Process),
/// Inter-thread, intra-process serializing allocator.
ProcessBinary(ProcessAllocator),
/// Inter-process allocator.
ZeroCopy(TcpAllocator<Process>),
}
impl Generic {
/// The index of the worker out of `(0..self.peers())`.
pub fn index(&self) -> usize {
match self {
&Generic::Thread(ref t) => t.index(),
&Generic::Process(ref p) => p.index(),
&Generic::ProcessBinary(ref pb) => pb.index(),
&Generic::ZeroCopy(ref z) => z.index(),
}
}
/// The number of workers.
pub fn peers(&self) -> usize {
match self {
&Generic::Thread(ref t) => t.peers(),
&Generic::Process(ref p) => p.peers(),
&Generic::ProcessBinary(ref pb) => pb.peers(),
&Generic::ZeroCopy(ref z) => z.peers(),
}
}
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
match self {
&mut Generic::Thread(ref mut t) => t.allocate(identifier),
&mut Generic::Process(ref mut p) => p.allocate(identifier),
&mut Generic::ProcessBinary(ref mut pb) => pb.allocate(identifier),
&mut Generic::ZeroCopy(ref mut z) => z.allocate(identifier),
}
}
/// Perform work before scheduling operators.
fn receive(&mut self) {
match self {
&mut Generic::Thread(ref mut t) => t.receive(),
&mut Generic::Process(ref mut p) => p.receive(),
&mut Generic::ProcessBinary(ref mut pb) => pb.receive(),
&mut Generic::ZeroCopy(ref mut z) => z.receive(),
}
}
/// Perform work after scheduling operators.
pub fn release(&mut self) {
match self {
&mut Generic::Thread(ref mut t) => t.release(),
&mut Generic::Process(ref mut p) => p.release(),
&mut Generic::ProcessBinary(ref mut pb) => pb.release(),
&mut Generic::ZeroCopy(ref mut z) => z.release(),
}
}
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
match self {
&Generic::Thread(ref t) => t.events(),
&Generic::Process(ref p) => p.events(),
&Generic::ProcessBinary(ref pb) => pb.events(),
&Generic::ZeroCopy(ref z) => z.events(),
}
}
}
impl Allocate for Generic {
fn index(&self) -> usize { self.index() }
fn peers(&self) -> usize { self.peers() }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<Push<Message<T>>>>, Box<Pull<Message<T>>>) {
self.allocate(identifier)
}
fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() }
fn await_events(&self, _duration: Option<std::time::Duration>) {
match self {
&Generic::Thread(ref t) => t.await_events(_duration),
&Generic::Process(ref p) => p.await_events(_duration),
&Generic::ProcessBinary(ref pb) => pb.await_events(_duration),
&Generic::ZeroCopy(ref z) => z.await_events(_duration),
}
}
}
/// Enumerations of constructable implementors of `Allocate`.
///
/// The builder variants are meant to be `Send`, so that they can be moved across threads,
/// whereas the allocator they construct may not. As an example, the `ProcessBinary` type
/// contains `Rc` wrapped state, and so cannot itself be moved across threads.
pub enum GenericBuilder {
/// Builder for `Thread` allocator.
Thread(ThreadBuilder),
/// Builder for `Process` allocator.
Process(TypedProcessBuilder),
/// Builder for `ProcessBinary` allocator.
ProcessBinary(ProcessBuilder),
/// Builder for `ZeroCopy` allocator.
ZeroCopy(TcpBuilder<TypedProcessBuilder>),
}
impl AllocateBuilder for GenericBuilder {
type Allocator = Generic;
fn build(self) -> Generic {
match self {
GenericBuilder::Thread(t) => Generic::Thread(t.build()),
GenericBuilder::Process(p) => Generic::Process(p.build()),
GenericBuilder::ProcessBinary(pb) => Generic::ProcessBinary(pb.build()),
GenericBuilder::ZeroCopy(z) => Generic::ZeroCopy(z.build()),
}
}
}