Skip to content

Commit

Permalink
added memory usage limit to health check; demoted log level of atl03 …
Browse files Browse the repository at this point in the history
…reader post failure
  • Loading branch information
jpswinski committed Apr 2, 2024
1 parent 399d20c commit cf5290a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 27 deletions.
50 changes: 43 additions & 7 deletions packages/core/LuaLibrarySys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ const struct luaL_Reg LuaLibrarySys::sysLibs [] = {
{"lsrec", LuaLibrarySys::lsys_lsrec},
{"cwd", LuaLibrarySys::lsys_cwd},
{"memu", LuaLibrarySys::lsys_memu},
{"setmemlimit", LuaLibrarySys::lsys_setmemlimit},
{"lsdev", DeviceObject::luaList},
{NULL, NULL}
};

double LuaLibrarySys::memory_limit = 1.0;

/******************************************************************************
* SYSTEM LIBRARY EXTENSION METHODS
******************************************************************************/
Expand Down Expand Up @@ -155,7 +158,7 @@ int LuaLibrarySys::lsys_quit (lua_State* L)
int errors = 0;
if(lua_isnumber(L, 1))
{
errors = lua_tonumber(L, 1);
errors = lua_tointeger(L, 1);
}

setinactive( errors );
Expand Down Expand Up @@ -195,7 +198,7 @@ int LuaLibrarySys::lsys_wait (lua_State* L)
int secs = 0;
if(lua_isnumber(L, 1))
{
secs = lua_tonumber(L, 1);
secs = lua_tointeger(L, 1);
}
else
{
Expand Down Expand Up @@ -394,7 +397,7 @@ int LuaLibrarySys::lsys_setstddepth (lua_State* L)
int depth = 0;
if(lua_isnumber(L, 1))
{
depth = lua_tonumber(L, 1);
depth = lua_tointeger(L, 1);
}
else
{
Expand Down Expand Up @@ -425,7 +428,7 @@ int LuaLibrarySys::lsys_setiosize (lua_State* L)
else
{
/* Set I/O Size */
int size = lua_tonumber(L, 1);
int size = lua_tointeger(L, 1);
status = OsApi::setIOMaxsize(size);
}

Expand All @@ -452,10 +455,10 @@ int LuaLibrarySys::lsys_seteventlvl (lua_State* L)

if(lua_isnumber(L, 1))
{
int type_mask = lua_tonumber(L, 1);
int type_mask = lua_tointeger(L, 1);
if(lua_isnumber(L, 2))
{
event_level_t lvl = (event_level_t)lua_tonumber(L, 2);
event_level_t lvl = (event_level_t)lua_tointeger(L, 2);
if(type_mask & EventLib::LOG) EventLib::setLvl(EventLib::LOG, lvl);
if(type_mask & EventLib::TRACE) EventLib::setLvl(EventLib::TRACE, lvl);
if(type_mask & EventLib::METRIC) EventLib::setLvl(EventLib::METRIC, lvl);
Expand Down Expand Up @@ -493,7 +496,17 @@ int LuaLibrarySys::lsys_geteventlvl (lua_State* L)
*----------------------------------------------------------------------------*/
int LuaLibrarySys::lsys_healthy (lua_State* L)
{
lua_pushboolean(L, true);
bool health = true;

/* Check Memory Usage */
double current_memory_usage = OsApi::memusage();
if(current_memory_usage >= memory_limit)
{
health = false;
}

/* Return Health */
lua_pushboolean(L, health);
return 1;
}

Expand Down Expand Up @@ -557,3 +570,26 @@ int LuaLibrarySys::lsys_memu (lua_State* L)
lua_pushnumber(L, m);
return 1;
}

/*----------------------------------------------------------------------------
* lsys_setmemlimit - set memory limit
*----------------------------------------------------------------------------*/
int LuaLibrarySys::lsys_setmemlimit (lua_State* L)
{
bool status = false;

if(!lua_isnumber(L, 1))
{
mlog(CRITICAL, "memory limit must be a number");
}
else
{
/* Set Memory Limit */
memory_limit = lua_tonumber(L, 1);
status = true;
}

/* Return Status */
lua_pushboolean(L, status);
return 1;
}
7 changes: 7 additions & 0 deletions packages/core/LuaLibrarySys.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ class LuaLibrarySys
static int lsys_lsrec (lua_State* L);
static int lsys_cwd (lua_State* L);
static int lsys_memu (lua_State* L);
static int lsys_setmemlimit (lua_State* L);

/*--------------------------------------------------------------------
* Data
*--------------------------------------------------------------------*/

static double memory_limit;
};

#endif /* __lua_library_sys__ */
2 changes: 1 addition & 1 deletion plugins/icesat2/plugin/Atl03BathyReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ FString json_contents(R"json({
extent->pointing_angle);

/* Write and Close JSON File */
fprintf(json_file, json_contents.c_str());
fprintf(json_file, "%s", json_contents.c_str());
fclose(json_file);

/* Open Data File */
Expand Down
36 changes: 18 additions & 18 deletions plugins/icesat2/plugin/Atl03Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1145,18 +1145,18 @@ uint8_t Atl03Reader::YapcScore::operator[] (int index) const
*----------------------------------------------------------------------------*/
Atl03Reader::TrackState::TrackState (const Atl03Data& atl03)
{
ph_in = 0;
seg_in = 0;
seg_ph = 0;
start_segment = 0;
start_distance = atl03.segment_dist_x[0];
seg_distance = 0.0;
start_seg_portion = 0.0;
track_complete = false;
bckgrd_in = 0;
extent_segment = 0;
extent_valid = true;
extent_length = 0.0;
ph_in = 0;
seg_in = 0;
seg_ph = 0;
start_segment = 0;
start_distance = atl03.segment_dist_x[0];
seg_distance = 0.0;
start_seg_portion = 0.0;
track_complete = false;
bckgrd_in = 0;
extent_segment = 0;
extent_valid = true;
extent_length = 0.0;
}

/*----------------------------------------------------------------------------
Expand Down Expand Up @@ -1501,7 +1501,7 @@ void* Atl03Reader::subsettingThread (void* parm)
}
catch(const RunTimeException& e)
{
alert(e.level(), e.code(), reader->outQ, &reader->active, "Error posting results for resource %s track %d: %s", info->reader->resource, info->track, e.what());
alert(e.level(), e.code(), reader->outQ, &reader->active, "Error generating results for resource %s track %d.%d: %s", info->reader->resource, info->track, info->pair, e.what());
}

/* Clean Up Records */
Expand All @@ -1521,7 +1521,7 @@ void* Atl03Reader::subsettingThread (void* parm)
}
catch(const RunTimeException& e)
{
alert(e.level(), e.code(), reader->outQ, &reader->active, "Failure on resource %s track %d: %s", info->reader->resource, info->track, e.what());
alert(e.level(), e.code(), reader->outQ, &reader->active, "Failure on resource %s track %d.%d: %s", info->reader->resource, info->track, info->pair, e.what());
}

/* Handle Global Reader Updates */
Expand All @@ -1538,7 +1538,7 @@ void* Atl03Reader::subsettingThread (void* parm)
reader->numComplete++;
if(reader->numComplete == reader->threadCount)
{
mlog(INFO, "Completed processing resource %s", info->reader->resource);
mlog(INFO, "Completed processing resource %s track %d.%d (f: %u, s: %u, d: %u)", info->reader->resource, info->track, info->pair, local_stats.extents_filtered, local_stats.extents_sent, local_stats.extents_dropped);

/* Indicate End of Data */
if(reader->sendTerminator)
Expand All @@ -1549,12 +1549,12 @@ void* Atl03Reader::subsettingThread (void* parm)
status = reader->outQ->postCopy("", 0, SYS_TIMEOUT);
if(status < 0)
{
mlog(CRITICAL, "Failed (%d) to post terminator for %s", status, info->reader->resource);
mlog(CRITICAL, "Failed (%d) to post terminator for %s track %d.%d", status, info->reader->resource, info->track, info->pair);
break;
}
else if(status == MsgQ::STATE_TIMEOUT)
{
mlog(INFO, "Timeout posting terminator for %s ... trying again", info->reader->resource);
mlog(INFO, "Timeout posting terminator for %s track %d.%d ... trying again", info->reader->resource, info->track, info->pair);
}
}
}
Expand Down Expand Up @@ -1753,7 +1753,7 @@ void Atl03Reader::postRecord (RecordObject& record, stats_t& local_stats)
}
else
{
mlog(ERROR, "Atl03 reader failed to post %s to stream %s: %d", record.getRecordType(), outQ->getName(), post_status);
mlog(DEBUG, "Atl03 reader failed to post %s to stream %s: %d", record.getRecordType(), outQ->getName(), post_status);
local_stats.extents_dropped++;
}
}
Expand Down
5 changes: 4 additions & 1 deletion scripts/apps/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ sys.setenvver(environment_version)
-- Set Is Public --
sys.setispublic(is_public)

-- Set ECluster Name --
-- Set Cluster Name --
sys.setcluster(org_name)

-- Configure System Message Queue Depth --
sys.setstddepth(msgq_depth)

-- Configure Memory Limit --
sys.setmemlimit(stream_mem_thresh)

-- Configure Monitoring --
sys.setlvl(core.LOG | core.TRACE | core.METRIC, event_level) -- set level globally
local log_monitor = core.monitor(core.LOG, core.DEBUG, event_format):name("LogMonitor") -- monitor logs and write to stdout
Expand Down

0 comments on commit cf5290a

Please sign in to comment.