Skip to content

Commit

Permalink
error: tried to connect to a different process,
Browse files Browse the repository at this point in the history
but the server closed the connection unexpectedly
  • Loading branch information
Weijun-H committed Aug 22, 2022
1 parent 7705d73 commit 6872895
Show file tree
Hide file tree
Showing 8 changed files with 582 additions and 134 deletions.
498 changes: 496 additions & 2 deletions src/backend/commands/copy.c

Large diffs are not rendered by default.

125 changes: 3 additions & 122 deletions src/backend/px/dispatcher/px_disp_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -1062,15 +1062,7 @@ PxDispatchCopyStart(struct PxCopy *pxCopy, Node *stmt, int flags)
ErrorData *error = NULL;
bool needTwoPhase = flags & DF_NEED_TWO_PHASE;

// if (needTwoPhase)
// {
px_sql_wal_lsn = polar_px_max_valid_lsn();
// }

// elogif(log_min_messages <= DEBUG5, LOG,
// "PxDispatchCopyStart: %s (needTwoPhase = %s)",
// (PointerIsValid(debug_query_string) ? debug_query_string : "\"\""),
// (needTwoPhase ? "true" : "false"));

pQueryParms = pxdisp_buildUtilityQueryParms(stmt, flags, NULL);

Expand All @@ -1097,8 +1089,8 @@ PxDispatchCopyStart(struct PxCopy *pxCopy, Node *stmt, int flags)
// /* Start a background libpq thread */
// pxdisp_startPqThread(ds);
// /* If libpq is not run in background*/
// if (!pxdisp_isDsThreadRuning())
// pxdisp_waitDispatchFinish(ds);
if (!pxdisp_isDsThreadRuning())
pxdisp_waitDispatchFinish(ds);

pxdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);

Expand All @@ -1123,115 +1115,4 @@ PxDispatchCopyEnd(struct PxCopy *pxCopy)
ds = pxCopy->dispatcherState;
pxCopy->dispatcherState = NULL;
pxdisp_destroyDispatcherState(ds);
}


/*
 * PxDispatchUtilityStatement
 *
 * Dispatch an already-parsed utility statement to all primary writer QEs
 * and wait until all QEs have finished successfully.  If one or more QEs
 * report an error, re-throw it as an ERROR in this backend (done inside
 * pxdisp_dispatchCommandInternal).
 *
 * stmt:
 *		The parsed utility statement to dispatch.
 *
 * flags:
 *		Bitwise combination of DF_NEED_TWO_PHASE, DF_WITH_SNAPSHOT and
 *		DF_CANCEL_ON_ERROR.
 *
 * oid_assignments:
 *		Pre-assigned OIDs to ship to the QEs, or NIL.
 *
 * px_pgresults:
 *		If non-NULL, receives the PGresult objects collected from each QE
 *		connection; ownership passes to the caller.
 *
 * NOTE(review): the DF_NEED_TWO_PHASE handling (setupDtxTransaction) was
 * previously performed here and is currently disabled -- confirm that
 * two-phase commit is handled elsewhere before relying on that flag.
 */
void
PxDispatchUtilityStatement(struct Node *stmt,
						   int flags,
						   List *oid_assignments,
						   PxPgResults *px_pgresults)
{
	DispatchCommandQueryParms *pQueryParms;

	pQueryParms = pxdisp_buildUtilityQueryParms(stmt, flags, oid_assignments);

	/*
	 * A void function must not return a value expression in standard C;
	 * call the internal dispatcher and fall off the end instead.
	 */
	pxdisp_dispatchCommandInternal(pQueryParms,
								   flags,
								   pxcomponent_getPxComponentsList(),
								   px_pgresults);
}

/*
 * pxdisp_dispatchCommandInternal
 *
 * Workhorse for dispatching a pre-built utility command to one primary
 * writer QE per segment in 'segments', waiting for completion, and
 * collecting results.  On QE error the saved ErrorData is re-thrown in
 * this backend, so this function may not return normally.
 *
 * The call sequence below is order-dependent: state creation, query-text
 * construction, gang allocation, dispatch, wait, result check, error
 * re-throw, result hand-off, state destruction.  Do not reorder.
 */
static void
pxdisp_dispatchCommandInternal(DispatchCommandQueryParms *pQueryParms,
int flags,
List *segments,
PxPgResults *px_pgresults)
{
PxDispatcherState *ds;
Gang *primaryGang;
PxDispatchResults *pr;
ErrorData *qeError = NULL;
char *queryText;
int queryTextLength;

/*
 * Dispatch the command.
 */
ds = pxdisp_makeDispatcherState(false);

/*
 * Reader gangs use local snapshot to access catalog, as a result, it will
 * not synchronize with the global snapshot from write gang which will lead
 * to inconsistent visibility of catalog table. Considering the case:
 *
 * select * from t, t t1; -- create a reader gang.
 * begin;
 * create role r1;
 * set role r1; -- set command will also dispatched to idle reader gang
 *
 * When set role command dispatched to reader gang, reader gang cannot see
 * the new tuple t1 in catalog table pg_auth.
 * To fix this issue, we should drop the idle reader gangs after each
 * utility statement which may modify the catalog table.
 */
// ds->destroyIdleReaderGang = true;

/* Serialize the command into the wire format sent to the QEs. */
queryText = buildPXQueryString(pQueryParms, &queryTextLength);

/*
 * Allocate a primary QE for every available segDB in the system.
 */
primaryGang = AllocateGang(ds, GANGTYPE_PRIMARY_WRITER, segments);
Assert(primaryGang);

/* One result slice; DF_CANCEL_ON_ERROR controls cancel-on-first-error. */
pxdisp_makeDispatchResults(ds, 1, flags & DF_CANCEL_ON_ERROR);
pxdisp_makeDispatchParams (ds, 1, queryText, queryTextLength);

/* -1: dispatch to every member of the gang. */
pxdisp_dispatchToGang(ds, primaryGang, -1);

// if ((flags & DF_NEED_TWO_PHASE) != 0 || isDtxExplicitBegin())
// addToGxactDtxSegments(primaryGang);

pxdisp_waitDispatchFinish(ds);

/* Block until every QE has finished (or errored); no cancel reason. */
pxdisp_checkDispatchResult(ds, DISPATCH_WAIT_NONE);

pr = pxdisp_getDispatchResults(ds, &qeError);

if (qeError)
{
/* Replace any local error state with the QE's error and propagate it. */
FlushErrorState();
ReThrowError(qeError);
}

/* collect pgstat from QEs for current transaction level */
// pgstat_combine_from_qe(pr, -1);

/* Hand the collected PGresults to the caller (may be a no-op if NULL). */
pxdisp_returnResults(pr, px_pgresults);

pxdisp_destroyDispatcherState(ds);
}
}
3 changes: 2 additions & 1 deletion src/backend/px/dispatcher/px_gang.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ buildGangDefinition(List *segments, SegmentType segmentType)
{
workerId = lfirst_int(lc);
newGangDefinition->db_descriptors[i] =
pxnode_allocateIdlePX(workerId, totalPxNodes, segmentType);
// pxnode_allocateIdlePX(workerId, totalPxNodes, segmentType);
pxnode_allocateIdlePX(RW_SEGMENT, totalPxNodes, segmentType);
}
}
PG_CATCH();
Expand Down
10 changes: 6 additions & 4 deletions src/backend/px/px_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@
#include "px/px_copy.h"
#include "px/px_disp_query.h"
#include "px/px_dispatchresult.h"
// #include "px/px_fts.h"
#include "px/px_gang.h"
// #include "px/px_tm.h"
#include "px/px_vars.h"
#include "commands/copy.h"
#include "commands/defrem.h"
Expand Down Expand Up @@ -118,8 +116,12 @@ makePxCopy(CopyState cstate, bool is_copy_in)
c->seglist = NIL;
c->dispatcherState = NULL;
initStringInfo(&(c->copy_out_buf));

if (!is_copy_in)

/*
* COPY replicated table TO file, pick only one replica, otherwise, duplicate
* rows will be copied.
*/
if (!is_copy_in)
{
c->total_segs = 1;
c->seglist = list_make1_int(px_session_id % c->total_segs);
Expand Down
31 changes: 31 additions & 0 deletions src/backend/px/px_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,37 @@ makePxHash(int numsegs, int natts, Oid *hashfuncs)
return h;
}

/*
 * makePxHashForRelation
 *
 * Convenience routine to create a PxHash according to a relation's
 * distribution policy.
 *
 * NOTE(review): the relation's own policy (rel->rd_pxpolicy) is currently
 * bypassed and a replicated policy spanning all PX workers is substituted
 * instead.  This looks like a COPY-support workaround -- confirm it is
 * intended before using this for hash-distributed relations.
 */
PxHash *
makePxHashForRelation(Relation rel)
{
	TupleDesc	desc = RelationGetDescr(rel);
	int			numsegments = getPxWorkerCount();
	PxPolicy   *policy = createReplicatedPolicy(numsegments);
	Oid		   *hashfuncs;
	int			i;

	hashfuncs = palloc(policy->nattrs * sizeof(Oid));

	for (i = 0; i < policy->nattrs; i++)
	{
		AttrNumber	attnum = policy->attrs[i];
		Oid			typeoid = TupleDescAttr(desc, attnum - 1)->atttypid;
		Oid			opfamily;

		/* Look up the hash support proc via the opclass's operator family. */
		opfamily = get_opclass_family(policy->opclasses[i]);

		hashfuncs[i] = px_hashproc_in_opfamily(opfamily, typeoid);
	}

	return makePxHash(policy->numsegments, policy->nattrs, hashfuncs);
}

/*
* Initialize PxHash for hashing the next tuple values.
*/
Expand Down
44 changes: 43 additions & 1 deletion src/include/commands/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,32 @@ typedef enum EolType
EOL_CRNL
} EolType;

/*
*
* COPY FROM modes (from file/client to table)
*
* 1. "normal", direct, mode. This means ON SEGMENT running on a segment, or
* utility mode, or non-distributed table in QD.
* 2. Dispatcher mode. We are reading from file/client, and forwarding all data to QEs,
* or vice versa.
* 3. Executor mode. We are receiving pre-processed data from QD, and inserting to table.
*
* COPY TO modes (table/query to file/client)
*
* 1. Direct. This can mean ON SEGMENT running on segment, or utility mode, or
* non-distributed table in QD. Or COPY TO running on segment.
* 2. Dispatcher mode. We are receiving pre-formatted data from segments, and forwarding
* it all to to the client.
* 3. Executor mode. Not used.
*/

/*
 * Role of the current process in a distributed COPY; see the mode
 * descriptions above.
 */
typedef enum
{
COPY_DIRECT,	/* process data locally: ON SEGMENT, utility mode, or
				 * non-distributed table in QD */
COPY_DISPATCH,	/* QD: forward data between the file/client and the QEs */
COPY_EXECUTOR	/* QE: receive pre-processed data from the QD and insert
				 * into the table (COPY FROM only) */
} CopyDispatchMode;

/*
* This struct contains all the state variables used throughout a COPY
* operation. For simplicity, we use the same struct for all variants of COPY,
Expand Down Expand Up @@ -118,6 +144,7 @@ typedef struct CopyStateData
/*
* Working state for COPY TO/FROM
*/
CopyDispatchMode dispatch_mode;
MemoryContext copycontext; /* per-copy execution context */

/*
Expand Down Expand Up @@ -145,6 +172,9 @@ typedef struct CopyStateData

TransitionCaptureState *transition_capture;

StringInfo dispatch_msgbuf; /* used in COPY_DISPATCH mode, to construct message
* to send to QE. */

/*
* These variables are used to reduce overhead in textual COPY FROM.
*
Expand Down Expand Up @@ -184,8 +214,10 @@ typedef struct CopyStateData
int raw_buf_len; /* total # of bytes stored */


int first_qe_processed_field;
/* Information on the connections to QEs. */
PxCopy *pxCopy;
PxCopy *pxCopy;

} CopyStateData;

typedef struct CopyStateData *CopyState;
Expand Down Expand Up @@ -217,4 +249,14 @@ extern uint64 CopyFrom(CopyState cstate);

extern DestReceiver *CreateCopyDestReceiver(void);

/*
 * Holds information about the target table's distribution policy during
 * COPY FROM, so each input row can be routed to the right segment.
 */
typedef struct PxDistributionData
{
PxPolicy *policy; /* distribution policy of the target table */
PxHash *pxHash; /* PxHash built from 'policy', used to hash rows */
} PxDistributionData;

#endif /* COPY_H */
4 changes: 0 additions & 4 deletions src/include/px/px_disp_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ void PxDispatchPlan(struct QueryDesc *queryDesc,

extern ParamListInfo deserializeParamListInfo(const char *str, int slen);

extern void PxDispatchUtilityStatement(struct Node *stmt,
int flags,
List *oid_assignments,
struct PxPgResults* cdb_pgresults);
extern void PxDispatchCopyStart(struct PxCopy *pxCopy, Node *stmt, int flags);
extern void PxDispatchCopyEnd(struct PxCopy *pxCopy);

Expand Down
1 change: 1 addition & 0 deletions src/include/px/px_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ typedef struct PxHash
* Create and initialize a PxHash in the current memory context.
*/
extern PxHash *makePxHash(int numsegs, int natts, Oid *typeoids);
extern PxHash *makePxHashForRelation(Relation rel);

/*
* Initialize PxHash for hashing the next tuple values.
Expand Down

0 comments on commit 6872895

Please sign in to comment.