Skip to content

Commit

Permalink
Adding test for failed retries
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar committed May 6, 2021
1 parent 9e566a8 commit 2798870
Showing 1 changed file with 135 additions and 2 deletions.
137 changes: 135 additions & 2 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,133 @@ func TestPushChunkToNextClosest(t *testing.T) {
}
}

func TestPushChunkToClosestFailedAttemptRetry(t *testing.T) {

// chunk data to upload
chunk := testingc.FixtureChunk("7000")

// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000

peer1 := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
peer2 := swarm.MustParseHexAddress("5000000000000000000000000000000000000000000000000000000000000000")
peer3 := swarm.MustParseHexAddress("9000000000000000000000000000000000000000000000000000000000000000")
peer4 := swarm.MustParseHexAddress("4000000000000000000000000000000000000000000000000000000000000000")

// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer1, storerPeer1, _, peerAccounting1 := createPushSyncNode(t, peer1, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer1.Close()

psPeer2, storerPeer2, _, peerAccounting2 := createPushSyncNode(t, peer2, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer2.Close()

psPeer3, storerPeer3, _, peerAccounting3 := createPushSyncNode(t, peer3, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer3.Close()

psPeer4, storerPeer4, _, peerAccounting4 := createPushSyncNode(t, peer4, defaultPrices, nil, nil, defaultSigner, mock.WithClosestPeerErr(topology.ErrWantSelf))
defer storerPeer4.Close()

recorder := streamtest.New(
streamtest.WithProtocols(
psPeer1.Protocol(),
psPeer2.Protocol(),
psPeer3.Protocol(),
psPeer4.Protocol(),
),
streamtest.WithBaseAddr(pivotNode),
)

pivotAccounting := accountingmock.NewAccounting(
accountingmock.WithReserveFunc(func(ctx context.Context, peer swarm.Address, price uint64) error {
if peer.String() == peer4.String() {
return nil
}
return errors.New("unable to reserve")
}),
)

psPivot, storerPivot, pivotTags := createPushSyncNodeWithAccounting(t, pivotNode, defaultPrices, recorder, nil, defaultSigner, pivotAccounting, mock.WithPeers(peer1, peer2, peer3, peer4))
defer storerPivot.Close()

ta, err := pivotTags.Create(1)
if err != nil {
t.Fatal(err)
}
chunk = chunk.WithTagID(ta.Uid)

ta1, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}

if ta1.Get(tags.StateSent) != 0 || ta1.Get(tags.StateSynced) != 0 {
t.Fatalf("tags initialization error")
}

// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}

if !chunk.Address().Equal(receipt.Address) {
t.Fatal("invalid receipt")
}

// this intercepts the outgoing delivery message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), chunk.Data())

// this intercepts the incoming receipt message
waitOnRecordAndTest(t, peer4, recorder, chunk.Address(), nil)

ta2, err := pivotTags.Get(ta.Uid)
if err != nil {
t.Fatal(err)
}
// out of 4, 3 peers should return accouting error. So we should have effectively
// sent only 1 msg
if ta2.Get(tags.StateSent) != 1 {
t.Fatalf("tags error")
}

balance, err := pivotAccounting.Balance(peer4)
if err != nil {
t.Fatal(err)
}

if balance.Int64() != -int64(fixedPrice) {
t.Fatalf("unexpected balance on pivot. want %d got %d", -int64(fixedPrice), balance)
}

balance4, err := peerAccounting4.Balance(pivotNode)
if err != nil {
t.Fatal(err)
}

if balance4.Int64() != int64(fixedPrice) {
t.Fatalf("unexpected balance on peer4. want %d got %d", int64(fixedPrice), balance4)
}

for _, p := range []struct {
addr swarm.Address
acct accounting.Interface
}{
{peer1, peerAccounting1},
{peer2, peerAccounting2},
{peer3, peerAccounting3},
} {
bal, err := p.acct.Balance(p.addr)
if err != nil {
t.Fatal(err)
}

if bal.Int64() != 0 {
t.Fatalf("unexpected balance on %s. want %d got %d", p.addr, 0, bal)
}
}
}

// TestHandler expect a chunk from a node on a stream. It then stores the chunk in the local store and
// sends back a receipt. This is tested by intercepting the incoming stream for proper messages.
// It also sends the chunk to the closest peer and receives a receipt.
Expand Down Expand Up @@ -651,14 +778,20 @@ func TestSignsReceipt(t *testing.T) {
}

func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags, accounting.Interface) {
t.Helper()
mockAccounting := accountingmock.NewAccounting()
ps, mstorer, ts := createPushSyncNodeWithAccounting(t, addr, prices, recorder, unwrap, signer, mockAccounting, mockOpts...)
return ps, mstorer, ts, mockAccounting
}

func createPushSyncNodeWithAccounting(t *testing.T, addr swarm.Address, prices pricerParameters, recorder *streamtest.Recorder, unwrap func(swarm.Chunk), signer crypto.Signer, acct accounting.Interface, mockOpts ...mock.Option) (*pushsync.PushSync, *mocks.MockStorer, *tags.Tags) {
t.Helper()
logger := logging.New(ioutil.Discard, 0)
storer := mocks.NewStorer()

mockTopology := mock.NewTopologyDriver(mockOpts...)
mockStatestore := statestore.NewStateStore()
mtag := tags.NewTags(mockStatestore, logger)
mockAccounting := accountingmock.NewAccounting()

mockPricer := pricermock.NewMockService(prices.price, prices.peerPrice)

Expand All @@ -670,7 +803,7 @@ func createPushSyncNode(t *testing.T, addr swarm.Address, prices pricerParameter
return ch.WithStamp(postage.NewStamp(nil, nil)), nil
}

return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, mockAccounting, mockPricer, signer, nil), storer, mtag, mockAccounting
return pushsync.New(addr, recorderDisconnecter, storer, mockTopology, mtag, true, unwrap, validStamp, logger, acct, mockPricer, signer, nil), storer, mtag
}

func waitOnRecordAndTest(t *testing.T, peer swarm.Address, recorder *streamtest.Recorder, add swarm.Address, data []byte) {
Expand Down

0 comments on commit 2798870

Please sign in to comment.