Skip to content

Commit

Permalink
Translate v2 requests into v3 ClusterMemberAttrSetRequest and Cluster…
Browse files Browse the repository at this point in the history
…VersionSetRequest

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Dec 7, 2023
1 parent 7851295 commit efae7e7
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 32 deletions.
43 changes: 27 additions & 16 deletions server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,43 @@ import (
"encoding/json"
"path"

"github.com/coreos/go-semver/semver"
"go.uber.org/zap"

"go.etcd.io/etcd/server/v3/etcdserver/api"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
)

func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) {
if r.Method != "PUT" || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) {
s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
}
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path))
func v2ToV3Request(lg *zap.Logger, r *RequestV2) pb.InternalRaftRequest {
if r.Method == "PUT" && storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if s.cluster != nil {
s.cluster.UpdateAttributes(id, attr, shouldApplyV3)
return pb.InternalRaftRequest{
Header: &pb.RequestHeader{
ID: r.ID,
},
ClusterMemberAttrSet: &membershippb.ClusterMemberAttrSetRequest{
Member_ID: uint64(id),
MemberAttributes: &membershippb.Attributes{
Name: attr.Name,
ClientUrls: attr.ClientURLs,
},
},
}
}
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if s.cluster != nil {
// persist to backend given v2store can be very stale
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
if r.Method == "PUT" && r.Path == membership.StoreClusterVersionKey() {
return pb.InternalRaftRequest{
Header: &pb.RequestHeader{
ID: r.ID,
},
ClusterVersionSet: &membershippb.ClusterVersionSetRequest{
Ver: r.Val,
},
}
}
lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method))
return pb.InternalRaftRequest{}
}
8 changes: 2 additions & 6 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1849,17 +1849,13 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership.
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
s.applyV2Request((*RequestV2)(rp), shouldApplyV3)
s.w.Trigger(r.ID, Response{})
return
raftReq = v2ToV3Request(s.lg, (*RequestV2)(rp))
}
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))

if raftReq.V2 != nil {
req := (*RequestV2)(raftReq.V2)
s.applyV2Request(req, shouldApplyV3)
s.w.Trigger(req.ID, Response{})
return
raftReq = v2ToV3Request(s.lg, req)
}

id := raftReq.ID
Expand Down
50 changes: 40 additions & 10 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/coreos/go-semver/semver"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
Expand All @@ -50,6 +51,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
Expand Down Expand Up @@ -153,19 +155,33 @@ func TestV2SetMemberAttributes(t *testing.T) {
defer betesting.Close(t, be)
cl := newTestClusterWithBackend(t, []*membership.Member{{ID: 1}}, be)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
v2store: mockstore.NewRecorder(),
cluster: cl,
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
v2store: mockstore.NewRecorder(),
cluster: cl,
consistIndex: cindex.NewConsistentIndex(be),
w: wait.New(),
}
as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be))
if err != nil {
t.Fatal(err)
}
srv.alarmStore = as
srv.uberApply = srv.NewUberApplier()

req := pb.Request{
Method: "PUT",
ID: 1,
Path: membership.MemberAttributesStorePath(1),
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
}
srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
data, err := proto.Marshal(&req)
if err != nil {
t.Fatal(err)
}
srv.applyEntryNormal(&raftpb.Entry{
Data: data,
}, membership.ApplyV2storeOnly)
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
t.Errorf("attributes = %v, want %v", g, w)
Expand All @@ -180,19 +196,33 @@ func TestV2SetClusterVersion(t *testing.T) {
cl := newTestClusterWithBackend(t, []*membership.Member{}, be)
cl.SetVersion(semver.New("3.4.0"), api.UpdateCapability, membership.ApplyBoth)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
v2store: mockstore.NewRecorder(),
cluster: cl,
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
v2store: mockstore.NewRecorder(),
cluster: cl,
consistIndex: cindex.NewConsistentIndex(be),
w: wait.New(),
}
as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be))
if err != nil {
t.Fatal(err)
}
srv.alarmStore = as
srv.uberApply = srv.NewUberApplier()

req := pb.Request{
Method: "PUT",
ID: 1,
Path: membership.StoreClusterVersionKey(),
Val: "3.5.0",
}
srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth)
data, err := proto.Marshal(&req)
if err != nil {
t.Fatal(err)
}
srv.applyEntryNormal(&raftpb.Entry{
Data: data,
}, membership.ApplyV2storeOnly)
if g := cl.Version(); !reflect.DeepEqual(*g, version.V3_5) {
t.Errorf("attributes = %v, want %v", *g, version.V3_5)
}
Expand Down

0 comments on commit efae7e7

Please sign in to comment.