diff --git a/cmd/mysql-helper/apphelper/apphelper.go b/cmd/mysql-helper/apphelper/apphelper.go index aa87892b5..4cc1c5579 100644 --- a/cmd/mysql-helper/apphelper/apphelper.go +++ b/cmd/mysql-helper/apphelper/apphelper.go @@ -147,12 +147,9 @@ func waitForMysqlReady() error { } func configReadOnly() error { - var query string - if tb.NodeRole() == "master" { - query = "SET GLOBAL READ_ONLY = 0" - } else { - query = "SET GLOBAL SUPER_READ_ONLY = 1" - } + + query := "SET GLOBAL SUPER_READ_ONLY = 1" + if err := tb.RunQuery(query); err != nil { return fmt.Errorf("failed to set read_only config, err: %s", err) } diff --git a/deploy/mysqlclusters.yaml b/deploy/mysqlclusters.yaml index a15705d00..fe4f1ab8e 100644 --- a/deploy/mysqlclusters.yaml +++ b/deploy/mysqlclusters.yaml @@ -115,6 +115,10 @@ spec: type: integer required: - maxQueryTime + readOnly: + description: Makes the cluster read only. When enabled the master is + set read-only instead of writable. + type: boolean replicas: description: The number of pods. This updates replicas filed Defaults to 0 diff --git a/pkg/apis/mysql/v1alpha1/types.go b/pkg/apis/mysql/v1alpha1/types.go index 0dc3eb905..491f3f100 100644 --- a/pkg/apis/mysql/v1alpha1/types.go +++ b/pkg/apis/mysql/v1alpha1/types.go @@ -107,6 +107,10 @@ type ClusterSpec struct { // QueryLimits represents limits for a query // +optional QueryLimits *QueryLimits `json:"queryLimits,omitempty"` + + // Makes the cluster read only. When enabled the master is set read-only instead of writable. + // +optional + ReadOnly bool `json:"readOnly,omitempty"` } type MysqlConf map[string]string @@ -139,6 +143,7 @@ type ClusterConditionType string const ( ClusterConditionReady ClusterConditionType = "Ready" ClusterConditionFailoverAck = "PendingFailoverAck" + ClusterConditionReadOnly = "ReadOnly" ) type NodeStatus struct { @@ -158,6 +163,7 @@ const ( NodeConditionLagged NodeConditionType = "Lagged" NodeConditionReplicating = "Replicating" NodeConditionMaster = "Master" + NodeConditionReadOnly = "ReadOnly" ) type PodSpec struct { diff --git a/pkg/mysqlcluster/orc_reconciliation.go b/pkg/mysqlcluster/orc_reconciliation.go index f9ffbbdb4..6d01aa0c8 100644 --- a/pkg/mysqlcluster/orc_reconciliation.go +++ b/pkg/mysqlcluster/orc_reconciliation.go @@ -45,7 +45,14 @@ func (f *cFactory) SyncOrchestratorStatus(ctx context.Context) error { // Try to get cluster from orchestrator if cluster is not present then // register nodes into orchestrator. if insts, err := f.orcClient.Cluster(f.getClusterAlias()); err == nil { + + err = f.updateNodesReadOnlyFlagInOrc(insts) + if err != nil { + glog.Infof("Error setting read-only/writable state: %s", err) + } + f.updateStatusFromOrc(insts) + } else { glog.Errorf("Fail to get cluster from orchestrator: %s. 
Now tries to register nodes.", err) return f.registerNodesInOrc() @@ -75,11 +82,145 @@ func (f *cFactory) SyncOrchestratorStatus(ctx context.Context) error { return nil } +func getInstance(hostname string, insts []orc.Instance) (*orc.Instance, error) { + + for _, node := range insts { + host := node.Key.Hostname + + if host == hostname { + return &node, nil + } + } + + return nil, fmt.Errorf("instance %s not found", hostname) +} + +func getMaster(node *orc.Instance, insts []orc.Instance) (*orc.Instance, error) { + + if len(node.MasterKey.Hostname) != 0 && !node.IsCoMaster { + next, err := getInstance(node.MasterKey.Hostname, insts) + if err == nil { + return getMaster(next, insts) + } else { + return nil, err + } + } + + if node.IsCoMaster { + next, err := getInstance(node.MasterKey.Hostname, insts) + if err == nil { + return next, nil + } else { + return nil, err + } + } + + return node, nil +} + +func determineMasterFor(insts []orc.Instance) (*orc.Instance, error) { + + var masterForNode []orc.Instance + + for _, node := range insts { + master, err := getMaster(&node, insts) + if err == nil { + masterForNode = append(masterForNode, *master) + } else { + return nil, fmt.Errorf("unable to determine the master for node %s", node.Key.Hostname) + } + } + + if len(masterForNode) != 0 { + masterHostName := masterForNode[0] + check := true + for _, node := range masterForNode { + if node.Key.Hostname != masterHostName.Key.Hostname { + check = false + } + } + if check { + return &masterHostName, nil + } else { + return nil, fmt.Errorf("multiple masters") + } + } else { + return nil, fmt.Errorf("no instances in the cluster") + } + +} + +// set a host writable only when needed +func (f *cFactory) setInstWritable(inst orc.Instance) error { + if inst.ReadOnly { + glog.V(2).Infof("set instance %s writable", inst.Key.Hostname) + return f.orcClient.SetHostWritable(inst.Key) + } + return nil +} + +func (f *cFactory) putNodeInMaintenance(inst orc.Instance) error { + + glog.V(2).Infof("set instance %s in maintenance", inst.Key.Hostname) + return f.orcClient.BeginMaintenance(inst.Key, "mysqlcontroller", "clusterReadOnly") + +} + +func (f *cFactory) getNodeOutOfMaintenance(inst orc.Instance) error { + + glog.V(2).Infof("set instance %s out of maintenance", inst.Key.Hostname) + return f.orcClient.EndMaintenance(inst.Key) + +} + +// set a host read-only only when needed +func (f *cFactory) setInstReadOnly(inst orc.Instance) error { + if !inst.ReadOnly { + glog.V(2).Infof("set instance %s read only", inst.Key.Hostname) + return f.orcClient.SetHostReadOnly(inst.Key) + } + return nil +} + +func (f *cFactory) updateNodesReadOnlyFlagInOrc(insts []orc.Instance) error { + master, err := determineMasterFor(insts) + if err != nil && err.Error() == "multiple masters" { + // the master cannot be determined (co-masters present), + // so set the whole cluster read only + for _, inst := range insts { + f.putNodeInMaintenance(inst) + f.setInstReadOnly(inst) + } + return nil + } else if err != nil { + return err + } + + // the master is determined + for _, inst := range insts { + if f.cluster.Spec.ReadOnly { + f.putNodeInMaintenance(inst) + f.setInstReadOnly(inst) + } else { + f.getNodeOutOfMaintenance(inst) + if inst.Key.Hostname == master.Key.Hostname { + f.setInstWritable(inst) + } else { + f.setInstReadOnly(inst) + } + } + } + + return nil +} + func (f *cFactory) updateStatusFromOrc(insts []orc.Instance) { // TODO: imporve this code by computing differences between what 
// orchestartor knows and what we know updatedNodes := []string{} + + isReadOnly := true for _, node := range insts { host := node.Key.Hostname updatedNodes = append(updatedNodes, host) @@ -92,12 +233,10 @@ func (f *cFactory) updateStatusFromOrc(insts []orc.Instance) { } continue } - maxSlaveLatency := defaultMaxSlaveLatency if f.cluster.Spec.MaxSlaveLatency != nil { maxSlaveLatency = *f.cluster.Spec.MaxSlaveLatency } - if !node.SlaveLagSeconds.Valid { f.updateNodeCondition(host, api.NodeConditionLagged, core.ConditionUnknown) } else if node.SlaveLagSeconds.Int64 <= maxSlaveLatency { @@ -105,17 +244,32 @@ func (f *cFactory) updateStatusFromOrc(insts []orc.Instance) { } else { // node is behind master f.updateNodeCondition(host, api.NodeConditionLagged, core.ConditionTrue) } - if node.Slave_SQL_Running && node.Slave_IO_Running { f.updateNodeCondition(host, api.NodeConditionReplicating, core.ConditionTrue) } else { f.updateNodeCondition(host, api.NodeConditionReplicating, core.ConditionFalse) } + f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionFalse) + isReadOnly = isReadOnly && node.ReadOnly + if node.ReadOnly { + f.updateNodeCondition(host, api.NodeConditionReadOnly, core.ConditionTrue) + } else { + f.updateNodeCondition(host, api.NodeConditionReadOnly, core.ConditionFalse) + } + } + + master, err := determineMasterFor(insts) + if err != nil { + glog.Errorf("Error determining the cluster master: %s", err) + } else { + f.updateNodeCondition(master.Key.Hostname, api.NodeConditionMaster, core.ConditionTrue) - if !node.ReadOnly { - f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionTrue) + if isReadOnly { + f.cluster.UpdateStatusCondition(api.ClusterConditionReadOnly, + core.ConditionTrue, "initializedTrue", "settingReadOnlyTrue") } else { - f.updateNodeCondition(host, api.NodeConditionMaster, core.ConditionFalse) + f.cluster.UpdateStatusCondition(api.ClusterConditionReadOnly, + core.ConditionFalse, "initializedFalse", "settingReadOnlyFalse") } } diff --git a/pkg/mysqlcluster/orc_reconciliation_test.go b/pkg/mysqlcluster/orc_reconciliation_test.go index ad7e9400f..c1274f512 100644 --- a/pkg/mysqlcluster/orc_reconciliation_test.go +++ b/pkg/mysqlcluster/orc_reconciliation_test.go @@ -31,6 +31,7 @@ import ( api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1" fakeMyClient "github.com/presslabs/mysql-operator/pkg/generated/clientset/versioned/fake" "github.com/presslabs/mysql-operator/pkg/util/options" + orc "github.com/presslabs/mysql-operator/pkg/util/orchestrator" fakeOrc "github.com/presslabs/mysql-operator/pkg/util/orchestrator/fake" tutil "github.com/presslabs/mysql-operator/pkg/util/test" ) @@ -77,17 +78,19 @@ var _ = Describe("Mysql cluster reconcilation", func() { Context("cluster does not exists in orc", func() { It("should register into orc", func() { cluster.Status.ReadyNodes = 1 - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(orcClient.CheckDiscovered("asd-mysql-0.asd-mysql-nodes.default")).To(Equal(true)) }) It("should update status", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), true, -1, false, true) orcClient.AddRecoveries("asd.default", 1, true) factory.createPod("asd-mysql-0") - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) 
Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To( Equal(core.ConditionTrue)) @@ -96,28 +99,34 @@ var _ = Describe("Mysql cluster reconcilation", func() { Equal(core.ConditionFalse)) var event string - Ω(rec.Events).Should(Receive(&event)) + Expect(rec.Events).Should(Receive(&event)) Expect(event).To(ContainSubstring("ReplicationStopped")) - Ω(rec.Events).Should(Receive(&event)) + Expect(rec.Events).Should(Receive(&event)) + Expect(event).To(ContainSubstring("DemoteMaster")) + Expect(rec.Events).Should(Receive(&event)) Expect(event).To(ContainSubstring("PromoteMaster")) }) It("should have pending recoveries", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), true, -1, false, true) orcClient.AddRecoveries("asd.default", 11, false) - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(getCCond( cluster.Status.Conditions, api.ClusterConditionFailoverAck).Status).To( Equal(core.ConditionTrue)) }) It("should have pending recoveries but cluster not ready enough", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), true, -1, false, true) orcClient.AddRecoveries("asd.default", 111, false) cluster.UpdateStatusCondition(api.ClusterConditionReady, core.ConditionTrue, "", "") - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(getCCond( cluster.Status.Conditions, api.ClusterConditionFailoverAck).Status).To( Equal(core.ConditionTrue)) @@ -125,6 +134,8 @@ var _ = Describe("Mysql cluster reconcilation", func() { }) It("should have pending recoveries that will be recovered", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), true, -1, false, true) orcClient.AddRecoveries("asd.default", 112, false) @@ -137,39 +148,165 @@ var _ = Describe("Mysql cluster reconcilation", func() { }, } - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(getCCond( cluster.Status.Conditions, api.ClusterConditionFailoverAck).Status).To( Equal(core.ConditionTrue)) Expect(orcClient.CheckAck(112)).To(Equal(true)) var event string - Ω(rec.Events).Should(Receive(&event)) + Expect(rec.Events).Should(Receive(&event)) Expect(event).To(ContainSubstring("RecoveryAcked")) }) - It("node not uptodate in orc", func() { + It("master is in orc", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), true, -1, false, false) - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To( - Equal(core.ConditionUnknown)) + Equal(core.ConditionTrue)) }) It("node not in orc", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool orcClient.AddInstance("asd.default", cluster.GetPodHostname(0), true, -1, false, true) - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) + 
Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To( Equal(core.ConditionTrue)) orcClient.RemoveInstance("asd.default", cluster.GetPodHostname(0)) - Ω(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) - + Expect(factory.SyncOrchestratorStatus(ctx)).Should(Succeed()) Expect(cluster.Status.Nodes[0].GetCondition(api.NodeConditionMaster).Status).To( Equal(core.ConditionUnknown)) + }) + + It("existence of a single master", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool + inst := orcClient.AddInstance("asd.default", "foo122-mysql-0", + false, -1, false, true) + inst.Key.Port = 3306 + inst.MasterKey = orc.InstanceKey{Hostname: "", Port: 3306} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-1", + false, -1, false, true) + inst.Key.Port = 3307 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-0", Port: 3306} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-2", + false, -1, false, true) + inst.Key.Port = 3308 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-0", Port: 3306} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-3", + false, -1, false, true) + inst.Key.Port = 3309 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-2", Port: 3308} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-4", + false, -1, false, true) + inst.Key.Port = 3310 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-3", Port: 3309} + + insts, _ := orcClient.Cluster("asd.default") + + _, err := determineMasterFor(insts) + Expect(err).To(BeNil()) + }) + + It("existence of multiple masters", func() { + + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool + inst := orcClient.AddInstance("asd.default", "foo122-mysql-0", + false, -1, false, true) + inst.Key.Port = 3306 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-5", Port: 3311} + inst.IsCoMaster = true + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-1", + false, -1, false, true) + inst.Key.Port = 3307 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-0", Port: 3306} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-2", + false, -1, false, true) + inst.Key.Port = 3308 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-0", Port: 3306} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-3", + false, -1, false, true) + inst.Key.Port = 3309 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-2", Port: 3308} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-4", + false, -1, false, true) + inst.Key.Port = 3310 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-3", Port: 3309} + + inst = orcClient.AddInstance("asd.default", "foo122-mysql-5", + false, -1, false, true) + inst.Key.Port = 3311 + inst.MasterKey = orc.InstanceKey{Hostname: "foo122-mysql-0", Port: 3306} + inst.IsCoMaster = true + + insts, _ := orcClient.Cluster("asd.default") + + _, err := determineMasterFor(insts) + Expect(err).ToNot(BeNil()) + }) + + It("no instances", func() { + + insts, _ := orcClient.Cluster("asd.default") + + _, err := determineMasterFor(insts) + Expect(err).ToNot(BeNil()) + }) + + It("set master readOnly/Writable", func() { + + //Set ReadOnly to true in order to get master ReadOnly + //AddInstance signature: cluster, host string, master bool, lag int64, slaveRunning, upToDate bool + orcClient.AddInstance("asd.default", 
cluster.GetPodHostname(0), + true, -1, false, true) + + factory.cluster.Spec.ReadOnly = true + + insts, _ := orcClient.Cluster("asd.default") + + err := factory.updateNodesReadOnlyFlagInOrc(insts) + Expect(err).To(BeNil()) + + insts, _ = orcClient.Cluster("asd.default") + + for _, instance := range insts { + if instance.Key.Hostname == cluster.GetPodHostname(0) && instance.Key.Port == 3306 { + Expect(instance.ReadOnly).To(Equal(true)) + } + } + + //Set ReadOnly to false in order to get the master Writable + + factory.cluster.Spec.ReadOnly = false + + insts, _ = orcClient.Cluster("asd.default") + + err = factory.updateNodesReadOnlyFlagInOrc(insts) + Expect(err).To(BeNil()) + + insts, _ = orcClient.Cluster("asd.default") + + for _, instance := range insts { + if instance.Key.Hostname == cluster.GetPodHostname(0) && instance.Key.Port == 3306 { + Expect(instance.ReadOnly).To(Equal(false)) + } + } }) diff --git a/pkg/openapi/openapi_generated.go b/pkg/openapi/openapi_generated.go index 9bf24aa01..9e50cbc09 100644 --- a/pkg/openapi/openapi_generated.go +++ b/pkg/openapi/openapi_generated.go @@ -363,6 +363,13 @@ func schema_pkg_apis_mysql_v1alpha1_ClusterSpec(ref common.ReferenceCallback) co Ref: ref("github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1.QueryLimits"), }, }, + "readOnly": { + SchemaProps: spec.SchemaProps{ + Description: "Makes the cluster read only. When enabled the master is set read-only instead of writable.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/util/orchestrator/fake/client.go b/pkg/util/orchestrator/fake/client.go index 333821ae6..de7517b21 100644 --- a/pkg/util/orchestrator/fake/client.go +++ b/pkg/util/orchestrator/fake/client.go @@ -23,7 +23,7 @@ import ( ) type FakeOrc struct { - Clusters map[string][]Instance + Clusters map[string][]*Instance Recoveries map[string][]TopologyRecovery AckRec []int64 @@ -34,12 +34,12 @@ func New() *FakeOrc { return &FakeOrc{} } -func (o *FakeOrc) AddInstance(cluster, host string, master bool, sls int64, slaveR, upToDate bool) { +func (o *FakeOrc) AddInstance(cluster, host string, master bool, lag int64, slaveRunning, upToDate bool) *Instance { valid := true - if sls < 0 { + if lag < 0 { valid = false } - inst := Instance{ + inst := &Instance{ Key: InstanceKey{ Hostname: host, Port: 3306, @@ -47,22 +47,25 @@ func (o *FakeOrc) AddInstance(cluster, host string, master bool, sls int64, slav ReadOnly: !master, SlaveLagSeconds: NullInt64{ Valid: valid, - Int64: sls, + Int64: lag, }, ClusterName: cluster, - Slave_SQL_Running: slaveR, - Slave_IO_Running: slaveR, + Slave_SQL_Running: slaveRunning, + Slave_IO_Running: slaveRunning, IsUpToDate: upToDate, IsLastCheckValid: upToDate, } if o.Clusters == nil { - o.Clusters = make(map[string][]Instance) + o.Clusters = make(map[string][]*Instance) } clusters, ok := o.Clusters[cluster] if ok { o.Clusters[cluster] = append(clusters, inst) + } else { + o.Clusters[cluster] = []*Instance{inst} } - o.Clusters[cluster] = []Instance{inst} + + return inst } func (o *FakeOrc) RemoveInstance(cluster, host string) { @@ -139,7 +142,7 @@ func (o *FakeOrc) Master(clusterHint string) (*Instance, error) { } for _, inst := range insts { if !inst.ReadOnly { - return &inst, nil + return inst, nil } } return nil, fmt.Errorf("[faker] master not found!!!!") @@ -150,8 +153,11 @@ func (o *FakeOrc) Cluster(cluster string) ([]Instance, error) { if !ok { return nil, fmt.Errorf("not found") } - - return insts, nil + var result []Instance + for _, inst := range insts { + result = append(result, *inst) + } + 
return result, nil } func (o *FakeOrc) AuditRecovery(cluster string) ([]TopologyRecovery, error) { @@ -167,3 +173,47 @@ func (o *FakeOrc) AckRecovery(id int64, comment string) error { o.AckRec = append(o.AckRec, id) return nil } + +func (o *FakeOrc) SetHostWritable(key InstanceKey) error { + + check := false + for _, instances := range o.Clusters { + for _, instance := range instances { + if instance.Key.Hostname == key.Hostname && instance.Key.Port == key.Port { + instance.ReadOnly = false + check = true + } + } + } + if check { + return nil + } else { + return fmt.Errorf("host %s:%d not found", key.Hostname, key.Port) + } +} + +func (o *FakeOrc) SetHostReadOnly(key InstanceKey) error { + + check := false + for _, instances := range o.Clusters { + for _, instance := range instances { + if instance.Key.Hostname == key.Hostname && instance.Key.Port == key.Port { + instance.ReadOnly = true + check = true + } + } + } + if check { + return nil + } else { + return fmt.Errorf("host %s:%d not found", key.Hostname, key.Port) + } +} + +func (o *FakeOrc) BeginMaintenance(key InstanceKey, owner, reason string) error { + return nil +} + +func (o *FakeOrc) EndMaintenance(key InstanceKey) error { + return nil +} diff --git a/pkg/util/orchestrator/orchestrator.go b/pkg/util/orchestrator/orchestrator.go index 24cb88bc9..232709588 100644 --- a/pkg/util/orchestrator/orchestrator.go +++ b/pkg/util/orchestrator/orchestrator.go @@ -30,6 +30,12 @@ type Interface interface { AuditRecovery(cluster string) ([]TopologyRecovery, error) AckRecovery(id int64, commnet string) error + + SetHostWritable(key InstanceKey) error + SetHostReadOnly(key InstanceKey) error + + BeginMaintenance(key InstanceKey, owner, reason string) error + EndMaintenance(key InstanceKey) error } type orchestrator struct { @@ -98,3 +104,38 @@ func (o *orchestrator) AckRecovery(id int64, comment string) error { return nil } + +func (o *orchestrator) SetHostWritable(key InstanceKey) error { + + if err := o.makeGetAPIRequest(fmt.Sprintf("set-writeable/%s/%d", key.Hostname, key.Port), nil); err != nil { + return err + } + + return nil +} + +func (o *orchestrator) SetHostReadOnly(key InstanceKey) error { + + if err := o.makeGetAPIRequest(fmt.Sprintf("set-read-only/%s/%d", key.Hostname, key.Port), nil); err != nil { + return err + } + + return nil +} + +func (o *orchestrator) BeginMaintenance(key InstanceKey, owner, reason string) error { + + if err := o.makeGetAPIRequest(fmt.Sprintf("begin-maintenance/%s/%d/%s/%s", key.Hostname, key.Port, owner, reason), nil); err != nil { + return err + } + + return nil +} + +func (o *orchestrator) EndMaintenance(key InstanceKey) error { + + if err := o.makeGetAPIRequest(fmt.Sprintf("end-maintenance/%s/%d", key.Hostname, key.Port), nil); err != nil { + return err + } + return nil +} diff --git a/test/e2e/cluster/cluster.go b/test/e2e/cluster/cluster.go index cb92ffa4f..d369f1ce4 100644 --- a/test/e2e/cluster/cluster.go +++ b/test/e2e/cluster/cluster.go @@ -205,6 +205,47 @@ var _ = Describe("Mysql cluster tests", func() { testClusterEndpoints(f, cluster, []int{0}, []int{0}) }) + It("cluster readOnly", func() { + cluster.Spec.Replicas = 2 + cluster.Spec.ReadOnly = true + + cluster, err = f.MyClientSet.MysqlV1alpha1().MysqlClusters(f.Namespace.Name).Update(cluster) + Expect(err).NotTo(HaveOccurred(), "Failed to update cluster: '%s'", cluster.Name) + + // test cluster to be ready + By("test cluster is ready after cluster update") + testClusterReadiness(f, cluster) + By("test cluster is registered in orchestrator 
after cluster update") + testClusterReadOnlyIsRegistredWithOrchestrator(f, cluster) + + cluster, err = f.MyClientSet.MysqlV1alpha1().MysqlClusters(f.Namespace.Name).Get(cluster.Name, meta.GetOptions{}) + Expect(err).NotTo(HaveOccurred(), "Failed to get cluster %s", cluster.Name) + + // expect cluster to be marked readOnly + By("test cluster to be readOnly") + f.ClusterEventuallyCondition(cluster, api.ClusterConditionReadOnly, core.ConditionTrue, f.Timeout) + + // expect node to be marked as lagged and removed from service + By("test cluster node 0 to be readOnly") + f.NodeEventuallyCondition(cluster, cluster.GetPodHostname(0), api.NodeConditionReadOnly, core.ConditionTrue, 20*time.Second) + // node 1 should not be in healty service + By("test cluster endpoints after delayed slave") + testClusterEndpoints(f, cluster, []int{0}, []int{0, 1}) + + // remove master pod + podName := cluster.GetNameForResource(api.StatefulSet) + "-0" + err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(podName, &meta.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred(), "Failed to delete pod %s", podName) + + // check failover done, this is a reggression test + // TODO: decrease this timeout to 20 + failoverTimeout := 40 * time.Second + By(fmt.Sprintf("Check failover done; timeout=%s", failoverTimeout)) + f.NodeEventuallyCondition(cluster, cluster.GetPodHostname(1), api.NodeConditionMaster, core.ConditionFalse, failoverTimeout) + f.NodeEventuallyCondition(cluster, cluster.GetPodHostname(1), api.NodeConditionReadOnly, core.ConditionTrue, failoverTimeout) + + }) + }) func testClusterReadiness(f *framework.Framework, cluster *api.MysqlCluster) { @@ -221,8 +262,18 @@ func testClusterReadiness(f *framework.Framework, cluster *api.MysqlCluster) { // f.ClusterEventuallyCondition(cluster, api.ClusterConditionFailoverAck, core.ConditionFalse, f.Timeout) } -// tests if the cluster is in orchestrator and is properly configured +//test if a non-readOnly cluster is registered with orchestrator func testClusterIsRegistredWithOrchestrator(f *framework.Framework, cluster *api.MysqlCluster) { + testClusterRegistrationInOrchestrator(f, cluster, false) +} + +//test if a readOnly cluster is registered with orchestrator +func testClusterReadOnlyIsRegistredWithOrchestrator(f *framework.Framework, cluster *api.MysqlCluster) { + testClusterRegistrationInOrchestrator(f, cluster, true) +} + +// tests if the cluster is in orchestrator and is properly configured +func testClusterRegistrationInOrchestrator(f *framework.Framework, cluster *api.MysqlCluster, clusterReadOnly bool) { cluster, err := f.MyClientSet.MysqlV1alpha1().MysqlClusters(f.Namespace.Name).Get(cluster.Name, meta.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "Failed to get cluster '%s'", cluster.Name) @@ -236,7 +287,7 @@ func testClusterIsRegistredWithOrchestrator(f *framework.Framework, cluster *api "GTIDMode": Equal("ON"), "IsUpToDate": Equal(true), "Binlog_format": Equal("ROW"), - "ReadOnly": Equal(false), + "ReadOnly": Equal(clusterReadOnly), }), // master node } for i := 1; i < int(cluster.Spec.Replicas); i++ {