From d77817631bd07c2bdd8dc42cbbe36ff40330dde5 Mon Sep 17 00:00:00 2001 From: amyrazz44 Date: Mon, 8 May 2017 17:27:07 +0800 Subject: [PATCH] HAWQ-1458. Fix share input scan bug for writer part. --- src/backend/executor/nodeMaterial.c | 35 ++++- src/backend/executor/nodeShareInputScan.c | 161 +++++++++++++++++++++- src/backend/utils/misc/guc.c | 12 +- src/include/executor/nodeMaterial.h | 2 + src/include/executor/nodeShareInputScan.h | 3 + src/include/utils/guc.h | 2 + 6 files changed, 205 insertions(+), 10 deletions(-) diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c index f2b82b2ea2..551db212d8 100644 --- a/src/backend/executor/nodeMaterial.c +++ b/src/backend/executor/nodeMaterial.c @@ -41,19 +41,20 @@ #include "postgres.h" #include "executor/executor.h" -#include "executor/nodeMaterial.h" #include "executor/instrument.h" /* Instrumentation */ #include "utils/tuplestorenew.h" - +#include "executor/nodeMaterial.h" #include "miscadmin.h" #include "cdb/cdbvars.h" +#include "postmaster/primary_mirror_mode.h" + static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData *buf); static void ExecChildRescan(MaterialState *node, ExprContext *exprCtxt); static void DestroyTupleStore(MaterialState *node); static void ExecMaterialResetWorkfileState(MaterialState *node); - +static void mkLockFileForWriter(int size, int share_id, char * name); /* ---------------------------------------------------------------- * ExecMaterial @@ -115,6 +116,7 @@ ExecMaterial(MaterialState *node) ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true); tsa = ntuplestore_create_accessor(ts, true); + mkLockFileForWriter(MAXPGPATH, ma->share_id, "writer"); } else { @@ -759,3 +761,30 @@ ExecEagerFreeMaterial(MaterialState *node) } } + +/* + * mkLockFileForWriter + * + * Create a unique lock file for writer, then use flock() to lock/unlock the lock file. + * We can make sure the lock file will be locked forerver until the writer process quits. + */ +static void mkLockFileForWriter(int size, int share_id, char * name) +{ + char *lock_file; + int lock; + + lock_file = (char *)palloc0(size); + generate_lock_file_name(lock_file, size, share_id, name); + elog(DEBUG3, "The lock file for writer in SISC is %s", lock_file); + sisc_writer_lock_fd = open(lock_file, O_CREAT, S_IRWXU); + if(sisc_writer_lock_fd < 0) + { + elog(ERROR, "Could not create lock file %s for writer in SISC. The error number is %d", lock_file, errno); + } + lock = flock(sisc_writer_lock_fd, LOCK_EX | LOCK_NB); + if(lock == -1) + elog(DEBUG3, "Could not lock lock file \"%s\" for writer in SISC . The error number is %d", lock_file, errno); + else if(lock == 0) + elog(LOG, "Successfully locked lock file \"%s\" for writer in SISC.", lock_file); + pfree(lock_file); +} diff --git a/src/backend/executor/nodeShareInputScan.c b/src/backend/executor/nodeShareInputScan.c index 049943be64..b9937bf2af 100644 --- a/src/backend/executor/nodeShareInputScan.c +++ b/src/backend/executor/nodeShareInputScan.c @@ -43,7 +43,6 @@ #include "cdb/cdbvars.h" #include "executor/executor.h" #include "executor/nodeShareInputScan.h" - #include "utils/tuplestorenew.h" #include "miscadmin.h" @@ -552,6 +551,52 @@ static void sisc_lockname(char* p, int size, int share_id, const char* name) } } + +char *joint_lock_file_name(ShareInput_Lk_Context *lk_ctxt, char *name) +{ + char *lock_file = palloc0(MAXPGPATH); + + if(strncmp("writer", name, strlen("writer")) ==0 ) + { + strncat(lock_file, lk_ctxt->lkname_ready, MAXPGPATH - strlen(lock_file) - 1); + } + else + { + strncat(lock_file, lk_ctxt->lkname_done, MAXPGPATH - strlen(lock_file) - 1); + } + strncat(lock_file, name, MAXPGPATH - strlen(lock_file) -1); + return lock_file; +} + +void drop_lock_files(ShareInput_Lk_Context *lk_ctxt) +{ + char *writer_lock_file = NULL; + char *reader_lock_file = NULL; + + writer_lock_file = joint_lock_file_name(lk_ctxt, "writer"); + if(access(writer_lock_file, F_OK) == 0) + { + elog(DEBUG3, "Drop writer's lock files %s in SISC", writer_lock_file); + unlink(writer_lock_file); + } + else + { + elog(DEBUG3, "Writer's lock files %s has been dropped already in SISC", writer_lock_file); + } + pfree(writer_lock_file); + reader_lock_file = joint_lock_file_name(lk_ctxt, "reader"); + if(access(reader_lock_file, F_OK) == 0) + { + elog(DEBUG3, "Drop reader's lock files %s in SISC", reader_lock_file); + unlink(writer_lock_file); + } + else + { + elog(DEBUG3, "Reader's lock files %s has been dropped already in SISC", reader_lock_file); + } + pfree(reader_lock_file); +} + static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt) { int err; @@ -590,6 +635,10 @@ static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt) lk_ctxt->del_done = false; } + elog(DEBUG3, "Begin to drop all the lock files for SISC"); + drop_lock_files(lk_ctxt); + elog(DEBUG3, "End of drop lock files for SISC"); + gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context)); } @@ -666,6 +715,29 @@ static int retry_write(int fd, char *buf, int wsize) return 0; } + + +/* + * generate_lock_file_name + * + * Called by reader or writer to make the unique lock file name. + */ +void generate_lock_file_name(char* p, int size, int share_id, const char* name) +{ + if (strncmp(name , "writer", strlen("writer")) == 0) + { + sisc_lockname(p, size, share_id, "ready"); + strncat(p, name, size - strlen(p) - 1); + } + else + { + sisc_lockname(p, size, share_id, "done"); + strncat(p, name, size - strlen(p) - 1); + } +} + + + /* * Readiness (a) synchronization. * @@ -709,6 +781,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) struct timeval tval; int n; char a; + int file_exists = -1; + int timeout_interval = 0; + bool flag = false; //A tag for file exists or not. + int lock_fd = -1; + int lock = -1; + bool is_lock_firsttime = true; + char *writer_lock_file = NULL; //current path for lock file. ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context)); @@ -738,6 +817,9 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) if(pctxt->donefd < 0) elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done); + writer_lock_file = joint_lock_file_name(pctxt, "writer"); + elog(DEBUG3, "The lock file of writer in SISC is %s", writer_lock_file); + while(1) { CHECK_FOR_INTERRUPTS(); @@ -773,13 +855,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) retry_read(pctxt->readyfd, &a, 1); Assert(rwsize == 1 && a == 'a'); - elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake", + elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake", share_id, currentSliceId); if (planGen == PLANGEN_PLANNER) { /* For planner-generated plans, we send ack back after receiving the handshake */ - elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer", + elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer", share_id, currentSliceId); #if USE_ASSERT_CHECKING @@ -793,8 +875,70 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) } else if(n==0) { - elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once", - share_id, currentSliceId); + file_exists = access(writer_lock_file, F_OK); + if(file_exists != 0) + { + elog(DEBUG3, "Wait lock file for writer time out interval is %d", timeout_interval); + if(timeout_interval >= share_input_scan_wait_lockfile_timeout || flag == true) //If lock file never exists or disappeared, reader will no longer waiting for writer + { + elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready time out and break", + share_id, currentSliceId); + pfree(writer_lock_file); + break; + } + timeout_interval += tval.tv_sec * 1000 + tval.tv_usec; + } + else + { + elog(LOG, "writer lock file of shareinput_reader_waitready() is %s", writer_lock_file); + flag = true; + lock_fd = open(writer_lock_file, O_RDONLY); + if(lock_fd < 0) + { + elog(DEBUG3, "Open writer's lock file %s failed!, error number is %d", writer_lock_file, errno); + continue; + } + lock = flock(lock_fd, LOCK_EX | LOCK_NB); + if(lock == -1) + { + /* + * Reader try to lock the lock file which writer created until locked the lock file successfully + * which means that writer process quit. If reader lock the lock file failed, it means that writer + * process is healthy. + */ + elog(DEBUG3, "Lock writer's lock file %s failed!, error number is %d", writer_lock_file, errno); + } + else if(lock == 0) + { + /* + * There is one situation to consider about. + * Writer need a time interval to lock the lock file after the lock file has been created. + * So, if reader lock the lock file ahead of writer, we should unlock it. + * If reader lock the lock file after writer, it means that writer process has abort. + * We should break the loop to make sure reader no longer wait for writer. + */ + if(is_lock_firsttime == true) + { + lock = flock(lock_fd, LOCK_UN); + is_lock_firsttime = false; + elog(DEBUG3, "Lock writer's lock file %s first time successfully in SISC! Unlock it.", writer_lock_file); + continue; + } + else + { + elog(LOG, "Lock writer's lock file %s successfully in SISC!", writer_lock_file); + /* Retry to close the fd in case there is interruption from signal */ + while ((close(lock_fd) < 0) && (errno == EINTR)) + { + elog(DEBUG3, "Failed to close SISC temporary file due to strerror(errno)"); + } + pfree(writer_lock_file); + break; + } + } + elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once", + share_id, currentSliceId); + } } else { @@ -802,6 +946,7 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready try again, errno %d ... ", share_id, currentSliceId, save_errno); } + } return (void *) pctxt; } @@ -853,7 +998,6 @@ shareinput_writer_notifyready(int share_id, int xslice, PlanGenerator planGen) pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600); if(pctxt->donefd < 0) elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done); - for(n=0; n 0) + { + close(sisc_writer_lock_fd); + } + } else if(numReady==0) { diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 6769d3bfb6..ac29d87641 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -668,6 +668,8 @@ bool gp_cte_sharing = false; char *gp_idf_deduplicate_str; +int share_input_scan_wait_lockfile_timeout; + /* gp_disable_catalog_access_on_segment */ bool gp_disable_catalog_access_on_segment = false; @@ -6685,6 +6687,15 @@ static struct config_int ConfigureNamesInt[] = &metadata_cache_max_hdfs_file_num, 524288, 32768, 8388608, NULL, NULL }, + { + {"share_input_scan_wait_lockfile_timeout", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("timeout (in millisecond) for waiting lock file which writer creates."), + NULL + }, + &share_input_scan_wait_lockfile_timeout, + 300000, 1, INT_MAX, NULL, NULL + }, + /* End-of-list marker */ { @@ -8349,7 +8360,6 @@ static struct config_string ConfigureNamesString[] = NULL, NULL, NULL }, - /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h index 509f78c1f0..187824ba9b 100644 --- a/src/include/executor/nodeMaterial.h +++ b/src/include/executor/nodeMaterial.h @@ -15,7 +15,9 @@ #define NODEMATERIAL_H #include "nodes/execnodes.h" +#include "executor/nodeShareInputScan.h" +static int sisc_writer_lock_fd = -1; extern int ExecCountSlotsMaterial(Material *node); extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags); extern TupleTableSlot *ExecMaterial(MaterialState *node); diff --git a/src/include/executor/nodeShareInputScan.h b/src/include/executor/nodeShareInputScan.h index 23025b99eb..aeb88a7bf1 100644 --- a/src/include/executor/nodeShareInputScan.h +++ b/src/include/executor/nodeShareInputScan.h @@ -30,6 +30,8 @@ #define NODESHAREINPUTSCAN_H #include "nodes/execnodes.h" +#include "executor/nodeMaterial.h" + extern int ExecCountSlotsShareInputScan(ShareInputScan* node); extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags); extern TupleTableSlot *ExecShareInputScan(ShareInputScanState *node); @@ -52,4 +54,5 @@ static inline gpmon_packet_t * GpmonPktFromShareInputState(ShareInputScanState * return &node->ss.ps.gpmon_pkt; } +extern void generate_lock_file_name(char* p, int size, int share_id, const char* name); #endif /* NODESHAREINPUTSCAN_H */ diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 546584f8a7..922fe0cab1 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -444,6 +444,8 @@ extern bool optimizer_prefer_scalar_dqa_multistage_agg; extern bool optimizer_parallel_union; extern bool optimizer_array_constraints; +/* Timeout for shareinputscan writer/reader wait for lock files */ +extern int share_input_scan_wait_lockfile_timeout; /* fallback in ranger ACL check */ extern int information_schema_namespace_oid;