Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions agent/client/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package client

import (
"time"

"github.com/go-zoox/command/agent/event"
"github.com/go-zoox/core-utils/fmt"
"github.com/go-zoox/logger"
)

func (c *client) Cancel() error {
logger.Debugf("cancel event")

err := c.sendEvent(&event.Event{
Type: event.Cancel,
})

if err != nil {
return err
}

timer := time.NewTicker(30 * time.Second)
select {
case <-c.core.Context().Done():
return c.core.Context().Err()
case <-c.cancelEventDone:
return nil
case <-timer.C:
return fmt.Errorf("timeout to wait start event")
}
}
91 changes: 91 additions & 0 deletions agent/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package client

import (
"io"
"os"

"github.com/go-zoox/command/agent/event"
"github.com/go-zoox/command/terminal"
"github.com/go-zoox/logger"
"github.com/go-zoox/websocket"

command "github.com/go-zoox/command/config"
)

type Client interface {
Connect() error
Close() error
//
New(command *command.Config) error
//
Start() error
Wait() error
Cancel() error
//
SetStdin(stdin io.Reader) error
SetStdout(stdout io.Writer) error
SetStderr(stderr io.Writer) error
//
Run() error
//
Terminal() (terminal.Terminal, error)
}

type client struct {
opt *Option
//
core websocket.Conn
//
stdin io.Reader
stdout io.Writer
stderr io.Writer
//
exitcodeCh chan int
//
newEventDone chan struct{}
startEventDone chan struct{}
waitEventDone chan struct{}
cancelEventDone chan struct{}
}

type Option struct {
Server string
}

func New(opts ...func(opt *Option)) (Client, error) {
opt := &Option{
Server: "ws://localhost:8080",
}
for _, o := range opts {
o(opt)
}

return &client{
opt: opt,
//
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
//
exitcodeCh: make(chan int),
//
newEventDone: make(chan struct{}),
startEventDone: make(chan struct{}),
waitEventDone: make(chan struct{}),
cancelEventDone: make(chan struct{}),
}, nil
}

func (c *client) sendEvent(evt *event.Event) error {
s, err := evt.Encode()
if err != nil {
return err
}

logger.Debugf("send event to server: %s", s)
if err := c.core.WriteTextMessage(s); err != nil {
return err
}

return nil
}
5 changes: 5 additions & 0 deletions agent/client/close.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package client

func (c *client) Close() error {
return c.core.Close()
}
107 changes: 107 additions & 0 deletions agent/client/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package client

import (
"github.com/go-zoox/command/agent/event"
"github.com/go-zoox/core-utils/fmt"
"github.com/go-zoox/logger"
"github.com/go-zoox/websocket"
"github.com/go-zoox/websocket/conn"
"github.com/spf13/cast"
)

func (c *client) Connect() error {
logger.Debugf("create websocket client to %s", c.opt.Server)

if c.opt.Server == "" {
return fmt.Errorf("server address is required")
}

ws, err := websocket.NewClient(func(opt *websocket.ClientOption) {
opt.Addr = c.opt.Server
})
if err != nil {
return err
}

connected := make(chan struct{})

logger.Debugf("listen on close event ...")
ws.OnClose(func(conn conn.Conn, code int, message string) error {
c.stderr.Write([]byte(message))
c.exitcodeCh <- code
return nil
})

logger.Debugf("listen on connect event ...")
ws.OnConnect(func(cc websocket.Conn) error {
c.core = cc

connected <- struct{}{}

cc.OnMessage(func(typ int, message []byte) error {
if typ != conn.TextMessage {
return nil
}

evt := &event.Event{}
if err := evt.Decode(message); err != nil {
return err
}

logger.Debugf("receive event from server: %s", message)

switch evt.Type {
case event.Done:
responseEvent := &event.DoneEvent{}
if err := responseEvent.Decode(message); err != nil {
return err
}
switch string(responseEvent.Payload) {
case event.New:
c.newEventDone <- struct{}{}
case event.Start:
c.startEventDone <- struct{}{}
case event.Wait:
c.waitEventDone <- struct{}{}
case event.Cancel:
c.cancelEventDone <- struct{}{}
}
case event.Stdout:
stdoutEvent := &event.StdoutEvent{}
if err := stdoutEvent.Decode(message); err != nil {
return err
}

c.stdout.Write(stdoutEvent.Payload)
case event.Stderr:
stderrEvent := &event.StderrEvent{}
if err := stderrEvent.Decode(message); err != nil {
return err
}

c.stderr.Write(stderrEvent.Payload)
case event.Exitcode:
exitcodeEvent := &event.ExitcodeEvent{}
if err := exitcodeEvent.Decode(message); err != nil {
return err
}

exitcode := cast.ToInt(string(exitcodeEvent.Payload))
c.exitcodeCh <- exitcode
}

return nil
})

return nil
})

logger.Debugf("connect to server ...")
if err := ws.Connect(); err != nil {
return err
}

<-connected
logger.Debugf("connected")
return nil
}
18 changes: 18 additions & 0 deletions agent/client/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package client

import "io"

func (c *client) SetStdin(stdin io.Reader) error {
c.stdin = stdin
return nil
}

func (c *client) SetStdout(stdout io.Writer) error {
c.stdout = stdout
return nil
}

func (c *client) SetStderr(stderr io.Writer) error {
c.stderr = stderr
return nil
}
32 changes: 32 additions & 0 deletions agent/client/new.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package client

import (
"fmt"
"time"

"github.com/go-zoox/command/agent/event"
command "github.com/go-zoox/command/config"
"github.com/go-zoox/logger"
)

func (c *client) New(command *command.Config) error {
logger.Debugf("new event with command: %s", command.Command)

err := c.sendEvent(&event.Event{
Type: event.New,
Payload: command,
})
if err != nil {
return err
}

timer := time.NewTicker(30 * time.Second)
select {
case <-c.core.Context().Done():
return c.core.Context().Err()
case <-c.newEventDone:
return nil
case <-timer.C:
return fmt.Errorf("timeout to await new event")
}
}
9 changes: 9 additions & 0 deletions agent/client/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package client

func (c *client) Run() error {
if err := c.Start(); err != nil {
return err
}

return c.Wait()
}
30 changes: 30 additions & 0 deletions agent/client/start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package client

import (
"time"

"github.com/go-zoox/command/agent/event"
"github.com/go-zoox/core-utils/fmt"
"github.com/go-zoox/logger"
)

func (c *client) Start() error {
logger.Debugf("start event")

err := c.sendEvent(&event.Event{
Type: event.Start,
})
if err != nil {
return err
}

timer := time.NewTicker(30 * time.Second)
select {
case <-c.core.Context().Done():
return c.core.Context().Err()
case <-c.startEventDone:
return nil
case <-timer.C:
return fmt.Errorf("timeout to wait start event")
}
}
11 changes: 11 additions & 0 deletions agent/client/terminal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package client

import (
"fmt"

"github.com/go-zoox/command/terminal"
)

func (c *client) Terminal() (terminal.Terminal, error) {
return nil, fmt.Errorf("not implemented in agent")
}
35 changes: 35 additions & 0 deletions agent/client/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package client

import (
"github.com/go-zoox/command/agent/event"
"github.com/go-zoox/command/errors"
"github.com/go-zoox/logger"
)

func (c *client) Wait() error {
logger.Debugf("wait event")

err := c.sendEvent(&event.Event{
Type: event.Wait,
})
if err != nil {
return err
}

select {
case <-c.core.Context().Done():
return c.core.Context().Err()
case <-c.waitEventDone:
logger.Debugf("wait for exit code ...")
code := <-c.exitcodeCh
logger.Debugf("exit code is %d", code)

if code == 0 {
return nil
}

return &errors.ExitError{
Code: code,
}
}
}
6 changes: 6 additions & 0 deletions agent/event/cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package event

const Cancel = "cancel"

type CancelEvent struct {
}
Loading