Skip to content

Commit

Permalink
Fix: improve mdstcl's error handling and add comments (#2746)
Browse files Browse the repository at this point in the history
* add comments regarding action service

* send_reply() now does cleanup_client() on bad socket

* explain mdstcl's receiver thread cannot access main thread's connection list

* Improve handling of non-MDSplus error codes

* add comments regarding action dispatch

* add comment explaining receiver thread select loop
  • Loading branch information
mwinkel-dev committed Apr 25, 2024
1 parent aef5fb5 commit 7ab78b8
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 8 deletions.
2 changes: 1 addition & 1 deletion mdstcpip/mdsipshr/ConnectToMds.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static int do_login(Connection *c)
int status = SendMdsMsgC(c, msend, 0);
int err;
free(msend);
if (STATUS_NOT_OK)
if ((status == SsINTERNAL) || STATUS_NOT_OK)
{
perror("Error during login: send");
err = C_ERROR;
Expand Down
5 changes: 5 additions & 0 deletions mdstcpip/mdsipshr/Connections.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// #define DEBUG
#include <mdsmsg.h>

// Because threads have their own MdsipThreadStatic data, the receiver thread
// cannot access the connection list that is stored in the main thread. If
// that capability is ever needed, a global pointer must be used. However,
// sharing the connection list between both threads is inadvisable because it
// can result in deadlock.
Connection *_FindConnection(int id, Connection **prev, MDSIPTHREADSTATIC_ARG)
{
if (id == INVALID_CONNECTION_ID)
Expand Down
1 change: 1 addition & 0 deletions mdstcpip/mdsipshr/GetAnswerInfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ int GetAnswerInfoTO(int id, char *dtype, short *length, char *ndims, int *dims,
CloseConnection(id);
status = MDSplusERROR;
}
if (status == SsINTERNAL) status = MDSplusERROR;
if (STATUS_NOT_OK)
{
free(m);
Expand Down
4 changes: 3 additions & 1 deletion mdstcpip/mdsipshr/GetMdsMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//#define DEBUG
#include <mdsmsg.h>

// Can return non-MDSplus error code, SsINTERNAL
static int get_bytes_to(Connection *c, void *buffer, size_t bytes_to_recv, int to_msec)
{
char *bptr = (char *)buffer;
Expand Down Expand Up @@ -83,6 +84,7 @@ static int get_bytes_to(Connection *c, void *buffer, size_t bytes_to_recv, int t
// GetMdsMsg /////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

// Can set status to non-MDSplus error code, SsINTERNAL
Message *GetMdsMsgTOC(Connection *c, int *status, int to_msec)
{
MsgHdr header;
Expand Down Expand Up @@ -125,7 +127,7 @@ Message *GetMdsMsgTOC(Connection *c, int *status, int to_msec)
m = malloc(msglen);
m->h = header;
*status = uncompress((unsigned char *)m->bytes, &dlen,
(unsigned char *)msg->bytes + 4, dlen - 4) == 0;
(unsigned char *)msg->bytes + 4, dlen - 4) == Z_OK;
if (IS_OK(*status))
{
m->h.msglen = msglen;
Expand Down
1 change: 1 addition & 0 deletions mdstcpip/mdsipshr/SendArg.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ int SendArg(int id, unsigned char idx, char dtype, unsigned char nargs,
m->h.message_id = (idx == 0 || nargs == 0) ? ConnectionIncMessageId(c)
: c->message_id;
int status = m->h.message_id ? SendMdsMsgC(c, m, 0) : MDSplusERROR;
if (status == SsINTERNAL) status = MDSplusERROR;
free(m);
if (STATUS_NOT_OK)
UnlockConnection(c);
Expand Down
4 changes: 3 additions & 1 deletion mdstcpip/mdsipshr/SendMdsMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//#define DEBUG
#include <mdsmsg.h>

// Can return non-MDSplus error code, SsINTERNAL
static int send_bytes(Connection *c, void *buffer, size_t bytes_to_send, int options)
{
if (!c || !c->io)
Expand Down Expand Up @@ -74,6 +75,7 @@ static int send_bytes(Connection *c, void *buffer, size_t bytes_to_send, int opt
return MDSplusSUCCESS;
}

// Can return non-MDSplus error code of SsINTERNAL because of send_bytes()
int SendMdsMsgC(Connection *c, Message *m, int msg_options)
{
unsigned long len = m->h.msglen - sizeof(m->h);
Expand Down Expand Up @@ -104,7 +106,7 @@ int SendMdsMsgC(Connection *c, Message *m, int msg_options)
m->h.client_type = ClientType();
if (clength &&
compress2((unsigned char *)cm->bytes + 4, &clength,
(unsigned char *)m->bytes, len, c->compression_level) == 0 &&
(unsigned char *)m->bytes, len, c->compression_level) == Z_OK &&
clength < len)
{
cm->h = m->h;
Expand Down
3 changes: 3 additions & 0 deletions servershr/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ static void Client_do_message(Client *c, fd_set *fdactive)
}
switch (replyType)
{
// When an action service has processed an action, it sends back this reply.
// So update the status flags for the job. Don't remove the client because
// we want the network connection to remain active.
case SrvJobFINISHED:
{
Job *j = Job_get_by_jobid(jobid);
Expand Down
3 changes: 2 additions & 1 deletion servershr/ServerDispatchPhase.c
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ static void dispatch(int i)
// ProgLoc = 7001;
send_monitor(MonitorDispatched, i);
// ProgLoc = 7002;
if (noact)
if (noact) // global, 1 = show actions but don't dispatch, 0 = dispatch
{
actions[i].dispatched = 1;
actions[i].status = status = 1;
Expand Down Expand Up @@ -687,6 +687,7 @@ static void action_done_action_locked(int idx)
MdsFree1Dx(&xd, NULL);
}

// Uses recursion to deal with cascade of actions
static void action_done_action_unlocked(int idx)
{
if (is_abort_in_progress())
Expand Down
3 changes: 2 additions & 1 deletion servershr/ServerQAction.c
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ static void cleanup_client(SrvJob *job)
}
}

/// returns the number of bytes sent
/// on success returns the number of bytes sent, on failure returns -1
static int send_all(SOCKET sock, char *msg, int len)
{
int sent;
Expand Down Expand Up @@ -1006,6 +1006,7 @@ static int send_reply(SrvJob *job, int replyType, int status_in, int length, cha
if (sock == INVALID_SOCKET)
{
MDSMSG(SVRJOB_PRI " break connection", SVRJOB_VAR(job));
cleanup_client(job);
break;
}
int bytes = send_all(sock, reply, 60);
Expand Down
23 changes: 20 additions & 3 deletions servershr/ServerSendMessage.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ static SOCKET get_socket_by_conid(int conid)
return INVALID_SOCKET;
}

// Mdstcl has two threads: main and receiver. The main thread dispatches
// actions to action services (typically one service per computer).
// The receiver thread processes replies from all action services.
// Each thread uses a different port, thus a different network connection.
// Connections persist and thus handle all traffic between the endpoints.
int ServerSendMessage(int *msgid, char *server, int op, int *retstatus,
pthread_rwlock_t *lock, int *conid_out, void (*callback_done)(),
void *callback_param, void (*callback_before)(), int numargs_in,
Expand Down Expand Up @@ -200,9 +205,11 @@ int ServerSendMessage(int *msgid, char *server, int op, int *retstatus,
Job_cleanup(status, jobid);
return status;
}
// The "action service" immediately sends back a handshake status confirming
// that it received the command sent above.
status = GetAnswerInfoTS(conid, &dtype, &len, &ndims, dims, &numbytes,
(void **)&dptr, &mem);
if (op == SrvStop)
if (op == SrvStop) // If stopped the server, a failed status is expected
{
if (STATUS_NOT_OK)
{
Expand Down Expand Up @@ -303,6 +310,7 @@ static SOCKET new_reply_socket(uint16_t *port_out)

static Condition ReceiverRunning = CONDITION_INITIALIZER;

// Returns non-MDSplus status: -1, 0, or 1. 0 is OK; the others are errors.
static int start_receiver(uint16_t *port_out)
{
INIT_STATUS;
Expand All @@ -323,7 +331,7 @@ static int start_receiver(uint16_t *port_out)
if (!ReceiverRunning.value)
{
CREATE_DETACHED_THREAD(thread, *16, receiver_thread, &sock);
if (c_status)
if (c_status) // is from preceding macro
{
perror("Error creating pthread");
status = MDSplusERROR;
Expand All @@ -342,7 +350,7 @@ static int start_receiver(uint16_t *port_out)
static void receiver_atexit(void *arg)
{
(void)arg;
MDSDBG("ServerSendMessage thread exitted");
MDSDBG("ServerSendMessage thread exited");
CONDITION_RESET(&ReceiverRunning);
}

Expand Down Expand Up @@ -376,6 +384,8 @@ static void reset_fdactive(int rep, SOCKET server, fd_set *fdactive)
MDSWRN("reset fdactive in reset_fdactive");
}

// When any action service completes an action, a reply is sent back to mdstcl.
// The single receiver thread processes the replies from all action services.
static void receiver_thread(void *sockptr)
{
atexit((void *)receiver_atexit);
Expand All @@ -395,6 +405,7 @@ static void receiver_thread(void *sockptr)
int rep;
int num = 0;
struct timeval readto, timeout = {10, 0};
// Tries 10 times if select() always returns error (i.e., num < 0)
for (rep = 0; rep < 10; rep++)
{
for (readfds = fdactive, readto = timeout;;
Expand Down Expand Up @@ -523,6 +534,12 @@ EXPORT int ServerDisconnect(char *server_in)
return status;
}

// If a network connection already exists, reuse it. Only create a
// connection in two scenarios: none exists yet, or the existing one is defunct.
// In the simplest configuration, mdstcl has three connections:
// 1) to the mdsip service for tree access (to read the action nodes),
// 2) a connection to dispatch actions to the action service, and
// 3) a connection to receive replies from the action service.
static inline int server_connect(char *server, uint32_t addr, uint16_t port)
{
int conid;
Expand Down

0 comments on commit 7ab78b8

Please sign in to comment.