Skip to content
This repository has been archived by the owner on Jul 6, 2023. It is now read-only.

Commit

Permalink
Merge pull request #185 from lpabon/t181
Browse files Browse the repository at this point in the history
Optimize ssh connections
  • Loading branch information
Luis Pabón committed Sep 2, 2015
2 parents cd13fd8 + 1474d17 commit f88c555
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 87 deletions.
3 changes: 3 additions & 0 deletions apps/glusterfs/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func NewApp(configIo io.Reader) *App {
default:
return nil
}
if app.executor == nil {
return nil
}
logger.Debug("Loaded %v executor", app.conf.Executor)

// Set db is set in the configuration file
Expand Down
20 changes: 4 additions & 16 deletions executors/sshexec/brick.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package sshexec
import (
"fmt"
"github.com/heketi/heketi/executors"
"github.com/heketi/heketi/utils/ssh"
"github.com/lpabon/godbc"
)

Expand All @@ -33,11 +32,6 @@ func (s *SshExecutor) BrickCreate(host string,
godbc.Require(brick.TpSize >= brick.Size)
godbc.Require(brick.VgId != "")

exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return nil, ErrSshPrivateKey
}

logger.Info("Creating brick on host %v", host)
commands := []string{

Expand Down Expand Up @@ -73,7 +67,7 @@ func (s *SshExecutor) BrickCreate(host string,
}

// Execute commands
_, err := exec.ConnectAndExec(host+":22", commands, 10)
_, err := s.sshExec(host, commands, 10)
if err != nil {
return nil, err
}
Expand All @@ -95,17 +89,11 @@ func (s *SshExecutor) BrickDestroy(host string,
godbc.Require(brick.Name != "")
godbc.Require(brick.VgId != "")

// Setup ssh session
exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return ErrSshPrivateKey
}

// Try to unmount first
commands := []string{
fmt.Sprintf("sudo umount /brick_%v", brick.Name),
}
_, err := exec.ConnectAndExec(host+":22", commands, 5)
_, err := s.sshExec(host, commands, 5)
if err != nil {
logger.Err(err)
}
Expand All @@ -114,7 +102,7 @@ func (s *SshExecutor) BrickDestroy(host string,
commands = []string{
fmt.Sprintf("sudo lvremove -f vg_%v/tp_%v", brick.VgId, brick.Name),
}
_, err = exec.ConnectAndExec(host+":22", commands, 5)
_, err = s.sshExec(host, commands, 5)
if err != nil {
logger.Err(err)
}
Expand All @@ -123,7 +111,7 @@ func (s *SshExecutor) BrickDestroy(host string,
commands = []string{
fmt.Sprintf("sudo rmdir /brick_%v", brick.Name),
}
_, err = exec.ConnectAndExec(host+":22", commands, 5)
_, err = s.sshExec(host, commands, 5)
if err != nil {
logger.Err(err)
}
Expand Down
24 changes: 3 additions & 21 deletions executors/sshexec/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"github.com/heketi/heketi/executors"
"github.com/heketi/heketi/utils/ssh"
"strconv"
"strings"
)
Expand All @@ -35,20 +34,14 @@ const (

func (s *SshExecutor) DeviceSetup(host, device, vgid string) (d *executors.DeviceInfo, e error) {

// Setup ssh session
exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return nil, ErrSshPrivateKey
}

// Setup commands
commands := []string{
fmt.Sprintf("sudo pvcreate %v", device),
fmt.Sprintf("sudo vgcreate vg_%v %v", vgid, device),
}

// Execute command
_, err := exec.ConnectAndExec(host+":22", commands, 5)
_, err := s.sshExec(host, commands, 5)
if err != nil {
return nil, err
}
Expand All @@ -71,11 +64,6 @@ func (s *SshExecutor) DeviceSetup(host, device, vgid string) (d *executors.Devic
}

func (s *SshExecutor) DeviceTeardown(host, device, vgid string) error {
// Setup ssh session
exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return ErrSshPrivateKey
}

// Setup commands
commands := []string{
Expand All @@ -84,7 +72,7 @@ func (s *SshExecutor) DeviceTeardown(host, device, vgid string) error {
}

// Execute command
_, err := exec.ConnectAndExec(host+":22", commands, 5)
_, err := s.sshExec(host, commands, 5)
if err != nil {
return err
}
Expand All @@ -96,19 +84,13 @@ func (s *SshExecutor) getVgSizeFromNode(
d *executors.DeviceInfo,
host, device, vgid string) error {

// Setup ssh session
exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return ErrSshPrivateKey
}

// Setup command
commands := []string{
fmt.Sprintf("sudo vgdisplay -c vg_%v", vgid),
}

// Execute command
b, err := exec.ConnectAndExec(host+":22", commands, 5)
b, err := s.sshExec(host, commands, 5)
if err != nil {
return err
}
Expand Down
25 changes: 7 additions & 18 deletions executors/sshexec/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,37 @@ package sshexec

import (
"fmt"
"github.com/heketi/heketi/utils/ssh"
"github.com/lpabon/godbc"
)

func (s *SshExecutor) PeerProbe(exec_host, newnode string) error {
func (s *SshExecutor) PeerProbe(host, newnode string) error {

godbc.Require(exec_host != "")
godbc.Require(host != "")
godbc.Require(newnode != "")

exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return ErrSshPrivateKey
}

logger.Info("Probing: %v -> %v", exec_host, newnode)
logger.Info("Probing: %v -> %v", host, newnode)
// create the commands
commands := []string{
fmt.Sprintf("sudo gluster peer probe %v", newnode),
}
_, err := exec.ConnectAndExec(exec_host+":22", commands, 5)
_, err := s.sshExec(host, commands, 5)
if err != nil {
return err
}

return nil
}

func (s *SshExecutor) PeerDetach(exec_host, detachnode string) error {
godbc.Require(exec_host != "")
func (s *SshExecutor) PeerDetach(host, detachnode string) error {
godbc.Require(host != "")
godbc.Require(detachnode != "")

exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return ErrSshPrivateKey
}

// create the commands
logger.Info("Detaching node %v", detachnode)
commands := []string{
fmt.Sprintf("sudo gluster peer detach %v", detachnode),
}
_, err := exec.ConnectAndExec(exec_host+":22", commands, 5)
_, err := s.sshExec(host, commands, 5)
if err != nil {
logger.Err(err)
}
Expand Down
52 changes: 52 additions & 0 deletions executors/sshexec/sshexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ package sshexec
import (
"errors"
"github.com/heketi/heketi/utils"
"github.com/heketi/heketi/utils/ssh"
"github.com/lpabon/godbc"
"os"
"sync"
)

const (
DEFAULT_MAX_CONNECTIONS = 8
)

type SshExecutor struct {
private_keyfile string
user string
throttlemap map[string]chan bool
lock sync.Mutex
exec *ssh.SshExec
config *SshConfig
}

Expand All @@ -46,6 +55,7 @@ func NewSshExecutor(config *SshConfig) *SshExecutor {
godbc.Require(config != nil)

s := &SshExecutor{}
s.throttlemap = make(map[string]chan bool)

// Set configuration
if config.PrivateKeyFile == "" {
Expand All @@ -66,10 +76,52 @@ func NewSshExecutor(config *SshConfig) *SshExecutor {
logger.Warning("Rebalance on volume expansion has been enabled. This is an EXPERIMENTAL feature")
}

// Setup key
s.exec = ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if s.exec == nil {
logger.LogError("Unable to load ssh user and private keyfile")
return nil
}

godbc.Ensure(s != nil)
godbc.Ensure(s.config == config)
godbc.Ensure(s.user != "")
godbc.Ensure(s.private_keyfile != "")

return s
}

func (s *SshExecutor) accessConnection(host string) {

var (
c chan bool
ok bool
)

s.lock.Lock()
if c, ok = s.throttlemap[host]; !ok {
c = make(chan bool, DEFAULT_MAX_CONNECTIONS)
s.throttlemap[host] = c
}
s.lock.Unlock()

c <- true
}

func (s *SshExecutor) freeConnection(host string) {
s.lock.Lock()
c := s.throttlemap[host]
s.lock.Unlock()

<-c
}

func (s *SshExecutor) sshExec(host string, commands []string, timeoutMinutes int) ([]string, error) {

// Throttle
s.accessConnection(host)
defer s.freeConnection(host)

// Execute
return s.exec.ConnectAndExec(host+":22", commands, timeoutMinutes)
}
25 changes: 3 additions & 22 deletions executors/sshexec/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package sshexec
import (
"fmt"
"github.com/heketi/heketi/executors"
"github.com/heketi/heketi/utils/ssh"
"github.com/lpabon/godbc"
)

Expand All @@ -36,12 +35,6 @@ func (s *SshExecutor) VolumeCreate(host string,
godbc.Require(volume.Name != "")
godbc.Require(volume.Replica > 1)

// Setup ssh key
exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return nil, ErrSshPrivateKey
}

// Setup volume create command
// There could many, many bricks which could make the command line
// too long. Instead, create the volume first, then add each brick set.
Expand All @@ -65,7 +58,7 @@ func (s *SshExecutor) VolumeCreate(host string,
commands = append(commands, fmt.Sprintf("sudo gluster volume start %v", volume.Name))

// Execute command
_, err := exec.ConnectAndExec(host+":22", commands, 10)
_, err := s.sshExec(host, commands, 10)
if err != nil {
return nil, err
}
Expand All @@ -81,12 +74,6 @@ func (s *SshExecutor) VolumeExpand(host string,
godbc.Require(len(volume.Bricks) > 0)
godbc.Require(volume.Name != "")

// Setup ssh key
exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return nil, ErrSshPrivateKey
}

// Setup volume create command
commands := s.createAddBrickCommands(volume, 0 /* start at the beginning of the brick list */)

Expand All @@ -97,7 +84,7 @@ func (s *SshExecutor) VolumeExpand(host string,
}

// Execute command
_, err := exec.ConnectAndExec(host+":22", commands, 10)
_, err := s.sshExec(host, commands, 10)
if err != nil {
return nil, err
}
Expand All @@ -109,12 +96,6 @@ func (s *SshExecutor) VolumeDestroy(host string, volume string) error {
godbc.Require(host != "")
godbc.Require(volume != "")

// Setup ssh key
exec := ssh.NewSshExecWithKeyFile(logger, s.user, s.private_keyfile)
if exec == nil {
return ErrSshPrivateKey
}

// Shutdown volume
commands := []string{
// stop gluster volume
Expand All @@ -123,7 +104,7 @@ func (s *SshExecutor) VolumeDestroy(host string, volume string) error {
}

// Execute command
_, err := exec.ConnectAndExec(host+":22", commands, 5)
_, err := s.sshExec(host, commands, 5)
if err != nil {
return err
}
Expand Down
10 changes: 0 additions & 10 deletions utils/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ type SshExec struct {
logger *utils.Logger
}

var (
maxconnections = make(chan bool, 8)
)

func getKeyFile(file string) (key ssh.Signer, err error) {
buf, err := ioutil.ReadFile(file)
if err != nil {
Expand Down Expand Up @@ -111,12 +107,6 @@ func NewSshExecWithKeyFile(logger *utils.Logger, user string, file string) *SshE
// This function was based from https://github.com/coreos/etcd-manager/blob/master/main.go
func (s *SshExec) ConnectAndExec(host string, commands []string, timeoutMinutes int) ([]string, error) {

// Wait here for a turn
maxconnections <- true
defer func() {
<-maxconnections
}()

buffers := make([]string, len(commands))

// :TODO: Will need a timeout here in case the server does not respond
Expand Down

0 comments on commit f88c555

Please sign in to comment.