Permalink
Browse files

more resilient to includes

  • Loading branch information...
1 parent 5fa91eb commit 989fd62783ef7c029479640cc3706c0e8a2c0b6a @elliotwoods committed Dec 9, 2011
Showing with 158 additions and 148 deletions.
  1. +88 −81 src/TalkyBase.cpp
  2. +36 −34 src/TalkyBuffer.h
  3. +34 −33 src/TalkyMessage.h
View
169 src/TalkyBase.cpp
@@ -15,15 +15,15 @@ namespace Talky {
// PUBLIC
//
- TalkyBase::TalkyBase() :
+ TalkyBase::TalkyBase() :
nodeType(0),
lastConnectAttempt(0),
isConnected(false),
isServerBound(false),
_bufferIn(TALKY_BUFFER_IN_SIZE),
_bufferOut(TALKY_BUFFER_OUT_SIZE)
{
-
+
}
TalkyBase::~TalkyBase()
@@ -32,24 +32,24 @@ namespace Talky {
throw("You need to implement a destructor in your Talky class. Check TalkyBase::~TalkyBase() for notes");
/*
//implement the following in destructor of TalkyBase implementing class. Example code from ofxTalky:
-
+
if (nodeType==0)
return;
-
+
stopThread(true);
-
+
if (nodeType == 1)
{
tcpClient->close();
delete tcpClient;
}
-
+
if (nodeType == 2)
{
tcpServer->close();
delete tcpServer;
}
-
+
*/
}
@@ -62,16 +62,16 @@ namespace Talky {
throw(string("Already initialised as node type ") + (nodeType==1 ? string("server") : string("client")));
return;
}
-
-
+
+
_remoteHost = remoteHost;
_remotePort = remotePort;
-
+
initClient();
beginThread();
-
+
nodeType = 1;
-
+
}
//SERVER SETUP
@@ -82,14 +82,14 @@ namespace Talky {
throw("Already initialised as node type " + (nodeType==1 ? string("server") : string("client")));
return;
}
-
+
_localPort = localPort;
-
+
initServer();
beginThread();
-
+
nodeType = 2;
-
+
}
bool TalkyBase::getIsServerBound()
@@ -113,36 +113,36 @@ namespace Talky {
float TalkyBase::getTimeUntilNextConnectNorm()
{
float frac = float(clock() / CLOCKS_PER_MILLISEC - lastConnectAttempt) / float(TALKY_RECONNECT_TIME);
-
+
return (frac < 1.0f ? frac : 1.0f);
}
-
+
string TalkyBase::getRemoteHost(){
if (nodeType == 1)
return _remoteHost;
else
return "";
}
-
+
//-----------------------------------------------------------
-
+
TalkyBase& TalkyBase::operator<<(const TalkyMessage &m) {
lockThread();
sendQueue.push_back(m);
unlockThread();
-
+
return *this;
}
-
+
bool TalkyBase::operator>>(TalkyMessage& m) {
if (!lockThread())
return false;
-
+
if (receiveQueue.size() > 0)
{
m = receiveQueue.front();
receiveQueue.erase(receiveQueue.begin());
-
+
unlockThread();
return true;
} else {
@@ -161,9 +161,9 @@ namespace Talky {
sleep(10);
#endif
}
-
+
receiveQueue.clear();
-
+
unlockThread();
}
@@ -176,15 +176,15 @@ namespace Talky {
{
if (nodeType == 0)
return;
-
+
/////////////////////////////////////
// CHECK RECONNECTS
/////////////////////////////////////
//
int currentTime = clock() / CLOCKS_PER_MILLISEC;
if (currentTime - lastConnectAttempt > TALKY_RECONNECT_TIME)
{
-
+
if (nodeType == 1) {
//client
if (!isConnected)
@@ -201,85 +201,85 @@ namespace Talky {
//server
isConnected = isServerConnected();
}
-
-
+
+
if (!isConnected && nodeType==1) {
//client
startClient();
isConnected = isClientConnected();
-
+
if (isConnected)
notifyClientIsNowConnected();
}
-
-
+
+
if (!isServerBound && nodeType==2) {
//server
startServer();
isConnected = isServerConnected();
}
-
+
lastConnectAttempt = currentTime;
}
//
/////////////////////////////////////
-
-
-
+
+
+
lockThread();
-
+
/////////////////////////////////////
// RECEIVE DATA
/////////////////////////////////////
- //
+ //
//
//
if (nodeType == 1)
- {
+ {
if (isClientConnected())
- {
+ {
//client
_bufferIn.clean();
rxClient();
processIncomingBuffer();
}
-
+
} else {
-
+
//server
if (getNumServerClients() > 0) {
-
+
if (lockServer())
{
for (int iRemote=0; iRemote<getNumServerClients() ; iRemote++)
{
if (!isServersClientConnected(iRemote))
- continue;
-
+ continue;
+
_bufferIn.clean();
- rxServer(iRemote);
+ rxServer(iRemote);
processIncomingBuffer();
-
+
}
unlockServer();
}
-
+
}
}
-
+
//
/////////////////////////////////////
-
-
-
+
+
+
/////////////////////////////////////
// SEND DATA
/////////////////////////////////////
//
if (isConnected)
{
bool hasDataToSend = false;
-
+
try {
while (sendQueue.size() > 0)
{
@@ -291,31 +291,38 @@ namespace Talky {
//buffer overrun
throwWarning("Buffer overrun, waiting until next frame to send remaining messages");
}
-
+
if (hasDataToSend)
tx();
}
//
/////////////////////////////////////
-
+
unlockThread();
}
bool TalkyBase::rxServer(int iClient) {
const int nBytesReceived = rxServer(iClient, _bufferIn.getWritePointer(), _bufferIn.getRemainingWriteSpace());
-
- try
- {
+
+ try {
_bufferIn.advanceWritePointer(nBytesReceived);
- } catch
+ } catch (TalkyException e) {
+ return false;
+ }
+ return true;
}
-
+
bool TalkyBase::rxClient() {
const int nBytesReceived = rxClient(_bufferIn.getWritePointer(), _bufferIn.getRemainingWriteSpace());
-
- _bufferIn.advanceWritePointer(nBytesReceived);
+
+ try {
+ _bufferIn.advanceWritePointer(nBytesReceived);
+ } catch (TalkyException e) {
+ return false;
+ }
+ return true;
}
-
+
void TalkyBase::tx() {
if (nodeType == 0) {
throwWarning("TalkyBase::tx : can't send, we're not initialsed as either a client or server");
@@ -324,65 +331,65 @@ namespace Talky {
if (nodeType == 1) {
//client
txClient();
-
+
} else if (nodeType == 2) {
//server
for (int iClient=0; iClient < getNumServerClients(); iClient++)
txServer(iClient, false);
-
+
//only clean the buffer once
_bufferOut.clean();
}
}
-
+
void TalkyBase::txServer(int iClient, bool clean) {
int nBytesSending = _bufferOut.getRemainingReadSpace();
txServer(iClient, _bufferOut.getReadPointer(), nBytesSending);
-
+
if (clean)
_bufferOut.clean();
}
-
+
void TalkyBase::txClient(bool clean) {
int nBytesSending = _bufferOut.getRemainingReadSpace();
txClient(_bufferOut.getReadPointer(), nBytesSending);
-
+
if (clean)
_bufferOut.clean();
}
-
- void TalkyBase::processIncomingBuffer() {
+
+ void TalkyBase::processIncomingBuffer() {
//perhaps recode this so we dont copy?
TalkyMessage msg;
while (_bufferIn >> msg)
receiveQueue.push_back(msg);
-
+
//sort by timestamp
sort(receiveQueue.begin(), receiveQueue.end());
-
+
//trigger message available event.
//processing will be performed in this thread
int msgCount = receiveQueue.size();
notifyReceiveEvent(msgCount);
}
-
+
string TalkyBase::toString() {
stringstream out;
-
+
out << "Talky node is ";
switch (nodeType) {
case 0:
out << "uninitialised." << endl;
break;
-
+
case 1:
- out << "a client, connecting to " << _remoteHost << " on port " << _remotePort << "." << endl;
+ out << "a client, connecting to " << _remoteHost << " on port " << _remotePort << "." << endl;
break;
-
+
case 2:
out << "a server on local port " << _localPort << ", with " << getNumClients() << " clients" << endl;
}
-
+
return out.str();
}
-}
+}
View
70 src/TalkyBuffer.h
@@ -7,6 +7,8 @@
// Copyright 2011 Kimchi and Chips. All rights reserved.
//
+#include <string.h>
+#include <math.h>
#include <sstream>
#include <string>
#include <iostream>
@@ -17,15 +19,15 @@
using namespace std;
namespace Talky {
-
+
/** BufferOffset is a Talky type.
- We use it for defining indexes within the
+ We use it for defining indexes within the
TalkyBuffer.
*/
typedef unsigned long BufferOffset;
-
+
class TalkyBuffer;
-
+
/** TalkySerialisable is an interface class for when
you want to override standard serialisation methods.
*/
@@ -34,50 +36,50 @@ namespace Talky {
virtual void serialiseToBuffer(TalkyBuffer &b) const = 0;
virtual bool deSerialiseFromBuffer(TalkyBuffer &b) = 0;
};
-
+
class TalkyBuffer : public TalkySerialisable {
public:
TalkyBuffer();
TalkyBuffer(BufferOffset size);
TalkyBuffer(const TalkyBuffer& other);
-
+
~TalkyBuffer();
-
+
///copy operator
TalkyBuffer& operator= (const TalkyBuffer & other);
-
+
void allocate(BufferOffset size);
void deAllocate();
void clean();
-
+
BufferOffset size() const;
const void * getData() const;
void setData(const void * d, BufferOffset size);
-
+
bool write(const void* d, BufferOffset size);
bool read(void* d, BufferOffset size);
-
+
bool hasSpaceToWrite(BufferOffset size) const;
bool hasSpaceToRead(BufferOffset size) const;
BufferOffset getRemainingWriteSpace() const;
BufferOffset getRemainingReadSpace() const;
-
+
template<class T>
TalkyBuffer& operator<<(T const &object) {
if (!write(&object, sizeof(T)))
throw("Buffer overrun - insufficient space to write");
return *this;
}
-
+
template<class T>
bool operator>>(T& object) {
return read(&object, sizeof(T));
readOffset += sizeof(T);
}
-
+
string toString(unsigned short maxLength=10) const;
-
-
+
+
/**
Hard ass access. You sure you really want to do this??
We need this in TalkyBase, consider moving to 'friend' class?
@@ -88,7 +90,7 @@ namespace Talky {
We need this in TalkyBase, consider moving to 'friend' class?
*/
void advanceWritePointer(BufferOffset);
-
+
/**
Hard ass access. You sure you really want to do this??
We need this in TalkyBase, consider moving to 'friend' class?
@@ -99,53 +101,53 @@ namespace Talky {
We need this in TalkyBase, consider moving to 'friend' class?
*/
void advanceReadPointer(BufferOffset);
-
+
bool getIsAllocated() const { return isAllocated; };
bool getIsDynamicallyAllocated() const { return isDynamicallyAllocated; };
bool getIsQuickReallocation() const { return quickReallocation; };
BufferOffset getAllocatedSize() const { return allocatedSize; };
BufferOffset getWrittenSize() const { return writtenSize; };
BufferOffset getReadOffset() const { return readOffset; };
BufferOffset getWriteOffset() const { return writeOffset; };
-
-
+
+
//TalkyBuffer inherits TalkySerialisable
void serialiseToBuffer(TalkyBuffer &b) const;
bool deSerialiseFromBuffer(TalkyBuffer &b);
-
-
+
+
//File access
bool loadFile(string filename);
bool saveFile(string filename) const;
-
+
protected:
void init();
-
+
///Used to perform dynamic reallocation. Returns false if we're dynamic allocation is turned off.
bool reAllocate(BufferOffset s);
-
+
char* _data;
-
+
///Does this buffer have space allocated?
bool isAllocated;
-
+
///Does this buffer allow for space to be dynamically allocated for storage? If false, then we presume static allocation
bool isDynamicallyAllocated;
-
+
///If this flag is true, we always reAllocate by a factor of 2, so that multiple reallocations are not necessary. We may wish to turn this flag off if we encounter large files.
bool quickReallocation;
-
-
+
+
BufferOffset allocatedSize;
BufferOffset writtenSize;
-
+
BufferOffset readOffset;
BufferOffset writeOffset;
-
+
};
-
+
///Overload template when serialising a TalkySerialisable object
TalkyBuffer& operator<<(TalkyBuffer& b, TalkySerialisable const &o);
///Overload template when deserialising to a TalkySerialisable object
bool operator>>(TalkyBuffer& b, TalkySerialisable &o);
-}
+}
View
67 src/TalkyMessage.h
@@ -21,62 +21,63 @@ using namespace std;
#define TALKY_END_TOKEN '\n'
namespace Talky {
-
+
typedef unsigned short ContentsType;
typedef unsigned short ProtocolVersion;
-
+
class TalkyMessageHeader {
public:
TalkyMessageHeader();
- TalkyMessageHeader(TalkyMessageHeader& other, ContentsType t);
+ TalkyMessageHeader(TalkyMessageHeader& other, ContentsType t);
TalkyMessageHeader(const char * Company, const char * Protocol, ProtocolVersion v, ContentsType t);
-
+
const char* getCompany() const;
const char* getProtocol() const;
ProtocolVersion getVersion() const;
ContentsType getContentsType() const;
unsigned long getTimestamp() const;
-
+
void setCompany(const char * s);
void setProtocol(const char * s);
void setVersion(unsigned short v);
void setContentsType(unsigned short t);
void setTimestamp();
-
+
string toString();
-
+
bool operator==(const TalkyMessageHeader &other) const;
-
+
protected:
char company[2];
-
+
char protocol[2];
unsigned short version;
-
+
unsigned short contentsType;
-
- /** Timestamp for the message.
+
+ /** Timestamp for the message.
This is set automatically when the header
is assigned to the message
*/
unsigned long timestamp;
};
-
+
//------
-
+
class TalkyMessage {
public:
friend class TalkyBuffer;
-
+
TalkyMessage();
TalkyMessage(TalkyMessageHeader const &h);
-
+
template <class T>
TalkyMessage& operator<<(const T &object)
{
payload << object;
+ return *this;
}
-
+
template <class T>
bool operator>>(T &object)
{
@@ -85,47 +86,47 @@ namespace Talky {
else
return false;
}
-
+
const TalkyBuffer& getPayload() const;
const TalkyMessageHeader& getHeader() const;
-
+
void setHeader(TalkyMessageHeader const &h);
-
+
int getTotalLength() const;
unsigned short getPayloadLength() const;
void initPayload(unsigned short length);
string toString();
-
+
///for sorting by timestamp
bool operator<(const TalkyMessage& other) const;
-
+
/** Serialise this onto main buffer.
- Consider instead using
+ Consider instead using
buf << msg;
syntax. Serialise may be depreciated / made private
- */
+ */
bool serialise(TalkyBuffer &buf) const;
-
+
/** Deserialise this off of main buffer.
- Consider instead using
+ Consider instead using
buf >> msg;
syntax. Deserialise may be depreciated / made private
- */
+ */
bool deSerialise(TalkyBuffer &buf);
-
+
static void setDefaultHeader(TalkyMessageHeader &h);
-
+
protected:
TalkyMessageHeader header;
TalkyBuffer payload;
-
+
static TalkyMessageHeader defaultHeader;
};
-
+
//-----
-
+
///Used when putting a message onto main buffer (tx)
TalkyBuffer& operator<<(TalkyBuffer& b, TalkyMessage const &m);
///Used when pulling a message off main buffer (rx)
bool operator>>(TalkyBuffer& b, TalkyMessage &m);
-}
+}

0 comments on commit 989fd62

Please sign in to comment.