Skip to content

Commit

Permalink
Orderer broadcast filtering framework
Browse files Browse the repository at this point in the history
In order to allow reconfiguration messages to be supported by the
orderer as well as to allow authentication of message signatures from
clients, the ordering service will need to support limited filtering of
broadcast messages as they arrive.

This changeset defines a simple interface for doing so as well as a
structure which supports applying many rules at once.  The solo orderer
is hooked into this framework, but for the moment only supports the
existing behavior of 'Accept all non-empty messages', this will change
once other more sophisticated filters are added.

The hope is that this filtering framework can be re-used across ordering
plugins, especially the component for verifying client signatures on
incoming messages.

https://jira.hyperledger.org/browse/FAB-591

Change-Id: I534330a1cb2b675c6f34e73079654ab3280520c0
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Oct 24, 2016
1 parent 038ea83 commit 820ee67
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 14 deletions.
89 changes: 89 additions & 0 deletions orderer/broadcastfilter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package broadcastfilter

import (
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
)

// Action is used to express the output of a rule
type Action int

const (
// Accept indicates that the message should be processed
Accept = iota
// Reconfigure indicates that this message modifies this rule, and should therefore be processed in a batch by itself
Reconfigure
// Reject indicates that the message should not be processed
Reject
// Forward indicates that the rule could not determine the correct course of action
Forward
)

// Rule defines a filter function which accepts, rejects, or forwards (to the next rule) a BroadcastMessage
type Rule interface {
// Apply applies the rule to the given BroadcastMessage, replying with the Action to take for the message
Apply(message *ab.BroadcastMessage) Action
}

// EmptyRejectRule rejects empty messages
var EmptyRejectRule = Rule(emptyRejectRule{})

type emptyRejectRule struct{}

func (a emptyRejectRule) Apply(message *ab.BroadcastMessage) Action {
if message.Data == nil {
return Reject
}
return Forward
}

// AcceptRule always returns Accept as a result for Apply
var AcceptRule = Rule(acceptRule{})

type acceptRule struct{}

func (a acceptRule) Apply(message *ab.BroadcastMessage) Action {
return Accept
}

// RuleSet is used to apply a collection of rules
type RuleSet struct {
rules []Rule
}

// NewRuleSet creates a new RuleSet with the given ordered list of Rules
func NewRuleSet(rules []Rule) *RuleSet {
return &RuleSet{
rules: rules,
}
}

// Apply applies the rules given for this set in order, returning the first non-Forward result and the Rule which generated it
// or returning Forward, nil if no rules accept or reject it
func (rs *RuleSet) Apply(message *ab.BroadcastMessage) (Action, Rule) {
for _, rule := range rs.rules {
action := rule.Apply(message)
switch action {
case Forward:
continue
default:
return action, rule
}
}
return Forward, nil
}
109 changes: 109 additions & 0 deletions orderer/broadcastfilter/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package broadcastfilter

import (
"testing"

ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
)

var RejectRule = Rule(rejectRule{})

type rejectRule struct{}

func (r rejectRule) Apply(message *ab.BroadcastMessage) Action {
return Reject
}

var ForwardRule = Rule(forwardRule{})

type forwardRule struct{}

func (r forwardRule) Apply(message *ab.BroadcastMessage) Action {
return Forward
}

func TestEmptyRejectRule(t *testing.T) {
rs := NewRuleSet([]Rule{EmptyRejectRule})
result, rule := rs.Apply(&ab.BroadcastMessage{})
if result != Reject {
t.Fatalf("Should have rejected")
}
if rule != EmptyRejectRule {
t.Fatalf("Rejected but not for the right rule")
}
result, _ = rs.Apply(&ab.BroadcastMessage{Data: []byte("fakedata")})
if result != Forward {
t.Fatalf("Should have forwarded")
}
}

func TestAcceptReject(t *testing.T) {
rs := NewRuleSet([]Rule{AcceptRule, RejectRule})
result, rule := rs.Apply(&ab.BroadcastMessage{})
if result != Accept {
t.Fatalf("Should have accepted")
}
if rule != AcceptRule {
t.Fatalf("Accepted but not for the right rule")
}
}

func TestRejectAccept(t *testing.T) {
rs := NewRuleSet([]Rule{RejectRule, AcceptRule})
result, rule := rs.Apply(&ab.BroadcastMessage{})
if result != Reject {
t.Fatalf("Should have rejected")
}
if rule != RejectRule {
t.Fatalf("Rejected but not for the right rule")
}
}

func TestForwardAccept(t *testing.T) {
rs := NewRuleSet([]Rule{ForwardRule, AcceptRule})
result, rule := rs.Apply(&ab.BroadcastMessage{})
if result != Accept {
t.Fatalf("Should have accepted")
}
if rule != AcceptRule {
t.Fatalf("Accepted but not for the right rule")
}
}

func TestForward(t *testing.T) {
rs := NewRuleSet([]Rule{ForwardRule})
result, rule := rs.Apply(&ab.BroadcastMessage{})
if result != Forward {
t.Fatalf("Should have forwarded")
}
if rule != nil {
t.Fatalf("Forwarded but rule is set")
}
}

func TestNoRule(t *testing.T) {
rs := NewRuleSet([]Rule{})
result, rule := rs.Apply(&ab.BroadcastMessage{})
if result != Forward {
t.Fatalf("Should have forwarded")
}
if rule != nil {
t.Fatalf("Forwarded but rule is set")
}
}
46 changes: 32 additions & 14 deletions orderer/solo/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/broadcastfilter"
"github.com/hyperledger/fabric/orderer/rawledger"
)

Expand All @@ -28,6 +29,7 @@ type broadcastServer struct {
batchSize int
batchTimeout time.Duration
rl rawledger.Writer
filter *broadcastfilter.RuleSet
sendChan chan *ab.BroadcastMessage
exitChan chan struct{}
}
Expand All @@ -44,6 +46,7 @@ func newPlainBroadcastServer(queueSize, batchSize int, batchTimeout time.Duratio
batchSize: batchSize,
batchTimeout: batchTimeout,
rl: rl,
filter: broadcastfilter.NewRuleSet([]broadcastfilter.Rule{broadcastfilter.EmptyRejectRule, broadcastfilter.AcceptRule}),
sendChan: make(chan *ab.BroadcastMessage),
exitChan: make(chan struct{}),
}
Expand All @@ -62,11 +65,21 @@ outer:
for {
select {
case msg := <-bs.sendChan:
curBatch = append(curBatch, msg)
if len(curBatch) < bs.batchSize {
continue
// The messages must be filtered a second time in case configuration has changed since the message was received
action, _ := bs.filter.Apply(msg)
switch action {
case broadcastfilter.Accept:
curBatch = append(curBatch, msg)
if len(curBatch) < bs.batchSize {
continue
}
logger.Debugf("Batch size met, creating block")
case broadcastfilter.Forward:
logger.Debugf("Ignoring message because it was not accepted by a filter")
default:
// TODO add support for other cases, unreachable for now
logger.Fatalf("NOT IMPLEMENTED YET")
}
logger.Debugf("Batch size met, creating block")
case <-timer:
if len(curBatch) == 0 {
continue outer
Expand Down Expand Up @@ -123,18 +136,23 @@ func (b *broadcaster) queueBroadcastMessages(srv ab.AtomicBroadcast_BroadcastSer
return err
}

if msg.Data == nil {
err = srv.Send(&ab.BroadcastResponse{Status: ab.Status_BAD_REQUEST})
if err != nil {
return err
}
}
action, _ := b.bs.filter.Apply(msg)

select {
case b.queue <- msg:
err = srv.Send(&ab.BroadcastResponse{Status: ab.Status_SUCCESS})
switch action {
case broadcastfilter.Accept:
select {
case b.queue <- msg:
err = srv.Send(&ab.BroadcastResponse{ab.Status_SUCCESS})
default:
err = srv.Send(&ab.BroadcastResponse{ab.Status_SERVICE_UNAVAILABLE})
}
case broadcastfilter.Forward:
fallthrough
case broadcastfilter.Reject:
err = srv.Send(&ab.BroadcastResponse{ab.Status_BAD_REQUEST})
default:
err = srv.Send(&ab.BroadcastResponse{Status: ab.Status_SERVICE_UNAVAILABLE})
// TODO add support for other cases, unreachable for now
logger.Fatalf("NOT IMPLEMENTED YET")
}

if err != nil {
Expand Down

0 comments on commit 820ee67

Please sign in to comment.