Skip to content

Commit

Permalink
fix(controller): Fixes resource version misuse. Fixes #4714 (#4741)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec authored and simster7 committed Dec 17, 2020
1 parent e192fb1 commit 4345806
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 29 deletions.
3 changes: 2 additions & 1 deletion cmd/argo/commands/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func waitOnOne(serviceClient workflowpkg.WorkflowServiceClient, ctx context.Cont
req := &workflowpkg.WatchWorkflowsRequest{
Namespace: namespace,
ListOptions: &metav1.ListOptions{
FieldSelector: util.GenerateFieldSelectorFromWorkflowName(wfName),
FieldSelector: util.GenerateFieldSelectorFromWorkflowName(wfName),
ResourceVersion: "0",
},
}
stream, err := serviceClient.WatchWorkflows(ctx, req)
Expand Down
3 changes: 2 additions & 1 deletion cmd/argo/commands/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func watchWorkflow(ctx context.Context, serviceClient workflowpkg.WorkflowServic
req := &workflowpkg.WatchWorkflowsRequest{
Namespace: namespace,
ListOptions: &metav1.ListOptions{
FieldSelector: util.GenerateFieldSelectorFromWorkflowName(workflow),
FieldSelector: util.GenerateFieldSelectorFromWorkflowName(workflow),
ResourceVersion: "0",
},
}
stream, err := serviceClient.WatchWorkflows(ctx, req)
Expand Down
21 changes: 3 additions & 18 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workflow
import (
"encoding/json"
"fmt"
"io"
"sort"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -210,13 +211,7 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
return nil
case event, open := <-watch.ResultChan():
if !open {
log.Debug("Re-establishing workflow watch")
watch.Stop()
watch, err = wfIf.Watch(*opts)
if err != nil {
return err
}
continue
return io.EOF
}
log.Debug("Received workflow event")
wf, ok := event.Object.(*wfv1.Workflow)
Expand All @@ -234,8 +229,6 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
if err != nil {
return err
}
// when we re-establish, we want to start at the same place
opts.ResourceVersion = wf.ResourceVersion
}
}
}
Expand Down Expand Up @@ -264,13 +257,7 @@ func (s *workflowServer) WatchEvents(req *workflowpkg.WatchEventsRequest, ws wor
return nil
case event, open := <-watch.ResultChan():
if !open {
log.Debug("Re-establishing event watch")
watch.Stop()
watch, err = eventInterface.Watch(*opts)
if err != nil {
return err
}
continue
return io.EOF
}
log.Debug("Received event")
e, ok := event.Object.(*corev1.Event)
Expand All @@ -283,8 +270,6 @@ func (s *workflowServer) WatchEvents(req *workflowpkg.WatchEventsRequest, ws wor
if err != nil {
return err
}
// when we re-establish, we want to start at the same place
opts.ResourceVersion = e.ResourceVersion
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion ui/src/app/shared/services/requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,22 @@ export default {
return Observable.create((observer: Observer<any>) => {
const eventSource = new EventSource(url);
eventSource.onmessage = x => observer.next(x.data);
eventSource.onerror = x => observer.error(x);
eventSource.onerror = x => {
switch (eventSource.readyState) {
case EventSource.CONNECTING:
observer.error(new Error('Failed to connect to ' + url));
break;
case EventSource.OPEN:
observer.error(new Error('Error in open connection to ' + url));
break;
case EventSource.CLOSED:
observer.error(new Error('Connection closed to ' + url));
break;
default:
observer.error(new Error('Unknown error with ' + url));
}
};

return () => {
eventSource.close();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ export class WorkflowDetails extends React.Component<RouteComponentProps<any>, W
private changesSubscription: Subscription;
private timelineComponent: WorkflowTimeline;

private get resourceVersion() {
return this.state.workflow && this.state.workflow.metadata.resourceVersion;
}

private get selectedTabKey() {
return new URLSearchParams(this.props.location.search).get('tab') || 'workflow';
}
Expand Down Expand Up @@ -336,7 +332,7 @@ export class WorkflowDetails extends React.Component<RouteComponentProps<any>, W
try {
this.ensureUnsubscribed();
this.changesSubscription = services.workflows
.watch({name, namespace, resourceVersion: this.resourceVersion})
.watch({name, namespace, resourceVersion: '0'})
.map(changeEvent => changeEvent.object)
.subscribe(
workflow => this.setState({workflow, error: null}),
Expand Down
4 changes: 1 addition & 3 deletions util/logs/workflow-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
}

if req.GetLogOptions().Follow {
wfListOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + req.GetName()}
wfListOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + req.GetName(), ResourceVersion: "0"}
wfWatch, err := wfInterface.Watch(wfListOptions)
if err != nil {
return err
Expand Down Expand Up @@ -189,8 +189,6 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
if event.Type == watch.Deleted || wf.Status.Fulfilled() {
return
}
// in case we re-establish the watch, make sure we start at the same place
wfListOptions.ResourceVersion = wf.ResourceVersion
}
}
}()
Expand Down

0 comments on commit 4345806

Please sign in to comment.