Skip to content

Commit

Permalink
[FAB-7581] Enabling couchdb index creation
Browse files Browse the repository at this point in the history
This CR introduces
1) 'ChaincodeLifecycleEventListener' - this needs to be implemented
by couchdb implementation in order to get a chance to create indexes
upon chaincode install or deploy. This function will be invoked for a
chain + chaincode combination only if the latest chaincode event
(i.e., deploy on a chain or install on peer) makes the state such that
the chaincode is deployed on the chain and is installed on the peer.

2) Chaincode event management module - this exposes two functions that
are to be invoked from chaincode deploy and chaincode install path
respectively. Internally, it synchronizes the channel creation,
chaincode install, and chaincode deploy so that we do not miss events
for index creation

3) Functions for getting the status of a chaincode (deployed/installed).
Further, if installed - extract the statedb artifacts from the chaincode
package. These functions are used by the module listed in (2) above

4) statelistener functionality - This allows for registering an arbitrary
listener for state changes over a namesapce. This is used for listening
the changes over 'lscc' namespace that indicate the deployment of one or more
chaincodes in a block

5) Enabling couchdb index creation upon chaincode deploy using the all above

6) In a separate CR, one function (namely, 'HandleChaincodeInstall') needs
to be invoked from chaincode install code path so that index creation is
enabled during chaincode install as well

Change-Id: I928f6ffd10f7985cbef63a70f76da920a4ca4c56
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Jan 15, 2018
1 parent 14b4b0a commit 4fecdbd
Show file tree
Hide file tree
Showing 17 changed files with 726 additions and 19 deletions.
41 changes: 41 additions & 0 deletions core/common/ccprovider/cc_info_provider.go
@@ -0,0 +1,41 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package ccprovider

import (
"bytes"
"fmt"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/common/sysccprovider"
)

// IsChaincodeDeployed returns true if the chaincode with given name and version is deployed
func IsChaincodeDeployed(chainid, ccName, ccVersion string, ccHash []byte) (bool, error) {
sccprovider := sysccprovider.GetSystemChaincodeProvider()
qe, err := sccprovider.GetQueryExecutorForLedger(chainid)
if err != nil {
return false, fmt.Errorf("Could not retrieve QueryExecutor for channel %s, error %s", chainid, err)
}
defer qe.Done()

chaincodeDataBytes, err := qe.GetState("lscc", ccName)
if err != nil {
return false, fmt.Errorf("Could not retrieve state for chaincode %s on channel %s, error %s", ccName, chainid, err)
}

if chaincodeDataBytes == nil {
return false, nil
}

chaincodeData := &ChaincodeData{}
err = proto.Unmarshal(chaincodeDataBytes, chaincodeData)
if err != nil {
return false, fmt.Errorf("Unmarshalling ChaincodeQueryResponse failed, error %s", err)
}
return chaincodeData.CCVersion() == ccVersion && bytes.Equal(chaincodeData.Hash(), ccHash), nil
}
76 changes: 76 additions & 0 deletions core/common/ccprovider/cc_statedb_artifacts_provider.go
@@ -0,0 +1,76 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package ccprovider

import (
"archive/tar"
"bytes"
"compress/gzip"
"io"
"strings"
)

const (
ccPackageStatedbDir = "META-INF/statedb/"
)

// ExtractStatedbArtifactsAsTarbytes extracts the statedb artifacts from the code package tar and create a statedb artifact tar.
// The state db artifacts are expected to contain state db specific artifacts such as index specification in the case of couchdb.
// This function is intented to be used during chaincode instantiate/upgrade so that statedb artifacts can be created.
func ExtractStatedbArtifactsAsTarbytes(ccname, ccversion string) (installed bool, statedbArtifactsTar []byte, err error) {
ccpackage, err := GetChaincodeFromFS(ccname, ccversion)
if err != nil {
// TODO for now, we assume that an error indicates that the chaincode is not installed on the peer.
// However, we need a way to differentiate between the 'not installed' and a general error so that on general error,
// we can abort the chaincode instantiate/upgrade/install operation.
ccproviderLogger.Info("Error while loading installation package for ccname=%s, ccversion=%s. Err=%s", ccname, ccversion, err)
return false, nil, nil
}

cds := ccpackage.GetDepSpec()
is := bytes.NewReader(cds.CodePackage)
gr, err := gzip.NewReader(is)
if err != nil {
ccproviderLogger.Errorf("Failure opening codepackage gzip stream: %s", err)
return true, nil, err
}
tr := tar.NewReader(gr)
statedbTarBuffer := bytes.NewBuffer(nil)
tw := tar.NewWriter(statedbTarBuffer)

// For each file in the code package tar,
// add it to the statedb artifact tar if it has "statedb" in the path
for {
header, err := tr.Next()
if err == io.EOF {
// We only get here if there are no more entries to scan
break
}

if err != nil {
return true, nil, err
}
ccproviderLogger.Debugf("header.Name = %s", header.Name)
if !strings.HasPrefix(header.Name, ccPackageStatedbDir) {
continue
}
if err = tw.WriteHeader(header); err != nil {
ccproviderLogger.Error("Error adding header to statedb tar:", err, header.Name)
return true, nil, err
}
if _, err := io.Copy(tw, tr); err != nil {
ccproviderLogger.Error("Error copying file to statedb tar:", err, header.Name)
return true, nil, err
}
ccproviderLogger.Debug("Wrote file to statedb tar:", header.Name)
}
if err = tw.Close(); err != nil {
return true, nil, err
}
ccproviderLogger.Debug("Created statedb artifact tar")
return true, statedbTarBuffer.Bytes(), nil
}
54 changes: 54 additions & 0 deletions core/ledger/cceventmgmt/defs.go
@@ -0,0 +1,54 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package cceventmgmt

import (
"fmt"

"github.com/hyperledger/fabric/core/common/ccprovider"
)

// ChaincodeDefinition captures the info about chaincode
type ChaincodeDefinition struct {
Name string
Hash []byte
Version string
}

func (cdef *ChaincodeDefinition) String() string {
return fmt.Sprintf("Name=%s, Version=%s, Hash=%#v", cdef.Name, cdef.Version, cdef.Hash)
}

// ChaincodeLifecycleEventListener interface enables ledger components (mainly, intended for statedb)
// to be able to listen to chaincode lifecycle events. 'dbArtifactsTar' represents db specific artifacts
// (such as index specs) packaged in a tar
type ChaincodeLifecycleEventListener interface {
// HandleChaincodeDeploy is expected to creates all the necessary statedb structures (such as indexes)
HandleChaincodeDeploy(chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) error
}

// ChaincodeInfoProvider interface enables event mgr to retrieve chaincode info for a given chaincode
type ChaincodeInfoProvider interface {
// IsChaincodeDeployed returns true if the given chaincode is deployed on the given channel
IsChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) (bool, error)
// RetrieveChaincodeArtifacts checks if the given chaincode is installed on the peer and if yes,
// it extracts the state db specific artifacts from the chaincode package tarball
RetrieveChaincodeArtifacts(chaincodeDefinition *ChaincodeDefinition) (installed bool, dbArtifactsTar []byte, err error)
}

type chaincodeInfoProviderImpl struct {
}

// IsChaincodeDeployed implements function in the interface ChaincodeInfoProvider
func (p *chaincodeInfoProviderImpl) IsChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) (bool, error) {
return ccprovider.IsChaincodeDeployed(chainid, chaincodeDefinition.Name, chaincodeDefinition.Version, chaincodeDefinition.Hash)
}

// RetrieveChaincodeArtifacts implements function in the interface ChaincodeInfoProvider
func (p *chaincodeInfoProviderImpl) RetrieveChaincodeArtifacts(chaincodeDefinition *ChaincodeDefinition) (installed bool, dbArtifactsTar []byte, err error) {
return ccprovider.ExtractStatedbArtifactsAsTarbytes(chaincodeDefinition.Name, chaincodeDefinition.Version)
}
40 changes: 40 additions & 0 deletions core/ledger/cceventmgmt/lsccstate_listener.go
@@ -0,0 +1,40 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package cceventmgmt

import (
"fmt"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
)

// KVLedgerLSCCStateListener listens for state changes on 'lscc' namespace
type KVLedgerLSCCStateListener struct {
}

// HandleStateUpdates iterates over key-values being written in the 'lscc' namespace (which indicates deployment of a chaincode)
// and invokes `HandleChaincodeDeploy` function on chaincode event manager (which in turn is responsible for creation of statedb
// artifacts for the chaincode statedata)
func (listener *KVLedgerLSCCStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates) error {
kvWrites := stateUpdates.([]*kvrwset.KVWrite)
logger.Debugf("HandleStateUpdates() - channelName=%s, stateUpdates=%#v", channelName, kvWrites)
chaincodeDefs := []*ChaincodeDefinition{}
for _, kvWrite := range kvWrites {
if kvWrite.IsDelete {
continue
}
chaincodeData := &ccprovider.ChaincodeData{}
if err := proto.Unmarshal(kvWrite.Value, chaincodeData); err != nil {
return fmt.Errorf("Unmarshalling ChaincodeQueryResponse failed, error %s", err)
}
chaincodeDefs = append(chaincodeDefs, &ChaincodeDefinition{Name: chaincodeData.CCName(), Version: chaincodeData.CCVersion(), Hash: chaincodeData.Hash()})
}
return GetMgr().HandleChaincodeDeploy(channelName, chaincodeDefs)
}
146 changes: 146 additions & 0 deletions core/ledger/cceventmgmt/mgmt_test.go
@@ -0,0 +1,146 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package cceventmgmt

import (
"os"
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
flogging.SetModuleLevel("eventmgmt", "debug")
os.Exit(m.Run())
}
func TestCCEventMgmt(t *testing.T) {
cc1Def := &ChaincodeDefinition{Name: "cc1", Version: "v1", Hash: []byte("cc1")}
cc1DBArtifactsTar := []byte("cc1DBArtifacts")

cc2Def := &ChaincodeDefinition{Name: "cc2", Version: "v1", Hash: []byte("cc2")}
cc2DBArtifactsTar := []byte("cc2DBArtifacts")

cc3Def := &ChaincodeDefinition{Name: "cc3", Version: "v1", Hash: []byte("cc3")}
cc3DBArtifactsTar := []byte("cc3DBArtifacts")

// cc1 is deployed and installed. cc2 is deployed but not installed. cc3 is not deployed but installed
mockProvider := newMockProvider()
mockProvider.setChaincodeInstalled(cc1Def, cc1DBArtifactsTar)
mockProvider.setChaincodeDeployed("channel1", cc1Def)
mockProvider.setChaincodeDeployed("channel1", cc2Def)
mockProvider.setChaincodeInstalled(cc3Def, cc3DBArtifactsTar)
setEventMgrForTest(newMgr(mockProvider))
defer clearEventMgrForTest()

handler1, handler2 := &mockHandler{}, &mockHandler{}
eventMgr := GetMgr()
assert.NotNil(t, eventMgr)
eventMgr.Register("channel1", handler1)
eventMgr.Register("channel2", handler2)

cc2ExpectedEvent := &mockEvent{cc2Def, cc2DBArtifactsTar}
cc3ExpectedEvent := &mockEvent{cc3Def, cc3DBArtifactsTar}

// Deploy cc3 on chain1 - only handler1 should recieve event because cc3 is being deployed only on chain1
eventMgr.HandleChaincodeDeploy("channel1", []*ChaincodeDefinition{cc3Def})
assert.Contains(t, handler1.eventsRecieved, cc3ExpectedEvent)
assert.NotContains(t, handler2.eventsRecieved, cc3ExpectedEvent)

// Deploy cc3 on chain2 as well and this time handler2 should also recieve event
eventMgr.HandleChaincodeDeploy("channel2", []*ChaincodeDefinition{cc3Def})
assert.Contains(t, handler2.eventsRecieved, cc3ExpectedEvent)

// Install CC2 - only handler1 should receive event because cc2 is deployed only on chain1 and not on chain2
eventMgr.HandleChaincodeInstall(cc2Def, cc2DBArtifactsTar)
assert.Contains(t, handler1.eventsRecieved, cc2ExpectedEvent)
assert.NotContains(t, handler2.eventsRecieved, cc2ExpectedEvent)
}

func TestLSCCListener(t *testing.T) {
channelName := "testChannel"
cc1Def := &ChaincodeDefinition{Name: "testChaincode", Version: "v1", Hash: []byte("hash_testChaincode")}
cc1DBArtifactsTar := []byte("cc1DBArtifacts")
// cc1 is installed but not deployed
mockProvider := newMockProvider()
mockProvider.setChaincodeInstalled(cc1Def, cc1DBArtifactsTar)
setEventMgrForTest(newMgr(mockProvider))
defer clearEventMgrForTest()
handler1 := &mockHandler{}
GetMgr().Register(channelName, handler1)
lsccStateListener := &KVLedgerLSCCStateListener{}

sampleChaincodeData := &ccprovider.ChaincodeData{Name: cc1Def.Name, Version: cc1Def.Version, Id: cc1Def.Hash}
sampleChaincodeDataBytes, err := proto.Marshal(sampleChaincodeData)
assert.NoError(t, err, "")
lsccStateListener.HandleStateUpdates(channelName, []*kvrwset.KVWrite{
&kvrwset.KVWrite{Key: cc1Def.Name, Value: sampleChaincodeDataBytes},
})
assert.Contains(t, handler1.eventsRecieved, &mockEvent{cc1Def, cc1DBArtifactsTar})
}

type mockProvider struct {
chaincodesDeployed map[[3]string]bool
chaincodesInstalled map[[2]string][]byte
}

type mockHandler struct {
eventsRecieved []*mockEvent
}

type mockEvent struct {
def *ChaincodeDefinition
dbArtifactsTar []byte
}

func (l *mockHandler) HandleChaincodeDeploy(chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) error {
l.eventsRecieved = append(l.eventsRecieved, &mockEvent{def: chaincodeDefinition, dbArtifactsTar: dbArtifactsTar})
return nil
}

func newMockProvider() *mockProvider {
return &mockProvider{
make(map[[3]string]bool),
make(map[[2]string][]byte),
}
}

func (p *mockProvider) setChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) {
p.chaincodesDeployed[[3]string{chainid, chaincodeDefinition.Name, chaincodeDefinition.Version}] = true
}

func (p *mockProvider) setChaincodeInstalled(chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) {
p.chaincodesInstalled[[2]string{chaincodeDefinition.Name, chaincodeDefinition.Version}] = dbArtifactsTar
}

func (p *mockProvider) setChaincodeDeployAndInstalled(chainid string, chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) {
p.setChaincodeDeployed(chainid, chaincodeDefinition)
p.setChaincodeInstalled(chaincodeDefinition, dbArtifactsTar)
}

func (p *mockProvider) IsChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) (bool, error) {
return p.chaincodesDeployed[[3]string{chainid, chaincodeDefinition.Name, chaincodeDefinition.Version}], nil
}

func (p *mockProvider) RetrieveChaincodeArtifacts(chaincodeDefinition *ChaincodeDefinition) (installed bool, dbArtifactsTar []byte, err error) {
dbArtifactsTar, ok := p.chaincodesInstalled[[2]string{chaincodeDefinition.Name, chaincodeDefinition.Version}]
if !ok {
return false, nil, nil
}
return true, dbArtifactsTar, nil
}

func setEventMgrForTest(eventMgr *Mgr) {
mgr = eventMgr
}

func clearEventMgrForTest() {
mgr = nil
}

0 comments on commit 4fecdbd

Please sign in to comment.