Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Switch logs, pull, push to websocket response
Browse files Browse the repository at this point in the history
Signed-off-by: Darren Shepherd <darren@acorn.io>
  • Loading branch information
ibuildthecloud committed Nov 4, 2022
1 parent 99bc5d9 commit 68050ef
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 103 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
cuelang.org/go v0.4.3
github.com/AlecAivazis/survey/v2 v2.3.6
github.com/acorn-io/aml v0.0.0-20220717003025-bc8cb1214693
github.com/acorn-io/baaah v0.0.0-20221020031138-2c03477cc65c
github.com/acorn-io/baaah v0.0.0-20221104210523-3108050e28bc
github.com/acorn-io/mink v0.0.0-20221102221115-be238dbdd92d
github.com/acorn-io/namegenerator v0.0.0-20220915160418-9e3d5a0ffe78
github.com/containerd/console v1.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ github.com/acorn-io/aml v0.0.0-20220717003025-bc8cb1214693 h1:5uxUFWpREhhKllLkan
github.com/acorn-io/aml v0.0.0-20220717003025-bc8cb1214693/go.mod h1:uiNpPSAYWhEBfbGWKpN185+EP+hZr2M/U8Ckr/Jo3Kk=
github.com/acorn-io/apiserver v0.25.2-ot-1 h1:91Q7+Jd9BeYZhzt0C+38w2sMn8aHFOxfTdlE6MCH9UI=
github.com/acorn-io/apiserver v0.25.2-ot-1/go.mod h1:8cynBL5SR6CIKypk9nWtZXHzEiJhLVQxeMVZ5CGzkFo=
github.com/acorn-io/baaah v0.0.0-20221020031138-2c03477cc65c h1:2VQVe+9xyShFvELDsLgLG/mgRrm8l58LV20Ief3lNFI=
github.com/acorn-io/baaah v0.0.0-20221020031138-2c03477cc65c/go.mod h1:HVIZ8vDXjY2y045giWUAcoX1fIAkmqABQgKgdgo3/og=
github.com/acorn-io/baaah v0.0.0-20221104210523-3108050e28bc h1:QqFNbqWzNGkEGYMkKb71A24U7my9LvOlvCfB116ucUA=
github.com/acorn-io/baaah v0.0.0-20221104210523-3108050e28bc/go.mod h1:HVIZ8vDXjY2y045giWUAcoX1fIAkmqABQgKgdgo3/og=
github.com/acorn-io/component-base v0.25.2-ot-1 h1:xinqJUNbpW2/zsvm8mDv6Q7riLhvXup9x7Kz9eIPM1M=
github.com/acorn-io/component-base v0.25.2-ot-1/go.mod h1:/5qYr5BXGNPs+cRd6+WL1NfOYtzOstJlm1CMK06cm7s=
github.com/acorn-io/etcd/server/v3 v3.5.4-ot-1 h1:sQMBy/UImb1923toh9U0oIxlpO7crLrD7eKE/orB/pQ=
Expand Down
33 changes: 17 additions & 16 deletions pkg/client/app.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package client

import (
"bufio"
"context"
"encoding/json"
"errors"
"sort"
"strings"

Expand All @@ -13,6 +11,8 @@ import (
"github.com/acorn-io/acorn/pkg/run"
"github.com/acorn-io/acorn/pkg/scheme"
"github.com/acorn-io/baaah/pkg/typed"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -158,39 +158,40 @@ func (c *client) AppLog(ctx context.Context, name string, opts *LogOptions) (<-c
opts.ContainerReplica = name
}

resp, err := c.RESTClient.Get().
url := c.RESTClient.Get().
Namespace(app.Namespace).
Resource("apps").
Name(app.Name).
SubResource("log").
VersionedParams((*apiv1.LogOptions)(opts), scheme.ParameterCodec).
Stream(ctx)
URL()

conn, err := c.Dialer.DialWebsocket(ctx, url.String(), nil)
if err != nil {
return nil, err
}

result := make(chan apiv1.LogMessage)
go func() {
defer close(result)
lines := bufio.NewScanner(resp)
for lines.Scan() {
line := lines.Text()
defer conn.Close()
for {
_, data, err := conn.ReadMessage()
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
break
} else if err != nil {
logrus.Errorf("error reading websocket: %v", err)
break
}
message := apiv1.LogMessage{}
if err := json.Unmarshal([]byte(line), &message); err == nil {
if err := json.Unmarshal(data, &message); err == nil {
result <- message
} else if !errors.Is(err, context.Canceled) && err.Error() != "unexpected end of JSON input" {
} else {
result <- apiv1.LogMessage{
Error: err.Error(),
}
}
}

err := lines.Err()
if err != nil && !errors.Is(err, context.Canceled) {
result <- apiv1.LogMessage{
Error: err.Error(),
}
}
}()

return result, nil
Expand Down
61 changes: 32 additions & 29 deletions pkg/client/image.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package client

import (
"bufio"
"context"
"encoding/json"
"strings"

apiv1 "github.com/acorn-io/acorn/pkg/apis/api.acorn.io/v1"
kclient "github.com/acorn-io/acorn/pkg/k8sclient"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
)

Expand Down Expand Up @@ -59,39 +60,40 @@ func (c *client) ImageDetails(ctx context.Context, imageName string, opts *Image
}

func (c *client) ImagePull(ctx context.Context, imageName string, opts *ImagePullOptions) (<-chan ImageProgress, error) {
resp, err := c.RESTClient.Post().
url := c.RESTClient.Get().
Namespace(c.Namespace).
Resource("images").
Name(strings.ReplaceAll(imageName, "/", "+")).
SubResource("pull").
Body(&apiv1.ImagePull{}).
Stream(ctx)
URL()

conn, err := c.Dialer.DialWebsocket(ctx, url.String(), nil)
if err != nil {
return nil, err
}

result := make(chan ImageProgress, 1000)
go func() {
defer close(result)
lines := bufio.NewScanner(resp)
for lines.Scan() {
line := lines.Text()
defer conn.Close()
for {
_, data, err := conn.ReadMessage()
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
break
} else if err != nil {
logrus.Errorf("error reading websocket: %v", err)
break
}

progress := ImageProgress{}
if err := json.Unmarshal([]byte(line), &progress); err == nil {
if err := json.Unmarshal(data, &progress); err == nil {
result <- progress
} else {
result <- ImageProgress{
Error: err.Error(),
}
}
}

err := lines.Err()
if err != nil {
result <- ImageProgress{
Error: err.Error(),
}
}
}()

return result, nil
Expand All @@ -103,39 +105,40 @@ func (c *client) ImagePush(ctx context.Context, imageName string, opts *ImagePus
return nil, err
}

resp, err := c.RESTClient.Post().
url := c.RESTClient.Get().
Namespace(image.Namespace).
Resource("images").
Name(strings.ReplaceAll(imageName, "/", "+")).
SubResource("push").
Body(&apiv1.ImagePush{}).
Stream(ctx)
URL()

conn, err := c.Dialer.DialWebsocket(ctx, url.String(), nil)
if err != nil {
return nil, err
}

result := make(chan ImageProgress)
go func() {
defer close(result)
lines := bufio.NewScanner(resp)
for lines.Scan() {
line := lines.Text()
defer conn.Close()
for {
_, data, err := conn.ReadMessage()
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
break
} else if err != nil {
logrus.Errorf("error reading websocket: %v", err)
break
}

progress := ImageProgress{}
if err := json.Unmarshal([]byte(line), &progress); err == nil {
if err := json.Unmarshal(data, &progress); err == nil {
result <- progress
} else {
result <- ImageProgress{
Error: err.Error(),
}
}
}

err := lines.Err()
if err != nil {
result <- ImageProgress{
Error: err.Error(),
}
}
}()

return result, nil
Expand Down
11 changes: 10 additions & 1 deletion pkg/k8schannel/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Dialer struct {
needsInit bool
}

func (d *Dialer) DialContext(ctx context.Context, url string, headers http.Header) (*Connection, error) {
func (d *Dialer) DialWebsocket(ctx context.Context, url string, headers http.Header) (*websocket.Conn, error) {
newHeaders := http.Header{}
for k, v := range d.headers {
newHeaders[k] = v
Expand All @@ -42,6 +42,15 @@ func (d *Dialer) DialContext(ctx context.Context, url string, headers http.Heade
}
return nil, err
}

return conn, nil
}

func (d *Dialer) DialContext(ctx context.Context, url string, headers http.Header) (*Connection, error) {
conn, err := d.DialWebsocket(ctx, url, headers)
if err != nil {
return nil, err
}
return NewConnection(conn, d.needsInit), nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/k8schannel/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"io"
"net"
"net/http"
"sync"
"time"

Expand All @@ -12,6 +13,10 @@ import (
"github.com/rancher/wrangler/pkg/merr"
)

var Upgrader = &websocket.Upgrader{CheckOrigin: func(req *http.Request) bool {
return true
}, HandshakeTimeout: 15 * time.Second}

type stream struct {
cond sync.Cond
initialized bool
Expand Down
61 changes: 45 additions & 16 deletions pkg/progressbar/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package progressbar

import (
"errors"
"fmt"
"time"

"github.com/acorn-io/acorn/pkg/client"
"github.com/acorn-io/baaah/pkg/typed"
"github.com/pterm/pterm"
)

Expand All @@ -13,28 +16,54 @@ func Print(progress <-chan client.ImageProgress) error {
bar *pterm.ProgressbarPrinter
)

for update := range progress {
if update.Error != "" {
err = errors.New(update.Error)
continue
if pterm.RawOutput {
var last client.ImageProgress
for update := range typed.Every(time.Second, progress) {
if update.Error != "" {
err = errors.New(update.Error)
continue
}
if update.Total == 0 {
continue
}
if update == last {
continue
}
fmt.Printf("[%d/%d]\n", update.Complete, update.Total)
last = update
}

if bar == nil {
bar, _ = pterm.DefaultProgressbar.
WithTotal(int(update.Total)).
WithCurrent(int(update.Complete)).Start()
if last.Total != 0 && last.Total != last.Complete {
fmt.Printf("[%d/%d]\n", last.Total, last.Total)
}
} else {
for update := range progress {
if update.Error != "" {
err = errors.New(update.Error)
continue
}

if update.Total == 0 {
// we need total to properly print status
continue
}

if int(update.Complete) > bar.Current {
bar.Add(int(update.Complete) - bar.Current)
if bar == nil {
bar, _ = pterm.DefaultProgressbar.
WithTotal(int(update.Total)).
WithCurrent(int(update.Complete)).Start()
}

if int(update.Complete) > bar.Current {
bar.Add(int(update.Complete) - bar.Current)
}
}
}

if bar != nil {
if err == nil && bar.Current != bar.Total {
bar.Add(bar.Total - bar.Current)
if bar != nil {
if err == nil && bar.Current != bar.Total {
bar.Add(bar.Total - bar.Current)
}
_, _ = bar.Stop()
}
_, _ = bar.Stop()
}

return err
Expand Down
20 changes: 14 additions & 6 deletions pkg/server/registry/apps/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
"net/http"

apiv1 "github.com/acorn-io/acorn/pkg/apis/api.acorn.io/v1"
"github.com/acorn-io/acorn/pkg/k8schannel"
kclient "github.com/acorn-io/acorn/pkg/k8sclient"
"github.com/acorn-io/acorn/pkg/labels"
"github.com/acorn-io/acorn/pkg/log"
"github.com/acorn-io/mink/pkg/strategy"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/endpoints/request"
Expand Down Expand Up @@ -78,10 +81,13 @@ func (i *Logs) Connect(ctx context.Context, id string, options runtime.Object, r
}()

return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
if f, ok := rw.(http.Flusher); ok {
f.Flush()
conn, err := k8schannel.Upgrader.Upgrade(rw, req, nil)
if err != nil {
logrus.Errorf("Error during handshake for app logs: %v", err)
return
}
defer conn.Close()

for message := range output {
lm := apiv1.LogMessage{
Line: message.Line,
Expand All @@ -105,11 +111,13 @@ func (i *Logs) Connect(ctx context.Context, id string, options runtime.Object, r
if err != nil {
panic("failed to marshal update: " + err.Error())
}
_, _ = rw.Write(append(data, '\n'))
if f, ok := rw.(http.Flusher); ok {
f.Flush()
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
logrus.Errorf("Error writing log message: %v", err)
break
}
}

_ = conn.CloseHandler()(websocket.CloseNormalClosure, "")
}), nil
}

Expand Down

0 comments on commit 68050ef

Please sign in to comment.