Skip to content

Commit

Permalink
Optimize: rename close_UnixNetVConnection to NetHandler::free_netvc.
Browse files Browse the repository at this point in the history
EThread is processor of Event, it is the event processor's
responsibility to deallocate Event object. (refer to the comments in
I_EventProcessor.h, "Event allocation policy" section.)

Due to NetHandler is processor of NetVC, it is the NetHandler's
responsibility to free NetVC object.

Take EThread::free_event() as reference to create
NetHandler::free_netvc() from close_UnixNetVConnection().
  • Loading branch information
oknet committed Oct 18, 2017
1 parent 326cd98 commit db30b0a
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 49 deletions.
61 changes: 60 additions & 1 deletion iocore/net/P_UnixNet.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,58 @@ struct PollCont : public Continuation {
int pollEvent(int event, Event *e);
};

/**
NetHandler is the processor of NetVC for the Net sub-system. The NetHandler
is the core component of the Net sub-system. Once started, it is responsible
for polling socket fds and perform the I/O tasks in NetVC.
The NetHandler is executed periodically to perform read/write tasks for
NetVConnection. The NetHandler::mainNetEvent() should be viewed as a part of
EThread::execute() loop. This is the reason that Net System is a sub-system.
By get_NetHandler(this_ethread()), you can get the NetHandler object that
runs inside the current EThread and then @c startIO / @c stopIO which
assign/release a NetVC to/from NetHandler. Before you call these functions,
holding the mutex of this NetHandler is required.
The NetVConnection provides a set of do_io functions through which you can
specify continuations to be called back by its NetHandler. These function
calls do not block. Instead they return an VIO object and schedule the
callback to the continuation passed in when there are I/O events occurred.
Multi-thread scheduler:
The NetHandler should be viewed as multi-threaded schedulers which process
NetVCs from their queues. The NetVC can be made of NetProcessor (allocate_vc)
either by directly adding a NetVC to the queue (NetHandler::startIO), or more
conveniently, calling a method service call (NetProcessor::connect_re) which
synthesizes the NetVC and places it in the queue.
Callback event codes:
These event codes for do_io_read and reenable(read VIO) task:
VC_EVENT_READ_READY, VC_EVENT_READ_COMPLETE,
VC_EVENT_EOS, VC_EVENT_ERROR
These event codes for do_io_write and reenable(write VIO) task:
VC_EVENT_WRITE_READY, VC_EVENT_WRITE_COMPLETE
VC_EVENT_ERROR
There is no event and callback for do_io_shutdown / do_io_close task.
NetVConnection allocation policy:
NetVCs are allocated by the NetProcessor and deallocated by NetHandler.
A state machine may access the returned, non-recurring NetVC / VIO until
it is closed by do_io_close. For recurring NetVC, the NetVC may be
accessed until it is closed. Once the NetVC is closed, it's the
NetHandler's responsibility to deallocate it.
Before assign to NetHandler or after release from NetHandler, it's the
NetVC's responsibility to deallocate itself.
*/

//
// NetHandler
//
Expand Down Expand Up @@ -234,7 +286,7 @@ class NetHandler : public Continuation
@param netvc UnixNetVConnection to be managed by InactivityCop
*/
void startCop(UnixNetVConnection *netvc);
/* *
/**
Stop to handle active timeout and inactivity on a UnixNetVConnection.
Remove the netvc from open_list and cop_list.
Also remove the netvc from keep_alive_queue and active_queue if its context is IN.
Expand All @@ -244,6 +296,13 @@ class NetHandler : public Continuation
*/
void stopCop(UnixNetVConnection *netvc);

/**
Release a netvc and free it.
@param netvc UnixNetVConnection to be deattached.
*/
void free_netvc(UnixNetVConnection *netvc);

NetHandler();

private:
Expand Down
1 change: 0 additions & 1 deletion iocore/net/P_UnixNetVConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ UnixNetVConnection::set_action(Continuation *c)

// declarations for local use (within the net module)

void close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t);
void write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread);
void write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread);

Expand Down
5 changes: 5 additions & 0 deletions iocore/net/SSLNetVConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,12 @@ SSLNetVConnection::free(EThread *t)
{
ink_release_assert(t == this_ethread());

// cancel OOB
cancel_OOB();
// close socket fd
if (con.fd != NO_FD) {
NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
}
con.close();

clear();
Expand Down
31 changes: 25 additions & 6 deletions iocore/net/UnixNet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class InactivityCop : public Continuation
}

if (vc->closed) {
close_UnixNetVConnection(vc, e->ethread);
nh.free_netvc(vc);
continue;
}

Expand Down Expand Up @@ -305,6 +305,25 @@ update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecDat

return REC_ERR_OKAY;
}
//
// Function used to release a UnixNetVConnection and free it.
//
void
NetHandler::free_netvc(UnixNetVConnection *netvc)
{
EThread *t = trigger_event->ethread;

ink_assert(t == this_ethread());
ink_release_assert(netvc->thread == t);
ink_release_assert(netvc->nh == this);

// Release netvc from InactivityCop
stopCop(netvc);
// Release netvc from NetHandler
stopIO(netvc);
// Clear and deallocate netvc
netvc->free(t);
}

//
// Initialization here, in the thread in which we will be executing
Expand Down Expand Up @@ -387,7 +406,7 @@ NetHandler::process_ready_list()
// Initialize the thread-local continuation flags
set_cont_flags(vc->control_flags);
if (vc->closed)
close_UnixNetVConnection(vc, trigger_event->ethread);
free_netvc(vc);
else if (vc->read.enabled && vc->read.triggered)
vc->net_read_io(this, trigger_event->ethread);
else if (!vc->read.enabled) {
Expand All @@ -404,7 +423,7 @@ NetHandler::process_ready_list()
while ((vc = write_ready_list.dequeue())) {
set_cont_flags(vc->control_flags);
if (vc->closed)
close_UnixNetVConnection(vc, trigger_event->ethread);
free_netvc(vc);
else if (vc->write.enabled && vc->write.triggered)
write_to_net(this, vc, trigger_event->ethread);
else if (!vc->write.enabled) {
Expand All @@ -422,7 +441,7 @@ NetHandler::process_ready_list()
while ((vc = read_ready_list.dequeue())) {
diags->set_override(vc->control.debug_override);
if (vc->closed)
close_UnixNetVConnection(vc, trigger_event->ethread);
free_netvc(vc);
else if (vc->read.enabled && vc->read.triggered)
vc->net_read_io(this, trigger_event->ethread);
else if (!vc->read.enabled)
Expand All @@ -431,7 +450,7 @@ NetHandler::process_ready_list()
while ((vc = write_ready_list.dequeue())) {
diags->set_override(vc->control.debug_override);
if (vc->closed)
close_UnixNetVConnection(vc, trigger_event->ethread);
free_netvc(vc);
else if (vc->write.enabled && vc->write.triggered)
write_to_net(this, vc, trigger_event->ethread);
else if (!vc->write.enabled)
Expand Down Expand Up @@ -624,7 +643,7 @@ NetHandler::_close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event,
keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(vc->next_inactivity_timeout_at),
ink_hrtime_to_sec(vc->inactivity_timeout_in), diff);
if (vc->closed) {
close_UnixNetVConnection(vc, this_ethread());
free_netvc(vc);
++closed;
} else {
vc->next_inactivity_timeout_at = now;
Expand Down
64 changes: 23 additions & 41 deletions iocore/net/UnixNetVConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,37 +72,6 @@ net_activity(UnixNetVConnection *vc, EThread *thread)
}
}

//
// Function used to close a UnixNetVConnection and free the vc
//
void
close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
{
if (vc->con.fd != NO_FD) {
NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
}
NetHandler *nh = vc->nh;
vc->cancel_OOB();

ink_release_assert(vc->thread == t);

// 1. Cancel timeout
vc->next_inactivity_timeout_at = 0;
vc->next_activity_timeout_at = 0;

vc->inactivity_timeout_in = 0;
vc->active_timeout_in = 0;

if (nh) {
// 2. Release vc from InactivityCop.
nh->stopCop(vc);
// 3. Release vc from NetHandler.
nh->stopIO(vc);
}
// 4. Clear then deallocate vc.
vc->free(t);
}

//
// Signal an event
//
Expand Down Expand Up @@ -130,7 +99,7 @@ read_signal_and_update(int event, UnixNetVConnection *vc)
if (!--vc->recursion && vc->closed) {
/* BZ 31932 */
ink_assert(vc->thread == this_ethread());
close_UnixNetVConnection(vc, vc->thread);
vc->nh->free_netvc(vc);
return EVENT_DONE;
} else {
return EVENT_CONT;
Expand Down Expand Up @@ -161,7 +130,7 @@ write_signal_and_update(int event, UnixNetVConnection *vc)
if (!--vc->recursion && vc->closed) {
/* BZ 31932 */
ink_assert(vc->thread == this_ethread());
close_UnixNetVConnection(vc, vc->thread);
vc->nh->free_netvc(vc);
return EVENT_DONE;
} else {
return EVENT_CONT;
Expand Down Expand Up @@ -228,7 +197,7 @@ read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
// global session pool case. If so, the closed flag should be stable once we get the
// s->vio.mutex (the global session pool mutex).
if (vc->closed) {
close_UnixNetVConnection(vc, thread);
vc->nh->free_netvc(vc);
return;
}
// if it is not enabled.
Expand Down Expand Up @@ -669,6 +638,9 @@ UnixNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader
void
UnixNetVConnection::do_io_close(int alerrno /* = -1 */)
{
// FIXME: the nh must not nullptr.
ink_assert(nh);

read.enabled = 0;
write.enabled = 0;
read.vio.buffer.clear();
Expand All @@ -694,7 +666,11 @@ UnixNetVConnection::do_io_close(int alerrno /* = -1 */)
}

if (close_inline) {
close_UnixNetVConnection(this, t);
if (nh) {
nh->free_netvc(this);
} else {
free(t);
}
}
}

Expand Down Expand Up @@ -1207,7 +1183,7 @@ UnixNetVConnection::mainEvent(int event, Event *e)
writer_cont = write.vio._cont;

if (closed) {
close_UnixNetVConnection(this, thread);
nh->free_netvc(this);
return EVENT_DONE;
}

Expand Down Expand Up @@ -1304,11 +1280,6 @@ UnixNetVConnection::connectUp(EThread *t, int fd)
}

if (check_emergency_throttle(con)) {
// The `con' could be closed if there is hyper emergency
if (fd == NO_FD) {
// We need to decrement the stat because close_UnixNetVConnection only decrements with a valid connection descriptor.
NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
}
// Set errno force to EMFILE (reached limit for open file descriptors)
errno = EMFILE;
res = -errno;
Expand Down Expand Up @@ -1354,6 +1325,12 @@ UnixNetVConnection::connectUp(EThread *t, int fd)
void
UnixNetVConnection::clear()
{
// clear timeout variables
next_inactivity_timeout_at = 0;
next_activity_timeout_at = 0;
inactivity_timeout_in = 0;
active_timeout_in = 0;

// clear variables for reuse
this->mutex.clear();
action_.mutex.clear();
Expand Down Expand Up @@ -1387,7 +1364,12 @@ UnixNetVConnection::free(EThread *t)
{
ink_release_assert(t == this_ethread());

// cancel OOB
cancel_OOB();
// close socket fd
if (con.fd != NO_FD) {
NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
}
con.close();

clear();
Expand Down

0 comments on commit db30b0a

Please sign in to comment.