diff --git a/.drone.yml b/.drone.yml index 60eae4773..bf9750242 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,3 +1,7 @@ +branches: + - master + - 1.x + workspace: base: /root/go path: src/github.com/presslabs/mysql-operator diff --git a/hack/charts/mysql-operator/values.yaml b/hack/charts/mysql-operator/values.yaml index 62959e3b7..b7fe20d80 100644 --- a/hack/charts/mysql-operator/values.yaml +++ b/hack/charts/mysql-operator/values.yaml @@ -55,9 +55,8 @@ orchestrator: RecoverMasterClusterFilters: ['.*'] RecoverIntermediateMasterClusterFilters: ['.*'] # `reset slave all` and `set read_only=0` on promoted master - ApplyMySQLPromotionAfterMasterFailover: true - # set downtime on the failed master - MasterFailoverLostInstancesDowntimeMinutes: 10 + ApplyMySQLPromotionAfterMasterFailover: false + MasterFailoverDetachReplicaMasterHost: true # https://github.com/github/orchestrator/blob/master/docs/configuration-recovery.md#promotion-actions # Safety! do not disable unless you know what you are doing FailMasterPromotionIfSQLThreadNotUpToDate: true diff --git a/pkg/controller/internal/testutil/cluster.go b/pkg/controller/internal/testutil/cluster.go index aa7a1ab5e..f892e50c4 100644 --- a/pkg/controller/internal/testutil/cluster.go +++ b/pkg/controller/internal/testutil/cluster.go @@ -14,13 +14,22 @@ See the License for the specific language governing permissions and limitations under the License. */ +// nolint: errcheck,golint package testutil import ( + "context" "time" + . "github.com/onsi/gomega" + . "github.com/onsi/gomega/gstruct" + gomegatypes "github.com/onsi/gomega/types" + core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1" ) @@ -68,3 +77,35 @@ func NodeConditions(master, replicating, lagged, readOnly bool) []api.NodeCondit }, } } + +// RefreshFn receives a client and a runtime.Object and refreshes that object from k8s +// example: Eventually(RefreshFn(c, cluster.Unwrap())).Should(HaveClusterStatusReadyNodes(2)) +func RefreshFn(c client.Client, obj runtime.Object) func() runtime.Object { + return func() runtime.Object { + objMeta, ok := obj.(metav1.Object) + if !ok { + return nil + } + + objKey := types.NamespacedName{ + Name: objMeta.GetName(), + Namespace: objMeta.GetNamespace(), + } + + if err := c.Get(context.TODO(), objKey, obj); err == nil { + return obj + } + + // if the object is not updated then return nil, not the old object + return nil + } +} + +// HaveClusterStatusReadyNodes is a matcher that checks that the cluster's ready nodes count equals the given value +func HaveClusterStatusReadyNodes(nodes int) gomegatypes.GomegaMatcher { + return PointTo(MatchFields(IgnoreExtras, Fields{ + "Status": MatchFields(IgnoreExtras, Fields{ + "ReadyNodes": Equal(nodes), + }), + })) +} diff --git a/pkg/controller/mysqlcluster/internal/syncer/settings.go b/pkg/controller/mysqlcluster/internal/syncer/settings.go index 162bc4a5a..0907310bd 100644 --- a/pkg/controller/mysqlcluster/internal/syncer/settings.go +++ b/pkg/controller/mysqlcluster/internal/syncer/settings.go @@ -28,25 +28,20 @@ const ( // MysqlPort is the default mysql port.
MysqlPort = constants.MysqlPort - // HelperXtrabackupPortName is name of the port on which we take backups - HelperXtrabackupPortName = "xtrabackup" - // HelperXtrabackupPort is the port on which we serve backups - HelperXtrabackupPort = constants.HelperXtrabackupPort - // OrcTopologyDir path where orc conf secret is mounted OrcTopologyDir = constants.OrcTopologyDir - // HelperServerPort represents the port on which http server will run - HelperServerPort = constants.HelperServerPort - // HelperServerProbePath the probe path - HelperServerProbePath = constants.HelperServerProbePath + // SidecarServerPortName name of the port + SidecarServerPortName = "sidecar-http" + // SidecarServerPort represents the port on which http server will run + SidecarServerPort = constants.SidecarServerPort + // SidecarServerProbePath the probe path + SidecarServerProbePath = constants.SidecarServerProbePath // ExporterPort is the port that metrics will be exported ExporterPort = constants.ExporterPort - //ExporterPortName the name of the metrics exporter port ExporterPortName = "prometheus" - // ExporterPath is the path on which metrics are expose ExporterPath = constants.ExporterPath @@ -56,13 +51,10 @@ const ( // ConfVolumeMountPath is the path where mysql configs will be mounted ConfVolumeMountPath = constants.ConfVolumeMountPath - // DataVolumeMountPath is the path to mysql data DataVolumeMountPath = constants.DataVolumeMountPath - // ConfMapVolumeMountPath represents the temp config mount path in init containers ConfMapVolumeMountPath = constants.ConfMapVolumeMountPath - // ConfDPath is the path to extra mysql configs dir ConfDPath = constants.ConfDPath ) diff --git a/pkg/controller/mysqlcluster/internal/syncer/statefullset.go b/pkg/controller/mysqlcluster/internal/syncer/statefullset.go index d1f41e8d8..e987fb92b 100644 --- a/pkg/controller/mysqlcluster/internal/syncer/statefullset.go +++ b/pkg/controller/mysqlcluster/internal/syncer/statefullset.go @@ -85,14 +85,6 @@ func NewStatefulSetSyncer(c client.Client, scheme *runtime.Scheme, cluster *mysq func (s *sfsSyncer) SyncFn(in runtime.Object) error { out := in.(*apps.StatefulSet) - if out.Status.ReadyReplicas == *s.cluster.Spec.Replicas { - s.cluster.UpdateStatusCondition(api.ClusterConditionReady, - core.ConditionTrue, "statefulset ready", "Cluster is ready.") - } else { - s.cluster.UpdateStatusCondition(api.ClusterConditionReady, - core.ConditionFalse, "statefulset not ready", "Cluster is not ready.") - } - s.cluster.Status.ReadyNodes = int(out.Status.ReadyReplicas) out.Spec.Replicas = s.cluster.Spec.Replicas @@ -356,21 +348,20 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container { }, }) - helper := s.ensureContainer(containerSidecarName, + // SIDECAR container + sidecar := s.ensureContainer(containerSidecarName, s.opt.HelperImage, []string{"config-and-serve"}, ) - helper.Ports = ensurePorts(core.ContainerPort{ - Name: HelperXtrabackupPortName, - ContainerPort: HelperXtrabackupPort, + sidecar.Ports = ensurePorts(core.ContainerPort{ + Name: SidecarServerPortName, + ContainerPort: SidecarServerPort, }) - helper.Resources = ensureResources(containerSidecarName) - - // HELPER container - helper.ReadinessProbe = ensureProbe(30, 5, 5, core.Handler{ + sidecar.Resources = ensureResources(containerSidecarName) + sidecar.ReadinessProbe = ensureProbe(30, 5, 5, core.Handler{ HTTPGet: &core.HTTPGetAction{ - Path: HelperServerProbePath, - Port: intstr.FromInt(HelperServerPort), + Path: SidecarServerProbePath, + Port: intstr.FromInt(SidecarServerPort), 
Scheme: core.URISchemeHTTP, }, }) @@ -417,7 +408,7 @@ func (s *sfsSyncer) ensureContainersSpec() []core.Container { containers := []core.Container{ mysql, - helper, + sidecar, exporter, heartbeat, } diff --git a/pkg/controller/mysqlcluster/mysqlcluster_controller_test.go b/pkg/controller/mysqlcluster/mysqlcluster_controller_test.go index a85135b6d..e66011470 100644 --- a/pkg/controller/mysqlcluster/mysqlcluster_controller_test.go +++ b/pkg/controller/mysqlcluster/mysqlcluster_controller_test.go @@ -40,6 +40,7 @@ import ( api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1" "github.com/presslabs/mysql-operator/pkg/controller/internal/testutil" + util "github.com/presslabs/mysql-operator/pkg/controller/internal/testutil" "github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster" ) @@ -231,7 +232,7 @@ var _ = Describe("MysqlCluster controller", func() { Expect(statefulSet.Spec.Template.ObjectMeta.Annotations["config_rev"]).To(Equal(cfgMap.ResourceVersion)) Expect(statefulSet.Spec.Template.ObjectMeta.Annotations["secret_rev"]).To(Equal(secret.ResourceVersion)) }) - It("should update cluster ready condition", func() { + It("should update cluster ready nodes", func() { // get statefulset sfsKey := types.NamespacedName{ Name: cluster.GetNameForResource(mysqlcluster.StatefulSet), @@ -250,7 +251,9 @@ var _ = Describe("MysqlCluster controller", func() { // expect a reconcile event Eventually(requests, timeout).Should(Receive(Equal(expectedRequest))) Eventually(requests, timeout).Should(Receive(Equal(expectedRequest))) - Eventually(getClusterConditions(c, cluster), timeout).Should(haveCondWithStatus(api.ClusterConditionReady, corev1.ConditionTrue)) + + // check ready nodes are updated + Eventually(util.RefreshFn(c, cluster.Unwrap())).Should(util.HaveClusterStatusReadyNodes(2)) }) It("should label pods as healthy and as master accordingly", func() { pod0 := getPod(cluster, 0) @@ -527,23 +530,6 @@ func nodeStatusForPod(cluster *mysqlcluster.MysqlCluster, pod *corev1.Pod, maste } } -// getClusterConditions is a helper func that returns a functions that returns cluster status conditions -func getClusterConditions(c client.Client, cluster *mysqlcluster.MysqlCluster) func() []api.ClusterCondition { - return func() []api.ClusterCondition { - cl := &api.MysqlCluster{} - c.Get(context.TODO(), types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, cl) - return cl.Status.Conditions - } -} - -// haveCondWithStatus is a helper func that returns a matcher to check for an existing condition in a ClusterCondition list. 
-func haveCondWithStatus(condType api.ClusterConditionType, status corev1.ConditionStatus) gomegatypes.GomegaMatcher { - return ContainElement(MatchFields(IgnoreExtras, Fields{ - "Type": Equal(condType), - "Status": Equal(status), - })) -} - func haveLabelWithValue(label, value string) gomegatypes.GomegaMatcher { return PointTo(MatchFields(IgnoreExtras, Fields{ "ObjectMeta": MatchFields(IgnoreExtras, Fields{ diff --git a/pkg/controller/orchestrator/orchestrator_controller_test.go b/pkg/controller/orchestrator/orchestrator_controller_test.go index 414a5d215..c79634825 100644 --- a/pkg/controller/orchestrator/orchestrator_controller_test.go +++ b/pkg/controller/orchestrator/orchestrator_controller_test.go @@ -41,8 +41,9 @@ import ( ) var ( - one = int32(1) - two = int32(2) + one = int32(1) + two = int32(2) + three = int32(3) ) var _ = Describe("Orchestrator controller", func() { diff --git a/pkg/controller/orchestrator/orchestrator_reconcile.go b/pkg/controller/orchestrator/orchestrator_reconcile.go index 4e4ca8b47..9b1a26c12 100644 --- a/pkg/controller/orchestrator/orchestrator_reconcile.go +++ b/pkg/controller/orchestrator/orchestrator_reconcile.go @@ -35,7 +35,10 @@ import ( const ( // recoveryGraceTime is the time, in seconds, that has to pass since cluster // is marked as Ready and to acknowledge a recovery for a cluster - recoveryGraceTime = 600 + recoveryGraceTime = 600 + // forgetGraceTime represents the time, in seconds, that needs to pass since cluster is ready to + // remove a node from orchestrator + forgetGraceTime = 30 defaultMaxSlaveLatency int64 = 30 mysqlPort = 3306 ) @@ -60,13 +63,14 @@ func (ou *orcUpdater) GetOwner() runtime.Object { return ou.cluster } func (ou *orcUpdater) Sync(ctx context.Context) (syncer.SyncResult, error) { // get instances from orchestrator var ( - instances InstancesSet + allInstances InstancesSet err error recoveries []orc.TopologyRecovery - maintenances []orc.Maintenance + master *orc.Instance ) - if instances, err = ou.orcClient.Cluster(ou.cluster.GetClusterAlias()); err != nil { + // get all related instances from orchestrator + if allInstances, err = ou.orcClient.Cluster(ou.cluster.GetClusterAlias()); err != nil { log.V(-1).Info("can't get instances from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error()) if !orc.IsNotFound(err) { log.Error(err, "orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias()) @@ -74,37 +78,48 @@ func (ou *orcUpdater) Sync(ctx context.Context) (syncer.SyncResult, error) { } } - if len(instances) != 0 { - if maintenances, err = ou.orcClient.Maintenance(); err != nil { - log.V(-1).Info("can't get mainanaces from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error()) + // get master node for the cluster + if master, err = ou.orcClient.Master(ou.cluster.GetClusterAlias()); err != nil { + log.V(-1).Info("can't get master from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error()) + if !orc.IsNotFound(err) { + log.Error(err, "orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias()) + return syncer.SyncResult{}, err } } // register nodes in orchestrator if needed, or remove nodes from status - instances = ou.updateNodesInOrc(instances) + instances, undiscoveredInstances, toRemoveInstances := ou.updateNodesInOrc(allInstances) - // remove nodes that are not in orchestrator + // register new nodes into orchestrator + ou.discoverNodesInOrc(undiscoveredInstances) + + // remove nodes which are not registered in 
orchestrator from status ou.removeNodeConditionNotInOrc(instances) // set readonly in orchestrator if needed - if err = ou.markReadOnlyNodesInOrc(instances, maintenances); err != nil { - log.Error(err, "error setting Master readOnly/writable", "instances", instances) - } + ou.markReadOnlyNodesInOrc(instances, master) + // update cluster status accordingly with orchestrator - ou.updateStatusFromOrc(instances) + ou.updateStatusFromOrc(instances, master) + + // updates cluster ready status based on nodes status from orchestrator + ou.updateClusterReadyStatus() + + // remove old nodes from orchestrator, depends on cluster ready status + ou.forgetNodesFromOrc(toRemoveInstances) // get recoveries for this cluster if recoveries, err = ou.orcClient.AuditRecovery(ou.cluster.GetClusterAlias()); err != nil { log.V(-1).Info("can't get recoveries from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error()) } - // update cluster status + // update cluster status for recoveries ou.updateStatusForRecoveries(recoveries) // filter recoveries that can be acknowledged toAck := ou.getRecoveriesToAck(recoveries) - // acknowledge recoveries + // acknowledge recoveries, depends on cluster ready status if err = ou.acknowledgeRecoveries(toAck); err != nil { log.Error(err, "failed to acknowledge recoveries", "alias", ou.cluster.GetClusterAlias(), "ack_recoveries", toAck) } @@ -112,8 +127,46 @@ func (ou *orcUpdater) Sync(ctx context.Context) (syncer.SyncResult, error) { return syncer.SyncResult{}, nil } +func (ou *orcUpdater) updateClusterReadyStatus() { + if ou.cluster.Status.ReadyNodes != int(*ou.cluster.Spec.Replicas) { + ou.cluster.UpdateStatusCondition(api.ClusterConditionReady, + core.ConditionFalse, "StatefulSetNotReady", "StatefulSet is not ready") + return + } + + hasMaster := false + for i := 0; i < int(*ou.cluster.Spec.Replicas); i++ { + hostname := ou.cluster.GetPodHostname(i) + ns := ou.cluster.GetNodeStatusFor(hostname) + master := getCondAsBool(&ns, api.NodeConditionMaster) + replicating := getCondAsBool(&ns, api.NodeConditionReplicating) + + if master { + hasMaster = true + } else if !replicating { + ou.cluster.UpdateStatusCondition(api.ClusterConditionReady, core.ConditionFalse, "NotReplicating", + fmt.Sprintf("Node %s is part of topology and not replicating", hostname)) + return + } + } + + if !hasMaster && !ou.cluster.Spec.ReadOnly { + ou.cluster.UpdateStatusCondition(api.ClusterConditionReady, core.ConditionFalse, "NoMaster", + "Cluster has no designated master") + return + } + + ou.cluster.UpdateStatusCondition(api.ClusterConditionReady, + core.ConditionTrue, "ClusterReady", "Cluster is ready") +} + +func getCondAsBool(status *api.NodeStatus, cond api.NodeConditionType) bool { + index, exists := mysqlcluster.GetNodeConditionIndex(status, cond) + return exists && status.Conditions[index].Status == core.ConditionTrue +} + // nolint: gocyclo -func (ou *orcUpdater) updateStatusFromOrc(insts InstancesSet) { +func (ou *orcUpdater) updateStatusFromOrc(insts InstancesSet, master *orc.Instance) { log.V(1).Info("updating nodes status", "insts", insts) // we assume that cluster is in ReadOnly @@ -125,8 +178,6 @@ func (ou *orcUpdater) updateStatusFromOrc(insts InstancesSet) { maxSlaveLatency = *ou.cluster.Spec.MaxSlaveLatency } - master := insts.DetermineMaster() - // update conditions for every node for _, node := range insts { host := node.Key.Hostname @@ -187,14 +238,14 @@ func (ou *orcUpdater) updateStatusFromOrc(insts InstancesSet) { // updateNodesInOrc is the functions that tries to 
register // unregistered nodes and to remove nodes that does not exists. -func (ou *orcUpdater) updateNodesInOrc(instances InstancesSet) InstancesSet { +func (ou *orcUpdater) updateNodesInOrc(instances InstancesSet) (InstancesSet, []orc.InstanceKey, []orc.InstanceKey) { var ( // hosts that should be discovered - shouldDiscover []string - // list of instances from orchestrator that should be removed - shouldForget []string - // the list of instances that are in orchestrator and k8s - instancesFiltered InstancesSet + shouldDiscover []orc.InstanceKey + // list of instances that should be removed from orchestrator + shouldForget []orc.InstanceKey + // list of instances from orchestrator that are present in k8s + readyInstances InstancesSet ) log.V(1).Info("nodes (un)registrations", "readyNodes", ou.cluster.Status.ReadyNodes) @@ -208,53 +259,57 @@ func (ou *orcUpdater) updateNodesInOrc(instances InstancesSet) InstancesSet { if i < ou.cluster.Status.ReadyNodes { // host is not present into orchestrator // register new host into orchestrator - shouldDiscover = append(shouldDiscover, host) + hostKey := orc.InstanceKey{ + Hostname: host, + Port: mysqlPort, + } + shouldDiscover = append(shouldDiscover, hostKey) } } else { // this instance is present in both k8s and orchestrator - instancesFiltered = append(instancesFiltered, *inst) + readyInstances = append(readyInstances, *inst) } } - // the only state in which a node can be removed from orchestrator - if int(*ou.cluster.Spec.Replicas) == ou.cluster.Status.ReadyNodes { - // remove all instances from orchestrator that does not exists in k8s - for _, inst := range instances { - if i := instancesFiltered.GetInstance(inst.Key.Hostname); i == nil { - shouldForget = append(shouldForget, inst.Key.Hostname) - } + // remove all instances from orchestrator that do not exist in k8s + for _, inst := range instances { + if i := readyInstances.GetInstance(inst.Key.Hostname); i == nil { + shouldForget = append(shouldForget, inst.Key) } } + if ou.cluster.DeletionTimestamp == nil { - ou.discoverNodesInOrc(shouldDiscover) - ou.forgetNodesFromOrc(shouldForget) - } else { - log.V(1).Info("cluster is deleted - forget all nodes") - // cluster is deleted, remove all hosts from orchestrator - var hosts []string - for _, i := range instances { - hosts = append(hosts, i.Key.Hostname) - } - ou.forgetNodesFromOrc(hosts) + return readyInstances, shouldDiscover, shouldForget } - return instancesFiltered + toRemove := []orc.InstanceKey{} + for _, i := range instances { + toRemove = append(toRemove, i.Key) + } + return readyInstances, []orc.InstanceKey{}, toRemove } -func (ou *orcUpdater) discoverNodesInOrc(hosts []string) { - log.Info("discovering hosts", "hosts", hosts) - for _, host := range hosts { - if err := ou.orcClient.Discover(host, mysqlPort); err != nil { - log.Error(err, "failed to discover host with orchestrator", "host", host) +func (ou *orcUpdater) forgetNodesFromOrc(keys []orc.InstanceKey) { + // a node can be removed from orchestrator only + // if the cluster is ready or if the cluster is deleted + ready := ou.cluster.GetClusterCondition(api.ClusterConditionReady) + if ready != nil && ready.Status == core.ConditionTrue && + time.Since(ready.LastTransitionTime.Time).Seconds() > forgetGraceTime || + ou.cluster.DeletionTimestamp != nil { + // remove all instances from orchestrator that do not exist in k8s + for _, key := range keys { + if err := ou.orcClient.Forget(key.Hostname, key.Port); err != nil { + log.Error(err, "failed to forget host with
orchestrator", "key", key.Hostname) + } } } } -func (ou *orcUpdater) forgetNodesFromOrc(hosts []string) { - log.Info("forgeting hosts", "hosts", hosts) - for _, host := range hosts { - if err := ou.orcClient.Forget(host, mysqlPort); err != nil { - log.Error(err, "failed to forget host with orchestrator", "host", host) +func (ou *orcUpdater) discoverNodesInOrc(keys []orc.InstanceKey) { + log.Info("discovering hosts", "keys", keys) + for _, key := range keys { + if err := ou.orcClient.Discover(key.Hostname, key.Port); err != nil { + log.Error(err, "failed to discover host with orchestrator", "key", key) } } } @@ -266,15 +321,10 @@ func (ou *orcUpdater) getRecoveriesToAck(recoveries []orc.TopologyRecovery) []or return toAck } - i, found := condIndexCluster(ou.cluster, api.ClusterConditionReady) - if !found || ou.cluster.Status.Conditions[i].Status != core.ConditionTrue { - log.Info("skip acknowledging recovery since cluster is not ready yet", "cluster", ou.cluster) - return toAck - } - - timeSinceReady := time.Since(ou.cluster.Status.Conditions[i].LastTransitionTime.Time).Seconds() - if timeSinceReady < recoveryGraceTime { - log.Info("cluster not ready for acknowledge", "timeSinceReady", timeSinceReady, "threshold", recoveryGraceTime) + ready := ou.cluster.GetClusterCondition(api.ClusterConditionReady) + if !(ready != nil && ready.Status == core.ConditionTrue && + time.Since(ready.LastTransitionTime.Time).Seconds() > recoveryGraceTime) { + log.Info("cluster not ready for acknowledge", "threshold", recoveryGraceTime) return toAck } @@ -299,7 +349,7 @@ func (ou *orcUpdater) getRecoveriesToAck(recoveries []orc.TopologyRecovery) []or } func (ou *orcUpdater) acknowledgeRecoveries(toAck []orc.TopologyRecovery) error { - comment := fmt.Sprintf("Statefulset '%s' is healthy for more then %d seconds", + comment := fmt.Sprintf("Statefulset '%s' is healthy for more than %d seconds", ou.cluster.GetNameForResource(mysqlcluster.StatefulSet), recoveryGraceTime, ) @@ -333,17 +383,6 @@ func (ou *orcUpdater) updateStatusForRecoveries(recoveries []orc.TopologyRecover } } -// nolint: unparam -func condIndexCluster(cluster *mysqlcluster.MysqlCluster, condType api.ClusterConditionType) (int, bool) { - for i, cond := range cluster.Status.Conditions { - if cond.Type == condType { - return i, true - } - } - - return 0, false -} - // updateNodeCondition is a helper function that updates condition for a specific node func (ou *orcUpdater) updateNodeCondition(host string, cType api.NodeConditionType, status core.ConditionStatus) { ou.cluster.UpdateNodeConditionStatus(host, cType, status) @@ -375,27 +414,6 @@ func (ou *orcUpdater) setWritableNode(inst orc.Instance) error { return nil } -func (ou *orcUpdater) beginNodeMaintenance(inst orc.Instance, isInMaintenance bool) error { - - if isInMaintenance { - return nil - } - - log.Info("set node in maintenance", "node", inst.Key.Hostname) - return ou.orcClient.BeginMaintenance(inst.Key, "mysqlcontroller", "clusterReadOnly") - -} - -func (ou *orcUpdater) endNodeMaintenance(inst orc.Instance, isInMaintenance bool) error { - - if !isInMaintenance { - return nil - } - - log.Info("set node out of maintenance", "node", inst.Key.Hostname) - return ou.orcClient.EndMaintenance(inst.Key) -} - // set a host read only just if needed func (ou *orcUpdater) setReadOnlyNode(inst orc.Instance) error { if !inst.ReadOnly { @@ -406,40 +424,28 @@ func (ou *orcUpdater) setReadOnlyNode(inst orc.Instance) error { } // nolint: gocyclo -func (ou *orcUpdater) markReadOnlyNodesInOrc(insts InstancesSet, 
maintenances []orc.Maintenance) error { +func (ou *orcUpdater) markReadOnlyNodesInOrc(insts InstancesSet, master *orc.Instance) { var err error - master := insts.DetermineMaster() if master == nil { // master is not found // set cluster read only + log.Info("setting cluster in read-only", "cluster", ou.cluster.GetClusterAlias()) for _, inst := range insts { - isInMaintenance := isInstInMaintenance(inst.Key, maintenances) - - if err = ou.beginNodeMaintenance(inst, isInMaintenance); err != nil { - log.Error(err, "failed to begin maintenance", "instance", inst) - } if err = ou.setReadOnlyNode(inst); err != nil { log.Error(err, "failed to set read only", "instance", inst) } } - return nil + return } - // master is determinated + // master is determined for _, inst := range insts { - isInMaintenance := isInstInMaintenance(inst.Key, maintenances) - if ou.cluster.Spec.ReadOnly { - if err = ou.beginNodeMaintenance(inst, isInMaintenance); err != nil { - log.Error(err, "failed to begin maintenance", "instance", inst) - } if err = ou.setReadOnlyNode(inst); err != nil { log.Error(err, "failed to set read only", "instance", inst) } } else { - if err = ou.endNodeMaintenance(inst, isInMaintenance); err != nil { - log.Error(err, "failed to end maintenance", "instance", inst) - } + // set master writable or replica read-only if inst.Key.Hostname == master.Key.Hostname { if err = ou.setWritableNode(inst); err != nil { log.Error(err, "failed to set writable", "instance", inst) @@ -451,8 +457,6 @@ func (ou *orcUpdater) markReadOnlyNodesInOrc(insts InstancesSet, maintenances [] } } } - - return nil } // InstancesSet type is a alias for []orc.Instance with extra utils functions @@ -495,6 +499,7 @@ func (is InstancesSet) DetermineMaster() *orc.Instance { for _, node := range is { master := is.getMasterForNode(&node) if master == nil { + log.V(1).Info("DetermineMaster: master not found for node", "node", node) return nil } masterForNode = append(masterForNode, *master) @@ -504,12 +509,15 @@ func (is InstancesSet) DetermineMaster() *orc.Instance { masterHostName := masterForNode[0] for _, node := range masterForNode { if node.Key.Hostname != masterHostName.Key.Hostname { + log.V(1).Info("DetermineMaster: a node has different master", "node", node, + "master", masterForNode) return nil } } return &masterHostName } + log.V(1).Info("DetermineMaster: master not set", "instances", is) return nil } @@ -523,13 +531,3 @@ func makeRecoveryMessage(acks []orc.TopologyRecovery) string { return strings.Join(texts, " ") } - -func isInstInMaintenance(instKey orc.InstanceKey, maintenances []orc.Maintenance) bool { - for _, m := range maintenances { - if m.Key == instKey { - return true - } - } - - return false -} diff --git a/pkg/controller/orchestrator/orchestrator_reconcile_test.go b/pkg/controller/orchestrator/orchestrator_reconcile_test.go index 9473d0e16..3e14f20fb 100644 --- a/pkg/controller/orchestrator/orchestrator_reconcile_test.go +++ b/pkg/controller/orchestrator/orchestrator_reconcile_test.go @@ -128,8 +128,8 @@ var _ = Describe("Orchestrator reconciler", func() { It("should update cluster status", func() { _, err := orcSyncer.Sync(context.TODO()) Expect(err).To(Succeed()) - Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionReadOnly, core.ConditionFalse)) - Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionFalse)) + Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionReadOnly, core.ConditionFalse, "ClusterReadOnlyFalse")) + 
Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionFalse, "NoPendingFailoverAckExists")) Expect(cluster.Status.Nodes).To(HaveLen(1)) }) @@ -138,7 +138,7 @@ var _ = Describe("Orchestrator reconciler", func() { orcClient.AddRecoveries(cluster.GetClusterAlias(), false) _, err := orcSyncer.Sync(context.TODO()) Expect(err).To(Succeed()) - Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionTrue)) + Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionTrue, "PendingFailoverAckExists")) }) It("should not acknowledge pending recoveries when cluster is not ready for enough time", func() { @@ -150,7 +150,7 @@ var _ = Describe("Orchestrator reconciler", func() { _, err := orcSyncer.Sync(context.TODO()) Expect(err).To(Succeed()) - Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionTrue)) + Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionTrue, "PendingFailoverAckExists")) Expect(orcClient.CheckAck(id)).To(Equal(false)) }) @@ -168,7 +168,7 @@ var _ = Describe("Orchestrator reconciler", func() { _, err := orcSyncer.Sync(context.TODO()) Expect(err).To(Succeed()) - Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionTrue)) + Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionFailoverAck, core.ConditionTrue, "PendingFailoverAckExists")) Expect(orcClient.CheckAck(id)).To(Equal(true)) var event string @@ -339,110 +339,133 @@ var _ = Describe("Orchestrator reconciler", func() { recorder: rec, orcClient: orcClient, } - - }) - - It("should set the master readOnly when cluster is read only", func() { // set cluster on readonly, master should be in read only state orcClient.AddInstance(orc.Instance{ ClusterName: cluster.GetClusterAlias(), Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, - ReadOnly: false, + ReadOnly: false, // mark node as master + // mark instance as uptodate + IsUpToDate: true, + IsLastCheckValid: true, }) orcClient.AddInstance(orc.Instance{ ClusterName: cluster.GetClusterAlias(), Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(1)}, MasterKey: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, ReadOnly: true, + // set replication running on replica + Slave_SQL_Running: true, + Slave_IO_Running: true, + // mark instance as uptodate + IsUpToDate: true, + IsLastCheckValid: true, }) + // update cluster nodes status + insts, _ := orcClient.Cluster(cluster.GetClusterAlias()) + master, _ := orcClient.Master(cluster.GetClusterAlias()) + updater.updateStatusFromOrc(insts, master) + }) + + It("should update status for nodes on cluster", func() { + Expect(cluster.GetNodeStatusFor(cluster.GetPodHostname(0))).To(haveNodeCondWithStatus(api.NodeConditionMaster, core.ConditionTrue)) + Expect(cluster.GetNodeStatusFor(cluster.GetPodHostname(1))).To(haveNodeCondWithStatus(api.NodeConditionReplicating, core.ConditionTrue)) + }) + + It("should set the master readOnly when cluster is read only", func() { cluster.Spec.ReadOnly = true insts, _ := orcClient.Cluster(cluster.GetClusterAlias()) - Expect(updater.markReadOnlyNodesInOrc(insts, []orc.Maintenance{})).To(Succeed()) + master, _ := orcClient.Master(cluster.GetClusterAlias()) + updater.markReadOnlyNodesInOrc(insts, master) + // check master (node-0) to be read-only insts, _ = orcClient.Cluster(cluster.GetClusterAlias()) - for _, instance := range insts { - if instance.Key.Hostname == 
cluster.GetPodHostname(0) { - Expect(instance.ReadOnly).To(Equal(true)) - } - } - + node0 := InstancesSet(insts).GetInstance(cluster.GetPodHostname(0)) + Expect(node0.ReadOnly).To(Equal(true)) }) It("should set the master writable when cluster is writable", func() { - orcClient.AddInstance(orc.Instance{ - ClusterName: cluster.GetClusterAlias(), - Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, - ReadOnly: true, - }) - orcClient.AddInstance(orc.Instance{ - ClusterName: cluster.GetClusterAlias(), - Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(1)}, - MasterKey: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, - ReadOnly: true, - }) - //Set ReadOnly to false in order to get the master Writable cluster.Spec.ReadOnly = false insts, _ := orcClient.Cluster(cluster.GetClusterAlias()) - Expect(updater.markReadOnlyNodesInOrc(insts, []orc.Maintenance{})).To(Succeed()) + master := InstancesSet(insts).GetInstance(cluster.GetPodHostname(0)) // set node0 as master + updater.markReadOnlyNodesInOrc(insts, master) + // check master (node-0) to be writable insts, _ = orcClient.Cluster(cluster.GetClusterAlias()) - master := InstancesSet(insts).GetInstance(cluster.GetPodHostname(0)) - Expect(master.ReadOnly).To(Equal(false)) - - slave := InstancesSet(insts).GetInstance(cluster.GetPodHostname(1)) - Expect(slave.ReadOnly).To(Equal(true)) + node0 := InstancesSet(insts).GetInstance(cluster.GetPodHostname(0)) + Expect(node0.ReadOnly).To(Equal(false)) + // check slave (node-1) to be read-only + node1 := InstancesSet(insts).GetInstance(cluster.GetPodHostname(1)) + Expect(node1.ReadOnly).To(Equal(true)) }) It("should remove old nodes from orchestrator", func() { - orcClient.AddInstance(orc.Instance{ - ClusterName: cluster.GetClusterAlias(), - Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, - ReadOnly: false, - }) - orcClient.AddInstance(orc.Instance{ - ClusterName: cluster.GetClusterAlias(), - Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(1)}, - MasterKey: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, - ReadOnly: true, - }) - cluster.Spec.Replicas = &one cluster.Status.ReadyNodes = 1 + // set cluster ready condition and set lastTransitionTime to 100 seconds before + cluster.UpdateStatusCondition(api.ClusterConditionReady, core.ConditionTrue, "", "") + ltt := metav1.NewTime(time.Now().Add(-100 * time.Second)) + cluster.GetClusterCondition(api.ClusterConditionReady).LastTransitionTime = ltt // call register and unregister nodes in orc insts, _ := orcClient.Cluster(cluster.GetClusterAlias()) - updater.updateNodesInOrc(insts) + _, _, rm := updater.updateNodesInOrc(insts) + updater.forgetNodesFromOrc(rm) // check for instances in orc insts, _ = orcClient.Cluster(cluster.GetClusterAlias()) Expect(insts).To(HaveLen(1)) }) - When("status.readyNodes != spec.replicas", func() { - It("should not remove nodes from orchestrator", func() { - orcClient.AddInstance(orc.Instance{ - ClusterName: cluster.GetClusterAlias(), - Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, - ReadOnly: false, - }) + When("cluster is not ready", func() { + BeforeEach(func() { + cluster.Spec.Replicas = &two + cluster.Status.ReadyNodes = 1 + // call status updater to update status + updater.updateClusterReadyStatus() + }) + + It("should mark cluster as not ready", func() { + Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionReady, core.ConditionFalse, "StatefulSetNotReady")) + + // mark cluster as ready + cluster.Status.ReadyNodes = 2 + updater.updateClusterReadyStatus() + + 
Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionReady, core.ConditionTrue, "ClusterReady")) + + // update cluster nodes status, but this time with a node that is not replicating orcClient.AddInstance(orc.Instance{ ClusterName: cluster.GetClusterAlias(), - Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(1)}, + Key: orc.InstanceKey{Hostname: cluster.GetPodHostname(2)}, MasterKey: orc.InstanceKey{Hostname: cluster.GetPodHostname(0)}, ReadOnly: true, + // replication is not running on this replica + Slave_SQL_Running: false, + Slave_IO_Running: false, + // mark instance as uptodate + IsUpToDate: true, + IsLastCheckValid: true, }) + insts, _ := orcClient.Cluster(cluster.GetClusterAlias()) + master, _ := orcClient.Master(cluster.GetClusterAlias()) + cluster.Spec.Replicas = &three + cluster.Status.ReadyNodes = 3 + updater.updateStatusFromOrc(insts, master) + updater.updateClusterReadyStatus() - cluster.Spec.Replicas = &two - cluster.Status.ReadyNodes = 1 + Expect(cluster.Status).To(haveCondWithStatus(api.ClusterConditionReady, core.ConditionFalse, "NotReplicating")) + }) + It("should not remove nodes from orchestrator", func() { // call register and unregister nodes in orc insts, _ := orcClient.Cluster(cluster.GetClusterAlias()) - updater.updateNodesInOrc(insts) + _, _, rm := updater.updateNodesInOrc(insts) + updater.forgetNodesFromOrc(rm) // check for instances in orc insts, _ = orcClient.Cluster(cluster.GetClusterAlias()) @@ -465,11 +488,12 @@ func haveNodeCondWithStatus(condType api.NodeConditionType, status core.Conditio } // haveCondWithStatus is a helper func that returns a matcher to check for an existing condition in a ClusterCondition list. -func haveCondWithStatus(condType api.ClusterConditionType, status core.ConditionStatus) gomegatypes.GomegaMatcher { +func haveCondWithStatus(condType api.ClusterConditionType, status core.ConditionStatus, reason string) gomegatypes.GomegaMatcher { return MatchFields(IgnoreExtras, Fields{ "Conditions": ContainElement(MatchFields(IgnoreExtras, Fields{ "Type": Equal(condType), "Status": Equal(status), + "Reason": Equal(reason), })), }) diff --git a/pkg/internal/mysqlcluster/status.go b/pkg/internal/mysqlcluster/status.go index 5b1892d13..3a936d722 100644 --- a/pkg/internal/mysqlcluster/status.go +++ b/pkg/internal/mysqlcluster/status.go @@ -80,6 +80,16 @@ func (c *MysqlCluster) condIndex(condType api.ClusterConditionType) (int, bool) return 0, false } +// GetClusterCondition returns the cluster condition of the given type +func (c *MysqlCluster) GetClusterCondition(condType api.ClusterConditionType) *api.ClusterCondition { + i, found := c.condIndex(condType) + if found { + return &c.Status.Conditions[i] + } + + return nil +} + // UpdateNodeConditionStatus updates the status of the condition for a given name and type func (c *MysqlCluster) UpdateNodeConditionStatus(nodeName string, condType api.NodeConditionType, status core.ConditionStatus) bool { i := c.GetNodeStatusIndex(nodeName) diff --git a/pkg/orchestrator/errors.go b/pkg/orchestrator/errors.go index 9cbdab522..8e8005cea 100644 --- a/pkg/orchestrator/errors.go +++ b/pkg/orchestrator/errors.go @@ -82,6 +82,11 @@ func IsNotFound(err error) bool { if strings.Contains(orcErr.Message, "Cannot read instance") { return true } + + // https://github.com/github/orchestrator/blob/7bef26f042aafbd956daeaede0cd4aab2ba46e65/go/http/api.go#L1949 + if strings.Contains(orcErr.Message, "No masters found") { + return true + } } return false } diff --git a/pkg/orchestrator/fake/client.go
b/pkg/orchestrator/fake/client.go index ea2339a01..f0668b6d7 100644 --- a/pkg/orchestrator/fake/client.go +++ b/pkg/orchestrator/fake/client.go @@ -232,12 +232,13 @@ func (o *OrcFakeClient) Master(clusterHint string) (*Instance, error) { if !ok { return nil, NewErrorMsg("Unable to determine cluster name", "/master") } + for _, inst := range insts { if !inst.ReadOnly { return inst, nil } } - return nil, NewErrorMsg("Unable to determine master", "/master") + return nil, NewErrorMsg("No masters found", "/master") } // Cluster returns the list of instances from a cluster diff --git a/pkg/sidecar/apphelper/apphelper.go b/pkg/sidecar/apphelper/apphelper.go index 38764edba..c6ff31182 100644 --- a/pkg/sidecar/apphelper/apphelper.go +++ b/pkg/sidecar/apphelper/apphelper.go @@ -171,15 +171,16 @@ func configReadOnly() error { func configTopology() error { if util.NodeRole() == "slave" { - + log.Info("setting up as slave") if util.ShouldBootstrapNode() { + log.Info("doing bootstrap") if gtid, err := util.ReadPurgedGTID(); err == nil { - log.V(1).Info("RESET MASTER and setting GTID_PURGED", "gtid", gtid) + log.Info("RESET MASTER and setting GTID_PURGED", "gtid", gtid) if errQ := util.RunQuery("RESET MASTER; SET GLOBAL GTID_PURGED=?", gtid); errQ != nil { return errQ } } else { - log.V(-1).Info("can't determin what GTID to purge", "error", err) + log.V(-1).Info("can't determine what GTID to purge", "error", err) } } diff --git a/pkg/sidecar/apphelper/server.go b/pkg/sidecar/apphelper/server.go index c426eeeca..3a0e4397c 100644 --- a/pkg/sidecar/apphelper/server.go +++ b/pkg/sidecar/apphelper/server.go @@ -82,6 +82,7 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { // nolint: gosec xtrabackup := exec.Command("xtrabackup", "--backup", "--slave-info", "--stream=xbstream", + fmt.Sprintf("--tables-exclude=%s.%s", util.ToolsDbName, util.ToolsInitTableName), "--host=127.0.0.1", fmt.Sprintf("--user=%s", util.GetReplUser()), fmt.Sprintf("--password=%s", util.GetReplPass())) diff --git a/pkg/sidecar/util/util.go b/pkg/sidecar/util/util.go index 73889a2fe..9068f85d2 100644 --- a/pkg/sidecar/util/util.go +++ b/pkg/sidecar/util/util.go @@ -40,9 +40,6 @@ import ( var log = logf.Log.WithName("sidecar.util") var ( - // BackupPort is the port on which xtrabackup expose backups, 3306 - BackupPort = strconv.Itoa(constants.HelperXtrabackupPort) - // MysqlPort represents port on which mysql works MysqlPort = strconv.Itoa(constants.MysqlPort) @@ -71,9 +68,9 @@ var ( OrcTopologyDir = constants.OrcTopologyDir // ServerPort http server port - ServerPort = constants.HelperServerPort + ServerPort = constants.SidecarServerPort // ServerProbeEndpoint is the http server endpoint for probe - ServerProbeEndpoint = constants.HelperServerProbePath + ServerProbeEndpoint = constants.SidecarServerProbePath // ServerBackupEndpoint is the http server endpoint for backups ServerBackupEndpoint = "/xbackup" ) diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index e2385efac..cf28f9a0e 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -20,16 +20,13 @@ const ( // MysqlPort is the default mysql port. 
MysqlPort = 3306 - // HelperXtrabackupPort is the port on which we serve backups - HelperXtrabackupPort = 3307 - // OrcTopologyDir path where orc conf secret is mounted OrcTopologyDir = "/var/run/orc-topology" - // HelperServerPort represents the port on which http server will run - HelperServerPort = 8088 - // HelperServerProbePath the probe path - HelperServerProbePath = "/health" + // SidecarServerPort represents the port on which http server will run + SidecarServerPort = 8080 + // SidecarServerProbePath the probe path + SidecarServerProbePath = "/health" // ExporterPort is the port that metrics will be exported ExporterPort = 9125
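
Editor's note: the new test helpers in pkg/controller/internal/testutil/cluster.go are intended to be polled with Gomega, e.g. Eventually(util.RefreshFn(c, cluster.Unwrap())).Should(util.HaveClusterStatusReadyNodes(2)) as documented in the diff. Below is a minimal, hypothetical standalone sketch (not part of this patch) showing the matcher applied directly to a cluster object; the test file name and the MysqlClusterStatus struct name are assumptions inferred from the fields used in this diff.

package testutil_test

import (
	"testing"

	. "github.com/onsi/gomega"

	api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1"
	util "github.com/presslabs/mysql-operator/pkg/controller/internal/testutil"
)

// TestHaveClusterStatusReadyNodes is an illustrative check of the new matcher:
// PointTo dereferences the *MysqlCluster and MatchFields inspects Status.ReadyNodes.
func TestHaveClusterStatusReadyNodes(t *testing.T) {
	g := NewGomegaWithT(t)

	// a cluster whose controller has reported two ready nodes
	// (MysqlClusterStatus is an assumed type name for the status struct)
	cluster := &api.MysqlCluster{
		Status: api.MysqlClusterStatus{ReadyNodes: 2},
	}

	g.Expect(cluster).To(util.HaveClusterStatusReadyNodes(2))
}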
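
Both forgetNodesFromOrc (forgetGraceTime) and getRecoveriesToAck (recoveryGraceTime) now gate their side effects on the cluster having held the Ready condition for a grace period. A minimal sketch of that check, pulled out into a hypothetical helper (the patch inlines the expression, and also allows the action when DeletionTimestamp is set, which this sketch leaves out):

package orchestrator // illustrative placement only

import (
	"time"

	core "k8s.io/api/core/v1"

	api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1"
	"github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster"
)

// clusterReadyForSeconds reports whether the cluster's Ready condition has been
// True for more than the given number of seconds, mirroring the ready-for-grace-time
// part of the checks added in orchestrator_reconcile.go.
func clusterReadyForSeconds(cluster *mysqlcluster.MysqlCluster, grace float64) bool {
	ready := cluster.GetClusterCondition(api.ClusterConditionReady)
	if ready == nil || ready.Status != core.ConditionTrue {
		return false
	}
	return time.Since(ready.LastTransitionTime.Time).Seconds() > grace
}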