Skip to content
This repository has been archived by the owner on Jun 7, 2021. It is now read-only.

Commit

Permalink
[TRAFODION-3260] SSMP may wait 3 seconds before handling requests
Browse files Browse the repository at this point in the history
Encapsulated the changes from commit 2ee272b325d021 in the Ipc layer.

[TRAFODION-3274] At times sqlci or any other SQL process fails to come up and dumps core

It looks like there is a race condition in the C++ main function prologue while
initializing the embedded global objects and the stdin, stdout, and stderr file
descriptors.

A file descriptor with value 2 was returned by epoll_create(). However, the error-redirection
code possibly used this fd for its redirect, overriding the epoll fd.

This caused epoll_ctl() to return EINVAL, resulting in a core dump of the SQL process.

Changed the global object gv_sock_ctlr to a global object pointer.
  • Loading branch information
selvaganesang committed Feb 12, 2019
1 parent da9f32c commit 95c2712
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 102 deletions.
58 changes: 36 additions & 22 deletions core/sqf/src/seabed/src/sock.cpp
Expand Up @@ -65,7 +65,7 @@ SB_Label_Map gv_sock_epoll_ctl_type_label_map = {
"<unknown>", ga_sock_epoll_ctl_type_labels
};

static SB_Trans::Sock_Controller gv_sock_ctlr;
static SB_Trans::Sock_Controller *gv_sock_ctlr = NULL;

static const char *sock_get_label_epoll_ctl(int pv_value) {
return SB_get_label(&gv_sock_epoll_ctl_type_label_map, pv_value);
Expand All @@ -78,6 +78,20 @@ static void *sock_comp_thread_fun(void *pp_arg) {
return NULL;
}

// Returns the process-wide Sock_Controller, creating it on first use.
//
// The controller is allocated lazily rather than being a global object;
// the instance is never deleted here.
// NOTE(review): the first check reads gv_sock_ctlr without holding the
// short lock -- confirm that this unsynchronized read is acceptable on
// the supported platforms.
SB_Trans::Sock_Controller *getGlobalSockCtrl() {
    if (gv_sock_ctlr == NULL) {
        SB_util_short_lock();
        // Re-check under the lock: another thread may have created the
        // controller between the test above and acquiring the lock.
        if (gv_sock_ctlr == NULL)
            gv_sock_ctlr = new SB_Trans::Sock_Controller();
        SB_util_short_unlock();
    }
    return gv_sock_ctlr;
}


SB_Trans::Sock_Comp_Thread::Sock_Comp_Thread(const char *pp_name)
: Thread(sock_comp_thread_fun, pp_name),
iv_fin(false),
Expand All @@ -104,7 +118,7 @@ void SB_Trans::Sock_Comp_Thread::run() {

iv_running = true;
while (!iv_fin) {
gv_sock_ctlr.epoll_wait(WHERE, -1);
getGlobalSockCtrl()->epoll_wait(WHERE, -1);
}
if (gv_ms_trace_sock)
trace_where_printf(WHERE, "EXITING comp thread\n");
Expand Down Expand Up @@ -193,11 +207,11 @@ int SB_Trans::Sock_Client::connect(char *pp_host, int pv_port) {
}
if (lv_sock == -1)
return lv_errno;
lv_err = gv_sock_ctlr.set_nodelay(WHERE, lv_sock);
lv_err = getGlobalSockCtrl()->set_nodelay(WHERE, lv_sock);
SB_util_assert_ieq(lv_err, 0);
lv_err = gv_sock_ctlr.set_size_recv(WHERE, lv_sock, SIZE);
lv_err = getGlobalSockCtrl()->set_size_recv(WHERE, lv_sock, SIZE);
SB_util_assert_ieq(lv_err, 0);
lv_err = gv_sock_ctlr.set_size_send(WHERE, lv_sock, SIZE);
lv_err = getGlobalSockCtrl()->set_size_send(WHERE, lv_sock, SIZE);
SB_util_assert_ieq(lv_err, 0);
memset(&lv_addr, 0, sizeof(lv_addr));
lv_addr.sin_family = AF_INET;
Expand Down Expand Up @@ -462,7 +476,7 @@ int SB_Trans::Sock_Controller::set_size_send(const char *pp_where,
}

// Forwards a shutdown request to shutdown_this() on the file-level
// controller instance.
//   pp_where - caller-supplied location label, passed through unchanged
// NOTE(review): this span comes from a diff hunk, so both forms of the
// forwarding call appear below -- the direct-object call and the
// getGlobalSockCtrl() call that presumably replaces it.
void SB_Trans::Sock_Controller::shutdown(const char *pp_where) {
gv_sock_ctlr.shutdown_this(pp_where);
getGlobalSockCtrl()->shutdown_this(pp_where);
}

void SB_Trans::Sock_Controller::shutdown_this(const char *pp_where) {
Expand Down Expand Up @@ -511,16 +525,16 @@ void SB_Trans::Sock_Controller::sock_add(const char *pp_where,
if (gv_ms_trace_sock)
trace_where_printf(WHERE, "%s-add fd=%d, eh=%p\n",
pp_where, pv_sock, pfp(pp_eh));
gv_sock_ctlr.epoll_ctl(pp_where,
getGlobalSockCtrl()->epoll_ctl(pp_where,
EPOLL_CTL_ADD,
pv_sock,
EPOLLIN,
pp_eh);
// need lock - can only have one comp thread
gv_sock_ctlr.lock();
getGlobalSockCtrl()->lock();
if (ip_comp_thread == NULL) {
ip_shutdown_eh = new Sock_Shutdown_EH();
gv_sock_ctlr.epoll_ctl(pp_where,
getGlobalSockCtrl()->epoll_ctl(pp_where,
EPOLL_CTL_ADD,
ip_shutdown_eh->get_read_fd(),
EPOLLIN,
Expand All @@ -531,22 +545,22 @@ void SB_Trans::Sock_Controller::sock_add(const char *pp_where,
trace_where_printf(WHERE, "starting sock comp thread %s\n", la_name);
ip_comp_thread->start();
}
gv_sock_ctlr.unlock();
getGlobalSockCtrl()->unlock();
}

// Asks the controller's epoll_ctl() wrapper to perform EPOLL_CTL_DEL on
// pv_sock, tracing the request first when gv_ms_trace_sock is set.
//   pp_where - caller-supplied location label (used in the trace line and
//              passed to epoll_ctl)
//   pv_sock  - socket descriptor to delete
// NOTE(review): this span comes from a diff hunk, so both forms of the
// epoll_ctl call appear below -- the direct-object call and the
// getGlobalSockCtrl() call that presumably replaces it.
void SB_Trans::Sock_Controller::sock_del(const char *pp_where,
int pv_sock) {
const char *WHERE = "Sock_Controller::sock_del";
if (gv_ms_trace_sock)
trace_where_printf(WHERE, "%s-delete fd=%d\n", pp_where, pv_sock);
// no events and no event handler are supplied for a delete
gv_sock_ctlr.epoll_ctl(pp_where, EPOLL_CTL_DEL, pv_sock, 0, NULL);
getGlobalSockCtrl()->epoll_ctl(pp_where, EPOLL_CTL_DEL, pv_sock, 0, NULL);
}

// Asks the controller's epoll_ctl() wrapper to perform EPOLL_CTL_MOD on
// pv_sock with the given event mask and event handler.
//   pp_where  - caller-supplied location label, passed to epoll_ctl
//   pv_sock   - socket descriptor to modify
//   pv_events - new event mask
//   pp_eh     - event handler to associate with the socket
// NOTE(review): this span comes from a diff hunk, so both forms of the
// epoll_ctl call appear below -- the direct-object call and the
// getGlobalSockCtrl() call that presumably replaces it.
void SB_Trans::Sock_Controller::sock_mod(const char *pp_where,
int pv_sock,
int pv_events,
Sock_EH *pp_eh) {
gv_sock_ctlr.epoll_ctl(pp_where, EPOLL_CTL_MOD, pv_sock, pv_events, pp_eh);
getGlobalSockCtrl()->epoll_ctl(pp_where, EPOLL_CTL_MOD, pv_sock, pv_events, pp_eh);
}

void SB_Trans::Sock_Controller::unlock() {
Expand Down Expand Up @@ -643,13 +657,13 @@ SB_Trans::Sock_Server *SB_Trans::Sock_Listener::accept() {
}
SB_util_assert_ine(lv_sock, -1);
}
lv_err = gv_sock_ctlr.set_reuseaddr(WHERE, lv_sock);
lv_err = getGlobalSockCtrl()->set_reuseaddr(WHERE, lv_sock);
SB_util_assert_ieq(lv_err, 0);
lv_err = gv_sock_ctlr.set_nodelay(WHERE, lv_sock);
lv_err = getGlobalSockCtrl()->set_nodelay(WHERE, lv_sock);
SB_util_assert_ieq(lv_err, 0);
lv_err = gv_sock_ctlr.set_size_recv(WHERE, lv_sock, SIZE);
lv_err = getGlobalSockCtrl()->set_size_recv(WHERE, lv_sock, SIZE);
SB_util_assert_ieq(lv_err, 0);
lv_err = gv_sock_ctlr.set_size_send(WHERE, lv_sock, SIZE);
lv_err = getGlobalSockCtrl()->set_size_send(WHERE, lv_sock, SIZE);
SB_util_assert_ieq(lv_err, 0);
if (gv_ms_trace_sock)
trace_where_printf(WHERE, "accept completed on sock=%d, new sock=%d\n",
Expand Down Expand Up @@ -715,7 +729,7 @@ void SB_Trans::Sock_Listener::listen(char *pp_host, int *pp_port) {
lv_domain, lv_sock);
}
SB_util_assert_ine(lv_sock, -1);
lv_err = gv_sock_ctlr.set_reuseaddr(WHERE, lv_sock);
lv_err = getGlobalSockCtrl()->set_reuseaddr(WHERE, lv_sock);
SB_util_assert_ieq(lv_err, 0);
memset(&lv_addr, 0, sizeof(lv_addr));
lv_addr.sin_family = static_cast<uint16_t>(lv_domain);
Expand Down Expand Up @@ -929,7 +943,7 @@ SB_Trans::Sock_User_Common::~Sock_User_Common() {
pfp(this), iv_sock);
if (iv_sock >= 0) {
if (iv_sock_added)
gv_sock_ctlr.sock_del(la_where, iv_sock);
getGlobalSockCtrl()->sock_del(la_where, iv_sock);
lv_err = ::close(iv_sock);
lv_errno = errno;
if (gv_ms_trace_sock) {
Expand Down Expand Up @@ -964,7 +978,7 @@ void SB_Trans::Sock_User_Common::event_change(int pv_events,

if (pv_events != iv_events) {
sprintf(la_where, "%s%s", ip_where, WHERE);
gv_sock_ctlr.sock_mod(la_where, iv_sock, pv_events, pp_eh);
getGlobalSockCtrl()->sock_mod(la_where, iv_sock, pv_events, pp_eh);
iv_events = pv_events;
}
}
Expand All @@ -974,7 +988,7 @@ void SB_Trans::Sock_User_Common::event_init(Sock_EH *pp_eh) {
SB_Buf_Line la_where;

sprintf(la_where, "%s%s", ip_where, WHERE);
gv_sock_ctlr.sock_add(la_where, iv_sock, pp_eh);
getGlobalSockCtrl()->sock_add(la_where, iv_sock, pp_eh);
iv_sock_added = true;
}

Expand Down Expand Up @@ -1015,7 +1029,7 @@ void SB_Trans::Sock_User_Common::set_nonblock() {
int lv_err;

sprintf(la_where, "%s%s", ip_where, WHERE);
lv_err = gv_sock_ctlr.set_nonblock(la_where, iv_sock);
lv_err = getGlobalSockCtrl()->set_nonblock(la_where, iv_sock);
SB_util_assert_ieq(lv_err, 0);
}

Expand All @@ -1029,7 +1043,7 @@ void SB_Trans::Sock_User_Common::stop() {
lv_status = iv_sock_mutex.lock();
SB_util_assert_ieq(lv_status, 0); // sw fault
if (iv_sock_added) {
gv_sock_ctlr.sock_del(la_where, iv_sock);
getGlobalSockCtrl()->sock_del(la_where, iv_sock);
iv_sock_added = false;
}
lv_status = iv_sock_mutex.unlock();
Expand Down
27 changes: 0 additions & 27 deletions core/sql/bin/ex_ssmp_main.cpp
Expand Up @@ -227,33 +227,6 @@ void runServer(Int32 argc, char **argv)

while (TRUE)
{

/*
* Until ssmp starts receiving messages, disable this check.
* We need ssmp to wake up periodically to perform garbage collection.
*
// wait for the first open message to come in
while (cc->getConnection() == NULL)
cc->wait(IpcInfiniteTimeout);
// start the first receive operation
#ifdef _DEBUG_RTS
cerr << "No. of Requesters-1 " << cc->getNumRequestors() << " \n";
#endif
while (cc->getNumRequestors() > 0)
for (;;)
{
ssmpGlobals->work();
}
}
*/
// Wait for messages, but we need ssmp to wake up periodically to
// perform garbage collection.
short mask = XWAIT(LREQ | LDONE, ssmpGlobals->getStatsMergeTimeout());
if (mask & LREQ) {
cc->wait(0);
}
// go do GC.
ssmpGlobals->work();
}
}
Expand Down
69 changes: 47 additions & 22 deletions core/sql/common/Ipc.cpp
Expand Up @@ -848,11 +848,6 @@ GuaConnectionToClient *IpcConnection::castToGuaConnectionToClient()
return NULL;
}

// IpcConnection's own implementation of this cast helper: always
// returns NULL.
SqlTableConnection *IpcConnection::castToSqlTableConnection()
{
return NULL;
}

Int64 IpcConnection::getSqlTableTransid()
{
return -1;
Expand Down Expand Up @@ -1151,6 +1146,52 @@ void IpcConnection::reportBadMessage()
// Methods for class IpcAllConnections
// -----------------------------------------------------------------------


// Wait for something to happen on any of the connections (like awaitio(-1))
// and optionally report how long the wait took.
//
//   timeout     - IpcImmediately, IpcInfiniteTimeout, or a finite timeout;
//                 passed unchanged to XWAIT or to waitOnSet
//   calledByESP - forwarded to pendingIOs_->waitOnSet
//   timedout    - optional out parameter; on the XWAIT path it is set to
//                 TRUE when XWAIT reported no event and FALSE otherwise,
//                 on the other path it is left to waitOnSet
//   waitTime    - optional out parameter; elapsed CLOCK_MONOTONIC time of
//                 this call in nanoseconds
//
// Returns the status of whichever wait was performed.
WaitReturnStatus IpcAllConnections::waitOnAll(
     IpcTimeout timeout,
     NABoolean calledByESP,
     NABoolean *timedout,
     Int64 *waitTime)
{
  // Value-initialized so that the XWAIT-timeout case below (neither LREQ
  // nor LDONE posted) returns a defined status instead of an
  // indeterminate one.
  // NOTE(review): assumes the zero value of WaitReturnStatus is the normal
  // "ok" status -- confirm against its declaration.
  WaitReturnStatus retcode = WaitReturnStatus();
  struct timespec startts;
  struct timespec endts;

  clock_gettime(CLOCK_MONOTONIC, &startts);

  // Only a finite timeout combined with an existing control connection
  // goes through XWAIT; every other combination waits on the pending-I/O
  // set with the caller's timeout.
  const bool finiteTimeout =
    (timeout != IpcImmediately && timeout != IpcInfiniteTimeout);

  if (finiteTimeout && ipcEnv_->getControlConnection() != NULL)
    {
      short mask = XWAIT(LREQ | LDONE, timeout);

      // A posted request takes precedence over a completed I/O.
      if (mask & LREQ)
        retcode = ipcEnv_->getControlConnection()->castToGuaReceiveControlConnection()->wait(IpcImmediately);
      else if (mask & LDONE)
        retcode = pendingIOs_->waitOnSet(IpcImmediately, calledByESP, timedout);

      // The XWAIT result, not the immediate poll above, decides whether
      // this call timed out.
      if (timedout != NULL)
        {
          if (mask != 0)
            *timedout = FALSE;
          else
            *timedout = TRUE;
        }
    }
  else
    retcode = pendingIOs_->waitOnSet(timeout, calledByESP, timedout);

  clock_gettime(CLOCK_MONOTONIC, &endts);

  // The nanosecond difference may be negative; signed 64-bit arithmetic
  // folds it into the seconds term, so no explicit borrow is needed.
  if (waitTime != NULL)
    *waitTime = (static_cast<Int64>(endts.tv_sec - startts.tv_sec) * 1000LL * 1000LL * 1000LL)
              + static_cast<Int64>(endts.tv_nsec - startts.tv_nsec);

  return retcode;
}

#ifdef IPC_INTEGRITY_CHECKING

void IpcAllConnections::checkIntegrity(void)
Expand Down Expand Up @@ -1197,22 +1238,6 @@ void IpcAllConnections::checkLocalIntegrity(void)

#endif

// Blocks until every pending connection that is a SqlTableConnection and
// carries the given transid has completed its wait.
//   transid - transaction id compared against getSqlTableTransid()
void IpcAllConnections::waitForAllSqlTableConnections(Int64 transid)
{
  // Iterate over a copy of the pending-I/O set.
  IpcSetOfConnections pending = getPendingIOs();

  for (CollIndex idx = 0; pending.setToNext(idx); idx++)
    {
      // Skip anything that is not a SqlTableConnection.
      if (!pending.element(idx)->castToSqlTableConnection())
        continue;

      // Skip SqlTableConnections that belong to a different transaction.
      if (pending.element(idx)->getSqlTableTransid() != transid)
        continue;

      pending.element(idx)->wait(IpcInfiniteTimeout);
    }
}

CollIndex IpcAllConnections::fillInListOfPendingPins(char *buff,
ULng32 buffSize,
CollIndex numOfPins)
Expand Down Expand Up @@ -5005,7 +5030,7 @@ short getDefineShort( char * defineName )
if (heap_ == NULL)
heap_ = new DefaultIpcHeap; // here it's ok to use global operator new

allConnections_ = new(heap_) IpcAllConnections(heap_,
allConnections_ = new(heap_) IpcAllConnections(this, heap_,
(serverType == IPC_SQLESP_SERVER
|| serverType == IPC_SQLSSCP_SERVER
|| serverType == IPC_SQLSSMP_SERVER));
Expand Down
31 changes: 5 additions & 26 deletions core/sql/common/Ipc.h
Expand Up @@ -88,7 +88,6 @@ struct PersistentOpenEntry;
struct BawaitioxTraceEntry;
class GuaReceiveFastStart;
class SockConnection;
class SqlTableConnection;
class IpcNodeName;
struct GuaProcessHandle;
class MyGuaProcessHandle;
Expand Down Expand Up @@ -590,7 +589,6 @@ class IpcConnection : public NABasicObject
virtual GuaConnectionToServer *castToGuaConnectionToServer();
virtual GuaMsgConnectionToServer *castToGuaMsgConnectionToServer();
virtual GuaConnectionToClient *castToGuaConnectionToClient();
virtual SqlTableConnection *castToSqlTableConnection();

// Methods to do further status checking of connections: see whether
// there are I/O operations active at the time and whether unsent or
Expand Down Expand Up @@ -2981,8 +2979,9 @@ class IpcAllConnections : public ARRAY(IpcConnection *)

public:

IpcAllConnections(CollHeap *hp = 0, NABoolean esp = FALSE) : ARRAY(IpcConnection*)(hp)
IpcAllConnections(IpcEnvironment *env, CollHeap *hp = 0, NABoolean esp = FALSE) : ARRAY(IpcConnection*)(hp)
{
ipcEnv_ = env;
pendingIOs_ = new(hp) IpcSetOfConnections(this,hp,TRUE,esp);
completionSequenceNo_ = 0;
deleteCount_ = 0;
Expand All @@ -2995,32 +2994,11 @@ class IpcAllConnections : public ARRAY(IpcConnection *)
// copy ctor
IpcAllConnections (const IpcAllConnections & orig, CollHeap * h=0) ; // not written

void waitForAllSqlTableConnections(Int64 transid);

// wait for something to happen on any of the connections like awaitio(-1)
inline WaitReturnStatus waitOnAll(IpcTimeout timeout = IpcInfiniteTimeout,
WaitReturnStatus waitOnAll(IpcTimeout timeout = IpcInfiniteTimeout,
NABoolean calledByESP = FALSE,
NABoolean *timedout = NULL,
Int64 *waitTime = NULL)
{
WaitReturnStatus retcode;
struct timespec startts;
struct timespec endts;

clock_gettime(CLOCK_MONOTONIC, &startts);
retcode = pendingIOs_->waitOnSet(timeout, calledByESP, timedout);
clock_gettime(CLOCK_MONOTONIC, &endts);
if (startts.tv_nsec > endts.tv_nsec)
{
// borrow 1 from tv_sec, convert to nanosec and add to tv_nsec.
endts.tv_nsec += 1 * 1000 * 1000 * 1000;
endts.tv_sec -= 1;
}
if (waitTime != NULL)
*waitTime = ((endts.tv_sec - startts.tv_sec) * 1000LL * 1000LL * 1000LL)
+ (endts.tv_nsec - startts.tv_nsec);
return retcode;
}
Int64 *waitTime = NULL);

// used by asynchronous CLI cancel.
inline void cancelWait(NABoolean b) const
Expand Down Expand Up @@ -3118,6 +3096,7 @@ class IpcAllConnections : public ARRAY(IpcConnection *)
void *traceRef_;
CollIndex printEntry_;
Int32 numSMConnections_; // Number of SeaMonster connections
IpcEnvironment *ipcEnv_;
};

// Constants to indicates how many concurrent requests we allow per
Expand Down
4 changes: 0 additions & 4 deletions core/sql/executor/ex_split_bottom.cpp
Expand Up @@ -1940,10 +1940,6 @@ void SplitBottomRequestMessage::actOnReceive(IpcConnection * connection)
// in this split bottom code.
stmtGlobals->castToExEspStmtGlobals()->setNoNewRequest(TRUE);

// wait for DML requests.
stmtGlobals->getIpcEnvironment()->getAllConnections()->
waitForAllSqlTableConnections(stmtGlobals->getTransid());

// Change frag instance state. Want to let other active
// frag instances work until all msgs to producer ESPs get
// replies.
Expand Down

0 comments on commit 95c2712

Please sign in to comment.