From 096cdc51bacf7143d89545f1dc61a62a6724c914 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 13 Jun 2019 01:00:19 -0500 Subject: [PATCH] WIP --- .../fixed_length_message_blaster.pony | 49 +++++++++++++------ 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/testing/tools/fixed_length_message_blaster/fixed_length_message_blaster.pony b/testing/tools/fixed_length_message_blaster/fixed_length_message_blaster.pony index 9689a81c4a..896cd78595 100644 --- a/testing/tools/fixed_length_message_blaster/fixed_length_message_blaster.pony +++ b/testing/tools/fixed_length_message_blaster/fixed_length_message_blaster.pony @@ -13,17 +13,18 @@ actor Main new create(env: Env) => _env = env - let usage = "usage: $0 --host host:port --file /path/to/file --msg-size N --batch-size N [--report-interval usec] [--time-limit usec] [--throttled-messages] [--usec-interval usec=1000]" + let usage = "usage: $0 --host host:port --file /path/to/file --msg-size N --batch-size N [--report-interval usec] [--time-limit usec] [--throttled-messages] [--catch_up] [--msec-interval usec=1000]" try var h_arg: (Array[String] | None) = None var f_arg: (String | None) = None var b_arg: (USize | None) = None + var c_arg: Bool = false var m_arg: (USize | None) = None var r_arg: U64 = 0 var t_arg: U64 = 0 var thr_arg: Bool = false - var u_arg: U64 = 1000 + var i_arg: U64 = 1000 var options = Options(env.args) if env.args.size() == 1 then @@ -35,11 +36,12 @@ actor Main .add("host", "h", StringArgument) .add("file", "f", StringArgument) .add("batch-size", "b", I64Argument) + .add("catch-up", None, None) + .add("msec-interval", "i", I64Argument) .add("msg-size", "m", I64Argument) .add("report-interval", "r", I64Argument) .add("time-limit", "t", I64Argument) .add("throttled-messages", None, None) - .add("usec-interval", "u", I64Argument) for option in options do match option @@ -51,6 +53,10 @@ actor Main f_arg = arg | ("batch-size", let arg: I64) => b_arg = arg.usize() + | ("cactch-up", _) => + c_arg = true + | ("msec-interval", let arg: I64) => + i_arg = arg.u64() | ("msg-size", let arg: I64) => m_arg = arg.usize() | ("report-interval", let arg: I64) => @@ -59,8 +65,6 @@ actor Main t_arg = arg.u64() * 1000 | ("throttled-messages", _) => thr_arg = true - | ("usec-interval", let arg: I64) => - u_arg = arg.u64() end end @@ -94,11 +98,12 @@ actor Main let batch_size = b_arg as USize let msg_size = m_arg as USize let host = h_arg as Array[String] + let catch_up = c_arg let file_path = FilePath(env.root as AmbientAuth, f_arg as String)? + let msec_interval = i_arg let report_interval = r_arg let time_limit = t_arg let throttle_messages = thr_arg - let usec_interval = u_arg let batches = match OpenFile(file_path) | let f: File => @@ -114,8 +119,8 @@ actor Main try let sender = Sender(env.root as AmbientAuth, env.err, - host(0)?, host(1)?, batches, usec_interval, - report_interval, time_limit, throttle_messages) + host(0)?, host(1)?, batches, msec_interval, + report_interval, time_limit, throttle_messages, catch_up) sender.start() else env.err.print("Unable to send") @@ -140,7 +145,7 @@ actor Sender let _data_chunks: Array[Array[U8] val] val var _data_chunk_index: USize = 0 let _timers: Timers = Timers - let _usec_interval: U64 + let _msec_interval: U64 let _report_interval: U64 let _time_limit: U64 var _throttled: Bool = true @@ -149,28 +154,31 @@ actor Sender var _all_bytes_sent: USize = 0 var _start_sec: I64 = 0 var _start_nsec: I64 = 0 + let _catch_up: Bool new create(ambient: AmbientAuth, err: OutStream, host: String, port: String, data_chunks: Array[Array[U8] val] val, - usec_interval: U64, + msec_interval: U64, report_interval: U64, time_limit: U64, - throttle_messages: Bool) + throttle_messages: Bool, + catch_up: Bool) => let notifier = Notifier(err, this, report_interval > 0, throttle_messages) _tcp = TCPConnection(ambient, consume notifier, host, port) _data_chunks = data_chunks _err = err - _usec_interval = usec_interval + _msec_interval = msec_interval _report_interval = report_interval _time_limit = time_limit + _catch_up = catch_up be start() => - let t = Timer(TriggerSend(this), 0, _usec_interval * 1000) + let t = Timer(TriggerSend(this), 0, _msec_interval * 1000*1000) _timers(consume t) if _report_interval > 0 then let t2 = Timer(TriggerReport(this, _report_interval > 0), @@ -235,9 +243,18 @@ actor Sender end _throttled = false if _count_while_throttled > 0 then - // We only send one extra, no matter how many send messages - // arrived while we were throttled. - _send() + if _catch_up then + while _count_while_throttled > 0 do + // TODO: This loop will go too fast, and we won't get a + // throttled message until it is far too late. + _send() + _count_while_throttled = _count_while_throttled - 1 + end + else + // We only send one extra, no matter how many send messages + // arrived while we were throttled. + _send() + end end class Notifier is TCPConnectionNotify