TS-1496: Enable per transaction flow control

commit e768cb61c335d8edafdc2cf17b4553cb490e49e4 1 parent e9cd43b
@SolidWallOfCode authored
2  CHANGES
@@ -6,6 +6,8 @@
*) [TS-1942] Remove username.cache configs, they are obsolete and long gone.
+ *) [TS-1496] Enable per transaction flow control.
+
Changes with Apache Traffic Server 3.3.4
7 iocore/eventsystem/I_IOBuffer.h
@@ -578,6 +578,12 @@ class IOBufferReader
*/
int64_t read_avail();
+ /** Check if there is more than @a size bytes available to read.
+ @return @c true if more than @a size bytes are available.
+ */
+ bool is_read_avail_more_than(int64_t size);
+
+
/**
Number of IOBufferBlocks with data in the block list. Returns the
number of IOBufferBlocks on the block list with data remaining for
@@ -1071,6 +1077,7 @@ class MIOBuffer
return !_writer;
}
int64_t max_read_avail();
+
int max_block_count();
void check_add_block();
15 iocore/eventsystem/P_IOBuffer.h
@@ -633,6 +633,21 @@ IOBufferReader::read_avail()
return t;
}
+inline bool
+IOBufferReader::is_read_avail_more_than(int64_t size)
+{
+ int64_t t = -start_offset;
+ IOBufferBlock* b = block;
+ while (b) {
+ t += b->read_avail();
+ if (t > size) {
+ return true;
+ }
+ b = b->next;
+ }
+ return false;
+}
+
TS_INLINE void
IOBufferReader::consume(int64_t n)
{
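The new predicate is an early-exit variant of read_avail(): it walks the block chain only until the running total exceeds @a size, which is cheaper when the caller only needs a yes/no answer. A minimal usage sketch (the reader name is taken from the ChunkedHandler changes further down):

    // Old form: totals every block just to ask "is there any data at all?"
    if (chunked_reader->read_avail() > 0) { /* ... */ }
    // New form: stops at the first block that pushes the total past the threshold.
    if (chunked_reader->is_read_avail_more_than(0)) { /* ... */ }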
6 mgmt/RecordsConfig.cc
@@ -429,6 +429,12 @@ RecordElement RecordsConfig[] = {
,
{RECT_CONFIG, "proxy.config.http.chunking.size", RECD_INT, "4096", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
,
+ {RECT_CONFIG, "proxy.config.http.flow_control.enabled", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+ ,
+ {RECT_CONFIG, "proxy.config.http.flow_control.high_water", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+ ,
+ {RECT_CONFIG, "proxy.config.http.flow_control.low_water", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
+ ,
{RECT_CONFIG, "proxy.config.http.session_auth_cache_keep_alive_enabled", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}
,
// # Send http11 requests
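A hedged sketch of how the new records might appear in records.config. The values are illustrative only; as registered above, all three default to 0 (flow control disabled, water marks unset):

    CONFIG proxy.config.http.flow_control.enabled INT 1
    CONFIG proxy.config.http.flow_control.high_water INT 65536
    CONFIG proxy.config.http.flow_control.low_water INT 32768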
18 proxy/InkAPI.cc
@@ -7583,6 +7583,17 @@ _conf_to_memberp(TSOverridableConfigKey conf, HttpSM* sm, OverridableDataType *t
typ = OVERRIDABLE_TYPE_INT;
ret = &sm->t_state.txn_conf->http_chunking_size;
break;
+ case TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED:
+ ret = &sm->t_state.txn_conf->flow_control_enabled;
+ break;
+ case TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK:
+ typ = OVERRIDABLE_TYPE_INT;
+ ret = &sm->t_state.txn_conf->flow_high_water_mark;
+ break;
+ case TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK:
+ typ = OVERRIDABLE_TYPE_INT;
+ ret = &sm->t_state.txn_conf->flow_low_water_mark;
+ break;
// This helps avoiding compiler warnings, yet detect unhandled enum members.
case TS_CONFIG_NULL:
@@ -7816,6 +7827,9 @@ TSHttpTxnConfigFind(const char* name, int length, TSOverridableConfigKey *conf,
case 'd':
if (!strncmp(name, "proxy.config.http.server_tcp_init_cwnd", length))
cnf = TS_CONFIG_HTTP_SERVER_TCP_INIT_CWND;
+ else if (!strncmp(name, "proxy.config.http.flow_control.enabled", length))
+ cnf = TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED;
+ break;
break;
case 's':
if (!strncmp(name, "proxy.config.http.send_http11_requests", length))
@@ -7856,6 +7870,8 @@ TSHttpTxnConfigFind(const char* name, int length, TSOverridableConfigKey *conf,
cnf = TS_CONFIG_URL_REMAP_PRISTINE_HOST_HDR;
else if (!strncmp(name, "proxy.config.http.insert_request_via_str", length))
cnf = TS_CONFIG_HTTP_INSERT_REQUEST_VIA_STR;
+ else if (!strncmp(name, "proxy.config.http.flow_control.low_water", length))
+ cnf = TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK;
break;
case 's':
if (!strncmp(name, "proxy.config.http.origin_max_connections", length))
@@ -7887,6 +7903,8 @@ TSHttpTxnConfigFind(const char* name, int length, TSOverridableConfigKey *conf,
case 'r':
if (!strncmp(name, "proxy.config.http.insert_response_via_str", length))
cnf = TS_CONFIG_HTTP_INSERT_RESPONSE_VIA_STR;
+ else if (!strncmp(name, "proxy.config.http.flow_control.high_water", length))
+ cnf = TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK;
break;
}
break;
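Because these records are registered as overridable and wired into TSHttpTxnConfigFind() above, a plugin can also adjust them per transaction. A minimal sketch using the public TSHttpTxnConfigIntSet() call; the hook registration, plugin boilerplate, and chosen values are assumptions, not part of this commit:

    #include <ts/ts.h>

    static int
    enable_flow_control(TSCont contp, TSEvent event, void *edata)
    {
      TSHttpTxn txnp = (TSHttpTxn) edata;

      if (event == TS_EVENT_HTTP_READ_REQUEST_HDR) {
        /* Enable flow control for this transaction only, with assumed 64KB/32KB marks. */
        TSHttpTxnConfigIntSet(txnp, TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED, 1);
        TSHttpTxnConfigIntSet(txnp, TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK, 1 << 16);
        TSHttpTxnConfigIntSet(txnp, TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK, 1 << 15);
      }
      TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
      return 0;
    }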
4 proxy/InkAPITest.cc
@@ -5599,6 +5599,7 @@ typedef enum
ORIG_TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK,
ORIG_TS_HTTP_PRE_REMAP_HOOK,
ORIG_TS_HTTP_POST_REMAP_HOOK,
+ ORIG_TS_HTTP_RESPONSE_CLIENT_HOOK,
ORIG_TS_HTTP_LAST_HOOK
} ORIG_TSHttpHookID;
@@ -7451,6 +7452,9 @@ const char *SDK_Overridable_Configs[TS_CONFIG_LAST_ENTRY] = {
"proxy.config.net.sock_packet_tos_out",
"proxy.config.http.insert_age_in_response",
"proxy.config.http.chunking.size",
+ "proxy.config.http.flow_control.enabled",
+ "proxy.config.http.flow_control.low_water",
+ "proxy.config.http.flow_control.high_water"
};
REGRESSION_TEST(SDK_API_OVERRIDABLE_CONFIGS) (RegressionTest * test, int atype, int *pstatus)
27 proxy/Transform.cc
@@ -398,7 +398,7 @@ TransformTerminus::reenable(VIO *vio)
-------------------------------------------------------------------------*/
TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
-:VConnection(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0)
+:TransformVCChain(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0)
{
INKVConnInternal *xform;
@@ -506,6 +506,31 @@ TransformVConnection::reenable(VIO *vio)
ink_assert(!"not reached");
}
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+uint64_t
+TransformVConnection::backlog(uint64_t limit)
+{
+ uint64_t b = 0; // backlog
+ VConnection* raw_vc = m_transform;
+ MIOBuffer* w;
+ while (raw_vc && raw_vc != &m_terminus) {
+ INKVConnInternal* vc = static_cast<INKVConnInternal*>(raw_vc);
+ if (0 != (w = vc->m_read_vio.buffer.writer()))
+ b += w->max_read_avail();
+ if (b >= limit) return b;
+ raw_vc = vc->m_output_vc;
+ }
+ if (0 != (w = m_terminus.m_read_vio.buffer.writer()))
+ b += w->max_read_avail();
+ if (b >= limit) return b;
+
+ IOBufferReader* r = m_terminus.m_write_vio.get_reader();
+ if (r)
+ b += r->read_avail();
+ return b;
+}
/*-------------------------------------------------------------------------
-------------------------------------------------------------------------*/
28 proxy/Transform.h
@@ -60,6 +60,34 @@ class TransformTest
};
#endif
+/** A protocol class.
+ This provides transform VC specific methods for external access
+ without exposing internals or requiring extra includes.
+*/
+class TransformVCChain : public VConnection
+{
+ protected:
+ /// Required constructor
+ TransformVCChain(ProxyMutex* m);
+ public:
+ /** Compute the backlog. This is the total amount of data buffered
+ and ready to read across the elements of the chain. If @a limit is
+ given then the method may return as soon as the computed backlog is
+ at least that large. This provides for more efficient checking if
+ the caller is interested only in whether the backlog is at least
+ @a limit. The default is to compute the backlog exactly.
+ */
+ virtual uint64_t backlog(
+ uint64_t limit = INTU64_MAX ///< Maximum value of interest
+ ) = 0;
+};
+
+inline
+TransformVCChain::TransformVCChain(ProxyMutex* m)
+ : VConnection(m)
+{
+}
+
///////////////////////////////////////////////////////////////////
/// RangeTransform implementation
/// handling Range requests from clients
7 proxy/TransformInternal.h
@@ -59,7 +59,7 @@ class TransformTerminus:public VConnection
};
-class TransformVConnection:public VConnection
+class TransformVConnection:public TransformVCChain
{
public:
TransformVConnection(Continuation * cont, APIHook * hooks);
@@ -74,6 +74,11 @@ class TransformVConnection:public VConnection
void reenable(VIO * vio);
+ /** Compute the backlog.
+ @return The actual backlog, or a value at least @a limit.
+ */
+ virtual uint64_t backlog(uint64_t limit = INTU64_MAX);
+
public:
VConnection * m_transform;
Continuation *m_cont;
13 proxy/api/ts/ts.h.in
@@ -210,8 +210,10 @@ extern "C"
continuation for a particular hook are:
TSHttpHookAdd: adds a global hook. You can globally add
- any hook except for TS_HTTP_REQUEST_TRANSFORM_HOOK and
- TS_HTTP_RESPONSE_TRANSFORM_HOOK.
+ any hook except for
+ - TS_HTTP_REQUEST_TRANSFORM_HOOK
+ - TS_HTTP_RESPONSE_TRANSFORM_HOOK
+ - TS_HTTP_RESPONSE_CLIENT_HOOK
The following hooks can ONLY be added globally:
- TS_HTTP_SELECT_ALT_HOOK
@@ -228,6 +230,7 @@ extern "C"
- TS_HTTP_SEND_RESPONSE_HDR_HOOK
- TS_HTTP_REQUEST_TRANSFORM_HOOK
- TS_HTTP_RESPONSE_TRANSFORM_HOOK
+ - TS_HTTP_RESPONSE_CLIENT_HOOK
- TS_HTTP_TXN_START_HOOK
- TS_HTTP_TXN_CLOSE_HOOK
@@ -268,6 +271,7 @@ extern "C"
TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK,
TS_HTTP_PRE_REMAP_HOOK,
TS_HTTP_POST_REMAP_HOOK,
+ TS_HTTP_RESPONSE_CLIENT_HOOK,
TS_HTTP_LAST_HOOK
} TSHttpHookID;
#define TS_HTTP_READ_REQUEST_PRE_REMAP_HOOK TS_HTTP_PRE_REMAP_HOOK /* backwards compat */
@@ -612,7 +616,10 @@ extern "C"
TS_CONFIG_NET_SOCK_PACKET_TOS_OUT,
TS_CONFIG_HTTP_INSERT_AGE_IN_RESPONSE,
TS_CONFIG_HTTP_CHUNKING_SIZE,
- TS_CONFIG_LAST_ENTRY
+ TS_CONFIG_HTTP_FLOW_CONTROL_ENABLED,
+ TS_CONFIG_HTTP_FLOW_CONTROL_LOW_WATER_MARK,
+ TS_CONFIG_HTTP_FLOW_CONTROL_HIGH_WATER_MARK,
+ TS_CONFIG_LAST_ENTRY,
} TSOverridableConfigKey;
/* The TASK pool of threads is the primary method of off-loading continuations from the
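A hedged sketch of attaching a "plugin agent" to a single transaction with the new TS_HTTP_RESPONSE_CLIENT_HOOK, so the agent receives a copy of the client response body as a tunnel consumer. Only the hook ID and the per-transaction registration come from this commit; the handler body and the way the data is drained are illustrative assumptions:

    #include <ts/ts.h>

    static int
    agent_handler(TSCont contp, TSEvent event, void *edata)
    {
      TSVConn vconn = (TSVConn) contp;
      TSVIO vio;

      switch (event) {
      case TS_EVENT_VCONN_WRITE_COMPLETE:
      case TS_EVENT_VCONN_EOS:
        /* The tunnel shuts the agent VConn down itself; nothing more to do. */
        break;
      case TS_EVENT_VCONN_WRITE_READY:
      default:
        /* The tunnel writes the response body to this VConn; drain it. */
        vio = TSVConnWriteVIOGet(vconn);
        if (vio) {
          TSIOBufferReader reader = TSVIOReaderGet(vio);
          int64_t avail = TSIOBufferReaderAvail(reader);
          TSIOBufferReaderConsume(reader, avail);
          TSVIONDoneSet(vio, TSVIONDoneGet(vio) + avail);
          TSVIOReenable(vio);
        }
        break;
      }
      return 0;
    }

    /* Called from a transaction hook callback, e.g. on TS_EVENT_HTTP_READ_RESPONSE_HDR. */
    static void
    attach_agent(TSHttpTxn txnp)
    {
      TSVConn agent = TSVConnCreate(agent_handler, TSMutexCreate());
      TSHttpTxnHookAdd(txnp, TS_HTTP_RESPONSE_CLIENT_HOOK, (TSCont) agent);
    }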
19 proxy/http/HttpConfig.cc
@@ -1181,6 +1181,9 @@ HttpConfig::startup()
HttpEstablishStaticConfigByte(c.oride.keep_alive_enabled_out, "proxy.config.http.keep_alive_enabled_out");
HttpEstablishStaticConfigByte(c.oride.chunking_enabled, "proxy.config.http.chunking_enabled");
HttpEstablishStaticConfigLongLong(c.oride.http_chunking_size, "proxy.config.http.chunking.size");
+ HttpEstablishStaticConfigByte(c.oride.flow_control_enabled, "proxy.config.http.flow_control.enabled");
+ HttpEstablishStaticConfigLongLong(c.oride.flow_high_water_mark, "proxy.config.http.flow_control.high_water");
+ HttpEstablishStaticConfigLongLong(c.oride.flow_low_water_mark, "proxy.config.http.flow_control.low_water");
HttpEstablishStaticConfigByte(c.session_auth_cache_keep_alive_enabled,
"proxy.config.http.session_auth_cache_keep_alive_enabled");
HttpEstablishStaticConfigLongLong(c.origin_server_pipeline, "proxy.config.http.origin_server_pipeline");
@@ -1456,6 +1459,22 @@ HttpConfig::reconfigure()
params->oride.keep_alive_enabled_out = INT_TO_BOOL(m_master.oride.keep_alive_enabled_out);
params->oride.chunking_enabled = INT_TO_BOOL(m_master.oride.chunking_enabled);
params->oride.http_chunking_size = m_master.oride.http_chunking_size;
+
+ params->oride.flow_control_enabled = INT_TO_BOOL(m_master.oride.flow_control_enabled);
+ params->oride.flow_high_water_mark = m_master.oride.flow_high_water_mark;
+ params->oride.flow_low_water_mark = m_master.oride.flow_low_water_mark;
+ // If not set (zero) then make values the same.
+ if (params->oride.flow_low_water_mark <= 0)
+ params->oride.flow_low_water_mark = params->oride.flow_high_water_mark;
+ if (params->oride.flow_high_water_mark <= 0)
+ params->oride.flow_high_water_mark = params->oride.flow_low_water_mark;
+ if (params->oride.flow_high_water_mark < params->oride.flow_low_water_mark) {
+ Warning("Flow control low water mark is greater than high water mark, flow control disabled");
+ params->oride.flow_control_enabled = 0;
+ // zero means "hardwired default" when actually used.
+ params->oride.flow_high_water_mark = params->oride.flow_low_water_mark = 0;
+ }
+
params->session_auth_cache_keep_alive_enabled = INT_TO_BOOL(m_master.session_auth_cache_keep_alive_enabled);
params->origin_server_pipeline = m_master.origin_server_pipeline;
params->user_agent_pipeline = m_master.user_agent_pipeline;
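A worked sketch of the normalization above, with assumed (non-default) values:

    high_water = 65536, low_water = 0      ->  low_water raised to 65536
    high_water = 0,     low_water = 32768  ->  high_water raised to 32768
    high_water = 16384, low_water = 32768  ->  inconsistent: flow control disabled, both marks reset to 0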
5 proxy/http/HttpConfig.h
@@ -431,7 +431,7 @@ struct OverridableHttpConfigParams {
freshness_fuzz_time(0), freshness_fuzz_min_time(0),
max_cache_open_read_retries(0), cache_open_read_retry_time(0),
background_fill_active_timeout(0),
- http_chunking_size(0),
+ http_chunking_size(0), flow_high_water_mark(0), flow_low_water_mark(0),
// Strings / floats must come last
proxy_response_server_string(NULL), proxy_response_server_string_len(0),
@@ -506,6 +506,7 @@ struct OverridableHttpConfigParams {
// DOC IN CACHE NO DNS//
//////////////////////
MgmtByte doc_in_cache_skip_dns;
+ MgmtByte flow_control_enabled;
MgmtInt negative_caching_lifetime;
@@ -567,6 +568,8 @@ struct OverridableHttpConfigParams {
MgmtInt background_fill_active_timeout;
MgmtInt http_chunking_size; // Maximum chunk size for chunked output.
+ MgmtInt flow_high_water_mark; ///< Flow control high water mark.
+ MgmtInt flow_low_water_mark; ///< Flow control low water mark.
// IMPORTANT: Here comes all strings / floats configs.
2  proxy/http/HttpDebugNames.cc
@@ -486,6 +486,8 @@ HttpDebugNames::get_api_hook_name(TSHttpHookID t)
return "TS_HTTP_PRE_REMAP_HOOK";
case TS_HTTP_POST_REMAP_HOOK:
return "TS_HTTP_POST_REMAP_HOOK";
+ case TS_HTTP_RESPONSE_CLIENT_HOOK:
+ return "TS_HTTP_RESPONSE_CLIENT_HOOK";
case TS_HTTP_LAST_HOOK:
return "TS_HTTP_LAST_HOOK";
}
102 proxy/http/HttpSM.cc
@@ -319,7 +319,8 @@ HttpSM::HttpSM()
ua_raw_buffer_reader(NULL),
server_entry(NULL), server_session(NULL), shared_session_retries(0),
server_buffer_reader(NULL),
- transform_info(), post_transform_info(), second_cache_sm(NULL),
+ transform_info(), post_transform_info(), has_active_plugin_agents(false),
+ second_cache_sm(NULL),
default_handler(NULL), pending_action(NULL), historical_action(NULL),
last_action(HttpTransact::STATE_MACHINE_ACTION_UNDEFINED),
client_request_hdr_bytes(0), client_request_body_bytes(0),
@@ -2966,9 +2967,10 @@ HttpSM::is_bg_fill_necessary(HttpTunnelConsumer * c)
// There must be another consumer for it to be worthwhile to
// set up a background fill
- if (c->producer->num_consumers > 1 &&
- (c->producer->vc_type == HT_HTTP_SERVER || c->producer->vc_type == HT_TRANSFORM) &&
+ if (((c->producer->num_consumers > 1 && c->producer->vc_type == HT_HTTP_SERVER) ||
+ (c->producer->num_consumers > 1 && c->producer->vc_type == HT_TRANSFORM)) &&
c->producer->alive == true) {
+
// If threshold is 0.0 or negative then do background
// fill regardless of the content length. Since this
// is floating point just make sure the number is near zero
@@ -2988,7 +2990,7 @@ HttpSM::is_bg_fill_necessary(HttpTunnelConsumer * c)
if (pDone <= 1.0 && pDone > t_state.txn_conf->background_fill_threshold) {
return true;
} else {
- DebugSM("http", "[%" PRId64 "] no background. Only %%%f done", sm_id, pDone);
+ DebugSM("http", "[%" PRId64 "] no background. Only %%%f of %%%f done [%" PRId64 " / %" PRId64" ]", sm_id, pDone, t_state.txn_conf->background_fill_threshold, ua_body_done, ua_cl);
}
}
@@ -3027,8 +3029,8 @@ HttpSM::tunnel_handler_ua(int event, HttpTunnelConsumer * c)
// There is another consumer (cache write) so
// detach the user agent
- ink_assert(server_entry->vc == c->producer->vc);
- ink_assert(server_session == c->producer->vc);
+ ink_assert(server_entry->vc == server_session);
+ ink_assert(c->is_downstream_from(server_session));
server_session->get_netvc()->
set_active_timeout(HRTIME_SECONDS(t_state.txn_conf->background_fill_active_timeout));
} else {
@@ -3690,6 +3692,35 @@ HttpSM::tunnel_handler_transform_read(int event, HttpTunnelProducer * p)
}
int
+HttpSM::tunnel_handler_plugin_agent(int event, HttpTunnelConsumer * c)
+{
+ STATE_ENTER(&HttpSM::tunnel_handler_plugin_agent, event);
+
+ switch (event) {
+ case VC_EVENT_ERROR:
+ c->vc->do_io_close(EHTTP_ERROR); // close up
+ // Signal producer if we're the last consumer.
+ if (c->producer->alive && c->producer->num_consumers == 1) {
+ tunnel.producer_handler(HTTP_TUNNEL_EVENT_CONSUMER_DETACH, c->producer);
+ }
+ break;
+ case VC_EVENT_EOS:
+ if (c->producer->alive && c->producer->num_consumers == 1) {
+ tunnel.producer_handler(HTTP_TUNNEL_EVENT_CONSUMER_DETACH, c->producer);
+ }
+ // FALLTHROUGH
+ case VC_EVENT_WRITE_COMPLETE:
+ c->write_success = true;
+ c->vc->do_io(VIO::CLOSE);
+ break;
+ default:
+ ink_release_assert(0);
+ }
+
+ return 0;
+}
+
+int
HttpSM::state_srv_lookup(int event, void *data)
{
STATE_ENTER(&HttpSM::state_srv_lookup, event);
@@ -4285,7 +4316,7 @@ HttpSM::do_cache_prepare_write()
inline void
HttpSM::do_cache_prepare_write_transform()
{
- if (cache_sm.cache_write_vc != NULL || tunnel.is_there_cache_write())
+ if (cache_sm.cache_write_vc != NULL || tunnel.has_cache_writer())
do_cache_prepare_action(&transform_cache_sm, NULL, false, true);
else
do_cache_prepare_action(&transform_cache_sm, NULL, false);
@@ -4370,7 +4401,6 @@ HttpSM::do_cache_prepare_action(HttpCacheSM * c_sm, CacheHTTPInfo * object_read_
}
}
-
//////////////////////////////////////////////////////////////////////////
//
// HttpSM::do_http_server_open()
@@ -4566,7 +4596,6 @@ HttpSM::do_http_server_open(bool raw)
} else if (ua_session->f_outbound_transparent) {
opt.addr_binding = NetVCOptions::FOREIGN_ADDR;
opt.local_ip = t_state.client_info.addr;
-
/* If the connection is server side transparent, we can bind to the
port that the client chose instead of randomly assigning one at
the proxy. This is controlled by the 'use_client_source_port'
@@ -5116,8 +5145,7 @@ HttpSM::setup_transform_to_server_transfer()
&HttpSM::tunnel_handler_transform_read,
HT_TRANSFORM,
"post transform");
- p->self_consumer = c;
- c->self_producer = p;
+ tunnel.chain(c,p);
post_transform_info.entry->in_tunnel = true;
tunnel.add_consumer(server_entry->vc,
@@ -6000,14 +6028,15 @@ HttpSM::setup_transfer_from_transform()
&HttpSM::tunnel_handler_transform_read,
HT_TRANSFORM,
"transform read");
- p->self_consumer = c;
- c->self_producer = p;
+ tunnel.chain(c, p);
tunnel.add_consumer(ua_entry->vc, transform_info.vc, &HttpSM::tunnel_handler_ua, HT_HTTP_CLIENT, "user agent");
transform_info.entry->in_tunnel = true;
ua_entry->in_tunnel = true;
+ this->setup_plugin_agents(p);
+
if ( t_state.client_info.receive_chunked_response ) {
tunnel.set_producer_chunking_action(p, client_response_hdr_bytes, TCA_CHUNK_CONTENT);
tunnel.set_producer_chunking_size(p, t_state.txn_conf->http_chunking_size);
@@ -6037,8 +6066,7 @@ HttpSM::setup_transfer_from_transform_to_cache_only()
&HttpSM::tunnel_handler_transform_read,
HT_TRANSFORM,
"transform read");
- p->self_consumer = c;
- c->self_producer = p;
+ tunnel.chain(c, p);
transform_info.entry->in_tunnel = true;
@@ -6108,7 +6136,10 @@ HttpSM::setup_server_transfer()
action = TCA_PASSTHRU_DECHUNKED_CONTENT;
} else {
if (t_state.current.server->transfer_encoding != HttpTransact::CHUNKED_ENCODING)
- action = TCA_CHUNK_CONTENT;
+ if (t_state.client_info.http_version == HTTPVersion(0, 9))
+ action = TCA_PASSTHRU_DECHUNKED_CONTENT; // send as-is
+ else
+ action = TCA_CHUNK_CONTENT;
else
action = TCA_PASSTHRU_CHUNKED_CONTENT;
}
@@ -6141,6 +6172,8 @@ HttpSM::setup_server_transfer()
ua_entry->in_tunnel = true;
server_entry->in_tunnel = true;
+ this->setup_plugin_agents(p);
+
// If the incoming server response is chunked and the client does not
// expect a chunked response, then dechunk it. Otherwise, if the
// incoming response is not chunked and the client expects a chunked
@@ -6257,10 +6290,8 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr)
&HttpSM::tunnel_handler_ssl_consumer, HT_HTTP_SERVER, "http server - tunnel");
// Make the tunnel aware that the entries are bi-directional
- p_os->self_consumer = c_os;
- p_ua->self_consumer = c_ua;
- c_ua->self_producer = p_ua;
- c_os->self_producer = p_os;
+ tunnel.chain(c_os, p_os);
+ tunnel.chain(c_ua, p_ua);
ua_entry->in_tunnel = true;
server_entry->in_tunnel = true;
@@ -6268,6 +6299,20 @@ HttpSM::setup_blind_tunnel(bool send_response_hdr)
tunnel.tunnel_run();
}
+void
+HttpSM::setup_plugin_agents(HttpTunnelProducer* p)
+{
+ APIHook* agent = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK);
+ has_active_plugin_agents = agent != 0;
+ while (agent) {
+ INKVConnInternal* contp = static_cast<INKVConnInternal*>(agent->m_cont);
+ tunnel.add_consumer(contp, p->vc, &HttpSM::tunnel_handler_plugin_agent, HT_HTTP_CLIENT, "plugin agent");
+ // We don't put these in the SM VC table because the tunnel
+ // will clean them up in do_io_close().
+ agent = agent->next();
+ }
+}
+
inline void
HttpSM::transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info)
{
@@ -6281,6 +6326,22 @@ HttpSM::transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info)
}
}
+void
+HttpSM::plugin_agents_cleanup()
+{
+ // If has_active_plugin_agents is set, the plugin agent VCs were handed
+ // to the tunnel and are cleaned up there. This handles the case where
+ // something went wrong before the tunnel was set up.
+ if (!has_active_plugin_agents) {
+ APIHook* agent = txn_hook_get(TS_HTTP_RESPONSE_CLIENT_HOOK);
+ while (agent) {
+ INKVConnInternal* contp = static_cast<INKVConnInternal*>(agent->m_cont);
+ contp->do_io_close();
+ agent = agent->next();
+ }
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
//
// HttpSM::kill_this()
@@ -6326,6 +6387,7 @@ HttpSM::kill_this()
if (hooks_set) {
transform_cleanup(TS_HTTP_RESPONSE_TRANSFORM_HOOK, &transform_info);
transform_cleanup(TS_HTTP_REQUEST_TRANSFORM_HOOK, &post_transform_info);
+ plugin_agents_cleanup();
}
// It's also possible that the plugin_tunnel vc was never
// executed due to not contacting the server
6 proxy/http/HttpSM.h
@@ -322,6 +322,9 @@ class HttpSM: public Continuation
HttpTransformInfo transform_info;
HttpTransformInfo post_transform_info;
+ /// Set if plugin client / user agents are active.
+ /// Needed primarily for cleanup.
+ bool has_active_plugin_agents;
HttpCacheSM cache_sm;
HttpCacheSM transform_cache_sm;
@@ -397,6 +400,7 @@ class HttpSM: public Continuation
int tunnel_handler_ssl_consumer(int event, HttpTunnelConsumer * p);
int tunnel_handler_transform_write(int event, HttpTunnelConsumer * c);
int tunnel_handler_transform_read(int event, HttpTunnelProducer * p);
+ int tunnel_handler_plugin_agent(int event, HttpTunnelConsumer * c);
void do_hostdb_lookup();
void do_hostdb_reverse_lookup();
@@ -458,6 +462,7 @@ class HttpSM: public Continuation
HttpTunnelProducer *setup_transfer_from_transform();
HttpTunnelProducer *setup_cache_transfer_to_transform();
HttpTunnelProducer *setup_transfer_from_transform_to_cache_only();
+ void setup_plugin_agents(HttpTunnelProducer* p);
HttpTransact::StateMachineAction_t last_action;
int (HttpSM::*m_last_state) (int event, void *data);
@@ -517,6 +522,7 @@ class HttpSM: public Continuation
void update_stats();
void transform_cleanup(TSHttpHookID hook, HttpTransformInfo * info);
bool is_transparent_passthrough_allowed();
+ void plugin_agents_cleanup();
public:
LINK(HttpSM, debug_link);
376 proxy/http/HttpTunnel.cc
@@ -37,8 +37,6 @@
#include "HttpDebugNames.h"
#include "ParseRules.h"
-
-static const int max_chunked_ahead_bytes = 1 << 15;
static const int max_chunked_ahead_blocks = 128;
static const int min_block_transfer_bytes = 256;
static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n";
@@ -47,81 +45,18 @@ static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n";
// a block in the input stream.
static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
-static void
-chunked_reenable(HttpTunnelProducer * p, HttpTunnel * tunnel)
-{
-
- // FIX ME: still need to deal with huge chunk sizes. If a chunk
- // is 1GB, we will currently buffer the whole thing
-
- if (p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) {
- p->read_vio->reenable();
- } else {
- // If we are in are in the flow control, there's data in
- // the incoming buffer that we haven't processed yet
- // Only process it if we determine the client isn't overflowed
- MIOBuffer *dbuf = p->chunked_handler.dechunked_buffer;
-
- if (dbuf->max_read_avail() < max_chunked_ahead_bytes && dbuf->max_block_count() < max_chunked_ahead_blocks) {
- // Flow control no longer needed. We only initiate flow control
- // after completing a chunk so we know the next state is
- // CHUNK_READ_SIZE_START
- Debug("http_chunk_flow", "Suspending flow control");
- p->chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
-
- // Call back the tunnel as if we've received more data from the server
- int r = tunnel->main_handler(p->chunked_handler.last_server_event, p->read_vio);
-
- // Only actually reenable the server if we've stayed out of the
- // flow control state. The callout may have killed the vc
- // and/or the vio so check that the producer is still alive
- // (INKqa05512)
- // Also, make sure the tunnel has not been deallocated on
- // the call to tunnel->main_handler
- if (r == EVENT_CONT && p->alive && p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) {
- // INKqa05737 - since we explicitly disabled the vc by setting
- // nbytes = ndone when going into flow control, we need
- // set nbytes up again here
- p->read_vio->nbytes = INT64_MAX;
- p->read_vio->reenable();
- }
- } else {
- Debug("http_chunk_flow", "Blocking reenable - flow control in effect");
- }
- }
-}
-
-static void
-add_chunked_reenable(HttpTunnelProducer * p, HttpTunnel * tunnel)
-{
- if (p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) {
- p->read_vio->reenable();
- } else {
- // If we are in are in the flow control, there's data in
- // the incoming buffer that we haven't processed yet
- // Only process it if we determine the client isn't overflowed
- MIOBuffer *cbuf = p->chunked_handler.chunked_buffer;
- if (cbuf->max_read_avail() < max_chunked_ahead_bytes && cbuf->max_block_count() < max_chunked_ahead_blocks) {
- // Flow control no longer needed.
- Debug("http_chunk_flow", "Suspending flow control on enchunking");
- p->chunked_handler.state = ChunkedHandler::CHUNK_WRITE_CHUNK;
-
- // Call back the tunnel as if we've received more data from
- // the server
- int r = tunnel->main_handler(p->chunked_handler.last_server_event, p->read_vio);
-
- // Only actually reenable the server if we've stayed out of the
- // flow control state. The callout may have killed the vc
- // and/or the vio so check that the producer is still alive
- // Also, make sure the tunnel has not been deallocated on
- // the call to tunnel->main_handler
- if (r == EVENT_CONT && p->alive && p->chunked_handler.state != ChunkedHandler::CHUNK_FLOW_CONTROL) {
- p->read_vio->reenable();
- }
- } else {
- Debug("http_chunk_flow", "Blocking reenable on enchunking - flow control in effect");
- }
+char
+VcTypeCode(HttpTunnelType_t t) {
+ char zret = ' ';
+ switch (t) {
+ case HT_HTTP_CLIENT: zret = 'U'; break;
+ case HT_HTTP_SERVER: zret = 'S'; break;
+ case HT_TRANSFORM: zret = 'T'; break;
+ case HT_CACHE_READ: zret = 'R'; break;
+ case HT_CACHE_WRITE: zret = 'W'; break;
+ default: break;
}
+ return zret;
}
ChunkedHandler::ChunkedHandler()
@@ -281,15 +216,7 @@ ChunkedHandler::read_chunk()
if (bytes_left == 0) {
Debug("http_chunk", "completed read of chunk of %" PRId64" bytes", cur_chunk_size);
- // Check to see if we need to flow control the output
- if (dechunked_buffer &&
- (dechunked_buffer->max_read_avail() > max_chunked_ahead_bytes ||
- dechunked_buffer->max_block_count() > max_chunked_ahead_blocks)) {
- state = CHUNK_FLOW_CONTROL;
- Debug("http_chunk_flow", "initiating flow control pause");
- } else {
- state = CHUNK_READ_SIZE_START;
- }
+ state = CHUNK_READ_SIZE_START;
} else if (bytes_left > 0) {
Debug("http_chunk", "read %" PRId64" bytes of an %" PRId64" chunk", b, cur_chunk_size);
}
@@ -301,7 +228,7 @@ ChunkedHandler::read_trailer()
int64_t bytes_used;
bool done = false;
- while (chunked_reader->read_avail() > 0 && !done) {
+ while (chunked_reader->is_read_avail_more_than(0) && !done) {
const char *tmp = chunked_reader->start();
int64_t data_size = chunked_reader->block_read_avail();
@@ -341,7 +268,7 @@ ChunkedHandler::read_trailer()
bool ChunkedHandler::process_chunked_content()
{
- while (chunked_reader->read_avail() > 0 && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
+ while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
switch (state) {
case CHUNK_READ_SIZE:
case CHUNK_READ_SIZE_CRLF:
@@ -385,48 +312,36 @@ bool ChunkedHandler::generate_chunked_content()
while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) {
int64_t write_val = MIN(max_chunk_size, r_avail);
- // If the server is still alive, check to see if too much data is
- // pilling up on the client's buffer. If the server is done, ignore
- // the flow control rules so that we don't have to bother with stopping
- // the io an coming a back and dealing with the server's data later
- if (server_done == false &&
- (chunked_buffer->max_read_avail() > max_chunked_ahead_bytes ||
- chunked_buffer->max_block_count() > max_chunked_ahead_blocks)) {
- state = CHUNK_FLOW_CONTROL;
- Debug("http_chunk_flow", "initiating flow control pause on enchunking");
- return false;
- } else {
- state = CHUNK_WRITE_CHUNK;
- Debug("http_chunk", "creating a chunk of size %" PRId64" bytes", write_val);
-
- // Output the chunk size.
- if (write_val != max_chunk_size) {
- int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
- chunked_buffer->write(tmp, len);
- chunked_size += len;
- } else {
- chunked_buffer->write(max_chunk_header, max_chunk_header_len);
- chunked_size += max_chunk_header_len;
- }
+ state = CHUNK_WRITE_CHUNK;
+ Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val);
- // Output the chunk itself.
- //
- // BZ# 54395 Note - we really should only do a
- // block transfer if there is sizable amount of
- // data (like we do for the case where we are
- // removing chunked encoding in ChunkedHandler::transfer_bytes()
- // However, I want to do this fix with as small a risk
- // as possible so I'm leaving this issue alone for
- // now
- //
- chunked_buffer->write(dechunked_reader, write_val);
- chunked_size += write_val;
- dechunked_reader->consume(write_val);
-
- // Output the trailing CRLF.
- chunked_buffer->write("\r\n", 2);
- chunked_size += 2;
+ // Output the chunk size.
+ if (write_val != max_chunk_size) {
+ int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
+ chunked_buffer->write(tmp, len);
+ chunked_size += len;
+ } else {
+ chunked_buffer->write(max_chunk_header, max_chunk_header_len);
+ chunked_size += max_chunk_header_len;
}
+
+ // Output the chunk itself.
+ //
+ // BZ# 54395 Note - we really should only do a
+ // block transfer if there is sizable amount of
+ // data (like we do for the case where we are
+ // removing chunked encoding in ChunkedHandler::transfer_bytes()
+ // However, I want to do this fix with as small a risk
+ // as possible so I'm leaving this issue alone for
+ // now
+ //
+ chunked_buffer->write(dechunked_reader, write_val);
+ chunked_size += write_val;
+ dechunked_reader->consume(write_val);
+
+ // Output the trailing CRLF.
+ chunked_buffer->write("\r\n", 2);
+ chunked_size += 2;
}
if (server_done) {
@@ -445,11 +360,67 @@ HttpTunnelProducer::HttpTunnelProducer()
vc(NULL), vc_handler(NULL), read_vio(NULL), read_buffer(NULL),
buffer_start(NULL), vc_type(HT_HTTP_SERVER), chunking_action(TCA_PASSTHRU_DECHUNKED_CONTENT),
do_chunking(false), do_dechunking(false), do_chunked_passthru(false),
- init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0), handler_state(0), num_consumers(0), alive(false),
- read_success(false), name(NULL)
+ init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0),
+ handler_state(0), num_consumers(0), alive(false),
+ read_success(false), flow_control_source(0), name(NULL)
{
}
+uint64_t
+HttpTunnelProducer::backlog(uint64_t limit) {
+ uint64_t zret = 0;
+ // Calculate the total backlog, the # of bytes inside ATS for this producer.
+ // We go all the way through each chain to the ending sink and take the maximum
+ // over those paths. Do need to be careful about loops which can occur.
+ for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
+ if (c->alive && c->write_vio) {
+ uint64_t n = 0;
+ if (HT_TRANSFORM == c->vc_type) {
+ n += static_cast<TransformVCChain*>(c->vc)->backlog(limit);
+ } else {
+ IOBufferReader* r = c->write_vio->get_reader();
+ if (r) {
+ n += static_cast<uint64_t>(r->read_avail());
+ }
+ }
+ if (n >= limit) return n;
+
+ if (!c->is_sink()) {
+ HttpTunnelProducer* dsp = c->self_producer;
+ if (dsp) {
+ n += dsp->backlog();
+ }
+ }
+ if (n >= limit) return n;
+ if (n > zret) zret = n;
+ }
+ }
+
+ if (chunked_handler.chunked_reader) {
+ zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
+ }
+
+ return zret;
+}
+
+/* We set the flow control source on the producers in a flow chain
+ individually, rather than using a tunnel level variable, in order to
+ handle bi-directional tunnels correctly. In that case the flow control
+ on the two producers is unrelated, so a single value for the tunnel
+ won't work.
+*/
+void
+HttpTunnelProducer::set_throttle_src(HttpTunnelProducer* srcp) {
+ HttpTunnelProducer* p = this;
+ p->flow_control_source = srcp;
+ for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
+ if (!c->is_sink()) {
+ p = c->self_producer;
+ if (p)
+ p->set_throttle_src(srcp);
+ }
+ }
+}
+
HttpTunnelConsumer::HttpTunnelConsumer()
: link(), producer(NULL), self_producer(NULL), vc_type(HT_HTTP_CLIENT), vc(NULL), buffer_reader(NULL),
vc_handler(NULL), write_vio(NULL), skip_bytes(0), bytes_written(0), handler_state(0), alive(false),
@@ -463,14 +434,31 @@ HttpTunnel::HttpTunnel()
}
void
+HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex)
+{
+ HttpConfigParams* params = sm_arg->t_state.http_config_param;
+ sm = sm_arg;
+ active = false;
+ mutex = amutex;
+ SET_HANDLER(&HttpTunnel::main_handler);
+ flow_state.enabled_p = params->oride.flow_control_enabled;
+ if (params->oride.flow_low_water_mark > 0)
+ flow_state.low_water = params->oride.flow_low_water_mark;
+ if (params->oride.flow_high_water_mark > 0)
+ flow_state.high_water = params->oride.flow_high_water_mark;
+ // This should always be true, we handled default cases back in HttpConfig::reconfigure()
+ ink_assert(flow_state.low_water <= flow_state.high_water);
+}
+
+void
HttpTunnel::reset()
{
ink_assert(active == false);
#ifdef DEBUG
- for (int i = 0; i < MAX_PRODUCERS; i++) {
+ for (int i = 0; i < MAX_PRODUCERS; ++i) {
ink_assert(producers[i].alive == false);
}
- for (int j = 0; j < MAX_CONSUMERS; j++) {
+ for (int j = 0; j < MAX_CONSUMERS; ++j) {
ink_assert(consumers[j].alive == false);
}
#endif
@@ -484,7 +472,7 @@ HttpTunnel::reset()
void
HttpTunnel::kill_tunnel()
{
- for (int i = 0; i < MAX_PRODUCERS; i++) {
+ for (int i = 0; i < MAX_PRODUCERS; ++i) {
if (producers[i].vc != NULL) {
chain_abort_all(&producers[i]);
}
@@ -499,7 +487,7 @@ HttpTunnel::kill_tunnel()
HttpTunnelProducer *
HttpTunnel::alloc_producer()
{
- for (int i = 0; i < MAX_PRODUCERS; i++) {
+ for (int i = 0; i < MAX_PRODUCERS; ++i) {
if (producers[i].vc == NULL) {
num_producers++;
ink_assert(num_producers <= MAX_PRODUCERS);
@@ -529,7 +517,7 @@ HttpTunnel::deallocate_buffers()
{
int num = 0;
ink_release_assert(active == false);
- for (int i = 0; i < MAX_PRODUCERS; i++) {
+ for (int i = 0; i < MAX_PRODUCERS; ++i) {
if (producers[i].read_buffer != NULL) {
ink_assert(producers[i].vc != NULL);
free_MIOBuffer(producers[i].read_buffer);
@@ -678,6 +666,16 @@ HttpTunnel::add_consumer(VConnection * vc,
return c;
}
+void
+HttpTunnel::chain(HttpTunnelConsumer* c, HttpTunnelProducer* p)
+{
+ p->self_consumer = c;
+ c->self_producer = p;
+ // If the flow is already throttled update the chained producer.
+ if (c->producer->is_throttled())
+ p->set_throttle_src(c->producer->flow_control_source);
+}
+
// void HttpTunnel::tunnel_run()
//
// Makes the tunnel go
@@ -694,7 +692,7 @@ HttpTunnel::tunnel_run(HttpTunnelProducer * p_arg)
ink_assert(active == false);
- for (int i = 0; i < MAX_PRODUCERS; i++) {
+ for (int i = 0 ; i < MAX_PRODUCERS ; ++i) {
p = producers + i;
if (p->vc != NULL) {
producer_run(p);
@@ -952,7 +950,6 @@ HttpTunnel::producer_run(HttpTunnelProducer * p)
p->buffer_start = NULL;
}
-
int
HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p)
{
@@ -966,7 +963,8 @@ HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p)
case VC_EVENT_READ_COMPLETE:
case HTTP_TUNNEL_EVENT_PRECOMPLETE:
case VC_EVENT_EOS:
- p->chunked_handler.last_server_event = event;
+ p->last_event =
+ p->chunked_handler.last_server_event = event;
// TODO: Should we check the return code?
p->chunked_handler.generate_chunked_content();
break;
@@ -1001,7 +999,8 @@ HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer * p)
return event;
}
- p->chunked_handler.last_server_event = event;
+ p->last_event =
+ p->chunked_handler.last_server_event = event;
bool done = p->chunked_handler.process_chunked_content();
// If we couldn't understand the encoding, return
@@ -1014,18 +1013,6 @@ HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer * p)
// sense but no reenables follow
return VC_EVENT_EOS;
}
- // If we are in a flow control state, there is still data in
- // buffer so return READ_READY
- if (p->read_vio && p->chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
- // INKqa05737 - We need force the server vc to
- // disabled since the server may have sent the
- // last chunk. When we go to process that last chunk,
- // we will move the server to a keep alive state. Since
- // we are prohibited from changing the buffer, we need
- // make sure the iocore doesn't schedule a read
- p->read_vio->nbytes = p->read_vio->ndone;
- return VC_EVENT_READ_READY;
- }
switch (event) {
case VC_EVENT_READ_READY:
@@ -1077,6 +1064,8 @@ bool HttpTunnel::producer_handler(int event, HttpTunnelProducer * p)
}
} else if (p->do_dechunking || p->do_chunked_passthru) {
event = producer_handler_chunked(event, p);
+ } else {
+ p->last_event = event;
}
//YTS Team, yamsat Plugin
@@ -1185,6 +1174,53 @@ bool HttpTunnel::producer_handler(int event, HttpTunnelProducer * p)
return sm_callback;
}
+bool
+HttpTunnel::consumer_reenable(HttpTunnelConsumer* c)
+{
+ HttpTunnelProducer* p = c->producer;
+ HttpTunnelProducer* srcp = p->flow_control_source;
+ if (p->alive
+#ifndef LAZY_BUF_ALLOC
+ && p->read_buffer->write_avail() > 0
+#endif
+ ) {
+ // Only do flow control if enabled and the producer is an external
+ // source. Otherwise disable by treating the backlog as zero. Because
+ // the backlog computation short circuits once the value is equal to
+ // (or greater than) the target, we use strict comparison only when
+ // checking the low water mark, otherwise the flow control can stall out.
+ uint64_t backlog = (flow_state.enabled_p && p->is_source())
+ ? p->backlog(flow_state.high_water)
+ : 0;
+
+ if (backlog >= flow_state.high_water) {
+ if (is_debug_tag_set("http_tunnel"))
+ Debug("http_tunnel", "Throttle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
+ p->throttle(); // p becomes srcp for future calls to this method
+ } else {
+ if (srcp && c->is_sink()) {
+ // Check if backlog is below low water - note we need to check
+ // against the source producer, not necessarily the producer
+ // for this consumer. We don't have to recompute the backlog
+ // if they are the same because we know low water <= high
+ // water so the value is sufficiently accurate.
+ if (srcp != p)
+ backlog = srcp->backlog(flow_state.low_water);
+ if (backlog < flow_state.low_water) {
+ if (is_debug_tag_set("http_tunnel"))
+ Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
+ srcp->unthrottle();
+ srcp->read_vio->reenable();
+ // Kick source producer to get flow ... well, flowing.
+ this->producer_handler(VC_EVENT_READ_READY, srcp);
+ }
+ }
+ p->read_vio->reenable();
+ }
+ }
+ return p->is_throttled();
+}
+
//
// bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer* p)
//
@@ -1200,6 +1236,7 @@ bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c)
{
bool sm_callback = false;
HttpConsumerHandler jump_point;
+ HttpTunnelProducer* p = c->producer;
Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event));
@@ -1207,31 +1244,7 @@ bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c)
switch (event) {
case VC_EVENT_WRITE_READY:
- // Data consumed, reenable producer
- if (c->producer->alive) {
- if (c->producer->do_dechunking) {
- // Because dechunking decouples the inbound and outbound
- // buffers, we have to run special code handle the
- // reenable
- chunked_reenable(c->producer, this);
- } else if (c->producer->do_chunking) {
- add_chunked_reenable(c->producer, this);
- } else {
- /*
- * Dont check for space availability. The
- * net code adds more space if required.
- */
-
-#ifndef LAZY_BUF_ALLOC
- if (c->producer->read_buffer->write_avail() > 0) {
- c->producer->read_vio->reenable();
- }
-#else
- c->producer->read_vio->reenable();
-#endif
-
- }
- }
+ this->consumer_reenable(c);
break;
case VC_EVENT_WRITE_COMPLETE:
@@ -1262,13 +1275,20 @@ bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c)
// the SM since the reenabling has the side effect
// updating the buffer state for the VConnection
// that is being reenabled
- if (c->producer->alive && c->producer->read_vio
+ if (p->alive && p->read_vio
#ifndef LAZY_BUF_ALLOC
- && c->producer->read_buffer->write_avail() > 0
+ && p->read_buffer->write_avail() > 0
#endif
) {
- c->producer->read_vio->reenable();
+ if (p->is_throttled())
+ this->consumer_reenable(c);
+ else
+ p->read_vio->reenable();
}
+ // [amc] I don't think this happens but we'll leave a debug trap
+ // here just in case.
+ if (p->is_throttled())
+ Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p);
break;
case VC_EVENT_READ_READY:
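Taken together, consumer_reenable() and the producer throttling above give roughly this cycle (a sketch assuming flow control is enabled with the default 64KB water marks):

    1. A consumer (say the user agent) stalls while the server producer keeps reading;
       on a later WRITE_READY the producer's backlog() reaches high_water, so
       consumer_reenable() calls throttle() instead of reenabling the server read VIO.
    2. The consumer later drains data and another WRITE_READY arrives; the flow
       control source's backlog is recomputed, found below low_water, so it is
       unthrottled, its read VIO is reenabled, and a READ_READY is pushed through
       producer_handler() to restart the flow.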
150 proxy/http/HttpTunnel.h
@@ -175,6 +175,16 @@ struct HttpTunnelConsumer
bool alive;
bool write_success;
const char *name;
+
+ /** Check if this consumer is downstream from @a vc.
+ @return @c true if data from @a vc, directly or through intermediate
+ producers, eventually feeds this consumer.
+ */
+ bool is_downstream_from(VConnection* vc);
+ /** Check if this is a sink (final data destination).
+ @return @c true if data exits the ATS process at this consumer.
+ */
+ bool is_sink() const;
};
struct HttpTunnelProducer
@@ -202,12 +212,44 @@ struct HttpTunnelProducer
int64_t ntodo; // what this vc needs to do
int64_t bytes_read; // total bytes read from the vc
int handler_state; // state used the handlers
+ int last_event; ///< Tracking for flow control restarts.
int num_consumers;
bool alive;
bool read_success;
+ /// Flag and pointer for active flow control throttling.
+ /// If this is set, it points at the source producer that is under flow control.
+ /// If @c NULL then data flow is not being throttled.
+ HttpTunnelProducer* flow_control_source;
const char *name;
+
+ /** Get the largest number of bytes any consumer has not consumed.
+ Use @a limit if you only need to check if the backlog is at least @a limit.
+ @return The actual backlog or a number at least @a limit.
+ */
+ uint64_t backlog(
+ uint64_t limit = INTU64_MAX ///< More than this is irrelevant
+ );
+ /// Check if producer is original (to ATS) source of data.
+ /// @return @c true if this producer is the source of bytes from outside ATS.
+ bool is_source() const;
+ /// Throttle the flow.
+ void throttle();
+ /// Unthrottle the flow.
+ void unthrottle();
+ /// Check throttled state.
+ bool is_throttled() const;
+
+ /** Set the flow control source producer for the flow.
+ This sets the value for this producer and all downstream producers.
+ @note This is the implementation for @c throttle and @c unthrottle.
+ @see throttle
+ @see unthrottle
+ */
+ void set_throttle_src(
+ HttpTunnelProducer* srcp ///< Source producer of flow.
+ );
};
class PostDataBuffers
@@ -229,6 +271,26 @@ class HttpTunnel:public Continuation
{
friend class HttpPagesHandler;
friend class CoreUtils;
+
+ /** Data for implementing flow control across a tunnel.
+
+ The goal is to bound the amount of data buffered for a
+ transaction flowing through the tunnel to (roughly) between the
+ @a high_water and @a low_water water marks. Due to the chunky nature of data
+ flow this is always approximate.
+ */
+ struct FlowControl {
+ // Default value for high and low water marks.
+ static uint64_t const DEFAULT_WATER_MARK = 1<<16;
+
+ uint64_t high_water; ///< Buffered data limit - throttle if more than this.
+ uint64_t low_water; ///< Unthrottle if less than this buffered.
+ bool enabled_p; ///< Flow control state (@c false means disabled).
+
+ /// Default constructor.
+ FlowControl();
+ };
+
public:
HttpTunnel();
@@ -237,7 +299,7 @@ class HttpTunnel:public Continuation
void kill_tunnel();
bool is_tunnel_active() { return active; }
bool is_tunnel_alive();
- bool is_there_cache_write();
+ bool has_cache_writer();
// YTS Team, yamsat Plugin
void copy_partial_post_data();
@@ -266,6 +328,7 @@ class HttpTunnel:public Continuation
void tunnel_run(HttpTunnelProducer * p = NULL);
int main_handler(int event, void *data);
+ bool consumer_reenable(HttpTunnelConsumer* c);
bool consumer_handler(int event, HttpTunnelConsumer * c);
bool producer_handler(int event, HttpTunnelProducer * p);
int producer_handler_dechunked(int event, HttpTunnelProducer * p);
@@ -277,6 +340,18 @@ class HttpTunnel:public Continuation
void abort_cache_write_finish_others(HttpTunnelProducer * p);
void append_message_to_producer_buffer(HttpTunnelProducer * p, const char *msg, int64_t msg_len);
+ /** Mark a producer and consumer as the same underlying object.
+
+ This is used to chain producer/consumer pairs together to
+ indicate the data flows through them sequentially. The primary
+ example is a transform which serves as a consumer on the server
+ side and a producer on the cache/client side.
+ */
+ void chain(
+ HttpTunnelConsumer* c, ///< Flow goes in here
+ HttpTunnelProducer* p ///< Flow comes back out here
+ );
+
void close_vc(HttpTunnelProducer * p);
void close_vc(HttpTunnelConsumer * c);
@@ -301,6 +376,9 @@ class HttpTunnel:public Continuation
bool active;
+ /// State data about flow control.
+ FlowControl flow_state;
+
public:
PostDataBuffers * postbuf;
};
@@ -364,15 +442,6 @@ HttpTunnel::is_tunnel_alive()
return tunnel_alive;
}
-inline void
-HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex)
-{
- sm = sm_arg;
- active = false;
- mutex = amutex;
- SET_HANDLER(&HttpTunnel::main_handler);
-}
-
inline HttpTunnelProducer *
HttpTunnel::get_producer(VConnection * vc)
{
@@ -429,7 +498,7 @@ HttpTunnel::append_message_to_producer_buffer(HttpTunnelProducer * p, const char
}
inline bool
-HttpTunnel::is_there_cache_write()
+HttpTunnel::has_cache_writer()
{
for (int i = 0; i < MAX_CONSUMERS; i++) {
if (consumers[i].vc_type == HT_CACHE_WRITE && consumers[i].vc != NULL) {
@@ -438,4 +507,63 @@ HttpTunnel::is_there_cache_write()
}
return false;
}
+
+inline bool
+HttpTunnelConsumer::is_downstream_from(VConnection *vc)
+{
+ HttpTunnelProducer* p = producer;
+ HttpTunnelConsumer* c;
+ while (p) {
+ if (p->vc == vc) return true;
+ // The producer / consumer chain can contain a cycle in the case
+ // of a blind tunnel so give up if we find ourself (the original
+ // consumer).
+ c = p->self_consumer;
+ p = (c && c != this) ? c->producer : 0;
+ }
+ return false;
+}
+
+inline bool
+HttpTunnelConsumer::is_sink() const
+{
+ return HT_HTTP_CLIENT == vc_type || HT_CACHE_WRITE == vc_type;
+}
+
+inline bool
+HttpTunnelProducer::is_source() const
+{
+ // If a producer is marked as a client, then it's part of a bidirectional tunnel
+ // and so is an actual source of data.
+ return HT_HTTP_SERVER == vc_type || HT_CACHE_READ == vc_type || HT_HTTP_CLIENT == vc_type;
+}
+
+inline bool
+HttpTunnelProducer::is_throttled() const
+{
+ return 0 != flow_control_source;
+}
+
+inline void
+HttpTunnelProducer::throttle()
+{
+ if (!this->is_throttled())
+ this->set_throttle_src(this);
+}
+
+inline void
+HttpTunnelProducer::unthrottle()
+{
+ if (this->is_throttled())
+ this->set_throttle_src(0);
+}
+
+inline
+HttpTunnel::FlowControl::FlowControl()
+ : high_water(DEFAULT_WATER_MARK)
+ , low_water(DEFAULT_WATER_MARK)
+ , enabled_p(false)
+{
+}
+
#endif