Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Expose the component config through the tracker. #2709

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions heron/tools/tracker/src/python/handlers/logicalplanhandler.py
Expand Up @@ -55,6 +55,7 @@ def get(self):
spouts_map = dict()
for name, value in lplan['spouts'].items():
spouts_map[name] = dict(
config=value.get("config", dict()),
outputs=value["outputs"],
spout_type=value["type"],
spout_source=value["source"],
Expand All @@ -63,6 +64,7 @@ def get(self):
bolts_map = dict()
for name, value in lplan['bolts'].items():
bolts_map[name] = dict(
config=value.get("config", dict()),
inputComponents=[i['component_name'] for i in value['inputs']],
inputs=value["inputs"],
outputs=value["outputs"]
Expand Down
14 changes: 14 additions & 0 deletions heron/tools/tracker/src/python/javaobj.py
Expand Up @@ -67,6 +67,11 @@ def dumps(obj):
marshaller = JavaObjectMarshaller()
return marshaller.dump(obj)

_java_primitives = set([
"java.lang.Double",
"java.lang.Float",
"java.lang.Integer",
"java.lang.Long"])

class JavaClass(object):
"""Java class representation"""
Expand Down Expand Up @@ -106,6 +111,15 @@ def __repr__(self):
name = self.classdesc.name
return "<javaobj:%s>" % name

def classname(self):
name = "UNKNOWN"
if self.classdesc:
name = self.classdesc.name
return name

def is_primitive(self):
return self.classname() in _java_primitives

def copy(self, new_object):
"""copy an object"""
new_object.classdesc = self.classdesc
Expand Down
27 changes: 27 additions & 0 deletions heron/tools/tracker/src/python/pyutils.py
@@ -0,0 +1,27 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
''' pyobj.py '''
import sys

isPY3 = sys.version_info >= (3, 0, 0)

# helper method to support python 2 and 3
def is_str_instance(obj):
if isPY3:
return isinstance(obj, str)
else:
return str(type(obj)) == "<type 'unicode'>" or str(type(obj)) == "<type 'str'>"
81 changes: 59 additions & 22 deletions heron/tools/tracker/src/python/tracker.py
Expand Up @@ -27,9 +27,65 @@
from heron.statemgrs.src.python import statemanagerfactory
from heron.tools.tracker.src.python.topology import Topology
from heron.tools.tracker.src.python import javaobj
from heron.tools.tracker.src.python import pyutils
from heron.tools.tracker.src.python import utils


def convert_pb_kvs(kvs, include_non_primitives=True):
"""
converts pb kvs to dict
"""
config = {}
for kv in kvs:
if kv.value:
config[kv.key] = kv.value
elif kv.serialized_value:
# add serialized_value support for python values (fixme)

# is this a serialized java object
if topology_pb2.JAVA_SERIALIZED_VALUE == kv.type:
jv = _convert_java_value(kv, include_non_primitives=include_non_primitives)
if jv is not None:
config[kv.key] = jv
else:
config[kv.key] = _raw_value(kv)
return config


def _convert_java_value(kv, include_non_primitives=True):
try:
pobj = javaobj.loads(kv.serialized_value)
if pyutils.is_str_instance(pobj):
return pobj

if pobj.is_primitive():
return pobj.value

if include_non_primitives:
# java objects that are not strings return value and encoded value
# Hexadecimal byte array for Serialized objects that
return {
'value' : json.dumps(pobj,
default=lambda custom_field: custom_field.__dict__,
sort_keys=True,
indent=2),
'raw' : utils.hex_escape(kv.serialized_value)}

return None
except Exception:
Log.exception("Failed to parse data as java object")
if include_non_primitives:
return _raw_value(kv)
else:
return None

def _raw_value(kv):
return {
# The value should be a valid json object
'value' : '{}',
'raw' : utils.hex_escape(kv.serialized_value)}


class Tracker(object):
"""
Tracker is a stateless cache of all the topologies
Expand Down Expand Up @@ -321,6 +377,7 @@ def extract_logical_plan(self, topology):
elif kvs.key == "spout.version":
spoutVersion = javaobj.loads(kvs.serialized_value)
spoutPlan = {
"config": convert_pb_kvs(spoutConfigs, include_non_primitives=False),
"type": spoutType,
"source": spoutSource,
"version": spoutVersion,
Expand All @@ -337,6 +394,7 @@ def extract_logical_plan(self, topology):
for bolt in topology.bolts():
boltName = bolt.comp.name
boltPlan = {
"config": convert_pb_kvs(bolt.comp.config.kvs, include_non_primitives=False),
"outputs": [],
"inputs": []
}
Expand Down Expand Up @@ -384,28 +442,7 @@ def extract_physical_plan(self, topology):

# Configs
if topology.physical_plan.topology.topology_config:
for kvs in topology.physical_plan.topology.topology_config.kvs:
if kvs.value:
physicalPlan["config"][kvs.key] = kvs.value
elif kvs.serialized_value:
# currently assumes that serialized_value is Java serialization
# when multi-language support is added later, ConfigValueType should be checked

# Hexadecimal byte array for Serialized objects
try:
pobj = javaobj.loads(kvs.serialized_value)
physicalPlan["config"][kvs.key] = {
'value' : json.dumps(pobj,
default=lambda custom_field: custom_field.__dict__,
sort_keys=True,
indent=2),
'raw' : utils.hex_escape(kvs.serialized_value)}
except Exception:
Log.exception("Failed to parse data as java object")
physicalPlan["config"][kvs.key] = {
# The value should be a valid json object
'value' : '{}',
'raw' : utils.hex_escape(kvs.serialized_value)}
physicalPlan["config"] = convert_pb_kvs(topology.physical_plan.topology.topology_config.kvs)
for spout in spouts:
spout_name = spout.comp.name
physicalPlan["spouts"][spout_name] = []
Expand Down