Skip to content

Commit

Permalink
Merge pull request #5691 from negz/backport-5662-to-release-1.14
Browse files Browse the repository at this point in the history
[Backport release 1.14] Fix locking in the `PackagedFunctionRunner`
  • Loading branch information
negz committed May 15, 2024
2 parents 336b10c + afddd0a commit 49f89b6
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions internal/xfn/function_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (r *PackagedFunctionRunner) RunFunction(ctx context.Context, name string, r
// cost of listing and iterating over FunctionRevisions from cache. The default
// RevisionHistoryLimit is 1, so for most Functions we'd expect there to be two
// revisions in the cache (one active, and one previously active).
func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string) (*grpc.ClientConn, error) {
func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string) (*grpc.ClientConn, error) { //nolint:gocyclo // Only slightly over (12).
log := r.log.WithValues("function", name)

l := &pkgv1beta1.FunctionRevisionList{}
Expand All @@ -170,12 +170,24 @@ func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string)
return nil, errors.Errorf(errFmtEmptyEndpoint, active.GetName())
}

// If we have a connection for the up-to-date endpoint, return it.
r.connsMx.RLock()
conn, ok := r.conns[name]
if ok && conn.Target() == active.Status.Endpoint {
defer r.connsMx.RUnlock()
return conn, nil
}
r.connsMx.RUnlock()

// Either we didn't have a connection, or it wasn't up-to-date.
r.connsMx.Lock()
defer r.connsMx.Unlock()

// Another Goroutine might have updated the connections between when we
// released the read lock and took the write lock, so check again.
conn, ok = r.conns[name]
if ok {
// We have a connection for the up-to-date endpoint. Return it.
// We now have a connection for the up-to-date endpoint.
if conn.Target() == active.Status.Endpoint {
return conn, nil
}
Expand All @@ -185,6 +197,7 @@ func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string)
// already closed or in the process of closing.
log.Debug("Closing gRPC client connection with stale target", "old-target", conn.Target(), "new-target", active.Status.Endpoint)
_ = conn.Close()
delete(r.conns, name)
}

// This context is only used for setting up the connection.
Expand All @@ -198,9 +211,7 @@ func (r *PackagedFunctionRunner) getClientConn(ctx context.Context, name string)
return nil, errors.Wrapf(err, errFmtDialFunction, active.Status.Endpoint, active.GetName())
}

r.connsMx.Lock()
r.conns[name] = conn
r.connsMx.Unlock()

log.Debug("Created new gRPC client connection", "target", active.Status.Endpoint)
return conn, nil
Expand Down Expand Up @@ -235,17 +246,16 @@ func (r *PackagedFunctionRunner) GarbageCollectConnectionsNow(ctx context.Contex
// path where no connections need garbage collecting we shouldn't
// take it at all.

// No need to take a write lock or list Functions if there's no work to do.
r.connsMx.RLock()
connections := make([]string, 0, len(r.conns))
for name := range r.conns {
connections = append(connections, name)
if len(r.conns) == 0 {
defer r.connsMx.RUnlock()
return 0, nil
}
r.connsMx.RUnlock()

// No need to list Functions if there's no work to do.
if len(connections) == 0 {
return 0, nil
}
r.connsMx.Lock()
defer r.connsMx.Unlock()

l := &pkgv1beta1.FunctionList{}
if err := r.client.List(ctx, l); err != nil {
Expand All @@ -257,28 +267,20 @@ func (r *PackagedFunctionRunner) GarbageCollectConnectionsNow(ctx context.Contex
functionExists[f.GetName()] = true
}

// Build a list of connections to garbage collect.
gc := make([]string, 0)
for _, name := range connections {
if !functionExists[name] {
gc = append(gc, name)
// Garbage collect connections.
closed := 0
for name := range r.conns {
if functionExists[name] {
continue
}
}

// No need to take a write lock if there's no work to do.
if len(gc) == 0 {
return 0, nil
}

r.log.Debug("Closing gRPC client connections for Functions that are no longer installed", "functions", gc)
r.connsMx.Lock()
for _, name := range gc {
// Close only returns an error if the connection is already
// closed or in the process of closing.
_ = r.conns[name].Close()
delete(r.conns, name)
closed++
r.log.Debug("Closed gRPC client connection to Function that is no longer installed", "function", name)
}
r.connsMx.Unlock()

return len(gc), nil
return closed, nil
}

0 comments on commit 49f89b6

Please sign in to comment.