Skip to content

Commit

Permalink
FAB-466 integrate ledgernext with chaincode framework
Browse files Browse the repository at this point in the history
The ledgernext and kvledger packages provide APIs to simulate
transactions by collecting read-write sets from invokes of
chaincodes. This change set integrates this for the Endorser
flows.  The main purpose of this code is to enable read write
sets to be propagated so end to end flow ending in a commit to
the ledger can be tested.

The chaincode unit tests continue to use the old ledger. This
allows us to (1) incrementally integrate ledger and (2) show
that the two packages can coexist from a build and runtime
point of view.

It is worth noting that the file kv_ledgers.go hosts a temporary
container for ledgers. This simple approach is expected to be
revised when (sub)ledgers are implemented.

Change-Id: I6e0bf4b9795b83d2ae869244b212c02ef9b5214a
Signed-off-by: Srinivasan Muralidharan <muralisr@us.ibm.com>
  • Loading branch information
Srinivasan Muralidharan committed Sep 28, 2016
1 parent ea9f840 commit 909b517
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 66 deletions.
7 changes: 5 additions & 2 deletions core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
chaincodeStartupTimeoutDefault int = 5000
chaincodeInstallPathDefault string = "/opt/gopath/bin/"
peerAddressDefault string = "0.0.0.0:7051"

//TXSimulatorKey is used to attach ledger simulation context
TXSimulatorKey string = "txsimulatorkey"
)

// chains is a map between different blockchains and their ChaincodeSupport.
Expand Down Expand Up @@ -257,7 +260,7 @@ func (chaincodeSupport *ChaincodeSupport) sendInitOrReady(context context.Contex

var notfy chan *pb.ChaincodeMessage
var err error
if notfy, err = chrte.handler.initOrReady(txid, initArgs, tx, depTx); err != nil {
if notfy, err = chrte.handler.initOrReady(context, txid, initArgs, tx, depTx); err != nil {
return fmt.Errorf("Error sending %s: %s", pb.ChaincodeMessage_INIT, err)
}
if notfy != nil {
Expand Down Expand Up @@ -648,7 +651,7 @@ func (chaincodeSupport *ChaincodeSupport) Execute(ctxt context.Context, chaincod

var notfy chan *pb.ChaincodeMessage
var err error
if notfy, err = chrte.handler.sendExecuteMessage(msg, tx); err != nil {
if notfy, err = chrte.handler.sendExecuteMessage(ctxt, msg, tx); err != nil {
return nil, fmt.Errorf("Error sending %s: %s", msg.Type.String(), err)
}
var ccresp *pb.ChaincodeMessage
Expand Down
16 changes: 3 additions & 13 deletions core/chaincode/chaincodeexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"fmt"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/util"
pb "github.com/hyperledger/fabric/protos"
)
Expand All @@ -40,24 +39,15 @@ func createTx(typ pb.Transaction_Type, ccname string, args [][]byte) (*pb.Transa
}

// ExecuteChaincode executes a given chaincode given chaincode name and arguments
func ExecuteChaincode(typ pb.Transaction_Type, chainname string, ccname string, args [][]byte) ([]byte, error) {
func ExecuteChaincode(ctxt context.Context, typ pb.Transaction_Type, chainname string, ccname string, args [][]byte) ([]byte, error) {
var tx *pb.Transaction
var err error
var b []byte
var lgr *ledger.Ledger

tx, err = createTx(typ, ccname, args)
lgr, err = ledger.GetLedger()
if err != nil {
return nil, fmt.Errorf("Failed to get handle to ledger: %s ", err)
}
//TODO - new ledger access will change this call to take a context
lgr.BeginTxBatch("1")
b, _, err = Execute(context.Background(), GetChain(ChainName(chainname)), tx)
b, _, err = Execute(ctxt, GetChain(ChainName(chainname)), tx)
if err != nil {
return nil, fmt.Errorf("Error deploying chaincode: %s", err)
}
//TODO - new ledger access will change this call to take a context
lgr.CommitTxBatch("1", []*pb.Transaction{tx}, nil, nil)

return b, err
}
51 changes: 32 additions & 19 deletions core/chaincode/exectransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"

"github.com/hyperledger/fabric/core/ledger"
ledgernext "github.com/hyperledger/fabric/core/ledgernext"
"github.com/hyperledger/fabric/events/producer"
pb "github.com/hyperledger/fabric/protos"
)
Expand All @@ -33,10 +34,16 @@ import (
func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) ([]byte, *pb.ChaincodeEvent, error) {
var err error

// get a handle to ledger to mark the begin/finish of a tx
ledger, ledgerErr := ledger.GetLedger()
if ledgerErr != nil {
return nil, nil, fmt.Errorf("Failed to get handle to ledger (%s)", ledgerErr)
//are we in V1 mode doing simulation ?
txsim, _ := ctxt.Value(TXSimulatorKey).(ledgernext.TxSimulator)

var lgr *ledger.Ledger
if txsim == nil {
// get a handle to ledger to mark the begin/finish of a tx
lgr, err = ledger.GetLedger()
if err != nil {
return nil, nil, fmt.Errorf("Failed to get handle to ledger (%s)", err)
}
}

if secHelper := chain.getSecHelper(); nil != secHelper {
Expand All @@ -55,13 +62,13 @@ func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) (
}

//launch and wait for ready
markTxBegin(ledger, t)
markTxBegin(lgr, t)
_, _, err = chain.Launch(ctxt, t)
if err != nil {
markTxFinish(ledger, t, false)
markTxFinish(lgr, t, false)
return nil, nil, fmt.Errorf("%s", err)
}
markTxFinish(ledger, t, true)
markTxFinish(lgr, t, true)
} else if t.Type == pb.Transaction_CHAINCODE_INVOKE || t.Type == pb.Transaction_CHAINCODE_QUERY {
//will launch if necessary (and wait for ready)
cID, cMsg, err := chain.Launch(ctxt, t)
Expand Down Expand Up @@ -97,15 +104,15 @@ func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) (
}
}

markTxBegin(ledger, t)
markTxBegin(lgr, t)
resp, err := chain.Execute(ctxt, chaincode, ccMsg, timeout, t)
if err != nil {
// Rollback transaction
markTxFinish(ledger, t, false)
markTxFinish(lgr, t, false)
return nil, nil, fmt.Errorf("Failed to execute transaction or query(%s)", err)
} else if resp == nil {
// Rollback transaction
markTxFinish(ledger, t, false)
markTxFinish(lgr, t, false)
return nil, nil, fmt.Errorf("Failed to receive a response for (%s)", t.Txid)
} else {
if resp.ChaincodeEvent != nil {
Expand All @@ -115,14 +122,14 @@ func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) (

if resp.Type == pb.ChaincodeMessage_COMPLETED || resp.Type == pb.ChaincodeMessage_QUERY_COMPLETED {
// Success
markTxFinish(ledger, t, true)
markTxFinish(lgr, t, true)
return resp.Payload, resp.ChaincodeEvent, nil
} else if resp.Type == pb.ChaincodeMessage_ERROR || resp.Type == pb.ChaincodeMessage_QUERY_ERROR {
// Rollback transaction
markTxFinish(ledger, t, false)
markTxFinish(lgr, t, false)
return nil, resp.ChaincodeEvent, fmt.Errorf("Transaction or query returned with failure: %s", string(resp.Payload))
}
markTxFinish(ledger, t, false)
markTxFinish(lgr, t, false)
return resp.Payload, nil, fmt.Errorf("receive a response for (%s) but in invalid state(%d)", t.Txid, resp.Type)
}

Expand Down Expand Up @@ -201,17 +208,23 @@ func getTimeout(cID *pb.ChaincodeID) (time.Duration, error) {
}

func markTxBegin(ledger *ledger.Ledger, t *pb.Transaction) {
if t.Type == pb.Transaction_CHAINCODE_QUERY {
return
//ledger would be nil if are in simulation mode
if ledger != nil {
if t.Type == pb.Transaction_CHAINCODE_QUERY {
return
}
ledger.TxBegin(t.Txid)
}
ledger.TxBegin(t.Txid)
}

func markTxFinish(ledger *ledger.Ledger, t *pb.Transaction, successful bool) {
if t.Type == pb.Transaction_CHAINCODE_QUERY {
return
//ledger would be nil if are in simulation mode
if ledger != nil {
if t.Type == pb.Transaction_CHAINCODE_QUERY {
return
}
ledger.TxFinished(t.Txid, successful)
}
ledger.TxFinished(t.Txid, successful)
}

func sendTxRejectedEvent(tx *pb.Transaction, errorMsg string) {
Expand Down
43 changes: 35 additions & 8 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
ccintf "github.com/hyperledger/fabric/core/container/ccintf"
"github.com/hyperledger/fabric/core/crypto"
"github.com/hyperledger/fabric/core/ledger/statemgmt"
ledgernext "github.com/hyperledger/fabric/core/ledgernext"
"github.com/hyperledger/fabric/core/util"
pb "github.com/hyperledger/fabric/protos"
"github.com/looplab/fsm"
Expand Down Expand Up @@ -61,6 +62,8 @@ type transactionContext struct {

// tracks open iterators used for range queries
rangeQueryIteratorMap map[string]statemgmt.RangeScanIterator

txsimulator ledgernext.TxSimulator
}

type nextStateInfo struct {
Expand Down Expand Up @@ -113,7 +116,7 @@ func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error {
return nil
}

func (handler *Handler) createTxContext(txid string, tx *pb.Transaction) (*transactionContext, error) {
func (handler *Handler) createTxContext(ctxt context.Context, txid string, tx *pb.Transaction) (*transactionContext, error) {
if handler.txCtxs == nil {
return nil, fmt.Errorf("cannot create notifier for txid:%s", txid)
}
Expand All @@ -125,6 +128,10 @@ func (handler *Handler) createTxContext(txid string, tx *pb.Transaction) (*trans
txctx := &transactionContext{transactionSecContext: tx, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
rangeQueryIteratorMap: make(map[string]statemgmt.RangeScanIterator)}
handler.txCtxs[txid] = txctx
if txsim, ok := ctxt.Value(TXSimulatorKey).(ledgernext.TxSimulator); ok {
txctx.txsimulator = txsim
}

return txctx, nil
}

Expand Down Expand Up @@ -624,7 +631,16 @@ func (handler *Handler) handleGetState(msg *pb.ChaincodeMessage) {
chaincodeID := handler.ChaincodeID.Name

readCommittedState := !handler.getIsTransaction(msg.Txid)
res, err := ledgerObj.GetState(chaincodeID, key, readCommittedState)
var res []byte
var err error

txContext := handler.getTxContext(msg.Txid)
if txContext.txsimulator != nil {
res, err = txContext.txsimulator.GetState(chaincodeID, key)
} else {
res, err = ledgerObj.GetState(chaincodeID, key, readCommittedState)
}

if err != nil {
// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
Expand Down Expand Up @@ -1038,12 +1054,23 @@ func (handler *Handler) enterBusyState(e *fsm.Event, state string) {
// Encrypt the data if the confidential is enabled
if pVal, err = handler.encrypt(msg.Txid, putStateInfo.Value); err == nil {
// Invoke ledger to put state
err = ledgerObj.SetState(chaincodeID, putStateInfo.Key, pVal)
txContext := handler.getTxContext(msg.Txid)
if txContext.txsimulator != nil {
err = txContext.txsimulator.SetState(chaincodeID, putStateInfo.Key, pVal)
} else {
err = ledgerObj.SetState(chaincodeID, putStateInfo.Key, pVal)
}

}
} else if msg.Type.String() == pb.ChaincodeMessage_DEL_STATE.String() {
// Invoke ledger to delete state
key := string(msg.Payload)
err = ledgerObj.DeleteState(chaincodeID, key)
txContext := handler.getTxContext(msg.Txid)
if txContext.txsimulator != nil {
err = txContext.txsimulator.DeleteState(chaincodeID, key)
} else {
err = ledgerObj.DeleteState(chaincodeID, key)
}
} else if msg.Type.String() == pb.ChaincodeMessage_INVOKE_CHAINCODE.String() {
//check and prohibit C-call-C for CONFIDENTIAL txs
if triggerNextStateMsg = handler.canCallChaincode(msg.Txid); triggerNextStateMsg != nil {
Expand Down Expand Up @@ -1256,11 +1283,11 @@ func (handler *Handler) setChaincodeSecurityContext(tx *pb.Transaction, msg *pb.

//if initArgs is set (should be for "deploy" only) move to Init
//else move to ready
func (handler *Handler) initOrReady(txid string, initArgs [][]byte, tx *pb.Transaction, depTx *pb.Transaction) (chan *pb.ChaincodeMessage, error) {
func (handler *Handler) initOrReady(ctxt context.Context, txid string, initArgs [][]byte, tx *pb.Transaction, depTx *pb.Transaction) (chan *pb.ChaincodeMessage, error) {
var ccMsg *pb.ChaincodeMessage
var send bool

txctx, funcErr := handler.createTxContext(txid, tx)
txctx, funcErr := handler.createTxContext(ctxt, txid, tx)
if funcErr != nil {
return nil, funcErr
}
Expand Down Expand Up @@ -1451,8 +1478,8 @@ func filterError(errFromFSMEvent error) error {
return nil
}

func (handler *Handler) sendExecuteMessage(msg *pb.ChaincodeMessage, tx *pb.Transaction) (chan *pb.ChaincodeMessage, error) {
txctx, err := handler.createTxContext(msg.Txid, tx)
func (handler *Handler) sendExecuteMessage(ctxt context.Context, msg *pb.ChaincodeMessage, tx *pb.Transaction) (chan *pb.ChaincodeMessage, error) {
txctx, err := handler.createTxContext(ctxt, msg.Txid, tx)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 909b517

Please sign in to comment.