Skip to content

Commit

Permalink
complet copy from with some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Weijun-H committed Aug 23, 2022
1 parent 6872895 commit 261b48c
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 96 deletions.
16 changes: 10 additions & 6 deletions src/backend/commands/copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3067,7 +3067,7 @@ if (cstate->dispatch_mode == COPY_DISPATCH)
if (cstate->dispatch_mode == COPY_DISPATCH)
{
/* In QD, compute the target segment to send this row to. */
target_seg = GetTargetSeg(distData, myslot);
target_seg = GetTargetSeg(distData, tuple);
}

if (cstate->dispatch_mode == COPY_DISPATCH)
Expand All @@ -3082,8 +3082,8 @@ if (cstate->dispatch_mode == COPY_DISPATCH)
cstate->cur_lineno,
cstate->line_buf.data,
cstate->line_buf.len,
myslot->tts_values,
myslot->tts_isnull);
values,
slot->tts_isnull);
skip_tuple = true;
processed++;
}
Expand Down Expand Up @@ -3232,6 +3232,10 @@ if (cstate->dispatch_mode == COPY_DISPATCH)

MemoryContextSwitchTo(oldcontext);

pxCopyEnd(pxCopy, NULL, NULL);

PxDispatchCopyEnd(pxCopy);

/*
* In the old protocol, tell pqcomm that we can process normal protocol
* messages again.
Expand Down Expand Up @@ -3716,7 +3720,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
// switch (cstate->dispatch_mode)
// {
// case COPY_DIRECT:
// stop_processing_at_field = -1;
// stop_processing_at_field = -1;
// attnumlist = cstate->attnumlist;
// break;

Expand Down Expand Up @@ -5449,8 +5453,8 @@ SendCopyFromForwardedTuple(CopyState cstate,
int16 attnum = i + 1;

/* NULLs are simply left out of the message. */
if (nulls[i])
continue;
// if (nulls[i])
// continue;

/*
* Make sure we have room for the attribute number. While we're at it,
Expand Down
81 changes: 0 additions & 81 deletions src/backend/px/dispatcher/px_dispatchresult.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
#include "px/px_vars.h"
#include "utils/faultinjector.h"

static int pxdisp_snatchPGresults(PxDispatchResult *dispatchResult,
struct pg_result **pgresultptrs, int maxresults);

static void
noTrailingNewlinePQ(PQExpBuffer buf)
{
Expand Down Expand Up @@ -734,48 +731,6 @@ pxdisp_resultEnd(PxDispatchResults *results, int sliceIndex)
return &results->resultArray[si->resultEnd];
}

void
pxdisp_returnResults(PxDispatchResults *primaryResults, PxPgResults *px_pgresults)
{
PxDispatchResult *dispatchResult;
int nslots;
int nresults = 0;
int i;

if (!primaryResults || !px_pgresults)
return;

/*
* Allocate result set ptr array. The caller must PQclear() each PGresult
* and free() the array.
*/
nslots = 0;

for (i = 0; i < primaryResults->resultCount; ++i)
nslots += pxdisp_numPGresult(&primaryResults->resultArray[i]);

px_pgresults->pg_results = (struct pg_result **) palloc0(nslots * sizeof(struct pg_result *));

/*
* Collect results from primary gang.
*/
for (i = 0; i < primaryResults->resultCount; ++i)
{
dispatchResult = &primaryResults->resultArray[i];

/*
* Take ownership of this QE's PGresult object(s).
*/
nresults += pxdisp_snatchPGresults(dispatchResult,
px_pgresults->pg_results + nresults,
nslots - nresults);
}

Assert(nresults == nslots);

/* tell the caller how many sets we're returning. */
px_pgresults->numResults = nresults;
}

/*
* used in the interconnect on the dispatcher to avoid error-cleanup deadlocks.
Expand Down Expand Up @@ -858,39 +813,3 @@ pxdisp_clearPxPgResults(PxPgResults *px_pgresults)

px_pgresults->numResults = 0;
}

/*
* Remove all of the PGresult ptrs from a PxDispatchResult object
* and place them into an array provided by the caller. The caller
* becomes responsible for PQclear()ing them. Returns the number of
* PGresult ptrs placed in the array.
*/
static int
pxdisp_snatchPGresults(PxDispatchResult *dispatchResult,
struct pg_result **pgresultptrs, int maxresults)
{
PQExpBuffer buf = dispatchResult->resultbuf;
PGresult **begp = (PGresult **) buf->data;
PGresult **endp = (PGresult **) (buf->data + buf->len);
PGresult **p;
int nresults = 0;

/*
* Snatch the PGresult objects.
*/
for (p = begp; p < endp; ++p)
{
Assert(*p != NULL);
Assert(nresults < maxresults);
pgresultptrs[nresults++] = *p;
*p = NULL;
}

/*
* Empty our PGresult array.
*/
resetPQExpBuffer(buf);
dispatchResult->okindex = -1;

return nresults;
}
4 changes: 2 additions & 2 deletions src/backend/px/dispatcher/px_gang.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ buildGangDefinition(List *segments, SegmentType segmentType)
{
workerId = lfirst_int(lc);
newGangDefinition->db_descriptors[i] =
// pxnode_allocateIdlePX(workerId, totalPxNodes, segmentType);
pxnode_allocateIdlePX(RW_SEGMENT, totalPxNodes, segmentType);
pxnode_allocateIdlePX(workerId, totalPxNodes, segmentType);
// pxnode_allocateIdlePX(RW_SEGMENT, totalPxNodes, segmentType);
}
}
PG_CATCH();
Expand Down
7 changes: 4 additions & 3 deletions src/backend/px/px_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ makePxCopy(CopyState cstate, bool is_copy_in)

/* initial replicated policy*/
int numsegments = -1;
numsegments = pxnode_getPxNodes()->totalPxNodes
* polar_get_stmt_px_dop();
// numsegments = pxnode_getPxNodes()->totalPxNodes
// * polar_get_stmt_px_dop();
numsegments = polar_get_stmt_px_dop();
policy = createReplicatedPolicy(numsegments);
Assert(policy);

Expand Down Expand Up @@ -133,7 +134,7 @@ makePxCopy(CopyState cstate, bool is_copy_in)
c->total_segs = policy->numsegments;

for (i = 0; i < c->total_segs; i++)
c->seglist = lappend_int(c->seglist, i);
c->seglist = lappend_int(c->seglist, RW_SEGMENT);
}

cstate->pxCopy = c;
Expand Down
3 changes: 2 additions & 1 deletion src/backend/px/px_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ makePxHashForRelation(Relation rel)
{
// PxPolicy *policy = rel->rd_pxpolicy;
PxPolicy *policy;
int numsegments = getPxWorkerCount();
// int numsegments = getPxWorkerCount();
int numsegments = polar_get_stmt_px_dop();
policy = createReplicatedPolicy(numsegments);
Oid *hashfuncs;
int i;
Expand Down
3 changes: 0 additions & 3 deletions src/include/px/px_dispatchresult.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,6 @@ PxDispatchResult *pxdisp_resultBegin(PxDispatchResults *results, int sliceIndex)
*/
PxDispatchResult *pxdisp_resultEnd(PxDispatchResults *results, int sliceIndex);

void
pxdisp_returnResults(PxDispatchResults *primaryResults, PxPgResults *cdb_pgresults);

/*
* used in the interconnect on the dispatcher to avoid error-cleanup deadlocks.
*/
Expand Down

0 comments on commit 261b48c

Please sign in to comment.