Skip to content

Commit

Permalink
added NormStreamGetVacancy() and some NormSocket improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
bebopagogo committed Apr 30, 2020
1 parent 1c6b530 commit 88e600c
Show file tree
Hide file tree
Showing 16 changed files with 507 additions and 61 deletions.
7 changes: 6 additions & 1 deletion examples/normServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ ClientInfo NormGetClientInfo(NormNodeHandle client)
return ClientInfo(version, addr, port);
} // end NormGetClientInfo(NormNodeHandle)

ClientInfo NormGetSocketInfo(NormSocketHandle socket)
static ClientInfo NormGetSocketInfo(NormSocketHandle socket)
{
char addr[16]; // big enough for IPv6
unsigned int addrLen = 16;
Expand Down Expand Up @@ -490,6 +490,11 @@ int main(int argc, char* argv[])
{
if (event.socket == serverSocket)
{

// TBD - now that the NormSocket code manages its own client_table by remote addr/port
// and should eliminate the 'duplicative' connect itself, we can just keep track
// of client sockets by their NormSocketHandle

// Possibly a new "client" connecting to our "server"
// First confirm that this really is a new client.
if (NORM_SOCKET_INVALID != FindClientSocket(clientMap, clientInfo))
Expand Down
284 changes: 277 additions & 7 deletions examples/normSocket.cpp

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions examples/normSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ bool NormConnect(NormSocketHandle normSocket,
NormSocketHandle NormAccept(NormSocketHandle serverSocket,
NormNodeHandle clientNode,
NormInstanceHandle instance = NORM_INSTANCE_INVALID);

void NormReject(NormSocketHandle serverSocket,
NormNodeHandle clientNode);

void NormShutdown(NormSocketHandle normSocket);

Expand Down
6 changes: 6 additions & 0 deletions include/normApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ void NormSetRxPortReuse(NormSessionHandle sessionHandle,
NORM_API_LINKAGE
UINT16 NormGetRxPort(NormSessionHandle sessionHandle);

NORM_API_LINKAGE
bool NormGetRxBindAddress(NormSessionHandle sessionHandle, char* addr, unsigned int& addrLen, UINT16& port);

// TBD - We should probably have a "NormSetCCMode(NormCCMode ccMode)" function for users
NORM_API_LINKAGE
void NormSetEcnSupport(NormSessionHandle sessionHandle,
Expand Down Expand Up @@ -580,6 +583,9 @@ void NormStreamSetPushEnable(NormObjectHandle streamHandle,
NORM_API_LINKAGE
bool NormStreamHasVacancy(NormObjectHandle streamHandle);

NORM_API_LINKAGE
unsigned int NormStreamGetVacancy(NormObjectHandle streamHandle, unsigned int bytesWanted = 0);

NORM_API_LINKAGE
void NormStreamMarkEom(NormObjectHandle streamHandle);

Expand Down
2 changes: 1 addition & 1 deletion include/normNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class NormLossEstimator2
history[1] = (unsigned int)((1.0 / lossFraction) + 0.5);
}
unsigned long CurrentLossInterval() {return history[0];}
unsigned int LastLossInterval() {return history[1];}
unsigned long LastLossInterval() {return history[1];}

void SetIgnoreLoss(bool state)
{ignore_loss = state;}
Expand Down
4 changes: 4 additions & 0 deletions include/normObject.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,10 @@ class NormStreamObject : public NormObject
bool HasVacancy()
{return (stream_closing ? false : write_vacancy);}

// Returns how many bytes can be written to stream without blocking
// (up to 'wanted' for non-zero 'wanted', otherwise max vacancy available)
unsigned int GetVacancy(unsigned int wanted = 0);

NormBlock* StreamBlockLo()
{return stream_buffer.Find(stream_buffer.RangeLo());}
void SetLastNackTime(NormBlockId blockId, const ProtoTime& theTime)
Expand Down
8 changes: 6 additions & 2 deletions include/normSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ class NormSession

UINT16 GetRxPort() const;

const ProtoAddress& GetRxBindAddr() const
{return rx_bind_addr;}

// "SetEcnSupport(true)" sets up raw packet capture (pcap) so that incoming packet
// ECN status may be checked
// NOTE: only effective _before_ sndr/rcvr startup!
Expand Down Expand Up @@ -428,6 +431,9 @@ class NormSession
bool SenderSendCmd(const char* cmdBuffer, unsigned int cmdLength, bool robust);
void SenderCancelCmd();

// The following method is currently only used for NormSocket purposes
bool SenderSendAppCmd(const char* buffer, unsigned int length, const ProtoAddress& dst);

void SenderSetSynStatus(bool state)
{syn_status = state;}

Expand Down Expand Up @@ -697,8 +703,6 @@ class NormSession
bool SenderBuildRepairAdv(NormCmdRepairAdvMsg& cmd);
void SenderUpdateGroupSize();
bool SenderQueueAppCmd();
// The following method is only used for NormSocket purposes
bool SenderSendAppCmd(const char* buffer, unsigned int length, const ProtoAddress& dst);

// Receiver message handling routines
void ReceiverHandleObjectMessage(const struct timeval& currentTime,
Expand Down
23 changes: 22 additions & 1 deletion makefiles/Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,35 @@ normStreamer: $(STREAMER_OBJ) libnorm.a $(LIBPROTO)
mkdir -p ../bin
cp $@ ../bin/$@

# (nocmCast) file sender/receiver
# (normCast) file sender/receiver
CAST_SRC = $(EXAMPLE)/normCast.cpp
CAST_OBJ = $(CAST_SRC:.cpp=.o)

normCast: $(CAST_OBJ) libnorm.a $(LIBPROTO)
$(CC) $(CFLAGS) -o $@ $(CAST_OBJ) $(LDFLAGS) libnorm.a $(LIBPROTO) $(LIBS)
mkdir -p ../bin
cp $@ ../bin/$@

# These are the new "NormSocket" API extension examples
SERVER_SRC = $(EXAMPLE)/normServer.cpp $(EXAMPLE)/normSocket.cpp
SERVER_OBJ = $(SERVER_SRC:.cpp=.o)

normServer: $(SERVER_OBJ) libnorm.a $(LIBPROTO)
$(CC) $(CFLAGS) -o $@ $(SERVER_OBJ) $(LDFLAGS) libnorm.a $(LIBPROTO) $(LIBS)
mkdir -p ../bin
cp $@ ../bin/$@


CLIENT_SRC = $(EXAMPLE)/normClient.cpp $(EXAMPLE)/normSocket.cpp
CLIENT_OBJ = $(CLIENT_SRC:.cpp=.o)

normClient: $(CLIENT_OBJ) libnorm.a $(LIBPROTO)
$(CC) $(CFLAGS) -o $@ $(CLIENT_OBJ) $(LDFLAGS) libnorm.a $(LIBPROTO) $(LIBS)
mkdir -p ../bin
cp $@ ../bin/$@




# (pcap2norm) - parses pcap (e.g. tcpdump) file and prints NORM trace
PCAP_SRC = $(COMMON)/pcap2norm.cpp
Expand Down
88 changes: 84 additions & 4 deletions src/common/normApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,11 @@ void NormInstance::Notify(NormController::Event event,
{
case SEND_OK:
// Purge any pending NORM_SEND_ERROR notifications for session
TRACE("purging SEND_ERROR ...\n");
PurgeNotifications(session, NORM_SEND_ERROR);
return;
case SEND_ERROR:
TRACE("got SEND_ERROR\n");
default:
break;
}
Expand Down Expand Up @@ -554,6 +557,7 @@ bool NormInstance::GetNextEvent(NormEvent* theEvent)
case NORM_SEND_ERROR:
{
NormSession* session = (NormSession*)next->event.session;
TRACE("calling ClearSendError() ....\n");
session->ClearSendError();
break;
}
Expand Down Expand Up @@ -1073,6 +1077,40 @@ UINT16 NormGetRxPort(NormSessionHandle sessionHandle)
} // end NormGetRxPort()


NORM_API_LINKAGE
bool NormGetRxBindAddress(NormSessionHandle sessionHandle, char* addr, unsigned int& addrLen, UINT16& port)
{
bool result = false;
NormInstance* instance = NormInstance::GetInstanceFromSession(sessionHandle);
if ((NULL != instance) && instance->dispatcher.SuspendThread())
{
NormSession* session = (NormSession*)sessionHandle;
port = session->GetRxPort();
ProtoAddress bindAddr = session->GetRxBindAddr();
if (!bindAddr.IsValid())
{
addrLen = 0;
result = true;
}
else if (addrLen < bindAddr.GetLength())
{
addrLen = bindAddr.GetLength();
}
else
{
addrLen = bindAddr.GetLength();
memcpy(addr, bindAddr.GetRawHostAddress(), addrLen);
result = true;
}
instance->dispatcher.ResumeThread();
}
else
{
addrLen = port = 0;
}
return result;
} // end NormGetRxBindAddress()

NORM_API_LINKAGE
bool NormSetTxPort(NormSessionHandle sessionHandle,
UINT16 txPort,
Expand Down Expand Up @@ -1132,7 +1170,7 @@ bool NormPresetObjectInfo(NormSessionHandle sessionHandle,
NormSession* session = (NormSession*)sessionHandle;
if (session)
{
result = session->SetPresetFtiData(objectSize, segmentSize, numData, numParity);
result = session->SetPresetFtiData((unsigned int)objectSize, segmentSize, numData, numParity);
if (result) session->SenderSetFtiMode(NormSession::FTI_PRESET);
}
instance->dispatcher.ResumeThread();
Expand Down Expand Up @@ -1511,7 +1549,7 @@ NormSessionId NormGetRandomSessionId()
{
ProtoTime currentTime;
currentTime.GetCurrentTime();
srand(currentTime.usec()); // seed random number generator
srand((unsigned int)currentTime.usec()); // seed random number generator
return (NormSessionId)rand();
} // end NormGetRandomSessionId()

Expand Down Expand Up @@ -1956,6 +1994,22 @@ bool NormStreamHasVacancy(NormObjectHandle streamHandle)
return result;
} // end NormStreamHasVacancy()

NORM_API_LINKAGE
unsigned int NormStreamGetVacancy(NormObjectHandle streamHandle, unsigned int bytesWanted)
{
bool result = false;
NormInstance* instance = NormInstance::GetInstanceFromObject(streamHandle);
if (instance && instance->dispatcher.SuspendThread())
{
NormStreamObject* stream =
static_cast<NormStreamObject*>((NormObject*)streamHandle);
if (NULL != stream)
result = stream->GetVacancy(bytesWanted);
instance->dispatcher.ResumeThread();
}
return result;
} // end NormStreamGetVacancy()

NORM_API_LINKAGE
void NormStreamMarkEom(NormObjectHandle streamHandle)
{
Expand Down Expand Up @@ -2247,6 +2301,32 @@ void NormCancelCommand(NormSessionHandle sessionHandle)
}
} // end NormCancelCommand()


// This one is not part of the public API (for NormSocket use currently)
NORM_API_LINKAGE
bool NormSendCommandTo(NormSessionHandle sessionHandle,
const char* cmdBuffer,
unsigned int cmdLength,
const char* addr,
UINT16 port)
{
bool result = false;
NormInstance* instance = NormInstance::GetInstanceFromSession(sessionHandle);
if (instance && instance->dispatcher.SuspendThread())
{
NormSession* session = (NormSession*)sessionHandle;
ProtoAddress dest;
if (dest.ResolveFromString(addr))
{
dest.SetPort(port);
result = session->SenderSendAppCmd(cmdBuffer, cmdLength, dest);
}
instance->dispatcher.ResumeThread();
}
return result;
} // end NormSendCommandTo()


NORM_API_LINKAGE
void NormSetSynStatus(NormSessionHandle sessionHandle, bool state)
{
Expand Down Expand Up @@ -2457,7 +2537,7 @@ bool NormPreallocateRemoteSender(NormSessionHandle sessionHandle,
if (instance && instance->dispatcher.SuspendThread())
{
NormSession* session = (NormSession*)sessionHandle;
result = session->PreallocateRemoteSender(bufferSize, segmentSize, numData, numParity, streamBufferSize);
result = session->PreallocateRemoteSender((unsigned int)bufferSize, segmentSize, numData, numParity, streamBufferSize);
instance->dispatcher.ResumeThread();
}
return result;
Expand Down Expand Up @@ -2754,7 +2834,7 @@ NORM_API_LINKAGE
bool NormNodeGetAddress(NormNodeHandle nodeHandle,
char* addrBuffer,
unsigned int* bufferLen,
UINT16* port)
UINT16* port)
{
bool result = false;
if (NORM_NODE_INVALID != nodeHandle)
Expand Down
6 changes: 3 additions & 3 deletions src/common/normEncoderRS16.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ inline gf gf_mul(int x, int y)
}

#define init_mul_table()
#define USE_GF_MULC register gf * __gf_mulc_
#define USE_GF_MULC gf * __gf_mulc_
#define GF_MULC0(c) __gf_mulc_ = &gf_exp[ gf_log[c] ]
#define GF_ADDMULC(dst, x) { if (x) dst ^= __gf_mulc_[ gf_log[x] ] ; }

Expand Down Expand Up @@ -261,8 +261,8 @@ static void generate_gf()
static void addmul1(gf* dst1, gf* src1, gf c, int sz)
{
USE_GF_MULC ;
register gf* dst = dst1;
register gf* src = src1 ;
gf* dst = dst1;
gf* src = src1 ;
gf* lim = &dst[sz - UNROLL + 1] ;

GF_MULC0(c) ;
Expand Down
6 changes: 3 additions & 3 deletions src/common/normEncoderRS8.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ static inline gf modnn(int x)
static gf gf_mul_table[GF_SIZE + 1][GF_SIZE + 1];

#define gf_mul(x,y) gf_mul_table[x][y]
#define USE_GF_MULC register gf * __gf_mulc_
#define USE_GF_MULC gf * __gf_mulc_
#define GF_MULC0(c) __gf_mulc_ = gf_mul_table[c]
#define GF_ADDMULC(dst, x) dst ^= __gf_mulc_[x]

Expand Down Expand Up @@ -262,8 +262,8 @@ static void generate_gf()
static void addmul1(gf* dst1, gf* src1, gf c, int sz)
{
USE_GF_MULC ;
register gf* dst = dst1;
register gf* src = src1 ;
gf* dst = dst1;
gf* src = src1 ;
gf* lim = &dst[sz - UNROLL + 1] ;

GF_MULC0(c) ;
Expand Down
2 changes: 1 addition & 1 deletion src/common/normFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ bool NormDirectoryIterator::GetNextFile(char* fileName)
NormFile::Type type = NormFile::GetType(fileName);
if (NormFile::NORMAL == type)
{
int nameLen = strlen(fileName);
size_t nameLen = strlen(fileName);
nameLen = MIN(PATH_MAX, nameLen);
nameLen -= path_len;
memmove(fileName, fileName+path_len, nameLen);
Expand Down
14 changes: 9 additions & 5 deletions src/common/normNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ bool NormSenderNode::AllocateBuffers(unsigned int bufferSpace,

unsigned long numSegments = numBlocks * segPerBlock;

if (!block_pool.Init(numBlocks, blockSize))
if (!block_pool.Init((UINT32)numBlocks, blockSize))
{
PLOG(PL_FATAL, "NormSenderNode::AllocateBuffers() block_pool init error\n");
Close();
return false;
}

// Segment buffers include space for NORM_OBJECT_STREAM stream payload header
if (!segment_pool.Init(numSegments, segmentSize+NormDataMsg::GetStreamPayloadHeaderLength()))
if (!segment_pool.Init((unsigned int)numSegments, segmentSize+NormDataMsg::GetStreamPayloadHeaderLength()))
{
PLOG(PL_FATAL, "NormSenderNode::AllocateBuffers() segment_pool init error\n");
Close();
Expand Down Expand Up @@ -1571,8 +1571,12 @@ void NormSenderNode::HandleObjectMessage(const NormObjectMsg& msg)
}
// else wait for NORM_INFO message with sender FTI
}
if (gotFTI && !AllocateBuffers(session.RemoteSenderBufferSize(), fecId, ftiData.GetFecInstanceId(), ftiData.GetFecFieldSize(),
ftiData.GetSegmentSize(), ftiData.GetFecMaxBlockLen(), ftiData.GetFecNumParity()))
if (gotFTI && !AllocateBuffers((unsigned int)session.RemoteSenderBufferSize(),
fecId, ftiData.GetFecInstanceId(),
ftiData.GetFecFieldSize(),
ftiData.GetSegmentSize(),
ftiData.GetFecMaxBlockLen(),
ftiData.GetFecNumParity()))
{
PLOG(PL_ERROR, "NormSenderNode::HandleObjectMessage() node>%lu sender>%lu buffer allocation error\n",
(unsigned long)LocalNodeId(), (unsigned long)GetId());
Expand Down Expand Up @@ -1980,7 +1984,7 @@ void NormSenderNode::Sync(NormObjectId objectId)
incrementResyncCount = true; // more than just a trim
}
unsigned long numBits = (UINT16)(objectId - firstPending);
rx_pending_mask.UnsetBits(firstPending, numBits);
rx_pending_mask.UnsetBits(firstPending, (UINT32)numBits);
if (incrementResyncCount) IncrementResyncCount();
}
}
Expand Down
Loading

0 comments on commit 88e600c

Please sign in to comment.