Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect ping/pong flow-control abilities properly #9642

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ The following fields are defined:
* "group": An optional channel group
* "capabilities": Optional, array of capability strings required from the bridge
* "session": Optional, set to "private" or "shared". Defaults to "shared"
* "flow-control": Optional boolean whether the channel should throttle itself via flow control.

If "binary" is set to "raw" then this channel transfers binary messages.

Expand Down Expand Up @@ -155,6 +156,11 @@ Another one is the "fence" group. While any channels are open in the "fence"
group, any channels opened after that point will be blocked and wait until all
channels in the "fence" group are closed before resuming.

The "flow-control" option controls whether a channel should attempt to throttle
itself via flow control when sending or receiving large amounts of data. The
current default (when this option is not provided) is to not do flow control.
However, this default will likely change in the future.

**Host values**

Because the host parameter is how cockpit maps url requests to the correct bridge,
Expand Down
1 change: 1 addition & 0 deletions src/base1/cockpit.js
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ function Channel(options) {
else
delete command.binary;

command["flow-control"] = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to understand -- the binary cockpit-bridge and thus the protocol default to "off" -- but the JavaScript API defaults to on, so that as soon as one uses a newer cockpit-system they get flow control enabled for everything. This would still work with new cockpit-system and old cockpit-bridge as unknown options get ignored. Did I understand that correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's correct.

transport.send_control(command);

/* Now drain the queue */
Expand Down
56 changes: 36 additions & 20 deletions src/common/cockpitchannel.c
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ struct _CockpitChannelPrivate {
gint64 out_window;

/* Another object giving back-pressure on received data */
gboolean flow_control;
CockpitFlow *pressure;
gulong pressure_sig;
GQueue *throttled;
Expand Down Expand Up @@ -217,6 +218,9 @@ process_pong (CockpitChannel *self,
{
gint64 sequence;

if (!self->priv->flow_control)
return;

if (!cockpit_json_get_int (pong, "sequence", -1, &sequence))
{
g_message ("%s: received invalid \"pong\" \"sequence\" field", self->priv->id);
Expand Down Expand Up @@ -311,7 +315,6 @@ on_transport_closed (CockpitTransport *transport,
cockpit_channel_close (self, problem);
}


static void
cockpit_channel_actual_send (CockpitChannel *self,
GBytes *payload,
Expand All @@ -334,28 +337,31 @@ cockpit_channel_actual_send (CockpitChannel *self,
cockpit_transport_send (self->priv->transport, self->priv->id, payload);

/* A wraparound of our gint64 size? */
size = g_bytes_get_size (payload);
g_return_if_fail (G_MAXINT64 - size > self->priv->out_sequence);
if (self->priv->flow_control)
{
size = g_bytes_get_size (payload);
g_return_if_fail (G_MAXINT64 - size > self->priv->out_sequence);

/* How many bytes have been sent (queued) */
out_sequence = self->priv->out_sequence + size;
/* How many bytes have been sent (queued) */
out_sequence = self->priv->out_sequence + size;

/* Every CHANNEL_FLOW_PING bytes we send a ping */
if (out_sequence / CHANNEL_FLOW_PING != self->priv->out_sequence / CHANNEL_FLOW_PING)
{
ping = json_object_new ();
json_object_set_int_member (ping, "sequence", out_sequence);
cockpit_channel_control (self, "ping", ping);
g_debug ("%s: sending ping with sequence: %" G_GINT64_FORMAT, self->priv->id, out_sequence);
json_object_unref (ping);
}
/* Every CHANNEL_FLOW_PING bytes we send a ping */
if (out_sequence / CHANNEL_FLOW_PING != self->priv->out_sequence / CHANNEL_FLOW_PING)
{
ping = json_object_new ();
json_object_set_int_member (ping, "sequence", out_sequence);
cockpit_channel_control (self, "ping", ping);
g_debug ("%s: sending ping with sequence: %" G_GINT64_FORMAT, self->priv->id, out_sequence);
json_object_unref (ping);
}

/* If we've sent more than the window, apply back pressure */
self->priv->out_sequence = out_sequence;
if (self->priv->out_sequence > self->priv->out_window)
{
g_debug ("%s: sent too much data without acknowledgement, emitting back pressure", self->priv->id);
cockpit_flow_emit_pressure (COCKPIT_FLOW (self), TRUE);
/* If we've sent more than the window, apply back pressure */
self->priv->out_sequence = out_sequence;
if (self->priv->out_sequence > self->priv->out_window)
{
g_debug ("%s: sent too much data without acknowledgement, emitting back pressure", self->priv->id);
cockpit_flow_emit_pressure (COCKPIT_FLOW (self), TRUE);
}
}

if (validated)
Expand Down Expand Up @@ -509,6 +515,16 @@ cockpit_channel_real_prepare (CockpitChannel *channel)
"channel has invalid \"binary\" option: %s", binary);
}
}

/*
* The default here, can change from FALSE to TRUE over time once we assume that all
* cockpit-ws participants have been upgraded sufficiently. The default when we're
* on the channel creation side is to handle flow control.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably once we make this decision, we can just revert this patch, in fact. I can't imagine we'll ever have someone sending "{"flow-control": false}".

*/
if (!cockpit_json_get_bool (options, "flow-control", FALSE, &self->priv->flow_control))
{
cockpit_channel_fail (self, "protocol-error", "channel has invalid \"flow-control\" option");
}
}

static void
Expand Down
4 changes: 4 additions & 0 deletions src/common/test-channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -601,11 +601,13 @@ setup_pair (TestPairCase *tc,
options = json_object_new ();
json_object_set_string_member (options, "command", "open");
json_object_set_string_member (options, "channel", "999");
json_object_set_boolean_member (options, "flow-control", TRUE);
tc->channel_a = g_object_new (mock_null_channel_get_type (),
"id", "999",
"options", options,
"transport", tc->transport_a,
NULL);
cockpit_channel_prepare (tc->channel_a);
json_object_unref (options);

pipe = cockpit_pipe_new ("b", sv[1], sv[1]);
Expand All @@ -614,11 +616,13 @@ setup_pair (TestPairCase *tc,

options = json_object_new ();
json_object_set_string_member (options, "channel", "999");
json_object_set_boolean_member (options, "flow-control", TRUE);
tc->channel_b = g_object_new (mock_null_channel_get_type (),
"id", "999",
"options", options,
"transport", tc->transport_b,
NULL);
cockpit_channel_prepare (tc->channel_b);
json_object_unref (options);
}

Expand Down
5 changes: 5 additions & 0 deletions src/ws/cockpitchannelresponse.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ cockpit_channel_response_prepare (CockpitChannel *channel)
const gchar *payload;
JsonObject *open;

COCKPIT_CHANNEL_CLASS (cockpit_channel_response_parent_class)->prepare (channel);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the previously-missing chain-up as a separate commit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


/*
* Tell the transport to throttle incoming flow on the given channel based on
* output pressure in the web response.
Expand Down Expand Up @@ -596,6 +598,7 @@ cockpit_channel_response_serve (CockpitWebService *service,

channel = cockpit_web_service_unique_channel (service);
json_object_set_string_member (object, "channel", channel);
json_object_set_boolean_member (object, "flow-control", TRUE);

if (quoted_etag)
{
Expand Down Expand Up @@ -710,6 +713,8 @@ cockpit_channel_response_open (CockpitWebService *service,
if (!json_object_has_member (open, "binary"))
json_object_set_string_member (open, "binary", "raw");

json_object_set_boolean_member (open, "flow-control", TRUE);

if (!content_type)
{
if (!cockpit_web_service_parse_binary (open, &data_type))
Expand Down
2 changes: 2 additions & 0 deletions src/ws/cockpitchannelsocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ cockpit_channel_socket_open (CockpitWebService *service,
goto out;
}

json_object_set_boolean_member (open, "flow-control", TRUE);

id = cockpit_web_service_unique_channel (service);
self = g_object_new (COCKPIT_TYPE_CHANNEL_SOCKET,
"transport", transport,
Expand Down