network: new stream! protocol and pull syncer implementation #1538
network/newstream/wire.go
Outdated
@@ -111,7 +96,7 @@ type GetRange struct {
 	Ruid      uint
 	Stream    ID
 	From      uint64
-	To        uint64 `rlp:nil`
+	To        *uint64 `rlp:"nil"`
 	BatchSize uint
 	Roundtrip bool
In general, don't add unused code without a good reason. A good reason is having an immediate follow-up PR that will make use of it; the bzz-retrieval PR is a good example of that, even though its code was not immediately used.
network/newstream/sync_provider.go
Outdated
// Function used only in tests to detect chunks that are synced
// multiple times within the same stream. This function pointer must be
// nil in production.
var putSeenTestHook func(addr chunk.Address, id enode.ID)
I think this is a good use-case for something we could test if we redirect logs within a simulation. Then we could check for the presence of the given log message containing `addr` and `id`. For now we don't have this functionality, so it's fine to leave this here, but ultimately we will be able to test this kind of implementation behaviour without custom hooks.
Just an idea.
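The pattern under discussion — a package-level function pointer that stays nil in production and is assigned only by tests — can be sketched like this (names and types simplified for illustration):

```go
package main

import "fmt"

// putSeenTestHook-style pattern: nil in production, so the production
// code path pays only a nil check; a test can install an observer.
var putSeenHook func(addr string)

// put records a chunk; if the chunk was already seen on this stream
// and a test hook is installed, the hook fires.
func put(addr string, seen bool) {
	if seen && putSeenHook != nil {
		putSeenHook(addr)
	}
}

func main() {
	var recorded []string
	putSeenHook = func(addr string) { recorded = append(recorded, addr) }

	put("c0ffee", true)    // already-seen chunk: hook fires
	put("deadbeef", false) // new chunk: hook stays silent

	fmt.Println(recorded)
}
```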
network/newstream/sync_provider.go
Outdated
func (s *syncProvider) StreamName() string { return s.name }

func (s *syncProvider) Boundedness() bool { return false }
I fail to see why we need `Boundedness` and `Bound`. Why do we add such indirection without using it? If this is so important, how come we don't have an implementation of it?
From the spec:
> when a client requests a bounded range, server should respond to the client range requests with either offered hashes (if roundtrip is required) or chunks (if not) or an end-of-batch message if there are no more to offer. If none of these responses arrive within a timeout interval, client must drop the upstream peer.
We don't have an example of `Boundedness` or of `Roundtrip`, so I fail to see why this is useful, apart from introducing indirection into already non-trivial functionality. We've been working on the new `pull-sync` version for months; the previous version has been buggy in production for probably more than a year (and is still throwing subscription errors), yet we continue to add code prematurely and make our lives harder...
swarm.go
Outdated
	if !config.SyncEnabled || config.LightNodeEnabled {
		syncing = stream.SyncingDisabled
	}
	//if !config.SyncEnabled || config.LightNodeEnabled {
You are already unhooking the old `stream` protocol, so please remove this code entirely, not just commented out, and also remove all the code in `network/stream` that is no longer used. There's no point keeping it around, especially when it is no longer hooked up here in `swarm.go` and we are switching directly to the new implementation.
network/newstream/cursors_test.go
Outdated
	return nil
}

func TestCorrectCursorsExchangeRace(t *testing.T) {
The mental taxation in understanding what this is doing is a bit too high for my taste. I'm happy we have started to push away complexity from production Swarm code, but we should probably rethink the complexity in tests or at least be mindful about that.
It is not immediately obvious what this test is testing. Tests should generally be simple and I shouldn't have to read the code and think for 15min to figure that out. What is this testing?
- We are using a simulation framework.
- We are mocking a devp2p protocol and injecting some custom hook
- We are building peer fixtures using the p2p library.
We should probably be building more tools to write tests in a more simple manner.
No action required (or at least I don't know what I would expect), but this is definitely not the direction we should be going in general, IMO.
@acud @janos overall great work. I don't see any blockers from my side. I've added a few comments, and talked to @acud off-band about some things that need to be fleshed out, but nothing major.
These are my general comments. The most important thing is that the code seems to be correct and the integration tests seem to be passing, so I see this generally as an improvement.
We created a super spec for stream! and didn't implement it. Instead we implement a temporary mix of sync and stream, which was identified as a problem with the current implementation. This PR saw the tension between two different approaches: one that condones any abstraction, and one in the spirit of 'just write what the thing should do now, nothing more' simplicity. I kindly ask you to consider the following simple, sketchy architecture, and think how much it would simplify testing, help understanding, and ease incrementally PR-ing this thing. Thanks.

```go
package stream

// Range represents a closed interval
type Range struct {
	From, To uint64 // closed interval
}

// individual streams - no need for an actual interface
type Stream interface {
	String() string
	Bound() bool
}

// Provider manages the source of a type of stream;
// this is completely stateless
type Provider interface {
	Type() string                                             // name of provider, e.g. SYNC
	CanHandle(Name) (lastIndex uint64, bound bool, err error) // provider can serve the stream given as argument; returns cursor, boundedness, and an error if it cannot
	OfferedHashes(Name, Range) []Address                      // returns the offered hashes for the given range
	GetDataByHash(Name, ...Address) []Chunk                   // retrieves the chunks based on Address
}

// Registry is the streamer interface to be used in provider packages
type Registry interface {
	Register(Provider) error
}

// LocalStoreProvider wraps localstore and implements the GetDataByHash call with a localstore get
type LocalStoreProvider struct {
	localStore
}

func (l *LocalStoreProvider) GetDataByHash(ctx context.Context, hash []byte) (chunk.Chunk, error) {
	return l.localStore.Get(ctx, chunk.ModeGetSync, hash)
}

// Client logic

// Request object - no need for an interface, just to show methods
type Request interface {
	Cancel()
	Done() (close <-chan struct{}) // like in context
	Err() error                    // error to extract after Done() is closed
}

// Peer - this is not an interface, just demonstrating the relevant methods
type Peer interface {
	GetRange(ctx context.Context, n Name, r Range, need func([]byte) bool, deliver func(Chunk) error) (Request, error)
	Providers() []Info
}

// StorerClient is a stream client that
// - proxies the need function as localstore Has
// - puts chunks in netstore
// These clients are stateless
type StorerClient struct {
	Streamer
	intervals IntervalStore
	store     NetStore
	need      func([]byte) bool // implements it with netStore.Has and opens a fetcher
	deliver   func(Chunk) error // implements it with netStore.Put chunk.ModePutSync
}

// Request will try to cover the indicated range using the interval store
// and peer requests for each gap in the intervals
func (s *StorerClient) Request(p Peer, n Name, r Range) (Request, error) {
	// starts a goroutine that keeps getting the intervals based on the interval store,
	// continuously requesting subranges according to the interval store,
	// and calls
	p.GetRange(ctx, n, r, s.need, s.deliver)
	// as batches are sealed, intervals are updated;
	// the request is alive until the whole range is covered
}

// and now the level doing full sync of a stream
type Mode = uint64

const (
	History Mode = iota
	Live
	Both
)

// StorerClient implements Syncer; here stream.Syncer means syncing a stream.
// This is the only interface used by the pull syncer client
type Syncer interface {
	Request(Context, Peer, Name, Mode) (Request, error)
	// this falls back to "until cursor" (history) and "from cursor" (live) range requests from the StorerClient
}

////////////////////////////
// package pullsync

// pullsync.Server implements stream.Provider;
// uses localstore SubscribePull for the bins;
// the server is node-wide
type Server struct {
	// ...
	*stream.LocalStoreProvider
}

// the node-wide pullsync.Client
type Client struct {
	stream.Syncer // embed stream.Syncer
	// here you simply put the update sync logic listening to kademlia depth changes
	// and call `Request`;
	// remember the request, and when it is no longer relevant just call request.Cancel()
}
```
OK, for morale's sake let's just merge this PR after some critical changes. The rest should be addressed in subsequent PRs.

- there are a lot of changes in packages totally unrelated to this PR (docker, pyramid chunker, etc.), quite a few unnecessary
- the filestore, hasherstore, localstore and intervals store changes are important and unexplained
- the package should be in `network/stream/v2` and `newstream` should be dropped
- error handling and peer drops could be massively simplified
- selects on goroutine termination conditions are scattered around in many unnecessary places
- the technical debt this PR introduces is way too high; let's avoid this in future
@@ -96,3 +98,15 @@ func (i *Inspector) Has(chunkAddresses []storage.Address) string {
 	return strings.Join(hostChunks, "")
 }

+func (i *Inspector) PeerStreams() (string, error) {
comment exported functions
api/inspector_test.go
Outdated
	t.Fatal(err)
}

// if want := hex.EncodeToString(baseKey)[:16]; peerInfo.Base != want {
remove commented code
cmd/swarm/main.go
Outdated
@@ -233,6 +233,8 @@ func init() {
 }

 func main() {
+	runtime.SetMutexProfileFraction(1)
surely not in production right?
network/newstream/peer.go
Outdated
	done chan error
	ruid uint    // the request uid
	from uint64  // want from index
	to   *uint64 // want to index, nil signifies top of range not yet known
// space
network/newstream/stream.go
Outdated
-func NewSlipStream(intervalsStore state.Store, kad *network.Kademlia, providers ...StreamProvider) *SlipStream {
-	slipStream := &SlipStream{
+// New creates a new stream protocol handler
+func New(intervalsStore state.Store, baseKey []byte, providers ...StreamProvider) *Registry {
I would prefer an explicit register method, but can wait till next iteration
network/newstream/stream.go
Outdated
	return
}

w.to = &msg.LastIndex // now that we know the range of the batch we can set the upped bound of the interval to the open want
upped -> upper, and better to say 'we can set the open want's upper bound to the index supplied in the msg'.
Maybe some checks would be needed on this number.
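The suggested sanity check could be as simple as rejecting a server-supplied upper bound that falls below the open want's `from` index. A sketch with hypothetical names (the real fields live on the want and message structs):

```go
package main

import (
	"errors"
	"fmt"
)

var errBadLastIndex = errors.New("last index below range start")

// setUpperBound validates a server-supplied LastIndex before taking it
// as the open want's upper bound. Names are illustrative, not the PR's.
func setUpperBound(from, lastIndex uint64) (*uint64, error) {
	if lastIndex < from {
		return nil, errBadLastIndex
	}
	return &lastIndex, nil
}

func main() {
	if to, err := setUpperBound(10, 42); err == nil {
		fmt.Println("upper bound set to", *to)
	}
	if _, err := setUpperBound(10, 3); err != nil {
		fmt.Println("rejected:", err)
	}
}
```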
network/newstream/stream.go
Outdated
	// wait for all handlers to finish
	done := make(chan struct{})
	go func() {
		r.handlersWg.Wait()
why is this needed?
network/newstream/sync_provider.go
Outdated
	}
}
go func(chunks ...chunk.Chunk) {
	s.cacheMtx.Lock()
better have cache in a separate object
@@ -0,0 +1 @@
(new file: a 4-node simulation snapshot in JSON — node infos, ENRs, kademlia hive tables and the connection list)
please just use the snapshots in `network/stream/testing`
The idea is to remove the whole `network/stream` package. Why create references between the new and old packages, when we should have only one in the codebase?
@nonsense, no, the idea is to reimplement `streamer`:

- we want to use the same snapshots as before
- we want the PR to be smaller
- bitvector and intervalstore are also dependencies from the old stream
- the new streamer should be in `stream/v2`
OK, I was unaware that we will be maintaining both streamers. cc @janos
storage/netstore.go
Outdated
}

log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String())
//n.logger.Trace("netstore.chunk-not-in-localstore", "ref", ref.String())
^
Dockerfile.alltools
Outdated
RUN echo "http://dl-cdn.alpinelinux.org/alpine/edge/testing" >> /etc/apk/repositories
RUN apk --update add perf
RUN wget https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64
RUN chmod +x /jq-linux64
I am not sure why we need `jq` and `websocat` within the image. What is the use-case here? I use those tools from my local machine; why do we need them alongside the `swarm` binary?
cmd/swarm-smoke/upload_and_sync.go
Outdated
@@ -47,7 +47,7 @@ func uploadAndSyncCmd(ctx *cli.Context) error {
 		seed = inputSeed
 	}

-	randomBytes := testutil.RandomBytes(seed, filesize*1000)
+	randomBytes := testutil.RandomBytes(seed, filesize)
I think we should revert this. I didn't understand why we wanted to have it smaller in the first place to be honest.
network/newstream/peer.go
Outdated
	if !ok {
		p.logger.Error("ruid not found, dropping peer", "ruid", ruid)
		p.Drop()
		return o, true
`return nil, true` — let's be explicit that `o` is not found.
func BenchmarkHistoricalStream_15000(b *testing.B) { benchmarkHistoricalStream(b, 15000) }
func BenchmarkHistoricalStream_20000(b *testing.B) { benchmarkHistoricalStream(b, 20000) }

func benchmarkHistoricalStream(b *testing.B, chunks uint64) {
@zelig @nonsense Could you check this benchmark also? I think that the measuring method is not correct. The simulation is constructed outside of the benchmark loop, and inside the loop chunks are uploaded to the same node of the same simulation, making the state different on every benchmark iteration. Every iteration measures different state, and with a larger number of chunks in the localstore it becomes slower. This makes the benchmark result dependent on the number of iterations performed, whereas the number of iterations should only influence the precision of the results. I think this benchmark should be rewritten so that every iteration measures time on state independent of the other iterations, or the benchmark should be removed, as that rewrite may make it time consuming.
@janos it does look like the benchmark is indeed not correct for the reasons you mentioned.
agree
I have changed the benchmark to address this problem, and it is even usable.
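The fix described here amounts to rebuilding the measured state on every iteration while keeping the setup out of the timed section, via `b.StopTimer`/`b.StartTimer`. A generic sketch of that pattern — the `store`, `newTestStore` and `syncAll` names are stand-ins, not the PR's code:

```go
package main

import (
	"fmt"
	"testing"
)

// store stands in for a fresh localstore built per iteration.
type store struct{ chunks []int }

func newTestStore(n int) *store {
	s := &store{chunks: make([]int, n)}
	for i := range s.chunks {
		s.chunks[i] = i
	}
	return s
}

// syncAll stands in for the historical sync work being measured.
func syncAll(s *store) int {
	sum := 0
	for _, c := range s.chunks {
		sum += c
	}
	return sum
}

func benchmarkHistorical(b *testing.B, chunks int) {
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		s := newTestStore(chunks) // fresh, independent state each iteration, excluded from timing
		b.StartTimer()
		syncAll(s) // only the sync itself is measured
	}
}

func main() {
	r := testing.Benchmark(func(b *testing.B) { benchmarkHistorical(b, 1000) })
	fmt.Println("iterations:", r.N)
}
```

With this shape, more iterations only tighten the result's precision instead of compounding state from earlier iterations.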
thanks @janos for the quick cleanup and reorg. I don't like that you put the deletion of the old stream code in this PR.
I approve it, but let's make sure:

- we heed the issues raised in later PRs
- we understand the changes to the localstore, intervalstore, simulation, etc. packages

@zelig Thank you for the review and approval. I will create issues for the comments that remain after this PR gets merged. I do not mind putting back the old streamer code. Would you like me to restore the files?
@@ -68,7 +68,7 @@ func TestSyncingViaGlobalSync(t *testing.T) {
 		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
 		testSyncingViaGlobalSync(t, *chunks, *nodes)
 	} else {
 		chunkCounts := []int{4, 32}
We can't sync 32 chunks? :)
It times out on travis. :) And it actually does not even flake locally. The only thing that I managed to see is the goroutine leak in hive that is fixed. If it is flaky even with only 4 chunks on travis, we should disable this test and dedicate more time to find the root cause.
This PR is the result of joint efforts in the core track to realise the exact, up-to-date requirements of the `stream!` protocol and implement them so that their functionality and behavior are easier to reason about.

The design borrows some code from the old `stream` package, namely the way we manage the bins we want to stream in the case of syncing, and the existing intervals store. Everything else, however, including tests, has been built from the ground up.

The PR does not yet remove the existing `stream` code and does not move the `intervals` package, but rather builds on top of those. This is in order to decrease the strain of diffing those changes. These would be deferred to subsequent PRs after this one is merged.

The new stream implementation suggested in this PR brings more guarantees, better performance and safety to how we handle `streams`, namely:

- subscribing to unwanted `streams` is far less likely due to tight checking of wanted streams from a certain peer
- chunks are delivered with the `ChunkDelivery` message

In terms of reduced complexity:

- servers maintain `offers`, clients maintain `wants`

PLEASE NOTE: tests constitute about half of this PR. Some of them overlap in functionality and will be removed, namely some of the tests in `cursors_test.go`.

Actionable items that still need to be addressed, which I would personally like to defer to a later PR:

- integration with the `stream` package. In layman's terms: we would need to call certain functions on `stream` when a depth change happens, for example, but then also to have hooks on `stream` to know that a `StreamInfoRes` message has arrived and what to do accordingly...
- `maxPeers` - the old `stream` package had a notion of the maximum number of peers we'd like to stream to/with. We have partially concluded that `kademlia` should somehow shield us from having to implement this at the moment, since the number of connections is/should be enforced by `kademlia`. So this needs more info before we jump the gun and implement such a requirement.

HOW TO REVIEW:

Since there is a fair amount of business logic here, a brief overview of how the protocol works is needed in order to facilitate an effective review process.

The flow is as follows:

1. The protocol is wired up in `swarm.go`. Stream handles the protocol message exchange (the `stream.go` file); the `syncProvider` handles sinking the data and managing streams according to kademlia (the `InitPeer` function in `sync_provider.go`)
2. The client sends a `StreamInfoReq`. The server replies with info about its streams (boundedness, cursors (previously known as `sessionIndex`)) in a `StreamInfoRes` message
3. The client processes `StreamInfoRes` and, according to the wantedness of the streams, initiates `GetRange` queries to fetch the streams from the server
4. The server responds with an `OfferedHashes` message for the requested range
5. The client responds to `OfferedHashes` with a `WantedHashes` message
6. The server delivers the wanted chunks with a `ChunkDelivery` message. This message can contain multiple chunks as needed. Also, theoretically, multiple `ChunkDelivery` messages could be sent in reply to the same `WantedHashes` in order to optimise the number of messages sent and to sink the data according to db performance
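The message exchange described above can be reduced to a toy client-side round trip, with the wire types collapsed to strings and the server modeled as a function (this is a sketch of the flow only, not the PR's wire format):

```go
package main

import "fmt"

// clientRound models one sync round of the described flow:
// StreamInfoReq/Res handshake, then GetRange -> OfferedHashes ->
// WantedHashes -> ChunkDelivery. The server is a toy callback.
func clientRound(server func(msg string) string) []string {
	var log []string
	send := func(msg string) {
		log = append(log, msg+" -> "+server(msg))
	}
	send("StreamInfoReq") // learn cursors and boundedness
	send("GetRange")      // request an interval of a wanted stream
	send("WantedHashes")  // reply to OfferedHashes with the wanted subset
	return log
}

func main() {
	server := func(msg string) string {
		switch msg {
		case "StreamInfoReq":
			return "StreamInfoRes"
		case "GetRange":
			return "OfferedHashes"
		case "WantedHashes":
			return "ChunkDelivery"
		}
		return "?"
	}
	for _, line := range clientRound(server) {
		fmt.Println(line)
	}
}
```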
in order to optimise amounts of messages sent and according to db performance to sink the datarelated to ethersphere/user-stories#2
fixes #1451
somewhat fixes #1393
closes #1457
closes #1247
closes #1246
closes #1234
closes #1204
closes #1203
closes #1105