Skip to content
Permalink
Browse files
Clean tracker top level code (#3602)
 * Allow arbitrary whitespace in queries
 * DRY up Multiply/Divide/Subtract
 * use snake_case
 * code simplification
  • Loading branch information
Code0x58 committed Oct 31, 2020
1 parent aa292ef commit 64e81670304a10cb9c4785373818194ffade4562
Showing 31 changed files with 871 additions and 1,123 deletions.
@@ -19,6 +19,7 @@
# under the License.

''' config.py '''
import string

from heron.statemgrs.src.python.config import Config as StateMgrConfig

@@ -33,6 +34,7 @@ class Config:
Responsible for reading the yaml config file and
exposing various tracker configs.
"""
FORMATTER_PARAMETERS = {"CLUSTER", "ENVIRON", "TOPOLOGY", "ROLE", "USER"}

def __init__(self, configs):
self.configs = configs
@@ -48,65 +50,51 @@ def load_configs(self):
for extra_link in self.configs[EXTRA_LINKS_KEY]:
self.extra_links.append(self.validate_extra_link(extra_link))

def validate_extra_link(self, extra_link):
def validate_extra_link(self, extra_link: dict) -> None:
"""validate extra link"""
if EXTRA_LINK_NAME_KEY not in extra_link or EXTRA_LINK_FORMATTER_KEY not in extra_link:
raise Exception("Invalid extra.links format. " +
"Extra link must include a 'name' and 'formatter' field")

self.validated_formatter(extra_link[EXTRA_LINK_FORMATTER_KEY])
return extra_link

# pylint: disable=no-self-use
def validated_formatter(self, url_format):
"""validate visualization url format"""
# We try to create a string by substituting all known
# parameters. If an unknown parameter is present, an error
# will be thrown
valid_parameters = {
"${CLUSTER}": "cluster",
"${ENVIRON}": "environ",
"${TOPOLOGY}": "topology",
"${ROLE}": "role",
"${USER}": "user",
}
dummy_formatted_url = url_format
for key, value in list(valid_parameters.items()):
dummy_formatted_url = dummy_formatted_url.replace(key, value)

# All $ signs must have been replaced
if '$' in dummy_formatted_url:
raise Exception("Invalid viz.url.format: %s" % (url_format))
def validated_formatter(self, url_format: str) -> None:
"""Check visualization url format has no unrecongnised parameters."""
# collect the parameters which would be interpolated
formatter_variables = set()
class ValidationHelper:
def __getitem__(self, key):
formatter_variables.add(key)
return ""

string.Template(url_format).safe_substitute(ValidationHelper())

# No error is thrown, so the format is valid.
return url_format
if not formatter_variables <= self.FORMATTER_PARAMETERS:
raise Exception(f"Invalid viz.url.format: {url_format!r}")

def get_formatted_url(self, formatter, execution_state):
@staticmethod
def get_formatted_url(formatter: str, execution_state: dict) -> str:
"""
@param formatter: The template string to interpolate
@param execution_state: The python dict representing JSON execution_state
@return Formatted viz url
Format a url string using values from the execution state.
"""

# Create the parameters based on execution state
common_parameters = {
"${CLUSTER}": execution_state.get("cluster", "${CLUSTER}"),
"${ENVIRON}": execution_state.get("environ", "${ENVIRON}"),
"${TOPOLOGY}": execution_state.get("jobname", "${TOPOLOGY}"),
"${ROLE}": execution_state.get("role", "${ROLE}"),
"${USER}": execution_state.get("submission_user", "${USER}"),
subs = {
var: execution_state[prop]
for prop, var in (
("cluster", "CLUSTER"),
("environ", "ENVIRON"),
("jobname", "TOPOLOGY"),
("role", "ROLE"),
("submission_user", "USER"))
if prop in execution_state
}

formatted_url = formatter

for key, value in list(common_parameters.items()):
formatted_url = formatted_url.replace(key, value)

return formatted_url
return string.Template(formatter).substitute(subs)

def __str__(self):
return "".join((self.config_str(c) for c in self.configs[STATEMGRS_KEY]))

def config_str(self, config):
@staticmethod
def config_str(config):
keys = ("type", "name", "hostport", "rootpath", "tunnelhost")
return "".join("\t{}: {}\n".format(k, config[k]) for k in keys if k in config).rstrip()
return "".join("\t{k}: {config[k]}\n" for k in keys if k in config).rstrip()
@@ -33,7 +33,6 @@
API_VERSION = ""



# Handler Constants

# Parameter Names
@@ -64,7 +64,7 @@ def get(self):
path = self.get_argument(constants.PARAM_PATH)
offset = self.get_argument_offset()
length = self.get_argument_length()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)

stmgr_id = "stmgr-" + container
stmgr = topology_info["physical_plan"]["stmgrs"][stmgr_id]
@@ -111,7 +111,7 @@ def get(self):
topology_name = self.get_argument_topology()
container = self.get_argument(constants.PARAM_CONTAINER)
path = self.get_argument(constants.PARAM_PATH)
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)

stmgr_id = "stmgr-" + container
stmgr = topology_info["physical_plan"]["stmgrs"][stmgr_id]
@@ -166,7 +166,7 @@ def get(self):
topology_name = self.get_argument_topology()
container = self.get_argument(constants.PARAM_CONTAINER)
path = self.get_argument(constants.PARAM_PATH, default=".")
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)

stmgr_id = "stmgr-" + str(container)
stmgr = topology_info["physical_plan"]["stmgrs"][stmgr_id]
@@ -59,7 +59,7 @@ def get(self):
role = self.get_argument_role()
topName = self.get_argument_topology()
component = self.get_argument_component()
topology = self.tracker.getTopologyByClusterRoleEnvironAndName(
topology = self.tracker.get_topology(
cluster, role, environ, topName)
instances = self.get_arguments(constants.PARAM_INSTANCE)
exceptions_logs = yield tornado.gen.Task(self.getComponentException,
@@ -60,7 +60,7 @@ def get(self):
role = self.get_argument_role()
topology_name = self.get_argument_topology()
component = self.get_argument_component()
topology = self.tracker.getTopologyByClusterRoleEnvironAndName(
topology = self.tracker.get_topology(
cluster, role, environ, topology_name)
instances = self.get_arguments(constants.PARAM_INSTANCE)
exceptions_summary = yield tornado.gen.Task(self.getComponentExceptionSummary,
@@ -52,7 +52,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
execution_state = topology_info["execution_state"]
self.write_success_response(execution_state)
except Exception as e:
@@ -65,7 +65,7 @@ def get(self):
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
instance = self.get_argument_instance()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
ret = yield self.runInstanceJmap(topology_info, instance)
self.write_success_response(ret)
except Exception as e:
@@ -64,7 +64,7 @@ def get(self):
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
instance = self.get_argument_instance()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
ret = yield self.getInstanceJstack(topology_info, instance)
self.write_success_response(ret)
except Exception as e:
@@ -84,7 +84,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
lplan = topology_info["logical_plan"]

# format the logical plan as required by the web (because of Ambrose)
@@ -65,7 +65,7 @@ def get(self):
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
instance = self.get_argument_instance()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
ret = yield self.getInstanceMemoryHistogram(topology_info, instance)
self.write_success_response(ret)
except Exception as e:
@@ -67,7 +67,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
metadata = topology_info["metadata"]
self.write_success_response(metadata)
except Exception as e:
@@ -64,7 +64,7 @@ def get(self):
component = self.get_argument_component()
metric_names = self.get_required_arguments_metricnames()

topology = self.tracker.getTopologyByClusterRoleEnvironAndName(
topology = self.tracker.get_topology(
cluster, role, environ, topology_name)

interval = int(self.get_argument(constants.PARAM_INTERVAL, default=-1))
@@ -84,7 +84,7 @@ def get(self):
def getComponentMetrics(self,
tmaster,
componentName,
metricNames,
metric_names,
instances,
interval,
callback=None):
@@ -116,8 +116,8 @@ def getComponentMetrics(self,
if len(instances) > 0:
for instance in instances:
metricRequest.instance_id.append(instance)
for metricName in metricNames:
metricRequest.metric.append(metricName)
for metric_name in metric_names:
metricRequest.metric.append(metric_name)
metricRequest.interval = interval

# Serialize the metricRequest to send as a payload
@@ -58,7 +58,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology = self.tracker.getTopologyByClusterRoleEnvironAndName(
topology = self.tracker.get_topology(
cluster, role, environ, topology_name)

start_time = self.get_argument_starttime()
@@ -68,9 +68,9 @@ def get(self):
self.validateInterval(start_time, end_time)
instances = self.get_arguments(constants.PARAM_INSTANCE)

topology = self.tracker.getTopologyByClusterRoleEnvironAndName(
topology = self.tracker.get_topology(
cluster, role, environ, topology_name)
metrics = yield tornado.gen.Task(metricstimeline.getMetricsTimeline,
metrics = yield tornado.gen.Task(metricstimeline.get_metrics_timeline,
topology.tmaster, component, metric_names,
instances, int(start_time), int(end_time))
self.write_success_response(metrics)
@@ -53,7 +53,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
packing_plan = topology_info["packing_plan"]
self.write_success_response(packing_plan)
except Exception as e:
@@ -53,7 +53,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
physical_plan = topology_info["physical_plan"]
self.write_success_response(physical_plan)
except Exception as e:
@@ -82,7 +82,7 @@ def get(self):
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
instance = self.get_argument_instance()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
result = yield getInstancePid(topology_info, instance)
self.write_success_response(result)
except Exception as e:
@@ -110,10 +110,10 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
runtime_state = topology_info["runtime_state"]
runtime_state["topology_version"] = topology_info["metadata"]["release_version"]
topology = self.tracker.getTopologyByClusterRoleEnvironAndName(
topology = self.tracker.get_topology(
cluster, role, environ, topology_name)
reg_summary = yield tornado.gen.Task(self.getStmgrsRegSummary, topology.tmaster)
for stmgr, reg in list(reg_summary.items()):
@@ -53,7 +53,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
scheduler_location = topology_info["scheduler_location"]
self.write_success_response(scheduler_location)
except Exception as e:
@@ -90,7 +90,7 @@ def get(self):
if environ not in ret[cluster]:
ret[cluster][environ] = {}
try:
topology_info = self.tracker.getTopologyInfo(topology.name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology.name, cluster, role, environ)
if topology_info and "execution_state" in topology_info:
ret[cluster][environ][topology.name] = topology_info["execution_state"]
except Exception:
@@ -53,7 +53,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
config = topology_info["physical_plan"]["config"]
self.write_success_response(config)
except Exception as e:
@@ -54,7 +54,7 @@ def get(self):
role = self.get_argument_role()
environ = self.get_argument_environ()
topology_name = self.get_argument_topology()
topology_info = self.tracker.getTopologyInfo(topology_name, cluster, role, environ)
topology_info = self.tracker.get_topology_info(topology_name, cluster, role, environ)
self.write_success_response(topology_info)
except Exception as e:
Log.debug(traceback.format_exc())

0 comments on commit 64e8167

Please sign in to comment.