
Commit

Merge 1af76c2 into 160b4d4
shtripat committed Feb 9, 2018
2 parents 160b4d4 + 1af76c2 commit a51967f
Showing 20 changed files with 1,069 additions and 25 deletions.
23 changes: 13 additions & 10 deletions tendrl/commons/flows/import_cluster/__init__.py
@@ -13,22 +13,25 @@ def __init__(self, *args, **kwargs):
     def run(self):
         if "Node[]" not in self.parameters:
             integration_id = self.parameters['TendrlContext.integration_id']
-
             _cluster = NS.tendrl.objects.Cluster(
                 integration_id=NS.tendrl_context.integration_id).load()
-            if (_cluster.import_job_id is not None and
-                    _cluster.import_job_id != "") or _cluster.import_status \
-                    in ["in_progress", "done", "failed"]:
+            if (_cluster.status is not None and
+                    _cluster.status != "" and
+                    _cluster.status in ["syncing", "importing", "unmanaging"]):
                 raise FlowExecutionFailedError(
-                    "Cluster already being imported by another Job, please "
-                    "wait till "
+                    "Another job in progress for cluster, please wait till "
                     "the job finishes (job_id: %s) (integration_id: %s) " % (
-                        _cluster.import_job_id, _cluster.integration_id
+                        _cluster.current_job['job_id'],
+                        _cluster.integration_id
                     )
                 )
 
-            _cluster.import_status = "in_progress"
-            _cluster.import_job_id = self.job_id
+            _cluster.status = "importing"
+            _cluster.current_job = {
+                'job_id': self.job_id,
+                'job_name': self.__class__.__name__,
+                'status': 'in_progress'
+            }
             _cluster.save()
 
             try:
@@ -77,7 +80,7 @@ def run(self):
                 Exception) as ex:
             _cluster = NS.tendrl.objects.Cluster(
                 integration_id=NS.tendrl_context.integration_id).load()
-            _cluster.import_status = "failed"
+            _cluster.current_job['status'] = 'failed'
             _errors = []
             if hasattr(ex, 'message'):
                 _errors = [ex.message]
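With this change a cluster's busy state lives in Cluster.status plus the current_job dict rather than the old import_status/import_job_id pair. A minimal sketch of how a caller might read those fields before submitting another job; the helper name is hypothetical, and the NS namespace and Cluster object are assumed to be available exactly as in the flow above:

# Sketch only: assumes the NS namespace and Cluster object shown in the diff above.
def cluster_is_busy(integration_id):
    _cluster = NS.tendrl.objects.Cluster(
        integration_id=integration_id
    ).load()
    # "syncing", "importing" and "unmanaging" are the busy states checked by the flows
    if _cluster.status in ["syncing", "importing", "unmanaging"]:
        return True, _cluster.current_job.get('job_id')
    return False, None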
72 changes: 72 additions & 0 deletions tendrl/commons/flows/unmanage_cluster/__init__.py
@@ -0,0 +1,72 @@
import etcd
import time

from tendrl.commons import flows
from tendrl.commons.flows.exceptions import FlowExecutionFailedError
from tendrl.commons.objects import AtomExecutionFailedError


class UnmanageCluster(flows.BaseFlow):
    def __init__(self, *args, **kwargs):
        super(UnmanageCluster, self).__init__(*args, **kwargs)

    def run(self):
        integration_id = self.parameters['TendrlContext.integration_id']
        _cluster = NS.tendrl.objects.Cluster(
            integration_id=integration_id
        ).load()
        if _cluster.status is not None and \
                _cluster.status != "" and \
                _cluster.status in \
                ['syncing', 'importing', 'unmanaging']:
            raise FlowExecutionFailedError(
                "Another job in progress for cluster, "
                "please wait till the job finishes "
                "(job_id: %s) (integration_id: %s) " %
                (
                    _cluster.current_job['job_id'],
                    _cluster.integration_id
                )
            )

        _cluster.status = "unmanaging"
        _cluster.current_job = {
            'job_id': self.job_id,
            'job_name': self.__class__.__name__,
            'status': "in_progress"
        }
        _cluster.save()

        try:
            super(UnmanageCluster, self).run()
            # Wait for the cluster to re-appear as a detected cluster,
            # as node-agents are still running on the storage nodes
            while True:
                try:
                    _cluster = NS.tendrl.objects.Cluster(
                        integration_id=integration_id
                    ).load()
                    if _cluster.is_managed == "no":
                        break
                except etcd.EtcdKeyNotFound:
                    time.sleep(5)
                    continue
            _cluster.status = ""
            _cluster.current_job['status'] = "done"
            _cluster.save()
        except (FlowExecutionFailedError,
                AtomExecutionFailedError,
                Exception) as ex:
            _cluster = NS.tendrl.objects.Cluster(
                integration_id=integration_id
            ).load()
            _cluster.current_job['status'] = "failed"
            _errors = []
            if hasattr(ex, 'message'):
                _errors = [ex.message]
            else:
                _errors = [str(ex)]
            if _errors:
                _cluster.errors = _errors
            _cluster.save()
            raise ex
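The flow itself is normally driven by a job picked up from etcd. A hedged sketch of how an API layer might queue such a job, modelled on the Job payload used by DeleteMonitoringDetails later in this commit; the tag, flow path and job type below are illustrative assumptions, not taken from this diff:

import uuid

from tendrl.commons.objects.job import Job


def queue_unmanage_job(integration_id):
    # Hypothetical routing tag, flow path and type; only the payload shape
    # follows the Job usage shown in DeleteMonitoringDetails below.
    payload = {
        "tags": ["provisioner/%s" % integration_id],
        "run": "tendrl.flows.UnmanageCluster",
        "status": "new",
        "parameters": {"TendrlContext.integration_id": integration_id},
        "type": "node"
    }
    _job_id = str(uuid.uuid4())
    Job(job_id=_job_id, status="new", payload=payload).save()
    return _job_id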
11 changes: 5 additions & 6 deletions tendrl/commons/objects/cluster/__init__.py
@@ -4,9 +4,9 @@
 class Cluster(objects.BaseObject):
     def __init__(self, integration_id=None, public_network=None,
                  cluster_network=None, node_configuration=None,
-                 conf_overrides=None, node_identifier=None, sync_status=None,
-                 last_sync=None, import_status=None, import_job_id=None,
-                 is_managed=None, enable_volume_profiling=None, errors=[],
+                 conf_overrides=None, node_identifier=None,
+                 last_sync=None, is_managed=None, current_job=dict(),
+                 status=None, enable_volume_profiling=None, errors=[],
                  *args, **kwargs):
         super(Cluster, self).__init__(*args, **kwargs)
         self.integration_id = integration_id
@@ -15,11 +15,10 @@ def __init__(self, integration_id=None, public_network=None,
         self.node_configuration = node_configuration
         self.conf_overrides = conf_overrides
         self.node_identifier = node_identifier
-        self.sync_status = sync_status
         self.last_sync = last_sync
-        self.import_status = import_status
-        self.import_job_id = import_job_id
         self.is_managed = is_managed
+        self.current_job = current_job
+        self.status = status
         self.enable_volume_profiling = enable_volume_profiling
         self.errors = errors
         self.value = 'clusters/{0}'
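With the constructor change, callers that previously passed sync_status, import_status or import_job_id now track everything through status and current_job. A small sketch of the new shape, using illustrative placeholder values that mirror how the flows in this commit populate the fields:

# Illustrative values only; the field names come from the diff above.
cluster = NS.tendrl.objects.Cluster(
    integration_id="00000000-0000-0000-0000-000000000000",  # placeholder id
    status="importing",               # "", "syncing", "importing" or "unmanaging"
    current_job={
        'job_id': "11111111-1111-1111-1111-111111111111",   # placeholder job id
        'job_name': "ImportCluster",
        'status': "in_progress"       # later flipped to "done" or "failed"
    }
)
cluster.save()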
@@ -157,7 +157,8 @@ def run(self):
         _cluster = NS.tendrl.objects.Cluster(
             integration_id=NS.tendrl_context.integration_id
         ).load()
-        _cluster.import_status = "done"
+        _cluster.status = ""
+        _cluster.current_job['status'] = "done"
         _cluster.save()
 
         return True
@@ -0,0 +1,112 @@
import ast
import etcd

from tendrl.commons import objects
from tendrl.commons.utils import log_utils as logger


class DeleteClusterDetails(objects.BaseAtom):
    def __init__(self, *args, **kwargs):
        super(DeleteClusterDetails, self).__init__(*args, **kwargs)

    def run(self):
        integration_id = self.parameters['TendrlContext.integration_id']
        cluster_tendrl_context = NS.tendrl.objects.ClusterTendrlContext(
            integration_id=integration_id
        ).load()

        etcd_keys_to_delete = []
        etcd_keys_to_delete.append(
            "/clusters/%s" % integration_id
        )
        etcd_keys_to_delete.append(
            "/alerting/clusters/%s" % integration_id
        )
        etcd_keys_to_delete.append(
            "/indexes/tags/detected_cluster/%s" %
            cluster_tendrl_context.cluster_id
        )
        etcd_keys_to_delete.append(
            "/indexes/tags/detected_cluster_id_to_integration_id/%s" %
            cluster_tendrl_context.cluster_id
        )
        etcd_keys_to_delete.append(
            "/indexes/tags/provisioner/%s" % integration_id
        )
        etcd_keys_to_delete.append(
            "/indexes/tags/tendrl/integration/%s" %
            integration_id
        )
        nodes = NS._int.client.read(
            "/clusters/%s/nodes" % integration_id
        )
        node_ids = []
        for node in nodes.leaves:
            node_id = node.key.split("/")[-1]
            node_ids.append(node_id)
            etcd_keys_to_delete.append(
                "/alerting/nodes/%s" % node_id
            )

        # Find the alerting/alerts entries to be deleted
        try:
            cluster_alert_ids = NS._int.client.read(
                "/alerting/clusters"
            )
            for entry in cluster_alert_ids.leaves:
                ca_id = entry.key.split("/")[-1]
                etcd_keys_to_delete.append(
                    "/alerting/alerts/%s" % ca_id
                )
        except etcd.EtcdKeyNotFound:
            # No cluster alerts, continue
            pass

        try:
            node_alert_ids = NS._int.client.read(
                "/alerting/nodes"
            )
            for entry in node_alert_ids.leaves:
                na_id = entry.key.split("/")[-1]
                etcd_keys_to_delete.append(
                    "/alerting/alerts/%s" % na_id
                )
        except etcd.EtcdKeyNotFound:
            # No node alerts, continue
            pass

        # Remove the cluster details
        for key in list(set(etcd_keys_to_delete)):
            try:
                NS._int.client.delete(key, recursive=True)
            except etcd.EtcdKeyNotFound:
                logger.log(
                    "debug",
                    NS.publisher_id,
                    {
                        "message": "The key: %s not found for deletion" %
                        key
                    },
                    job_id=self.parameters['job_id'],
                    flow_id=self.parameters['flow_id'],
                )
                continue

        # Load the gluster servers list and remove
        # the cluster nodes
        try:
            gl_srvr_list = NS._int.client.read(
                "/indexes/tags/gluster/server"
            ).value
            gl_srvr_list = ast.literal_eval(gl_srvr_list)
            for node_id in node_ids:
                if node_id in gl_srvr_list:
                    gl_srvr_list.remove(node_id)
            NS._int.client.write(
                "/indexes/tags/gluster/server",
                gl_srvr_list
            )
        except etcd.EtcdKeyNotFound:
            pass

        return True
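The /indexes/tags/gluster/server value comes back from etcd as a string form of a Python list, which is why the atom runs it through ast.literal_eval before pruning node ids and writing the list back. A tiny worked example with made-up node ids, using only the standard library:

import ast

# Value as read back from etcd (a str); node ids are made up for illustration
raw = "['node-1', 'node-2', 'node-3']"
gl_srvr_list = ast.literal_eval(raw)      # -> ['node-1', 'node-2', 'node-3']
for node_id in ['node-2']:                # ids of the unmanaged cluster's nodes
    if node_id in gl_srvr_list:
        gl_srvr_list.remove(node_id)
print(gl_srvr_list)                       # ['node-1', 'node-3'], written back to the index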
@@ -0,0 +1,57 @@
import time
import uuid

from tendrl.commons import objects
from tendrl.commons.objects.job import Job
from tendrl.commons.utils import log_utils as logger


class DeleteMonitoringDetails(objects.BaseAtom):
    def __init__(self, *args, **kwargs):
        super(DeleteMonitoringDetails, self).__init__(*args, **kwargs)

    def run(self):
        integration_id = self.parameters['TendrlContext.integration_id']
        _job_id = str(uuid.uuid4())
        payload = {
            "tags": ["tendrl/integration/monitoring"],
            "run": "monitoring.flows.DeleteMonitoringData",
            "status": "new",
            "parameters": self.parameters,
            "type": "monitoring"
        }
        Job(
            job_id=_job_id,
            status="new",
            payload=payload
        ).save()

        # Wait for 2 mins for the job to complete
        loop_count = 0
        wait_count = 24
        while True:
            if loop_count >= wait_count:
                logger.log(
                    "info",
                    NS.publisher_id,
                    {
                        "message": "Clear monitoring data "
                        "not yet complete. Timing out. (%s)" %
                        integration_id
                    },
                    job_id=self.parameters['job_id'],
                    flow_id=self.parameters['flow_id'],
                )
                return False
            time.sleep(5)
            finished = True
            job = Job(job_id=_job_id).load()
            if job.status != "finished":
                finished = False
            if finished:
                break
            else:
                loop_count += 1
                continue

        return True
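The wait loop above polls the child job every 5 seconds and gives up after 24 checks, roughly 2 minutes in total. The same idea as a small standalone helper; this is a sketch only, reusing the Job interface exactly as the atom above does:

import time

from tendrl.commons.objects.job import Job


def wait_for_job(job_id, interval=5, max_checks=24):
    """Poll a Job until it reaches "finished"; ~2 minutes with the defaults."""
    for _ in range(max_checks):
        time.sleep(interval)
        if Job(job_id=job_id).load().status == "finished":
            return True
    return False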
@@ -0,0 +1,23 @@
import etcd

from tendrl.commons import objects


class IsClusterManaged(objects.BaseAtom):
    def __init__(self, *args, **kwargs):
        super(IsClusterManaged, self).__init__(*args, **kwargs)

    def run(self):
        integration_id = self.parameters['TendrlContext.integration_id']

        try:
            cluster = NS.tendrl.objects.Cluster(
                integration_id=integration_id
            ).load()
            if cluster.is_managed.lower() == "yes":
                return True
            else:
                return False
        except etcd.EtcdKeyNotFound:
            # The cluster entry is not present at all, so the atom should pass
            return True
@@ -0,0 +1,35 @@
import etcd

from tendrl.commons import objects
from tendrl.commons.utils import etcd_utils
from tendrl.commons.utils import log_utils as logger


class SetClusterUnmanaged(objects.BaseAtom):
    def __init__(self, *args, **kwargs):
        super(SetClusterUnmanaged, self).__init__(*args, **kwargs)

    def run(self):
        integration_id = self.parameters['TendrlContext.integration_id']

        try:
            etcd_utils.write(
                "/clusters/%s/is_managed" % integration_id,
                "no"
            )
        except etcd.EtcdKeyNotFound:
            logger.log(
                "error",
                NS.get("publisher_id", None),
                {
                    "message": "Error setting cluster "
                    "is_managed \"no\": (%s)" % (
                        integration_id
                    )
                },
                job_id=self.parameters['job_id'],
                flow_id=self.parameters['flow_id']
            )
            return False

        return True