Skip to content

Commit

Permalink
fix(conn): JoinCluster loop should use latest conn (#7950) (#7952)
Browse files Browse the repository at this point in the history
JoinCluster loop was getting the connection from pool upfront, and then looping over it. This opened up a bug because in #7918 , we close the connection in case it becomes unhealthy.

This PR gets the latest connection available in the loop. This was the only place in the codebase where I found this issue.

(cherry picked from commit 7531e95)

Co-authored-by: Manish R Jain <manish@dgraph.io>
  • Loading branch information
danielmai and manishrjain committed Jul 16, 2021
1 parent e7549bb commit 7f51327
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 21 deletions.
3 changes: 1 addition & 2 deletions dgraph/cmd/zero/raft.go
Expand Up @@ -632,10 +632,9 @@ func (n *node) initAndStartNode() error {
return errors.Errorf("Unhealthy connection to %v", opts.peer)
}

gconn := p.Get()
c := pb.NewRaftClient(gconn)
timeout := 8 * time.Second
for {
c := pb.NewRaftClient(p.Get())
ctx, cancel := context.WithTimeout(n.ctx, timeout)
// JoinCluster can block indefinitely, raft ignores conf change proposal
// if it has pending configuration.
Expand Down
3 changes: 1 addition & 2 deletions worker/draft.go
Expand Up @@ -1760,8 +1760,7 @@ func (n *node) joinPeers() error {
return err
}

gconn := pl.Get()
c := pb.NewRaftClient(gconn)
c := pb.NewRaftClient(pl.Get())
glog.Infof("Calling JoinCluster via leader: %s", pl.Addr)
if _, err := c.JoinCluster(n.ctx, n.RaftContext); err != nil {
return errors.Wrapf(err, "error while joining cluster")
Expand Down
4 changes: 1 addition & 3 deletions worker/multi_tenancy_ee.go
Expand Up @@ -88,9 +88,7 @@ func proposeDeleteOrSend(ctx context.Context, req *pb.DeleteNsRequest) error {
if pl == nil {
return conn.ErrNoConnection
}
con := pl.Get()
c := pb.NewWorkerClient(con)

c := pb.NewWorkerClient(pl.Get())
_, err := c.DeleteNamespace(ctx, req)
return err
}
3 changes: 1 addition & 2 deletions worker/mutation.go
Expand Up @@ -598,10 +598,9 @@ func proposeOrSend(ctx context.Context, gid uint32, m *pb.Mutations, chr chan re
chr <- res
return
}
con := pl.Get()

var tc *api.TxnContext
c := pb.NewWorkerClient(con)
c := pb.NewWorkerClient(pl.Get())

ch := make(chan error, 1)
go func() {
Expand Down
3 changes: 1 addition & 2 deletions worker/schema.go
Expand Up @@ -174,8 +174,7 @@ func getSchemaOverNetwork(ctx context.Context, gid uint32, s *pb.SchemaRequest,
ch <- resultErr{err: conn.ErrNoConnection}
return
}
conn := pl.Get()
c := pb.NewWorkerClient(conn)
c := pb.NewWorkerClient(pl.Get())
schema, e := c.Schema(ctx, s)
ch <- resultErr{result: schema, err: e}
}
Expand Down
3 changes: 1 addition & 2 deletions worker/snapshot.go
Expand Up @@ -47,8 +47,7 @@ type badgerWriter interface {

// populateSnapshot gets data for a shard from the leader and writes it to BadgerDB on the follower.
func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error {
con := pl.Get()
c := pb.NewWorkerClient(con)
c := pb.NewWorkerClient(pl.Get())

// We should absolutely cancel the context when we return from this function, that way, the
// leader who is sending the snapshot would stop sending.
Expand Down
3 changes: 1 addition & 2 deletions worker/task.go
Expand Up @@ -53,11 +53,10 @@ func invokeNetworkRequest(ctx context.Context, addr string,
return nil, errors.Wrapf(err, "dispatchTaskOverNetwork: while retrieving connection.")
}

con := pl.Get()
if span := otrace.FromContext(ctx); span != nil {
span.Annotatef(nil, "invokeNetworkRequest: Sending request to %v", addr)
}
c := pb.NewWorkerClient(con)
c := pb.NewWorkerClient(pl.Get())
return f(ctx, c)
}

Expand Down
9 changes: 3 additions & 6 deletions worker/zero.go
Expand Up @@ -31,8 +31,7 @@ func RemoveNodeOverNetwork(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.
return nil, conn.ErrNoConnection
}

con := pl.Get()
c := pb.NewZeroClient(con)
c := pb.NewZeroClient(pl.Get())
return c.RemoveNode(ctx, req)
}

Expand All @@ -44,8 +43,7 @@ func MoveTabletOverNetwork(ctx context.Context, req *pb.MoveTabletRequest) (*pb.
return nil, conn.ErrNoConnection
}

con := pl.Get()
c := pb.NewZeroClient(con)
c := pb.NewZeroClient(pl.Get())
return c.MoveTablet(ctx, req)
}

Expand All @@ -57,7 +55,6 @@ func ApplyLicenseOverNetwork(ctx context.Context, req *pb.ApplyLicenseRequest) (
return nil, conn.ErrNoConnection
}

con := pl.Get()
c := pb.NewZeroClient(con)
c := pb.NewZeroClient(pl.Get())
return c.ApplyLicense(ctx, req)
}

0 comments on commit 7f51327

Please sign in to comment.