Skip to content

Commit

Permalink
perf: Update node-controller rpc service to notify nodes on volume un…
Browse files Browse the repository at this point in the history
…map (#87)

* perf: Update node-controller rpc service to notify nodes on volume unmap

Using the node-controller RPC communication server, notify the node
once the controller has finished unmapping a volume, so that the node can
stop checking for device rediscovery.

Also adds reuse of the grpc.ClientConn objects for better performance,
as well as graceful shutdown of gRPC clients and servers.
  • Loading branch information
David-T-White committed Jun 14, 2023
1 parent 32d05a5 commit 55d7d6a
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 104 deletions.
4 changes: 3 additions & 1 deletion cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ func main() {
flag.Set("logtostderr", "true")
flag.Parse()
klog.Infof("starting storage controller (%s)", common.Version)
controller.New().Start(*bind)
c := controller.New()
defer c.Stop()
c.Start(*bind)
}
4 changes: 3 additions & 1 deletion cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ func main() {
}

klog.Infof("starting storage node plugin (%s)", common.Version)
node.New().Start(*bind)
n := node.New()
defer n.Stop()
n.Start(*bind)
}
14 changes: 0 additions & 14 deletions pkg/common/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package common
import (
"context"
"net"
"os"
"os/signal"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -172,18 +170,6 @@ func (driver *Driver) Start(bind string) {
}
driver.socket = socket

sigc := make(chan os.Signal, 1)
signal.Notify(sigc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
)
go func() {
_ = <-sigc
driver.Stop()
}()

go func() {
driver.exporter.ListenAndServe()
}()
Expand Down
81 changes: 75 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import (
"errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"

storageapi "github.com/Seagate/seagate-exos-x-api-go"
"github.com/Seagate/seagate-exos-x-csi/pkg/common"
"github.com/Seagate/seagate-exos-x-csi/pkg/node_service"
pb "github.com/Seagate/seagate-exos-x-csi/pkg/node_service/node_servicepb"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -60,8 +64,9 @@ var nonAuthenticatedMethods = []string{
type Controller struct {
*common.Driver

client *storageapi.Client
runPath string
client *storageapi.Client
nodeServiceClients map[string]*grpc.ClientConn
runPath string
}

// DriverCtx contains data common to most calls
Expand All @@ -75,9 +80,10 @@ type DriverCtx struct {
func New() *Controller {
client := storageapi.NewClient()
controller := &Controller{
Driver: common.NewDriver(client.Collector),
client: client,
runPath: fmt.Sprintf("/var/run/%s", common.PluginName),
Driver: common.NewDriver(client.Collector),
client: client,
runPath: fmt.Sprintf("/var/run/%s", common.PluginName),
nodeServiceClients: map[string]*grpc.ClientConn{},
}

if err := os.MkdirAll(controller.runPath, 0755); err != nil {
Expand Down Expand Up @@ -116,14 +122,25 @@ func New() *Controller {
if err != nil {
return nil, err
}

return handler(ctx, req)
},
)

csi.RegisterIdentityServer(controller.Server, controller)
csi.RegisterControllerServer(controller.Server, controller)

sigc := make(chan os.Signal, 1)
signal.Notify(sigc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
)
go func() {
_ = <-sigc
controller.Stop()
}()

return controller
}

Expand Down Expand Up @@ -299,3 +316,55 @@ func runPreflightChecks(parameters map[string]string, capabilities *[]*csi.Volum

return nil
}

// nodeServiceClient returns the gRPC connection cached for nodeAddress,
// dialing the node service and caching the new connection on first use.
//
// NOTE(review): nodeServiceClients is read and written here without any
// locking visible in this file. If RPC handlers can reach this concurrently
// the map needs a mutex — confirm against the server's concurrency model.
func (controller *Controller) nodeServiceClient(nodeAddress string) (*grpc.ClientConn, error) {
	if conn := controller.nodeServiceClients[nodeAddress]; conn != nil {
		return conn, nil
	}

	klog.V(3).InfoS("node grpc client not found, establishing...", "nodeAddress", nodeAddress)
	conn, err := node_service.InitializeClient(nodeAddress)
	if err != nil {
		return nil, err
	}
	controller.nodeServiceClients[nodeAddress] = conn
	return conn, nil
}

// GetNodeInitiators makes an RPC call to the node at nodeAddress to retrieve
// its initiators for the given storage protocol (iSCSI, FC or SAS).
//
// The relatively expensive gRPC channel (grpc.ClientConn) is cached per node
// address and reused across calls. A protocol that matches none of the known
// storage protocol constants leaves the request type at its zero value.
//
// It returns the initiators reported by the node, or the error from either
// establishing the connection or performing the RPC.
func (controller *Controller) GetNodeInitiators(ctx context.Context, nodeAddress string, protocol string) ([]string, error) {
	var reqType pb.InitiatorType
	switch protocol {
	case common.StorageProtocolSAS:
		reqType = pb.InitiatorType_SAS
	case common.StorageProtocolFC:
		reqType = pb.InitiatorType_FC
	case common.StorageProtocolISCSI:
		reqType = pb.InitiatorType_ISCSI
	}

	conn, err := controller.nodeServiceClient(nodeAddress)
	if err != nil {
		return nil, err
	}
	return node_service.GetNodeInitiators(ctx, conn, reqType)
}

// NotifyUnmap makes an RPC call to the node at nodeAddress to tell it that
// volumeName has been unmapped, reusing the cached gRPC connection for that
// node (or establishing one if none exists yet).
//
// It returns the error from either establishing the connection or performing
// the RPC.
func (controller *Controller) NotifyUnmap(ctx context.Context, nodeAddress string, volumeName string) error {
	conn, err := controller.nodeServiceClient(nodeAddress)
	if err != nil {
		return err
	}
	return node_service.NotifyUnmap(ctx, conn, volumeName)
}

// Stop gracefully shuts down the controller: every cached node-controller
// RPC client connection is closed, then the embedded common.Driver is stopped.
//
// Stop can be reached more than once (from the signal handler installed in
// New and from a caller's deferred Stop), so each connection is removed from
// the cache once closed; a repeated call then finds nothing left to close.
func (controller *Controller) Stop() {
	klog.V(3).InfoS("Controller code graceful shutdown..")
	for nodeIP, clientConn := range controller.nodeServiceClients {
		klog.V(3).InfoS("Closing node client", "nodeIP", nodeIP)
		if err := clientConn.Close(); err != nil {
			// Best effort: keep shutting down the remaining clients.
			klog.ErrorS(err, "error closing node client", "nodeIP", nodeIP)
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(controller.nodeServiceClients, nodeIP)
	}
	controller.Driver.Stop()
}
35 changes: 6 additions & 29 deletions pkg/controller/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"

"github.com/Seagate/seagate-exos-x-csi/pkg/common"
"github.com/Seagate/seagate-exos-x-csi/pkg/node_service"
pb "github.com/Seagate/seagate-exos-x-csi/pkg/node_service/node_servicepb"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -28,19 +26,9 @@ func (driver *Controller) ControllerPublishVolume(ctx context.Context, req *csi.
nodeIP := req.GetNodeId()
parameters := req.GetVolumeContext()

var reqType pb.InitiatorType
switch parameters[common.StorageProtocolKey] {
case common.StorageProtocolSAS:
reqType = pb.InitiatorType_SAS
case common.StorageProtocolFC:
reqType = pb.InitiatorType_FC
case common.StorageProtocolISCSI:
reqType = pb.InitiatorType_ISCSI
}

initiators, err := node_service.GetNodeInitiators(nodeIP, reqType)
initiators, err := driver.GetNodeInitiators(ctx, nodeIP, parameters[common.StorageProtocolKey])
if err != nil {
klog.ErrorS(err, "error getting node initiators", "node-ip", nodeIP, "storage-protocol", reqType)
klog.ErrorS(err, "error getting node initiators", "node-ip", nodeIP, "storage-protocol", parameters[common.StorageProtocolKey])
return nil, status.Error(codes.NotFound, fmt.Sprintf("Could not retrieve initiators for scheduled node(%s)", nodeIP))
}

Expand All @@ -66,29 +54,16 @@ func (driver *Controller) ControllerUnpublishVolume(ctx context.Context, req *cs
}

volumeName, _ := common.VolumeIdGetName(req.GetVolumeId())

var initiators []string

nodeIP := req.GetNodeId()
storageProtocol, err := common.VolumeIdGetStorageProtocol(req.GetVolumeId())
if err != nil {
klog.ErrorS(err, "No storage protocol found in ControllerUnpublishVolume", "storage protocol", storageProtocol, "volume ID:", req.GetVolumeId())
return nil, err
}

var reqType pb.InitiatorType
switch storageProtocol {
case common.StorageProtocolSAS:
reqType = pb.InitiatorType_SAS
case common.StorageProtocolFC:
reqType = pb.InitiatorType_FC
case common.StorageProtocolISCSI:
reqType = pb.InitiatorType_ISCSI
}

initiators, err = node_service.GetNodeInitiators(nodeIP, reqType)
initiators, err := driver.GetNodeInitiators(ctx, nodeIP, storageProtocol)
if err != nil {
klog.ErrorS(err, "error getting initiators from the node", "nodeIP", nodeIP, "storage-protocol", reqType)
klog.ErrorS(err, "error getting initiators from the node", "nodeIP", nodeIP, "storageProtocol", storageProtocol)
}

klog.InfoS("unmapping volume from initiator", "volumeName", volumeName, "initiators", initiators)
Expand All @@ -100,6 +75,8 @@ func (driver *Controller) ControllerUnpublishVolume(ctx context.Context, req *cs
} else {
klog.Errorf("unknown error while unmapping initiator %s: %v", initiator, err)
}
} else {
driver.NotifyUnmap(ctx, nodeIP, volumeName)
}
}

Expand Down
64 changes: 29 additions & 35 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package node

import (
"bufio"
"context"
"fmt"
"os"
"os/exec"
"strings"
"os/signal"
"syscall"

"github.com/Seagate/csi-lib-iscsi/iscsi"
"github.com/Seagate/seagate-exos-x-csi/pkg/common"
Expand All @@ -25,10 +25,11 @@ import (
type Node struct {
*common.Driver

semaphore *semaphore.Weighted
runPath string
nodeName string
nodeIP string
semaphore *semaphore.Weighted
runPath string
nodeName string
nodeIP string
nodeServer *grpc.Server
}

// New is a convenience function for creating a node driver
Expand Down Expand Up @@ -113,8 +114,21 @@ func New() *Node {
csi.RegisterIdentityServer(node.Server, node)
csi.RegisterNodeServer(node.Server, node)

// initialize node communication service
go node_service.ListenAndServe(envServicePort)
// initialize node-controller communication service
node.nodeServer = grpc.NewServer()
go node_service.ListenAndServe(node.nodeServer, envServicePort)

sigc := make(chan os.Signal, 1)
signal.Notify(sigc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT,
)
go func() {
_ = <-sigc
node.Stop()
}()

return node
}
Expand Down Expand Up @@ -249,6 +263,13 @@ func (node *Node) getConnectorInfoPath(storageProtocol, volumeID string) string
return fmt.Sprintf("%s/%s-%s.json", node.runPath, storageProtocol, volumeID)
}

// Stop shuts the node plugin down: it first gracefully stops the
// node-controller RPC server held in nodeServer, then stops the embedded
// common.Driver.
func (node *Node) Stop() {
klog.V(3).InfoS("Node graceful shutdown..")
node.nodeServer.GracefulStop()
node.Driver.Stop()
}

// checkHostBinary: Determine if a binary image is installed or not
func checkHostBinary(name string) error {
if path, err := exec.LookPath(name); err != nil {
Expand All @@ -259,30 +280,3 @@ func checkHostBinary(name string) error {

return nil
}

// readInitiatorName returns the value assigned to the InitiatorName key in
// /etc/iscsi/initiatorname.iscsi.
//
// The file is scanned line by line. Each line is split at its first "=";
// when the trimmed left-hand side equals "InitiatorName", the trimmed
// right-hand side is returned. An error is returned if the file cannot be
// opened or scanned, or if no such key is present.
func readInitiatorName() (string, error) {
	const (
		initiatorNameFilePath = "/etc/iscsi/initiatorname.iscsi"
		wantedKey             = "InitiatorName"
	)

	f, err := os.Open(initiatorNameFilePath)
	if err != nil {
		return "", err
	}
	defer f.Close()

	lines := bufio.NewScanner(f)
	for lines.Scan() {
		// Split once, at the first "=", into key and value.
		fields := strings.SplitN(lines.Text(), "=", 2)
		if len(fields) != 2 {
			continue
		}
		if strings.TrimSpace(fields[0]) != wantedKey {
			continue
		}
		return strings.TrimSpace(fields[1]), nil
	}

	if scanErr := lines.Err(); scanErr != nil {
		return "", scanErr
	}

	return "", fmt.Errorf("InitiatorName key is missing from %s", initiatorNameFilePath)
}
32 changes: 22 additions & 10 deletions pkg/node_service/node_service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,43 @@ import (
"k8s.io/klog/v2"
)

// Connect to the node_service gRPC server at the given address and retrieve initiators
func GetNodeInitiators(nodeAddress string, reqType pb.InitiatorType) ([]string, error) {
func InitializeClient(nodeAddress string) (conn *grpc.ClientConn, err error) {
port, envFound := os.LookupEnv(common.NodeServicePortEnvVar)
if !envFound {
port = "978"
klog.InfoS("no node service port found in environment. using default", "port", port)
}

nodeServiceAddr := nodeAddress + ":" + port
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

conn, err := grpc.Dial(nodeServiceAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err = grpc.Dial(nodeServiceAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
klog.ErrorS(err, "Error connecting to node service", "node ip", nodeAddress, "port", port)
return nil, err
return
}
defer conn.Close()
return
}

// GetNodeInitiators asks the node_service gRPC server reachable through conn
// for the node's initiators of the requested type. The RPC is bounded by a
// 10-second timeout derived from ctx. On failure the error is logged and
// returned with a nil slice.
func GetNodeInitiators(ctx context.Context, conn *grpc.ClientConn, reqType pb.InitiatorType) ([]string, error) {
	rpcCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	request := &pb.InitiatorRequest{Type: reqType}
	reply, err := pb.NewNodeServiceClient(conn).GetInitiators(rpcCtx, request)
	if err != nil {
		klog.ErrorS(err, "Error during GetInitiators", "reqType", request.Type)
		return nil, err
	}
	return reply.Initiators, nil
}

// NotifyUnmap sends an unmap notification for volumeName to the node_service
// gRPC server reachable through conn. The RPC is bounded by a 10-second
// timeout derived from ctx. A failure is logged and returned to the caller.
func NotifyUnmap(ctx context.Context, conn *grpc.ClientConn, volumeName string) (err error) {
	rpcCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	notification := &pb.UnmappedVolume{VolumeName: volumeName}
	if _, err = pb.NewNodeServiceClient(conn).NotifyUnmap(rpcCtx, notification); err != nil {
		klog.ErrorS(err, "Error during unmap notification", "unmappedVolumeName", volumeName)
	}
	return err
}
Loading

0 comments on commit 55d7d6a

Please sign in to comment.