Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Support seamless upgrades of EM servers #165

Open
wants to merge 3 commits into from

3 participants

@pc
pc commented

The two commits below allow you to have a server running, and then, fork(), exec(), start a new server using the same socket, finish up requests in the master, and then start processing new requests in the child. This allows you to do upgrades without dropping any requests.

@tmm1
Owner

See also #93 for an EM.attach_server implementation to use in the new child.

@tmm1
Owner

Sorry it took me so long to look at this. I think the right solution for this use case is to use the attach_server patch along with a patch that exposes a generic cloexec toggle to ruby. See #118 for a hacky way to do this with IO.for_fd and detach currently.

Ideally we would simply have a EM::Connection#close_on_exec=, but that doesn't help in the start_server case where no EM::Connection object is created. This close_on_exec= will probably call something like EM.set_close_on_exec(binding, value) that can also be used with the return value of EM.start_server

@sodabrew
Collaborator

Can this be closed since #465 landed a while ago?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 6, 2011
  1. @pc

    add get_file_descriptor

    pc authored
Commits on Feb 7, 2011
  1. @pc

    Support reuse of server sockets

    pc authored
Commits on Feb 9, 2011
  1. @pc

    missing negation

    pc authored
This page is out of date. Refresh to see the latest.
View
20 ext/cmain.cpp
@@ -273,6 +273,16 @@ extern "C" const unsigned long evma_create_unix_domain_server (const char *filen
return EventMachine->CreateUnixDomainServer (filename);
}
+/**********************
+evma_create_tcp_server
+**********************/
+
+extern "C" const unsigned long evma_reuse_server (int descriptor)
+{
+ ensure_eventmachine("evma_reuse_server");
+ return EventMachine->OutputBinding (descriptor);
+}
+
/*************************
evma_open_datagram_socket
*************************/
@@ -372,6 +382,16 @@ extern "C" void evma_close_connection (const unsigned long binding, int after_wr
ed->ScheduleClose (after_writing ? true : false);
}
+/****************************
+evma_preserve_server_sockets
+****************************/
+
+extern "C" void evma_preserve_server_sockets()
+{
+ ensure_eventmachine("evma_preserve_server_sockets");
+ EventMachine_t::PreserveServerSockets();
+}
+
/***********************************
evma_report_connection_error_status
***********************************/
View
8 ext/ed.cpp
@@ -62,7 +62,8 @@ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
MaxOutboundBufSize(0),
MyEventMachine (em),
PendingConnectTimeout(20000000),
- InactivityTimeout (0)
+ InactivityTimeout (0),
+ bIsAcceptorSocket(false)
{
/* There are three ways to close a socket, all of which should
* automatically signal to the event machine that this object
@@ -127,7 +128,6 @@ void EventableDescriptor::SetEventCallback (EMCallback cb)
EventCallback = cb;
}
-
/**************************
EventableDescriptor::Close
**************************/
@@ -136,7 +136,8 @@ void EventableDescriptor::Close()
{
// Close the socket right now. Intended for emergencies.
if (MySocket != INVALID_SOCKET) {
- shutdown (MySocket, 1);
+ if(!(bIsAcceptorSocket && EventMachine_t::ShouldPreserveServerSockets()))
+ shutdown (MySocket, 1);
close (MySocket);
MySocket = INVALID_SOCKET;
}
@@ -1264,6 +1265,7 @@ AcceptorDescriptor::AcceptorDescriptor
AcceptorDescriptor::AcceptorDescriptor (int sd, EventMachine_t *parent_em):
EventableDescriptor (sd, parent_em)
{
+ bIsAcceptorSocket = true;
#ifdef HAVE_EPOLL
EpollEvent.events = EPOLLIN;
#endif
View
1  ext/ed.h
@@ -102,6 +102,7 @@ class EventableDescriptor: public Bindable_t
protected:
int MySocket;
+ bool bIsAcceptorSocket;
EMCallback EventCallback;
void _GenericInboundDispatch(const char*, int);
View
37 ext/em.cpp
@@ -26,6 +26,7 @@ See the file COPYING for complete licensing information.
* Now we define it here so that users can change its value if necessary.
*/
static unsigned int MaxOutstandingTimers = 100000;
+static bool PreserveServerSockets_ = false;
/* Internal helper to convert strings to internet addresses. IPv6-aware.
@@ -61,6 +62,22 @@ void EventMachine_t::SetMaxTimerCount (int count)
MaxOutstandingTimers = count;
}
+/********************************************
+STATIC EventMachine_t::PreserveServerSockets
+********************************************/
+
+void EventMachine_t::PreserveServerSockets ()
+{
+ PreserveServerSockets_ = true;
+}
+/*************************************************
+STATIC EventMachine_t::ShouldPreserveServerSockets
+**************************************************/
+
+bool EventMachine_t::ShouldPreserveServerSockets ()
+{
+ return PreserveServerSockets_;
+}
/******************************
@@ -1551,7 +1568,7 @@ const unsigned long EventMachine_t::CreateTcpServer (const char *server, int por
}
}
- { // set CLOEXEC. Only makes sense on Unix
+ if(!EventMachine_t::ShouldPreserveServerSockets()) { // set CLOEXEC. Only makes sense on Unix
#ifdef OS_UNIX
int cloexec = fcntl (sd_accept, F_GETFD, 0);
assert (cloexec >= 0);
@@ -1582,15 +1599,7 @@ const unsigned long EventMachine_t::CreateTcpServer (const char *server, int por
}
}
- { // Looking good.
- AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
- if (!ad)
- throw std::runtime_error ("unable to allocate acceptor");
- Add (ad);
- output_binding = ad->GetBinding();
- }
-
- return output_binding;
+ return OutputBinding(sd_accept);
fail:
if (sd_accept != INVALID_SOCKET)
@@ -1598,6 +1607,14 @@ const unsigned long EventMachine_t::CreateTcpServer (const char *server, int por
return 0;
}
+const unsigned long EventMachine_t::OutputBinding(int sd_accept)
+{
+ AcceptorDescriptor *ad = new AcceptorDescriptor (sd_accept, this);
+ if (!ad)
+ throw std::runtime_error ("unable to allocate acceptor");
+ Add (ad);
+ return ad->GetBinding();
+}
/**********************************
EventMachine_t::OpenDatagramSocket
View
3  ext/em.h
@@ -60,6 +60,8 @@ class EventMachine_t
public:
static int GetMaxTimerCount();
static void SetMaxTimerCount (int);
+ static void PreserveServerSockets();
+ static bool ShouldPreserveServerSockets();
public:
EventMachine_t (EMCallback);
@@ -73,6 +75,7 @@ class EventMachine_t
const unsigned long ConnectToUnixServer (const char *);
const unsigned long CreateTcpServer (const char *, int);
+ const unsigned long OutputBinding (int);
const unsigned long OpenDatagramSocket (const char *, int);
const unsigned long CreateUnixDomainServer (const char*);
const unsigned long OpenKeyboard();
View
2  ext/eventmachine.h
@@ -61,6 +61,7 @@ extern "C" {
void evma_stop_tcp_server (const unsigned long signature);
const unsigned long evma_create_tcp_server (const char *address, int port);
+ const unsigned long evma_reuse_server (int descriptor);
const unsigned long evma_create_unix_domain_server (const char *filename);
const unsigned long evma_open_datagram_socket (const char *server, int port);
const unsigned long evma_open_keyboard();
@@ -87,6 +88,7 @@ extern "C" {
int evma_send_file_data_to_connection (const unsigned long binding, const char *filename);
void evma_close_connection (const unsigned long binding, int after_writing);
+ void evma_preserve_server_sockets();
int evma_report_connection_error_status (const unsigned long binding);
void evma_signal_loopbreak();
void evma_set_timer_quantum (int);
View
33 ext/rubymain.cpp
@@ -257,6 +257,18 @@ static VALUE t_stop_server (VALUE self, VALUE signature)
}
+/**************
+t_start_server
+**************/
+
+static VALUE t_reuse_server (VALUE self, VALUE descriptor)
+{
+ const unsigned long f = evma_reuse_server (FIX2INT(descriptor));
+ if (!f)
+ rb_raise (rb_eRuntimeError, "no acceptor (bad descriptor)");
+ return ULONG2NUM (f);
+}
+
/*******************
t_start_unix_server
*******************/
@@ -471,6 +483,16 @@ static VALUE t_close_connection (VALUE self, VALUE signature, VALUE after_writin
return Qnil;
}
+/*************************
+t_preserve_server_sockets
+*************************/
+
+static VALUE t_preserve_server_sockets (VALUE self)
+{
+ evma_preserve_server_sockets();
+ return Qnil;
+}
+
/********************************
t_report_connection_error_status
********************************/
@@ -558,6 +580,14 @@ static VALUE t_detach_fd (VALUE self, VALUE signature)
return INT2NUM(evma_detach_fd (NUM2ULONG (signature)));
}
+/*********************
+t_get_file_descriptor
+*********************/
+static VALUE t_get_file_descriptor (VALUE self, VALUE signature)
+{
+ return INT2NUM(evma_get_file_descriptor (NUM2ULONG (signature)));
+}
+
/**************
t_get_sock_opt
**************/
@@ -1110,6 +1140,7 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "add_oneshot_timer", (VALUE(*)(...))t_add_oneshot_timer, 1);
rb_define_module_function (EmModule, "start_tcp_server", (VALUE(*)(...))t_start_server, 2);
rb_define_module_function (EmModule, "stop_tcp_server", (VALUE(*)(...))t_stop_server, 1);
+ rb_define_module_function (EmModule, "reuse_server", (VALUE(*)(...))t_reuse_server, 1);
rb_define_module_function (EmModule, "start_unix_server", (VALUE(*)(...))t_start_unix_server, 1);
rb_define_module_function (EmModule, "set_tls_parms", (VALUE(*)(...))t_set_tls_parms, 4);
rb_define_module_function (EmModule, "start_tls", (VALUE(*)(...))t_start_tls, 1);
@@ -1118,12 +1149,14 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "send_datagram", (VALUE(*)(...))t_send_datagram, 5);
rb_define_module_function (EmModule, "close_connection", (VALUE(*)(...))t_close_connection, 2);
rb_define_module_function (EmModule, "report_connection_error_status", (VALUE(*)(...))t_report_connection_error_status, 1);
+ rb_define_module_function (EmModule, "preserve_server_sockets", (VALUE (*)(...))t_preserve_server_sockets, 0);
rb_define_module_function (EmModule, "connect_server", (VALUE(*)(...))t_connect_server, 2);
rb_define_module_function (EmModule, "bind_connect_server", (VALUE(*)(...))t_bind_connect_server, 4);
rb_define_module_function (EmModule, "connect_unix_server", (VALUE(*)(...))t_connect_unix_server, 1);
rb_define_module_function (EmModule, "attach_fd", (VALUE (*)(...))t_attach_fd, 2);
rb_define_module_function (EmModule, "detach_fd", (VALUE (*)(...))t_detach_fd, 1);
+ rb_define_module_function (EmModule, "get_file_descriptor", (VALUE (*)(...))t_get_file_descriptor, 1);
rb_define_module_function (EmModule, "get_sock_opt", (VALUE (*)(...))t_get_sock_opt, 3);
rb_define_module_function (EmModule, "set_notify_readable", (VALUE (*)(...))t_set_notify_readable, 2);
rb_define_module_function (EmModule, "set_notify_writable", (VALUE (*)(...))t_set_notify_writable, 2);
View
6 lib/eventmachine.rb
@@ -519,8 +519,12 @@ def self.start_server server, port=nil, handler=nil, *args, &block
s = if port
start_tcp_server server, port
- else
+ elsif server.kind_of?(String)
start_unix_server server
+ elsif server.kind_of?(Fixnum)
+ reuse_server server
+ else
+ raise "Invalid server spec: #{server.inspect}, #{port.inspect}"
end
@acceptors[s] = [klass,args,block]
s
Something went wrong with that request. Please try again.