Skip to content

Commit

Permalink
[cps-2.8] fix: close websocket when workflow run running completed (#…
Browse files Browse the repository at this point in the history
…1283)

* fix: close websocket when workflow run running completed

* refactor: check wfr regularly to reduce cost
  • Loading branch information
caicloud-bot committed Sep 16, 2019
1 parent 0f24dbc commit 0c0083b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
34 changes: 33 additions & 1 deletion pkg/server/handler/v1alpha1/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,45 @@ func getContainerLogStream(tenant, project, workflow, workflowrun, stage string,
folderReader := stream.NewFolderReader(logFolder, prefix, exclusions, time.Second*10)
defer folderReader.Close()

err = websocketutil.Write(ws, folderReader)
ctx, cancel := context.WithCancel(context.Background())
go watchStageTermination(common.TenantNamespace(tenant), workflowrun, stage, cancel)
err = websocketutil.Write(ws, folderReader, ctx.Done())
if err != nil {
log.Error("websocket writer error:", err)
}
return err
}

// watchStageTermination watches status of the WorkflowRun and the specific Stage, when it is terminated, call the onTerminatedCallback func.
func watchStageTermination(namespace, wfrName, stgName string, onTerminatedCallback context.CancelFunc) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for ; true; <-ticker.C {
wfr, err := handler.K8sClient.CycloneV1alpha1().WorkflowRuns(namespace).Get(wfrName, metav1.GetOptions{})
if err != nil {
log.Warningf("Get workflowRun %s error: %v", wfrName, err)
onTerminatedCallback()
return
}

if wfr.Status.Overall.Phase == v1alpha1.StatusSucceeded ||
wfr.Status.Overall.Phase == v1alpha1.StatusFailed ||
wfr.Status.Overall.Phase == v1alpha1.StatusCancelled {
onTerminatedCallback()
return
}

for stage, status := range wfr.Status.Stages {
if stage == stgName && (status.Status.Phase == v1alpha1.StatusSucceeded ||
status.Status.Phase == v1alpha1.StatusFailed ||
status.Status.Phase == v1alpha1.StatusCancelled) {
onTerminatedCallback()
return
}
}
}
}

// GetContainerLogs handles the request to get container logs, only supports finished stage records.
func GetContainerLogs(ctx context.Context, project, workflow, workflowrun, tenant, stage, container string, download bool) (io.ReadCloser, map[string]string, error) {
headers := make(map[string]string)
Expand Down
32 changes: 29 additions & 3 deletions pkg/util/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func SendStream(server string, reader io.Reader, close <-chan struct{}) error {
// Send sends stream from reader by websocket
func Send(ws *websocket.Conn, reader io.Reader, close <-chan struct{}) error {
buf := bufio.NewReader(reader)
err := Write(ws, buf)
err := Write(ws, buf, close)
if err != nil {
log.Error("websocket writer error:", err)
}
Expand All @@ -92,14 +92,29 @@ type ReadBytes interface {
}

// Write writes message from reader to websocket
func Write(ws *websocket.Conn, reader ReadBytes) error {
func Write(ws *websocket.Conn, reader ReadBytes, stopCh <-chan struct{}) error {
pingTicker := time.NewTicker(PingPeriod)
sendTicker := time.NewTicker(10 * time.Millisecond)
exit := make(chan struct{})
defer func() {
log.Info("close ticket and websocket")
log.Info("close ticker and websocket")
pingTicker.Stop()
sendTicker.Stop()
ws.Close()
close(exit)
}()

stop := make(chan struct{})
// delay exit to ensure remained messages sent.
go func() {
select {
case <-stopCh:
time.Sleep(30 * time.Second)
close(stop)
return
case <-exit:
return
}
}()

for {
Expand Down Expand Up @@ -147,6 +162,17 @@ func Write(ws *websocket.Conn, reader ReadBytes) error {
return err
}
}
case <-stop:
err := ws.SetWriteDeadline(time.Now().Add(WriteWait))
if err != nil {
log.Warning("set write deadline error:", err)
}
err = ws.WriteMessage(websocket.CloseMessage, []byte("Message sending complete, TERMINATE"))
if err != nil {
log.Error("write close message error: ", err)
return err
}
return nil
}
}
}

0 comments on commit 0c0083b

Please sign in to comment.