Skip to content

Commit

Permalink
reintroduce child context with cancel along with CloseAndRecv to forc…
Browse files Browse the repository at this point in the history
…e goroutine cleanup by not interrupt UpdateLog call before it finishes

rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
gabemontero committed Mar 1, 2024
1 parent 3cea96c commit b8a991a
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,13 +393,16 @@ func (r *Reconciler) getPodLogs(ctx context.Context, ns, pod, container, labelKe
}

func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey, logName string) error {
logger := logging.FromContext(ctx)
// TODO consider making configurable after we get some real world usage feedback
streamCtx, streamCancel := context.WithTimeout(ctx, 5*time.Minute)
defer streamCancel()
logger := logging.FromContext(streamCtx)
logger.Debugw("Streaming log started",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)
logsClient, err := r.resultsClient.UpdateLog(ctx)
logsClient, err := r.resultsClient.UpdateLog(streamCtx)
if err != nil {
return fmt.Errorf("failed to create UpdateLog client: %w", err)
}
Expand All @@ -412,7 +415,7 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey,
LabelSelector: fmt.Sprintf("%s=%s", labelKey, o.GetName()),
}
var pods *corev1.PodList
pods, err = r.kubernetesClientset.CoreV1().Pods(o.GetNamespace()).List(ctx, lo)
pods, err = r.kubernetesClientset.CoreV1().Pods(o.GetNamespace()).List(streamCtx, lo)
if err != nil {
return err
}
Expand All @@ -430,7 +433,7 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey,
if len(task) == 0 {
task = pipelineTaskName
}
ba, podLogsErr := r.getPodLogs(ctx, o.GetNamespace(), pod.Name, container.Name, labelKey, task)
ba, podLogsErr := r.getPodLogs(streamCtx, o.GetNamespace(), pod.Name, container.Name, labelKey, task)
if podLogsErr != nil {
return podLogsErr
}
Expand Down Expand Up @@ -468,6 +471,17 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey,
logger.Error(flushErr)
return flushErr
}
// so we use CloseAndRecv vs. just CloseSent to achieve a few things:
// 1) CloseAndRecv calls CloseSend under the covers, followed by a Recv call to obtain a LogSummary
// 2) LogSummary appears to have some stats on the state of operations
// 3) It also appears to be the best form of "confirmation" that the asynchronous operation of UpdateLog on the api
// server side has reached a terminal state
// 4) Hence, creating a child context which we cancel hopefully does not interrupt the UpdateLog call when this method exits,
// 5) However, we need the context cancel to close out the last goroutine launched in newClientStreamWithParams that does
// the final clean, otherwise we end up with our now familiar goroutine leak, which in the end is a memory leak

// comparing closeErr with io.EOF does not work; and I could not find code / desc etc. constants in the grpc code that handled
// the wrapped EOF error we expect to get from grpc when things are "OK"
if logSummary, closeErr := logsClient.CloseAndRecv(); closeErr != nil && !strings.Contains(closeErr.Error(), "EOF") {
logger.Warnw("CloseAndRecv ret err",
zap.String("name", o.GetName()),
Expand Down

0 comments on commit b8a991a

Please sign in to comment.