Skip to content

Commit

Permalink
Fix outdated ConnectorSourceCoordinator naming
Browse files Browse the repository at this point in the history
  • Loading branch information
jtfmumm committed Apr 29, 2019
1 parent bc995da commit 53fedd9
Showing 1 changed file with 7 additions and 7 deletions.
Expand Up @@ -177,7 +177,7 @@ class ConnectorSourceNotify[In: Any val]
var _router: Router
let _metrics_reporter: MetricsReporter
let _header_size: USize
var _listener: ConnectorSourceCoordinator[In]
var _coordinator: ConnectorSourceCoordinator[In]

// Barrier/checkpoint id tracking
var _barrier_checkpoint_id: CheckpointId = 0
Expand Down Expand Up @@ -212,7 +212,7 @@ class ConnectorSourceNotify[In: Any val]

new create(source_id': RoutingId,
parameters: ConnectorSourceNotifyParameters[In],
listener': ConnectorSourceCoordinator[In], is_recovering: Bool)
coordinator': ConnectorSourceCoordinator[In], is_recovering: Bool)
=>
source_id = source_id'
_pipeline_name = parameters.pipeline_name
Expand All @@ -231,7 +231,7 @@ class ConnectorSourceNotify[In: Any val]
host = parameters.host
service = parameters.service

_listener = listener'
_coordinator = coordinator'

ifdef "trace" then
@printf[I32]("%s: max_credits = %lu, refill_credits = %lu\n".cstring(),
Expand Down Expand Up @@ -429,7 +429,7 @@ class ConnectorSourceNotify[In: Any val]
else
// respond immediately
_send_reply(source, cwm.AckMsg(0, [(s.id, s.last_seen)]))
_listener.streams_relinquish(source_id,
_coordinator.streams_relinquish(source_id,
[StreamTuple(s.id, s.name, s.last_seen)])
end
return _continue_perhaps(source)
Expand Down Expand Up @@ -858,12 +858,12 @@ class ConnectorSourceNotify[In: Any val]
if streams.size() > 0 then
@printf[I32]("ConnectorSource relinquishing %s streams\n".cstring(),
streams.size().string().cstring())
_listener.streams_relinquish(source_id, consume streams)
_coordinator.streams_relinquish(source_id, consume streams)
else
if _fsm_state is _ProtoFsmShrinking then
@printf[I32]("ConnectorSource shrinking %s streams\n".cstring(),
streams.size().string().cstring())
_listener.streams_relinquish(source_id, consume streams)
_coordinator.streams_relinquish(source_id, consume streams)
end
end

Expand Down Expand Up @@ -898,7 +898,7 @@ class ConnectorSourceNotify[In: Any val]

// send request to take ownership of this stream id
let request_id = ConnectorStreamNotifyId(stream_id, _session_id)
_listener.stream_notify(request_id, stream_id, stream_name,
_coordinator.stream_notify(request_id, stream_id, stream_name,
point_of_reference, promise, source)

fun ref stream_notify_result(source: ConnectorSource[In] ref,
Expand Down

0 comments on commit 53fedd9

Please sign in to comment.