Skip to content
Permalink
Browse files
HAWQ-1458. Fix share input scan bug for writer part.
  • Loading branch information
amyrazz44 authored and radarwave committed Aug 23, 2017
1 parent 820d974 commit 270575d8aca18b6073212c2fed44aedc3c417615
Showing 6 changed files with 205 additions and 10 deletions.
@@ -41,19 +41,20 @@
#include "postgres.h"

#include "executor/executor.h"
#include "executor/nodeMaterial.h"
#include "executor/instrument.h" /* Instrumentation */
#include "utils/tuplestorenew.h"

#include "executor/nodeMaterial.h"
#include "miscadmin.h"

#include "cdb/cdbvars.h"
#include "postmaster/primary_mirror_mode.h"


static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData *buf);
static void ExecChildRescan(MaterialState *node, ExprContext *exprCtxt);
static void DestroyTupleStore(MaterialState *node);
static void ExecMaterialResetWorkfileState(MaterialState *node);

static void mkLockFileForWriter(int size, int share_id, char * name);

/* ----------------------------------------------------------------
* ExecMaterial
@@ -115,6 +116,7 @@ ExecMaterial(MaterialState *node)

ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true);
tsa = ntuplestore_create_accessor(ts, true);
mkLockFileForWriter(MAXPGPATH, ma->share_id, "writer");
}
else
{
@@ -759,3 +761,30 @@ ExecEagerFreeMaterial(MaterialState *node)
}
}


/*
 * mkLockFileForWriter
 *
 * Create the lock file for the writer of a shared input scan and take an
 * exclusive flock() on it.  The descriptor is stored in sisc_writer_lock_fd
 * and is intentionally left open on return: the lock stays held for as long
 * as the descriptor is open, so other processes can use "can I lock this
 * file?" as a liveness probe for the writer.
 *
 *	size	 - size in bytes of the buffer used to build the file name
 *	share_id - id of the shared scan, forwarded to generate_lock_file_name()
 *	name	 - role suffix forwarded to generate_lock_file_name()
 *
 * Raises ERROR if the lock file cannot be created.  Failing to lock it is
 * only logged.
 */
static void mkLockFileForWriter(int size, int share_id, char * name)
{
	/*
	 * A reader probing the lock file may hold the lock for a moment (it
	 * locks and immediately unlocks on its first successful attempt).  A
	 * single non-blocking attempt here could collide with that probe and
	 * leave the writer without the lock forever, which a later probe would
	 * misread as "writer has quit".  Retry a bounded number of times
	 * instead of blocking, so a lock that is never released cannot hang
	 * the writer.
	 */
	enum { MAX_LOCK_ATTEMPTS = 100 };

	char *lock_file;
	int lock = -1;
	int lock_errno = 0;
	int attempt;

	lock_file = (char *)palloc0(size);
	generate_lock_file_name(lock_file, size, share_id, name);
	elog(DEBUG3, "The lock file for writer in SISC is %s", lock_file);

	sisc_writer_lock_fd = open(lock_file, O_CREAT, S_IRWXU);
	if(sisc_writer_lock_fd < 0)
	{
		elog(ERROR, "Could not create lock file %s for writer in SISC. The error number is %d", lock_file, errno);
	}

	for(attempt = 0; attempt < MAX_LOCK_ATTEMPTS; attempt++)
	{
		lock = flock(sisc_writer_lock_fd, LOCK_EX | LOCK_NB);
		if(lock == 0)
			break;

		/* Save errno right away; later calls may overwrite it. */
		lock_errno = errno;

		/* Only a transient conflict or a signal is worth another try. */
		if(lock_errno != EWOULDBLOCK && lock_errno != EINTR)
			break;
	}

	if(lock == 0)
		elog(LOG, "Successfully locked lock file \"%s\" for writer in SISC.", lock_file);
	else
	{
		/*
		 * Reported at LOG rather than DEBUG3: without the lock, processes
		 * probing the file cannot tell a live writer from a dead one.
		 */
		elog(LOG, "Could not lock lock file \"%s\" for writer in SISC . The error number is %d", lock_file, lock_errno);
	}

	pfree(lock_file);
}
@@ -43,7 +43,6 @@
#include "cdb/cdbvars.h"
#include "executor/executor.h"
#include "executor/nodeShareInputScan.h"

#include "utils/tuplestorenew.h"
#include "miscadmin.h"

@@ -552,6 +551,52 @@ static void sisc_lockname(char* p, int size, int share_id, const char* name)
}
}


/*
 * joint_lock_file_name
 *
 * Build a lock file name by appending 'name' to one of the names stored in
 * the lock context: lkname_ready when 'name' starts with "writer",
 * lkname_done otherwise.
 *
 * Returns a palloc'd, zero-initialized buffer of MAXPGPATH bytes; the result
 * is truncated to fit and always NUL-terminated.  The caller must pfree() it.
 */
char *joint_lock_file_name(ShareInput_Lk_Context *lk_ctxt, char *name)
{
	char *result = palloc0(MAXPGPATH);
	const char *prefix;

	/* Prefix match only: anything beginning with "writer" counts as writer. */
	if(strncmp("writer", name, strlen("writer")) == 0)
		prefix = lk_ctxt->lkname_ready;
	else
		prefix = lk_ctxt->lkname_done;

	/* The buffer starts out empty, so the whole of it (minus NUL) is free. */
	strncat(result, prefix, MAXPGPATH - 1);
	strncat(result, name, MAXPGPATH - strlen(result) - 1);

	return result;
}

/*
 * drop_lock_files
 *
 * Remove the writer's and the reader's lock files that belong to the given
 * lock context, if they still exist.  A file that is already gone is only
 * noted at DEBUG3; a failed unlink() is logged but does not raise an error,
 * since this runs as part of cleanup.
 */
void drop_lock_files(ShareInput_Lk_Context *lk_ctxt)
{
	char *writer_lock_file = NULL;
	char *reader_lock_file = NULL;

	writer_lock_file = joint_lock_file_name(lk_ctxt, "writer");
	if(access(writer_lock_file, F_OK) == 0)
	{
		elog(DEBUG3, "Drop writer's lock files %s in SISC", writer_lock_file);
		if(unlink(writer_lock_file) != 0)
			elog(LOG, "Could not drop writer's lock file %s in SISC: %m", writer_lock_file);
	}
	else
	{
		elog(DEBUG3, "Writer's lock files %s has been dropped already in SISC", writer_lock_file);
	}
	pfree(writer_lock_file);

	reader_lock_file = joint_lock_file_name(lk_ctxt, "reader");
	if(access(reader_lock_file, F_OK) == 0)
	{
		elog(DEBUG3, "Drop reader's lock files %s in SISC", reader_lock_file);
		/*
		 * Unlink the reader's file here.  The writer's name buffer has
		 * already been freed above and must not be touched again.
		 */
		if(unlink(reader_lock_file) != 0)
			elog(LOG, "Could not drop reader's lock file %s in SISC: %m", reader_lock_file);
	}
	else
	{
		elog(DEBUG3, "Reader's lock files %s has been dropped already in SISC", reader_lock_file);
	}
	pfree(reader_lock_file);
}

static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
{
int err;
@@ -590,6 +635,10 @@ static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
lk_ctxt->del_done = false;
}

elog(DEBUG3, "Begin to drop all the lock files for SISC");
drop_lock_files(lk_ctxt);
elog(DEBUG3, "End of drop lock files for SISC");

gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context));
}

@@ -666,6 +715,29 @@ static int retry_write(int fd, char *buf, int wsize)
return 0;
}



/*
 * generate_lock_file_name
 *
 * Build the lock file name for a reader or a writer into the caller's buffer.
 *
 *	p		 - output buffer
 *	size	 - size of 'p' in bytes
 *	share_id - id forwarded to sisc_lockname()
 *	name	 - role suffix; a value starting with "writer" selects the
 *			   "ready" base name, anything else selects "done"
 *
 * The base name is produced by sisc_lockname() and 'name' is then appended,
 * truncated to whatever room is left in 'p'.
 */
void generate_lock_file_name(char* p, int size, int share_id, const char* name)
{
	const char *base_kind = "done";

	/* Prefix match only: anything beginning with "writer" counts as writer. */
	if (strncmp(name , "writer", strlen("writer")) == 0)
		base_kind = "ready";

	sisc_lockname(p, size, share_id, base_kind);
	strncat(p, name, size - strlen(p) - 1);
}



/*
* Readiness (a) synchronization.
*
@@ -709,6 +781,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
struct timeval tval;
int n;
char a;
int file_exists = -1;
int timeout_interval = 0;
bool flag = false; //A tag for file exists or not.
int lock_fd = -1;
int lock = -1;
bool is_lock_firsttime = true;
char *writer_lock_file = NULL; //current path for lock file.

ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context));

@@ -738,6 +817,9 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
if(pctxt->donefd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);

writer_lock_file = joint_lock_file_name(pctxt, "writer");
elog(DEBUG3, "The lock file of writer in SISC is %s", writer_lock_file);

while(1)
{
CHECK_FOR_INTERRUPTS();
@@ -773,13 +855,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
retry_read(pctxt->readyfd, &a, 1);
Assert(rwsize == 1 && a == 'a');

elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
share_id, currentSliceId);

if (planGen == PLANGEN_PLANNER)
{
/* For planner-generated plans, we send ack back after receiving the handshake */
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
share_id, currentSliceId);

#if USE_ASSERT_CHECKING
@@ -793,15 +875,78 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
}
else if(n==0)
{
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
share_id, currentSliceId);
file_exists = access(writer_lock_file, F_OK);
if(file_exists != 0)
{
elog(DEBUG3, "Wait lock file for writer time out interval is %d", timeout_interval);
if(timeout_interval >= share_input_scan_wait_lockfile_timeout || flag == true) //If lock file never exists or disappeared, reader will no longer waiting for writer
{
elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready time out and break",
share_id, currentSliceId);
pfree(writer_lock_file);
break;
}
timeout_interval += tval.tv_sec * 1000 + tval.tv_usec;
}
else
{
elog(LOG, "writer lock file of shareinput_reader_waitready() is %s", writer_lock_file);
flag = true;
lock_fd = open(writer_lock_file, O_RDONLY);
if(lock_fd < 0)
{
elog(DEBUG3, "Open writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
continue;
}
lock = flock(lock_fd, LOCK_EX | LOCK_NB);
if(lock == -1)
{
/*
* The reader keeps trying to lock the lock file that the writer created.
* Failing to take the lock means the writer still holds it, i.e. the writer
* process is healthy. Taking the lock successfully means the writer has
* released it, i.e. the writer process has quit.
*/
elog(DEBUG3, "Lock writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
}
else if(lock == 0)
{
/*
* One situation needs special care.
* After creating the lock file, the writer needs some time before it actually locks it.
* So, if the reader locks the lock file ahead of the writer, the reader must unlock it again.
* If the reader locks the lock file after the writer did, it means the writer process has aborted.
* In that case we break out of the loop so that the reader no longer waits for the writer.
*/
if(is_lock_firsttime == true)
{
lock = flock(lock_fd, LOCK_UN);
is_lock_firsttime = false;
elog(DEBUG3, "Lock writer's lock file %s first time successfully in SISC! Unlock it.", writer_lock_file);
continue;
}
else
{
elog(LOG, "Lock writer's lock file %s successfully in SISC!", writer_lock_file);
/* Retry to close the fd in case there is interruption from signal */
while ((close(lock_fd) < 0) && (errno == EINTR))
{
elog(DEBUG3, "Failed to close SISC temporary file due to strerror(errno)");
}
pfree(writer_lock_file);
break;
}
}
elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
share_id, currentSliceId);
}
}
else
{
int save_errno = errno;
elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready try again, errno %d ... ",
share_id, currentSliceId, save_errno);
}

}
return (void *) pctxt;
}
@@ -853,7 +998,6 @@ shareinput_writer_notifyready(int share_id, int xslice, PlanGenerator planGen)
pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600);
if(pctxt->donefd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);

for(n=0; n<xslice; ++n)
{
#if USE_ASSERT_CHECKING
@@ -935,6 +1079,11 @@ writer_wait_for_acks(ShareInput_Lk_Context *pctxt, int share_id, int xslice)
elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): notify ready succeed 1, xslice remaining %d",
share_id, currentSliceId, ack_needed);
}
if(ack_needed == 0 && sisc_writer_lock_fd > 0)
{
close(sisc_writer_lock_fd);
}

}
else if(numReady==0)
{
@@ -668,6 +668,8 @@ bool gp_cte_sharing = false;

char *gp_idf_deduplicate_str;

int share_input_scan_wait_lockfile_timeout;

/* gp_disable_catalog_access_on_segment */
bool gp_disable_catalog_access_on_segment = false;

@@ -6685,6 +6687,15 @@ static struct config_int ConfigureNamesInt[] =
&metadata_cache_max_hdfs_file_num,
524288, 32768, 8388608, NULL, NULL
},
{
{"share_input_scan_wait_lockfile_timeout", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("timeout (in millisecond) for waiting lock file which writer creates."),
NULL
},
&share_input_scan_wait_lockfile_timeout,
300000, 1, INT_MAX, NULL, NULL
},


/* End-of-list marker */
{
@@ -8349,7 +8360,6 @@ static struct config_string ConfigureNamesString[] =
NULL, NULL, NULL
},


/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL
@@ -15,7 +15,9 @@
#define NODEMATERIAL_H

#include "nodes/execnodes.h"
#include "executor/nodeShareInputScan.h"

/*
 * File descriptor of the SISC writer's lock file; -1 when not set.
 *
 * NOTE(review): this is a 'static' definition inside a header, so every
 * translation unit that includes this header gets its own private copy of
 * the variable.  A value stored by one .c file is therefore not visible to
 * code in another .c file that reads or closes sisc_writer_lock_fd.  This
 * presumably wants to be an 'extern' declaration here with a single
 * definition in one .c file -- TODO confirm against all users.
 */
static int sisc_writer_lock_fd = -1;
extern int ExecCountSlotsMaterial(Material *node);
extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);
extern TupleTableSlot *ExecMaterial(MaterialState *node);
@@ -30,6 +30,8 @@
#define NODESHAREINPUTSCAN_H

#include "nodes/execnodes.h"
#include "executor/nodeMaterial.h"

extern int ExecCountSlotsShareInputScan(ShareInputScan* node);
extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecShareInputScan(ShareInputScanState *node);
@@ -52,4 +54,5 @@ static inline gpmon_packet_t * GpmonPktFromShareInputState(ShareInputScanState *
return &node->ss.ps.gpmon_pkt;
}

extern void generate_lock_file_name(char* p, int size, int share_id, const char* name);
#endif /* NODESHAREINPUTSCAN_H */
@@ -444,6 +444,8 @@ extern bool optimizer_prefer_scalar_dqa_multistage_agg;
extern bool optimizer_parallel_union;
extern bool optimizer_array_constraints;

/* Timeout for shareinputscan writer/reader wait for lock files */
extern int share_input_scan_wait_lockfile_timeout;

/* fallback in ranger ACL check */
extern int information_schema_namespace_oid;

0 comments on commit 270575d

Please sign in to comment.