diff --git a/META.json b/META.json index ff92f9f..46fa181 100644 --- a/META.json +++ b/META.json @@ -2,15 +2,15 @@ "name": "cstore_fdw", "abstract": "Columnar Store for PostgreSQL", "description": "PostgreSQL extension which implements a Columnar Store.", - "version": "1.2.0", - "maintainer": "Hadi Moshayedi ", + "version": "1.3.0", + "maintainer": "Murat Tuncer ", "license": "apache_2_0", "provides": { "cstore_fdw": { "abstract": "Foreign Data Wrapper for Columnar Store Tables", - "file": "cstore_fdw--1.2.sql", + "file": "cstore_fdw--1.3.sql", "docfile": "README.md", - "version": "1.2.0" + "version": "1.3.0" } }, "prereqs": { diff --git a/Makefile b/Makefile index b369385..2eaeca9 100644 --- a/Makefile +++ b/Makefile @@ -11,9 +11,11 @@ OBJS = cstore.pb-c.o cstore_fdw.o cstore_writer.o cstore_reader.o \ cstore_metadata_serialization.o EXTENSION = cstore_fdw -DATA = cstore_fdw--1.2.sql cstore_fdw--1.1--1.2.sql cstore_fdw--1.0--1.1.sql +DATA = cstore_fdw--1.3.sql cstore_fdw--1.2--1.3.sql cstore_fdw--1.1--1.2.sql \ + cstore_fdw--1.0--1.1.sql -REGRESS = create load query analyze data_types functions block_filtering drop insert copyto +REGRESS = create load query analyze data_types functions block_filtering drop \ + insert copyto alter EXTRA_CLEAN = cstore.pb-c.h cstore.pb-c.c data/*.cstore data/*.cstore.footer \ sql/block_filtering.sql sql/create.sql sql/data_types.sql sql/load.sql \ sql/copyto.sql expected/block_filtering.out expected/create.out \ diff --git a/README.md b/README.md index fbd54b5..5e56602 100644 --- a/README.md +++ b/README.md @@ -115,13 +115,13 @@ most efficient execution plan for each query. commands. We also don't support single row inserts. -Updating from version 1.0 or 1.1 to 1.2 ---------------------------------------- +Updating from version 1.0, 1.1 or 1.2 to 1.3 +-------------------------------------------- -To update your existing cstore_fdw installation from version 1.0 or 1.1 to 1.2, +To update your existing cstore_fdw installation from version 1.0, 1.1, or 1.2 to 1.3 you can take the following steps: -* Download and install cstore_fdw version 1.2 using instructions from the "Building" +* Download and install cstore_fdw version 1.3 using instructions from the "Building" section, * Restart the PostgreSQL server, * Run the ```ALTER EXTENSION cstore_fdw UPDATE;``` command. @@ -284,6 +284,14 @@ the installation: Changeset --------- +### Version 1.3 + +* (Feature) Added support for ```ALTER TABLE ADD COLUMN``` and ```ALTER TABLE DROP COLUMN```. +* (Feature) Added column list support in ```COPY FROM```. +* (Optimization) Improve row count estimation, which results in better plans. +* (Fix) Fix the deadlock issue during concurrent inserts. +* (Fix) Return correct result when using whole row references. + ### Version 1.2 * (Feature) Added support for ```COPY TO```. diff --git a/TODO.md b/TODO.md index ae3adad..179fbc8 100644 --- a/TODO.md +++ b/TODO.md @@ -4,13 +4,10 @@ To see the list of features and bug-fixes planned for next releases, see our Requested Features ------------------ -* Improve query cost estimation * Improve write performance * Improve read performance * Add checksum logic * Add new compression methods -* Enable ALTER FOREIGN TABLE ADD COLUMN -* Enable ALTER FOREIGN TABLE DROP COLUMN * Enable INSERT/DELETE/UPDATE * Enable users other than superuser to safely create columnar tables (permissions) * Transactional semantics @@ -27,8 +24,6 @@ Known Issues command prints incorrect file size. 
* If two different columnar tables are configured to point to the same file, writes to the underlying file aren't protected from each other. -* Hstore and json types work. However, constructors for hstore and json types - applied on a tuple return NULL. * When a data load is in progress, concurrent reads on the table overestimate the page count. * We have a minor memory leak in CStoreEndWrite. We need to also free the @@ -38,7 +33,9 @@ Known Issues * We don't yet incorporate the compression method's impact on disk I/O into cost estimates. * CitusDB integration errors: - * Concurrent staging cstore\_fdw tables doesn't work. +* Concurrent staging cstore\_fdw tables doesn't work. +* Setting a default value for column with ALTER TABLE has limited support for + existing rows. [roadmap]: https://github.com/citusdata/cstore_fdw/wiki/Roadmap diff --git a/cstore_fdw--1.2--1.3.sql b/cstore_fdw--1.2--1.3.sql new file mode 100644 index 0000000..3ad187d --- /dev/null +++ b/cstore_fdw--1.2--1.3.sql @@ -0,0 +1,3 @@ +/* cstore_fdw/cstore_fdw--1.2--1.3.sql */ + +-- No new functions or definitions were added in 1.3 diff --git a/cstore_fdw--1.2.sql b/cstore_fdw--1.3.sql similarity index 95% rename from cstore_fdw--1.2.sql rename to cstore_fdw--1.3.sql index cb14440..18513e9 100644 --- a/cstore_fdw--1.2.sql +++ b/cstore_fdw--1.3.sql @@ -1,4 +1,4 @@ -/* cstore_fdw/cstore_fdw--1.2.sql */ +/* cstore_fdw/cstore_fdw--1.3.sql */ -- complain if script is sourced in psql, rather than via CREATE EXTENSION \echo Use "CREATE EXTENSION cstore_fdw" to load this file. \quit diff --git a/cstore_fdw.c b/cstore_fdw.c index 33c1fca..6052415 100644 --- a/cstore_fdw.c +++ b/cstore_fdw.c @@ -88,7 +88,7 @@ static ForeignScan * CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel List *targetList, List *scanClauses); static double TupleCountEstimate(RelOptInfo *baserel, const char *filename); static BlockNumber PageCount(const char *filename); -static List * ColumnList(RelOptInfo *baserel); +static List * ColumnList(RelOptInfo *baserel, Oid foreignTableId); static void CStoreExplainForeignScan(ForeignScanState *scanState, ExplainState *explainState); static void CStoreBeginForeignScan(ForeignScanState *scanState, int executorFlags); @@ -243,7 +243,7 @@ CStoreProcessUtility(Node *parseTree, const char *queryString, List *droppedTables = DroppedCStoreFilenameList((DropStmt*) parseTree); CallPreviousProcessUtility(parseTree, queryString, context, - paramListInfo, destReceiver, completionTag); + paramListInfo, destReceiver, completionTag); foreach(fileListCell, droppedTables) { @@ -385,23 +385,16 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) CStoreFdwOptions *cstoreFdwOptions = NULL; MemoryContext tupleContext = NULL; - List *columnNameList = copyStatement->attlist; - if (columnNameList != NULL) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("copy column list is not supported"))); - } - /* Only superuser can copy from or to local file */ CheckSuperuserPrivilegesForCopy(copyStatement); Assert(copyStatement->relation != NULL); /* - * Open and lock the relation. We acquire ExclusiveLock to allow concurrent - * reads, but block concurrent writes. + * Open and lock the relation. We acquire ShareUpdateExclusiveLock to allow + * concurrent reads, but block concurrent writes. 
*/ - relation = heap_openrv(copyStatement->relation, ExclusiveLock); + relation = heap_openrv(copyStatement->relation, ShareUpdateExclusiveLock); relationId = RelationGetRelid(relation); /* allocate column values and nulls arrays */ @@ -427,7 +420,8 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) /* init state to read from COPY data source */ copyState = BeginCopyFrom(relation, copyStatement->filename, - copyStatement->is_program, NIL, + copyStatement->is_program, + copyStatement->attlist, copyStatement->options); /* init state to write to the cstore file */ @@ -457,7 +451,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) /* end read/write sessions and close the relation */ EndCopyFrom(copyState); CStoreEndWrite(writeState); - heap_close(relation, ExclusiveLock); + heap_close(relation, ShareUpdateExclusiveLock); return processedRowCount; } @@ -1115,7 +1109,7 @@ CStoreGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId * algorithm and using the correlation statistics to detect which columns * are in stored in sorted order. */ - List *queryColumnList = ColumnList(baserel); + List *queryColumnList = ColumnList(baserel, foreignTableId); uint32 queryColumnCount = list_length(queryColumnList); BlockNumber relationPageCount = PageCount(cstoreFdwOptions->filename); uint32 relationColumnCount = RelationGetNumberOfAttributes(relation); @@ -1177,7 +1171,7 @@ CStoreGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, * in executor's callback functions, so we get the column list here and put * it into foreign scan node's private list. */ - columnList = ColumnList(baserel); + columnList = ColumnList(baserel, foreignTableId); foreignPrivateList = list_make1(columnList); /* create the foreign scan node */ @@ -1213,24 +1207,7 @@ TupleCountEstimate(RelOptInfo *baserel, const char *filename) } else { - /* - * Otherwise we have to fake it. We back into this estimate using the - * planner's idea of relation width, which may be inaccurate. For better - * estimates, users need to run ANALYZE. - */ - struct stat statBuffer; - int tupleWidth = 0; - - int statResult = stat(filename, &statBuffer); - if (statResult < 0) - { - /* file may not be there at plan time, so use a default estimate */ - statBuffer.st_size = 10 * BLCKSZ; - } - - tupleWidth = MAXALIGN(baserel->width) + MAXALIGN(sizeof(HeapTupleHeaderData)); - tupleCountEstimate = (double) statBuffer.st_size / (double) tupleWidth; - tupleCountEstimate = clamp_row_est(tupleCountEstimate); + tupleCountEstimate = (double) CStoreTableRowCount(filename); } return tupleCountEstimate; @@ -1268,7 +1245,7 @@ PageCount(const char *filename) * and returns them in a new list. This function is unchanged from mongo_fdw. 
*/ static List * -ColumnList(RelOptInfo *baserel) +ColumnList(RelOptInfo *baserel, Oid foreignTableId) { List *columnList = NIL; List *neededColumnList = NIL; @@ -1277,6 +1254,10 @@ ColumnList(RelOptInfo *baserel) List *targetColumnList = baserel->reltargetlist; List *restrictInfoList = baserel->baserestrictinfo; ListCell *restrictInfoCell = NULL; + const AttrNumber wholeRow = 0; + Relation relation = heap_open(foreignTableId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + Form_pg_attribute *attributeFormArray = tupleDescriptor->attrs; /* first add the columns used in joins and projections */ neededColumnList = list_copy(targetColumnList); @@ -1311,6 +1292,16 @@ ColumnList(RelOptInfo *baserel) column = neededColumn; break; } + else if (neededColumn->varattno == wholeRow) + { + Form_pg_attribute attributeForm = attributeFormArray[columnIndex - 1]; + Index tableId = neededColumn->varno; + + column = makeVar(tableId, columnIndex, attributeForm->atttypid, + attributeForm->atttypmod, attributeForm->attcollation, + 0); + break; + } } if (column != NULL) @@ -1319,6 +1310,8 @@ ColumnList(RelOptInfo *baserel) } } + heap_close(relation, AccessShareLock); + return columnList; } @@ -1509,10 +1502,12 @@ CStoreAcquireSampleRows(Relation relation, int logLevel, Form_pg_attribute attributeForm = attributeFormArray[columnIndex]; const Index tableId = 1; - Var *column = makeVar(tableId, columnIndex + 1, attributeForm->atttypid, - attributeForm->atttypmod, attributeForm->attcollation, 0); - - columnList = lappend(columnList, column); + if (!attributeForm->attisdropped) + { + Var *column = makeVar(tableId, columnIndex + 1, attributeForm->atttypid, + attributeForm->atttypmod, attributeForm->attcollation, 0); + columnList = lappend(columnList, column); + } } /* setup foreign scan plan node */ @@ -1703,7 +1698,7 @@ CStoreBeginForeignModify(ModifyTableState *modifyTableState, Assert (modifyTableState->operation == CMD_INSERT); foreignTableOid = RelationGetRelid(relationInfo->ri_RelationDesc); - relation = heap_open(foreignTableOid, ExclusiveLock); + relation = heap_open(foreignTableOid, ShareUpdateExclusiveLock); cstoreFdwOptions = CStoreGetOptions(foreignTableOid); tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); @@ -1757,7 +1752,7 @@ CStoreEndForeignModify(EState *executorState, ResultRelInfo *relationInfo) Relation relation = writeState->relation; CStoreEndWrite(writeState); - heap_close(relation, ExclusiveLock); + heap_close(relation, ShareUpdateExclusiveLock); } } diff --git a/cstore_fdw.control b/cstore_fdw.control index 087dada..caabf4d 100644 --- a/cstore_fdw.control +++ b/cstore_fdw.control @@ -1,5 +1,5 @@ # cstore_fdw extension comment = 'foreign-data wrapper for flat cstore access' -default_version = '1.2' +default_version = '1.3' module_pathname = '$libdir/cstore_fdw' relocatable = true diff --git a/cstore_fdw.h b/cstore_fdw.h index 13d2905..99fae44 100644 --- a/cstore_fdw.h +++ b/cstore_fdw.h @@ -47,7 +47,7 @@ /* CStore file signature */ #define CSTORE_MAGIC_NUMBER "citus_cstore" #define CSTORE_VERSION_MAJOR 1 -#define CSTORE_VERSION_MINOR 2 +#define CSTORE_VERSION_MINOR 3 /* miscellaneous defines */ #define CSTORE_FDW_NAME "cstore_fdw" @@ -334,6 +334,7 @@ extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *co uint32 blockRowCount); extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount); +extern uint64 CStoreTableRowCount(const char *filename); #endif /* CSTORE_FDW_H */ diff --git 
a/cstore_metadata_serialization.c b/cstore_metadata_serialization.c index 27d6f4d..d2f6dc2 100644 --- a/cstore_metadata_serialization.c +++ b/cstore_metadata_serialization.c @@ -408,6 +408,41 @@ DeserializeBlockCount(StringInfo buffer) } +/* + * DeserializeRowCount deserializes the given column skip list buffer and + * returns the total number of rows in block skip list. + */ +uint32 +DeserializeRowCount(StringInfo buffer) +{ + uint32 rowCount = 0; + Protobuf__ColumnBlockSkipList *protobufBlockSkipList = NULL; + uint32 blockIndex = 0; + uint32 blockCount = 0; + + protobufBlockSkipList = + protobuf__column_block_skip_list__unpack(NULL, buffer->len, + (uint8 *) buffer->data); + if (protobufBlockSkipList == NULL) + { + ereport(ERROR, (errmsg("could not unpack column store"), + errdetail("invalid skip list buffer"))); + } + + blockCount = (uint32) protobufBlockSkipList->n_blockskipnodearray; + for (blockIndex = 0; blockIndex < blockCount; blockIndex++) + { + Protobuf__ColumnBlockSkipNode *protobufBlockSkipNode = + protobufBlockSkipList->blockskipnodearray[blockIndex]; + rowCount += protobufBlockSkipNode->rowcount; + } + + protobuf__column_block_skip_list__free_unpacked(protobufBlockSkipList, NULL); + + return rowCount; +} + + /* * DeserializeColumnSkipList deserializes the given buffer and returns the result as * a ColumnBlockSkipNode array. If the number of unpacked block skip nodes are not diff --git a/cstore_metadata_serialization.h b/cstore_metadata_serialization.h index 064960b..6188856 100644 --- a/cstore_metadata_serialization.h +++ b/cstore_metadata_serialization.h @@ -32,6 +32,7 @@ extern StringInfo SerializeColumnSkipList(ColumnBlockSkipNode *blockSkipNodeArra extern void DeserializePostScript(StringInfo buffer, uint64 *tableFooterLength); extern TableFooter * DeserializeTableFooter(StringInfo buffer); extern uint32 DeserializeBlockCount(StringInfo buffer); +extern uint32 DeserializeRowCount(StringInfo buffer); extern StripeFooter * DeserializeStripeFooter(StringInfo buffer); extern ColumnBlockSkipNode * DeserializeColumnSkipList(StringInfo buffer, bool typeByValue, int typeLength, diff --git a/cstore_reader.c b/cstore_reader.c index 57855f4..c5ff4c0 100644 --- a/cstore_reader.c +++ b/cstore_reader.c @@ -75,12 +75,16 @@ static void DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, Datum *datumArray); static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, Form_pg_attribute *attributeFormArray, - uint32 rowCount, ColumnBlockData **blockDataArray); + uint32 rowCount, ColumnBlockData **blockDataArray, + TupleDesc tupleDescriptor); +static Datum ColumnDefaultValue(TupleConstr *tupleConstraints, + Form_pg_attribute attributeForm); static int64 FileSize(FILE *file); static StringInfo ReadFromFile(FILE *file, uint64 offset, uint32 size); static StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); static void ResetUncompressedBlockData(ColumnBlockData **blockDataArray, uint32 columnCount); +static uint64 StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata); /* @@ -297,7 +301,8 @@ CStoreReadNextRow(TableReadState *readState, Datum *columnValues, bool *columnNu oldContext = MemoryContextSwitchTo(readState->stripeReadContext); DeserializeBlockData(readState->stripeBuffers, blockIndex, attributeFormArray, - blockRowCount, readState->blockDataArray); + blockRowCount, readState->blockDataArray, + readState->tupleDescriptor); MemoryContextSwitchTo(oldContext); @@ -390,6 +395,71 @@ 
FreeColumnBlockDataArray(ColumnBlockData **blockDataArray, uint32 columnCount) } +/* CStoreTableRowCount returns the exact row count of a table using skiplists */ +uint64 +CStoreTableRowCount(const char *filename) +{ + TableFooter *tableFooter = NULL; + FILE *tableFile; + ListCell *stripeMetadataCell = NULL; + uint64 totalRowCount = 0; + + StringInfo tableFooterFilename = makeStringInfo(); + + appendStringInfo(tableFooterFilename, "%s%s", filename, CSTORE_FOOTER_FILE_SUFFIX); + + tableFooter = CStoreReadFooter(tableFooterFilename); + + pfree(tableFooterFilename->data); + pfree(tableFooterFilename); + + tableFile = AllocateFile(filename, PG_BINARY_R); + if (tableFile == NULL) + { + ereport(ERROR, (errcode_for_file_access(), + errmsg("could not open file \"%s\" for reading: %m", filename))); + } + + foreach(stripeMetadataCell, tableFooter->stripeMetadataList) + { + StripeMetadata *stripeMetadata = (StripeMetadata *) lfirst(stripeMetadataCell); + totalRowCount += StripeRowCount(tableFile, stripeMetadata); + } + + FreeFile(tableFile); + + return totalRowCount; +} + + +/* + * StripeRowCount reads serialized stripe footer, the first column's + * skip list, and returns number of rows for given stripe. + */ +static uint64 +StripeRowCount(FILE *tableFile, StripeMetadata *stripeMetadata) +{ + uint64 rowCount = 0; + StripeFooter *stripeFooter = NULL; + StringInfo footerBuffer = NULL; + StringInfo firstColumnSkipListBuffer = NULL; + uint64 footerOffset = 0; + + footerOffset += stripeMetadata->fileOffset; + footerOffset += stripeMetadata->skipListLength; + footerOffset += stripeMetadata->dataLength; + + footerBuffer = ReadFromFile(tableFile, footerOffset, stripeMetadata->footerLength); + stripeFooter = DeserializeStripeFooter(footerBuffer); + + firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset, + stripeFooter->skipListSizeArray[0]); + rowCount = DeserializeRowCount(firstColumnSkipListBuffer); + + return rowCount; +} + + /* * LoadFilteredStripeBuffers reads serialized stripe data from the given file. 
* The function skips over blocks whose rows are refuted by restriction qualifiers, @@ -424,7 +494,7 @@ LoadFilteredStripeBuffers(FILE *tableFile, StripeMetadata *stripeMetadata, columnBuffersArray = palloc0(columnCount * sizeof(ColumnBuffers *)); currentColumnFileOffset = stripeMetadata->fileOffset + stripeMetadata->skipListLength; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + for (columnIndex = 0; columnIndex < stripeFooter->columnCount; columnIndex++) { uint64 existsSize = stripeFooter->existsSizeArray[columnIndex]; uint64 valueSize = stripeFooter->valueSizeArray[columnIndex]; @@ -561,7 +631,7 @@ LoadStripeFooter(FILE *tableFile, StripeMetadata *stripeMetadata, footerBuffer = ReadFromFile(tableFile, footerOffset, stripeMetadata->footerLength); stripeFooter = DeserializeStripeFooter(footerBuffer); - if (stripeFooter->columnCount != columnCount) + if (stripeFooter->columnCount > columnCount) { ereport(ERROR, (errmsg("stripe footer column count and table column count " "don't match"))); @@ -583,6 +653,7 @@ LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata, uint64 currentColumnSkipListFileOffset = 0; uint32 columnIndex = 0; uint32 stripeBlockCount = 0; + uint32 stripeColumnCount = stripeFooter->columnCount; /* deserialize block count */ firstColumnSkipListBuffer = ReadFromFile(tableFile, stripeMetadata->fileOffset, @@ -593,7 +664,7 @@ LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata, blockSkipNodeArray = palloc0(columnCount * sizeof(ColumnBlockSkipNode *)); currentColumnSkipListFileOffset = stripeMetadata->fileOffset; - for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + for (columnIndex = 0; columnIndex < stripeColumnCount; columnIndex++) { uint64 columnSkipListSize = stripeFooter->skipListSizeArray[columnIndex]; Form_pg_attribute attributeForm = attributeFormArray[columnIndex]; @@ -609,6 +680,30 @@ LoadStripeSkipList(FILE *tableFile, StripeMetadata *stripeMetadata, currentColumnSkipListFileOffset += columnSkipListSize; } + /* table contains additional columns added after this stripe is created */ + for (columnIndex = stripeColumnCount; columnIndex < columnCount; columnIndex++) + { + ColumnBlockSkipNode *columnSkipList = NULL; + uint32 blockIndex = 0; + + /* create empty ColumnBlockSkipNode for missing columns*/ + columnSkipList = palloc0(stripeBlockCount * sizeof(ColumnBlockSkipNode)); + + for (blockIndex = 0; blockIndex < stripeBlockCount; blockIndex++) + { + columnSkipList->rowCount = 0; + columnSkipList->hasMinMax = false; + columnSkipList->minimumValue = 0; + columnSkipList->maximumValue = 0; + columnSkipList->existsBlockOffset = 0; + columnSkipList->valueBlockOffset = 0; + columnSkipList->existsLength = 0; + columnSkipList->valueLength = 0; + columnSkipList->valueCompressionType = COMPRESSION_NONE; + } + blockSkipNodeArray[columnIndex] = columnSkipList; + } + stripeSkipList = palloc0(sizeof(StripeSkipList)); stripeSkipList->blockSkipNodeArray = blockSkipNodeArray; stripeSkipList->columnCount = columnCount; @@ -1035,22 +1130,31 @@ DeserializeDatumArray(StringInfo datumBuffer, bool *existsArray, uint32 datumCou * DeserializeBlockData deserializes requested data block for all columns and * stores in blockDataArray. It uncompresses serialized data if necessary. The * function also deallocates data buffers used for previous block, and compressed - * data buffers for the current block which will not be needed again. + * data buffers for the current block which will not be needed again. 
If a column + * data is not present serialized buffer, then default value (or null) is used + * to fill value array. */ static void DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, Form_pg_attribute *attributeFormArray, uint32 rowCount, - ColumnBlockData **blockDataArray) + ColumnBlockData **blockDataArray, TupleDesc tupleDescriptor) { int columnIndex = 0; for (columnIndex = 0; columnIndex < stripeBuffers->columnCount; columnIndex++) { + ColumnBlockData *blockData = blockDataArray[columnIndex]; + Form_pg_attribute attributeForm = attributeFormArray[columnIndex]; ColumnBuffers *columnBuffers = stripeBuffers->columnBuffersArray[columnIndex]; + bool columnAdded = false; + + if ((columnBuffers == NULL) && (blockData != NULL)) + { + columnAdded = true; + } + if (columnBuffers != NULL) { ColumnBlockBuffers *blockBuffers = columnBuffers->blockBuffersArray[blockIndex]; - Form_pg_attribute attributeForm = attributeFormArray[columnIndex]; - ColumnBlockData *blockData = blockDataArray[columnIndex]; StringInfo valueBuffer = NULL; /* free previous block's data buffers */ @@ -1078,10 +1182,77 @@ DeserializeBlockData(StripeBuffers *stripeBuffers, uint64 blockIndex, /* store current block's data buffer to be freed at next block read */ blockData->valueBuffer = valueBuffer; } + else if (columnAdded) + { + /* + * This is a column that was added after creation of this stripe. + * So we use either the default value or NULL. + */ + if (attributeForm->atthasdef) + { + int rowIndex = 0; + + Datum defaultValue = ColumnDefaultValue(tupleDescriptor->constr, + attributeForm); + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + blockData->existsArray[rowIndex] = true; + blockData->valueArray[rowIndex] = defaultValue; + } + } + else + { + memset(blockData->existsArray, false, rowCount); + } + + } } } +/* + * ColumnDefaultValue returns default value for given column. Only const values + * are supported. The function errors on any other default value expressions. + */ +static Datum +ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeForm) +{ + Datum defaultValue = 0; + Node *defaultValueNode = NULL; + int defValIndex = 0; + + for (defValIndex = 0; defValIndex < tupleConstraints->num_defval; defValIndex++) + { + AttrDefault defaultValue = tupleConstraints->defval[defValIndex]; + if (defaultValue.adnum == attributeForm->attnum) + { + defaultValueNode = stringToNode(defaultValue.adbin); + break; + } + } + + Assert(defaultValueNode != NULL); + + /* try reducing the default value node to a const node */ + defaultValueNode = eval_const_expressions(NULL, defaultValueNode); + if (IsA(defaultValueNode, Const)) + { + Const *constNode = (Const *) defaultValueNode; + defaultValue = constNode->constvalue; + } + else + { + const char *columnName = NameStr(attributeForm->attname); + ereport(ERROR, (errmsg("unsupported default value for column \"%s\"", columnName), + errhint("Expression is either mutable or " + "does not evaluate to constant value"))); + } + + return defaultValue; +} + + /* Returns the size of the given file handle. 
*/ static int64 FileSize(FILE *file) diff --git a/cstore_writer.c b/cstore_writer.c index 859eb13..a2679e2 100644 --- a/cstore_writer.c +++ b/cstore_writer.c @@ -149,9 +149,16 @@ CStoreBeginWrite(const char *filename, CompressionType compressionType, comparisonFunctionArray = palloc0(columnCount * sizeof(FmgrInfo *)); for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { - Oid typeId = tupleDescriptor->attrs[columnIndex]->atttypid; - FmgrInfo *comparisonFunction = GetFunctionInfoOrNull(typeId, BTREE_AM_OID, - BTORDER_PROC); + FmgrInfo *comparisonFunction = NULL; + FormData_pg_attribute *attributeForm = tupleDescriptor->attrs[columnIndex]; + + if (!attributeForm->attisdropped) + { + Oid typeId = attributeForm->atttypid; + + comparisonFunction = GetFunctionInfoOrNull(typeId, BTREE_AM_OID, BTORDER_PROC); + } + comparisonFunctionArray[columnIndex] = comparisonFunction; } diff --git a/expected/alter.out b/expected/alter.out new file mode 100644 index 0000000..2ca3e54 --- /dev/null +++ b/expected/alter.out @@ -0,0 +1,154 @@ +-- +-- Testing ALTER TABLE on cstore_fdw tables. +-- +CREATE FOREIGN TABLE test_alter_table (a int, b int, c int) SERVER cstore_server; +WITH sample_data AS (VALUES + (1, 2, 3), + (4, 5, 6), + (7, 8, 9) +) +INSERT INTO test_alter_table SELECT * FROM sample_data; +-- drop a column +ALTER FOREIGN TABLE test_alter_table DROP COLUMN a; +-- test analyze +ANALYZE test_alter_table; +-- verify select queries run as expected +SELECT * FROM test_alter_table; + b | c +---+--- + 2 | 3 + 5 | 6 + 8 | 9 +(3 rows) + +SELECT a FROM test_alter_table; +ERROR: column "a" does not exist +LINE 1: SELECT a FROM test_alter_table; + ^ +SELECT b FROM test_alter_table; + b +--- + 2 + 5 + 8 +(3 rows) + +-- verify insert runs as expected +INSERT INTO test_alter_table (SELECT 3, 5, 8); +ERROR: INSERT has more expressions than target columns +LINE 1: INSERT INTO test_alter_table (SELECT 3, 5, 8); + ^ +INSERT INTO test_alter_table (SELECT 5, 8); +-- add a column with no defaults +ALTER FOREIGN TABLE test_alter_table ADD COLUMN d int; +SELECT * FROM test_alter_table; + b | c | d +---+---+--- + 2 | 3 | + 5 | 6 | + 8 | 9 | + 5 | 8 | +(4 rows) + +INSERT INTO test_alter_table (SELECT 3, 5, 8); +SELECT * FROM test_alter_table; + b | c | d +---+---+--- + 2 | 3 | + 5 | 6 | + 8 | 9 | + 5 | 8 | + 3 | 5 | 8 +(5 rows) + +-- add a fixed-length column with default value +ALTER FOREIGN TABLE test_alter_table ADD COLUMN e int default 3; +SELECT * from test_alter_table; + b | c | d | e +---+---+---+--- + 2 | 3 | | 3 + 5 | 6 | | 3 + 8 | 9 | | 3 + 5 | 8 | | 3 + 3 | 5 | 8 | 3 +(5 rows) + +INSERT INTO test_alter_table (SELECT 1, 2, 4, 8); +SELECT * from test_alter_table; + b | c | d | e +---+---+---+--- + 2 | 3 | | 3 + 5 | 6 | | 3 + 8 | 9 | | 3 + 5 | 8 | | 3 + 3 | 5 | 8 | 3 + 1 | 2 | 4 | 8 +(6 rows) + +-- add a variable-length column with default value +ALTER FOREIGN TABLE test_alter_table ADD COLUMN f text DEFAULT 'TEXT ME'; +SELECT * from test_alter_table; + b | c | d | e | f +---+---+---+---+--------- + 2 | 3 | | 3 | TEXT ME + 5 | 6 | | 3 | TEXT ME + 8 | 9 | | 3 | TEXT ME + 5 | 8 | | 3 | TEXT ME + 3 | 5 | 8 | 3 | TEXT ME + 1 | 2 | 4 | 8 | TEXT ME +(6 rows) + +INSERT INTO test_alter_table (SELECT 1, 2, 4, 8, 'ABCDEF'); +SELECT * from test_alter_table; + b | c | d | e | f +---+---+---+---+--------- + 2 | 3 | | 3 | TEXT ME + 5 | 6 | | 3 | TEXT ME + 8 | 9 | | 3 | TEXT ME + 5 | 8 | | 3 | TEXT ME + 3 | 5 | 8 | 3 | TEXT ME + 1 | 2 | 4 | 8 | TEXT ME + 1 | 2 | 4 | 8 | ABCDEF +(7 rows) + +-- drop couple of columns +ALTER 
FOREIGN TABLE test_alter_table DROP COLUMN c; +ALTER FOREIGN TABLE test_alter_table DROP COLUMN e; +ANALYZE test_alter_table; +SELECT * from test_alter_table; + b | d | f +---+---+--------- + 2 | | TEXT ME + 5 | | TEXT ME + 8 | | TEXT ME + 5 | | TEXT ME + 3 | 8 | TEXT ME + 1 | 4 | TEXT ME + 1 | 4 | ABCDEF +(7 rows) + +-- unsupported default values +ALTER FOREIGN TABLE test_alter_table ADD COLUMN g boolean DEFAULT isfinite(current_date); +ALTER FOREIGN TABLE test_alter_table ADD COLUMN h DATE DEFAULT current_date; +SELECT * FROM test_alter_table; +ERROR: unsupported default value for column "g" +HINT: Expression is either mutable or does not evaluate to constant value +ALTER FOREIGN TABLE test_alter_table ALTER COLUMN g DROP DEFAULT; +SELECT * FROM test_alter_table; +ERROR: unsupported default value for column "h" +HINT: Expression is either mutable or does not evaluate to constant value +ALTER FOREIGN TABLE test_alter_table ALTER COLUMN h DROP DEFAULT; +ANALYZE test_alter_table; +SELECT * FROM test_alter_table; + b | d | f | g | h +---+---+---------+---+--- + 2 | | TEXT ME | | + 5 | | TEXT ME | | + 8 | | TEXT ME | | + 5 | | TEXT ME | | + 3 | 8 | TEXT ME | | + 1 | 4 | TEXT ME | | + 1 | 4 | ABCDEF | | +(7 rows) + +DROP FOREIGN TABLE test_alter_table; diff --git a/expected/query.out b/expected/query.out index 87d4452..d3811fc 100644 --- a/expected/query.out +++ b/expected/query.out @@ -75,3 +75,10 @@ SELECT * FROM contestant_compressed ORDER BY handle; h | 1987-10-26 | 2112 | 95.4 | XD | {w,a} (8 rows) +-- Verify that we handle whole-row references correctly +SELECT to_json(v) FROM contestant v ORDER BY rating LIMIT 1; + to_json +------------------------------------------------------------------------------------------------------------------ + {"handle":"g","birthdate":"1991-12-13","rating":1803,"percentile":85.1,"country":"XD ","achievements":["a","c"]} +(1 row) + diff --git a/input/load.source b/input/load.source index b6db36d..0913acd 100644 --- a/input/load.source +++ b/input/load.source @@ -21,3 +21,24 @@ COPY contestant_compressed FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; -- COPY into uncompressed table from program COPY contestant_compressed FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV; + +-- Test column list +CREATE FOREIGN TABLE famous_constants (id int, name text, value real) + SERVER cstore_server; +COPY famous_constants (value, name, id) FROM STDIN WITH CSV; +3.141,pi,1 +2.718,e,2 +0.577,gamma,3 +5.291e-11,bohr radius,4 +\. + +COPY famous_constants (name, value) FROM STDIN WITH CSV; +avagadro,6.022e23 +electron mass,9.109e-31 +proton mass,1.672e-27 +speed of light,2.997e8 +\. 
+ +SELECT * FROM famous_constants ORDER BY id, name; + +DROP FOREIGN TABLE famous_constants; diff --git a/output/load.source b/output/load.source index 39fdb39..c76f203 100644 --- a/output/load.source +++ b/output/load.source @@ -18,3 +18,22 @@ COPY contestant_compressed FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; -- COPY into uncompressed table from program COPY contestant_compressed FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV; +-- Test column list +CREATE FOREIGN TABLE famous_constants (id int, name text, value real) + SERVER cstore_server; +COPY famous_constants (value, name, id) FROM STDIN WITH CSV; +COPY famous_constants (name, value) FROM STDIN WITH CSV; +SELECT * FROM famous_constants ORDER BY id, name; + id | name | value +----+----------------+----------- + 1 | pi | 3.141 + 2 | e | 2.718 + 3 | gamma | 0.577 + 4 | bohr radius | 5.291e-11 + | avagadro | 6.022e+23 + | electron mass | 9.109e-31 + | proton mass | 1.672e-27 + | speed of light | 2.997e+08 +(8 rows) + +DROP FOREIGN TABLE famous_constants; diff --git a/sql/alter.sql b/sql/alter.sql new file mode 100644 index 0000000..eb21056 --- /dev/null +++ b/sql/alter.sql @@ -0,0 +1,68 @@ +-- +-- Testing ALTER TABLE on cstore_fdw tables. +-- + +CREATE FOREIGN TABLE test_alter_table (a int, b int, c int) SERVER cstore_server; + +WITH sample_data AS (VALUES + (1, 2, 3), + (4, 5, 6), + (7, 8, 9) +) +INSERT INTO test_alter_table SELECT * FROM sample_data; + +-- drop a column +ALTER FOREIGN TABLE test_alter_table DROP COLUMN a; + +-- test analyze +ANALYZE test_alter_table; + +-- verify select queries run as expected +SELECT * FROM test_alter_table; +SELECT a FROM test_alter_table; +SELECT b FROM test_alter_table; + +-- verify insert runs as expected +INSERT INTO test_alter_table (SELECT 3, 5, 8); +INSERT INTO test_alter_table (SELECT 5, 8); + + +-- add a column with no defaults +ALTER FOREIGN TABLE test_alter_table ADD COLUMN d int; +SELECT * FROM test_alter_table; +INSERT INTO test_alter_table (SELECT 3, 5, 8); +SELECT * FROM test_alter_table; + + +-- add a fixed-length column with default value +ALTER FOREIGN TABLE test_alter_table ADD COLUMN e int default 3; +SELECT * from test_alter_table; +INSERT INTO test_alter_table (SELECT 1, 2, 4, 8); +SELECT * from test_alter_table; + + +-- add a variable-length column with default value +ALTER FOREIGN TABLE test_alter_table ADD COLUMN f text DEFAULT 'TEXT ME'; +SELECT * from test_alter_table; +INSERT INTO test_alter_table (SELECT 1, 2, 4, 8, 'ABCDEF'); +SELECT * from test_alter_table; + + +-- drop couple of columns +ALTER FOREIGN TABLE test_alter_table DROP COLUMN c; +ALTER FOREIGN TABLE test_alter_table DROP COLUMN e; +ANALYZE test_alter_table; +SELECT * from test_alter_table; + + +-- unsupported default values +ALTER FOREIGN TABLE test_alter_table ADD COLUMN g boolean DEFAULT isfinite(current_date); +ALTER FOREIGN TABLE test_alter_table ADD COLUMN h DATE DEFAULT current_date; +SELECT * FROM test_alter_table; +ALTER FOREIGN TABLE test_alter_table ALTER COLUMN g DROP DEFAULT; +SELECT * FROM test_alter_table; +ALTER FOREIGN TABLE test_alter_table ALTER COLUMN h DROP DEFAULT; +ANALYZE test_alter_table; +SELECT * FROM test_alter_table; + +DROP FOREIGN TABLE test_alter_table; diff --git a/sql/query.sql b/sql/query.sql index 66e5142..c56ee15 100644 --- a/sql/query.sql +++ b/sql/query.sql @@ -18,3 +18,6 @@ SELECT avg(rating), stddev_samp(rating) FROM contestant_compressed; SELECT country, avg(rating) FROM contestant_compressed WHERE rating > 2200 GROUP BY country ORDER BY 
country; SELECT * FROM contestant_compressed ORDER BY handle; + +-- Verify that we handle whole-row references correctly +SELECT to_json(v) FROM contestant v ORDER BY rating LIMIT 1;
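
For reference, a minimal usage sketch of the features this patch adds (ALTER TABLE ADD/DROP COLUMN, COPY column lists, and whole-row references), assuming a hypothetical `measurements` table on the existing `cstore_server`. It is illustrative only and is not part of the patch or its regression suite:

```sql
-- Illustrative only: a hypothetical table exercising the 1.3 features
-- (not part of this patch or its regression tests).

-- Upgrade an existing installation after installing the new binaries and
-- restarting the PostgreSQL server.
ALTER EXTENSION cstore_fdw UPDATE;

CREATE FOREIGN TABLE measurements (id int, sensor text, reading real)
    SERVER cstore_server;

-- COPY now accepts a column list; columns omitted from the list are filled
-- with NULL (or a default, if one is defined).
COPY measurements (sensor, reading) FROM STDIN WITH CSV;
thermometer,21.5
barometer,1013.2
\.

-- ALTER TABLE ADD/DROP COLUMN now work on populated tables. Rows written
-- before the ADD COLUMN are read back with the constant default (or NULL).
ALTER FOREIGN TABLE measurements ADD COLUMN unit text DEFAULT 'si';
ALTER FOREIGN TABLE measurements DROP COLUMN id;

-- Whole-row references are returned correctly.
SELECT to_json(m) FROM measurements m ORDER BY sensor;

DROP FOREIGN TABLE measurements;
```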