Skip to content

Commit

Permalink
go fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Nov 17, 2023
1 parent a8236d8 commit a6a1e71
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
14 changes: 7 additions & 7 deletions broker/protocol/dispatcher.go
Expand Up @@ -29,11 +29,11 @@ func RegisterGRPCDispatcher(localZone string) {
// passed to a gRPC RPC call. If ProcessSpec_ID is non-zero valued, the RPC is
// dispatched to the specified member. Otherwise, the RPC is dispatched to a
// Route member, preferring:
// * A member not having a currently-broken network connection (eg, due to
// a stale Route or network split).
// * A member which is in the same zone as the caller (potentially reducing
// network traffic costs.
// * A member having a Ready connection (potentially reducing latency).
// - A member not having a currently-broken network connection (eg, due to
// a stale Route or network split).
// - A member which is in the same zone as the caller (potentially reducing
// network traffic costs.
// - A member having a Ready connection (potentially reducing latency).
func WithDispatchRoute(ctx context.Context, rt Route, id ProcessSpec_ID) context.Context {
return context.WithValue(ctx, dispatchRouteCtxKey{}, dispatchRoute{route: rt, id: id})
}
Expand All @@ -55,7 +55,7 @@ func WithDispatchItemRoute(ctx context.Context, dr DispatchRouter, item string,
id = rt.Members[rt.Primary]
}
return context.WithValue(ctx, dispatchRouteCtxKey{},
dispatchRoute{route: rt, id: id, item: item, DispatchRouter: dr})
dispatchRoute{route: rt, id: id, item: item, DispatchRouter: dr})
}

// DispatchRouter routes item to Routes, and observes item Routes.
Expand Down Expand Up @@ -221,7 +221,7 @@ func (d *dispatcher) Pick(info balancer.PickInfo) (balancer.PickResult, error) {

if tr, ok := trace.FromContext(info.Ctx); ok {
tr.LazyPrintf("Pick(Route: %s, ID: %s) => %s (%s)",
&dr.route, &dr.id, &dispatchID, state)
&dr.route, &dr.id, &dispatchID, state)
}
switch state {
case connectivity.Idle, connectivity.Connecting:
Expand Down
32 changes: 16 additions & 16 deletions broker/protocol/dispatcher_test.go
Expand Up @@ -59,7 +59,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
// SubConn to the default service address is started.
var _, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrNoSubConnAvailable)
c.Check(cc.created, gc.DeepEquals, []mockSubConn{mockSubConn{Name:"default.addr", disp: disp}})
c.Check(cc.created, gc.DeepEquals, []mockSubConn{mockSubConn{Name: "default.addr", disp: disp}})
cc.created = nil

// Case: Default connection transitions to Ready. Expect it's now returned.
Expand All @@ -72,34 +72,34 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {

// Case: Specific remote peer is dispatched to.
ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrNoSubConnAvailable)
c.Check(cc.created, gc.DeepEquals, []mockSubConn{mockSubConn{Name: "remote.addr", disp: disp}})
cc.created = nil

mockSubConn{Name:"remote.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
mockSubConn{Name: "remote.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.IsNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "remote.addr", disp: disp })
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "remote.addr", disp: disp})

// Case: Route allows for multiple members. A local one is now dialed.
ctx = WithDispatchRoute(context.Background(), buildRouteFixture(), ProcessSpec_ID{})

_, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrNoSubConnAvailable)
c.Check(cc.created, gc.DeepEquals, []mockSubConn{mockSubConn{Name:"local.addr", disp: disp}})
c.Check(cc.created, gc.DeepEquals, []mockSubConn{mockSubConn{Name: "local.addr", disp: disp}})
cc.created = nil

mockSubConn{Name:"local.addr",disp:disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
mockSubConn{Name: "local.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.IsNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name:"local.addr", disp: disp})
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.addr", disp: disp})

// Case: One local addr is marked as failed. Another is dialed.
mockSubConn{Name: "local.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
Expand All @@ -114,7 +114,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.IsNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name:"local.otherAddr", disp: disp})
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.otherAddr", disp: disp})

// Case: otherAddr is also failed. Expect that an error is returned,
// rather than dispatch to remote addr. (Eg we prefer to wait for a
Expand All @@ -130,7 +130,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
mockSubConn{Name: "remote.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})

ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

_, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrTransientFailure)
Expand All @@ -151,7 +151,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.IsNil)
c.Check(result.Done, gc.NotNil)
c.Check(result.SubConn, gc.Equals, mockSubConn{Name:"local.addr", disp: disp})
c.Check(result.SubConn, gc.Equals, mockSubConn{Name: "local.addr", disp: disp})

// Closure callback with an Unavailable error (only) will trigger an invalidation.
result.Done(balancer.DoneInfo{Err: nil})
Expand Down Expand Up @@ -207,14 +207,14 @@ func (s *DispatcherSuite) TestDispatchMarkAndSweep(c *gc.C) {

// This time, expect that local.addr is swept.
disp.sweep()
c.Check(cc.removed, gc.DeepEquals, []mockSubConn{mockSubConn{Name: "local.addr", disp: disp }})
c.Check(cc.removed, gc.DeepEquals, []mockSubConn{mockSubConn{Name: "local.addr", disp: disp}})
cc.removed = nil
mockSubConn{Name: "local.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})

disp.sweep() // Now remote.addr is swept.
c.Check(cc.removed, gc.DeepEquals, []mockSubConn{mockSubConn{Name: "remote.addr", disp: disp}})
cc.removed = nil
mockSubConn{Name: "remote.addr", disp: disp }.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
mockSubConn{Name: "remote.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})

// No connections remain.
c.Check(disp.idConn, gc.HasLen, 0)
Expand Down Expand Up @@ -249,13 +249,13 @@ func (s1 mockSubConn) Equal(s2 mockSubConn) bool {
return s1.Name == s2.Name
}

func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") }
func (s mockSubConn) UpdateState(state balancer.SubConnState) { s.disp.updateSubConnState(s, state) }
func (s mockSubConn) Connect() {}
func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") }
func (s mockSubConn) UpdateState(state balancer.SubConnState) { s.disp.updateSubConnState(s, state) }
func (s mockSubConn) Connect() {}
func (s mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, func() {}
}
func (s mockSubConn) Shutdown() {
func (s mockSubConn) Shutdown() {
var c = s.disp.cc.(*mockClientConn)
c.removed = append(c.removed, s)
}
Expand Down

0 comments on commit a6a1e71

Please sign in to comment.