Skip to content

Commit

Permalink
[FAB-1573] Make deliver API signable
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1573

This changeset modifies the Deliver API to accept an Envelope of type
DELIVER_SEEK_INFO instead of a standard SeekInfo message.  The Envelope
embeds a SeekInfo as well as the bits necessary to construct a
non-replayable signature.

This unfortunately breaks the API, but it seems better to fix it now
than to wait until there are more dependencies.

The alternative was to invent yet another signing scheme, which seemed
like it would be equally as much work on the consumer side with ongoing
messiness.

Change-Id: I3e8a759d33f68cb6a69f5242440136da730f0396
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jan 11, 2017
1 parent 149ae0d commit 1bf6190
Show file tree
Hide file tree
Showing 14 changed files with 291 additions and 195 deletions.
45 changes: 19 additions & 26 deletions bddtests/orderer/ab_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions bddtests/orderer/ab_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import common.common_pb2 as common_dot_common__pb2
import orderer.ab_pb2 as orderer_dot_ab__pb2
import orderer.ab_pb2 as orderer_dot_ab__pb2
import common.common_pb2 as common_dot_common__pb2
import orderer.ab_pb2 as orderer_dot_ab__pb2


Expand All @@ -23,7 +23,7 @@ def __init__(self, channel):
)
self.Deliver = channel.stream_stream(
'/orderer.AtomicBroadcast/Deliver',
request_serializer=orderer_dot_ab__pb2.SeekInfo.SerializeToString,
request_serializer=common_dot_common__pb2.Envelope.SerializeToString,
response_deserializer=orderer_dot_ab__pb2.DeliverResponse.FromString,
)

Expand All @@ -38,7 +38,7 @@ def Broadcast(self, request_iterator, context):
raise NotImplementedError('Method not implemented!')

def Deliver(self, request_iterator, context):
"""deliver first requires an update containing a seek message, then a stream of block replies is received.
"""deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
Expand All @@ -54,7 +54,7 @@ def add_AtomicBroadcastServicer_to_server(servicer, server):
),
'Deliver': grpc.stream_stream_rpc_method_handler(
servicer.Deliver,
request_deserializer=orderer_dot_ab__pb2.SeekInfo.FromString,
request_deserializer=common_dot_common__pb2.Envelope.FromString,
response_serializer=orderer_dot_ab__pb2.DeliverResponse.SerializeToString,
),
}
Expand Down
17 changes: 12 additions & 5 deletions bddtests/steps/orderer_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,18 @@ def seekPosition(position):
return ab_pb2.SeekPosition(specified = ab_pb2.SeekSpecified(number = position))

def createSeekInfo(chainID = TEST_CHAIN_ID, start = 'Oldest', end = 'Newest', behavior = 'FAIL_IF_NOT_READY'):
return ab_pb2.SeekInfo(
chainID = chainID,
start = seekPosition(start),
stop = seekPosition(end),
behavior = ab_pb2.SeekInfo.SeekBehavior.Value(behavior),
return common_pb2.Envelope(
payload = common_pb2.Payload(
header = common_pb2.Header(
chainHeader = common_pb2.ChainHeader( chainID = chainID ),
signatureHeader = common_pb2.SignatureHeader(),
),
data = ab_pb2.SeekInfo(
start = seekPosition(start),
stop = seekPosition(end),
behavior = ab_pb2.SeekInfo.SeekBehavior.Value(behavior),
).SerializeToString(),
).SerializeToString(),
)

def generateBroadcastMessages(chainID = TEST_CHAIN_ID, numToGenerate = 1, timeToHoldOpen = 1):
Expand Down
39 changes: 29 additions & 10 deletions core/deliverservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
"github.com/spf13/viper"
"golang.org/x/net/context"
Expand Down Expand Up @@ -155,20 +156,38 @@ func (d *DeliverService) checkLeaderAndRunDeliver(committer committer.Committer)
}

func (d *DeliverService) seekOldest() error {
return d.client.Send(&orderer.SeekInfo{
ChainID: d.chainID,
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
return d.client.Send(&common.Envelope{
Payload: utils.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChainHeader: &common.ChainHeader{
ChainID: d.chainID,
},
SignatureHeader: &common.SignatureHeader{},
},
Data: utils.MarshalOrPanic(&orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}),
}),
})
}

func (d *DeliverService) seekLatestFromCommitter(height uint64) error {
return d.client.Send(&orderer.SeekInfo{
ChainID: d.chainID,
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
return d.client.Send(&common.Envelope{
Payload: utils.MarshalOrPanic(&common.Payload{
Header: &common.Header{
ChainHeader: &common.ChainHeader{
ChainID: d.chainID,
},
SignatureHeader: &common.SignatureHeader{},
},
Data: utils.MarshalOrPanic(&orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}),
}),
})
}

Expand Down
28 changes: 25 additions & 3 deletions orderer/common/deliver/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ limitations under the License.
package deliver

import (
"fmt"

"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/rawledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"

"github.com/golang/protobuf/proto"
)

var logger = logging.MustGetLogger("orderer/common/deliver")
Expand Down Expand Up @@ -60,20 +64,38 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
logger.Debugf("Starting new deliver loop")
for {
logger.Debugf("Attempting to read seek info message")
seekInfo, err := srv.Recv()
envelope, err := srv.Recv()
if err != nil {
logger.Errorf("Error reading from stream: %s", err)
return err
}
logger.Debugf("Received message %v", seekInfo)
payload := &cb.Payload{}
if err = proto.Unmarshal(envelope.Payload, payload); err != nil {
logger.Errorf("Received an envelope with no payload: %s", err)
return err
}

chain, ok := ds.sm.GetChain(seekInfo.ChainID)
if payload.Header == nil || payload.Header.ChainHeader == nil {
err := fmt.Errorf("Malformed envelope recieved with bad header")
logger.Error(err)
return err
}

chain, ok := ds.sm.GetChain(payload.Header.ChainHeader.ChainID)
if !ok {
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}

// XXX add deliver authorization checking

seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Errorf("Received a signed deliver request with malformed seekInfo payload: %s", err)
return err
}

logger.Debugf("Received seekInfo %v", seekInfo)

cursor, number := chain.Reader().Iterator(seekInfo.Start)
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
Expand Down

0 comments on commit 1bf6190

Please sign in to comment.