From 7c700a28c893026eb93d3c0cac23f2518a268fcf Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 23 Aug 2022 11:06:38 +0000 Subject: [PATCH] complet copy from with some bugs --- src/backend/commands/copy.c | 16 ++-- src/backend/px/dispatcher/px_dispatchresult.c | 81 ------------------- src/backend/px/dispatcher/px_gang.c | 4 +- src/backend/px/px_copy.c | 7 +- src/backend/px/px_hash.c | 3 +- src/include/px/px_dispatchresult.h | 3 - 6 files changed, 18 insertions(+), 96 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 02e93778bbc..725a365f812 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -3067,7 +3067,7 @@ if (cstate->dispatch_mode == COPY_DISPATCH) if (cstate->dispatch_mode == COPY_DISPATCH) { /* In QD, compute the target segment to send this row to. */ - target_seg = GetTargetSeg(distData, myslot); + target_seg = GetTargetSeg(distData, tuple); } if (cstate->dispatch_mode == COPY_DISPATCH) @@ -3082,8 +3082,8 @@ if (cstate->dispatch_mode == COPY_DISPATCH) cstate->cur_lineno, cstate->line_buf.data, cstate->line_buf.len, - myslot->tts_values, - myslot->tts_isnull); + values, + slot->tts_isnull); skip_tuple = true; processed++; } @@ -3232,6 +3232,10 @@ if (cstate->dispatch_mode == COPY_DISPATCH) MemoryContextSwitchTo(oldcontext); + pxCopyEnd(pxCopy, NULL, NULL); + + PxDispatchCopyEnd(pxCopy); + /* * In the old protocol, tell pqcomm that we can process normal protocol * messages again. @@ -3716,7 +3720,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, // switch (cstate->dispatch_mode) // { // case COPY_DIRECT: - // stop_processing_at_field = -1; + // stop_processing_at_field = -1; // attnumlist = cstate->attnumlist; // break; @@ -5449,8 +5453,8 @@ SendCopyFromForwardedTuple(CopyState cstate, int16 attnum = i + 1; /* NULLs are simply left out of the message. */ - if (nulls[i]) - continue; + // if (nulls[i]) + // continue; /* * Make sure we have room for the attribute number. While we're at it, diff --git a/src/backend/px/dispatcher/px_dispatchresult.c b/src/backend/px/dispatcher/px_dispatchresult.c index 9e56960e100..d7fb97839c1 100644 --- a/src/backend/px/dispatcher/px_dispatchresult.c +++ b/src/backend/px/dispatcher/px_dispatchresult.c @@ -26,9 +26,6 @@ #include "px/px_vars.h" #include "utils/faultinjector.h" -static int pxdisp_snatchPGresults(PxDispatchResult *dispatchResult, - struct pg_result **pgresultptrs, int maxresults); - static void noTrailingNewlinePQ(PQExpBuffer buf) { @@ -734,48 +731,6 @@ pxdisp_resultEnd(PxDispatchResults *results, int sliceIndex) return &results->resultArray[si->resultEnd]; } -void -pxdisp_returnResults(PxDispatchResults *primaryResults, PxPgResults *px_pgresults) -{ - PxDispatchResult *dispatchResult; - int nslots; - int nresults = 0; - int i; - - if (!primaryResults || !px_pgresults) - return; - - /* - * Allocate result set ptr array. The caller must PQclear() each PGresult - * and free() the array. - */ - nslots = 0; - - for (i = 0; i < primaryResults->resultCount; ++i) - nslots += pxdisp_numPGresult(&primaryResults->resultArray[i]); - - px_pgresults->pg_results = (struct pg_result **) palloc0(nslots * sizeof(struct pg_result *)); - - /* - * Collect results from primary gang. - */ - for (i = 0; i < primaryResults->resultCount; ++i) - { - dispatchResult = &primaryResults->resultArray[i]; - - /* - * Take ownership of this QE's PGresult object(s). - */ - nresults += pxdisp_snatchPGresults(dispatchResult, - px_pgresults->pg_results + nresults, - nslots - nresults); - } - - Assert(nresults == nslots); - - /* tell the caller how many sets we're returning. */ - px_pgresults->numResults = nresults; -} /* * used in the interconnect on the dispatcher to avoid error-cleanup deadlocks. @@ -858,39 +813,3 @@ pxdisp_clearPxPgResults(PxPgResults *px_pgresults) px_pgresults->numResults = 0; } - -/* - * Remove all of the PGresult ptrs from a PxDispatchResult object - * and place them into an array provided by the caller. The caller - * becomes responsible for PQclear()ing them. Returns the number of - * PGresult ptrs placed in the array. - */ -static int -pxdisp_snatchPGresults(PxDispatchResult *dispatchResult, - struct pg_result **pgresultptrs, int maxresults) -{ - PQExpBuffer buf = dispatchResult->resultbuf; - PGresult **begp = (PGresult **) buf->data; - PGresult **endp = (PGresult **) (buf->data + buf->len); - PGresult **p; - int nresults = 0; - - /* - * Snatch the PGresult objects. - */ - for (p = begp; p < endp; ++p) - { - Assert(*p != NULL); - Assert(nresults < maxresults); - pgresultptrs[nresults++] = *p; - *p = NULL; - } - - /* - * Empty our PGresult array. - */ - resetPQExpBuffer(buf); - dispatchResult->okindex = -1; - - return nresults; -} \ No newline at end of file diff --git a/src/backend/px/dispatcher/px_gang.c b/src/backend/px/dispatcher/px_gang.c index 94584762b14..0e1d4bf5241 100644 --- a/src/backend/px/dispatcher/px_gang.c +++ b/src/backend/px/dispatcher/px_gang.c @@ -240,8 +240,8 @@ buildGangDefinition(List *segments, SegmentType segmentType) { workerId = lfirst_int(lc); newGangDefinition->db_descriptors[i] = - // pxnode_allocateIdlePX(workerId, totalPxNodes, segmentType); - pxnode_allocateIdlePX(RW_SEGMENT, totalPxNodes, segmentType); + pxnode_allocateIdlePX(workerId, totalPxNodes, segmentType); + // pxnode_allocateIdlePX(RW_SEGMENT, totalPxNodes, segmentType); } } PG_CATCH(); diff --git a/src/backend/px/px_copy.c b/src/backend/px/px_copy.c index bb47e9126af..f8898e171e8 100644 --- a/src/backend/px/px_copy.c +++ b/src/backend/px/px_copy.c @@ -101,8 +101,9 @@ makePxCopy(CopyState cstate, bool is_copy_in) /* initial replicated policy*/ int numsegments = -1; - numsegments = pxnode_getPxNodes()->totalPxNodes - * polar_get_stmt_px_dop(); + // numsegments = pxnode_getPxNodes()->totalPxNodes + // * polar_get_stmt_px_dop(); + numsegments = polar_get_stmt_px_dop(); policy = createReplicatedPolicy(numsegments); Assert(policy); @@ -133,7 +134,7 @@ makePxCopy(CopyState cstate, bool is_copy_in) c->total_segs = policy->numsegments; for (i = 0; i < c->total_segs; i++) - c->seglist = lappend_int(c->seglist, i); + c->seglist = lappend_int(c->seglist, RW_SEGMENT); } cstate->pxCopy = c; diff --git a/src/backend/px/px_hash.c b/src/backend/px/px_hash.c index f84dec64cb5..fed3e10ee34 100644 --- a/src/backend/px/px_hash.c +++ b/src/backend/px/px_hash.c @@ -132,7 +132,8 @@ makePxHashForRelation(Relation rel) { // PxPolicy *policy = rel->rd_pxpolicy; PxPolicy *policy; - int numsegments = getPxWorkerCount(); + // int numsegments = getPxWorkerCount(); + int numsegments = polar_get_stmt_px_dop(); policy = createReplicatedPolicy(numsegments); Oid *hashfuncs; int i; diff --git a/src/include/px/px_dispatchresult.h b/src/include/px/px_dispatchresult.h index 39071dbf8a1..ed849889f2e 100644 --- a/src/include/px/px_dispatchresult.h +++ b/src/include/px/px_dispatchresult.h @@ -272,9 +272,6 @@ PxDispatchResult *pxdisp_resultBegin(PxDispatchResults *results, int sliceIndex) */ PxDispatchResult *pxdisp_resultEnd(PxDispatchResults *results, int sliceIndex); -void -pxdisp_returnResults(PxDispatchResults *primaryResults, PxPgResults *cdb_pgresults); - /* * used in the interconnect on the dispatcher to avoid error-cleanup deadlocks. */