diff --git a/tendrl/commons/flows/import_cluster/__init__.py b/tendrl/commons/flows/import_cluster/__init__.py index 1962516e..579d2aee 100644 --- a/tendrl/commons/flows/import_cluster/__init__.py +++ b/tendrl/commons/flows/import_cluster/__init__.py @@ -1,9 +1,10 @@ import etcd import json from tendrl.commons import flows from tendrl.commons.flows.exceptions import FlowExecutionFailedError from tendrl.commons.objects import AtomExecutionFailedError +from tendrl.commons.utils import time_utils class ImportCluster(flows.BaseFlow): @@ -13,48 +15,142 @@ def __init__(self, *args, **kwargs): def run(self): if "Node[]" not in self.parameters: integration_id = self.parameters['TendrlContext.integration_id'] - _cluster = NS.tendrl.objects.Cluster( - integration_id=NS.tendrl_context.integration_id).load() - if (_cluster.status is not None and - _cluster.status != "" and - _cluster.current_job['status'] == 'in_progress' and - _cluster.status in + short_name = self.parameters.get('Cluster.short_name', None) + if short_name: + if not short_name.isalnum(): + raise FlowExecutionFailedError( + "Invalid cluster short_name: %s. " + "Only alpha numeric allowed for short name" % + short_name + ) + else: + self.parameters['TendrlContext.integration_id'] = short_name + # Check for uniqueness of cluster short name + _clusters = NS._int.client.read( + '/clusters' + ) + for entry in _clusters.leaves: + _cluster = NS.tendrl.objects.Cluster( + integration_id=entry.key.split('/')[-1] + ).load() + if _cluster.short_name and short_name and \ + _cluster.short_name == short_name: + raise FlowExecutionFailedError( + "Cluster with name: %s already exists" % short_name + ) + # If current node is provisioner, set the cluster integration_id + # with short name + if short_name not in [None, ""] and \ + not NS.tendrl.objects.Cluster(integration_id=short_name).exists(): + NS.tendrl.objects.Cluster(integration_id=short_name).save() + if "provisioner/%s" % integration_id in NS.node_context.tags 
\ + and short_name: + _ctc = NS.tendrl.objects.ClusterTendrlContext( + integration_id=integration_id + ).load() + NS._int.client.write( + '/indexes/detected_cluster_id_to_integration_id/%s' % _ctc.cluster_id, + short_name + ) + NS._int.client.delete("/clusters/%s" % integration_id) + NS._int.client.write( + '/indexes/tags/provisioner/%s' % short_name, + NS.node_context.node_id + ) + NS._int.client.delete('/indexes/tags/provisioner/%s' % integration_id) + _ctc = NS.tendrl.objects.ClusterTendrlContext( + integration_id=integration_id + ).load() + _ctc.integration_id = short_name + _ctc.save() + NS.tendrl.objects.ClusterTendrlContext( + integration_id=integration_id + ).remove() + + _new_cluster = NS.tendrl.objects.Cluster( + integration_id=short_name or integration_id + ).load() + if (_new_cluster.status is not None and + _new_cluster.status != "" and + _new_cluster.current_job['status'] == 'in_progress' and + _new_cluster.status in ["importing", "unmanaging", "expanding"]): raise FlowExecutionFailedError( "Another job in progress for cluster, please wait till " "the job finishes (job_id: %s) (integration_id: %s) " % ( - _cluster.current_job['job_id'], - _cluster.integration_id + _new_cluster.current_job['job_id'], + _new_cluster.integration_id ) ) - _cluster.status = "importing" - _cluster.current_job = { + _new_cluster.status = "importing" + _new_cluster.current_job = { 'job_id': self.job_id, 'job_name': self.__class__.__name__, 'status': 'in_progress' } - _cluster.save() + _new_cluster.save() try: integration_id_index_key = \ "indexes/tags/tendrl/integration/%s" % integration_id - _node_ids = NS._int.client.read( - integration_id_index_key).value - self.parameters["Node[]"] = json.loads(_node_ids) + _node_ids = json.loads( + NS._int.client.read( + integration_id_index_key + ).value + ) + self.parameters["Node[]"] = _node_ids + if short_name: + if 'provisioner/%s' % integration_id in NS.node_context.tags: + _nc = NS.tendrl.objects.NodeContext(node_id=NS.node_context.node_id).load() + _tags = 
_nc.tags + _tags.remove('provisioner/%s' % integration_id) + _tags.append('provisioner/%s' % short_name) + _nc.tags = _tags + _nc.save() + for node_id in _node_ids: + _nc = NS.tendrl.objects.NodeContext(node_id=node_id).load() + _tags = _nc.tags + if 'tendrl/integration/%s' % integration_id in _tags: + _tags.remove('tendrl/integration/%s' % integration_id) + _tags.append('tendrl/integration/%s' % short_name) + _nc.tags = _tags + _nc.save() + _tc = NS.tendrl.objects.TendrlContext(node_id=node_id).load() + if _tc.integration_id not in [None, ""]: + _tc.integration_id = short_name + _tc.save() + NS.tendrl.objects.ClusterNodeContext( + node_id=node_id, + fqdn=_nc.fqdn, + ipv4_addr = _nc.ipv4_addr, + updated_at = str(time_utils.now()), + tags = _nc.tags, + status = _nc.status, + sync_status = _nc.sync_status, + last_sync = _nc.last_sync, + first_sync_done = 'no', + is_managed = 'no' + ).save() + NS._int.client.delete( + '/clusters/%s/nodes/%s' % (integration_id, node_id), + recursive=True + ) + except etcd.EtcdKeyNotFound: _cluster = NS.tendrl.objects.Cluster( - integration_id=NS.tendrl_context.integration_id).load() + integration_id=short_name or integration_id + ).load() _cluster.status = "" _cluster.current_job['status'] = 'failed' _cluster.save() raise FlowExecutionFailedError("Cluster with " "integration_id " "(%s) not found, cannot " - "import" % integration_id) + "import" % (short_name or integration_id)) else: _cluster = NS.tendrl.objects.Cluster( - integration_id=NS.tendrl_context.integration_id + integration_id=short_name or integration_id ).load() _cluster.volume_profiling_flag = self.parameters[ 'Cluster.volume_profiling_flag'] @@ -69,7 +165,7 @@ def run(self): _job = NS.tendrl.objects.Job(job_id=self.job_id).load() if 'parent' not in _job.payload: _cluster = NS.tendrl.objects.Cluster( - integration_id=NS.tendrl_context.integration_id + integration_id=short_name or integration_id ).load() _cluster.status = "" _cluster.current_job['status'] = "finished" @@ 
-79,7 +175,7 @@ def run(self): AtomExecutionFailedError, Exception) as ex: _cluster = NS.tendrl.objects.Cluster( - integration_id=NS.tendrl_context.integration_id).load() + integration_id=short_name or integration_id).load() _cluster.status = "" _cluster.current_job['status'] = 'failed' _errors = [] diff --git a/tendrl/commons/objects/cluster/__init__.py b/tendrl/commons/objects/cluster/__init__.py index bffa380c..da703b6d 100644 --- a/tendrl/commons/objects/cluster/__init__.py +++ b/tendrl/commons/objects/cluster/__init__.py @@ -2,7 +2,8 @@ class Cluster(objects.BaseObject): - def __init__(self, integration_id=None, public_network=None, + def __init__(self, integration_id=None, short_name=None, + public_network=None, cluster_network=None, node_configuration=None, conf_overrides=None, node_identifier=None, last_sync=None, is_managed=None, current_job=dict(), @@ -12,6 +13,7 @@ def __init__(self, integration_id=None, public_network=None, *args, **kwargs): super(Cluster, self).__init__(*args, **kwargs) self.integration_id = integration_id + self.short_name = short_name self.public_network = public_network self.cluster_network = cluster_network self.node_configuration = node_configuration diff --git a/tendrl/commons/objects/definition/master.yaml b/tendrl/commons/objects/definition/master.yaml index 8d8baf5c..be49f978 100644 --- a/tendrl/commons/objects/definition/master.yaml +++ b/tendrl/commons/objects/definition/master.yaml @@ -64,6 +64,8 @@ namespace.tendrl: mandatory: - TendrlContext.integration_id - Cluster.volume_profiling_flag + optional: + - Cluster.short_name pre_run: - tendrl.objects.Cluster.atoms.CheckClusterNodesUp - tendrl.objects.Node.atoms.IsNodeTendrlManaged @@ -211,6 +213,7 @@ namespace.tendrl: inputs: mandatory: - TendrlContext.integration_id + - Cluster.short_name name: import cluster run: tendrl.objects.Cluster.atoms.ImportCluster type: Create @@ -250,6 +253,9 @@ namespace.tendrl: integration_id: help: integration id type: String + short_name: + 
help: Cluster short name + type: String public_network: help: Public Network cidr of the cluster type: String