Skip to content
Permalink
Browse files
Updates to Heron UI and Tracker code to work with Python 3 updates (#…
…3786)

* Fix for missing physical plan in UI
* Fix the Heron UI timeline metrics
* Removed Tracker EnvelopeAPI Router which was not working
* Updated AckingTopology to not burn so much CPU
* Updated pylint due to build issue which resulted in lots of format updates

Co-authored-by: choi se <thinker0@gmail.com>
Co-authored-by: choi se <357785+thinker0@users.noreply.github.com>
Co-authored-by: Saad Ur Rahman <surahman@users.noreply.github.com>
Co-authored-by: Saad Ur Rahman <saadurrahman@apache.org>
  • Loading branch information
5 people committed Apr 12, 2022
1 parent ae4099f commit 700125f271b854a5ad2e3fe3204c1e2169ce069a
Showing 139 changed files with 1,209 additions and 1,273 deletions.
@@ -103,10 +103,10 @@ def discover_git_branch():
# Utility functions for system defines
######################################################################
def define_string(name, value):
return '#define %s "%s"\n' % (name, value)
return f'#define {name} "{value}"\n'

def define_value(name, value):
return '#define %s %s\n' % (name, value)
return f'#define {name} {value}\n'

######################################################################
# Discover where a program is located using the PATH variable
@@ -144,7 +144,7 @@ def real_program_path(program_name):
return None

def fail(message):
print("\nFAILED: %s" % message)
print(f"\nFAILED: {message}")
sys.exit(1)

# Assumes the version is at the end of the first line consisting of digits and dots
@@ -158,7 +158,7 @@ def discover_version(path):
version_flag = "-V"
else:
version_flag = "--version"
command = "%s %s" % (path, version_flag)
command = f"{path} {version_flag}"
version_output = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True)
first_line = version_output.decode('ascii', 'ignore').split("\n")[0]
version = get_trailing_version(first_line)
@@ -215,12 +215,12 @@ def discover_version(path):
return version


fail ("Could not determine the version of %s from the following output\n%s\n%s" % (path, command, version_output))
fail (f"Could not determine the version of {path} from the following output\n{command}\n{version_output}")

def to_semver(version):
# is version too short
if re.search('^[\d]+\.[\d]+$', version):
return "%s.0" % version
return f"{version}.0"

# is version too long
version_search = re.search('^([\d]+\.[\d]+\.[\d]+)\.[\d]+$', version)
@@ -231,8 +231,8 @@ def to_semver(version):

def assert_min_version(path, min_version):
version = discover_version(path)
if not semver.match(to_semver(version), ">=%s" % to_semver(min_version)):
fail("%s is version %s which is less than the required version %s" % (path, version, min_version))
if not semver.match(to_semver(version), f">={to_semver(min_version)}"):
fail(f"{path} is version {version} which is less than the required version {min_version}")
return version

######################################################################
@@ -267,17 +267,17 @@ def make_executable(path):
def discover_tool(program, msg, envvar, min_version = ''):
VALUE = discover_program(program, envvar)
if not VALUE:
fail("""You need to have %s installed to build Heron.
Note: Some vendors install %s with a versioned name
(like /usr/bin/%s-4.8). You can set the %s environment
variable to specify the full path to yours.'""" % (program, program, program, envvar))
fail(f"""You need to have {program} installed to build Heron.
Note: Some vendors install {program} with a versioned name
(like /usr/bin/{program}-4.8). You can set the {envvar} environment
variable to specify the full path to yours.'""")

print_value = VALUE
if min_version:
version = assert_min_version(VALUE, min_version)
print_value = "%s (%s)" % (VALUE, version)
print_value = f"{VALUE} ({version})"

print('Using %s:\t%s' % (msg.ljust(20), print_value))
print(f'Using {msg.ljust(20)}:\t{print_value}')
return VALUE

def discover_jdk():
@@ -290,7 +290,7 @@ def discover_jdk():
"You can set the JAVA_HOME environment variavle to specify the full path to yours.")
jdk_bin_path = os.path.dirname(javac_path)
jdk_path = os.path.dirname(jdk_bin_path)
print('Using %s:\t%s' % ('JDK'.ljust(20), jdk_path))
print(f"Using {'JDK'.ljust(20)}:\t{jdk_path}")
return jdk_path

def test_venv():
@@ -312,14 +312,14 @@ def discover_tool_default(program, msg, envvar, defvalue):
VALUE = discover_program(program, envvar)
if not VALUE:
VALUE = defvalue
print('%s:\tnot found, but ok' % (program.ljust(26)))
print(f'{program.ljust(26)}:\tnot found, but ok')
else:
print('Using %s:\t%s' % (msg.ljust(20), VALUE))
print(f'Using {msg.ljust(20)}:\t{VALUE}')
return VALUE

def export_env_to_file(out_file, env):
if env in os.environ:
out_file.write('export %s="%s"\n' % (env, os.environ[env]))
out_file.write(f'export {env}="{os.environ[env]}"\n')

######################################################################
# Generate the shell script that recreates the environment
@@ -348,7 +348,7 @@ def write_env_exec_file(platform, environ):
out_file.write('$*')

make_executable(env_exec_file)
print('Wrote the environment exec file %s' % (env_exec_file))
print(f'Wrote the environment exec file {env_exec_file}')


######################################################################
@@ -385,13 +385,13 @@ def write_heron_config_header(config_file):
out_file.write(define_string('GIT_BRANCH', discover_git_branch()))
out_file.write(generate_system_defines())
out_file.close()
print('Wrote the heron config header file: \t"%s"' % (config_file))
print(f'Wrote the heron config header file: \t"{config_file}"')

######################################################################
# MAIN program that sets up your workspace for bazel
######################################################################
def main():
env_map = dict()
env_map = {}

# Discover the platform
platform = discover_platform()
@@ -20,6 +20,7 @@

package org.apache.heron.examples.api;

import java.time.Duration;
import java.util.Map;
import java.util.Random;

@@ -36,6 +37,7 @@
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;
import org.apache.heron.common.basics.SysUtils;

/**
* This is a basic example of a Heron topology with acking enable.
@@ -53,7 +55,7 @@ public static void main(String[] args) throws Exception {

int spouts = 2;
int bolts = 2;
builder.setSpout("word", new AckingTestWordSpout(), spouts);
builder.setSpout("word", new AckingTestWordSpout(Duration.ofMillis(200)), spouts);
builder.setBolt("exclaim1", new ExclamationBolt(), bolts)
.shuffleGrouping("word");

@@ -97,8 +99,10 @@ public static class AckingTestWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] words;
private Random rand;
private final Duration throttleDuration;

public AckingTestWordSpout() {
public AckingTestWordSpout(Duration throttleDuration) {
this.throttleDuration = throttleDuration;
}

@SuppressWarnings("rawtypes")
@@ -116,7 +120,9 @@ public void close() {

public void nextTuple() {
final String word = words[rand.nextInt(words.length)];

if (!throttleDuration.isZero()) {
SysUtils.sleep(throttleDuration); // sleep to throttle back CPU usage
}
// To enable acking, we need to emit each tuple with a MessageId, which is an Object.
// Each new message emitted needs to be annotated with a unique ID, which allows
// the spout to keep track of which messages should be acked back to the producer or
@@ -33,4 +33,4 @@ def process(self, tup):

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Total received data tuple: %d" % self.total)
self.log(f"Total received data tuple: {self.total}")
@@ -20,7 +20,7 @@

"""module for example bolt: CountBolt"""
from collections import Counter
import heronpy.api.global_metrics as global_metrics
from heronpy.api import global_metrics
from heronpy.api.bolt.bolt import Bolt

# pylint: disable=unused-argument
@@ -34,8 +34,8 @@ def initialize(self, config, context):
self.counter = Counter()
self.total = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))
self.logger.info(f"Component-specific config: \n{str(config)}")
self.logger.info(f"Context: \n{str(context)}")

def _increment(self, word, inc_by):
self.counter[word] += inc_by
@@ -49,4 +49,4 @@ def process(self, tup):

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Current map: %s" % str(self.counter))
self.log(f"Current map: {str(self.counter)}")
@@ -30,12 +30,12 @@ def initialize(self, config, context):
def process(self, tup):
self.total += 1
if self.total % 2 == 0:
self.logger.debug("Failing a tuple: %s" % str(tup))
self.logger.debug("Failing a tuple: %s", str(tup))
self.fail(tup)
else:
self.logger.debug("Acking a tuple: %s" % str(tup))
self.logger.debug("Acking a tuple: %s", str(tup))
self.ack(tup)

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Total received: %d" % self.total)
self.log(f"Total received: {self.total}")
@@ -21,7 +21,7 @@
"""module for example bolt: CountBolt"""
from collections import Counter

import heronpy.api.global_metrics as global_metrics
from heronpy.api import global_metrics
from heronpy.api.bolt.bolt import Bolt
from heronpy.api.state.stateful_component import StatefulComponent

@@ -34,19 +34,19 @@ class StatefulCountBolt(Bolt, StatefulComponent):
# pylint: disable=attribute-defined-outside-init
def init_state(self, stateful_state):
self.recovered_state = stateful_state
self.logger.info("Checkpoint Snapshot recovered : %s" % str(self.recovered_state))
self.logger.info(f"Checkpoint Snapshot recovered : {str(self.recovered_state)}")

def pre_save(self, checkpoint_id):
for (k, v) in list(self.counter.items()):
self.recovered_state.put(k, v)
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))
self.logger.info(f"Checkpoint Snapshot {checkpoint_id} : {str(self.recovered_state)}")

def initialize(self, config, context):
self.logger.info("In prepare() of CountBolt")
self.counter = Counter()
self.total = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info(f"Component-specific config: \n{str(config)}")

def _increment(self, word, inc_by):
self.counter[word] += inc_by
@@ -34,4 +34,4 @@ def process(self, tup):

def process_tick(self, tup):
self.log("Got tick tuple!")
self.log("Current stream counter: %s" % str(self.stream_counter))
self.log(f"Current stream counter: {str(self.stream_counter)}")
@@ -27,11 +27,11 @@ class WindowSizeBolt(SlidingWindowBolt):
A bolt that calculates the average batch size of window"""

def initialize(self, config, context):
super(WindowSizeBolt, self).initialize(config, context)
super().initialize(config, context)
self.numerator = 0.0
self.denominator = 0.0

def processWindow(self, window_info, tuples):
self.numerator += len(tuples)
self.denominator += 1
self.logger.info("The current average is %f" % (self.numerator / self.denominator))
self.logger.info(f"The current average is {(self.numerator / self.denominator)}")
@@ -36,15 +36,15 @@ def initialize(self, config, context):

self.emit_count = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))
self.logger.info(f"Component-specific config: \n{str(config)}")
self.logger.info(f"Context: \n{str(context)}")

def next_tuple(self):
word = next(self.words)
self.emit([word])
self.emit_count += 1

if self.emit_count % 100000 == 0:
self.logger.info("Emitted %s" % str(self.emit_count))
self.logger.info(f"Emitted {str(self.emit_count)}")
self.logger.info("Emitting to error stream")
self.emit(["test error message"], stream='error')
@@ -34,13 +34,13 @@ class StatefulWordSpout(Spout, StatefulComponent):
# pylint: disable=attribute-defined-outside-init
def init_state(self, stateful_state):
self.recovered_state = stateful_state
self.logger.info("Checkpoint Snapshot recovered : %s" % str(self.recovered_state))
self.logger.info(f"Checkpoint Snapshot recovered : {str(self.recovered_state)}")

def pre_save(self, checkpoint_id):
# Purely for debugging purposes
for (k, v) in list(self.counter.items()):
self.recovered_state.put(k, v)
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))
self.logger.info(f"Checkpoint Snapshot {checkpoint_id} : {str(self.recovered_state)}")

# pylint: disable=unused-argument
def initialize(self, config, context):
@@ -52,7 +52,7 @@ def initialize(self, config, context):
self.ack_count = 0
self.fail_count = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info(f"Component-specific config: \n{str(config)}")

def next_tuple(self):
word = next(self.words)
@@ -37,8 +37,8 @@ def initialize(self, config, context):
self.ack_count = 0
self.fail_count = 0

self.logger.info("Component-specific config: \n%s" % str(config))
self.logger.info("Context: \n%s" % str(context))
self.logger.info(f"Component-specific config: \n{str(config)}")
self.logger.info(f"Context: \n{str(context)}")

def next_tuple(self):
word = next(self.words)
@@ -50,9 +50,9 @@ def next_tuple(self):
def ack(self, tup_id):
self.ack_count += 1
if self.ack_count % 100000 == 0:
self.logger.info("Acked %sth tuples, tup_id: %s" % (str(self.ack_count), str(tup_id)))
self.logger.info(f"Acked {str(self.ack_count)}th tuples, tup_id: {str(tup_id)}")

def fail(self, tup_id):
self.fail_count += 1
if self.fail_count % 100000 == 0:
self.logger.info("Failed %sth tuples, tup_id: %s" % (str(self.fail_count), str(tup_id)))
self.logger.info(f"Failed {str(self.fail_count)}th tuples, tup_id: {str(tup_id)}")

0 comments on commit 700125f

Please sign in to comment.