Skip to content

Commit

Permalink
Merge pull request #224 from citusdata/develop_v1x
Browse files Browse the repository at this point in the history
Merge develop branch changes
  • Loading branch information
mtuncer committed Feb 18, 2020
2 parents fb2c329 + da737b2 commit a4e7e70
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 39 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
sudo: required
dist: trusty
dist: bionic
language: c
cache:
apt: true
Expand All @@ -16,8 +16,10 @@ env:
- PGVERSION=9.6
- PGVERSION=10
- PGVERSION=11
- PGVERSION=12

before_install:
- git clone -b v0.7.9 --depth 1 https://github.com/citusdata/tools.git
- git clone -b v0.7.13 --depth 1 https://github.com/citusdata/tools.git
- sudo make -C tools install
- setup_apt
- nuke_pg
Expand Down
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ ifeq ($(enable_coverage),yes)
EXTRA_CLEAN += *.gcno
endif

UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
PG_CPPFLAGS += -I/usr/local/include
endif

#
# Users need to specify their Postgres installation path through pg_config. For
# example: /usr/local/pgsql/bin/pg_config or /usr/lib/postgresql/9.3/bin/pg_config
Expand All @@ -41,8 +46,8 @@ ifndef MAJORVERSION
MAJORVERSION := $(basename $(VERSION))
endif

ifeq (,$(findstring $(MAJORVERSION), 9.3 9.4 9.5 9.6 10 11))
$(error PostgreSQL 9.3 or 9.4 or 9.5 or 9.6 or 10 or 11 is required to compile this extension)
ifeq (,$(findstring $(MAJORVERSION), 9.3 9.4 9.5 9.6 10 11 12))
$(error PostgreSQL 9.3 to 12 is required to compile this extension)
endif

cstore.pb-c.c: cstore.proto
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ So we need to install these packages first:
# Ubuntu 10.4+
sudo apt-get install protobuf-c-compiler
sudo apt-get install libprotobuf-c0-dev

# Ubuntu 18.4+
sudo apt-get install protobuf-c-compiler
sudo apt-get install libprotobuf-c-dev

# Mac OS X
brew install protobuf-c
Expand Down
11 changes: 9 additions & 2 deletions cstore_compression.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,16 @@ DecompressBuffer(StringInfo buffer, CompressionType compressionType)
decompressedData = palloc0(decompressedDataSize);

#if PG_VERSION_NUM >= 90500

#if PG_VERSION_NUM >= 120000
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
compressedDataSize, decompressedData,
decompressedDataSize, true);
#else
decompressedByteCount = pglz_decompress(CSTORE_COMPRESS_RAWDATA(buffer->data),
compressedDataSize,
decompressedData, decompressedDataSize);
compressedDataSize, decompressedData,
decompressedDataSize);
#endif

if (decompressedByteCount < 0)
{
Expand Down
153 changes: 129 additions & 24 deletions cstore_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
#if PG_VERSION_NUM >= 120000
#include "access/heapam.h"
#include "access/tableam.h"
#include "executor/tuptable.h"
#include "optimizer/optimizer.h"
#else
#include "optimizer/var.h"
#endif
#include "parser/parser.h"
#include "parser/parsetree.h"
#include "parser/parse_coerce.h"
Expand All @@ -55,7 +62,11 @@
#include "utils/memutils.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#if PG_VERSION_NUM >= 120000
#include "utils/snapmgr.h"
#else
#include "utils/tqual.h"
#endif


/* local functions forward declarations */
Expand Down Expand Up @@ -94,6 +105,7 @@ static bool DirectoryExists(StringInfo directoryName);
static void CreateDirectory(StringInfo directoryName);
static void RemoveCStoreDatabaseDirectory(Oid databaseOid);
static StringInfo OptionNamesString(Oid currentContextId);
static HeapTuple GetSlotHeapTuple(TupleTableSlot *tts);
static CStoreFdwOptions * CStoreGetOptions(Oid foreignTableId);
static char * CStoreGetOptionValue(Oid foreignTableId, const char *optionName);
static void ValidateForeignTableOptions(char *filename, char *compressionTypeString,
Expand Down Expand Up @@ -135,12 +147,18 @@ static List * CStorePlanForeignModify(PlannerInfo *plannerInfo, ModifyTable *pla
static void CStoreBeginForeignModify(ModifyTableState *modifyTableState,
ResultRelInfo *relationInfo, List *fdwPrivate,
int subplanIndex, int executorflags);
static void CStoreBeginForeignInsert(ModifyTableState *modifyTableState,
ResultRelInfo *relationInfo);
static TupleTableSlot * CStoreExecForeignInsert(EState *executorState,
ResultRelInfo *relationInfo,
TupleTableSlot *tupleSlot,
TupleTableSlot *planSlot);
static void CStoreEndForeignModify(EState *executorState, ResultRelInfo *relationInfo);

static void CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo);
#if PG_VERSION_NUM >= 90600
static bool CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
#endif

/* declarations for dynamic loading */
PG_MODULE_MAGIC;
Expand Down Expand Up @@ -572,7 +590,11 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
{
/* read the next row in tupleContext */
MemoryContext oldContext = MemoryContextSwitchTo(tupleContext);
#if PG_VERSION_NUM >= 120000
nextRowFound = NextCopyFrom(copyState, NULL, columnValues, columnNulls);
#else
nextRowFound = NextCopyFrom(copyState, NULL, columnValues, columnNulls, NULL);
#endif
MemoryContextSwitchTo(oldContext);

/* write the row to the cstore file */
Expand Down Expand Up @@ -786,7 +808,7 @@ FindCStoreTables(List *tableList)
}


/*
/*
* OpenRelationsForTruncate opens and locks relations for tables to be truncated.
*
* It also performs a permission checks to see if the user has truncate privilege
Expand Down Expand Up @@ -964,9 +986,9 @@ DistributedTable(Oid relationId)
bool distributedTable = false;
Oid partitionOid = InvalidOid;
Relation heapRelation = NULL;
HeapScanDesc scanDesc = NULL;
TableScanDesc scanDesc = NULL;
const int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount];
ScanKeyData scanKey[1];
HeapTuple heapTuple = NULL;

bool missingOK = true;
Expand All @@ -989,13 +1011,13 @@ DistributedTable(Oid relationId)
ScanKeyInit(&scanKey[0], ATTR_NUM_PARTITION_RELATION_ID, InvalidStrategy,
F_OIDEQ, ObjectIdGetDatum(relationId));

scanDesc = heap_beginscan(heapRelation, SnapshotSelf, scanKeyCount, scanKey);
scanDesc = table_beginscan(heapRelation, SnapshotSelf, scanKeyCount, scanKey);

heapTuple = heap_getnext(scanDesc, ForwardScanDirection);

distributedTable = HeapTupleIsValid(heapTuple);

heap_endscan(scanDesc);
table_endscan(scanDesc);
relation_close(heapRelation, AccessShareLock);

return distributedTable;
Expand Down Expand Up @@ -1108,7 +1130,7 @@ CreateDirectory(StringInfo directoryName)

/*
* RemoveCStoreDatabaseDirectory removes CStore directory previously
* created for this database.
* created for this database.
* However it does not remove 'cstore_fdw' directory even if there
* are no other databases left.
*/
Expand Down Expand Up @@ -1206,6 +1228,15 @@ cstore_fdw_handler(PG_FUNCTION_ARGS)
fdwRoutine->ExecForeignInsert = CStoreExecForeignInsert;
fdwRoutine->EndForeignModify = CStoreEndForeignModify;

#if PG_VERSION_NUM >= 110000
fdwRoutine->BeginForeignInsert = CStoreBeginForeignInsert;
fdwRoutine->EndForeignInsert = CStoreEndForeignInsert;
#endif

#if PG_VERSION_NUM >= 90600
fdwRoutine->IsForeignScanParallelSafe = CStoreIsForeignScanParallelSafe;
#endif

PG_RETURN_POINTER(fdwRoutine);
}

Expand Down Expand Up @@ -1351,6 +1382,20 @@ OptionNamesString(Oid currentContextId)
}


/*
* GetSlotHeapTuple abstracts getting HeapTuple from TupleTableSlot between versions
*/
static HeapTuple
GetSlotHeapTuple(TupleTableSlot *tts)
{
#if PG_VERSION_NUM >= 120000
return tts->tts_ops->copy_heap_tuple(tts);
#else
return tts->tts_tuple;
#endif
}


/*
* CStoreGetOptions returns the option values to be used when reading and writing
* the cstore file. To resolve these values, the function checks options for the
Expand Down Expand Up @@ -1510,17 +1555,24 @@ static char *
CStoreDefaultFilePath(Oid foreignTableId)
{
Relation relation = relation_open(foreignTableId, AccessShareLock);
RelFileNode relationFileNode = relation->rd_node;

RelFileNode relationFileNode = relation->rd_node;
Oid databaseOid = relationFileNode.dbNode;
Oid relationFileOid = relationFileNode.relNode;

relation_close(relation, AccessShareLock);

/* PG12 onward does not create relfilenode for foreign tables */
if (databaseOid == InvalidOid)
{
databaseOid = MyDatabaseId;
relationFileOid = foreignTableId;

}

StringInfo cstoreFilePath = makeStringInfo();
appendStringInfo(cstoreFilePath, "%s/%s/%u/%u", DataDir, CSTORE_FDW_NAME,
databaseOid, relationFileOid);

relation_close(relation, AccessShareLock);

return cstoreFilePath->data;
}

Expand Down Expand Up @@ -2062,7 +2114,9 @@ CStoreAcquireSampleRows(Relation relation, int logLevel,
/* set up tuple slot */
columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool));
#if PG_VERSION_NUM >= 110000
#if PG_VERSION_NUM >= 120000
scanTupleSlot = MakeTupleTableSlot(NULL, &TTSOpsVirtual);
#elif PG_VERSION_NUM >= 110000
scanTupleSlot = MakeTupleTableSlot(NULL);
#else
scanTupleSlot = MakeTupleTableSlot();
Expand Down Expand Up @@ -2107,7 +2161,7 @@ CStoreAcquireSampleRows(Relation relation, int logLevel,
MemoryContextSwitchTo(oldContext);

/* if there are no more records to read, break */
if (scanTupleSlot->tts_isempty)
if (TTS_EMPTY(scanTupleSlot))
{
break;
}
Expand Down Expand Up @@ -2223,18 +2277,15 @@ CStorePlanForeignModify(PlannerInfo *plannerInfo, ModifyTable *plan,
}


/* CStoreBeginForeignModify prepares cstore table for insert operation. */
/*
* CStoreBeginForeignModify prepares cstore table for a modification.
* Only insert is currently supported.
*/
static void
CStoreBeginForeignModify(ModifyTableState *modifyTableState,
ResultRelInfo *relationInfo, List *fdwPrivate,
int subplanIndex, int executorFlags)
{
Oid foreignTableOid = InvalidOid;
CStoreFdwOptions *cstoreFdwOptions = NULL;
TupleDesc tupleDescriptor = NULL;
TableWriteState *writeState = NULL;
Relation relation = NULL;

/* if Explain with no Analyze, do nothing */
if (executorFlags & EXEC_FLAG_EXPLAIN_ONLY)
{
Expand All @@ -2243,6 +2294,23 @@ CStoreBeginForeignModify(ModifyTableState *modifyTableState,

Assert (modifyTableState->operation == CMD_INSERT);

CStoreBeginForeignInsert(modifyTableState, relationInfo);
}


/*
* CStoreBeginForeignInsert prepares a cstore table for an insert or rows
* coming from a COPY.
*/
static void
CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *relationInfo)
{
Oid foreignTableOid = InvalidOid;
CStoreFdwOptions *cstoreFdwOptions = NULL;
TupleDesc tupleDescriptor = NULL;
TableWriteState *writeState = NULL;
Relation relation = NULL;

foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc);
relation = heap_open(foreignTableOid, ShareUpdateExclusiveLock);
cstoreFdwOptions = CStoreGetOptions(foreignTableOid);
Expand All @@ -2268,14 +2336,19 @@ CStoreExecForeignInsert(EState *executorState, ResultRelInfo *relationInfo,
TupleTableSlot *tupleSlot, TupleTableSlot *planSlot)
{
TableWriteState *writeState = (TableWriteState*) relationInfo->ri_FdwState;
HeapTuple heapTuple;

Assert(writeState != NULL);

if(HeapTupleHasExternal(tupleSlot->tts_tuple))
heapTuple = GetSlotHeapTuple(tupleSlot);

if (HeapTupleHasExternal(heapTuple))
{
/* detoast any toasted attributes */
tupleSlot->tts_tuple = toast_flatten_tuple(tupleSlot->tts_tuple,
tupleSlot->tts_tupleDescriptor);
HeapTuple newTuple = toast_flatten_tuple(heapTuple,
tupleSlot->tts_tupleDescriptor);

ExecForceStoreHeapTuple(newTuple, tupleSlot, true);
}

slot_getallattrs(tupleSlot);
Expand All @@ -2286,9 +2359,22 @@ CStoreExecForeignInsert(EState *executorState, ResultRelInfo *relationInfo,
}


/* CStoreEndForeignModify ends the current insert operation. */
/*
* CStoreEndForeignModify ends the current modification. Only insert is currently
* supported.
*/
static void
CStoreEndForeignModify(EState *executorState, ResultRelInfo *relationInfo)
{
CStoreEndForeignInsert(executorState, relationInfo);
}


/*
* CStoreEndForeignInsert ends the current insert or COPY operation.
*/
static void
CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo)
{
TableWriteState *writeState = (TableWriteState*) relationInfo->ri_FdwState;

Expand All @@ -2302,3 +2388,22 @@ CStoreEndForeignModify(EState *executorState, ResultRelInfo *relationInfo)
}
}


#if PG_VERSION_NUM >= 90600
/*
* CStoreIsForeignScanParallelSafe always returns true to indicate that
* reading from a cstore_fdw table in a parallel worker is safe. This
* does not enable parallelism for queries on individual cstore_fdw
* tables, but does allow parallel scans of cstore_fdw partitions.
*
* cstore_fdw is parallel-safe because all writes are immediately committed
* to disk and then read from disk. There is no uncommitted state that needs
* to be shared across processes.
*/
static bool
CStoreIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte)
{
return true;
}
#endif
Loading

0 comments on commit a4e7e70

Please sign in to comment.