Skip to content

Commit

Permalink
cNetwork: Added link creation callback.
Browse files Browse the repository at this point in the history
This allows the callback classes to store the link inside them and use it internally later on, mainly for sending data.
  • Loading branch information
madmaxoft committed Jan 22, 2015
1 parent 9cb8a41 commit 87e73f5
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 26 deletions.
18 changes: 15 additions & 3 deletions src/OSSupport/Network.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@



// fwd:
class cTCPLink;
typedef SharedPtr<cTCPLink> cTCPLinkPtr;





/** Interface that provides the methods available on a single TCP connection. */
class cTCPLink
{
Expand All @@ -25,16 +33,20 @@ class cTCPLink
// Force a virtual destructor for all descendants:
virtual ~cCallbacks() {}

/** Called when the cTCPLink for the connection is created.
The callback may store the cTCPLink instance for later use, but it should remove it in OnError(), OnRemoteClosed() or right after Close(). */
virtual void OnLinkCreated(cTCPLinkPtr a_Link) = 0;

/** Called when there's data incoming from the remote peer. */
virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Length) = 0;
virtual void OnReceivedData(const char * a_Data, size_t a_Length) = 0;

/** Called when the remote end closes the connection.
The link is still available for connection information query (IP / port).
Sending data on the link is not an error, but the data won't be delivered. */
virtual void OnRemoteClosed(cTCPLink & a_Link) = 0;
virtual void OnRemoteClosed(void) = 0;

/** Called when an error is detected on the connection. */
virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) = 0;
virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) = 0;
};
typedef SharedPtr<cCallbacks> cCallbacksPtr;

Expand Down
2 changes: 2 additions & 0 deletions src/OSSupport/ServerHandleImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ void cServerHandleImpl::Callback(evconnlistener * a_Listener, evutil_socket_t a_
cCSLock Lock(Self->m_CS);
Self->m_Connections.push_back(Link);
} // Lock(m_CS)
LinkCallbacks->OnLinkCreated(Link);
Link->Enable();

// Call the OnAccepted callback:
Self->m_ListenCallbacks->OnAccepted(*Link);
Expand Down
26 changes: 16 additions & 10 deletions src/OSSupport/TCPLinkImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks):
m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE)),
m_Server(nullptr)
{
// Create the LibEvent handle, but don't assign a socket to it yet (will be assigned within Connect() method):
bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this);
bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE);
}


Expand All @@ -37,10 +34,6 @@ cTCPLinkImpl::cTCPLinkImpl(evutil_socket_t a_Socket, cTCPLink::cCallbacksPtr a_L
// Update the endpoint addresses:
UpdateLocalAddress();
UpdateAddress(a_Address, a_AddrLen, m_RemoteIP, m_RemotePort);

// Create the LibEvent handle:
bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this);
bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE);
}


Expand All @@ -65,6 +58,8 @@ cTCPLinkImplPtr cTCPLinkImpl::Connect(const AString & a_Host, UInt16 a_Port, cTC
cTCPLinkImplPtr res{new cTCPLinkImpl(a_LinkCallbacks)}; // Cannot use std::make_shared here, constructor is not accessible
res->m_ConnectCallbacks = a_ConnectCallbacks;
cNetworkSingleton::Get().AddLink(res);
res->m_Callbacks->OnLinkCreated(res);
res->Enable();

// If a_Host is an IP address, schedule a connection immediately:
sockaddr_storage sa;
Expand Down Expand Up @@ -107,6 +102,17 @@ cTCPLinkImplPtr cTCPLinkImpl::Connect(const AString & a_Host, UInt16 a_Port, cTC



void cTCPLinkImpl::Enable(void)
{
// Set the LibEvent callbacks and enable processing:
bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this);
bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE);
}





bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length)
{
return (bufferevent_write(m_BufferEvent, a_Data, a_Length) == 0);
Expand Down Expand Up @@ -160,7 +166,7 @@ void cTCPLinkImpl::ReadCallback(bufferevent * a_BufferEvent, void * a_Self)
size_t length;
while ((length = bufferevent_read(a_BufferEvent, data, sizeof(data))) > 0)
{
Self->m_Callbacks->OnReceivedData(*Self, data, length);
Self->m_Callbacks->OnReceivedData(data, length);
}
}

Expand Down Expand Up @@ -189,7 +195,7 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void
}
else
{
Self->m_Callbacks->OnError(*Self, err, evutil_socket_error_to_string(err));
Self->m_Callbacks->OnError(err, evutil_socket_error_to_string(err));
if (Self->m_Server == nullptr)
{
cNetworkSingleton::Get().RemoveLink(Self);
Expand Down Expand Up @@ -219,7 +225,7 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void
// If the connection has been closed, call the link callback and remove the connection:
if (a_What & BEV_EVENT_EOF)
{
Self->m_Callbacks->OnRemoteClosed(*Self);
Self->m_Callbacks->OnRemoteClosed();
if (Self->m_Server != nullptr)
{
Self->m_Server->RemoveLink(Self);
Expand Down
11 changes: 9 additions & 2 deletions src/OSSupport/TCPLinkImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class cTCPLinkImpl:
public:
/** Creates a new link based on the given socket.
Used for connections accepted in a server using cNetwork::Listen().
a_Address and a_AddrLen describe the remote peer that has connected. */
a_Address and a_AddrLen describe the remote peer that has connected.
The link is created disabled, you need to call Enable() to start the regular communication. */
cTCPLinkImpl(evutil_socket_t a_Socket, cCallbacksPtr a_LinkCallbacks, cServerHandleImpl * a_Server, const sockaddr * a_Address, socklen_t a_AddrLen);

/** Destroys the LibEvent handle representing the link. */
Expand All @@ -48,6 +49,11 @@ class cTCPLinkImpl:
Returns a link that has the connection request queued, or NULL for failure. */
static cTCPLinkImplPtr Connect(const AString & a_Host, UInt16 a_Port, cTCPLink::cCallbacksPtr a_LinkCallbacks, cNetwork::cConnectCallbacksPtr a_ConnectCallbacks);

/** Enables communication over the link.
Links are created with communication disabled, so that creation callbacks can be called first.
This function then enables the regular communication to be reported. */
void Enable(void);

// cTCPLink overrides:
virtual bool Send(const void * a_Data, size_t a_Length) override;
virtual AString GetLocalIP(void) const override { return m_LocalIP; }
Expand Down Expand Up @@ -85,7 +91,8 @@ class cTCPLinkImpl:

/** Creates a new link to be queued to connect to a specified host:port.
Used for outgoing connections created using cNetwork::Connect().
To be used only by the Connect() factory function. */
To be used only by the Connect() factory function.
The link is created disabled, you need to call Enable() to start the regular communication. */
cTCPLinkImpl(const cCallbacksPtr a_LinkCallbacks);

/** Callback that LibEvent calls when there's data available from the remote peer. */
Expand Down
38 changes: 30 additions & 8 deletions tests/Network/EchoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,55 @@
class cEchoLinkCallbacks:
public cTCPLink::cCallbacks
{
virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Size) override
// cTCPLink::cCallbacks overrides:
virtual void OnLinkCreated(cTCPLinkPtr a_Link) override
{
ASSERT(m_Link == nullptr);
m_Link = a_Link;
}


virtual void OnReceivedData(const char * a_Data, size_t a_Size) override
{
ASSERT(m_Link != nullptr);

// Echo the incoming data back to outgoing data:
LOGD("%p (%s:%d): Data received (%u bytes), echoing back.", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort(), static_cast<unsigned>(a_Size));
a_Link.Send(a_Data, a_Size);
LOGD("%p (%s:%d): Data received (%u bytes), echoing back.", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort(), static_cast<unsigned>(a_Size));
m_Link->Send(a_Data, a_Size);
LOGD("Echo queued");

// Search for a Ctrl+Z, if found, drop the connection:
for (size_t i = 0; i < a_Size; i++)
{
if (a_Data[i] == '\x1a')
{
a_Link.Close();
m_Link->Close();
m_Link.reset();
return;
}
}
}

virtual void OnRemoteClosed(cTCPLink & a_Link) override

virtual void OnRemoteClosed(void) override
{
LOGD("%p (%s:%d): Remote has closed the connection.", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort());
ASSERT(m_Link != nullptr);

LOGD("%p (%s:%d): Remote has closed the connection.", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort());
m_Link.reset();
}

virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) override

virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) override
{
LOGD("%p (%s:%d): Error %d in the cEchoLinkCallbacks: %s", &a_Link, a_Link.GetRemoteIP().c_str(), a_Link.GetRemotePort(), a_ErrorCode, a_ErrorMsg.c_str());
ASSERT(m_Link != nullptr);

LOGD("%p (%s:%d): Error %d in the cEchoLinkCallbacks: %s", m_Link.get(), m_Link->GetRemoteIP().c_str(), m_Link->GetRemotePort(), a_ErrorCode, a_ErrorMsg.c_str());
m_Link.reset();
}

/** The link attached to this callbacks instance. */
cTCPLinkPtr m_Link;
};


Expand Down
21 changes: 18 additions & 3 deletions tests/Network/Google.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,39 @@ class cDumpCallbacks:
public cTCPLink::cCallbacks
{
cEvent & m_Event;
cTCPLinkPtr m_Link;

virtual void OnReceivedData(cTCPLink & a_Link, const char * a_Data, size_t a_Size) override
virtual void OnLinkCreated(cTCPLinkPtr a_Link) override
{
ASSERT(m_Link == nullptr);
m_Link = a_Link;
}

virtual void OnReceivedData(const char * a_Data, size_t a_Size) override
{
ASSERT(m_Link != nullptr);

// Log the incoming data size:
AString Hex;
CreateHexDump(Hex, a_Data, a_Size, 16);
LOGD("Incoming data: %u bytes:\n%s", static_cast<unsigned>(a_Size), Hex.c_str());
}

virtual void OnRemoteClosed(cTCPLink & a_Link) override
virtual void OnRemoteClosed(void) override
{
ASSERT(m_Link != nullptr);

LOGD("Remote has closed the connection.");
m_Link.reset();
m_Event.Set();
}

virtual void OnError(cTCPLink & a_Link, int a_ErrorCode, const AString & a_ErrorMsg) override
virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) override
{
ASSERT(m_Link != nullptr);

LOGD("Error %d (%s) in the cDumpCallbacks.", a_ErrorCode, a_ErrorMsg.c_str());
m_Link.reset();
m_Event.Set();
}

Expand Down

0 comments on commit 87e73f5

Please sign in to comment.