Skip to content

Commit

Permalink
[FAB-17059] Extend private data integration tests to cover case when a
Browse files Browse the repository at this point in the history
new eligible peer with different certs joins the channel and attempts to fetch
private data before processing the config update that adds its certs to
the channel config

Signed-off-by: Danny Cao <dcao@us.ibm.com>
(cherry picked from commit 8dd5e2d)
  • Loading branch information
caod123 authored and denyeart committed Feb 18, 2020
1 parent 0885c2f commit bb46909
Showing 1 changed file with 343 additions and 0 deletions.
343 changes: 343 additions & 0 deletions integration/pvtdata/pvtdata_test.go
Expand Up @@ -10,11 +10,14 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

Expand All @@ -29,6 +32,7 @@ import (
cb "github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
mspp "github.com/hyperledger/fabric-protos-go/msp"
ab "github.com/hyperledger/fabric-protos-go/orderer"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/bccsp/sw"
Expand All @@ -42,6 +46,7 @@ import (
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/grouper"
)

// The chaincode used in these tests has two collections defined:
Expand Down Expand Up @@ -134,6 +139,228 @@ var _ bool = Describe("PrivateData", func() {
})
})

Describe("Pvtdata behavior when a peer with new certs joins the network", func() {
var (
peerProcesses map[string]ifrit.Process
)

BeforeEach(func() {
By("setting up the network")
network = initThreeOrgsSetup()

By("starting the network")
peerProcesses = make(map[string]ifrit.Process)
network.Bootstrap()

members := grouper.Members{
{Name: "brokers", Runner: network.BrokerGroupRunner()},
{Name: "orderers", Runner: network.OrdererGroupRunner()},
}
networkRunner := grouper.NewOrdered(syscall.SIGTERM, members)
process = ifrit.Invoke(networkRunner)
Eventually(process.Ready()).Should(BeClosed())

org1peer0 := network.Peer("Org1", "peer0")
org2peer0 := network.Peer("Org2", "peer0")
org3peer0 := network.Peer("Org3", "peer0")

testPeers := []*nwo.Peer{org1peer0, org2peer0, org3peer0}
for _, peer := range testPeers {
pr := network.PeerRunner(peer)
p := ifrit.Invoke(pr)
peerProcesses[peer.ID()] = p
Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())
}

orderer = network.Orderer("orderer")
network.CreateAndJoinChannel(orderer, channelID)
network.UpdateChannelAnchors(orderer, channelID)

By("verifying membership")
network.VerifyMembership(network.Peers, channelID)

By("installing and instantiating chaincode on all peers")
testChaincode := chaincode{
Chaincode: nwo.Chaincode{
Name: "marblesp",
Version: "1.0",
Path: "github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd",
Ctor: `{"Args":["init"]}`,
Policy: `OR ('Org1MSP.member','Org2MSP.member', 'Org3MSP.member')`,
CollectionsConfig: filepath.Join("testdata", "collection_configs", "collections_config1.json"),
},
isLegacy: true,
}
deployChaincode(network, orderer, testChaincode)

By("adding marble1 with an org 1 peer as endorser")
peer := network.Peer("Org1", "peer0")
marbleDetails := `{"name":"marble1", "color":"blue", "size":35, "owner":"tom", "price":99}`
addMarble(network, orderer, testChaincode.Name, marbleDetails, peer)

By("waiting for block to propagate")
nwo.WaitUntilEqualLedgerHeight(network, channelID, nwo.GetLedgerHeight(network, network.Peers[0], channelID), network.Peers...)

org2Peer1 := &nwo.Peer{
Name: "peer1",
Organization: "Org2",
Channels: []*nwo.PeerChannel{}, // Don't set channels here so the UpdateConfig call doesn't try to fetch blocks for org2Peer1 with the default Admin user
}
network.Peers = append(network.Peers, org2Peer1)
})

AfterEach(func() {
for _, peerProcess := range peerProcesses {
if peerProcess != nil {
peerProcess.Signal(syscall.SIGTERM)
Eventually(peerProcess.Wait(), network.EventuallyTimeout).Should(Receive())
}
}
})

It("verifies private data is pulled when joining a new peer with new certs", func() {
By("generating new certs for org2Peer1")
org2Peer1 := network.Peer("Org2", "peer1")
tempCryptoDir, err := ioutil.TempDir("", "crypto")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(tempCryptoDir)
generateNewCertsForPeer(network, tempCryptoDir, org2Peer1)

By("updating the channel config with the new certs")
updateConfigWithNewCertsForPeer(network, tempCryptoDir, orderer, org2Peer1)

By("starting the peer1.org2 process")
pr := network.PeerRunner(org2Peer1)
p := ifrit.Invoke(pr)
peerProcesses[org2Peer1.ID()] = p
Eventually(p.Ready(), network.EventuallyTimeout).Should(BeClosed())

By("joining peer1.org2 to the channel with its Admin2 user")
tempFile, err := ioutil.TempFile("", "genesis-block")
Expect(err).NotTo(HaveOccurred())
tempFile.Close()
defer os.Remove(tempFile.Name())

sess, err := network.PeerUserSession(org2Peer1, "Admin2", commands.ChannelFetch{
Block: "0",
ChannelID: channelID,
Orderer: network.OrdererAddress(orderer, nwo.ListenPort),
OutputFile: tempFile.Name(),
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))

sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChannelJoin{
BlockPath: tempFile.Name(),
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))

org2Peer1.Channels = append(org2Peer1.Channels, &nwo.PeerChannel{Name: channelID, Anchor: false})

ledgerHeight := nwo.GetLedgerHeight(network, network.Peers[0], channelID)

By("fetching latest blocks to peer1.org2")
// Retry channel fetch until peer1.org2 retrieves latest block
// Channel Fetch will repeatedly fail until org2Peer1 commits the config update adding its new cert
Eventually(fetchBlocksForPeer(network, org2Peer1, "Admin2"), network.EventuallyTimeout).Should(gbytes.Say(fmt.Sprintf("Received block: %d", ledgerHeight-1)))

By("installing chaincode on peer1.org2 to be able to query it")
chaincode := nwo.Chaincode{
Name: "marblesp",
Version: "1.0",
Path: "github.com/hyperledger/fabric/integration/chaincode/marbles_private/cmd",
Ctor: `{"Args":["init"]}`,
Policy: `OR ('Org1MSP.member','Org2MSP.member', 'Org3MSP.member')`,
CollectionsConfig: filepath.Join("testdata", "collection_configs", "collections_config1.json")}

sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChaincodeInstallLegacy{
Name: chaincode.Name,
Version: chaincode.Version,
Path: chaincode.Path,
Lang: chaincode.Lang,
PackageFile: chaincode.PackageFile,
ClientAuth: network.ClientAuthRequired,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))

sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChaincodeListInstalledLegacy{
ClientAuth: network.ClientAuthRequired,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(fmt.Sprintf("Name: %s, Version: %s,", chaincode.Name, chaincode.Version)))

expectedPeers := []*nwo.Peer{
network.Peer("Org1", "peer0"),
network.Peer("Org2", "peer0"),
network.Peer("Org2", "peer1"),
network.Peer("Org3", "peer0"),
}

By("making sure all peers have the same ledger height")
for _, peer := range expectedPeers {
Eventually(func() int {
var (
sess *gexec.Session
err error
)
if peer.ID() == "Org2.peer1" {
// use Admin2 user for peer1.org2
sess, err = network.PeerUserSession(peer, "Admin2", commands.ChannelInfo{
ChannelID: channelID,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
channelInfoStr := strings.TrimPrefix(string(sess.Buffer().Contents()[:]), "Blockchain info:")
var channelInfo = cb.BlockchainInfo{}
err = json.Unmarshal([]byte(channelInfoStr), &channelInfo)
Expect(err).NotTo(HaveOccurred())
return int(channelInfo.Height)
}

// If not Org2.peer1, just use regular getLedgerHeight call with User1
return nwo.GetLedgerHeight(network, peer, channelID)
}(), network.EventuallyTimeout).Should(Equal(ledgerHeight))
}

By("verifying membership")
expectedDiscoveredPeers := make([]nwo.DiscoveredPeer, 0, len(expectedPeers))
for _, peer := range expectedPeers {
expectedDiscoveredPeers = append(expectedDiscoveredPeers, network.DiscoveredPeer(peer, "_lifecycle", "marblesp"))
}
for _, peer := range expectedPeers {
By(fmt.Sprintf("checking expected peers for peer: %s", peer.ID()))
if peer.ID() == "Org2.peer1" {
// use Admin2 user for peer1.org2
Eventually(nwo.DiscoverPeers(network, peer, "Admin2", channelID), network.EventuallyTimeout).Should(ConsistOf(expectedDiscoveredPeers))
} else {
Eventually(nwo.DiscoverPeers(network, peer, "User1", channelID), network.EventuallyTimeout).Should(ConsistOf(expectedDiscoveredPeers))
}
}

By("verifying peer1.org2 got the private data that was created historically")
sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChaincodeQuery{
ChannelID: channelID,
Name: "marblesp",
Ctor: `{"Args":["readMarble","marble1"]}`,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(`{"docType":"marble","name":"marble1","color":"blue","size":35,"owner":"tom"}`))

sess, err = network.PeerUserSession(org2Peer1, "Admin2", commands.ChaincodeQuery{
ChannelID: channelID,
Name: "marblesp",
Ctor: `{"Args":["readMarblePrivateDetails","marble1"]}`,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess).To(gbytes.Say(`{"docType":"marblePrivateDetails","name":"marble1","price":99}`))
})
})

Describe("Pvtdata behavior with untouched peer configs", func() {
var (
legacyChaincode nwo.Chaincode
Expand Down Expand Up @@ -1433,3 +1660,119 @@ func getValueForCollectionMarblePrivateDetails(marbleName string, price int) []b
marbleJSONasString := `{"docType":"marblePrivateDetails","name":"` + marbleName + `","price":` + strconv.Itoa(price) + `}`
return []byte(marbleJSONasString)
}

// fetchBlocksForPeer attempts to fetch the newest block on the given peer.
// It skips the orderer and returns the session's Err buffer for parsing.
func fetchBlocksForPeer(n *nwo.Network, peer *nwo.Peer, user string) func() *gbytes.Buffer {
return func() *gbytes.Buffer {
sess, err := n.PeerUserSession(peer, user, commands.ChannelFetch{
Block: "newest",
ChannelID: channelID,
OutputFile: filepath.Join(n.RootDir, "newest_block.pb"),
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit())
return sess.Err
}
}

// updateConfigWithNewCertsForPeer updates the channel config with new certs for the designated peer
func updateConfigWithNewCertsForPeer(network *nwo.Network, tempCryptoDir string, orderer *nwo.Orderer, peer *nwo.Peer) {
org := network.Organization(peer.Organization)

By("fetching the channel policy")
currentConfig := nwo.GetConfig(network, network.Peers[0], orderer, channelID)
updatedConfig := proto.Clone(currentConfig).(*cb.Config)

By("parsing the old and new MSP configs")
oldConfig := &mspp.MSPConfig{}
err := proto.Unmarshal(
updatedConfig.ChannelGroup.Groups["Application"].Groups[org.Name].Values["MSP"].Value,
oldConfig)
Expect(err).NotTo(HaveOccurred())

tempOrgMSPPath := filepath.Join(tempCryptoDir, "peerOrganizations", org.Domain, "msp")
newConfig, err := msp.GetVerifyingMspConfig(tempOrgMSPPath, org.MSPID, "bccsp")
Expect(err).NotTo(HaveOccurred())
oldMspConfig := &mspp.FabricMSPConfig{}
newMspConfig := &mspp.FabricMSPConfig{}
err = proto.Unmarshal(oldConfig.Config, oldMspConfig)
Expect(err).NotTo(HaveOccurred())
err = proto.Unmarshal(newConfig.Config, newMspConfig)
Expect(err).NotTo(HaveOccurred())

By("merging the two MSP configs")
updateOldMspConfigWithNewMspConfig(oldMspConfig, newMspConfig)

By("updating the channel config")
updatedConfig.ChannelGroup.Groups["Application"].Groups[org.Name].Values["MSP"].Value = protoutil.MarshalOrPanic(
&mspp.MSPConfig{
Type: oldConfig.Type,
Config: protoutil.MarshalOrPanic(oldMspConfig),
})
nwo.UpdateConfig(network, orderer, channelID, currentConfig, updatedConfig, false, network.Peer(org.Name, "peer0"))
}

// updateOldMspConfigWithNewMspConfig updates the oldMspConfig with certs from the newMspConfig
func updateOldMspConfigWithNewMspConfig(oldMspConfig, newMspConfig *mspp.FabricMSPConfig) {
oldMspConfig.RootCerts = append(oldMspConfig.RootCerts, newMspConfig.RootCerts...)
oldMspConfig.TlsRootCerts = append(oldMspConfig.TlsRootCerts, newMspConfig.TlsRootCerts...)
oldMspConfig.FabricNodeOus.PeerOuIdentifier.Certificate = nil
oldMspConfig.FabricNodeOus.ClientOuIdentifier.Certificate = nil
oldMspConfig.FabricNodeOus.AdminOuIdentifier.Certificate = nil
}

// generateNewCertsForPeer generates new certs with cryptogen for the designated peer and copies
// the necessary certs to the original crypto dir as well as creating an Admin2 user to use for
// any peer operations involving the peer
func generateNewCertsForPeer(network *nwo.Network, tempCryptoDir string, peer *nwo.Peer) {
sess, err := network.Cryptogen(commands.Generate{
Config: network.CryptoConfigPath(),
Output: tempCryptoDir,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))

By("copying the new msp certs for the peer to the original crypto dir")
oldPeerMSPPath := network.PeerLocalMSPDir(peer)
org := network.Organization(peer.Organization)
tempPeerMSPPath := filepath.Join(
tempCryptoDir,
"peerOrganizations",
org.Domain,
"peers",
fmt.Sprintf("%s.%s", peer.Name, org.Domain),
"msp",
)
os.RemoveAll(oldPeerMSPPath)
err = exec.Command("cp", "-r", tempPeerMSPPath, oldPeerMSPPath).Run()
Expect(err).NotTo(HaveOccurred())

// This lets us keep the old user certs for the org for any peers still remaining in the org
// using the old certs
By("copying the new Admin user cert to the original user certs dir as Admin2")
oldAdminUserPath := filepath.Join(
network.RootDir,
"crypto",
"peerOrganizations",
org.Domain,
"users",
fmt.Sprintf("Admin2@%s", org.Domain),
)
tempAdminUserPath := filepath.Join(
tempCryptoDir,
"peerOrganizations",
org.Domain,
"users",
fmt.Sprintf("Admin@%s", org.Domain),
)
os.RemoveAll(oldAdminUserPath)
err = exec.Command("cp", "-r", tempAdminUserPath, oldAdminUserPath).Run()
Expect(err).NotTo(HaveOccurred())
// We need to rename the signcert from Admin to Admin2 as well
err = os.Rename(
filepath.Join(oldAdminUserPath, "msp", "signcerts", fmt.Sprintf("Admin@%s-cert.pem", org.Domain)),
filepath.Join(oldAdminUserPath, "msp", "signcerts", fmt.Sprintf("Admin2@%s-cert.pem", org.Domain)),
)
Expect(err).NotTo(HaveOccurred())
}

0 comments on commit bb46909

Please sign in to comment.