Skip to content

Commit 762d962

Browse files
rajeshkt78safern
authored andcommitted
Merged PR 1539952: allChangesForCluster option for change streams
### Does this PR have any customer impact? Yes. This PR provides the option of "allChangesForCluster" to be able to track all the changes happening in the cluster with a change stream cursor. ### Type (Feature, Refactoring, Bugfix, DevOps, Testing, Perf, etc) Feature ### Does it involve schema level changes? (Table, Column, Index, UDF, etc level changes) No ### Are you introducing any new config? If yes, do you have tests with and without them being set? No ### Release Note (Refer [Template](../docs/release_notes/pgmongo_template.md)) ### Description first version of allChangesForCluster implementation ---- #### AI description (iteration 1) #### PR Classification New feature #### PR Summary Implemented the first version of the `allChangesForCluster` feature in the change stream. - `change_stream_wal_utils.c`: Added logic to track changes for the entire cluster and refactored relation ID retrieval. - `change_stream_aggregation_stage.c`: Updated context initialization to handle `allChangesForCluster` and added error handling for missing collections. - `change_stream_decoding_utils.c`: Enhanced event building to support cluster-wide change tracking and added a function to get collection by relation ID. - `change_stream.h`: Extended `ChangeStreamEvent` and `ChangeStreamSpec` structures to include cluster-wide tracking. - Updated tests in `pgmongo_change_stream_crud_tests.sql` and `pgmongo_change_stream_crud_tests.out` to reflect the new feature.
1 parent c9b7650 commit 762d962

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

pg_documentdb/include/metadata/collection.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ MongoCollection * CopyMongoCollection(const MongoCollection *collection);
207207
/* get Mongo collection metadata by collection id */
208208
MongoCollection * GetMongoCollectionByColId(uint64 collectionId, LOCKMODE lockMode);
209209

210+
/* get Mongo collection metadata by realtion ID of a collection's shard */
211+
MongoCollection * GetMongoCollectionByRelationShardId(Oid relationId);
212+
210213
/* get OID of Mongo documents table by collection id */
211214
Oid GetRelationIdForCollectionId(uint64 collectionId, LOCKMODE lockMode);
212215

pg_documentdb/src/metadata/collection.c

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ static MongoCollection * GetMongoCollectionByNameDatumCore(Datum databaseNameDat
128128
Datum collectionNameDatum,
129129
LOCKMODE lockMode);
130130
static Datum GetCollectionOrViewCore(PG_FUNCTION_ARGS, bool allowViews);
131+
static uint64 GetCollectionIdFromShardName(const char *shardName);
131132

132133
/*
133134
* CollectionCacheIsValid determines whether the collections hashes are
@@ -416,6 +417,50 @@ GetMongoCollectionByColId(uint64 collectionId, LOCKMODE lockMode)
416417
}
417418

418419

420+
/*
421+
* GetMongoCollectionByRelationShardId gets the MongoCollection metadata by
422+
* relation Id of one of the collection's shard tables. This is used by
423+
* change stream aggregation to lookup a collection for a given change's
424+
* relation id parsed from the WAL records. It finds the collection Id
425+
* from the relation ID using the naming convention of the shard table
426+
* name format documents_<collection ID>_<shard ID> and just calls the
427+
* GetMongoCollectionByColId to lookup the collection's meta data from
428+
* the hash table.
429+
*/
430+
MongoCollection *
431+
GetMongoCollectionByRelationShardId(Oid relationId)
432+
{
433+
/* Get the relation's name using the relationId. */
434+
const char *shardName = get_rel_name(relationId);
435+
436+
/* If the shardName lookup failed, return false. */
437+
if (shardName == NULL)
438+
{
439+
return NULL;
440+
}
441+
442+
/* Get the collection ID from the shard table name. */
443+
uint64 collectionId = GetCollectionIdFromShardName(shardName);
444+
445+
/*
446+
* If the collection Id can't be parsed from relation's name,
447+
* then this is not a valid shard of a collection.
448+
*/
449+
if (collectionId == 0)
450+
{
451+
return NULL;
452+
}
453+
454+
/*
455+
* Look up the collection using it's ID and return it. we can directly
456+
* lookup from the RelationIdToCollectionHash here, but this collection may
457+
* have been evicted from cache, so an invalidation may be needed. So
458+
* we call the GetMongoCollectionByColId which handles this already.
459+
*/
460+
return GetMongoCollectionByColId(collectionId, NoLock);
461+
}
462+
463+
419464
/*
420465
* GetMongoCollectionOrViewByNameDatum returns collection metadata by database and
421466
* collection name or NULL if the collection does not exist. Also returns views
@@ -1957,3 +2002,23 @@ UpdateMongoCollectionUsingIds(MongoCollection *mongoCollection, uint64 collectio
19572002
snprintf(mongoCollection->tableName, NAMEDATALEN, DOCUMENT_DATA_TABLE_NAME_FORMAT,
19582003
collectionId);
19592004
}
2005+
2006+
2007+
static uint64
2008+
GetCollectionIdFromShardName(const char *shardName)
2009+
{
2010+
uint64 collectionId = 0, shardId = 0;
2011+
2012+
/* Use sscanf to parse the string */
2013+
int scanned = sscanf(shardName, "documents_%lu_%lu", &collectionId, &shardId);
2014+
2015+
/* Check if sscanf successfully scanned 2 numbers and the prefix is correct */
2016+
if (scanned == 2 && strncmp(shardName, "documents_", 10) == 0)
2017+
{
2018+
return collectionId;
2019+
}
2020+
else
2021+
{
2022+
return -1;
2023+
}
2024+
}

0 commit comments

Comments
 (0)