Skip to content

Commit

Permalink
Add NodeStage to common csi client lib
Browse files Browse the repository at this point in the history
  • Loading branch information
fierlion committed Sep 13, 2023
1 parent 21bd233 commit dd40a8f
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -9,6 +9,7 @@ _bin/
*.orig
/agent/version/_version.go
/ecs-init/version/version.go
/ecs-agent/daemonimages/csidriver/tarfiles
.agignore
*.sublime-*
.DS_Store
Expand Down
118 changes: 101 additions & 17 deletions ecs-agent/csiclient/csi_client.go
Expand Up @@ -4,7 +4,7 @@
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
Expand All @@ -25,16 +25,29 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
v1 "k8s.io/api/core/v1"
)

const (
PROTOCOL = "unix"
PROTOCOL = "unix"
fsTypeBlockName = "block"
)

// CSIClient is an interface that specifies all supported operations in the Container Storage Interface(CSI)
// driver for Agent uses. The CSI driver provides many volume related operations to manage the lifecycle of
// Amazon EBS volumes, including mounting, umounting, resizing and volume stats.
type CSIClient interface {
NodeStageVolume(ctx context.Context,
volID string,
publishContext map[string]string,
stagingTargetPath string,
fsType string,
accessMode v1.PersistentVolumeAccessMode,
secrets map[string]string,
volumeContext map[string]string,
mountOptions []string,
fsGroup *int64,
) error
GetVolumeMetrics(volumeId string, hostMountPath string) (*Metrics, error)
}

Expand All @@ -44,31 +57,83 @@ type csiClient struct {
}

// NewCSIClient creates a CSI client for the communication with CSI driver daemon.
func NewCSIClient(socketIn string) CSIClient {
return &csiClient{csiSocket: socketIn}
func NewCSIClient(socketIn string) csiClient {
return csiClient{csiSocket: socketIn}
}

// GetVolumeMetrics returns volume usage.
func (cc *csiClient) GetVolumeMetrics(volumeId string, hostMountPath string) (*Metrics, error) {
// Set up a connection to the server
dialer := func(addr string, t time.Duration) (net.Conn, error) {
return net.Dial(PROTOCOL, addr)
func (cc *csiClient) NodeStageVolume(ctx context.Context,
volID string,
publishContext map[string]string,
stagingTargetPath string,
fsType string,
accessMode v1.PersistentVolumeAccessMode,
secrets map[string]string,
volumeContext map[string]string,
mountOptions []string,
fsGroup *int64,
) error {
conn, err := grpcDialConnect()
if err != nil {
logger.Error("NodeStage: CSI Connection Error")
return err
}
conn, err := grpc.Dial(
cc.csiSocket,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDialer(dialer),
)
defer conn.Close()

client := csi.NewNodeClient(conn)

defaultVolumeCapability := &csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
}
req := csi.NodeStageVolumeRequest{
VolumeId: volID,
PublishContext: publishContext,
StagingTargetPath: stagingTargetPath,
VolumeCapability: defaultVolumeCapability,
Secrets: secrets,
VolumeContext: volumeContext,
}

if fsType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csi.VolumeCapability_Block{
Block: &csi.VolumeCapability_BlockVolume{},
}
} else {
mountVolume := &csi.VolumeCapability_MountVolume{
FsType: fsType,
MountFlags: mountOptions,
}
req.VolumeCapability.AccessType = &csi.VolumeCapability_Mount{
Mount: mountVolume,
}
}

_, err = client.NodeStageVolume(ctx, &req)

if err != nil {
logger.Error("Error building a connection to CSI driver", logger.Fields{
logger.Error("Error staging volume via CSI driver", logger.Fields{
field.Error: err,
"Socket": cc.csiSocket,
})
return nil, err
return err
}
return nil
}

// GetVolumeMetrics returns volume usage.
func (cc *csiClient) GetVolumeMetrics(volumeId string, hostMountPath string) (*Metrics, error) {
conn, err := grpcDialConnect()
if err != nil {
logger.Error("GetVolumeMetrics: CSI Connection Error")
return err
}
defer conn.Close()

client := csi.NewNodeClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

Expand Down Expand Up @@ -112,3 +177,22 @@ func (cc *csiClient) GetVolumeMetrics(volumeId string, hostMountPath string) (*M
Capacity: totalBytes,
}, nil
}

func grpcDialConnect() (*grpc.ClientConn, err) {
dialer := func(addr string, t time.Duration) (net.Conn, error) {
return net.Dial(PROTOCOL, addr)
}
conn, err := grpc.Dial(
cc.csiSocket,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDialer(dialer),
)
if err != nil {
logger.Error("Error building a connection to CSI driver", logger.Fields{
field.Error: err,
"Socket": cc.csiSocket,
})
return nil, err
}
return conn, nil
}

0 comments on commit dd40a8f

Please sign in to comment.