Skip to content

Commit

Permalink
rbd: get lastsyncbytes and lastsycduration for volrep
Browse files Browse the repository at this point in the history
This commit get more information from the description
like lastsyncbytes and lastsyncduration and send them
as a response of getvolumereplicationinfo request.

Signed-off-by: Yati Padia <ypadia@redhat.com>
  • Loading branch information
yati1998 committed Jun 20, 2023
1 parent c635103 commit 0df93de
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 121 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/ceph/go-ceph v0.21.0
github.com/container-storage-interface/spec v1.8.0
github.com/csi-addons/replication-lib-utils v0.2.0
github.com/csi-addons/spec v0.2.0
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444
github.com/gemalto/kmip-go v0.0.9
github.com/golang/protobuf v1.5.3
github.com/google/fscrypt v0.3.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr
github.com/csi-addons/replication-lib-utils v0.2.0 h1:tGs42wfjkObbBo/98a3uxTFWEJ1dq5PIMqPWtdLd040=
github.com/csi-addons/replication-lib-utils v0.2.0/go.mod h1:ROQlEsc2EerVtc/K/C+6Hx8pqaQ9MVy9xFFpyKfI9lc=
github.com/csi-addons/spec v0.1.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.0 h1:Ews7bxpN9P6nFxl1XvMg87cR1wLROdH1FzSfLfb4VfI=
github.com/csi-addons/spec v0.2.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444 h1:hWVCrZWVHctpWt6cQxV1I6dW3wpBDMg3Vrvu9uAuUxw=
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
60 changes: 40 additions & 20 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -719,18 +720,20 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
}

description := remoteStatus.Description
lastSyncTime, err := getLastSyncTime(description)
lastSyncTime, lastSyncDuration, lastSyncBytes, err := getLastSyncInfo(description)
if err != nil {
if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) {
return nil, status.Errorf(codes.NotFound, "failed to get last sync time: %v", err)
return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err)
}
log.ErrorLog(ctx, err.Error())

return nil, status.Errorf(codes.Internal, "failed to get last sync time: %v", err)
return nil, status.Errorf(codes.Internal, "failed to get last sync info: %v", err)
}

resp := &replication.GetVolumeReplicationInfoResponse{
LastSyncTime: lastSyncTime,
LastSyncTime: lastSyncTime,
LastSyncDuration: lastSyncDuration,
LastSyncBytes: lastSyncBytes,
}

return resp, nil
Expand All @@ -756,42 +759,59 @@ func RemoteStatus(gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageS
return ss, err
}

// This function gets the local snapshot time from the description
// of localStatus and converts it into required type.
func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
// This function gets the local snapshot time, last sync snapshot seconds
// and last sync bytes from the description of localStatus and convert
// it into required types.
func getLastSyncInfo(description string) (*timestamppb.Timestamp, *durationpb.Duration, int64, error) {
// Format of the description will be as followed:
// description = "replaying,{"bytes_per_second":0.0,
// "bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501
// ,"remote_snapshot_timestamp":1662655501}"
// description = `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,
// "last_snapshot_bytes":81920,"last_snapshot_sync_seconds":0,
// "local_snapshot_timestamp":1684675261,
// "remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`
// In case there is no last snapshot bytes returns 0 as the
// LastSyncBytes is optional.
// In case there is no last snapshot sync seconds, it returns nil as the
// LastSyncDuration is optional.
// In case there is no local snapshot timestamp return an error as the
// LastSyncTime is required.

if description == "" {
return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, nil, 0, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
}
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, fmt.Errorf("no local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, nil, 0, fmt.Errorf("no snapshot details: %w", corerbd.ErrLastSyncTimeNotFound)
}
type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LastSnapshotBytes int64 `json:"last_snapshot_bytes"`
LastSnapshotDuration int64 `json:"last_snapshot_sync_seconds"`
}

var localSnapTime localStatus
err := json.Unmarshal([]byte(splittedString[1]), &localSnapTime)
var localSnapInfo localStatus
err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal description: %w", err)
return nil, nil, 0, fmt.Errorf("failed to unmarshal local snapshot info: %w", err)
}

// If the json unmarsal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required.
if localSnapTime.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
if localSnapInfo.LocalSnapshotTime == 0 {
return nil, nil, 0, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
}

lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0)
// converts localSnapshotTime of type int64 to time.Time
lastUpdateTime := time.Unix(localSnapInfo.LocalSnapshotTime, 0)
lastSyncTime := timestamppb.New(lastUpdateTime)

return lastSyncTime, nil
// converts localSnapshotDuration of type int64 to time.Time
lastDurationTime := time.Unix(localSnapInfo.LastSnapshotDuration, 0)
// converts time.time to time.Duration
lastDuration := time.Until(lastDurationTime)
// converts time.Duration to *durationpb.Duration
lastSyncDuration := durationpb.New(lastDuration)

return lastSyncTime, lastSyncDuration, localSnapInfo.LastSnapshotBytes, nil
}

func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
Expand Down
59 changes: 49 additions & 10 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -438,58 +439,96 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
}
}

func TestValidateLastSyncTime(t *testing.T) {
func TestValidateLastSyncInfo(t *testing.T) {
t.Parallel()
tests := []struct {
name string
description string
timestamp *timestamppb.Timestamp
duration *durationpb.Duration
synctime *timestamppb.Timestamp
bytes int64
expectedErr string
}{
{
"valid description",
//nolint:lll // sample output cannot be split into multiple lines.
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501,"remote_snapshot_timestamp":1662655501}`,
`replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
durationpb.New(time.Until(time.Unix(56743, 0))),
timestamppb.New(time.Unix(1662655501, 0)),
81920,
"",
},
{
"empty description",
"",
nil,
nil,
0,
corerbd.ErrLastSyncTimeNotFound.Error(),
},
{
"description without local_snapshot_timestamp",
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"remote_snapshot_timestamp":1662655501}`,
"description without last_snapshot_bytes",
//nolint:lll // sample output cannot be split into multiple lines.
`replaying, {"bytes_per_second":0.0,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
durationpb.New(time.Until(time.Unix(56743, 0))),
timestamppb.New(time.Unix(1662655501, 0)),
0,
"",
},
{
"description without local_snapshot_time",
//nolint:lll // sample output cannot be split into multiple lines.
`replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
durationpb.New(time.Until(time.Unix(56743, 0))),
nil,
81920,
"",
},
{
"description without last_snapshot_sync_seconds",
//nolint:lll // sample output cannot be split into multiple lines.
`replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
nil,
timestamppb.New(time.Unix(1662655501, 0)),
81920,
"",
},
{
"description with invalid JSON",
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
//nolint:lll // sample output cannot be split into multiple lines.
`replaying,{"bytes_per_second":0.0,"last_snapshot_bytes":81920","bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
nil,
nil,
0,
"failed to unmarshal",
},
{
"description with no JSON",
`replaying`,
nil,
nil,
0,
corerbd.ErrLastSyncTimeNotFound.Error(),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ts, err := getLastSyncTime(tt.description)
ts, td, tb, err := getLastSyncInfo(tt.description)
if err != nil && !strings.Contains(err.Error(), tt.expectedErr) {
// returned error
t.Errorf("getLastSyncTime() returned error, expected: %v, got: %v",
t.Errorf("getLastSyncInfo() returned error, expected: %v, got: %v",
tt.expectedErr, err)
}
if !ts.AsTime().Equal(tt.timestamp.AsTime()) {
t.Errorf("getLastSyncTime() %v, expected %v", ts, tt.timestamp)
if ts != tt.synctime {
t.Errorf("getLastSyncInfo() %v, expected %v", ts, tt.bytes)
}
if td != tt.duration {
t.Errorf("getLastSyncInfo() %v, expected %v", td, tt.bytes)
}
if tb != tt.bytes {
t.Errorf("getLastSyncInfo() %v, expected %v", tb, tt.bytes)
}
})
}
Expand Down

0 comments on commit 0df93de

Please sign in to comment.