Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
slfritchie authored and jtfmumm committed Jun 13, 2019
1 parent 02ff172 commit 096cdc5
Showing 1 changed file with 33 additions and 16 deletions.
Expand Up @@ -13,17 +13,18 @@ actor Main

new create(env: Env) =>
_env = env
let usage = "usage: $0 --host host:port --file /path/to/file --msg-size N --batch-size N [--report-interval usec] [--time-limit usec] [--throttled-messages] [--usec-interval usec=1000]"
let usage = "usage: $0 --host host:port --file /path/to/file --msg-size N --batch-size N [--report-interval usec] [--time-limit usec] [--throttled-messages] [--catch_up] [--msec-interval usec=1000]"

try
var h_arg: (Array[String] | None) = None
var f_arg: (String | None) = None
var b_arg: (USize | None) = None
var c_arg: Bool = false
var m_arg: (USize | None) = None
var r_arg: U64 = 0
var t_arg: U64 = 0
var thr_arg: Bool = false
var u_arg: U64 = 1000
var i_arg: U64 = 1000

var options = Options(env.args)
if env.args.size() == 1 then
Expand All @@ -35,11 +36,12 @@ actor Main
.add("host", "h", StringArgument)
.add("file", "f", StringArgument)
.add("batch-size", "b", I64Argument)
.add("catch-up", None, None)
.add("msec-interval", "i", I64Argument)
.add("msg-size", "m", I64Argument)
.add("report-interval", "r", I64Argument)
.add("time-limit", "t", I64Argument)
.add("throttled-messages", None, None)
.add("usec-interval", "u", I64Argument)

for option in options do
match option
Expand All @@ -51,6 +53,10 @@ actor Main
f_arg = arg
| ("batch-size", let arg: I64) =>
b_arg = arg.usize()
| ("cactch-up", _) =>
c_arg = true
| ("msec-interval", let arg: I64) =>
i_arg = arg.u64()
| ("msg-size", let arg: I64) =>
m_arg = arg.usize()
| ("report-interval", let arg: I64) =>
Expand All @@ -59,8 +65,6 @@ actor Main
t_arg = arg.u64() * 1000
| ("throttled-messages", _) =>
thr_arg = true
| ("usec-interval", let arg: I64) =>
u_arg = arg.u64()
end
end

Expand Down Expand Up @@ -94,11 +98,12 @@ actor Main
let batch_size = b_arg as USize
let msg_size = m_arg as USize
let host = h_arg as Array[String]
let catch_up = c_arg
let file_path = FilePath(env.root as AmbientAuth, f_arg as String)?
let msec_interval = i_arg
let report_interval = r_arg
let time_limit = t_arg
let throttle_messages = thr_arg
let usec_interval = u_arg

let batches = match OpenFile(file_path)
| let f: File =>
Expand All @@ -114,8 +119,8 @@ actor Main

try
let sender = Sender(env.root as AmbientAuth, env.err,
host(0)?, host(1)?, batches, usec_interval,
report_interval, time_limit, throttle_messages)
host(0)?, host(1)?, batches, msec_interval,
report_interval, time_limit, throttle_messages, catch_up)
sender.start()
else
env.err.print("Unable to send")
Expand All @@ -140,7 +145,7 @@ actor Sender
let _data_chunks: Array[Array[U8] val] val
var _data_chunk_index: USize = 0
let _timers: Timers = Timers
let _usec_interval: U64
let _msec_interval: U64
let _report_interval: U64
let _time_limit: U64
var _throttled: Bool = true
Expand All @@ -149,28 +154,31 @@ actor Sender
var _all_bytes_sent: USize = 0
var _start_sec: I64 = 0
var _start_nsec: I64 = 0
let _catch_up: Bool

new create(ambient: AmbientAuth,
err: OutStream,
host: String,
port: String,
data_chunks: Array[Array[U8] val] val,
usec_interval: U64,
msec_interval: U64,
report_interval: U64,
time_limit: U64,
throttle_messages: Bool)
throttle_messages: Bool,
catch_up: Bool)
=>
let notifier = Notifier(err, this, report_interval > 0, throttle_messages)
_tcp = TCPConnection(ambient, consume notifier,
host, port)
_data_chunks = data_chunks
_err = err
_usec_interval = usec_interval
_msec_interval = msec_interval
_report_interval = report_interval
_time_limit = time_limit
_catch_up = catch_up

be start() =>
let t = Timer(TriggerSend(this), 0, _usec_interval * 1000)
let t = Timer(TriggerSend(this), 0, _msec_interval * 1000*1000)
_timers(consume t)
if _report_interval > 0 then
let t2 = Timer(TriggerReport(this, _report_interval > 0),
Expand Down Expand Up @@ -235,9 +243,18 @@ actor Sender
end
_throttled = false
if _count_while_throttled > 0 then
// We only send one extra, no matter how many send messages
// arrived while we were throttled.
_send()
if _catch_up then
while _count_while_throttled > 0 do
// TODO: This loop will go too fast, and we won't get a
// throttled message until it is far too late.
_send()
_count_while_throttled = _count_while_throttled - 1
end
else
// We only send one extra, no matter how many send messages
// arrived while we were throttled.
_send()
end
end

class Notifier is TCPConnectionNotify
Expand Down

0 comments on commit 096cdc5

Please sign in to comment.