Skip to content

Commit

Permalink
Merge 97977f1 into 9d98386
Browse files Browse the repository at this point in the history
  • Loading branch information
shtripat committed Apr 1, 2018
2 parents 9d98386 + 97977f1 commit a944486
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 20 deletions.
134 changes: 115 additions & 19 deletions tendrl/commons/flows/import_cluster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import copy
import etcd
import json

from tendrl.commons import flows
from tendrl.commons.flows.exceptions import FlowExecutionFailedError
from tendrl.commons.objects import AtomExecutionFailedError
from tendrl.commons.utils import time_utils


class ImportCluster(flows.BaseFlow):
Expand All @@ -13,48 +15,142 @@ def __init__(self, *args, **kwargs):
def run(self):
if "Node[]" not in self.parameters:
integration_id = self.parameters['TendrlContext.integration_id']
_cluster = NS.tendrl.objects.Cluster(
integration_id=NS.tendrl_context.integration_id).load()
if (_cluster.status is not None and
_cluster.status != "" and
_cluster.current_job['status'] == 'in_progress' and
_cluster.status in
short_name = self.parameters.get('Cluster.short_name', None)
if short_name:
if not short_name.isalpha():
raise FlowExecutionFailedError(
"Invalid cluster short_name: %s"
"Only alpha numeric allowed for short name" %
short_name
)
else:
self.parameters['TendrlContext.integration_id'] = short_name
# Check for uniqueness of cluster short name
_clusters = NS._int.client.read(
'/clusters'
)
for entry in _clusters.leaves:
_cluster = NS.tendrl.objects.Cluster(
integration_id=entry.key.split('/')[-1]
).load()
if _cluster.short_name and short_name and \
_cluster.short_name == short_name:
raise FlowExecutionFailedError(
"Cluster with name: %s already exists" % short_name
)
# If corrent node is provisioner, set the cluster integration_id
# with short name
if short_name not in [None, ""] and \
not NS.tendrl.objects.Cluster(integration_id=short_name).exists():
NS.tendrl.objects.Cluster(integration_id=short_name).save()
if "provisioner/%s" % integration_id in NS.node_context.tags \
and short_name:
_ctc = NS.tendrl.objects.ClusterTendrlContext(
integration_id=integration_id
).load()
NS._int.client.write(
'/indexes/detected_cluster_id_to_integration_id/%s' % _ctc.cluster_id,
short_name
)
NS._int.client.delete("/clusters/%s" % integration_id)
NS._int.client.write(
'/indexes/tags/provisioner/%s' % short_name,
NS.node_context.node_id
)
NS._int.client.delete('/indexes/tags/provisioner/%s' % integration_id)
_ctc = NS.tendrl.objects.ClusterTendrlContext(
integration_id=integration_id
).load()
_ctc.integration_id = short_name
_ctc.save()
NS.tendrl.objects.ClusterTendrlContext(
integration_id=integration_id
).remove()

_new_cluster = NS.tendrl.objects.Cluster(
integration_id=short_name or integration_id
).load()
if (_new_cluster.status is not None and
_new_cluster.status != "" and
_new_cluster.current_job['status'] == 'in_progress' and
_new_cluster.status in
["importing", "unmanaging", "expanding"]):
raise FlowExecutionFailedError(
"Another job in progress for cluster, please wait till "
"the job finishes (job_id: %s) (integration_id: %s) " % (
_cluster.current_job['job_id'],
_cluster.integration_id
_new_cluster.current_job['job_id'],
_new_cluster.integration_id
)
)

_cluster.status = "importing"
_cluster.current_job = {
_new_cluster.status = "importing"
_new_cluster.current_job = {
'job_id': self.job_id,
'job_name': self.__class__.__name__,
'status': 'in_progress'
}
_cluster.save()
_new_cluster.save()

try:
integration_id_index_key = \
"indexes/tags/tendrl/integration/%s" % integration_id
_node_ids = NS._int.client.read(
integration_id_index_key).value
self.parameters["Node[]"] = json.loads(_node_ids)
_node_ids = json.loads(
NS._int.client.read(
integration_id_index_key
).value
)
self.parameters["Node[]"] = _node_ids
if short_name:
if 'provisioner/%s' % integration_id in NS.node_context.tags:
_nc = NS.node_context(node_id=node_id).load()
_tags = _nc.tags
_tags.remove('provisioner/%s' % integration_id)
_tags.append('provisioner/%s' % short_name)
_nc.tags = _tags
_nc.save()
for node_id in _node_ids:
_nc = NS.tendrl.objects.NodeContext(node_id=node_id).load()
_tags = _nc.tags
if 'tendrl/integration/%s' % integration_id in _tags:
_tags.remove('tendrl/integration/%s' % integration_id)
_tags.append('tendrl/integration/%s' % short_name)
_nc.tags = _tags
_nc.save()
_tc = NS.tendrl.objects.TendrlContext(node_id=node_id).load()
if _tc.integration_id not in [None, ""]:
_tc.integration_id = short_name
_tc.save()
NS.tendrl.objects.ClusterNodeContext(
node_id=node_id,
fqdn=_nc.fqdn,
ipv4_addr = _nc.ipv4_addr,
updated_at = str(time_utils.now()),
tags = _nc.tags,
status = _nc.status,
sync_status = _nc.sync_status,
last_sync = _nc.last_sync,
first_sync_done = 'no',
is_managed = 'no'
).save()
NS._int.client.delete(
'/clusters/%s/nodes/%s' % (integration_id, node_id),
recursive=True
)

except etcd.EtcdKeyNotFound:
_cluster = NS.tendrl.objects.Cluster(
integration_id=NS.tendrl_context.integration_id).load()
integration_id=short_name or integration_id
).load()
_cluster.status = ""
_cluster.current_job['status'] = 'failed'
_cluster.save()
raise FlowExecutionFailedError("Cluster with "
"integration_id "
"(%s) not found, cannot "
"import" % integration_id)
"import" % short_name or integration_id)
else:
_cluster = NS.tendrl.objects.Cluster(
integration_id=NS.tendrl_context.integration_id
integration_id=short_name or integration_id
).load()
_cluster.volume_profiling_flag = self.parameters[
'Cluster.volume_profiling_flag']
Expand All @@ -69,7 +165,7 @@ def run(self):
_job = NS.tendrl.objects.Job(job_id=self.job_id).load()
if 'parent' not in _job.payload:
_cluster = NS.tendrl.objects.Cluster(
integration_id=NS.tendrl_context.integration_id
integration_id=short_name or integration_id
).load()
_cluster.status = ""
_cluster.current_job['status'] = "finished"
Expand All @@ -79,7 +175,7 @@ def run(self):
AtomExecutionFailedError,
Exception) as ex:
_cluster = NS.tendrl.objects.Cluster(
integration_id=NS.tendrl_context.integration_id).load()
integration_id=short_name or integration_id).load()
_cluster.status = ""
_cluster.current_job['status'] = 'failed'
_errors = []
Expand Down
4 changes: 3 additions & 1 deletion tendrl/commons/objects/cluster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@


class Cluster(objects.BaseObject):
def __init__(self, integration_id=None, public_network=None,
def __init__(self, integration_id=None, short_name=None,
public_network=None,
cluster_network=None, node_configuration=None,
conf_overrides=None, node_identifier=None,
last_sync=None, is_managed=None, current_job=dict(),
Expand All @@ -12,6 +13,7 @@ def __init__(self, integration_id=None, public_network=None,
*args, **kwargs):
super(Cluster, self).__init__(*args, **kwargs)
self.integration_id = integration_id
self.short_name = short_name
self.public_network = public_network
self.cluster_network = cluster_network
self.node_configuration = node_configuration
Expand Down
6 changes: 6 additions & 0 deletions tendrl/commons/objects/definition/master.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ namespace.tendrl:
mandatory:
- TendrlContext.integration_id
- Cluster.volume_profiling_flag
optional:
- Cluster.short_name
pre_run:
- tendrl.objects.Cluster.atoms.CheckClusterNodesUp
- tendrl.objects.Node.atoms.IsNodeTendrlManaged
Expand Down Expand Up @@ -211,6 +213,7 @@ namespace.tendrl:
inputs:
mandatory:
- TendrlContext.integration_id
- Cluster.short_name
name: import cluster
run: tendrl.objects.Cluster.atoms.ImportCluster
type: Create
Expand Down Expand Up @@ -250,6 +253,9 @@ namespace.tendrl:
integration_id:
help: integration id
type: String
short_name:
help: Cluster short name
type: String
public_network:
help: Public Network cidr of the cluster
type: String
Expand Down

0 comments on commit a944486

Please sign in to comment.