Skip to content

Commit

Permalink
FIXUP Add "flow-control" option
Browse files Browse the repository at this point in the history
  • Loading branch information
stefwalter committed Jul 16, 2018
1 parent 7994a64 commit 7c42414
Show file tree
Hide file tree
Showing 13 changed files with 63 additions and 181 deletions.
6 changes: 6 additions & 0 deletions doc/protocol.md
Expand Up @@ -108,6 +108,7 @@ The following fields are defined:
* "group": An optional channel group
* "capabilities": Optional, array of capability strings required from the bridge
* "session": Optional, set to "private" or "shared". Defaults to "shared"
* "flow-control": Optional boolean whether the channel should throttle itself via flow control.

If "binary" is set to "raw" then this channel transfers binary messages.

Expand Down Expand Up @@ -155,6 +156,11 @@ Another one is the "fence" group. While any channels are open in the "fence"
group, any channels opened after that point will be blocked and wait until all
channels in the "fence" group are closed before resuming.

The "flow-control" option controls whether a channel should attempt to throttle
itself via flow control when sending or receiving large amounts of data. The
current default (when this option is not provided) is to not do flow control.
However, this default will likely change in the future.

**Host values**

Because the host parameter is how cockpit maps url requests to the correct bridge,
Expand Down
1 change: 1 addition & 0 deletions src/base1/cockpit.js
Expand Up @@ -841,6 +841,7 @@ function Channel(options) {
else
delete command.binary;

command["flow-control"] = true;
transport.send_control(command);

/* Now drain the queue */
Expand Down
48 changes: 0 additions & 48 deletions src/bridge/test-fs.c
Expand Up @@ -290,9 +290,6 @@ test_read_simple (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "done");

Expand Down Expand Up @@ -362,9 +359,6 @@ test_read_changed (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "done");

Expand Down Expand Up @@ -399,9 +393,6 @@ test_read_replaced (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "done");

Expand Down Expand Up @@ -434,9 +425,6 @@ test_read_removed (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "done");

Expand Down Expand Up @@ -468,9 +456,6 @@ test_read_non_mmappable (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "done");

Expand Down Expand Up @@ -499,9 +484,6 @@ test_write_simple (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
tag = cockpit_get_file_tag (tc->test_path);
g_assert (json_object_get_member (control, "problem") == NULL);
Expand Down Expand Up @@ -529,9 +511,6 @@ test_write_multiple (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
tag = cockpit_get_file_tag (tc->test_path);
g_assert (json_object_get_member (control, "problem") == NULL);
Expand Down Expand Up @@ -560,9 +539,6 @@ test_write_remove (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
g_assert (json_object_get_member (control, "problem") == NULL);
g_assert_cmpstr (json_object_get_string_member (control, "tag"), ==, "-");
Expand All @@ -587,9 +563,6 @@ test_write_remove_nonexistent (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
g_assert (json_object_get_member (control, "problem") == NULL);
g_assert_cmpstr (json_object_get_string_member (control, "tag"), ==, "-");
Expand Down Expand Up @@ -617,9 +590,6 @@ test_write_empty (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
tag = cockpit_get_file_tag (tc->test_path);
g_assert (json_object_get_member (control, "problem") == NULL);
Expand Down Expand Up @@ -671,9 +641,6 @@ test_write_expect_non_existent (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
tag = cockpit_get_file_tag (tc->test_path);
g_assert (json_object_get_member (control, "problem") == NULL);
Expand Down Expand Up @@ -724,9 +691,6 @@ test_write_expect_tag (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
tag = cockpit_get_file_tag (tc->test_path);
g_assert (json_object_get_member (control, "problem") == NULL);
Expand Down Expand Up @@ -757,9 +721,6 @@ test_write_expect_tag_fail (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = mock_transport_pop_control (tc->transport);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

control = mock_transport_pop_control (tc->transport);
tag = cockpit_get_file_tag (tc->test_path);
g_assert_cmpstr (json_object_get_string_member (control, "problem"), ==, "out-of-date");
Expand Down Expand Up @@ -878,9 +839,6 @@ test_dir_simple (TestCase *tc,
control = recv_control (tc);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = recv_control(tc);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

g_free (base);

close_channel (tc, NULL);
Expand Down Expand Up @@ -910,9 +868,6 @@ test_dir_simple_no_watch (TestCase *tc,
control = recv_control (tc);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = recv_control (tc);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

g_free (base);

// channel should be closed
Expand Down Expand Up @@ -950,9 +905,6 @@ test_dir_watch (TestCase *tc,
control = recv_control (tc);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ready");

control = recv_control (tc);
g_assert_cmpstr (json_object_get_string_member (control, "command"), ==, "ping");

set_contents (tc->test_path, "Hello!");

GFile *dir = g_file_new_for_path (tc->test_subdir);
Expand Down
6 changes: 1 addition & 5 deletions src/bridge/test-httpstream.c
Expand Up @@ -221,8 +221,6 @@ test_http_stream2 (TestGeneral *tt,
object = mock_transport_pop_control (tt->transport);
cockpit_assert_json_eq (object, "{\"command\":\"ready\",\"channel\":\"444\"}");
object = mock_transport_pop_control (tt->transport);
cockpit_assert_json_eq (object, "{\"command\":\"ping\",\"channel\":\"444\",\"sequence\":0}");
object = mock_transport_pop_control (tt->transport);
cockpit_assert_json_eq (object, "{\"command\":\"response\",\"channel\":\"444\",\"status\":200,\"reason\":\"OK\",\"headers\":{\"X-DNS-Prefetch-Control\":\"off\",\"Referrer-Policy\":\"no-referrer\"}}");

data = mock_transport_combine_output (tt->transport, "444", &count);
Expand Down Expand Up @@ -825,13 +823,11 @@ test_tls_authority_bad (TestTls *test,
cockpit_transport_emit_recv (COCKPIT_TRANSPORT (test->transport), NULL, bytes);
g_bytes_unref (bytes);

while (mock_transport_count_sent (test->transport) < 3)
while (mock_transport_count_sent (test->transport) < 2)
g_main_context_iteration (NULL, TRUE);

resp = mock_transport_pop_control (test->transport);
cockpit_assert_json_eq (resp, "{\"command\":\"ready\",\"channel\":\"444\"}");
resp = mock_transport_pop_control (test->transport);
cockpit_assert_json_eq (resp, "{\"command\":\"ping\",\"channel\":\"444\",\"sequence\":0}");

resp = mock_transport_pop_control (test->transport);
expected_json = g_strdup_printf ("{\"command\":\"close\",\"channel\":\"444\",\"problem\":\"unknown-hostkey\", "
Expand Down
10 changes: 0 additions & 10 deletions src/bridge/test-peer.c
Expand Up @@ -465,11 +465,6 @@ test_reopen (TestCase *tc,
/* Reset the peer. This closes the channel. */
cockpit_peer_reset (tc->peer);

while ((control = mock_transport_pop_control (tc->transport)) == NULL)
g_main_context_iteration (NULL, TRUE);
cockpit_assert_json_eq (control, "{\"command\":\"ping\",\"channel\":\"a\",\"sequence\":0}");
control = NULL;

while ((control = mock_transport_pop_control (tc->transport)) == NULL)
g_main_context_iteration (NULL, TRUE);
cockpit_assert_json_eq (control, "{\"command\":\"close\",\"channel\":\"a\",\"problem\":\"terminated\"}");
Expand Down Expand Up @@ -521,11 +516,6 @@ test_timeout (TestCase *tc,
g_signal_connect (other, "closed", G_CALLBACK (on_other_closed), &closed);
emit_string (tc, NULL, "{\"command\": \"close\", \"channel\": \"a\"}");

while ((control = mock_transport_pop_control (tc->transport)) == NULL)
g_main_context_iteration (NULL, TRUE);
cockpit_assert_json_eq (control, "{\"command\":\"ping\",\"channel\":\"a\",\"sequence\":0}");
control = NULL;

while ((control = mock_transport_pop_control (tc->transport)) == NULL)
g_main_context_iteration (NULL, TRUE);
cockpit_assert_json_eq (control, "{\"command\":\"close\",\"channel\":\"a\"}");
Expand Down
24 changes: 5 additions & 19 deletions src/bridge/test-pipe-channel.c
Expand Up @@ -237,8 +237,7 @@ test_echo (TestCase *tc,
payload = g_bytes_new ("Marmalaade!", 11);
cockpit_transport_emit_recv (COCKPIT_TRANSPORT (tc->transport), "548", payload);

/* A ready message, a ping, and the expected message */
while (mock_transport_count_sent (tc->transport) < 3)
while (mock_transport_count_sent (tc->transport) < 2)
g_main_context_iteration (NULL, TRUE);

sent = mock_transport_pop_channel (tc->transport, "548");
Expand Down Expand Up @@ -271,8 +270,6 @@ test_shutdown (TestCase *tc,
sent = mock_transport_pop_control (tc->transport);
expect_control_message (sent, "ready", "548", NULL);
sent = mock_transport_pop_control (tc->transport);
expect_control_message (sent, "ping", "548", NULL);
sent = mock_transport_pop_control (tc->transport);
expect_control_message (sent, "done", "548", NULL);

sent = mock_transport_pop_control (tc->transport);
Expand Down Expand Up @@ -309,8 +306,6 @@ test_close_normal (TestCase *tc,
control = mock_transport_pop_control (tc->transport);
expect_control_message (control, "ready", "548", NULL);
control = mock_transport_pop_control (tc->transport);
expect_control_message (control, "ping", "548", NULL);
control = mock_transport_pop_control (tc->transport);
expect_control_message (control, "done", "548", NULL);

control = mock_transport_pop_control (tc->transport);
Expand Down Expand Up @@ -340,7 +335,6 @@ test_close_problem (TestCase *tc,
g_assert_cmpstr (tc->channel_problem, ==, "boooyah");
g_assert (mock_transport_pop_channel (tc->transport, "548") == NULL);
expect_control_message (mock_transport_pop_control (tc->transport), "ready", "548", NULL);
expect_control_message (mock_transport_pop_control (tc->transport), "ping", "548", NULL);
expect_control_message (mock_transport_pop_control (tc->transport),
"close", "548", "problem", "boooyah", NULL);
}
Expand Down Expand Up @@ -376,8 +370,7 @@ test_spawn_simple (void)
cockpit_transport_emit_recv (COCKPIT_TRANSPORT (transport), "548", sent);
cockpit_channel_close (channel, NULL);

/* A ready message, a ping, and the expected message */
while (mock_transport_count_sent (transport) < 3)
while (mock_transport_count_sent (transport) < 2)
g_main_context_iteration (NULL, TRUE);
g_assert (g_bytes_equal (sent, mock_transport_pop_channel (transport, "548")));
g_bytes_unref (sent);
Expand Down Expand Up @@ -487,8 +480,6 @@ test_spawn_status (void)
control = mock_transport_pop_control (transport);
expect_control_message (control, "ready", "548", NULL);
control = mock_transport_pop_control (transport);
expect_control_message (control, "ping", "548", NULL);
control = mock_transport_pop_control (transport);
expect_control_message (control, "done", "548", NULL);

control = mock_transport_pop_control (transport);
Expand Down Expand Up @@ -537,8 +528,6 @@ test_spawn_signal (void)
control = mock_transport_pop_control (transport);
expect_control_message (control, "ready", "548", NULL);
control = mock_transport_pop_control (transport);
expect_control_message (control, "ping", "548", NULL);
control = mock_transport_pop_control (transport);
expect_control_message (control, "done", "548", NULL);

control = mock_transport_pop_control (transport);
Expand Down Expand Up @@ -721,8 +710,7 @@ test_send_invalid (TestCase *tc,
cockpit_transport_emit_recv (COCKPIT_TRANSPORT (tc->transport), "548", sent);
g_bytes_unref (sent);

/* A ready message, ping message, and data message */
while (mock_transport_count_sent (tc->transport) < 3)
while (mock_transport_count_sent (tc->transport) < 2)
g_main_context_iteration (NULL, TRUE);

converted = g_bytes_new ("Oh \xef\xbf\xbd""Marma""\xef\xbf\xbd""laade!", 20);
Expand All @@ -744,8 +732,7 @@ test_recv_invalid (TestCase *tc,
g_assert_cmpint (g_socket_send (tc->conn_sock, "\x00Marmalaade!\x00", 13, NULL, &error), ==, 13);
g_assert_no_error (error);

/* A ready message, ping message, and data message */
while (mock_transport_count_sent (tc->transport) < 3)
while (mock_transport_count_sent (tc->transport) < 2)
g_main_context_iteration (NULL, TRUE);

converted = g_bytes_new ("\xef\xbf\xbd""Marmalaade!""\xef\xbf\xbd", 17);
Expand Down Expand Up @@ -781,8 +768,7 @@ test_recv_valid_batched (TestCase *tc,

g_timeout_add (100, add_remainder, tc->conn_sock);

/* A ready message, ping message, and data message */
while (mock_transport_count_sent (tc->transport) < 3)
while (mock_transport_count_sent (tc->transport) < 2)
g_main_context_iteration (NULL, TRUE);

converted = g_bytes_new ("Marmalaade!\xe2\x94\x80", 14);
Expand Down

0 comments on commit 7c42414

Please sign in to comment.