-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Minogrpc refactoring #47
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just some random thoughts, to be discussed in live
mino/minogrpc/mod.go
Outdated
server *Server | ||
namespace string | ||
// rootAddress is the address of the orchestrator of a protocol. When Stream is | ||
// called, the caller takes this address so that participants now how to route |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// called, the caller takes this address so that participants now how to route | |
// called, the caller takes this address so that participants know how to route |
mino/minogrpc/mod.go
Outdated
return rootAddress{} | ||
} | ||
|
||
// Equal implements mino.Address. It returns true the other address is also a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Equal implements mino.Address. It returns true the other address is also a | |
// Equal implements mino.Address. It returns true if the other address is also a |
mino/minogrpc/stream.go
Outdated
}, | ||
} | ||
|
||
if s.me != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why s.me
would be nil?
} | ||
|
||
func (q *NonBlockingQueue) pushAndWait() { | ||
q.working.Add(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is that needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for now it's only used in the tests but the purpose is to make sure the go routine has stopped.
@@ -60,46 +97,129 @@ type AddressFactory struct{} | |||
// FromText implements mino.AddressFactory. It returns an instance of an | |||
// address from a byte slice. | |||
func (f AddressFactory) FromText(text []byte) mino.Address { | |||
return address{id: string(text)} | |||
if len(text) == 0 { | |||
return newRootAddress() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
making an empty address default to the root address looks a bit dangerous to me. IMO the root address should be explicit and a not the default option when an address is empty. For example someone could provide an address that for some unwanted reason is empty but not intended to be used as the root. But in this case there won't be any error but there might be a side effect difficult to debug.
} | ||
|
||
// NewMinogrpc sets up the grpc and http servers. URL should | ||
func NewMinogrpc(serverURL *url.URL, rf routing.Factory) (*Minogrpc, error) { | ||
// Minogrpc represents a grpc service restricted to a namespace |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- implements mino.Mino
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added to the struct definition.
mino/minogrpc/mod.go
Outdated
closing: make(chan error, 1), | ||
} | ||
|
||
// Counter needs to be above 1 for asynchronous call to Add. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Counter needs to be above 1 for asynchronous call to Add. | |
// Counter needs to be >=1 for asynchronous call to Add. |
mino/minogrpc/overlay.proto
Outdated
rpc Call(Envelope) returns (Envelope) {} | ||
rpc Stream(stream Envelope) returns (stream Envelope) {} | ||
rpc Call(Message) returns (Message) {} | ||
rpc Relay(stream Envelope) returns (stream Envelope) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would keep stream
as the name
errs <- xerrors.Errorf("couldn't unmarshal message: %v", err) | ||
continue | ||
} | ||
req := mino.Request{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am suddenly wondering: why don't we use the same type of return for streaming? Right now its
Recv(context.Context) (Address, proto.Message, error)
but shouldn't it be
Recv(context.Context) (Request, error)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, will do after the PR.
mino/minogrpc/server.go
Outdated
} | ||
|
||
sendMsg := &Envelope{ | ||
Message: m, | ||
apiURI := headers[headerURIKey] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uri := uriFromContext(ctx)
...
mino/minogrpc/server.go
Outdated
// This should never and it will panic if it does as this will provoke | ||
// several issues later on. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// This should never and it will panic if it does as this will provoke | |
// several issues later on. | |
// This should never happen and it will panic if it does as this will | |
// provoke several issues later on. |
mino/minogrpc/server.go
Outdated
func (o overlay) setupStream(stream relayer, sender *sender, receiver *receiver, addr mino.Address) { | ||
// Relay sender for that connection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (o overlay) setupStream(stream relayer, sender *sender, receiver *receiver, addr mino.Address) { | |
// Relay sender for that connection. | |
func (o overlay) setupStream(stream relayer, sender *sender, receiver *receiver, | |
addr mino.Address) { | |
// Relay sender for that connection. |
ddc43e6
to
482e862
Compare
This refactors minogrpc to reduce the amount of code. It also implements an infinite buffered receiver to prevent the relay from stalling. Finally, it adds unit tests.