Skip to content

Commit

Permalink
Clean up UnixNetProcessor entanglements.
Browse files Browse the repository at this point in the history
The NetProcessor class sometimes downcasts itself to a UnixNetProcessor
because it implicitly knows that it is a singleton and that
UnixNetProcessor is the actual implementation. We can remove this oddity
by simply making the relevant NetProcessor operations abstract and moving
the implementations to UnixNetProcessor.

While we are doing this, we can remove some unused member variables,
make createNetAccept protected (only subclasses of UnixNetProcessor
should call it), remove unnecessary casting, and add a missing lock
to a naVec traversal.

Signed-off-by: James Peach <jpeach@apache.org>
  • Loading branch information
jpeach committed Jun 15, 2023
1 parent ebd48d7 commit 641c87b
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 101 deletions.
19 changes: 5 additions & 14 deletions iocore/net/I_NetProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class NetProcessor : public Processor
@return Action, that can be cancelled to cancel the accept. The
port becomes free immediately.
*/
virtual Action *accept(Continuation *cont, AcceptOptions const &opt = DEFAULT_ACCEPT_OPTIONS);
virtual Action *accept(Continuation *cont, AcceptOptions const &opt = DEFAULT_ACCEPT_OPTIONS) = 0;

/**
Accepts incoming connections on port. Accept connections on port.
Expand All @@ -160,8 +160,9 @@ class NetProcessor : public Processor
port becomes free immediately.
*/
virtual Action *main_accept(Continuation *cont, SOCKET listen_socket_in, AcceptOptions const &opt = DEFAULT_ACCEPT_OPTIONS);
virtual void stop_accept();
virtual Action *main_accept(Continuation *cont, SOCKET listen_socket_in, AcceptOptions const &opt = DEFAULT_ACCEPT_OPTIONS) = 0;

virtual void stop_accept() = 0;

/**
Open a NetVConnection for connection oriented I/O. Connects
Expand All @@ -181,8 +182,7 @@ class NetProcessor : public Processor
@param options @see NetVCOptions.
*/

Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *options = nullptr);
virtual Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *options = nullptr) = 0;

/**
Initializes the net processor. This must be called before the event threads are started.
Expand Down Expand Up @@ -225,15 +225,6 @@ class NetProcessor : public Processor
// noncopyable
NetProcessor(const NetProcessor &) = delete;
NetProcessor &operator=(const NetProcessor &) = delete;

private:
/** @note Not implemented. */
virtual int
stop()
{
ink_release_assert(!"NetProcessor::stop not implemented");
return 1;
}
};

/**
Expand Down
13 changes: 7 additions & 6 deletions iocore/net/P_QUICNetProcessor_native.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,19 @@ class QUICNetProcessor : public UnixNetProcessor
virtual ~QUICNetProcessor();

void init() override;
virtual int start(int, size_t stacksize) override;
// TODO: refactoring NetProcessor::connect_re and UnixNetProcessor::connect_re_internal
// Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *opts) override;
Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *opts);
int start(int, size_t stacksize) override;

virtual NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt) override;
virtual NetVConnection *allocate_vc(EThread *t) override;
Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *opts) override;

NetVConnection *allocate_vc(EThread *t) override;

Action *main_accept(Continuation *cont, SOCKET fd, AcceptOptions const &opt) override;

off_t quicPollCont_offset;

protected:
NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt) override;

private:
QUICNetProcessor(const QUICNetProcessor &);
QUICNetProcessor &operator=(const QUICNetProcessor &);
Expand Down
13 changes: 7 additions & 6 deletions iocore/net/P_QUICNetProcessor_quiche.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,19 @@ class QUICNetProcessor : public UnixNetProcessor
virtual ~QUICNetProcessor();

void init() override;
virtual int start(int, size_t stacksize) override;
// TODO: refactoring NetProcessor::connect_re and UnixNetProcessor::connect_re_internal
// Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *opts) override;
Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *opts);
int start(int, size_t stacksize) override;

virtual NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt) override;
virtual NetVConnection *allocate_vc(EThread *t) override;
Action *connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *opts) override;

NetVConnection *allocate_vc(EThread *t) override;

Action *main_accept(Continuation *cont, SOCKET fd, AcceptOptions const &opt) override;

off_t quicPollCont_offset;

protected:
NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt) override;

private:
QUICNetProcessor(const QUICNetProcessor &);
QUICNetProcessor &operator=(const QUICNetProcessor &);
Expand Down
10 changes: 3 additions & 7 deletions iocore/net/P_SSLNetProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,17 @@ struct SSLNetProcessor : public UnixNetProcessor {
public:
int start(int, size_t stacksize) override;

void cleanup();

SSLNetProcessor();
~SSLNetProcessor() override;

//
// Private
//

NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt) override;
NetVConnection *allocate_vc(EThread *t) override;

// noncopyable
SSLNetProcessor(const SSLNetProcessor &) = delete;
SSLNetProcessor &operator=(const SSLNetProcessor &) = delete;

protected:
NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt) override;
};

extern SSLNetProcessor ssl_NetProcessor;
26 changes: 10 additions & 16 deletions iocore/net/P_UnixNetProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,29 @@ class UnixNetVConnection;
//
//////////////////////////////////////////////////////////////////
struct UnixNetProcessor : public NetProcessor {
private:
Action *accept_internal(Continuation *cont, int fd, AcceptOptions const &opt);

protected:
virtual NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt);

public:
virtual Action *accept_internal(Continuation *cont, int fd, AcceptOptions const &opt);
Action *accept(Continuation *cont, AcceptOptions const &opt = DEFAULT_ACCEPT_OPTIONS) override;
Action *main_accept(Continuation *cont, SOCKET listen_socket_in, AcceptOptions const &opt = DEFAULT_ACCEPT_OPTIONS) override;

Action *connect_re_internal(Continuation *cont, sockaddr const *target, NetVCOptions *options = nullptr);
Action *connect(Continuation *cont, UnixNetVConnection **vc, sockaddr const *target, NetVCOptions *opt = nullptr);
void stop_accept() override;

virtual NetAccept *createNetAccept(const NetProcessor::AcceptOptions &opt);
Action *connect_re(Continuation *cont, sockaddr const *target, NetVCOptions *options = nullptr) override;
NetVConnection *allocate_vc(EThread *t) override;

void init() override;
void init_socks() override;

Event *accept_thread_event;

// offsets for per thread data structures
off_t netHandler_offset;
off_t pollCont_offset;

// we probably won't need these members
int n_netthreads;
EThread **netthreads;
};

TS_INLINE Action *
NetProcessor::connect_re(Continuation *cont, sockaddr const *addr, NetVCOptions *opts)
{
return static_cast<UnixNetProcessor *>(this)->connect_re_internal(cont, addr, opts);
}

extern UnixNetProcessor unix_netProcessor;

//
Expand Down
2 changes: 1 addition & 1 deletion iocore/net/QUICNetProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ QUICNetProcessor::createNetAccept(const NetProcessor::AcceptOptions &opt)
this->_ctable = new QUICConnectionTable(params->connection_table_size());
this->_rtable = new QUICResetTokenTable();
}
return (NetAccept *)new QUICPacketHandlerIn(opt, *this->_ctable, *this->_rtable);
return new QUICPacketHandlerIn(opt, *this->_ctable, *this->_rtable);
}

NetVConnection *
Expand Down
2 changes: 1 addition & 1 deletion iocore/net/QUICNetProcessor_quiche.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ QUICNetProcessor::createNetAccept(const NetProcessor::AcceptOptions &opt)
QUICConfig::scoped_config params;
this->_ctable = new QUICConnectionTable(params->connection_table_size());
}
return (NetAccept *)new QUICPacketHandlerIn(opt, *this->_ctable, *this->_quiche_config);
return new QUICPacketHandlerIn(opt, *this->_ctable, *this->_quiche_config);
}

NetVConnection *
Expand Down
2 changes: 1 addition & 1 deletion iocore/net/QUICPacketHandler_quiche.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ QUICPacketHandlerIn::init_accept(EThread *t = nullptr)
Continuation *
QUICPacketHandlerIn::_get_continuation()
{
return static_cast<NetAccept *>(this);
return this;
}

void
Expand Down
12 changes: 2 additions & 10 deletions iocore/net/SSLNetProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ struct OCSPContinuation : public Continuation {
OCSPContinuation() : Continuation(new_ProxyMutex()) { SET_HANDLER(&OCSPContinuation::mainEvent); }
};

void
SSLNetProcessor::cleanup()
{
}

int
SSLNetProcessor::start(int, size_t stacksize)
{
Expand Down Expand Up @@ -92,7 +87,7 @@ SSLNetProcessor::start(int, size_t stacksize)
NetAccept *
SSLNetProcessor::createNetAccept(const NetProcessor::AcceptOptions &opt)
{
return (NetAccept *)new SSLNetAccept(opt);
return new SSLNetAccept(opt);
}

NetVConnection *
Expand All @@ -113,7 +108,4 @@ SSLNetProcessor::allocate_vc(EThread *t)

SSLNetProcessor::SSLNetProcessor() {}

SSLNetProcessor::~SSLNetProcessor()
{
cleanup();
}
SSLNetProcessor::~SSLNetProcessor() {}
4 changes: 0 additions & 4 deletions iocore/net/UnixNetAccept.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ using NetAcceptHandler = int (NetAccept::*)(int, void *);

int NetAccept::accept_till_done = 1;

// we need to protect naVec since it might be accessed
// in different threads at the same time
Ptr<ProxyMutex> naVecMutex;
std::vector<NetAccept *> naVec;
static void
safe_delay(int msec)
{
Expand Down
63 changes: 30 additions & 33 deletions iocore/net/UnixNetProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,23 @@
// For Stat Pages
#include "StatPages.h"

int net_accept_number = 0;
// naVecMutext protects access to naVec.
Ptr<ProxyMutex> naVecMutex;

std::vector<NetAccept *> naVec;

unsigned int
net_next_connection_number()
{
static int net_connection_number = 1;

unsigned int res = 0;
do {
res = ink_atomic_increment(&net_connection_number, 1);
} while (!res);
return res;
}

NetProcessor::AcceptOptions const NetProcessor::DEFAULT_ACCEPT_OPTIONS;

NetProcessor::AcceptOptions &
Expand All @@ -56,39 +72,28 @@ NetProcessor::AcceptOptions::reset()
return *this;
}

int net_connection_number = 1;

unsigned int
net_next_connection_number()
{
unsigned int res = 0;
do {
res = static_cast<unsigned int>(ink_atomic_increment(&net_connection_number, 1));
} while (!res);
return res;
}

Action *
NetProcessor::accept(Continuation *cont, AcceptOptions const &opt)
UnixNetProcessor::accept(Continuation *cont, AcceptOptions const &opt)
{
Debug("iocore_net_processor", "NetProcessor::accept - port %d,recv_bufsize %d, send_bufsize %d, sockopt 0x%0x", opt.local_port,
opt.recv_bufsize, opt.send_bufsize, opt.sockopt_flags);

return ((UnixNetProcessor *)this)->accept_internal(cont, NO_FD, opt);
return accept_internal(cont, NO_FD, opt);
}

Action *
NetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const &opt)
UnixNetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const &opt)
{
UnixNetProcessor *this_unp = static_cast<UnixNetProcessor *>(this);
Debug("iocore_net_processor", "NetProcessor::main_accept - port %d,recv_bufsize %d, send_bufsize %d, sockopt 0x%0x",
opt.local_port, opt.recv_bufsize, opt.send_bufsize, opt.sockopt_flags);
return this_unp->accept_internal(cont, fd, opt);
return accept_internal(cont, fd, opt);
}

Action *
UnixNetProcessor::accept_internal(Continuation *cont, int fd, AcceptOptions const &opt)
{
static int net_accept_number = 0;

ProxyMutex *mutex = this_ethread()->mutex.get();
int accept_threads = opt.accept_threads; // might be changed.
IpEndpoint accept_ip; // local binding address.
Expand Down Expand Up @@ -170,19 +175,21 @@ UnixNetProcessor::accept_internal(Continuation *cont, int fd, AcceptOptions cons
}

void
NetProcessor::stop_accept()
UnixNetProcessor::stop_accept()
{
SCOPED_MUTEX_LOCK(lock, naVecMutex, this_ethread());
for (auto &na : naVec) {
na->stop_accept();
}
}

Action *
UnixNetProcessor::connect_re_internal(Continuation *cont, sockaddr const *target, NetVCOptions *opt)
UnixNetProcessor::connect_re(Continuation *cont, sockaddr const *target, NetVCOptions *opt)
{
if (TSSystemState::is_event_system_shut_down()) {
return nullptr;
}

EThread *t = eventProcessor.assign_affinity_by_type(cont, opt->etype);
UnixNetVConnection *vc = (UnixNetVConnection *)this->allocate_vc(t);

Expand Down Expand Up @@ -259,19 +266,11 @@ UnixNetProcessor::connect_re_internal(Continuation *cont, sockaddr const *target
}
}

Action *
UnixNetProcessor::connect(Continuation *cont, UnixNetVConnection ** /* avc */, sockaddr const *target, NetVCOptions *opt)
{
return connect_re(cont, target, opt);
}

struct PollCont;

// This needs to be called before the ET_NET threads are started.
void
UnixNetProcessor::init()
{
EventType etype = ET_NET;
naVecMutex = new_ProxyMutex();

netHandler_offset = eventProcessor.allocate(sizeof(NetHandler));
pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
Expand All @@ -284,7 +283,7 @@ UnixNetProcessor::init()
// schedule per thread start up logic. Global init is done only here.
NetHandler::init_for_process();
NetHandler::active_thread_types[ET_NET] = true;
eventProcessor.schedule_spawn(&initialize_thread_for_net, etype);
eventProcessor.schedule_spawn(&initialize_thread_for_net, ET_NET);

RecData d;
d.rec_int = 0;
Expand All @@ -294,9 +293,7 @@ UnixNetProcessor::init()
* Stat pages
*/
extern Action *register_ShowNet(Continuation * c, HTTPHdr * h);
if (etype == ET_NET) {
statPagesManager.register_http("net", register_ShowNet);
}
statPagesManager.register_http("net", register_ShowNet);
}

void
Expand Down
2 changes: 0 additions & 2 deletions src/traffic_server/traffic_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2048,8 +2048,6 @@ main(int /* argc ATS_UNUSED */, const char **argv)
ts::ModuleVersion(HOSTDB_MODULE_INTERNAL_VERSION._major, HOSTDB_MODULE_INTERNAL_VERSION._minor, ts::ModuleVersion::PRIVATE));
ink_split_dns_init(ts::ModuleVersion(1, 0, ts::ModuleVersion::PRIVATE));

naVecMutex = new_ProxyMutex();

// Do the inits for NetProcessors that use ET_NET threads. MUST be before starting those threads.
netProcessor.init();
prep_HttpProxyServer();
Expand Down

0 comments on commit 641c87b

Please sign in to comment.