Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds max_size option to TCPSourceConfig #3134

Merged
merged 1 commit into from May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/wallaroo/core/source/tcp_source/tcp_source_config.pony
Expand Up @@ -84,22 +84,26 @@ class val TCPSourceConfigOptions
class val TCPSourceConfig[In: Any val] is SourceConfig
let handler: FramedSourceHandler[In] val
let parallelism: USize
let max_size: USize
let _worker_source_config: WorkerTCPSourceConfig

new val create(source_name': SourceName,
handler': FramedSourceHandler[In] val, host': String, service': String,
valid': Bool, parallelism': USize = 10)
valid': Bool, parallelism': USize = 10, max_size': USize = 16384)
=>
handler = handler'
parallelism = parallelism'
max_size = max_size'
_worker_source_config = WorkerTCPSourceConfig(source_name', host',
service', valid')

new val from_options(handler': FramedSourceHandler[In] val,
opts: TCPSourceConfigOptions, parallelism': USize = 10)
opts: TCPSourceConfigOptions, parallelism': USize = 10,
max_size': USize = 16384)
=>
handler = handler'
parallelism = parallelism'
max_size = max_size'
_worker_source_config = WorkerTCPSourceConfig(opts.source_name, opts.host,
opts.service, opts.valid)

Expand Down
Expand Up @@ -46,6 +46,7 @@ class val TCPSourceCoordinatorBuilder[In: Any val] is SourceCoordinatorBuilder
let _recovering: Bool
let _target_router: Router
let _parallelism: USize
let _max_size: USize
let _handler: FramedSourceHandler[In] val
let _host: String
let _service: String
Expand All @@ -59,7 +60,7 @@ class val TCPSourceCoordinatorBuilder[In: Any val] is SourceCoordinatorBuilder
event_log: EventLog, auth: AmbientAuth,
layout_initializer: LayoutInitializer,
recovering: Bool, target_router: Router = EmptyRouter,
parallelism: USize, handler: FramedSourceHandler[In] val,
parallelism: USize, max_size: USize, handler: FramedSourceHandler[In] val,
host: String, service: String, valid: Bool)
=>
_worker_name = worker_name
Expand All @@ -77,6 +78,7 @@ class val TCPSourceCoordinatorBuilder[In: Any val] is SourceCoordinatorBuilder
_recovering = recovering
_target_router = target_router
_parallelism = parallelism
_max_size = max_size
_handler = handler
_host = host
_service = service
Expand All @@ -87,8 +89,8 @@ class val TCPSourceCoordinatorBuilder[In: Any val] is SourceCoordinatorBuilder
_runner_builder, _partitioner_builder, _router, _metrics_conn,
_metrics_reporter.clone(), _router_registry,
_outgoing_boundary_builders, _event_log, _auth, _layout_initializer,
_recovering, _target_router, _parallelism, _handler, _host, _service,
_valid)
_recovering, _target_router, _parallelism, _handler,
_host, _service, _valid where max_size = _max_size)

class val TCPSourceCoordinatorBuilderBuilder[In: Any val] is
SourceCoordinatorBuilderBuilder
Expand Down Expand Up @@ -119,14 +121,14 @@ class val TCPSourceCoordinatorBuilderBuilder[In: Any val] is
runner_builder, partitioner_builder, router, metrics_conn,
consume metrics_reporter, router_registry, outgoing_boundary_builders,
event_log, auth, layout_initializer, recovering, target_router,
_source_config.parallelism, _source_config.handler,
config.host, config.service, config.valid)
_source_config.parallelism, _source_config.max_size,
_source_config.handler, config.host, config.service, config.valid)
else
Unreachable()
TCPSourceCoordinatorBuilder[In](worker_name, pipeline_name,
runner_builder, partitioner_builder, router, metrics_conn,
consume metrics_reporter, router_registry, outgoing_boundary_builders,
event_log, auth, layout_initializer, recovering, target_router,
_source_config.parallelism, _source_config.handler,
"0", "0", false)
_source_config.parallelism, _source_config.max_size,
_source_config.handler, "0", "0", false)
end
5 changes: 3 additions & 2 deletions machida/lib/wallaroo/__init__.py
Expand Up @@ -510,17 +510,18 @@ def encoder(func):


class TCPSourceConfig(object):
def __init__(self, name, host, port, decoder, valid=True, parallelism=10):
def __init__(self, name, host, port, decoder, valid=True, parallelism=10, max_size=16384):
self._host = host
self._port = port
self._name = name
self._decoder = decoder
self._valid = valid
self._parallelism = parallelism
self._max_size = max_size

def to_tuple(self):
return ("tcp", self._name, self._host, self._port, self._decoder,
self._valid, self._parallelism)
self._valid, self._parallelism, self._max_size)


class GenSourceConfig(object):
Expand Down
7 changes: 6 additions & 1 deletion machida/machida.pony
Expand Up @@ -126,6 +126,7 @@ use @PyList_GetItem[Pointer[U8] val](l: Pointer[U8] box, i: USize)
use @PyList_SetItem[I32](l: Pointer[U8] box, i: USize, item: Pointer[U8] box)
use @PyList_AsTuple[Pointer[U8] val](list: Pointer[U8] tag)
use @PyInt_AsLong[I64](i: Pointer[U8] box)
use @PyLong_AsLong[I64](i: Pointer[U8] box)
use @PyObject_HasAttrString[I32](o: Pointer[U8] box, attr: Pointer[U8] tag)
use @PyObject_IsTrue[I32](o: Pointer[U8] box)

Expand Down Expand Up @@ -926,8 +927,12 @@ primitive _SourceConfig
USize.from[I64](@PyInt_AsLong(@PyTuple_GetItem(source_config_tuple, 6)))
end

let max_size: USize = recover val
USize.from[I64](@PyLong_AsLong(@PyTuple_GetItem(source_config_tuple, 7)))
end

TCPSourceConfig[(PyData val | None)](source_name, decoder, host, port,
valid, parallelism)
valid, parallelism, max_size)
| "kafka-internal" =>
let ksclip = KafkaSourceConfigCLIParser(env.out, source_name)
let ksco = ksclip.parse_options(env.args)?
Expand Down
6 changes: 5 additions & 1 deletion machida3/machida.pony
Expand Up @@ -951,8 +951,12 @@ primitive _SourceConfig
USize.from[I64](@PyLong_AsLong(@PyTuple_GetItem(source_config_tuple, 6)))
end

let max_size: USize = recover val
USize.from[I64](@PyLong_AsLong(@PyTuple_GetItem(source_config_tuple, 7)))
end

TCPSourceConfig[(PyData val | None)](source_name, decoder, host, port,
valid, parallelism)
valid, parallelism, max_size)
| "kafka-internal" =>
let ksclip = KafkaSourceConfigCLIParser(env.out, source_name)
let ksco = ksclip.parse_options(env.args)?
Expand Down