-
Notifications
You must be signed in to change notification settings - Fork 5
User Guide
"Interface" in Go language enables the "duck-typing" style of software composition, in which components can be plugged together as long as they provide the methods defined in interfaces. It does not require any kind of inheritance hierarchy in code (neither interface inheritance such as Java's "implement" nor implementation inheritance such as Java's "extend"). This allows libraries (or packages) to be designed as "plug and play" frameworks with a set of components, and the framework will define the interfaces of components. "router" is designed and implemented this way. Users can use "router" in two typical ways:
- based on application's specific requirements, choose from the set of provided components and use them with router;
- create customized components according to the interfaces defined, and plug them into router.
Router and Proxy provide the "plug and play" framework.
Router is the main access point to functionality. Applications will create an instance of it thru router.New(...) and attach channels to ids in the instance. It maintains the pub/sub namespace and sends notifications when namespace change.
Router's main methods are as following:
type Router interface { //---- core api ---- //Attach chans to id in router, with an optional argument (chan *BindEvent) //When specified, the optional argument will serve two purposes: //1. used to tell when the remote peers connecting/disconn //2. in AttachRecvChan, used as a flag to ask router to keep recv chan open when all senders close //the returned RoutedChan object can be used to find the number of bound peers: routCh.NumPeers() AttachSendChan(Id, interface{}, ...interface{}) (*RoutedChan, os.Error) //3. When attaching recv chans, an optional integer can specify the internal buffering size AttachRecvChan(Id, interface{}, ...interface{}) (*RoutedChan, os.Error) //Detach sendChan/recvChan from router DetachChan(Id, interface{}) os.Error //Shutdown router Close() //Connect this router to another router. //1. internally it calls Proxy.Connect(...) to do the real job //2. The connection can be disconnected by calling Proxy.Close() on returned proxy object //3. for more compilcated connection setup (such as setting IdFilter and IdTranslator), use Proxy.Connect() instead //Connect to a local router Connect(Router) (Proxy, Proxy, os.Error) //Connect to a remote router thru io conn //1. io.ReadWriteCloser: transport connection //2. MarshalingPolicy: gob or json marshaling //3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff) ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...bool) (Proxy, os.Error) //--- other utils --- //return pre-created SysIds according to the router's id-type, with ScopeGlobal / MemberLocal SysID(idx int) Id //create a new SysId with "args..." specifying scope/membership NewSysID(idx int, args ...int) Id //return all ids and their ChanTypes from router's namespace which satisfy predicate IdsForSend(predicate func(id Id) bool) map[interface{}]*IdChanInfo IdsForRecv(predicate func(id Id) bool) map[interface{}]*IdChanInfo } func New(seedId Id, bufSize int, disp DispatchPolicy, args ...) Router
router.New(...) accepts the following arguments:
- seedId: a dummy id to show what type of ids will be used. New ids will be type-checked against this.
- bufSize: the buffer size used for router's internal channels.
- if bufSize >= 0, its value will be used
- if bufSize < 0, it means unlimited buffering, so router is async and sending on attached channels will never block
- disp: dispatch policy for router. by default, it is BroadcastPolicy
- optional arguments ...:
- name: router's name, if name is defined, router internal logging will be turned on, ie LogRecord generated
- LogScope: if this is set, a console log sink is installed to show router internal log:
- if logScope == ScopeLocal, only log msgs from local router will show up
- if logScope == ScopeGlobal, all log msgs from connected routers will show up
Router's Connect() / ConnectRemote() methods are just convenience wrappers which forward call to Proxy's Connect() / ConnectRemote().
The components plugged into Router are:
- Id: there are predefined Id types to allow use use integer, string, path name or MsgId in router
- Dispatcher: there are three predefined dispatchers: broadcast, roundrobin, random
more detials can be found in the following.
Proxy is the primary interface to connect router to its peer router. At both ends of a connection, there is a proxy object for its router. Simple router connection can be set up thru calling Router.Connect(). Proxy.Connect() can be used to set up more complicated connections, such as setting IdFilter to allow only a subset of messages pass thru the connection, or setting IdTranslator which can "relocate" remote message ids into a subspace in local router's namespace, or setting a flow control policy. Proxy.Close() is called to disconnect router from its peer.
type Proxy interface { //Connect to a local router Connect(Proxy) error //Connect to a remote router thru io conn //1. io.ReadWriteCloser: transport connection //2. MarshalingPolicy: gob or json marshaling //3. remaining args can be a FlowControlPolicy (e.g. window based or XOnOff) ConnectRemote(io.ReadWriteCloser, MarshalingPolicy, ...interface{}) error //close proxy and disconnect from peer Close() //query messaging interface with peer LocalPubInfo() []*ChanInfo LocalSubInfo() []*ChanInfo PeerPubInfo() []*ChanInfo PeerSubInfo() []*ChanInfo } func NewProxy(r Router, name string, f IdFilter, t IdTranslator) Proxy
NewProxy(...) accepts the following arguments:
- r: the router which will be bound with this proxy and be owner of this proxy
- name: proxy's name, used in log messages if owner router's log is turned on
- f: IdFilter to be installed at this proxy
- t: IdTranslator to be installed at this proxy
The components plugged into Proxy are:
- Marshaling: there are two predefined marshaling polcies: GobMarshaling use package "gob" and JsonMarshaling use package "json".
- IdFilter and IdTranslator: there are no predefined filters and translators; applications can define them with application specific rules.
- FlowController: there are two predefined flow control policies: simple window protocol and x-on/x-off protocol.
More details about components can be found in following.
Each id contains three pieces of data:
- Val - used as key in routing
- Scope - the scope to send/recv msgs
- Membership - local/remote peer, identifying if peers are inside same router or not
- Scope Scope defines the range of send/recv operations. there are three scopes:
- ScopeLocal: send msgs to local peers in the same router, or recv msgs from local peers
- ScopeRemote: send msgs to remote peers in connected routers, or recv msgs from remote peers
- ScopeGlobal: send msgs to all local/remote peers, or recv msgs from all local/remote peers the default attributes of id is ScopeGlobal (and Membership = MemberLocal)
- matching algorithm different types of ids use diff matching algorithms:
- exact match: for int/string/struct ids
- prefix match: for pathname ids
- assoc match: for regex ids or tuple ids (future)
- binding When send/recv chans are attached to router, their ids are matched against ids of chans which are already attached, and their bindings will be decided:
- For send chan, its bindings is the set of recv chans with matched id.
- For recv chan, its bindings is the set of send chans with matched id.
There are four predefined id types: IntId, StrId, PathId and MsgId, all implementing the following interface:
type Id interface { //methods to query Id content Scope() int Member() int //key value for storing Id in map Key() interface{} //for id matching Match(Id) bool MatchType() MatchType //Generators for creating other ids of same type. Since often we don't //know the exact types of Id.Val, so we have to create new ones from an existing id SysID(int, ...) (Id, os.Error) //generate sys ids, also called as method of Router Clone(...) (Id, os.Error) //create a new id with same id, but possible diff scope & membership //Stringer interface String() string }
Applications can define their own customized id type according to the above interface, and use it in router.
Dispatchers define how the values from a send channel are dispatched to the recv channels bound to it. DispatcherPolicy is used to create instances of disptachers. Some dispatchers are stateful, such as roundrobin, we need a separate dispatcher for each send channel. A instance of DispatchPolicy is passed into router's constructor, which will define router's overall dispatching behaviour.
The programming of Dispatchers are "generic" in the following aspects:
- do not depend on specific chan types
- messages sent are represented as reflect.Value
- receivers are array of RoutedChans with Channel interface
- Channel is api of reflect.ChanValue with Send()/Recv()/...
There are three predefined dispatchers: broadcast, roundrobin and random, all implementing the following interfaces:
type Dispatcher interface { Dispatch(v reflect.Value, recvers []*RoutedChan) } type DispatchPolicy interface { NewDispatcher() Dispatcher }
Customized dispatchers can be created according to these interfaces and plugged into router.
For remote router connections (thru sockets or others), we need to marshal the values or messages from local send channel into data streams, pass thru io connection, and at remote router, demarshal back into values or messages to forward to remote recv channels.
There are three interfaces involved. MarshallingPolicy is used to create instances of Marshaler and Demarshaler. There are two predefined marshaling policies: GobMarshaling using "gob" and JsonMarshaling using "json", all implementing the following interfaces:
type Marshaler interface { Marshal(interface{}) os.Error } type Demarshaler interface { Demarshal(interface{}) os.Error } type MarshallingPolicy interface { NewMarshaler(io.Writer) Marshaler NewDemarshaler(io.Reader) Demarshaler }
Customized marshaling policies can be created according to the above interfaces and plugged into Proxy.
When two routers connect, their namespaces will merge as following to enable channels in one router to communicate to channels in the other router transparently:
- Ids merging from router2 to router1: all ids in the intersection of router1's input interface (its set of recv ids with global / remote scope) and router2's output interface (its set of send ids with global / remote scope)
- Ids merging from router1 to router2: all ids in the intersection of router2's input interface (its set of recv ids with global / remote scope) and router1's output interface (its set of send ids with global / remote scope)
- new ids are propagated automatically to connected routers according to its id / scope / membership.
- when routers are disconnected, routers' namespaces will be updated automatically so that all publications and subscriptions from remote routers will be removed.
To manage namespace changes, we can apply IdFilter and IdTranslator at Proxy for the following effects:
- IdFilter: allow only a specific set of Ids (and their messages) to pass thru a connection
- IdTranslator: relocate ids / messages from a connection to a "subspace" in local namespace
The interfaces are defined as following:
type IdFilter interface { BlockInward(Id) bool BlockOutward(Id) bool } type IdTranslator interface { TranslateInward(Id) Id TranslateOutward(Id) Id }
Customized filters and translators can be defined according the above interfaces and with application specific logic.
For remote router connections, multiple sender-recver pair connections are multiplexed over the same io stream. For synchronization and congestion avoidance, we could apply flow control policy.
a flow control protocol has two parts:
- sender: which send msgs and recv acks from recver.
- recver: which recv msgs and send acks to sender. So besides wrapping a transport Channel (for send/recv msgs)
- FlowSender expose Ack(int) method to recv acks
- FlowRecver constructor will take as argument an ack(int) callback for sending acks. The following three interfaces are implemented for each flow control policy:
type FlowControlPolicy interface { NewFlowSender(ch Channel, args ...interface{}) (FlowSender, error) NewFlowRecver(ch Channel, ack func(int), args ...interface{}) (FlowRecver, error) String() string } type FlowSender interface { Channel Ack(int) } type FlowRecver interface { Channel }
There are two predefined flow control policies: Windowing and XOnOff, which are based on Chapter 4 of "Design And Validation Of Computer Protocols" by Gerard J. Holzmann.