Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CN-919] Fix WanReplication CR overwrite batch-publishers #814

Merged
merged 2 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 23 additions & 11 deletions controllers/hazelcast/hazelcast_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,7 +1099,7 @@ func filterPersistedDS(ctx context.Context, c client.Client, h *hazelcastv1alpha
return l, nil
}

func filterPersistedWanReplications(ctx context.Context, c client.Client, h *hazelcastv1alpha1.Hazelcast) (map[string]hazelcastv1alpha1.WanReplication, error) {
func filterPersistedWanReplications(ctx context.Context, c client.Client, h *hazelcastv1alpha1.Hazelcast) (map[string][]hazelcastv1alpha1.WanReplication, error) {
fieldMatcher := client.MatchingFields{"hazelcastResourceName": h.Name}
nsMatcher := client.InNamespace(h.Namespace)

Expand All @@ -1108,7 +1108,7 @@ func filterPersistedWanReplications(ctx context.Context, c client.Client, h *haz
return nil, err
}

l := make(map[string]hazelcastv1alpha1.WanReplication, 0)
l := make(map[string][]hazelcastv1alpha1.WanReplication, 0)
for _, wr := range wrList.Items {
for wanKey, mapStatus := range wr.Status.WanReplicationMapsStatus {
hzName, _ := splitWanMapKey(wanKey)
Expand All @@ -1117,7 +1117,10 @@ func filterPersistedWanReplications(ctx context.Context, c client.Client, h *haz
}
switch mapStatus.Status {
case hazelcastv1alpha1.WanStatusPersisting, hazelcastv1alpha1.WanStatusSuccess:
l[wanKey] = wr
if l[wanKey] == nil {
l[wanKey] = make([]hazelcastv1alpha1.WanReplication, 0)
}
l[wanKey] = append(l[wanKey], wr)
default: // TODO, might want to do something for the other cases
}
}
Expand All @@ -1144,13 +1147,12 @@ func fillHazelcastConfigWithMaps(ctx context.Context, c client.Client, cfg *conf
return nil
}

func fillHazelcastConfigWithWanReplications(cfg *config.Hazelcast, wrl map[string]hazelcastv1alpha1.WanReplication) {
func fillHazelcastConfigWithWanReplications(cfg *config.Hazelcast, wrl map[string][]hazelcastv1alpha1.WanReplication) {
if len(wrl) != 0 {
cfg.WanReplication = map[string]config.WanReplicationConfig{}
for wanKey, wan := range wrl {
_, mapName := splitWanMapKey(wanKey)
mapStatus := wan.Status.WanReplicationMapsStatus[wanKey]
wanConfig := createWanReplicationConfig(mapStatus.PublisherId, wan)
wanConfig := createWanReplicationConfig(wanKey, wan)
cfg.WanReplication[wanName(mapName)] = wanConfig
}
}
Expand Down Expand Up @@ -1530,7 +1532,20 @@ func createReplicatedMapConfig(rm *hazelcastv1alpha1.ReplicatedMap) config.Repli
}
}

func createWanReplicationConfig(publisherId string, wr hazelcastv1alpha1.WanReplication) config.WanReplicationConfig {
func createWanReplicationConfig(wanKey string, wrs []hazelcastv1alpha1.WanReplication) config.WanReplicationConfig {
cfg := config.WanReplicationConfig{
BatchPublisher: make(map[string]config.BatchPublisherConfig),
}
if len(wrs) != 0 {
for _, wr := range wrs {
bpc := createBatchPublisherConfig(wr)
cfg.BatchPublisher[wr.Status.WanReplicationMapsStatus[wanKey].PublisherId] = bpc
}
}
return cfg
}

func createBatchPublisherConfig(wr hazelcastv1alpha1.WanReplication) config.BatchPublisherConfig {
bpc := config.BatchPublisherConfig{
ClusterName: wr.Spec.TargetClusterName,
TargetEndpoints: wr.Spec.Endpoints,
Expand All @@ -1541,10 +1556,7 @@ func createWanReplicationConfig(publisherId string, wr hazelcastv1alpha1.WanRepl
BatchSize: wr.Spec.Batch.Size,
BatchMaxDelayMillis: wr.Spec.Batch.MaximumDelay,
}
cfg := config.WanReplicationConfig{
BatchPublisher: map[string]config.BatchPublisherConfig{publisherId: bpc},
}
return cfg
return bpc
}

func (r *HazelcastReconciler) reconcileStatefulset(ctx context.Context, h *hazelcastv1alpha1.Hazelcast, logger logr.Logger) error {
Expand Down
112 changes: 112 additions & 0 deletions controllers/hazelcast/hazelcast_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package hazelcast

import (
"context"
"reflect"
"testing"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -188,6 +189,117 @@ func Test_hazelcastConfigMultipleCRs(t *testing.T) {
}
}

func Test_hazelcastConfigMultipleWanCRs(t *testing.T) {
hz := &hazelcastv1alpha1.Hazelcast{
ObjectMeta: metav1.ObjectMeta{
Name: "hazelcast",
Namespace: "default",
},
}
wrs := &hazelcastv1alpha1.WanReplicationList{}
wrs.Items = []hazelcastv1alpha1.WanReplication{
{
ObjectMeta: metav1.ObjectMeta{
Name: "wan-1",
Namespace: "default",
},
Spec: hazelcastv1alpha1.WanReplicationSpec{
Resources: []hazelcastv1alpha1.ResourceSpec{
{
Name: hz.Name,
Kind: hazelcastv1alpha1.ResourceKindHZ,
},
},
TargetClusterName: "dev",
Endpoints: "10.0.0.1:5701",
},
Status: hazelcastv1alpha1.WanReplicationStatus{
WanReplicationMapsStatus: map[string]hazelcastv1alpha1.WanReplicationMapStatus{
hz.Name + "__map": {
PublisherId: "map-wan-1",
Status: hazelcastv1alpha1.WanStatusSuccess,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "wan-2",
Namespace: "default",
},
Spec: hazelcastv1alpha1.WanReplicationSpec{
Resources: []hazelcastv1alpha1.ResourceSpec{
{
Name: hz.Name,
Kind: hazelcastv1alpha1.ResourceKindHZ,
},
},
TargetClusterName: "dev",
Endpoints: "10.0.0.2:5701",
},
Status: hazelcastv1alpha1.WanReplicationStatus{
WanReplicationMapsStatus: map[string]hazelcastv1alpha1.WanReplicationMapStatus{
hz.Name + "__map": {
PublisherId: "map-wan-2",
Status: hazelcastv1alpha1.WanStatusSuccess,
},
},
},
},
}
objects := []client.Object{
hz,
&hazelcastv1alpha1.Map{
ObjectMeta: metav1.ObjectMeta{
Name: "map",
Namespace: "default",
},
Spec: hazelcastv1alpha1.MapSpec{
DataStructureSpec: hazelcastv1alpha1.DataStructureSpec{
HazelcastResourceName: hz.Name,
},
},
},
}
c := &mockK8sClient{Client: fakeK8sClient(objects...)}
c.list = wrs

bytes, err := hazelcastConfig(context.TODO(), c, hz, logr.Discard())
if err != nil {
t.Errorf("unable to build Config, %e", err)
}
hzConfig := &config.HazelcastWrapper{}
err = yaml.Unmarshal(bytes, hzConfig)
if err != nil {
t.Error(err)
}
wrConf, ok := hzConfig.Hazelcast.WanReplication["map-default"]
if !ok {
t.Errorf("wan config for map-default not found")
}
if _, ok := wrConf.BatchPublisher["map-wan-1"]; !ok {
t.Errorf("butch publisher map-wan-1 not found")
}
if _, ok := wrConf.BatchPublisher["map-wan-2"]; !ok {
t.Errorf("butch publisher map-wan-2 not found")
}
}

// Client used to mock List() method for the WanReplicationList CR
// Needed since the List() method uses the indexed filed "hazelcastResourceName" not available in the fakeClient.
type mockK8sClient struct {
client.Client
list *hazelcastv1alpha1.WanReplicationList
}

func (c *mockK8sClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
if list == nil || reflect.TypeOf(list) != reflect.TypeOf(c.list) {
return c.Client.List(ctx, list, opts...)
}
list.(*hazelcastv1alpha1.WanReplicationList).Items = c.list.Items
return nil
}

type listKeys func(h config.Hazelcast) []string

func getKeys[C config.Cache | config.ReplicatedMap |
Expand Down
10 changes: 7 additions & 3 deletions controllers/hazelcast/wanreplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,10 +682,14 @@ func (r *WanReplicationReconciler) validateWanConfigPersistence(ctx context.Cont
if !ok {
continue
}
wanPub, ok := wanRep.BatchPublisher[v.PublisherId]
if !ok {
continue
}

// WAN is in Config but is not correct
realWan := createWanReplicationConfig(v.PublisherId, *wan)
if !reflect.DeepEqual(realWan, wanRep) {
// WAN publisher is in Config but is not correct
realPub := createBatchPublisherConfig(*wan)
if !reflect.DeepEqual(realPub, wanPub) {
continue
}
mapWanStatus[mapWanKey] = wanMapSuccessStatus{}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/wanreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var _ = Describe("WanReplication CR", func() {
wan := &hazelcastv1alpha1.WanReplication{
ObjectMeta: randomObjectMeta(namespace),
}
By("failing to create Cache CR")
By("trying to create WAN CR")
Expect(k8sClient.Create(context.Background(), wan)).ShouldNot(Succeed())
})
})
Expand Down