Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

client: add bzz client, update smoke tests #1582

Merged
merged 5 commits into from Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/inspector.go
Expand Up @@ -51,7 +51,7 @@ func (inspector *Inspector) ListKnown() []string {
return res
}

func (inspector *Inspector) IsSyncing() bool {
func (inspector *Inspector) IsPullSyncing() bool {
lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)

// last received chunks msg time
Expand Down
86 changes: 86 additions & 0 deletions client/bzz.go
@@ -0,0 +1,86 @@
package client

import (
"strings"

"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/storage"
)

type Bzz struct {
client *rpc.Client
}

// NewBzz is a constructor for a Bzz API
func NewBzz(client *rpc.Client) *Bzz {
return &Bzz{
client: client,
}
}

// GetChunksBitVector returns a bit vector of presence for a given slice of chunks
func (b *Bzz) GetChunksBitVector(addrs []storage.Address) (string, error) {
var hostChunks string
const trackChunksPageSize = 7500

for len(addrs) > 0 {
var pageChunks string
// get current page size, so that we avoid a slice out of bounds on the last page
pagesize := trackChunksPageSize
if len(addrs) < trackChunksPageSize {
pagesize = len(addrs)
}

err := b.client.Call(&pageChunks, "bzz_has", addrs[:pagesize])
if err != nil {
return "", err
}
hostChunks += pageChunks
addrs = addrs[pagesize:]
}

return hostChunks, nil
}

// GetBzzAddr returns the bzzAddr of the node
func (b *Bzz) GetBzzAddr() (string, error) {
var hive string

err := b.client.Call(&hive, "bzz_hive")
if err != nil {
return "", err
}

// we make an ugly assumption about the output format of the hive.String() method
Copy link
Contributor

@skylenet skylenet Jul 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also get the BzzAddr via the bzz_info rpc call. From there you want the the BzzKey field.

e.g.

var info swarm.Info
err = client.Call(&swarminfo, "bzz_info")
if err != nil {
 return "", err
}
return info.BzzKey[2:]

// ideally we should replace this with an API call that returns the bzz addr for a given host,
// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time
ss := strings.Split(strings.Split(hive, "\n")[3], " ")
return ss[len(ss)-1], nil
}

// IsPullSyncing is checking if the node is still receiving chunk deliveries due to pull syncing
func (b *Bzz) IsPullSyncing() (bool, error) {
var isSyncing bool

err := b.client.Call(&isSyncing, "bzz_isPullSyncing")
if err != nil {
log.Error("error calling host for isPullSyncing", "err", err)
return false, err
}

return isSyncing, nil
}

// IsPushSynced checks if the given `tag` is done syncing, i.e. we've received receipts for all chunks
func (b *Bzz) IsPushSynced(tagname string) (bool, error) {
var isSynced bool

err := b.client.Call(&isSynced, "bzz_isPushSynced", tagname)
if err != nil {
log.Error("error calling host for isPushSynced", "err", err)
return false, err
}

return isSynced, nil
}
145 changes: 46 additions & 99 deletions cmd/swarm-smoke/upload_and_sync.go
Expand Up @@ -24,7 +24,6 @@ import (
"io/ioutil"
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -33,9 +32,11 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/client"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"
"github.com/pborman/uuid"
"golang.org/x/sync/errgroup"

cli "gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -117,14 +118,16 @@ func trackChunks(testData []byte, submitMetrics bool) error {
return
}

hostChunks, err := getChunksBitVectorFromHost(rpcClient, addrs)
bzzClient := client.NewBzz(rpcClient)

hostChunks, err := bzzClient.GetChunksBitVector(addrs)
if err != nil {
log.Error("error getting chunks bit vector from host", "err", err, "host", httpHost)
hasErr = true
return
}

bzzAddr, err := getBzzAddrFromHost(rpcClient)
bzzAddr, err := bzzClient.GetBzzAddr()
if err != nil {
log.Error("error getting bzz addrs from host", "err", err, "host", httpHost)
hasErr = true
Expand Down Expand Up @@ -176,46 +179,6 @@ func trackChunks(testData []byte, submitMetrics bool) error {
return nil
}

// getChunksBitVectorFromHost returns a bit vector of presence for a given slice of chunks from a given host
func getChunksBitVectorFromHost(client *rpc.Client, addrs []storage.Address) (string, error) {
var hostChunks string
const trackChunksPageSize = 7500

for len(addrs) > 0 {
var pageChunks string
// get current page size, so that we avoid a slice out of bounds on the last page
pagesize := trackChunksPageSize
if len(addrs) < trackChunksPageSize {
pagesize = len(addrs)
}

err := client.Call(&pageChunks, "bzz_has", addrs[:pagesize])
if err != nil {
return "", err
}
hostChunks += pageChunks
addrs = addrs[pagesize:]
}

return hostChunks, nil
}

// getBzzAddrFromHost returns the bzzAddr for a given host
func getBzzAddrFromHost(client *rpc.Client) (string, error) {
var hive string

err := client.Call(&hive, "bzz_hive")
if err != nil {
return "", err
}

// we make an ugly assumption about the output format of the hive.String() method
// ideally we should replace this with an API call that returns the bzz addr for a given host,
// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time
ss := strings.Split(strings.Split(hive, "\n")[3], " ")
return ss[len(ss)-1], nil
}

// checkChunksVsMostProxHosts is checking:
// 1. whether a chunk has been found at less than 2 hosts. Considering our NN size, this should not happen.
// 2. if a chunk is not found at its closest node. This should also not happen.
Expand Down Expand Up @@ -369,63 +332,30 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
return nil
}

func isPushSynced(wsHost string, tagname string) (bool, error) {
rpcClient, err := rpc.Dial(wsHost)
if rpcClient != nil {
defer rpcClient.Close()
}

if err != nil {
log.Error("error dialing host", "err", err)
return false, err
}

var isSynced bool
err = rpcClient.Call(&isSynced, "bzz_isPushSynced", tagname)
if err != nil {
log.Error("error calling host for isPushSynced", "err", err)
return false, err
}

log.Debug("isSynced result", "host", wsHost, "isSynced", isSynced)

return isSynced, nil
}

func isSyncing(wsHost string) (bool, error) {
rpcClient, err := rpc.Dial(wsHost)
if rpcClient != nil {
defer rpcClient.Close()
}

if err != nil {
log.Error("error dialing host", "err", err)
return false, err
}

var isSyncing bool
err = rpcClient.Call(&isSyncing, "bzz_isSyncing")
if err != nil {
log.Error("error calling host for isSyncing", "err", err)
return false, err
}
func waitToPushSynced(tagname string) {
for {
time.Sleep(200 * time.Millisecond)

log.Debug("isSyncing result", "host", wsHost, "isSyncing", isSyncing)
rpcClient, err := rpc.Dial(wsEndpoint(hosts[0]))
if rpcClient != nil {
defer rpcClient.Close()
}
if err != nil {
log.Error("error dialing host", "err", err)
continue
}

return isSyncing, nil
}
bzzClient := client.NewBzz(rpcClient)

func waitToPushSynced(tagname string) {
for {
synced, err := isPushSynced(wsEndpoint(hosts[0]), tagname)
synced, err := bzzClient.IsPushSynced(tagname)
if err != nil {
log.Error(err.Error())
continue
}

if synced {
return
}
time.Sleep(200 * time.Millisecond)
}
}

Expand All @@ -438,22 +368,39 @@ func waitToSync() {
time.Sleep(3 * time.Second)

notSynced := uint64(0)
var wg sync.WaitGroup
wg.Add(len(hosts))

var g errgroup.Group
for i := 0; i < len(hosts); i++ {
i := i
go func(idx int) {
stillSyncing, err := isSyncing(wsEndpoint(hosts[idx]))
g.Go(func() error {
rpcClient, err := rpc.Dial(wsEndpoint(hosts[i]))
if rpcClient != nil {
defer rpcClient.Close()
}
if err != nil {
log.Error("error dialing host", "err", err)
return err
}

bzzClient := client.NewBzz(rpcClient)

stillSyncing, err := bzzClient.IsPullSyncing()
if err != nil {
return err
}

if stillSyncing || err != nil {
if stillSyncing {
atomic.AddUint64(&notSynced, 1)
}
wg.Done()
}(i)

return nil
})
}
wg.Wait()

ns = atomic.LoadUint64(&notSynced)
// Wait for all RPC calls to complete.
if err := g.Wait(); err == nil {
ns = atomic.LoadUint64(&notSynced)
}
}

t2 := time.Since(t1)
Expand Down
66 changes: 66 additions & 0 deletions vendor/golang.org/x/sync/errgroup/errgroup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions vendor/vendor.json
Expand Up @@ -1240,6 +1240,12 @@
"revision": "eb5bcb51f2a31c7d5141d810b70815c05d9c9146",
"revisionTime": "2019-04-03T01:06:53Z"
},
{
"checksumSHA1": "iEK5hCRfrkdc1JOJsaiWuymHmeQ=",
"path": "golang.org/x/sync/errgroup",
"revision": "112230192c580c3556b8cee6403af37a4fc5f28c",
"revisionTime": "2019-04-22T22:11:18Z"
},
{
"checksumSHA1": "FuQoDr6zh5GsiVVyo3oDZcUVC3c=",
"path": "golang.org/x/sync/singleflight",
Expand Down