Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
...
  • 4 commits
  • 36 files changed
  • 0 commit comments
  • 1 contributor
Commits on Nov 11, 2010
@ccutrer Refactor IOManagers 1b9a0be
@ccutrer Refactor Socket code on Windows
To allow usage with a non-IOCP IOManager (to be written)
1a3af31
@ccutrer check or IOManagerIOCP in HandleStream::init bbb9808
Commits on Nov 12, 2010
@ccutrer IOManagerPoll 1395e28
View
20 mordor/atomic.h
@@ -20,31 +20,31 @@ template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONG), T>::type
atomicDecrement(volatile T& t)
{
- return InterlockedDecrement((volatile LONG*)&t);
+ return (T)InterlockedDecrement((volatile LONG*)&t);
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONG), T>::type
atomicIncrement(volatile T& t)
{
- return InterlockedIncrement((volatile LONG*)&t);
+ return (T)InterlockedIncrement((volatile LONG*)&t);
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONG), T>::type
atomicAdd(volatile T& t, T v)
{
- return InterlockedExchangeAdd((volatile LONG*)&t, (LONG)v) + v;
+ return (T)InterlockedExchangeAdd((volatile LONG*)&t, (LONG)v) + v;
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONG), T>::type
atomicCompareAndSwap(volatile T& t, T newvalue, T comparand)
{
- return InterlockedCompareExchange((volatile LONG*)&t, (LONG)newvalue, (LONG)comparand);
+ return (T)InterlockedCompareExchange((volatile LONG*)&t, (LONG)newvalue, (LONG)comparand);
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONG), T>::type
atomicSwap(volatile T& t, T newvalue)
{
- return InterlockedExchange((volatile LONG*)&t, (LONG)newvalue);
+ return (T)InterlockedExchange((volatile LONG*)&t, (LONG)newvalue);
}
inline
bool
@@ -63,31 +63,31 @@ template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONGLONG), T>::type
atomicDecrement(volatile T& t)
{
- return InterlockedDecrement64((volatile LONGLONG*)&t);
+ return (T)InterlockedDecrement64((volatile LONGLONG*)&t);
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONGLONG), T>::type
atomicIncrement(volatile T& t)
{
- return InterlockedIncrement64((volatile LONGLONG*)&t);
+ return (T)InterlockedIncrement64((volatile LONGLONG*)&t);
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONGLONG), T>::type
atomicAdd(volatile T& t, T v)
{
- return InterlockedExchangeAdd64((volatile LONGLONG*)&t, (LONGLONG)v) + v;
+ return (T)InterlockedExchangeAdd64((volatile LONGLONG*)&t, (LONGLONG)v) + v;
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONGLONG), T>::type
atomicCompareAndSwap(volatile T& t, T newvalue, T comparand)
{
- return InterlockedCompareExchange64((volatile LONGLONG*)&t, (LONGLONG)newvalue, (LONGLONG)comparand);
+ return (T)InterlockedCompareExchange64((volatile LONGLONG*)&t, (LONGLONG)newvalue, (LONGLONG)comparand);
}
template <class T>
typename boost::enable_if_c<sizeof(T) == sizeof(LONGLONG), T>::type
atomicSwap(volatile T& t, T newvalue)
{
- return InterlockedExchange64((volatile LONGLONG*)&t, (LONGLONG)newvalue);
+ return (T)InterlockedExchange64((volatile LONGLONG*)&t, (LONGLONG)newvalue);
}
#endif
#elif defined(OSX)
View
11 mordor/config.cpp
@@ -212,7 +212,7 @@ static void loadFromRegistry(HKEY hKey)
}
}
-Config::RegistryMonitor::RegistryMonitor(IOManager &ioManager,
+Config::RegistryMonitor::RegistryMonitor(IOManagerIOCP &ioManager,
HKEY hKey, const std::wstring &subKey)
: m_ioManager(ioManager),
m_hKey(NULL),
@@ -289,17 +289,20 @@ Config::RegistryMonitor::ptr
Config::monitorRegistry(IOManager &ioManager, HKEY hKey,
const std::string &subKey)
{
- return monitorRegistry(ioManager, hKey, toUtf16(subKey));
+ MORDOR_ASSERT(ioManager.implementation() == IOManager::IOCP);
+ return monitorRegistry(static_cast<IOManagerIOCP &>(ioManager), hKey, toUtf16(subKey));
}
Config::RegistryMonitor::ptr
Config::monitorRegistry(IOManager &ioManager, HKEY hKey,
const std::wstring &subKey)
{
- RegistryMonitor::ptr result(new RegistryMonitor(ioManager, hKey, subKey));
+ MORDOR_ASSERT(ioManager.implementation() == IOManager::IOCP);
+ IOManagerIOCP &iocpIoManager = static_cast<IOManagerIOCP &>(ioManager);
+ RegistryMonitor::ptr result(new RegistryMonitor(iocpIoManager, hKey, subKey));
// Have to wait until after the object is constructed to get the weak_ptr
// we need
- ioManager.registerEvent(result->m_hEvent,
+ iocpIoManager.registerEvent(result->m_hEvent,
boost::bind(&RegistryMonitor::onRegistryChange,
boost::weak_ptr<RegistryMonitor>(result)), true);
Mordor::loadFromRegistry(result->m_hKey);
View
5 mordor/config.h
@@ -25,6 +25,7 @@ namespace Mordor {
#ifdef WINDOWS
class IOManager;
+class IOManagerIOCP;
#endif
class ConfigVarBase : public boost::noncopyable
@@ -150,7 +151,7 @@ class Config
public:
typedef boost::shared_ptr<RegistryMonitor> ptr;
private:
- RegistryMonitor(IOManager &iomanager, HKEY hKey,
+ RegistryMonitor(IOManagerIOCP &iomanager, HKEY hKey,
const std::wstring &subKey);
public:
@@ -161,7 +162,7 @@ class Config
static void onRegistryChange(boost::weak_ptr<RegistryMonitor> self);
private:
- IOManager &m_ioManager;
+ IOManagerIOCP &m_ioManager;
HKEY m_hKey;
HANDLE m_hEvent;
};
View
5 mordor/eventloop.cpp
@@ -63,11 +63,10 @@ EventLoop::~EventLoop()
}
bool
-EventLoop::stopping()
+EventLoop::stoppingInternal()
{
MSG msg;
- return Scheduler::stopping() &&
- !PeekMessage(&msg, NULL, 0, 0, PM_NOREMOVE) && nextTimer() == ~0ull;
+ return !PeekMessage(&msg, NULL, 0, 0, PM_NOREMOVE);
}
void
View
7 mordor/eventloop.h
@@ -4,9 +4,7 @@
#ifdef WINDOWS
-#include "scheduler.h"
-#include "socket.h"
-#include "timer.h"
+#include "timerscheduler.h"
namespace Mordor {
@@ -24,9 +22,8 @@ class EventLoop : public Scheduler, public TimerManager
EventLoop();
~EventLoop();
- bool stopping();
-
protected:
+ bool stoppingInternal();
void idle();
void tickle();
void onTimerInsertedAtFront();
View
11 mordor/examples/echoserver.cpp
@@ -138,14 +138,13 @@ void namedPipeServer(IOManager &ioManager)
int run(int argc, char *argv[])
{
try {
- IOManager ioManager;
- startSocketServer(ioManager);
- startHttpServer(ioManager);
+ boost::scoped_ptr<IOManager> ioManager(IOManager::create());
+ startSocketServer(*ioManager);
+ startHttpServer(*ioManager);
#ifdef WINDOWS
- ioManager.schedule(boost::bind(&namedPipeServer, boost::ref(ioManager)));
- ioManager.schedule(boost::bind(&namedPipeServer, boost::ref(ioManager)));
+ ioManager->schedule(boost::bind(&namedPipeServer, boost::ref(*ioManager)));
+ ioManager->schedule(boost::bind(&namedPipeServer, boost::ref(*ioManager)));
#endif
- ioManager.dispatch();
} catch (...) {
std::cerr << boost::current_exception_diagnostic_information() << std::endl;
return 1;
View
7 mordor/examples/iombench.cpp
@@ -307,13 +307,12 @@ MORDOR_MAIN(int argc, char *argv[])
NetBench bench(argc, argv);
Config::loadFromEnvironment();
- IOManager iom(g_iomThreads->val());
+ boost::scoped_ptr<IOManager> iom(IOManager::create(g_iomThreads->val()));
- IOMBenchServer server(iom);
- IOMBenchClient client(iom);
+ IOMBenchServer server(*iom);
+ IOMBenchClient client(*iom);
bench.run(&server, &client);
- iom.stop();
return 0;
} catch (...) {
std::cerr << "caught: "
View
4 mordor/examples/simpleclient.cpp
@@ -16,10 +16,10 @@ MORDOR_MAIN(int argc, char *argv[])
{
try {
Config::loadFromEnvironment();
- IOManager ioManager;
+ boost::scoped_ptr<IOManager> ioManager(IOManager::create());
std::vector<Address::ptr> addresses =
Address::lookup(argv[1], AF_UNSPEC, SOCK_STREAM);
- Socket::ptr s(addresses[0]->createSocket(ioManager));
+ Socket::ptr s(addresses[0]->createSocket(*ioManager));
s->connect(addresses[0]);
size_t rc = s->send("hello\r\n", 7);
char buf[8192];
View
12 mordor/examples/tunnel.cpp
@@ -151,7 +151,7 @@ static void socketServer(Socket::ptr s, IOManager &ioManager,
MORDOR_MAIN(int argc, char *argv[])
{
Config::loadFromEnvironment();
- IOManager ioManager;
+ boost::scoped_ptr<IOManager> ioManager(IOManager::create());
if (argc <= 2) {
std::cerr << "Usage: (<listen> | -) <to> [-ssl] [<proxy> [<username> <password>]]"
<< std::endl;
@@ -175,11 +175,11 @@ MORDOR_MAIN(int argc, char *argv[])
username = argv[4];
if (argc > 3) {
outgoing = boost::bind(&outgoingProxyConnection, _1,
- boost::ref(ioManager), argv[3], username, password, to,
+ boost::ref(*ioManager), argv[3], username, password, to,
ssl);
} else {
outgoing = boost::bind(&outgoingConnection, _1,
- boost::ref(ioManager), to, ssl);
+ boost::ref(*ioManager), to, ssl);
}
if (to == "-") {
@@ -194,12 +194,12 @@ MORDOR_MAIN(int argc, char *argv[])
for (std::vector<Address::ptr>::const_iterator it(addresses.begin());
it != addresses.end();
++it) {
- Socket::ptr s = (*it)->createSocket(ioManager);
+ Socket::ptr s = (*it)->createSocket(*ioManager);
s->bind(*it);
Scheduler::getThis()->schedule(boost::bind(&socketServer, s,
- boost::ref(ioManager), outgoing));
+ boost::ref(*ioManager), outgoing));
}
- ioManager.yieldTo();
+ ioManager->yieldTo();
}
} catch (...) {
std::cerr << boost::current_exception_diagnostic_information()
View
4 mordor/examples/wget.cpp
@@ -66,7 +66,7 @@ MORDOR_MAIN(int argc, char *argv[])
{
Config::loadFromEnvironment();
StdoutStream stdoutStream;
- IOManager ioManager;
+ boost::scoped_ptr<IOManager> ioManager(IOManager::create());
po::options_description desc("Allowed options");
desc.add_options()
("help", "print this help message")
@@ -95,7 +95,7 @@ MORDOR_MAIN(int argc, char *argv[])
MORDOR_ASSERT(!uri.schemeDefined() || uri.scheme() == "http" || uri.scheme() == "https");
HTTP::RequestBrokerOptions options;
- options.ioManager = &ioManager;
+ options.ioManager = &*ioManager;
std::string username, password, proxyusername, proxypassword;
if (vm.count("username")) username = vm["username"].as<std::string>();
if (vm.count("password")) password = vm["password"].as<std::string>();
View
2 mordor/exception.cpp
@@ -267,6 +267,8 @@ void throwExceptionFromLastError(error_t error)
case ERROR_ACCESS_DENIED:
throw boost::enable_current_exception(AccessDeniedException())
<< errinfo_nativeerror(error);
+ case WSAECANCELLED:
+ case WSA_E_CANCELLED:
case ERROR_OPERATION_ABORTED:
throw boost::enable_current_exception(OperationAbortedException())
<< errinfo_nativeerror(error);
View
72 mordor/iomanager.cpp
@@ -0,0 +1,72 @@
+// Copyright (c) 2010 - Mozy, Inc.
+
+#include "mordor/iomanager.h"
+
+#include "mordor/assert.h"
+#include "mordor/log.h"
+
+namespace Mordor {
+
+static Logger::ptr g_log = Log::lookup("mordor:iomanager");
+
+IOManager *IOManager::create(size_t threads, bool useCaller)
+{
+#ifdef WINDOWS
+ return new IOManagerIOCP(threads, useCaller);
+#elif defined (LINUX)
+ return new IOManagerEPoll(threads, useCaller);
+#elif defined (BSD)
+ return new IOManagerKQueue(threads, useCaller);
+#else
+ return new IOManagerPoll(threads, useCaller)
+#endif
+}
+
+PipeTicklingIOManager::PipeTicklingIOManager(size_t threads, bool useCaller)
+ : IOManager(threads, useCaller)
+{
+#ifndef WINDOWS
+ int rc = pipe(m_tickleFds);
+ MORDOR_LOG_LEVEL(g_log, rc ? Log::ERROR : Log::VERBOSE) << this << " pipe(): "
+ << rc << " (" << errno << ")";
+ if (rc)
+ MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("pipe");
+ MORDOR_ASSERT(m_tickleFds[0] > 0);
+ MORDOR_ASSERT(m_tickleFds[1] > 0);
+#endif
+}
+
+PipeTicklingIOManager::~PipeTicklingIOManager()
+{
+#ifndef WINDOWS
+ close(m_tickleFds[0]);
+ MORDOR_LOG_VERBOSE(g_log) << this << " close(" << m_tickleFds[0] << ")";
+ close(m_tickleFds[1]);
+ MORDOR_LOG_VERBOSE(g_log) << this << " close(" << m_tickleFds[1] << ")";
+#endif
+}
+
+void
+PipeTicklingIOManager::tickle()
+{
+#ifndef WINDOWS
+ int rc = write(m_tickleFds[1], "T", 1);
+ MORDOR_LOG_VERBOSE(g_log) << this << " write(" << m_tickleFds[1] << ", 1): "
+ << rc << " (" << errno << ")";
+ MORDOR_VERIFY(rc == 1);
+#endif
+}
+
+void
+PipeTicklingIOManager::consumeTickle()
+{
+#ifndef WINDOWS
+ unsigned char dummy;
+ int rc2 = read(m_tickleFds[0], &dummy, 1);
+ MORDOR_VERIFY(rc2 == 1);
+ MORDOR_VERIFY(dummy == 'T');
+ MORDOR_LOG_VERBOSE(g_log) << this << " received tickle";
+#endif
+}
+
+}
View
61 mordor/iomanager.h
@@ -2,8 +2,69 @@
#define __MORDOR_IOMANAGER_H__
// Copyright (c) 2009 - Mozy, Inc.
+#include "timerscheduler.h"
#include "version.h"
+namespace Mordor {
+
+class IOManager : public TimerScheduler
+{
+public:
+ enum Event {
+ READ,
+ WRITE,
+ /// CLOSE may not be supported, and will be silently discarded
+ CLOSE
+ };
+
+ enum Implementation
+ {
+ UNKNOWN,
+ SELECT,
+ POLL,
+ EPOLL,
+ IOCP,
+ KQUEUE
+ };
+
+public:
+ IOManager(size_t threads = 1, bool useCaller = true)
+ : TimerScheduler(threads, useCaller)
+ {}
+
+ // Create the most appropriate IOManager for the current platform
+ static IOManager *create(size_t threads = 1, bool useCaller = true);
+
+ virtual void registerEvent(int fd, Event event, boost::function<void ()> dg = NULL) = 0;
+ /// Will not cause the event to fire
+ /// @return If the event was successfully unregistered before firing normally
+ virtual bool unregisterEvent(int fd, Event event) = 0;
+ /// Will cause the event to fire
+ virtual bool cancelEvent(int fd, Event event) = 0;
+
+ /// This is to avoid costly dynamic_casts; check this value, and then do a
+ /// static_cast.
+ virtual Implementation implementation() const { return UNKNOWN; }
+};
+
+// Helper class for common functionality (tickling via a pipe)
+class PipeTicklingIOManager : public IOManager
+{
+public:
+ PipeTicklingIOManager(size_t threads = 1, bool useCaller = true);
+ ~PipeTicklingIOManager();
+
+protected:
+ void tickle();
+ int tickleFd() { return m_tickleFds[0]; }
+ void consumeTickle();
+
+private:
+ int m_tickleFds[2];
+};
+
+};
+
#ifdef WINDOWS
#include "iomanager_iocp.h"
#elif defined(LINUX)
View
71 mordor/iomanager_epoll.cpp
@@ -93,19 +93,19 @@ static std::ostream &operator <<(std::ostream &os, EPOLL_EVENTS events)
return os;
}
-IOManager::AsyncState::AsyncState()
+IOManagerEPoll::AsyncState::AsyncState()
: m_fd(0),
m_events(NONE)
{}
-IOManager::AsyncState::~AsyncState()
+IOManagerEPoll::AsyncState::~AsyncState()
{
boost::mutex::scoped_lock lock(m_mutex);
MORDOR_ASSERT(!m_events);
}
-IOManager::AsyncState::EventContext &
-IOManager::AsyncState::contextForEvent(Event event)
+IOManagerEPoll::AsyncState::EventContext &
+IOManagerEPoll::AsyncState::contextForEvent(Event event)
{
switch (event) {
case READ:
@@ -120,7 +120,7 @@ IOManager::AsyncState::contextForEvent(Event event)
}
bool
-IOManager::AsyncState::triggerEvent(Event event, size_t &pendingEventCount)
+IOManagerEPoll::AsyncState::triggerEvent(Event event, size_t &pendingEventCount)
{
if (!(m_events & event))
return false;
@@ -137,7 +137,7 @@ IOManager::AsyncState::triggerEvent(Event event, size_t &pendingEventCount)
return true;
}
-IOManager::IOManager(size_t threads, bool useCaller)
+IOManagerEPoll::IOManagerEPoll(size_t threads, bool useCaller)
: Scheduler(threads, useCaller),
m_pendingEventCount(0)
{
@@ -146,47 +146,31 @@ IOManager::IOManager(size_t threads, bool useCaller)
<< " epoll_create(5000): " << m_epfd;
if (m_epfd <= 0)
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("epoll_create");
- int rc = pipe(m_tickleFds);
- MORDOR_LOG_LEVEL(g_log, rc ? Log::ERROR : Log::VERBOSE) << this << " pipe(): "
- << rc << " (" << errno << ")";
- if (rc) {
- close(m_epfd);
- MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("pipe");
- }
- MORDOR_ASSERT(m_tickleFds[0] > 0);
- MORDOR_ASSERT(m_tickleFds[1] > 0);
epoll_event event;
memset(&event, 0, sizeof(epoll_event));
event.events = EPOLLIN | EPOLLET;
- event.data.fd = m_tickleFds[0];
- rc = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
+ event.data.fd = tickleFd();
+ rc = epoll_ctl(m_epfd, EPOLL_CTL_ADD, tickleFd(), &event);
MORDOR_LOG_LEVEL(g_log, rc ? Log::ERROR : Log::VERBOSE) << this
<< " epoll_ctl(" << m_epfd << ", EPOLL_CTL_ADD, " << m_tickleFds[0]
<< ", EPOLLIN | EPOLLET): " << rc << " (" << errno << ")";
if (rc) {
- close(m_tickleFds[0]);
- close(m_tickleFds[1]);
close(m_epfd);
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("epoll_ctl");
}
try {
start();
} catch (...) {
- close(m_tickleFds[0]);
- close(m_tickleFds[1]);
close(m_epfd);
throw;
}
}
-IOManager::~IOManager()
+IOManagerEPoll::~IOManagerEPoll()
{
stop();
close(m_epfd);
MORDOR_LOG_TRACE(g_log) << this << " close(" << m_epfd << ")";
- close(m_tickleFds[0]);
- MORDOR_LOG_VERBOSE(g_log) << this << " close(" << m_tickleFds[0] << ")";
- close(m_tickleFds[1]);
// Yes, it would be more C++-esque to store a boost::shared_ptr in the
// vector, but that requires an extra allocation per fd for the counter
for (size_t i = 0; i < m_pendingEvents.size(); ++i) {
@@ -195,15 +179,8 @@ IOManager::~IOManager()
}
}
-bool
-IOManager::stopping()
-{
- unsigned long long timeout;
- return stopping(timeout);
-}
-
void
-IOManager::registerEvent(int fd, Event event, boost::function<void ()> dg)
+IOManagerEPoll::registerEvent(int fd, Event event, boost::function<void ()> dg)
{
MORDOR_ASSERT(fd > 0);
MORDOR_ASSERT(Scheduler::getThis());
@@ -249,7 +226,7 @@ IOManager::registerEvent(int fd, Event event, boost::function<void ()> dg)
}
bool
-IOManager::unregisterEvent(int fd, Event event)
+IOManagerEPoll::unregisterEvent(int fd, Event event)
{
MORDOR_ASSERT(fd > 0);
MORDOR_ASSERT(event == READ || event == WRITE || event == CLOSE);
@@ -291,7 +268,7 @@ IOManager::unregisterEvent(int fd, Event event)
}
bool
-IOManager::cancelEvent(int fd, Event event)
+IOManagerEPoll::cancelEvent(int fd, Event event)
{
MORDOR_ASSERT(fd > 0);
MORDOR_ASSERT(event == READ || event == WRITE || event == CLOSE);
@@ -327,15 +304,13 @@ IOManager::cancelEvent(int fd, Event event)
}
bool
-IOManager::stopping(unsigned long long &nextTimeout)
+IOManagerEPoll::stoppingInternal()
{
- nextTimeout = nextTimer();
- return nextTimeout == ~0ull && Scheduler::stopping() &&
- m_pendingEventCount == 0;
+ return m_pendingEventCount == 0;
}
void
-IOManager::idle()
+IOManagerEPoll::idle()
{
epoll_event events[64];
while (true) {
@@ -363,11 +338,8 @@ IOManager::idle()
for(int i = 0; i < rc; ++i) {
epoll_event &event = events[i];
- if (event.data.fd == m_tickleFds[0]) {
- unsigned char dummy;
- int rc2 = read(m_tickleFds[0], &dummy, 1);
- MORDOR_VERIFY(rc2 == 1);
- MORDOR_LOG_VERBOSE(g_log) << this << " received tickle";
+ if (event.data.fd == tickleFd()) {
+ consumeTickle();
continue;
}
@@ -415,15 +387,6 @@ IOManager::idle()
}
}
-void
-IOManager::tickle()
-{
- int rc = write(m_tickleFds[1], "T", 1);
- MORDOR_LOG_VERBOSE(g_log) << this << " write(" << m_tickleFds[1] << ", 1): "
- << rc << " (" << errno << ")";
- MORDOR_VERIFY(rc == 1);
-}
-
}
#endif
View
31 mordor/iomanager_epoll.h
@@ -2,9 +2,7 @@
#define __MORDOR_IOMANAGER_EPOLL_H__
// Copyright (c) 2009 - Mozy, Inc.
-#include "scheduler.h"
-#include "timer.h"
-#include "version.h"
+#include "iomanager.h"
#ifndef LINUX
#error IOManagerEPoll is Linux only
@@ -14,16 +12,8 @@ namespace Mordor {
class Fiber;
-class IOManager : public Scheduler, public TimerManager
+class IOManagerEPoll : public PipeTicklingIOManager
{
-public:
- enum Event {
- NONE = 0x0000,
- READ = 0x0001,
- WRITE = 0x0004,
- CLOSE = 0x2000
- };
-
private:
struct AsyncState : boost::noncopyable
{
@@ -48,29 +38,22 @@ class IOManager : public Scheduler, public TimerManager
};
public:
- IOManager(size_t threads = 1, bool useCaller = true);
- ~IOManager();
-
- bool stopping();
+ IOManagerEPoll(size_t threads = 1, bool useCaller = true);
+ ~IOManagerEPoll();
void registerEvent(int fd, Event events,
boost::function<void ()> dg = NULL);
- /// Will not cause the event to fire
- /// @return If the event was successfully unregistered before firing normally
bool unregisterEvent(int fd, Event events);
- /// Will cause the event to fire
bool cancelEvent(int fd, Event events);
+ Implementation implementation() const { return EPOLL; }
+
protected:
- bool stopping(unsigned long long &nextTimeout);
+ bool stoppingInternal();
void idle();
- void tickle();
-
- void onTimerInsertedAtFront() { tickle(); }
private:
int m_epfd;
- int m_tickleFds[2];
size_t m_pendingEventCount;
boost::mutex m_mutex;
std::vector<AsyncState *> m_pendingEvents;
View
88 mordor/iomanager_iocp.cpp
@@ -14,12 +14,12 @@ namespace Mordor {
static Logger::ptr g_log = Log::lookup("mordor:iomanager");
static Logger::ptr g_logWaitBlock = Log::lookup("mordor:iomanager:waitblock");
-AsyncEvent::AsyncEvent()
+IOManagerIOCP::AsyncEvent::AsyncEvent()
{
memset(this, 0, sizeof(AsyncEvent));
}
-IOManager::WaitBlock::WaitBlock(IOManager &outer)
+IOManagerIOCP::WaitBlock::WaitBlock(IOManagerIOCP &outer)
: m_outer(outer),
m_inUseCount(0)
{
@@ -37,7 +37,7 @@ IOManager::WaitBlock::WaitBlock(IOManager &outer)
}
}
-IOManager::WaitBlock::~WaitBlock()
+IOManagerIOCP::WaitBlock::~WaitBlock()
{
MORDOR_ASSERT(m_inUseCount <= 0);
BOOL bRet = CloseHandle(m_handles[0]);
@@ -49,7 +49,7 @@ IOManager::WaitBlock::~WaitBlock()
}
bool
-IOManager::WaitBlock::registerEvent(HANDLE hEvent,
+IOManagerIOCP::WaitBlock::registerEvent(HANDLE hEvent,
boost::function <void ()> dg,
bool recurring)
{
@@ -76,7 +76,7 @@ IOManager::WaitBlock::registerEvent(HANDLE hEvent,
typedef boost::function<void ()> functor;
size_t
-IOManager::WaitBlock::unregisterEvent(HANDLE handle)
+IOManagerIOCP::WaitBlock::unregisterEvent(HANDLE handle)
{
boost::mutex::scoped_lock lock(m_mutex);
if (m_inUseCount == -1)
@@ -110,7 +110,7 @@ IOManager::WaitBlock::unregisterEvent(HANDLE handle)
}
void
-IOManager::WaitBlock::run()
+IOManagerIOCP::WaitBlock::run()
{
DWORD dwRet;
DWORD count;
@@ -196,7 +196,7 @@ IOManager::WaitBlock::run()
}
void
-IOManager::WaitBlock::removeEntry(int index)
+IOManagerIOCP::WaitBlock::removeEntry(int index)
{
memmove(&m_handles[index], &m_handles[index + 1], (m_inUseCount - index) * sizeof(HANDLE));
memmove(&m_schedulers[index], &m_schedulers[index + 1], (m_inUseCount - index) * sizeof(Scheduler *));
@@ -211,10 +211,11 @@ IOManager::WaitBlock::removeEntry(int index)
memmove(&m_recurring[index], &m_recurring[index + 1], (m_inUseCount - index) * sizeof(bool));
}
-IOManager::IOManager(size_t threads, bool useCaller)
- : Scheduler(threads, useCaller)
+IOManagerIOCP::IOManagerIOCP(size_t threads, bool useCaller)
+ : IOManager(threads, useCaller),
+ m_pendingEventCount(0),
+ m_posixStyleIOManager(NULL)
{
- m_pendingEventCount = 0;
m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
MORDOR_LOG_LEVEL(g_log, m_hCompletionPort ? Log::VERBOSE : Log::ERROR) << this <<
" CreateIoCompletionPort(): " << m_hCompletionPort << " ("
@@ -229,21 +230,37 @@ IOManager::IOManager(size_t threads, bool useCaller)
}
}
-IOManager::~IOManager()
+IOManagerIOCP::~IOManagerIOCP()
{
+ if (m_posixStyleIOManager)
+ delete m_posixStyleIOManager;
stop();
CloseHandle(m_hCompletionPort);
}
+void
+IOManagerIOCP::registerEvent(int fd, Event event, boost::function<void ()> dg)
+{
+ ensurePosixStyleIOManager();
+ m_posixStyleIOManager->registerEvent(fd, event, dg);
+}
+
bool
-IOManager::stopping()
+IOManagerIOCP::unregisterEvent(int fd, Event event)
{
- unsigned long long timeout;
- return stopping(timeout);
+ ensurePosixStyleIOManager();
+ return m_posixStyleIOManager->unregisterEvent(fd, event);
+}
+
+bool
+IOManagerIOCP::cancelEvent(int fd, Event event)
+{
+ ensurePosixStyleIOManager();
+ return m_posixStyleIOManager->cancelEvent(fd, event);
}
void
-IOManager::registerFile(HANDLE handle)
+IOManagerIOCP::registerFile(HANDLE handle)
{
HANDLE hRet = CreateIoCompletionPort(handle, m_hCompletionPort, 0, 0);
MORDOR_LOG_LEVEL(g_log, m_hCompletionPort ? Log::DEBUG : Log::ERROR) << this <<
@@ -255,7 +272,7 @@ IOManager::registerFile(HANDLE handle)
}
void
-IOManager::registerEvent(AsyncEvent *e)
+IOManagerIOCP::registerEvent(AsyncEvent *e)
{
MORDOR_ASSERT(e);
e->m_scheduler = Scheduler::getThis();
@@ -278,7 +295,7 @@ IOManager::registerEvent(AsyncEvent *e)
}
void
-IOManager::unregisterEvent(AsyncEvent *e)
+IOManagerIOCP::unregisterEvent(AsyncEvent *e)
{
MORDOR_ASSERT(e);
MORDOR_LOG_DEBUG(g_log) << this << " unregisterEvent(" << &e->overlapped << ")";
@@ -301,7 +318,7 @@ IOManager::unregisterEvent(AsyncEvent *e)
}
void
-IOManager::registerEvent(HANDLE handle, boost::function<void ()> dg, bool recurring)
+IOManagerIOCP::registerEvent(HANDLE handle, boost::function<void ()> dg, bool recurring)
{
MORDOR_LOG_DEBUG(g_log) << this << " registerEvent(" << handle << ", " << dg
<< ")";
@@ -322,7 +339,7 @@ IOManager::registerEvent(HANDLE handle, boost::function<void ()> dg, bool recurr
}
size_t
-IOManager::unregisterEvent(HANDLE handle)
+IOManagerIOCP::unregisterEvent(HANDLE handle)
{
MORDOR_ASSERT(handle);
boost::mutex::scoped_lock lock(m_mutex);
@@ -337,7 +354,7 @@ IOManager::unregisterEvent(HANDLE handle)
}
void
-IOManager::cancelEvent(HANDLE hFile, AsyncEvent *e)
+IOManagerIOCP::cancelEvent(HANDLE hFile, AsyncEvent *e)
{
MORDOR_ASSERT(hFile);
MORDOR_ASSERT(e);
@@ -370,20 +387,18 @@ IOManager::cancelEvent(HANDLE hFile, AsyncEvent *e)
}
bool
-IOManager::stopping(unsigned long long &nextTimeout)
+IOManagerIOCP::stoppingInternal()
{
- nextTimeout = nextTimer();
- if (nextTimeout == ~0ull && Scheduler::stopping()) {
- if (m_pendingEventCount != 0)
- return false;
- boost::mutex::scoped_lock lock(m_mutex);
- return m_waitBlocks.empty();
- }
- return false;
+ if (m_pendingEventCount != 0)
+ return false;
+ if (m_posixStyleIOManager && !m_posixStyleIOManager->stopping())
+ return false;
+ boost::mutex::scoped_lock lock(m_mutex);
+ return m_waitBlocks.empty();
}
void
-IOManager::idle()
+IOManagerIOCP::idle()
{
OVERLAPPED_ENTRY events[64];
ULONG count;
@@ -476,7 +491,7 @@ IOManager::idle()
}
void
-IOManager::tickle()
+IOManagerIOCP::tickle()
{
BOOL bRet = PostQueuedCompletionStatus(m_hCompletionPort, 0, ~0, NULL);
MORDOR_LOG_LEVEL(g_log, bRet ? Log::DEBUG : Log::ERROR) << this
@@ -486,4 +501,15 @@ IOManager::tickle()
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("PostQueuedCompletionStatus");
}
+void
+IOManagerIOCP::ensurePosixStyleIOManager()
+{
+ if (!m_posixStyleIOManager) {
+ IOManager *newIOManager = NULL;
+ // TODO: use poll or select
+ if (atomicCompareAndSwap(m_posixStyleIOManager, newIOManager, (IOManager *)NULL) != NULL)
+ delete newIOManager;
+ }
+}
+
}
View
49 mordor/iomanager_iocp.h
@@ -7,8 +7,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
-#include "scheduler.h"
-#include "timer.h"
+#include "iomanager.h"
#include "version.h"
#ifndef WINDOWS
@@ -19,27 +18,33 @@ namespace Mordor {
class Fiber;
-struct AsyncEvent
+/// @brief IOManager using I/O Completion Ports
+///
+/// IOManagerIOCP offers additional functionality over a generic IOManager,
+/// including I/O Completion Port style asynchronous I/O, and notification of
+/// any HANDLE being signalled
+class IOManagerIOCP : public IOManager
{
- AsyncEvent();
+ friend class WaitBlock;
+public:
+ struct AsyncEvent
+ {
+ AsyncEvent();
- OVERLAPPED overlapped;
+ OVERLAPPED overlapped;
- Scheduler *m_scheduler;
- tid_t m_thread;
- boost::shared_ptr<Fiber> m_fiber;
-};
+ Scheduler *m_scheduler;
+ tid_t m_thread;
+ boost::shared_ptr<Fiber> m_fiber;
+ };
-class IOManager : public Scheduler, public TimerManager
-{
- friend class WaitBlock;
private:
class WaitBlock : public boost::enable_shared_from_this<WaitBlock>
{
public:
typedef boost::shared_ptr<WaitBlock> ptr;
public:
- WaitBlock(IOManager &outer);
+ WaitBlock(IOManagerIOCP &outer);
~WaitBlock();
bool registerEvent(HANDLE handle, boost::function<void ()> dg,
@@ -52,7 +57,7 @@ class IOManager : public Scheduler, public TimerManager
private:
boost::mutex m_mutex;
- IOManager &m_outer;
+ IOManagerIOCP &m_outer;
HANDLE m_reconfigured;
HANDLE m_handles[MAXIMUM_WAIT_OBJECTS];
Scheduler *m_schedulers[MAXIMUM_WAIT_OBJECTS];
@@ -63,10 +68,14 @@ class IOManager : public Scheduler, public TimerManager
};
public:
- IOManager(size_t threads = 1, bool useCaller = true);
- ~IOManager();
+ IOManagerIOCP(size_t threads = 1, bool useCaller = true);
+ ~IOManagerIOCP();
- bool stopping();
+ void registerEvent(int fd, Event event, boost::function<void ()> dg = NULL);
+ bool unregisterEvent(int fd, Event event);
+ bool cancelEvent(int fd, Event event);
+
+ Implementation implementation() const { return IOCP; }
void registerFile(HANDLE handle);
void registerEvent(AsyncEvent *e);
@@ -80,11 +89,12 @@ class IOManager : public Scheduler, public TimerManager
void cancelEvent(HANDLE hFile, AsyncEvent *e);
protected:
- bool stopping(unsigned long long &nextTimeout);
+ bool stoppingInternal();
void idle();
void tickle();
- void onTimerInsertedAtFront() { tickle(); }
+private:
+ void ensurePosixStyleIOManager();
private:
HANDLE m_hCompletionPort;
@@ -94,6 +104,7 @@ class IOManager : public Scheduler, public TimerManager
size_t m_pendingEventCount;
boost::mutex m_mutex;
std::list<WaitBlock::ptr> m_waitBlocks;
+ IOManager *m_posixStyleIOManager;
};
}
View
152 mordor/iomanager_kqueue.cpp
@@ -13,65 +13,41 @@ namespace Mordor {
static Logger::ptr g_log = Log::lookup("mordor:iomanager");
-IOManager::IOManager(size_t threads, bool useCaller)
+IOManagerKQueue::IOManagerKQueue(size_t threads, bool useCaller)
: Scheduler(threads, useCaller)
{
m_kqfd = kqueue();
MORDOR_LOG_LEVEL(g_log, m_kqfd <= 0 ? Log::ERROR : Log::TRACE) << this
<< " kqueue(): " << m_kqfd;
- if (m_kqfd <= 0) {
+ if (m_kqfd <= 0)
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("kqueue");
- }
- int rc = pipe(m_tickleFds);
- MORDOR_LOG_LEVEL(g_log, rc ? Log::ERROR : Log::VERBOSE) << this << " pipe(): "
- << rc << " (" << errno << ")";
- if (rc) {
- close(m_kqfd);
- MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("pipe");
- }
- MORDOR_ASSERT(m_tickleFds[0] > 0);
- MORDOR_ASSERT(m_tickleFds[1] > 0);
struct kevent event;
- EV_SET(&event, m_tickleFds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
+ EV_SET(&event, tickleFd(), EVFILT_READ, EV_ADD, 0, 0, NULL);
rc = kevent(m_kqfd, &event, 1, NULL, 0, NULL);
MORDOR_LOG_LEVEL(g_log, rc ? Log::ERROR : Log::VERBOSE) << this << " kevent("
- << m_kqfd << ", (" << m_tickleFds[0] << ", EVFILT_READ, EV_ADD)): " << rc
+ << m_kqfd << ", (" << tickleFd() << ", EVFILT_READ, EV_ADD)): " << rc
<< " (" << errno << ")";
if (rc) {
- close(m_tickleFds[0]);
- close(m_tickleFds[1]);
close(m_kqfd);
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("kevent");
}
try {
start();
} catch (...) {
- close(m_tickleFds[0]);
- close(m_tickleFds[1]);
close(m_kqfd);
throw;
}
}
-IOManager::~IOManager()
+IOManagerKQueue::~IOManagerKQueue()
{
stop();
close(m_kqfd);
MORDOR_LOG_TRACE(g_log) << this << " close(" << m_kqfd << ")";
- close(m_tickleFds[0]);
- MORDOR_LOG_VERBOSE(g_log) << this << " close(" << m_tickleFds[0] << ")";
- close(m_tickleFds[1]);
-}
-
-bool
-IOManager::stopping()
-{
- unsigned long long timeout;
- return stopping(timeout);
}
void
-IOManager::registerEvent(int fd, Event events,
+IOManagerKQueue::registerEvent(int fd, Event events,
boost::function<void ()> dg)
{
MORDOR_ASSERT(fd > 0);
@@ -120,8 +96,47 @@ IOManager::registerEvent(int fd, Event events,
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("kevent");
}
-void
-IOManager::cancelEvent(int fd, Event events)
+bool
+IOManagerKQueue::unregisterEvent(int fd, Event events)
+{
+ Event eventsKey = events;
+ if (eventsKey == CLOSE)
+ eventsKey = READ;
+ boost::mutex::scoped_lock lock(m_mutex);
+ std::map<std::pair<int, Event>, AsyncEvent>::iterator it =
+ m_pendingEvents.find(std::pair<int, Event>(fd, eventsKey));
+ if (it == m_pendingEvents.end())
+ return false;
+ AsyncEvent &e = it->second;
+ MORDOR_ASSERT(e.event.ident == (unsigned)fd);
+ if (events == READ || events == WRITE) {
+ if (!e.m_fiber && !e.m_dg)
+ return false;
+ e.m_fiber.reset();
+ e.m_dg = NULL;
+ if (e.m_fiberClose || e.m_dgClose)
+ return true;
+ } else if (events == CLOSE) {
+ if (!e.m_fiberClose && !e.m_dgClose)
+ return false;
+ e.m_fiberClose.reset();
+ e.m_dgClose = NULL;
+ if (e.m_fiber || e.m_dg)
+ return true;
+ }
+ e.event.flags = EV_DELETE;
+ int rc = kevent(m_kqfd, &e.event, 1, NULL, 0, NULL);
+ MORDOR_LOG_LEVEL(g_log, rc ? Log::ERROR : Log::VERBOSE) << this << " kevent("
+ << m_kqfd << ", (" << fd << ", " << eventsKey << ", EV_DELETE)): " << rc
+ << " (" << errno << ")";
+ if (rc)
+ MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("kevent");
+ m_pendingEvents.erase(it);
+ return true;
+}
+
+bool
+IOManagerKQueue::cancelEvent(int fd, Event events)
{
Event eventsKey = events;
if (eventsKey == CLOSE)
@@ -130,7 +145,7 @@ IOManager::cancelEvent(int fd, Event events)
std::map<std::pair<int, Event>, AsyncEvent>::iterator it =
m_pendingEvents.find(std::pair<int, Event>(fd, eventsKey));
if (it == m_pendingEvents.end())
- return;
+ return false;
AsyncEvent &e = it->second;
MORDOR_ASSERT(e.event.ident == (unsigned)fd);
Scheduler *scheduler;
@@ -145,7 +160,7 @@ IOManager::cancelEvent(int fd, Event events)
scheduler->schedule(dg);
else
scheduler->schedule(fiber);
- return;
+ return true;
}
} else if (events == CLOSE) {
scheduler = e.m_schedulerClose;
@@ -156,7 +171,7 @@ IOManager::cancelEvent(int fd, Event events)
scheduler->schedule(dg);
else
scheduler->schedule(fiber);
- return;
+ return true;
}
} else if (events == WRITE) {
scheduler = e.m_scheduler;
@@ -179,55 +194,15 @@ IOManager::cancelEvent(int fd, Event events)
m_pendingEvents.erase(it);
}
-void
-IOManager::unregisterEvent(int fd, Event events)
-{
- Event eventsKey = events;
- if (eventsKey == CLOSE)
- eventsKey = READ;
- boost::mutex::scoped_lock lock(m_mutex);
- std::map<std::pair<int, Event>, AsyncEvent>::iterator it =
- m_pendingEvents.find(std::pair<int, Event>(fd, eventsKey));
- if (it == m_pendingEvents.end())
- return;
- AsyncEvent &e = it->second;
- MORDOR_ASSERT(e.event.ident == (unsigned)fd);
- if (events == READ) {
- e.m_fiber.reset();
- e.m_dg = NULL;
- if (e.m_fiberClose || e.m_dgClose)
- return;
- } else if (events == CLOSE) {
- e.m_fiberClose.reset();
- e.m_dgClose = NULL;
- if (e.m_fiber || e.m_dg)
- return;
- }
- e.event.flags = EV_DELETE;
- int rc = kevent(m_kqfd, &e.event, 1, NULL, 0, NULL);
- MORDOR_LOG_LEVEL(g_log, rc ? Log::ERROR : Log::VERBOSE) << this << " kevent("
- << m_kqfd << ", (" << fd << ", " << eventsKey << ", EV_DELETE)): " << rc
- << " (" << errno << ")";
- if (rc)
- MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("kevent");
- m_pendingEvents.erase(it);
-}
-
-
bool
-IOManager::stopping(unsigned long long &nextTimeout)
+IOManagerKQueue::stoppingInternal()
{
- nextTimeout = nextTimer();
- if (nextTimeout == ~0ull && Scheduler::stopping()) {
- boost::mutex::scoped_lock lock(m_mutex);
- if (m_pendingEvents.empty())
- return true;
- }
- return false;
+ boost::mutex::scoped_lock lock(m_mutex);
+ return m_pendingEvents.empty();
}
void
-IOManager::idle()
+IOManagerKQueue::idle()
{
struct kevent events[64];
while (true) {
@@ -256,14 +231,8 @@ IOManager::idle()
for(int i = 0; i < rc; ++i) {
struct kevent &event = events[i];
- if ((int)event.ident == m_tickleFds[0]) {
- unsigned char dummy;
-#ifdef DEBUG
- int rc2 =
-#endif
- read(m_tickleFds[0], &dummy, 1);
- MORDOR_ASSERT(rc2 == 1);
- MORDOR_LOG_VERBOSE(g_log) << this << " received tickle";
+ if ((int)event.ident == tickleFd()) {
+ consumeTickle();
continue;
}
@@ -325,15 +294,6 @@ IOManager::idle()
}
}
-void
-IOManager::tickle()
-{
- int rc = write(m_tickleFds[1], "T", 1);
- MORDOR_LOG_VERBOSE(g_log) << this << " write(" << m_tickleFds[1] << ", 1): "
- << rc << " (" << errno << ")";
- MORDOR_VERIFY(rc == 1);
-}
-
}
#endif
View
31 mordor/iomanager_kqueue.h
@@ -7,25 +7,16 @@
#include <map>
-#include "scheduler.h"
-#include "timer.h"
-#include "version.h"
+#include "iomanager.h"
#ifndef BSD
#error IOManagerKQueue is BSD only
#endif
namespace Mordor {
-class IOManager : public Scheduler, public TimerManager
+class IOManagerKQueue : public PipeTicklingIOManager
{
-public:
- enum Event {
- READ,
- WRITE,
- CLOSE
- };
-
private:
struct AsyncEvent
{
@@ -40,25 +31,21 @@ class IOManager : public Scheduler, public TimerManager
};
public:
- IOManager(size_t threads = 1, bool useCaller = true);
- ~IOManager();
-
- bool stopping();
+ IOManagerKQueue(size_t threads = 1, bool useCaller = true);
+ ~IOManagerKQueue();
void registerEvent(int fd, Event events, boost::function<void ()> dg = NULL);
- void cancelEvent(int fd, Event events);
- void unregisterEvent(int fd, Event events);
+ bool unregisterEvent(int fd, Event events);
+ bool cancelEvent(int fd, Event events);
+
+ Implementation implementation() const { return KQUEUE; }
protected:
- bool stopping(unsigned long long &nextTimeout);
+ bool stoppingInternal();
void idle();
- void tickle();
-
- void onTimerInsertedAtFront() { tickle(); }
private:
int m_kqfd;
- int m_tickleFds[2];
std::map<std::pair<int, Event>, AsyncEvent> m_pendingEvents;
boost::mutex m_mutex;
};
View
63 mordor/iomanager_poll.cpp
@@ -0,0 +1,63 @@
+// Copyright (c) 2010 - Mozy, Inc.
+
+#include "iomanager_poll.h"
+
+namespace Mordor {
+
+IOManagerPoll::IOManagerPoll(size_t threads, bool useCaller)
+ : PipeTicklingIOManager(threads, useCaller)
+{
+}
+
+void
+IOManagerPoll::registerEvent(int fd, Event event, boost::function<void ()> dg)
+{
+ WSAPOLLFD pollfd;
+ pollfd.
+}
+
+#ifdef WINDOWS
+#define poll WSAPoll
+#endif
+#define pollStr #poll
+
+void
+IOManagerPoll::idle()
+{
+ std::vector<WSA(POLLFD)> localEvents;
+ while (true) {
+ unsigned long long nextTimeout;
+ if (stopping(nextTimeout))
+ return;
+ int rc = -1;
+ lastError(WSA(EINTR));
+ while (rc < 0 && lastError() == WSA(EINTR)) {
+ timeout = NULL;
+ if (nextTimeout != ~0ull)
+ timeout = 1;
+ rc =
+#ifdef WINDOWS
+ WSAPoll(
+#else
+ poll(
+#endif
+ localEvents.data(), localEvents.size(), timeout);
+ if (rc < 0 && lastError() == WSA(EINTR))
+ nextTimeout = nextTimer();
+ }
+ if (rc < 0)
+ MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API(
+#ifdef WINDOWS
+ "WSAPoll"
+#else
+ "poll"
+#endif
+ );
+
+ }
+
+
+}
+
+
+}
View
61 mordor/iomanager_poll.h
@@ -0,0 +1,61 @@
+#ifndef __MORDOR_IOMANAGER_POLL_H__
+#define __MORDOR_IOMANAGER_POLL_H__
+// Copyright (c) 2010 - Mozy, Inc.
+
+#include <boost/thread/mutex.hpp>
+
+#include "iomanager.h"
+
+#ifdef WINDOWS
+#define WSA(error) WSA ## error
+#else
+#define WSA(error) error
+#endif
+
+namespace Mordor {
+
+class IOManagerPoll : public PipeTicklingIOManager
+{
+private:
+ struct AsyncState : boost::noncopyable
+ {
+ AsyncState();
+ ~AsyncState();
+
+ struct EventContext
+ {
+ EventContext() : scheduler(NULL) {}
+ Scheduler *scheduler;
+ boost::shared_ptr<Fiber> fiber;
+ boost::function<void ()> dg;
+ };
+
+ EventContext &contextForEvent(Event event);
+ bool triggerEvent(Event event, size_t &pendingEventCount);
+
+ int m_fd;
+ EventContext m_in, m_out;
+ Event m_events;
+ };
+public:
+ IOManagerPoll(size_t threads = 1, bool useCaller = true);
+ ~IOManagerPoll();
+
+ void registerEvent(int fd, Event event, boost::function<void ()> dg = NULL);
+ bool unregisterEvent(int fd, Event event);
+ bool cancelEvent(int fd, Event event);
+
+protected:
+ bool stoppingInternal();
+ void idle();
+ void tickle();
+
+private:
+ boost::mutex m_mutex;
+ std::vector<WSA(POLLFD)> m_pendingEvents;
+ std::vector<AsyncState *> m_events;
+};
+
+}
+
+#endif
View
5 mordor/mordor.vcxproj
@@ -103,6 +103,8 @@
<ClCompile Include="http\auth.cpp" />
<ClCompile Include="http\basic.cpp" />
<ClCompile Include="http\broker.cpp" />
+ <ClCompile Include="iomanager.cpp" />
+ <ClCompile Include="iomanager_poll.cpp" />
<ClCompile Include="main.cpp" />
<ClCompile Include="daemon.cpp" />
<ClCompile Include="parallel.cpp" />
@@ -167,6 +169,7 @@
<ClCompile Include="timer.cpp" />
<ClCompile Include="streams\transfer.cpp" />
<ClCompile Include="streams\zlib.cpp" />
+ <ClCompile Include="timerscheduler.cpp" />
<ClCompile Include="workerpool.cpp" />
<ClCompile Include="zip.cpp" />
</ItemGroup>
@@ -178,6 +181,7 @@
<ClInclude Include="http\auth.h" />
<ClInclude Include="http\basic.h" />
<ClInclude Include="http\broker.h" />
+ <ClInclude Include="iomanager_poll.h" />
<ClInclude Include="main.h" />
<ClInclude Include="daemon.h" />
<ClInclude Include="parallel.h" />
@@ -221,6 +225,7 @@
<ClInclude Include="http\oauth.h" />
<ClInclude Include="http\parser.h" />
<ClInclude Include="thread.h" />
+ <ClInclude Include="timerscheduler.h" />
<ClInclude Include="workerpool.h" />
<ClInclude Include="xml\parser.h" />
<ClInclude Include="pch.h" />
View
15 mordor/mordor.vcxproj.filters
@@ -206,6 +206,15 @@
<ClCompile Include="zip.cpp">
<Filter>Source Files</Filter>
</ClCompile>
+ <ClCompile Include="timerscheduler.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="iomanager.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="iomanager_poll.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="anymap.h">
@@ -463,6 +472,12 @@
<ClInclude Include="zip.h">
<Filter>Header Files</Filter>
</ClInclude>
+ <ClInclude Include="timerscheduler.h">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="iomanager_poll.h">
+ <Filter>Header Files</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<Ragel Include="http\http_parser.rl" />
View
393 mordor/socket.cpp
@@ -26,6 +26,12 @@
#define closesocket close
#endif
+#ifdef WINDOWS
+#define WSA(error) WSA ## error
+#else
+#define WSA(error) error
+#endif
+
namespace Mordor {
namespace {
@@ -171,14 +177,17 @@ Socket::Socket(IOManager *ioManager, int family, int type, int protocol, int ini
m_cancelledSend(0),
m_cancelledReceive(0),
#ifdef WINDOWS
+ m_iocpIoManager(NULL),
m_hEvent(NULL),
m_scheduler(NULL),
#endif
m_isConnected(false),
m_isRegisteredForRemoteClose(false)
{
#ifdef WINDOWS
- if (pAcceptEx && m_ioManager) {
+ if (m_ioManager->implementation() == IOManager::IOCP)
+ m_iocpIoManager = static_cast<IOManagerIOCP *>(m_ioManager);
+ if (pAcceptEx && m_iocpIoManager) {
m_sock = socket(family, type, protocol);
MORDOR_LOG_LEVEL(g_log, m_sock == -1 ? Log::ERROR : Log::DEBUG) << this
<< " socket(" << (Family)family << ", " << (Type)type << ", "
@@ -204,6 +213,9 @@ Socket::Socket(int family, int type, int protocol)
<< lastError() << ")";
if (m_sock == -1)
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("socket");
+#ifdef WINDOWS
+ m_iocpIoManager = NULL;
+#endif
#ifdef OSX
unsigned int opt = 1;
if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt)) == -1) {
@@ -223,6 +235,7 @@ Socket::Socket(IOManager &ioManager, int family, int type, int protocol)
m_cancelledSend(0),
m_cancelledReceive(0),
#ifdef WINDOWS
+ m_iocpIoManager(NULL),
m_hEvent(NULL),
m_scheduler(NULL),
#endif
@@ -236,15 +249,25 @@ Socket::Socket(IOManager &ioManager, int family, int type, int protocol)
if (m_sock == -1)
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("socket");
#ifdef WINDOWS
- try {
- m_ioManager->registerFile((HANDLE)m_sock);
- m_skipCompletionPortOnSuccess =
- !!pSetFileCompletionNotificationModes((HANDLE)m_sock,
- FILE_SKIP_COMPLETION_PORT_ON_SUCCESS |
- FILE_SKIP_SET_EVENT_ON_HANDLE);
- } catch(...) {
- closesocket(m_sock);
- throw;
+ if (m_ioManager->implementation() == IOManager::IOCP)
+ m_iocpIoManager = static_cast<IOManagerIOCP *>(m_ioManager);
+ if (m_iocpIoManager) {
+ try {
+ m_iocpIoManager->registerFile((HANDLE)m_sock);
+ m_skipCompletionPortOnSuccess =
+ !!pSetFileCompletionNotificationModes((HANDLE)m_sock,
+ FILE_SKIP_COMPLETION_PORT_ON_SUCCESS |
+ FILE_SKIP_SET_EVENT_ON_HANDLE);
+ } catch(...) {
+ closesocket(m_sock);
+ throw;
+ }
+ } else if (m_ioManager) {
+ u_long arg = 1;
+ if (ioctlsocket(m_sock, FIONBIO, &arg) == SOCKET_ERROR) {
+ ::closesocket(m_sock);
+ MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("fcntl");
+ }
}
#else
if (fcntl(m_sock, F_SETFL, O_NONBLOCK) == -1) {
@@ -264,9 +287,9 @@ Socket::Socket(IOManager &ioManager, int family, int type, int protocol)
Socket::~Socket()
{
#ifdef WINDOWS
- if (m_ioManager && m_hEvent) {
+ if (m_iocpIoManager && m_hEvent) {
if (m_isRegisteredForRemoteClose) {
- m_ioManager->unregisterEvent(m_hEvent);
+ m_iocpIoManager->unregisterEvent(m_hEvent);
WSAEventSelect(m_sock, m_hEvent, 0);
}
CloseHandle(m_hEvent);
@@ -302,6 +325,14 @@ Socket::bind(Address::ptr addr)
bind(*addr);
}
+#ifdef WINDOWS
+static void cancelEventLocal(IOManagerIOCP *ioManager, HANDLE sock,
+ IOManagerIOCP::AsyncEvent *event)
+{
+ ioManager->cancelEvent(sock, event);
+}
+#endif
+
void
Socket::connect(const Address &to)
{
@@ -313,8 +344,8 @@ Socket::connect(const Address &to)
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("connect");
}
MORDOR_LOG_INFO(g_log) << this << " connect(" << m_sock << ", " << to << ")";
- } else {
#ifdef WINDOWS
+ } else if (m_iocpIoManager) {
if (ConnectEx) {
if (!m_localAddress) {
// need to be bound, even to ADDR_ANY, before calling ConnectEx
@@ -356,11 +387,11 @@ Socket::connect(const Address &to)
}
}
- m_ioManager->registerEvent(&m_sendEvent);
+ m_iocpIoManager->registerEvent(&m_sendEvent);
BOOL bRet = ConnectEx(m_sock, to.name(), to.nameLen(), NULL, 0, NULL, &m_sendEvent.overlapped);
if (!bRet && GetLastError() != WSA_IO_PENDING) {
if (GetLastError() == WSAEINVAL) {
- m_ioManager->unregisterEvent(&m_sendEvent);
+ m_iocpIoManager->unregisterEvent(&m_sendEvent);
// Some LSPs are *borken* (I'm looking at you, bmnet.dll),
// and don't properly support ConnectEx (and AcceptEx). In
// that case, go to how we work on Windows 2000 without
@@ -369,24 +400,24 @@ Socket::connect(const Address &to)
}
MORDOR_LOG_ERROR(g_log) << this << " ConnectEx(" << m_sock
<< ", " << to << "): (" << lastError() << ")";
- m_ioManager->unregisterEvent(&m_sendEvent);
+ m_iocpIoManager->unregisterEvent(&m_sendEvent);
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("ConnectEx");
}
if (m_skipCompletionPortOnSuccess && bRet) {
- m_ioManager->unregisterEvent(&m_sendEvent);
+ m_iocpIoManager->unregisterEvent(&m_sendEvent);
m_sendEvent.overlapped.Internal = STATUS_SUCCESS;
} else {
if (m_cancelledSend) {
MORDOR_LOG_ERROR(g_log) << this << " ConnectEx(" << m_sock << ", " << to
<< "): (" << m_cancelledSend << ")";
- m_ioManager->cancelEvent((HANDLE)m_sock, &m_sendEvent);
+ m_iocpIoManager->cancelEvent((HANDLE)m_sock, &m_sendEvent);
Scheduler::yieldTo();
MORDOR_THROW_EXCEPTION_FROM_ERROR_API(m_cancelledSend, "ConnectEx");
}
Timer::ptr timeout;
if (m_sendTimeout != ~0ull)
timeout = m_ioManager->registerTimer(m_sendTimeout, boost::bind(
- &IOManager::cancelEvent, m_ioManager, (HANDLE)m_sock, &m_sendEvent));
+ &cancelEventLocal, m_iocpIoManager, (HANDLE)m_sock, &m_sendEvent));
Scheduler::yieldTo();
if (timeout)
timeout->cancel();
@@ -419,21 +450,21 @@ Socket::connect(const Address &to)
return;
}
if (GetLastError() == WSAEWOULDBLOCK) {
- m_ioManager->registerEvent(m_hEvent);
+ m_iocpIoManager->registerEvent(m_hEvent);
m_fiber = Fiber::getThis();
m_scheduler = Scheduler::getThis();
m_unregistered = false;
if (m_cancelledSend) {
MORDOR_LOG_ERROR(g_log) << this << " connect(" << m_sock << ", " << to
<< "): (" << m_cancelledSend << ")";
- if (!m_ioManager->unregisterEvent(m_hEvent))
+ if (!m_iocpIoManager->unregisterEvent(m_hEvent))
Scheduler::yieldTo();
MORDOR_THROW_EXCEPTION_FROM_ERROR_API(m_cancelledSend, "connect");
}
Timer::ptr timeout;
if (m_sendTimeout != ~0ull)
- timeout = m_ioManager->registerTimer(m_sendTimeout,
- boost::bind(&Socket::cancelIo, this,
+ timeout = m_iocpIoManager->registerTimer(m_sendTimeout,
+ boost::bind(&Socket::cancelIoWindows, this,
boost::ref(m_cancelledSend), WSAETIMEDOUT));
Scheduler::yieldTo();
m_fiber.reset();
@@ -469,14 +500,13 @@ Socket::connect(const Address &to)
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("connect");
}
}
-#else
+#endif
+ } else {
if (!::connect(m_sock, to.name(), to.nameLen())) {
MORDOR_LOG_INFO(g_log) << this << " connect(" << m_sock << ", " << to
<< ")";
// Worked first time
- return;
- }
- if (errno == EINPROGRESS) {
+ } else if (lastError() == WSA(EINPROGRESS)) {
m_ioManager->registerEvent(m_sock, IOManager::WRITE);
if (m_cancelledSend) {
MORDOR_LOG_ERROR(g_log) << this << " connect(" << m_sock << ", " << to
@@ -489,7 +519,7 @@ Socket::connect(const Address &to)
if (m_sendTimeout != ~0ull)
timeout = m_ioManager->registerTimer(m_sendTimeout, boost::bind(
&Socket::cancelIo, this, IOManager::WRITE,
- boost::ref(m_cancelledSend), ETIMEDOUT));
+ boost::ref(m_cancelledSend), WSA(ETIMEDOUT)));
Scheduler::yieldTo();
if (timeout)
timeout->cancel();
@@ -513,11 +543,10 @@ Socket::connect(const Address &to)
<< "): (" << lastError() << ")";
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("connect");
}
-#endif
- m_isConnected = true;
- if (!m_onRemoteClose.empty())
- registerForRemoteClose();
}
+ m_isConnected = true;
+ if (!m_onRemoteClose.empty())
+ registerForRemoteClose();
}
void
@@ -544,7 +573,7 @@ void
Socket::accept(Socket &target)
{
#ifdef WINDOWS
- if (m_ioManager) {
+ if (m_iocpIoManager) {
MORDOR_ASSERT(target.m_sock != -1);
} else {
MORDOR_ASSERT(target.m_sock == -1);
@@ -563,40 +592,40 @@ Socket::accept(Socket &target)
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("accept");
}
target.m_sock = newsock;
- } else {
#ifdef WINDOWS
+ } else if (m_iocpIoManager) {
if (pAcceptEx) {
- m_ioManager->registerEvent(&m_receiveEvent);
+ m_iocpIoManager->registerEvent(&m_receiveEvent);
unsigned char addrs[sizeof(SOCKADDR_STORAGE) * 2 + 16];
DWORD bytes;
BOOL ret = pAcceptEx(m_sock, target.m_sock, addrs, 0, sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16, &bytes,
&m_receiveEvent.overlapped);
if (!ret && GetLastError() != WSA_IO_PENDING) {
if (GetLastError() == WSAENOTSOCK) {
- m_ioManager->unregisterEvent(&m_receiveEvent);
+ m_iocpIoManager->unregisterEvent(&m_receiveEvent);
// See comment in similar line in connect()
goto suckylsp;
}
MORDOR_LOG_ERROR(g_log) << this << " AcceptEx(" << m_sock << "): ("
<< lastError() << ")";
- m_ioManager->unregisterEvent(&m_receiveEvent);
+ m_iocpIoManager->unregisterEvent(&m_receiveEvent);
MORDOR_THROW_EXCEPTION_FROM_LAST_ERROR_API("AcceptEx");
}
if (m_skipCompletionPortOnSuccess && ret) {
- m_ioManager->unregisterEvent(&m_receiveEvent);
+ m_iocpIoManager->unregisterEvent(&m_receiveEvent);
m_receiveEvent.overlapped.Internal = STATUS_SUCCESS;
} else {
if (m_cancelledReceive) {
MORDOR_LOG_ERROR(g_log) << this << " AcceptEx(" << m_sock << "): ("
<< m_cancelledReceive << ")";
- m_ioManager->cancelEvent((HANDLE)m_sock, &m_receiveEvent);
+ m_iocpIoManager->cancelEvent((HANDLE)m_sock, &m_receiveEvent);
Scheduler::yieldTo();
MORDOR_THROW_EXCEPTION_FROM_ERROR_API(m_cancelledReceive, "AcceptEx");
}
Timer::ptr timeout;
if (m_receiveTimeout != ~0ull)
timeout = m_ioManager->registerTimer(m_receiveTimeout, boost::bind(
- &IOManager::cancelEvent, m_ioManager, (HANDLE)m_sock, &m_receiveEvent));
+ &cancelEventLocal, m_iocpIoManager, (HANDLE)m_sock, &m_receiveEvent));
Scheduler::yieldTo();
if (timeout)
timeout->cancel();
@@ -625,7 +654,7 @@ Socket::accept(Socket &target)
MORDOR_LOG_INFO(g_log) << this << " AcceptEx(" << m_sock << "): "
<< target.m_sock << os.str();
target.setOption(SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &m_sock, sizeof(m_sock));
- target.m_ioManager->registerFile((HANDLE)target.m_sock);
+ target.m_iocpIoManager->registerFile((HANDLE)target.m_sock);
target.m_skipCompletionPortOnSuccess =
!!pSetFileCompletionNotificationModes((HANDLE)target.m_sock,
FILE_SKIP_COMPLETION_PORT_ON_SUCCESS |
@@ -647,21 +676,21 @@ Socket::accept(Socket &target)
<< newsock;
// Worked first time
} else if (GetLastError() == WSAEWOULDBLOCK) {
- m_ioManager->registerEvent(m_hEvent);
+ m_iocpIoManager->registerEvent(m_hEvent);
m_fiber = Fiber::getThis();
m_scheduler = Scheduler::getThis();
if (m_cancelledReceive) {
MORDOR_LOG_ERROR(g_log) << this << " accept(" << m_sock << "): ("
<< m_cancelledReceive << ")";
- if (!m_ioManager->unregisterEvent(m_hEvent))
+ if (!m_iocpIoManager->unregisterEvent(m_hEvent))