New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ts 3612 - Restructuring client session and transaction processing #570

Closed
wants to merge 1 commit into
base: master
from
Jump to file or symbol
Failed to load files and symbols.
+1,903 −605
Diff settings

Always

Just for now

Copy path View file
@@ -131,7 +131,9 @@ class Thread
ProxyAllocator eventAllocator;
ProxyAllocator netVCAllocator;
ProxyAllocator sslNetVCAllocator;
ProxyAllocator httpClientSessionAllocator;
ProxyAllocator http1ClientSessionAllocator;
ProxyAllocator http2ClientSessionAllocator;
ProxyAllocator http2StreamAllocator;
ProxyAllocator httpServerSessionAllocator;
ProxyAllocator hdrHeapAllocator;
ProxyAllocator strHeapAllocator;
@@ -229,24 +229,6 @@ class SSLNetVConnection : public UnixNetVConnection
// least some of the hooks
bool calledHooks(TSHttpHookID /* eventId */) { return (this->sslHandshakeHookState != HANDSHAKE_HOOKS_PRE); }
MIOBuffer *
get_ssl_iobuf()
{
return iobuf;
}
void
set_ssl_iobuf(MIOBuffer *buf)
{
iobuf = buf;
}
IOBufferReader *
get_ssl_reader()
{
return reader;
}
bool
isEosRcvd()
{
@@ -328,8 +310,6 @@ class SSLNetVConnection : public UnixNetVConnection
const SSLNextProtocolSet *npnSet;
Continuation *npnEndpoint;
SessionAccept *sessionAcceptPtr;
MIOBuffer *iobuf;
IOBufferReader *reader;
bool eosRcvd;
bool sslTrace;
};
Copy path View file
@@ -531,30 +531,6 @@ SSLNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
// the handshake is complete. Otherwise set up for continuing read
// operations.
if (ntodo <= 0) {
if (!getSSLClientConnection()) {
// we will not see another ET epoll event if the first byte is already
// in the ssl buffers, so, SSL_read if there's anything already..
Debug("ssl", "ssl handshake completed on vc %p, check to see if first byte, is already in the ssl buffers", this);
this->iobuf = new_MIOBuffer(BUFFER_SIZE_INDEX_4K);
if (this->iobuf) {
this->reader = this->iobuf->alloc_reader();
s->vio.buffer.writer_for(this->iobuf);
ret = ssl_read_from_net(this, lthread, r);
if (ret == SSL_READ_EOS) {
this->eosRcvd = true;
}
#if DEBUG
int pending = SSL_pending(this->ssl);
if (r > 0 || pending > 0) {
Debug("ssl", "ssl read right after handshake, read %" PRId64 ", pending %d bytes, for vc %p", r, pending, this);
}
#endif
} else {
Error("failed to allocate MIOBuffer after handshake, vc %p", this);
}
read.triggered = 0;
read_disable(nh, this);
}
readSignalDone(VC_EVENT_READ_COMPLETE, nh);
} else {
read.triggered = 1;
@@ -857,7 +833,7 @@ SSLNetVConnection::SSLNetVConnection()
sslHandShakeComplete(false), sslClientConnection(false), sslClientRenegotiationAbort(false), sslSessionCacheHit(false),
handShakeBuffer(NULL), handShakeHolder(NULL), handShakeReader(NULL), handShakeBioStored(0),
sslPreAcceptHookState(SSL_HOOKS_INIT), sslHandshakeHookState(HANDSHAKE_HOOKS_PRE), npnSet(NULL), npnEndpoint(NULL),
sessionAcceptPtr(NULL), iobuf(NULL), reader(NULL), eosRcvd(false), sslTrace(false)
sessionAcceptPtr(NULL), eosRcvd(false), sslTrace(false)
{
}
@@ -931,9 +907,6 @@ SSLNetVConnection::free(EThread *t)
SSL_free(ssl);
ssl = NULL;
}
if (iobuf) {
free_MIOBuffer(iobuf);
}
sslHandShakeComplete = false;
sslClientConnection = false;
sslHandshakeBeginTime = 0;
@@ -950,8 +923,6 @@ SSLNetVConnection::free(EThread *t)
npnSet = NULL;
npnEndpoint = NULL;
sessionAcceptPtr = NULL;
iobuf = NULL;
reader = NULL;
eosRcvd = false;
sslHandShakeComplete = false;
free_handshake_buffers();
@@ -264,7 +264,7 @@ UnixNetProcessor::connect_re_internal(Continuation *cont, sockaddr const *target
}
}
}
eventProcessor.schedule_imm(vc, opt->etype);
t->schedule_imm(vc);
if (using_socks) {
return &socksEntry->action_;
} else
Copy path View file
@@ -33,7 +33,7 @@
#include "URL.h"
#include "MIME.h"
#include "HTTP.h"
#include "HttpClientSession.h"
#include "ProxyClientSession.h"
#include "Http2ClientSession.h"
#include "HttpServerSession.h"
#include "HttpSM.h"
@@ -961,24 +961,21 @@ INKContInternal::destroy()
} else {
// TODO: Should this schedule on some other "thread" ?
// TODO: we don't care about the return action?
TSContSchedule((TSCont) this, 0, TS_THREAD_POOL_DEFAULT);
this_ethread()->schedule_imm(this);
Warning("INKCont not deletable %d %p", m_event_count, this);
}
}
void
INKContInternal::handle_event_count(int event)
{
if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {
int val;
m_deletable = (m_closed != 0);
val = ink_atomic_increment((int *)&m_event_count, -1);
if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL) || event == TS_EVENT_HTTP_TXN_CLOSE) {
int val = ink_atomic_increment((int *)&m_event_count, -1);
if (val <= 0) {
ink_assert(!"not reached");
}
m_deletable = m_deletable && (val == 1);
m_deletable = (m_closed != 0) && (val == 1);
}
}
@@ -994,6 +991,8 @@ INKContInternal::handle_event(int event, void *edata)
this->mutex = NULL;
m_free_magic = INKCONT_INTERN_MAGIC_DEAD;
INKContAllocator.free(this);
} else {
Warning("INKCont Deletable but not deleted %d", m_event_count);
}
} else {
return m_event_func((TSCont) this, (TSEvent)event, edata);
@@ -1209,7 +1208,7 @@ INKVConnInternal::set_data(int id, void *data)
int
APIHook::invoke(int event, void *edata)
{
if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {
if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL) || event == TS_EVENT_HTTP_TXN_CLOSE) {
if (ink_atomic_increment((int *)&m_cont->m_event_count, 1) < 0) {
ink_assert(!"not reached");
}
@@ -4404,7 +4403,7 @@ TSHttpSsnHookAdd(TSHttpSsn ssnp, TSHttpHookID id, TSCont contp)
sdk_assert(sdk_sanity_check_continuation(contp) == TS_SUCCESS);
sdk_assert(sdk_sanity_check_hook_id(id) == TS_SUCCESS);
HttpClientSession *cs = (HttpClientSession *)ssnp;
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
cs->ssn_hook_append(id, (INKContInternal *)contp);
}
@@ -4413,14 +4412,14 @@ TSHttpSsnTransactionCount(TSHttpSsn ssnp)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
HttpClientSession *cs = (HttpClientSession *)ssnp;
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
return cs->get_transact_count();
}
class TSHttpSsnCallback : public Continuation
{
public:
TSHttpSsnCallback(HttpClientSession *cs, TSEvent event) : Continuation(cs->mutex), m_cs(cs), m_event(event)
TSHttpSsnCallback(ProxyClientSession *cs, TSEvent event) : Continuation(cs->mutex), m_cs(cs), m_event(event)
{
SET_HANDLER(&TSHttpSsnCallback::event_handler);
}
@@ -4434,7 +4433,7 @@ class TSHttpSsnCallback : public Continuation
}
private:
HttpClientSession *m_cs;
ProxyClientSession *m_cs;
TSEvent m_event;
};
@@ -4444,7 +4443,7 @@ TSHttpSsnReenable(TSHttpSsn ssnp, TSEvent event)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
HttpClientSession *cs = (HttpClientSession *)ssnp;
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
EThread *eth = this_ethread();
// If this function is being executed on a thread created by the API
@@ -4499,8 +4498,8 @@ TSHttpTxnSsnGet(TSHttpTxn txnp)
{
sdk_assert(sdk_sanity_check_txn(txnp) == TS_SUCCESS);
HttpSM *sm = (HttpSM *)txnp;
return (TSHttpSsn)sm->ua_session;
HttpSM *sm = reinterpret_cast<HttpSM *>(txnp);
return reinterpret_cast<TSHttpSsn>(sm->ua_session ? (TSHttpSsn)sm->ua_session->get_parent() : NULL);
}
// TODO: Is this still necessary ??
@@ -4510,7 +4509,7 @@ TSHttpTxnClientKeepaliveSet(TSHttpTxn txnp, int set)
HttpSM *sm = (HttpSM *)txnp;
HttpTransact::State *s = &(sm->t_state);
s->hdr_info.trust_response_cl = (set != 0) ? true : false;
s->hdr_info.trust_response_cl = (set != 0);
}
TSReturnCode
@@ -5240,7 +5239,7 @@ TSHttpSsnSSLConnectionGet(TSHttpSsn ssnp)
{
sdk_assert(sdk_sanity_check_null_ptr((void *)ssnp) == TS_SUCCESS);
HttpClientSession *cs = reinterpret_cast<HttpClientSession *>(ssnp);
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (cs == NULL) {
return NULL;
}
@@ -5256,7 +5255,7 @@ TSHttpSsnSSLConnectionGet(TSHttpSsn ssnp)
sockaddr const *
TSHttpSsnClientAddrGet(TSHttpSsn ssnp)
{
HttpClientSession *cs = reinterpret_cast<HttpClientSession *>(ssnp);
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (cs == NULL)
return 0;
@@ -5279,7 +5278,7 @@ TSHttpTxnClientAddrGet(TSHttpTxn txnp)
sockaddr const *
TSHttpSsnIncomingAddrGet(TSHttpSsn ssnp)
{
HttpClientSession *cs = reinterpret_cast<HttpClientSession *>(ssnp);
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (cs == NULL)
return 0;
@@ -5359,16 +5358,8 @@ TSHttpTxnOutgoingAddrSet(TSHttpTxn txnp, const struct sockaddr *addr)
sdk_assert(sdk_sanity_check_txn(txnp) == TS_SUCCESS);
HttpSM *sm = (HttpSM *)txnp;
sm->ua_session->outbound_port = ats_ip_port_host_order(addr);
if (ats_is_ip4(addr)) {
sm->ua_session->outbound_ip4.assign(addr);
} else if (ats_is_ip6(addr)) {
sm->ua_session->outbound_ip6.assign(addr);
} else {
sm->ua_session->outbound_ip4.invalidate();
sm->ua_session->outbound_ip6.invalidate();
}
sm->ua_session->set_outbound_port(ats_ip_port_host_order(addr));
sm->ua_session->set_outbound_ip(IpAddr(addr));
return TS_ERROR;
}
@@ -5399,7 +5390,7 @@ TSHttpTxnOutgoingTransparencySet(TSHttpTxn txnp, int flag)
return TS_ERROR;
}
sm->ua_session->f_outbound_transparent = flag;
sm->ua_session->set_outbound_transparent(flag);
return TS_SUCCESS;
}
@@ -5745,7 +5736,7 @@ TSHttpSsnArgSet(TSHttpSsn ssnp, int arg_idx, void *arg)
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
sdk_assert(arg_idx >= 0 && arg_idx < HTTP_SSN_TXN_MAX_USER_ARG);
HttpClientSession *cs = (HttpClientSession *)ssnp;
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
cs->set_user_arg(arg_idx, arg);
}
@@ -5756,7 +5747,7 @@ TSHttpSsnArgGet(TSHttpSsn ssnp, int arg_idx)
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
sdk_assert(arg_idx >= 0 && arg_idx < HTTP_SSN_TXN_MAX_USER_ARG);
HttpClientSession *cs = (HttpClientSession *)ssnp;
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
return cs->get_user_arg(arg_idx);
}
@@ -5861,14 +5852,14 @@ void
TSHttpSsnDebugSet(TSHttpSsn ssnp, int on)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
((HttpClientSession *)ssnp)->debug_on = on;
(reinterpret_cast<ProxyClientSession *>(ssnp))->debug_on = on;
}
int
TSHttpSsnDebugGet(TSHttpSsn ssnp)
{
sdk_assert(sdk_sanity_check_http_ssn(ssnp) == TS_SUCCESS);
return ((HttpClientSession *)ssnp)->debug();
return (reinterpret_cast<ProxyClientSession *>(ssnp))->debug();
}
int
@@ -7494,7 +7485,7 @@ TSFetchRespHdrMLocGet(TSFetchSM fetch_sm)
TSReturnCode
TSHttpIsInternalSession(TSHttpSsn ssnp)
{
HttpClientSession *cs = (HttpClientSession *)ssnp;
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (!cs) {
return TS_ERROR;
}
@@ -7510,7 +7501,7 @@ TSHttpIsInternalSession(TSHttpSsn ssnp)
TSReturnCode
TSHttpSsnIsInternal(TSHttpSsn ssnp)
{
HttpClientSession *cs = (HttpClientSession *)ssnp;
ProxyClientSession *cs = reinterpret_cast<ProxyClientSession *>(ssnp);
if (!cs) {
return TS_ERROR;
}
Copy path View file
@@ -176,6 +176,8 @@ traffic_server_SOURCES = \
ProtocolProbeSessionAccept.h \
ProxyClientSession.cc \
ProxyClientSession.h \
ProxyClientTransaction.cc \
ProxyClientTransaction.h \
ReverseProxy.cc \
ReverseProxy.h \
SocksProxy.cc \
@@ -294,6 +296,8 @@ traffic_sac_SOURCES = \
ProtocolProbeSessionAccept.h \
ProxyClientSession.cc \
ProxyClientSession.h \
ProxyClientTransaction.cc \
ProxyClientTransaction.h \
Plugin.cc \
InkAPI.cc \
FetchSM.cc \
@@ -125,10 +125,7 @@ struct ProtocolProbeTrampoline : public Continuation, public ProtocolProbeSessio
return EVENT_CONT;
done:
SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(netvc);
if (!ssl_vc || (this->iobuf != ssl_vc->get_ssl_iobuf())) {
free_MIOBuffer(this->iobuf);
}
free_MIOBuffer(this->iobuf);
this->iobuf = NULL;
delete this;
return EVENT_CONT;
@@ -145,15 +142,8 @@ ProtocolProbeSessionAccept::mainEvent(int event, void *data)
ink_assert(data);
VIO *vio;
NetVConnection *netvc = static_cast<NetVConnection *>(data);
SSLNetVConnection *ssl_vc = dynamic_cast<SSLNetVConnection *>(netvc);
MIOBuffer *buf = NULL;
IOBufferReader *reader = NULL;
if (ssl_vc) {
buf = ssl_vc->get_ssl_iobuf();
reader = ssl_vc->get_ssl_reader();
}
ProtocolProbeTrampoline *probe = new ProtocolProbeTrampoline(this, netvc->mutex, buf, reader);
NetVConnection *netvc = (NetVConnection *)data;
ProtocolProbeTrampoline *probe = new ProtocolProbeTrampoline(this, netvc->mutex, NULL, NULL);
// XXX we need to apply accept inactivity timeout here ...
Copy path View file
@@ -27,7 +27,8 @@
static int64_t next_cs_id = 0;
ProxyClientSession::ProxyClientSession() : VConnection(NULL), debug_on(false), hooks_on(true)
ProxyClientSession::ProxyClientSession()
: VConnection(NULL), acl_record(NULL), host_res_style(HOST_RES_IPV4), debug_on(false), hooks_on(true), con_id(0)
{
ink_zero(this->user_args);
}
Oops, something went wrong.
ProTip! Use n and p to navigate between commits in a pull request.