Skip to content

Commit

Permalink
FAB-2055 GetHistoryForKey() returns timestamp
Browse files Browse the repository at this point in the history
Updated Ledger's GetHistoryForKey() API to return key, value,
timestamp and delete marker. Further, query response from ledger
is directly forwarded to shim layer to cast the response
appropriately and return to chaincode as a struct.
We have made the following changes to realize this.

(i) Renamed QueryStateResponse proto message to QueryResponse.
The QueryResult from ledger is converted to a 2D byte array (encoding
using gob) and appropriately set in QueryResponse struct. Hence, the
QueryResponse is consistent across different query types.
This approach reduces the repetition of code in peer's handler.go
that process the QueryResult from ledger.

(iii) Introduced two types of iterator in chaincode shim: One
for iterating over QueryResult of range/execute query and another for
iterating over QueryResult of history query. Now, these iterator
processes QueryResult (i.e., 2D byte array, decoded using gob) and
returns a struct to chaincode:
   - ledger.KV struct for range and execute query,
   - ledger.KeyModification struct for history query.
As a result, chaincode examples (map.go, marbles_chaincode.go)
and mockstub have been modified

(iv) Added test for historyQueries (from shim layer)

(v) As java shim is significnatly behind go shim, I have
commented out range query functionality for now. As a
separate changeset, need to update java shim's range
query with updated chaincode proto and add other query
functionalities.

Change-Id: I5e0cfd5f712f686361f8610d69db57646ad52b4f
Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed Apr 1, 2017
1 parent 375ccf1 commit b2f9d56
Show file tree
Hide file tree
Showing 22 changed files with 530 additions and 347 deletions.
58 changes: 58 additions & 0 deletions core/chaincode/exectransaction_test.go
Expand Up @@ -138,8 +138,13 @@ func startTxSimulation(ctxt context.Context, chainID string) (context.Context, l
if err != nil {
return nil, nil, err
}
historyQueryExecutor, err := lgr.NewHistoryQueryExecutor()
if err != nil {
return nil, nil, err
}

ctxt = context.WithValue(ctxt, TXSimulatorKey, txsim)
ctxt = context.WithValue(ctxt, HistoryQueryExecutorKey, historyQueryExecutor)
return ctxt, txsim, nil
}

Expand Down Expand Up @@ -1137,6 +1142,59 @@ func TestQueries(t *testing.T) {
//Reset the query limit to default
viper.Set("ledger.state.queryLimit", 10000)

if ledgerconfig.IsHistoryDBEnabled() == true {

f = "put"
args = util.ToChaincodeArgs(f, "marble12", "{\"docType\":\"marble\",\"name\":\"marble12\",\"color\":\"red\",\"size\":30,\"owner\":\"jerry\"}")
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

f = "put"
args = util.ToChaincodeArgs(f, "marble12", "{\"docType\":\"marble\",\"name\":\"marble12\",\"color\":\"red\",\"size\":30,\"owner\":\"jerry\"}")
spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, _, err = invoke(ctxt, chainID, spec, nextBlockNumber)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

//The following history query for "marble12" should return 3 records
f = "history"
args = util.ToChaincodeArgs(f, "marble12")

spec = &pb.ChaincodeSpec{Type: 1, ChaincodeId: cID, Input: &pb.ChaincodeInput{Args: args}}
_, _, retval, err := invoke(ctxt, chainID, spec, nextBlockNumber)
nextBlockNumber++
if err != nil {
t.Fail()
t.Logf("Error invoking <%s>: %s", ccID, err)
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

var history []interface{}
err = json.Unmarshal(retval, &history)

//default query limit of 10000 is used, query should return all records that meet the criteria
if len(history) != 3 {
t.Fail()
t.Logf("Error detected with the history query, should have returned 3 but returned %v", len(keys))
theChaincodeSupport.Stop(ctxt, cccid, &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec})
return
}

}

if ledgerconfig.IsCouchDBEnabled() == true {

//The following rich query for should return 9 marbles
Expand Down
203 changes: 88 additions & 115 deletions core/chaincode/handler.go
Expand Up @@ -18,6 +18,7 @@ package chaincode

import (
"bytes"
"encoding/gob"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -193,7 +194,8 @@ func (handler *Handler) createTxContext(ctxt context.Context, chainID string, tx
if handler.txCtxs[txid] != nil {
return nil, fmt.Errorf("txid:%s exists", txid)
}
txctx := &transactionContext{chainID: chainID, signedProp: signedProp, proposal: prop, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
txctx := &transactionContext{chainID: chainID, signedProp: signedProp,
proposal: prop, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
queryIteratorMap: make(map[string]commonledger.ResultsIterator)}
handler.txCtxs[txid] = txctx
txctx.txsimulator = getTxSimulator(ctxt)
Expand Down Expand Up @@ -720,38 +722,20 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
}

handler.putQueryIterator(txContext, iterID, rangeIter)
var payload *pb.QueryResponse
payload, err = getQueryResponse(handler, txContext, rangeIter, iterID)

var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()
var qresult commonledger.QueryResult
for ; i < queryLimit; i++ {
qresult, err = rangeIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
kv := qresult.(*ledger.KV)
keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}
if qresult != nil {
if err != nil {
rangeIter.Close()
handler.deleteQueryIterator(txContext, iterID)
//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}

payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
payloadBytes, err := proto.Marshal(payload)
var payloadBytes []byte
payloadBytes, err = proto.Marshal(payload)
if err != nil {
rangeIter.Close()
handler.deleteQueryIterator(txContext, iterID)
Expand All @@ -769,6 +753,60 @@ func (handler *Handler) handleGetStateByRange(msg *pb.ChaincodeMessage) {
}()
}

func getBytes(qresult interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(qresult)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

//getQueryResponse takes an iterator and fetch state to construct QueryResponse
func getQueryResponse(handler *Handler, txContext *transactionContext, iter commonledger.ResultsIterator,
iterID string) (*pb.QueryResponse, error) {

var i = 0
var err error
var queryLimit = ledgerconfig.GetQueryLimit()
var queryResult commonledger.QueryResult
var queryResultsBytes []*pb.QueryResultBytes

for ; i < queryLimit; i++ {
queryResult, err = iter.Next()
if err != nil {
return nil, err
}
if queryResult == nil {
break
}
var resultBytes []byte
resultBytes, err = getBytes(queryResult)
if err != nil {
return nil, err
}

qresultBytes := pb.QueryResultBytes{ResultBytes: resultBytes}
queryResultsBytes = append(queryResultsBytes, &qresultBytes)
}

if queryResult == nil {
iter.Close()
handler.deleteQueryIterator(txContext, iterID)
} else {
//TODO: remove this else part completely when paging design is implemented
//FAB-2462 - Re-introduce paging for range queries and rich queries
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
iter.Close()
handler.deleteQueryIterator(txContext, iterID)
}
//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryResponse{Data: data, HasMore: qresult != nil, Id: iterID}
return &pb.QueryResponse{Results: queryResultsBytes, HasMore: false, Id: iterID}, nil
}

// afterQueryStateNext handles a QUERY_STATE_NEXT request from the chaincode.
func (handler *Handler) afterQueryStateNext(e *fsm.Event, state string) {
msg, ok := e.Args[0].(*pb.ChaincodeMessage)
Expand Down Expand Up @@ -824,39 +862,17 @@ func (handler *Handler) handleQueryStateNext(msg *pb.ChaincodeMessage) {
return
}

var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()

var qresult commonledger.QueryResult
var err error
for ; i < queryLimit; i++ {
qresult, err = queryIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult != nil {
break
}
kv := qresult.(*ledger.KV)
keyAndValue := pb.QueryStateKeyValue{Key: kv.Key, Value: kv.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}
payload, err := getQueryResponse(handler, txContext, queryIter, queryStateNext.Id)

if qresult != nil {
if err != nil {
queryIter.Close()
handler.deleteQueryIterator(txContext, queryStateNext.Id)

//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: queryStateNext.Id}
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: queryStateNext.Id}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
queryIter.Close()
Expand Down Expand Up @@ -927,7 +943,7 @@ func (handler *Handler) handleQueryStateClose(msg *pb.ChaincodeMessage) {
handler.deleteQueryIterator(txContext, queryStateClose.Id)
}

payload := &pb.QueryStateResponse{HasMore: false, Id: queryStateClose.Id}
payload := &pb.QueryResponse{HasMore: false, Id: queryStateClose.Id}
payloadBytes, err := proto.Marshal(payload)
if err != nil {

Expand Down Expand Up @@ -1010,48 +1026,27 @@ func (handler *Handler) handleGetQueryResult(msg *pb.ChaincodeMessage) {
}

handler.putQueryIterator(txContext, iterID, executeIter)
var payload *pb.QueryResponse
payload, err = getQueryResponse(handler, txContext, executeIter, iterID)

var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()
var qresult commonledger.QueryResult
for ; i < queryLimit; i++ {
qresult, err = executeIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
queryRecord := qresult.(*ledger.QueryRecord)
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.Key, Value: queryRecord.Record}
keysAndValues = append(keysAndValues, &keyAndValue)
}

if qresult != nil {
if err != nil {
executeIter.Close()
handler.deleteQueryIterator(txContext, iterID)

//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

var payloadBytes []byte

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
payloadBytes, err = proto.Marshal(payload)
if err != nil {
executeIter.Close()
handler.deleteQueryIterator(txContext, iterID)

// Send error msg back to chaincode. GetState will not trigger event
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
chaincodeLogger.Errorf("Failed marshall response. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}
Expand Down Expand Up @@ -1128,41 +1123,19 @@ func (handler *Handler) handleGetHistoryForKey(msg *pb.ChaincodeMessage) {

handler.putQueryIterator(txContext, iterID, historyIter)

// TODO QueryStateKeyValue can be re-used for now since history records have a string (TxID)
// and value (value). But we'll need to use another structure if we add other fields like timestamp.
var keysAndValues []*pb.QueryStateKeyValue
var i = 0
var queryLimit = ledgerconfig.GetQueryLimit()
var qresult commonledger.QueryResult
for ; i < queryLimit; i++ {
qresult, err = historyIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
queryRecord := qresult.(*ledger.KeyModification)
keyAndValue := pb.QueryStateKeyValue{Key: queryRecord.TxID, Value: queryRecord.Value}
keysAndValues = append(keysAndValues, &keyAndValue)
}
var payload *pb.QueryResponse
payload, err = getQueryResponse(handler, txContext, historyIter, iterID)

if qresult != nil {
if err != nil {
historyIter.Close()
handler.deleteQueryIterator(txContext, iterID)

//TODO log the warning that the queryLimit was exceeded. this will need to be revisited
//following changes to the future paging design.
chaincodeLogger.Warningf("Query limit of %v was exceeded. Not all values meeting the criteria were returned.", queryLimit)
payload := []byte(err.Error())
chaincodeLogger.Errorf("Failed to get query result. Sending %s", pb.ChaincodeMessage_ERROR)
serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
return
}

var payloadBytes []byte

//TODO - HasMore is set to false until the requery issue for the peer is resolved
//FAB-2462 - Re-introduce paging for range queries and rich queries
//payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, Id: iterID}
payload := &pb.QueryStateResponse{KeysAndValues: keysAndValues, HasMore: false, Id: iterID}
payloadBytes, err = proto.Marshal(payload)
if err != nil {
historyIter.Close()
Expand Down

0 comments on commit b2f9d56

Please sign in to comment.