Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changefeed all versions and deletes mode #28161

Merged
merged 19 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 26 additions & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class ChangeFeedIterator<T> {

// @public
export interface ChangeFeedIteratorOptions {
changeFeedMode?: ChangeFeedMode;
changeFeedStartFrom?: ChangeFeedStartFrom;
maxItemCount?: number;
sessionToken?: string;
Expand All @@ -87,6 +88,14 @@ export class ChangeFeedIteratorResponse<T> {
readonly subStatusCode?: number;
}

// @public (undocumented)
export enum ChangeFeedMode {
// (undocumented)
AllVersionsAndDeletes = "Full-Fidelity Feed",
topshot99 marked this conversation as resolved.
Show resolved Hide resolved
// (undocumented)
LatestVersion = "Incremental Feed"
}

// @public
export interface ChangeFeedOptions {
continuation?: string;
Expand All @@ -96,6 +105,13 @@ export interface ChangeFeedOptions {
startTime?: Date;
}

// @public
export class ChangeFeedPolicy {
constructor(retentionDuration: ChangeFeedRetentionTimeSpan);
// (undocumented)
retentionDuration: number;
}

// @public
export interface ChangeFeedPullModelIterator<T> {
getAsyncIterator(): AsyncIterable<ChangeFeedIteratorResponse<Array<T & Resource>>>;
Expand All @@ -118,6 +134,11 @@ export class ChangeFeedResponse<T> {
readonly statusCode: number;
}

// @public (undocumented)
export class ChangeFeedRetentionTimeSpan {
static fromMinutes(minutes: number): ChangeFeedRetentionTimeSpan;
}

// @public
export abstract class ChangeFeedStartFrom {
// Warning: (ae-forgotten-export) The symbol "ChangeFeedStartFromBeginning" needs to be exported by the entry point index.d.ts
amanrao23 marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -447,6 +468,7 @@ export const Constants: {
ContinuationToken: string;
PageSize: string;
ItemCount: string;
ChangeFeedWireFormatVersion: string;
ActivityId: string;
CorrelatedActivityId: string;
PreTriggerInclude: string;
Expand Down Expand Up @@ -558,6 +580,8 @@ export const Constants: {
MinimumInclusiveEffectivePartitionKey: string;
MaximumExclusiveEffectivePartitionKey: string;
};
AllVersionsAndDeletesChangeFeedWireFormatVersion: string;
ChangeFeedIfNoneMatchStartFromNowHeader: string;
};

// @public
Expand Down Expand Up @@ -593,6 +617,8 @@ export class Container {

// @public (undocumented)
export interface ContainerDefinition {
// (undocumented)
changeFeedPolicy?: ChangeFeedPolicy;
conflictResolutionPolicy?: ConflictResolutionPolicy;
defaultTtl?: number;
geospatialConfig?: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { extractOverlappingRanges } from "./changeFeedUtils";
import { InternalChangeFeedIteratorOptions } from "./InternalChangeFeedOptions";
import { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";
import { getEmptyCosmosDiagnostics, withDiagnostics } from "../../utils/diagnostics";
import { ChangeFeedMode } from "./ChangeFeedMode";
/**
* @hidden
* Provides iterator for change feed for entire container or an epk range.
Expand All @@ -26,6 +27,7 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
private startTime: string;
private isInstantiated: boolean;
private rId: string;
private startFromNow: boolean;
/**
* @internal
*/
Expand All @@ -43,10 +45,13 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
this.continuationToken = changeFeedOptions.continuationToken
? JSON.parse(changeFeedOptions.continuationToken)
: undefined;
this.startTime = changeFeedOptions.startTime
? changeFeedOptions.startTime.toUTCString()
: undefined;
this.isInstantiated = false;
// startTime is used to store and specify time from which change feed should start reading new changes. StartFromNow flag is used to indicate fetching changes from now.
if (changeFeedOptions.startFromNow) {
this.startFromNow = true;
} else if (changeFeedOptions.startTime) {
this.startTime = changeFeedOptions.startTime.toUTCString();
}
}

private async setIteratorRid(diagnosticNode: DiagnosticNodeInternal): Promise<void> {
Expand Down Expand Up @@ -334,7 +339,6 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
oldFeedRange,
resolvedRanges[i],
);

const newFeedRange = new ChangeFeedRange(
resolvedRanges[i].minInclusive,
resolvedRanges[i].maxExclusive,
Expand Down Expand Up @@ -378,7 +382,11 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
feedRange: ChangeFeedRange,
diagnosticNode: DiagnosticNodeInternal,
): Promise<ChangeFeedIteratorResponse<Array<T & Resource>>> {
const feedOptions: FeedOptions = { initialHeaders: {}, useIncrementalFeed: true };
const feedOptions: FeedOptions = {
initialHeaders: {},
useIncrementalFeed: true,
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
useAllVersionsAndDeleteFeed: false,
};

if (typeof this.changeFeedOptions.maxItemCount === "number") {
feedOptions.maxItemCount = this.changeFeedOptions.maxItemCount;
Expand All @@ -393,11 +401,23 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
type: Constants.HttpHeaders.IfNoneMatch,
condition: feedRange.continuationToken,
};
} else if (this.startFromNow) {
feedOptions.initialHeaders[Constants.HttpHeaders.IfNoneMatch] =
Constants.ChangeFeedIfNoneMatchStartFromNowHeader;
}

if (this.startTime) {
feedOptions.initialHeaders[Constants.HttpHeaders.IfModifiedSince] = this.startTime;
}

if (
amanrao23 marked this conversation as resolved.
Show resolved Hide resolved
this.changeFeedOptions.changeFeedMode &&
this.changeFeedOptions.changeFeedMode === ChangeFeedMode.AllVersionsAndDeletes
) {
feedOptions.useAllVersionsAndDeleteFeed = true;
feedOptions.useIncrementalFeed = false;
}

const rangeId = await this.getPartitionRangeId(feedRange, diagnosticNode);
try {
// startEpk and endEpk are only valid in case we want to fetch result for a part of partition and not the entire partition.
Expand All @@ -423,7 +443,15 @@ export class ChangeFeedForEpkRange<T> implements ChangeFeedPullModelIterator<T>
getEmptyCosmosDiagnostics(),
);
} catch (err) {
// If any errors are encountered, eg. partition split or gone, handle it based on error code and not break the flow.
if (err.code >= 400 && err.code !== StatusCodes.Gone) {
philipthomas-MSFT marked this conversation as resolved.
Show resolved Hide resolved
const errorResponse = new ErrorResponse(err.message);
errorResponse.code = err.code;
errorResponse.headers = err.headers;

throw errorResponse;
}

// If any other errors are encountered, eg. partition split or gone, handle it based on error code and not break the flow.
return new ChangeFeedIteratorResponse(
[],
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import { InternalChangeFeedIteratorOptions } from "./InternalChangeFeedOptions";
import { ChangeFeedIteratorResponse } from "./ChangeFeedIteratorResponse";
import { Container, Resource } from "../../client";
import { ClientContext } from "../../ClientContext";
import { Constants, ResourceType } from "../../common";
import { Constants, ResourceType, StatusCodes } from "../../common";
import { FeedOptions, Response, ErrorResponse } from "../../request";
import { ContinuationTokenForPartitionKey } from "./ContinuationTokenForPartitionKey";
import { ChangeFeedPullModelIterator } from "./ChangeFeedPullModelIterator";
import { PartitionKey } from "../../documents";
import { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";
import { getEmptyCosmosDiagnostics, withDiagnostics } from "../../utils/diagnostics";
import { ChangeFeedMode } from "./ChangeFeedMode";
/**
* @hidden
* Provides iterator for change feed for one partition key.
Expand All @@ -22,6 +23,7 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
private startTime: string;
private rId: string;
private isInstantiated: boolean;
private startFromNow: boolean;
/**
* @internal
*/
Expand All @@ -37,8 +39,10 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
? JSON.parse(changeFeedOptions.continuationToken)
: undefined;
this.isInstantiated = false;

if (changeFeedOptions.startTime) {
// startTime is used to store and specify time from which change feed should start reading new changes. StartFromNow flag is used to indicate fetching changes from now.
if (changeFeedOptions.startFromNow) {
this.startFromNow = true;
} else if (changeFeedOptions.startTime) {
this.startTime = changeFeedOptions.startTime.toUTCString();
}
}
Expand Down Expand Up @@ -119,8 +123,11 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
private async getFeedResponse(
diagnosticNode: DiagnosticNodeInternal,
): Promise<ChangeFeedIteratorResponse<Array<T & Resource>>> {
const feedOptions: FeedOptions = { initialHeaders: {}, useIncrementalFeed: true };

const feedOptions: FeedOptions = {
initialHeaders: {},
useIncrementalFeed: true,
useAllVersionsAndDeleteFeed: false,
};
if (typeof this.changeFeedOptions.maxItemCount === "number") {
feedOptions.maxItemCount = this.changeFeedOptions.maxItemCount;
}
Expand All @@ -135,29 +142,53 @@ export class ChangeFeedForPartitionKey<T> implements ChangeFeedPullModelIterator
type: Constants.HttpHeaders.IfNoneMatch,
condition: continuation,
};
} else if (this.startFromNow) {
feedOptions.initialHeaders[Constants.HttpHeaders.IfNoneMatch] =
Constants.ChangeFeedIfNoneMatchStartFromNowHeader;
}

if (this.startTime) {
feedOptions.initialHeaders[Constants.HttpHeaders.IfModifiedSince] = this.startTime;
}

const response: Response<Array<T & Resource>> = await (this.clientContext.queryFeed<T>({
path: this.resourceLink,
resourceType: ResourceType.item,
resourceId: this.resourceId,
resultFn: (result) => (result ? result.Documents : []),
diagnosticNode,
query: undefined,
options: feedOptions,
partitionKey: this.partitionKey,
}) as Promise<any>);

return new ChangeFeedIteratorResponse(
response.result,
response.result ? response.result.length : 0,
response.code,
response.headers,
getEmptyCosmosDiagnostics(),
);
if (
this.changeFeedOptions.changeFeedMode &&
this.changeFeedOptions.changeFeedMode === ChangeFeedMode.AllVersionsAndDeletes
) {
feedOptions.useAllVersionsAndDeleteFeed = true;
feedOptions.useIncrementalFeed = false;
}
try {
const response: Response<Array<T & Resource>> = await (this.clientContext.queryFeed<T>({
path: this.resourceLink,
resourceType: ResourceType.item,
resourceId: this.resourceId,
resultFn: (result) => (result ? result.Documents : []),
diagnosticNode,
query: undefined,
options: feedOptions,
partitionKey: this.partitionKey,
}) as Promise<any>);
return new ChangeFeedIteratorResponse(
response.result,
response.result ? response.result.length : 0,
response.code,
response.headers,
getEmptyCosmosDiagnostics(),
);
} catch (err) {
if (err.code >= 400 && err.code !== StatusCodes.Gone) {
const errorResponse = new ErrorResponse(err.message);
errorResponse.code = err.code;
errorResponse.headers = err.headers;
throw errorResponse;
}
return new ChangeFeedIteratorResponse(
[],
0,
err.code,
err.headers,
getEmptyCosmosDiagnostics(),
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license.

import { ChangeFeedStartFrom } from "./ChangeFeedStartFrom";

import { ChangeFeedMode } from "./ChangeFeedMode";
/**
* Specifies options for the change feed
*
Expand All @@ -21,4 +21,8 @@ export interface ChangeFeedIteratorOptions {
* Signals where to start from in the change feed.
*/
changeFeedStartFrom?: ChangeFeedStartFrom;
/**
* Signals the mode in which the change feed needs to start.
*/
changeFeedMode?: ChangeFeedMode;
}
6 changes: 6 additions & 0 deletions sdk/cosmosdb/cosmos/src/client/ChangeFeed/ChangeFeedMode.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
export enum ChangeFeedMode {
LatestVersion = "Incremental Feed",
AllVersionsAndDeletes = "Full-Fidelity Feed",
}
13 changes: 13 additions & 0 deletions sdk/cosmosdb/cosmos/src/client/ChangeFeed/ChangeFeedPolicy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { ChangeFeedRetentionTimeSpan } from "./ChangeFeedRetentionTimeSpan";
/**
* Represents the change feed policy configuration for a container in the Azure Cosmos DB service.
*/
export class ChangeFeedPolicy {
public retentionDuration: number;

constructor(retentionDuration: ChangeFeedRetentionTimeSpan) {
this.retentionDuration = retentionDuration.getRetentionInMinutes();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ErrorResponse } from "../../request";

/*
* Represents the change feed policy configuration for a container in the Azure Cosmos DB service.
*/
export class ChangeFeedRetentionTimeSpan {
private retentionInMinutes: number;
/**
* @internal
*/
constructor(minutes: number) {
if (typeof minutes !== "number") {
amanrao23 marked this conversation as resolved.
Show resolved Hide resolved
throw new ErrorResponse("ChangeFeedRetentionTimeSpan must be a number.");
}
if (minutes < 0) {
throw new ErrorResponse("ChangeFeedRetentionTimeSpan must be a positive number.");
}
if (minutes % 1 !== 0) {
throw new ErrorResponse("Retention's granularity is minutes.");
}
this.retentionInMinutes = minutes;
}
/**
* Specifies the retention window in minutes for which processing the change feed with allVersionsAndDeletes mode will be available.
*/
static fromMinutes(minutes: number): ChangeFeedRetentionTimeSpan {
return new ChangeFeedRetentionTimeSpan(minutes);
}
/**
* @internal
*/
public getRetentionInMinutes(): number {
return this.retentionInMinutes;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ChangeFeedMode } from "./ChangeFeedMode";

/**
* @hidden
* Internal Change Feed Iterator Options used only by ChangeFeedForEpkRange and ChangeFeedForPartitionKey.
Expand All @@ -13,4 +15,8 @@ export interface InternalChangeFeedIteratorOptions {
continuationToken?: string;

startTime?: Date;

changeFeedMode?: ChangeFeedMode;

startFromNow?: boolean;
}