Skip to content

Commit

Permalink
Update apps for new API
Browse files Browse the repository at this point in the history
  • Loading branch information
jtfmumm committed Nov 3, 2018
1 parent 0e83747 commit 2792c5f
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 23 deletions.
2 changes: 2 additions & 0 deletions examples/pony/alphabet/_test/validate.py
Expand Up @@ -17,6 +17,8 @@
from json import loads
from struct import calcsize, unpack

print("Validating Alphabet")

fmt = '>LsQ'
def decoder(bs):
return unpack(fmt, bs)[1:3]
Expand Down
23 changes: 12 additions & 11 deletions examples/pony/celsius-kafka/celsius.pony
Expand Up @@ -41,18 +41,19 @@ actor Main
end

try
let application = recover val
Application("Celsius Conversion App")
.new_pipeline[F32, F32]("Celsius Conversion",
KafkaSourceConfig[F32](ksource_clip.parse_options(env.args)?,
env.root as AmbientAuth, CelsiusKafkaDecoder))
.to[F32]({(): Multiply => Multiply})
.to[F32]({(): Add => Add})
.to_sink(KafkaSinkConfig[F32](FahrenheitEncoder,
ksink_clip.parse_options(env.args)?,
env.root as AmbientAuth))
let pipeline = recover val
let inputs = Wallaroo.source[F32]("Celsius Conversion",
KafkaSourceConfig[F32](ksource_clip.parse_options(env.args)?,
env.root as AmbientAuth, CelsiusKafkaDecoder))

inputs
.to[F32](Multiply)
.to[F32](Add)
.to_sink(KafkaSinkConfig[F32](FahrenheitEncoder,
ksink_clip.parse_options(env.args)?,
env.root as AmbientAuth))
end
Startup(env, application, "celsius-conversion")
Wallaroo.build_application(env, "Celsius Conversion", pipeline)
else
@printf[I32]("Couldn't build topology\n".cstring())
end
Expand Down
17 changes: 9 additions & 8 deletions examples/pony/celsius/celsius.pony
Expand Up @@ -31,17 +31,18 @@ use "wallaroo/core/topology"
actor Main
new create(env: Env) =>
try
let application = recover val
Application("Celsius Conversion App")
.new_pipeline[F32, F32]("Celsius Conversion",
let pipeline = recover val
let inputs = Wallaroo.source[F32]("Celsius Conversion",
TCPSourceConfig[F32].from_options(CelsiusDecoder,
TCPSourceConfigCLIParser(env.args)?(0)?))
.to[F32]({(): Multiply => Multiply})
.to[F32]({(): Add => Add})
.to_sink(TCPSinkConfig[F32 val].from_options(FahrenheitEncoder,
TCPSinkConfigCLIParser(env.args)?(0)?))

inputs
.to[F32](Multiply)
.to[F32](Add)
.to_sink(TCPSinkConfig[F32 val].from_options(FahrenheitEncoder,
TCPSinkConfigCLIParser(env.args)?(0)?))
end
Startup(env, application, "celsius-conversion")
Wallaroo.build_application(env, "Celsius Conversion", pipeline)
else
@printf[I32]("Couldn't build topology\n".cstring())
end
Expand Down
1 change: 1 addition & 0 deletions examples/python/celsius_connectors/celsius.py
Expand Up @@ -37,6 +37,7 @@ def application_setup(args):
# ab.to_sink("fahrenheit_conversion")
# return ab.build()

#!@
values = wallaroo.source("convert",
wallaroo.???(decoder))

Expand Down
18 changes: 14 additions & 4 deletions utils/data_receiver/data_receiver.pony
Expand Up @@ -64,11 +64,12 @@ actor Main

// Start it up!
let listener_addr = l_arg as Array[String]
let host = listener_addr(0)?
let port = listener_addr(1)?
let tcp_auth = TCPListenAuth(env.root as AmbientAuth)
TCPListener(tcp_auth,
ListenerNotify(env.out, env.err, input_mode, output_mode),
listener_addr(0)?,
listener_addr(1)?)
ListenerNotify(env.out, env.err, input_mode, output_mode, host, port),
host, port)
else
usage(env.out)
end
Expand All @@ -91,16 +92,25 @@ class ListenerNotify is TCPListenNotify
let _stderr: OutStream
let _input_mode: InputMode
let _output_mode: OutputMode
let _host: String
let _port: String

new iso create(stdout: OutStream,
stderr: OutStream,
input_mode: InputMode,
output_mode: OutputMode)
output_mode: OutputMode,
host: String,
port: String)
=>
_stdout = stdout
_stderr = stderr
_input_mode = input_mode
_output_mode = output_mode
_host = host
_port = port

fun ref listening(listen: TCPListener ref) =>
_stdout.print("Listening on " + _host + ":" + _port)

fun ref not_listening(listen: TCPListener ref) =>
_stderr.print("Unable to listen\n")
Expand Down

0 comments on commit 2792c5f

Please sign in to comment.