Skip to content

Commit

Permalink
Fix memory issues on busy n2kd instances.
Browse files Browse the repository at this point in the history
  • Loading branch information
keesverruijt committed Jan 25, 2015
1 parent 82d8c30 commit 8cdad67
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 52 deletions.
6 changes: 4 additions & 2 deletions common/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ void sbAppendData(StringBuffer * sb, const void * data, size_t len)
sb->len += len;
sb->data[sb->len] = 0;
logDebug("Appended %u bytes to %p len %u\n", len, sb->data, sb->len);
logDebug("+ [%1.*s]\n", len, data);
logDebug("= [%1.*s]\n", sb->len, sb->data);
//logDebug("+ [%1.*s]\n", len, data);
//logDebug("= [%1.*s]\n", sb->len, sb->data);
}

void sbAppendString(StringBuffer * sb, const char * string)
Expand Down Expand Up @@ -238,6 +238,7 @@ int getJSONValue( const char * message, const char * fieldName, char * value, si
while ((isdigit(*loc) || *loc == '.' || *loc == '-' || *loc == 'E' || *loc == 'e' || *loc == '+') && len > 1)
{
*value++ = *loc++;
len--;
}
*value = 0;
return 1;
Expand Down Expand Up @@ -295,6 +296,7 @@ int getJSONValue( const char * message, const char * fieldName, char * value, si
{
*value++ = *loc++;
}
len--;
}
*value = 0;
return 1;
Expand Down
117 changes: 67 additions & 50 deletions n2kd/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,14 @@ StringBuffer nmeaMessage;/* Buffer for sending to NMEA0183 TCP clients */

#define MIN_PGN (59391)
#define MAX_PGN (131000)
#define ACTISENSE_BEM (0x400000)
#define ACTISENSE_BEM (0x40000)
#define ACTISENSE_RNG (0x100)
#define NMEA_RNG (MAX_PGN - MIN_PGN + 1)

#define PGN_SPACE (ACTISENSE_RNG + MAX_PGN - MIN_PGN)
#define PrnToIdx(prn) ((prn <= MAX_PGN) ? (prn - MIN_PGN) : ((prn <= ACTISENSE_BEM + ACTISENSE_RNG) ? (prn - ACTISENSE_BEM) : -1))
#define PGN_SPACE (ACTISENSE_RNG + NMEA_RNG)
#define PrnToIdx(prn) ((prn <= MAX_PGN) ? (prn - MIN_PGN) : \
((prn <= ACTISENSE_BEM + ACTISENSE_RNG && prn >= ACTISENSE_BEM) ? (prn + NMEA_RNG - ACTISENSE_BEM) : \
-1))

/*
* We store messages and where they come from.
Expand Down Expand Up @@ -593,7 +596,7 @@ void writeAllClients(void)
# endif
}

void storeMessage(char * line, size_t len)
static bool storeMessage(char * line, size_t len)
{
char *s, *e = 0, *e2;
Message * m;
Expand All @@ -603,39 +606,49 @@ void storeMessage(char * line, size_t len)
time_t now;
char * key2 = 0;
int valid;
char value[16];

now = time(0);

logDebug("storeMessage(\"%s\",%u)\n", line, len);

if (!strstr(line, "\"fields\":"))
{
logDebug("Ignore pgn %u without fields\n", prn);
return;
logDebug("Ignore: pgn %u without fields\n", prn);
return false;
}
if (memcmp(line, "{\"timestamp", 11) != 0)
{
logDebug("Ignore '%s'\n", line);
return;
logDebug("Ignore: no timestamp: '%s'\n", line);
return false;
}
if (memcmp(line + len - 2, "}}", 2) != 0)
{
logDebug("Ignore '%s' (end)\n", line);
return;
logDebug("Ignore: no line end: '%s'\n", line);
return false;
}
s = strstr(line, "\"src\":");
if (s)

if (getJSONValue(line, "src", value, sizeof(value)))
{
if (sscanf(s + sizeof("\"src\":"), "%u\",\"dst\":\"%u\",\"pgn\":\"%u\"", &src, &dst, &prn))
{
#ifdef DEBUG
logDebug("prn=%u src=%u\n", prn, src);
#endif
}
sscanf(value, "%d", &src);
}
if (prn == 0 || prn > MAX_PGN)

if (getJSONValue(line, "dst", value, sizeof(value)))
{
return;
sscanf(value, "%d", &dst);
}

if (getJSONValue(line, "pgn", value, sizeof(value)))
{
sscanf(value, "%d", &prn);
}

idx = PrnToIdx(prn);
logDebug("src=%d dst=%d prn=%d idx=%d\n", src, dst, prn, idx);
if (idx < 0)
{
logError("Ignore: prn %d: '%s'\n", prn, line);
return false;
}

/* Look for a secondary key */
Expand Down Expand Up @@ -672,12 +685,6 @@ void storeMessage(char * line, size_t len)
}
}

idx = PrnToIdx(prn);
if (idx < 0)
{
logAbort("PRN %d is out of range\n", prn);
}

pgn = pgnIdx[idx];
if (!pgn)
{
Expand Down Expand Up @@ -712,7 +719,7 @@ void storeMessage(char * line, size_t len)
if (!e)
{
logDebug("Cannot find end of description in %s\n", s);
return;
return false;
}
logDebug("New PGN '%.*s'\n", e - s, s);
pgn->p_description = malloc(e - s + 1);
Expand Down Expand Up @@ -813,18 +820,28 @@ void storeMessage(char * line, size_t len)
}
logDebug("stored prn %d timeout=%d 2ndKey=%d\n", prn, valid, k);
m->m_time = now + valid;
return true;
}

void handleClientRequest(int i)
{
ssize_t r;
char * p;
size_t remain;

r = read(stream[i].fd, stream[i].buffer + stream[i].len, sizeof(stream[i].buffer) - 1 - stream[i].len);
logDebug("read %s i=%d fd=%d r=%d\n", streamTypeName[stream[i].type], i, stream[i].fd, r);
if (stream[i].len >= sizeof(stream[i].buffer) - 2)
{
logAbort("Input line on stream %d too long: %s\n", i, stream[i].buffer);
}
remain = sizeof(stream[i].buffer) - stream[i].len - 2;

logDebug("handleClientRequest: read i=%d\n", i);
logDebug("read %s i=%d fd=%d len=%u remain=%u\n", streamTypeName[stream[i].type], i, stream[i].fd, stream[i].len, remain);
r = read(stream[i].fd, stream[i].buffer + stream[i].len, remain);

if (r <= 0)
{
logDebug("read %s i=%d fd=%d r=%d\n", streamTypeName[stream[i].type], i, stream[i].fd, r);
if (stream[i].type == DATA_INPUT_STREAM)
{
logAbort("EOF on reading stdin\n");
Expand All @@ -835,33 +852,33 @@ void handleClientRequest(int i)

stream[i].len += r;
stream[i].buffer[stream[i].len] = 0;
while (r > 0)
while (stream[i].len > 0)
{
size_t len;

p = strchr(stream[i].buffer, '\n');
if (p)
if (!p)
{
size_t len = p - stream[i].buffer;
sbAppendData(&tcpMessage, stream[i].buffer, len + 1);
if (stream[i].type != DATA_INPUT_STREAM || stream[outputIdx].type == DATA_OUTPUT_COPY)
{
/* Send all TCP client input and the main stdin stream if the mode is -o */
/* directly to stdout */
sbAppendData(&outMessage, stream[i].buffer, len + 1);
}
*p = 0;
convertJSONToNMEA0183(&nmeaMessage, stream[i].buffer);
storeMessage(stream[i].buffer, len);
p++, len++;

/* Now remove [buffer..p> */
memmove(stream[i].buffer, p, strlen(p));
stream[i].len -= len;
r -= len;
break;
}
else
len = p - stream[i].buffer;
sbAppendData(&tcpMessage, stream[i].buffer, len + 1);
if (stream[i].type != DATA_INPUT_STREAM || stream[outputIdx].type == DATA_OUTPUT_COPY)
{
r = 0;
/* Send all TCP client input and the main stdin stream if the mode is -o */
/* directly to stdout */
sbAppendData(&outMessage, stream[i].buffer, len + 1);
}
*p = 0;
if (storeMessage(stream[i].buffer, len))
{
convertJSONToNMEA0183(&nmeaMessage, stream[i].buffer);
}
p++, len++;
stream[i].len -= len;

/* Now remove [buffer..p> == the entire line */
memmove(stream[i].buffer, p, stream[i].len + 1);
}
}

Expand Down

0 comments on commit 8cdad67

Please sign in to comment.