Skip to content

Commit

Permalink
AsyncUDPServerSocket passes socket in callback
Browse files Browse the repository at this point in the history
Summary: AsyncUDPServerSocket doesn't make it easy to write to the same socket you read from.  Add the socket as a callback param, similar to AsyncServerSocket

Test Plan:
fbconfig -r folly; fbmake dbg

Will fixup any other spots contbuild finds

Reviewed By: hans@fb.com

Subscribers: bmatheny, doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D1948936

Signature: t1:1948936:1427841651:20d13d73c06d31c75056624f051a6fd35b9701fb
  • Loading branch information
Dave Watson authored and viswanathgs committed Apr 10, 2015
1 parent 163a570 commit f697ba4
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 10 deletions.
18 changes: 11 additions & 7 deletions folly/io/async/AsyncUDPServerSocket.h
Expand Up @@ -56,9 +56,11 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
/**
* Invoked when a new packet is received
*/
virtual void onDataAvailable(const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept = 0;
virtual void onDataAvailable(
std::shared_ptr<AsyncUDPSocket> socket,
const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept = 0;

virtual ~Callback() {}
};
Expand All @@ -85,7 +87,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
void bind(const folly::SocketAddress& addy) {
CHECK(!socket_);

socket_ = folly::make_unique<AsyncUDPSocket>(evb_);
socket_ = std::make_shared<AsyncUDPSocket>(evb_);
socket_->setReusePort(reusePort_);
socket_->bind(addy);
}
Expand Down Expand Up @@ -131,6 +133,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback

void close() {
CHECK(socket_) << "Need to bind before closing";
socket_->close();
socket_.reset();
}

Expand Down Expand Up @@ -165,11 +168,12 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
auto mvp =
folly::MoveWrapper<
std::unique_ptr<folly::IOBuf>>(std::move(data));
auto socket = socket_;

// Schedule it in the listener's eventbase
// XXX: Speed this up
std::function<void()> f = [client, callback, mvp, truncated] () mutable {
callback->onDataAvailable(client, std::move(*mvp), truncated);
std::function<void()> f = [socket, client, callback, mvp, truncated] () mutable {
callback->onDataAvailable(socket, client, std::move(*mvp), truncated);
};

listeners_[nextListener_].first->runInEventBaseThread(f);
Expand All @@ -196,7 +200,7 @@ class AsyncUDPServerSocket : private AsyncUDPSocket::ReadCallback
EventBase* const evb_;
const size_t packetSize_;

std::unique_ptr<AsyncUDPSocket> socket_;
std::shared_ptr<AsyncUDPSocket> socket_;

// List of listener to distribute packets among
typedef std::pair<EventBase*, Callback*> Listener;
Expand Down
3 changes: 2 additions & 1 deletion folly/io/async/test/AsyncUDPSocketTest.cpp
Expand Up @@ -47,7 +47,8 @@ class UDPAcceptor
void onListenStopped() noexcept {
}

void onDataAvailable(const folly::SocketAddress& client,
void onDataAvailable(std::shared_ptr<folly::AsyncUDPSocket> socket,
const folly::SocketAddress& client,
std::unique_ptr<folly::IOBuf> data,
bool truncated) noexcept {

Expand Down
5 changes: 4 additions & 1 deletion folly/wangle/acceptor/Acceptor.h
Expand Up @@ -233,7 +233,10 @@ class Acceptor :

void onListenStarted() noexcept {}
void onListenStopped() noexcept {}
void onDataAvailable(const SocketAddress&, std::unique_ptr<IOBuf>, bool) noexcept {}
void onDataAvailable(
std::shared_ptr<AsyncUDPSocket> socket,
const SocketAddress&,
std::unique_ptr<IOBuf>, bool) noexcept {}

virtual AsyncSocket::UniquePtr makeNewAsyncSocket(EventBase* base, int fd) {
return AsyncSocket::UniquePtr(new AsyncSocket(base, fd));
Expand Down
3 changes: 2 additions & 1 deletion folly/wangle/bootstrap/ServerBootstrap-inl.h
Expand Up @@ -98,7 +98,8 @@ class ServerAcceptor
}

// UDP thunk
void onDataAvailable(const folly::SocketAddress& addr,
void onDataAvailable(std::shared_ptr<AsyncUDPSocket> socket,
const folly::SocketAddress& addr,
std::unique_ptr<folly::IOBuf> buf,
bool truncated) noexcept {
acceptorPipeline_->read(buf.release());
Expand Down

0 comments on commit f697ba4

Please sign in to comment.