Skip to content
This repository has been archived by the owner on Nov 30, 2021. It is now read-only.

Commit

Permalink
fix(upstream): rebase against fleet upstream changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel Monroy committed Aug 9, 2014
1 parent 4c509b1 commit f7363fa
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 38 deletions.
8 changes: 5 additions & 3 deletions client.go
Expand Up @@ -34,6 +34,8 @@ func NewClient() (*FleetClient, error) {
if err != nil {
return nil, err
}
// set global client
cAPI = client
return &FleetClient{Fleet: client}, nil
}

Expand Down Expand Up @@ -63,7 +65,7 @@ func (c *FleetClient) Create(component string, data bool) (err error) {
if err != nil {
return err
}
errchan := waitForJobStates(c.Fleet, []string{unitName}, testJobStateLoaded, 0, os.Stdout)
errchan := waitForJobStates([]string{unitName}, testJobStateLoaded, 0, os.Stdout)
for err := range errchan {
return fmt.Errorf("error waiting for job %s: %v", unitName, err)
}
Expand Down Expand Up @@ -169,7 +171,7 @@ func (c *FleetClient) Start(target string, data bool) (err error) {
}
}
if data == false {
errchan := waitForJobStates(c.Fleet, units, testUnitStateActive, 0, os.Stdout)
errchan := waitForJobStates(units, testUnitStateActive, 0, os.Stdout)
for err := range errchan {
return fmt.Errorf("error waiting for active: %v", err)
}
Expand All @@ -190,7 +192,7 @@ func (c *FleetClient) Stop(target string) (err error) {
return err
}
}
errchan := waitForJobStates(c.Fleet, units, testJobStateInactive, 0, os.Stdout)
errchan := waitForJobStates(units, testJobStateInactive, 0, os.Stdout)
for err := range errchan {
return fmt.Errorf("error waiting for inactive: %v", err)
}
Expand Down
11 changes: 9 additions & 2 deletions fleet.go
Expand Up @@ -6,8 +6,10 @@ import (
"net"
"net/http"
"strings"
"time"

"github.com/coreos/fleet/client"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
"github.com/coreos/fleet/ssh"
)
Expand All @@ -25,6 +27,12 @@ var Flags = struct {
Tunnel string
}{}

// global API client used by commands
var cAPI client.API
// used to cache MachineStates
var machineStates map[string]*machine.MachineState
var requestTimeout time.Duration = time.Duration(10) * time.Second

func getTunnelFlag() string {
tun := Flags.Tunnel
if tun != "" && !strings.Contains(tun, ":") {
Expand Down Expand Up @@ -67,10 +75,9 @@ func getRegistryClient() (client.API, error) {
InsecureSkipVerify: true,
},
}
return client.NewRegistryClient(&trans, Flags.Endpoint, Flags.EtcdKeyPrefix)
return client.NewRegistryClient(&trans, Flags.Endpoint, Flags.EtcdKeyPrefix, requestTimeout)
}


// randomMachineID return a random machineID from the Fleet cluster
func randomMachineID(c *FleetClient) (machineID string, err error) {
machineState, err := c.Fleet.Machines()
Expand Down
29 changes: 25 additions & 4 deletions list.go
Expand Up @@ -30,6 +30,9 @@ var (
}
return "-"
},
"dstate": func(j *job.Job, full bool) string {
return string(j.TargetState)
},
"load": func(j *job.Job, full bool) string {
us := j.UnitState
if us == nil {
Expand Down Expand Up @@ -60,16 +63,34 @@ var (
},
"machine": func(j *job.Job, full bool) string {
us := j.UnitState
if us == nil || us.MachineState == nil {
if us == nil || us.MachineID == "" {
return "-"
}
return machineFullLegend(*us.MachineState, full)
ms := cachedMachineState(us.MachineID)
if ms == nil {
ms = &machine.MachineState{ID: us.MachineID}
}
return machineFullLegend(*ms, full)
},
"tmachine": func(j *job.Job, full bool) string {
if j.TargetMachineID == "" {
return "-"
}
ms := cachedMachineState(j.TargetMachineID)
if ms == nil {
ms = &machine.MachineState{ID: j.TargetMachineID}
}
return machineFullLegend(*ms, full)
},
"hash": func(j *job.Job, full bool) string {
us := j.UnitState
if us == nil || us.UnitHash == "" {
return "-"
}
if !full {
return j.UnitHash.Short()
return us.UnitHash[:7]
}
return j.UnitHash.String()
return us.UnitHash
},
}
)
Expand Down
58 changes: 47 additions & 11 deletions ssh.go
Expand Up @@ -11,26 +11,30 @@ import (
"github.com/coreos/fleet/ssh"
)

// runCommand will attempt to run a command on a given machine
// It will attempt to SSH to the machine if it is identified as being remote.
func runCommand(cmd string, ms *machine.MachineState) (retcode int) {
// runCommand will attempt to run a command on a given machine. It will attempt
// to SSH to the machine if it is identified as being remote.
func runCommand(cmd string, machID string) (retcode int) {
var err error
if machine.IsLocalMachineState(ms) {
if machine.IsLocalMachineID(machID) {
err, retcode = runLocalCommand(cmd)
if err != nil {
fmt.Printf("Error running local command: %v\n", err)
}
} else {
err, retcode = runRemoteCommand(cmd, ms.PublicIP)
if err != nil {
fmt.Printf("Error running remote command: %v\n", err)
ms, err := machineState(machID)
if err != nil || ms == nil {
fmt.Printf("Error getting machine IP: %v\n", err)
} else {
err, retcode = runRemoteCommand(cmd, ms.PublicIP)
if err != nil {
fmt.Printf("Error running remote command: %v\n", err)
}
}
}
return
}

// runLocalCommand runs the given command locally
// and returns any error encountered and the exit code of the command
// runLocalCommand runs the given command locally and returns any error encountered and the exit code of the command
func runLocalCommand(cmd string) (error, int) {
cmdSlice := strings.Split(cmd, " ")
osCmd := exec.Command(cmdSlice[0], cmdSlice[1:]...)
Expand All @@ -51,8 +55,8 @@ func runLocalCommand(cmd string) (error, int) {
return nil, 0
}

// runRemoteCommand runs the given command over SSH on the given IP
// and returns any error encountered and the exit status of the command
// runRemoteCommand runs the given command over SSH on the given IP, and returns
// any error encountered and the exit status of the command
func runRemoteCommand(cmd string, addr string) (err error, exit int) {
var sshClient *ssh.SSHForwardingClient
if tun := getTunnelFlag(); tun != "" {
Expand All @@ -63,6 +67,38 @@ func runRemoteCommand(cmd string, addr string) (err error, exit int) {
if err != nil {
return err, -1
}

defer sshClient.Close()

return ssh.Execute(sshClient, cmd)
}

func machineState(machID string) (*machine.MachineState, error) {
machines, err := cAPI.Machines()
if err != nil {
return nil, err
}
for _, ms := range machines {
if ms.ID == machID {
return &ms, nil
}
}
return nil, nil
}

// cachedMachineState makes a best-effort to retrieve the MachineState of the given machine ID.
// It memoizes MachineState information for the life of a fleetctl invocation.
// Any error encountered retrieving the list of machines is ignored.
func cachedMachineState(machID string) (ms *machine.MachineState) {
if machineStates == nil {
machineStates = make(map[string]*machine.MachineState)
ms, err := cAPI.Machines()
if err != nil {
return nil
}
for i, m := range ms {
machineStates[m.ID] = &ms[i]
}
}
return machineStates[machID]
}
24 changes: 7 additions & 17 deletions state.go
Expand Up @@ -7,7 +7,6 @@ import (
"sync"
"time"

"github.com/coreos/fleet/client"
"github.com/coreos/fleet/job"
)

Expand Down Expand Up @@ -38,12 +37,12 @@ func testUnitStateActive(j *job.Job) bool {
// than zero. Returned is an error channel used to communicate when
// timeouts occur. The returned error channel will be closed after all
// polling operation is complete.
func waitForJobStates(cAPI client.API, jobs []string, test testJob, maxAttempts int, out io.Writer) chan error {
func waitForJobStates(jobs []string, test testJob, maxAttempts int, out io.Writer) chan error {
errchan := make(chan error)
var wg sync.WaitGroup
for _, name := range jobs {
wg.Add(1)
go checkJobState(cAPI, name, test, maxAttempts, out, &wg, errchan)
go checkJobState(name, test, maxAttempts, out, &wg, errchan)
}
go func() {
wg.Wait()
Expand All @@ -52,19 +51,19 @@ func waitForJobStates(cAPI client.API, jobs []string, test testJob, maxAttempts
return errchan
}

func checkJobState(cAPI client.API, jobName string, test testJob, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
func checkJobState(jobName string, test testJob, maxAttempts int, out io.Writer, wg *sync.WaitGroup, errchan chan error) {
defer wg.Done()
sleep := 100 * time.Millisecond
if maxAttempts < 1 {
for {
if assertJobState(cAPI, jobName, test, out) {
if assertJobState(jobName, test, out) {
return
}
time.Sleep(sleep)
}
} else {
for attempt := 0; attempt < maxAttempts; attempt++ {
if assertJobState(cAPI, jobName, test, out) {
if assertJobState(jobName, test, out) {
return
}
time.Sleep(sleep)
Expand All @@ -73,7 +72,7 @@ func checkJobState(cAPI client.API, jobName string, test testJob, maxAttempts in
}
}

func assertJobState(cAPI client.API, name string, test testJob, out io.Writer) (ret bool) {
func assertJobState(name string, test testJob, out io.Writer) (ret bool) {
j, err := cAPI.Job(name)
if err != nil {
fmt.Fprintf(os.Stderr, "Error retrieving Job(%s) from Registry: %v", name, err)
Expand All @@ -94,21 +93,12 @@ func assertJobState(cAPI client.API, name string, test testJob, out io.Writer) (
msg = fmt.Sprintf("\033[0;33m%v:\033[0m %v", name, *(j.State))
}

tgt, err := cAPI.JobTarget(name)
if err != nil {
fmt.Fprintf(os.Stderr, "Error retrieving target information for Job(%s) from Registry: %v", name, err)
return
}
if tgt == "" {
return
}

machines, err := cAPI.Machines()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed retrieving list of Machines from Registry: %v", err)
}
for _, ms := range machines {
if ms.ID != tgt {
if ms.ID != j.TargetMachineID {
continue
}
msg = fmt.Sprintf("%s on %s", msg, machineFullLegend(ms, false))
Expand Down
2 changes: 1 addition & 1 deletion status.go
Expand Up @@ -22,5 +22,5 @@ func printUnitStatus(cAPI client.API, jobName string) int {
return 1
}
cmd := fmt.Sprintf("systemctl status -l %s", jobName)
return runCommand(cmd, j.UnitState.MachineState)
return runCommand(cmd, j.UnitState.MachineID)
}

0 comments on commit f7363fa

Please sign in to comment.