Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

client: add bzz client, update smoke tests #1582

Merged
merged 5 commits into from Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/inspector.go
Expand Up @@ -51,7 +51,7 @@ func (inspector *Inspector) ListKnown() []string {
return res
}

func (inspector *Inspector) IsSyncing() bool {
func (inspector *Inspector) IsPullSyncing() bool {
lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)

// last received chunks msg time
Expand Down
81 changes: 81 additions & 0 deletions client/bzz.go
@@ -0,0 +1,81 @@
package client

import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/storage"
)

// Bzz is a client-side handle for the bzz_* RPC API of a swarm node.
// Construct it with NewBzz; the zero value is not usable.
type Bzz struct {
	client *rpc.Client // underlying RPC connection to the node
}

// NewBzz is a constructor for a Bzz API; it wraps the given RPC client.
func NewBzz(client *rpc.Client) *Bzz {
	return &Bzz{client: client}
}

// GetChunksBitVector returns a bit vector of presence for a given slice of
// chunks. The addresses are queried in fixed-size pages so that a single
// bzz_has RPC call never carries more than trackChunksPageSize entries.
func (b *Bzz) GetChunksBitVector(addrs []storage.Address) (string, error) {
	const trackChunksPageSize = 7500

	var bitVector string
	for start := 0; start < len(addrs); start += trackChunksPageSize {
		// clamp the page end so the last (possibly short) page stays in bounds
		end := start + trackChunksPageSize
		if end > len(addrs) {
			end = len(addrs)
		}

		var page string
		if err := b.client.Call(&page, "bzz_has", addrs[start:end]); err != nil {
			return "", err
		}
		bitVector += page
	}

	return bitVector, nil
}

// GetBzzAddr returns the bzzAddr of the node, with the "0x" hex prefix
// stripped.
func (b *Bzz) GetBzzAddr() (string, error) {
	var info swarm.Info

	err := b.client.Call(&info, "bzz_info")
	if err != nil {
		return "", err
	}

	// Strip the prefix defensively: the previous `info.BzzKey[2:]` slice
	// panics when the key is shorter than two characters (e.g. empty on a
	// malformed response), whereas TrimPrefix degrades gracefully and is
	// identical for the expected "0x…" form.
	return strings.TrimPrefix(info.BzzKey, "0x"), nil
}

// IsPullSyncing reports whether the node is still receiving chunk deliveries
// due to pull syncing.
func (b *Bzz) IsPullSyncing() (bool, error) {
	var syncing bool
	if err := b.client.Call(&syncing, "bzz_isPullSyncing"); err != nil {
		log.Error("error calling host for isPullSyncing", "err", err)
		return false, err
	}
	return syncing, nil
}

// IsPushSynced reports whether the given `tag` is done syncing, i.e. we've
// received receipts for all of its chunks.
func (b *Bzz) IsPushSynced(tagname string) (bool, error) {
	var synced bool
	if err := b.client.Call(&synced, "bzz_isPushSynced", tagname); err != nil {
		log.Error("error calling host for isPushSynced", "err", err)
		return false, err
	}
	return synced, nil
}
145 changes: 46 additions & 99 deletions cmd/swarm-smoke/upload_and_sync.go
Expand Up @@ -24,7 +24,6 @@ import (
"io/ioutil"
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -33,9 +32,11 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/client"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"
"github.com/pborman/uuid"
"golang.org/x/sync/errgroup"

cli "gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -117,14 +118,16 @@ func trackChunks(testData []byte, submitMetrics bool) error {
return
}

hostChunks, err := getChunksBitVectorFromHost(rpcClient, addrs)
bzzClient := client.NewBzz(rpcClient)

hostChunks, err := bzzClient.GetChunksBitVector(addrs)
if err != nil {
log.Error("error getting chunks bit vector from host", "err", err, "host", httpHost)
hasErr = true
return
}

bzzAddr, err := getBzzAddrFromHost(rpcClient)
bzzAddr, err := bzzClient.GetBzzAddr()
if err != nil {
log.Error("error getting bzz addrs from host", "err", err, "host", httpHost)
hasErr = true
Expand Down Expand Up @@ -176,46 +179,6 @@ func trackChunks(testData []byte, submitMetrics bool) error {
return nil
}

// getChunksBitVectorFromHost returns a bit vector of presence for a given
// slice of chunks from a given host, issuing one bzz_has call per page of at
// most trackChunksPageSize addresses.
func getChunksBitVectorFromHost(client *rpc.Client, addrs []storage.Address) (string, error) {
	const trackChunksPageSize = 7500

	var bitVector string
	for remaining := addrs; len(remaining) > 0; {
		// shrink the final page so slicing stays in bounds
		page := trackChunksPageSize
		if len(remaining) < page {
			page = len(remaining)
		}

		var pageBits string
		if err := client.Call(&pageBits, "bzz_has", remaining[:page]); err != nil {
			return "", err
		}
		bitVector += pageBits
		remaining = remaining[page:]
	}

	return bitVector, nil
}

// getBzzAddrFromHost returns the bzzAddr for a given host by parsing the
// textual output of the bzz_hive RPC call (the hive's String() rendering):
// the address is taken as the last space-separated field of the fourth line.
//
// NOTE(review): strings.Split(hive, "\n")[3] panics if the hive output has
// fewer than four lines — a length guard would make this safer if kept.
func getBzzAddrFromHost(client *rpc.Client) (string, error) {
	var hive string

	err := client.Call(&hive, "bzz_hive")
	if err != nil {
		return "", err
	}

	// we make an ugly assumption about the output format of the hive.String() method
	// ideally we should replace this with an API call that returns the bzz addr for a given host,
	// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time)
	ss := strings.Split(strings.Split(hive, "\n")[3], " ")
	return ss[len(ss)-1], nil
}

// checkChunksVsMostProxHosts is checking:
// 1. whether a chunk has been found at less than 2 hosts. Considering our NN size, this should not happen.
// 2. if a chunk is not found at its closest node. This should also not happen.
Expand Down Expand Up @@ -369,63 +332,30 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
return nil
}

// isPushSynced reports whether the node at wsHost has received receipts for
// all chunks belonging to the given tag.
func isPushSynced(wsHost string, tagname string) (bool, error) {
	rpcClient, err := rpc.Dial(wsHost)
	// Dial may return a non-nil client alongside an error; close it either way.
	if rpcClient != nil {
		defer rpcClient.Close()
	}
	if err != nil {
		log.Error("error dialing host", "err", err)
		return false, err
	}

	var synced bool
	if err := rpcClient.Call(&synced, "bzz_isPushSynced", tagname); err != nil {
		log.Error("error calling host for isPushSynced", "err", err)
		return false, err
	}

	log.Debug("isSynced result", "host", wsHost, "isSynced", synced)

	return synced, nil
}

func isSyncing(wsHost string) (bool, error) {
rpcClient, err := rpc.Dial(wsHost)
if rpcClient != nil {
defer rpcClient.Close()
}

if err != nil {
log.Error("error dialing host", "err", err)
return false, err
}

var isSyncing bool
err = rpcClient.Call(&isSyncing, "bzz_isSyncing")
if err != nil {
log.Error("error calling host for isSyncing", "err", err)
return false, err
}
func waitToPushSynced(tagname string) {
for {
time.Sleep(200 * time.Millisecond)

log.Debug("isSyncing result", "host", wsHost, "isSyncing", isSyncing)
rpcClient, err := rpc.Dial(wsEndpoint(hosts[0]))
if rpcClient != nil {
defer rpcClient.Close()
}
if err != nil {
log.Error("error dialing host", "err", err)
continue
}

return isSyncing, nil
}
bzzClient := client.NewBzz(rpcClient)

func waitToPushSynced(tagname string) {
for {
synced, err := isPushSynced(wsEndpoint(hosts[0]), tagname)
synced, err := bzzClient.IsPushSynced(tagname)
if err != nil {
log.Error(err.Error())
continue
}

if synced {
return
}
time.Sleep(200 * time.Millisecond)
}
}

Expand All @@ -438,22 +368,39 @@ func waitToSync() {
time.Sleep(3 * time.Second)

notSynced := uint64(0)
var wg sync.WaitGroup
wg.Add(len(hosts))

var g errgroup.Group
for i := 0; i < len(hosts); i++ {
i := i
go func(idx int) {
stillSyncing, err := isSyncing(wsEndpoint(hosts[idx]))
g.Go(func() error {
rpcClient, err := rpc.Dial(wsEndpoint(hosts[i]))
if rpcClient != nil {
defer rpcClient.Close()
}
if err != nil {
log.Error("error dialing host", "err", err)
return err
}

bzzClient := client.NewBzz(rpcClient)

stillSyncing, err := bzzClient.IsPullSyncing()
if err != nil {
return err
}

if stillSyncing || err != nil {
if stillSyncing {
atomic.AddUint64(&notSynced, 1)
}
wg.Done()
}(i)

return nil
})
}
wg.Wait()

ns = atomic.LoadUint64(&notSynced)
// Wait for all RPC calls to complete.
if err := g.Wait(); err == nil {
ns = atomic.LoadUint64(&notSynced)
}
}

t2 := time.Since(t1)
Expand Down
66 changes: 66 additions & 0 deletions vendor/golang.org/x/sync/errgroup/errgroup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions vendor/vendor.json
Expand Up @@ -1240,6 +1240,12 @@
"revision": "eb5bcb51f2a31c7d5141d810b70815c05d9c9146",
"revisionTime": "2019-04-03T01:06:53Z"
},
{
"checksumSHA1": "iEK5hCRfrkdc1JOJsaiWuymHmeQ=",
"path": "golang.org/x/sync/errgroup",
"revision": "112230192c580c3556b8cee6403af37a4fc5f28c",
"revisionTime": "2019-04-22T22:11:18Z"
},
{
"checksumSHA1": "FuQoDr6zh5GsiVVyo3oDZcUVC3c=",
"path": "golang.org/x/sync/singleflight",
Expand Down