Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add the count aggregate function #972

Merged
Show file tree
Hide file tree
Changes from 71 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
fe55548
Files copied over
danieljbruce Jul 26, 2022
f84ebdd
implementation of aggregate queries.
danieljbruce Jul 26, 2022
259141c
Checkout index.ts previous version
danieljbruce Jul 26, 2022
8d8a01f
src seems to be error free
danieljbruce Jul 26, 2022
c88009e
optional parameters
danieljbruce Jul 26, 2022
f542235
replace with recognized type
danieljbruce Jul 27, 2022
1b21ec2
protos generated
danieljbruce Jul 28, 2022
83586e7
old package.json
danieljbruce Jul 28, 2022
0f2bd32
aggregation query request
danieljbruce Aug 8, 2022
5469ebc
Commit so I can revert
danieljbruce Aug 8, 2022
1733072
Revert "aggregation query request"
danieljbruce Aug 8, 2022
7a99e8c
Revert "Revert "aggregation query request""
danieljbruce Aug 8, 2022
7e4d53a
Added a parser
danieljbruce Aug 22, 2022
7290af7
Comment out filters
danieljbruce Aug 23, 2022
efe0826
Fixed parser
danieljbruce Aug 23, 2022
6aa110c
Add info
danieljbruce Aug 23, 2022
2de1518
more tests, adding alias
danieljbruce Aug 24, 2022
e36dcde
use deep strict equal instead
danieljbruce Aug 24, 2022
6e85612
Work on passing in buffered input.
danieljbruce Aug 24, 2022
7e87274
Convert to buffer
danieljbruce Aug 25, 2022
477a199
Add alias
danieljbruce Aug 25, 2022
5ffafd6
Add the aggregate function
danieljbruce Aug 26, 2022
cc602be
change test descriptions
danieljbruce Aug 26, 2022
5de6f5c
Add a unit test
danieljbruce Aug 26, 2022
5639d42
Revert "Files copied over"
danieljbruce Aug 26, 2022
5143df3
Revert "protos generated"
danieljbruce Aug 26, 2022
4329129
Eliminate comments
danieljbruce Aug 26, 2022
b0c05b8
Merge branch 'count-aggregation-2' of https://github.com/danieljbruce…
danieljbruce Aug 26, 2022
cd82996
Add new line
danieljbruce Aug 26, 2022
35ab76d
Merge branch 'count-aggregation-2' of https://github.com/danieljbruce…
danieljbruce Aug 26, 2022
b58a1d5
Change upTo
danieljbruce Aug 30, 2022
377d857
Change maximum name to upTo
danieljbruce Aug 30, 2022
3e74452
upTo relabelled from maximum
danieljbruce Aug 30, 2022
6e11243
remove import
danieljbruce Aug 31, 2022
1225d24
ran the linter
danieljbruce Aug 31, 2022
2d4386b
improve code readability
danieljbruce Aug 31, 2022
db9c340
move self equals this
danieljbruce Aug 31, 2022
b3ea976
change header
danieljbruce Aug 31, 2022
7e9d018
Revert "change header"
danieljbruce Aug 31, 2022
bb839a3
Add a header
danieljbruce Aug 31, 2022
5f5a815
change the unit test
danieljbruce Aug 31, 2022
bd97895
Architecture changes
danieljbruce Sep 2, 2022
9b0943c
Getting closer
danieljbruce Sep 2, 2022
3ed910a
Add the right parsing
danieljbruce Sep 2, 2022
60a3486
TODO message
danieljbruce Sep 14, 2022
83dad3b
Remove and add array over for aggregate fields
danieljbruce Sep 14, 2022
5370810
unit test and count function
danieljbruce Sep 14, 2022
b6278b3
fix unit test as result of new changes
danieljbruce Sep 21, 2022
eca8b33
linting fix
danieljbruce Sep 21, 2022
8fa14af
Added a test for encoding using a count function
danieljbruce Sep 22, 2022
5acff03
lint fix
danieljbruce Sep 22, 2022
a79f742
Merge branch 'main' into count-aggregation-only-relevant-changes
telpirion Oct 6, 2022
6766066
removed upTo
Oct 6, 2022
8c6748a
Merge branch 'main' into count-aggregation-only-relevant-changes
telpirion Oct 18, 2022
881a800
merge
Oct 18, 2022
c09d548
undid local changes to package.json
Oct 18, 2022
794314e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Oct 19, 2022
b13b5e9
Update src/aggregate.ts
telpirion Oct 20, 2022
872675e
per reviewer
Oct 20, 2022
7cc59b7
Merge branch 'main' into count-aggregation-only-relevant-changes
telpirion Oct 26, 2022
cbce348
Merge branch 'main' into count-aggregation-only-relevant-changes
telpirion Nov 18, 2022
2b08cb5
refactor and rename some methods
kolea2 Nov 22, 2022
27d1365
Add run method to aggregate query
danieljbruce Jan 9, 2023
450a519
test update and return value change
danieljbruce Jan 9, 2023
f9acad7
compare aggregate results
danieljbruce Jan 9, 2023
72e412f
Merge branch 'main' into count-aggregation-only-relevant-changes
danieljbruce Jan 10, 2023
25e3b95
createAggregationQuery
danieljbruce Jan 10, 2023
28d5b60
Merge branch 'count-aggregation-only-relevant-changes' of https://git…
danieljbruce Jan 10, 2023
5707db9
linter and header fix
danieljbruce Jan 10, 2023
0874935
TODOs
danieljbruce Jan 10, 2023
4af78aa
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 10, 2023
f8d771e
Remove autogenerated comment
danieljbruce Jan 10, 2023
1f094d8
Merge branch 'count-aggregation-only-relevant-changes' of https://git…
danieljbruce Jan 10, 2023
77f3e59
count aggregation with limit filter tests
danieljbruce Jan 11, 2023
088c844
lint fix
danieljbruce Jan 11, 2023
6c23ccb
Commented functions and classes
danieljbruce Jan 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protos/protos.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protos/protos.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 94 additions & 0 deletions src/aggregate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// ** This file is automatically generated by gapic-generator-typescript. **
// ** https://github.com/googleapis/gapic-generator-typescript **
// ** All changes to this file may be overwritten. **
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the notice correct that this is autogenerated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not autogenerated. Thanks for catching this one! Changes pushed.


import {Query} from './index';
import {RunQueryOptions, RunQueryResponse} from './query';
import {RequestCallback} from './request';
const AGGREGATE_QUERY = Symbol('AGGREGATE_QUERY');

class AggregateQuery {
type = AGGREGATE_QUERY;
aggregations: Array<AggregateField>;
query: Query | undefined;

constructor(query: Query) {
this.query = query;
this.aggregations = [];
}

count(alias: string): AggregateQuery {
this.aggregations.push(AggregateField.count().alias(alias));
return this;
}

addAggregation(aggregation: AggregateField): AggregateQuery {
this.aggregations.push(aggregation);
return this;
}

addAggregations(aggregations: AggregateField[]): AggregateQuery {
for (const aggregation of aggregations) {
this.aggregations.push(aggregation);
}
return this;
}

run(
optionsOrCallback?: RunQueryOptions | RequestCallback,
cb?: RequestCallback
): void | Promise<RunQueryResponse> {
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;
const scope = this.query!.scope;
const runAggregationQuery = scope!.runAggregationQuery.bind(scope);
return runAggregationQuery(this, options, callback);
}

// eslint-disable-next-line
toProto(): any {
telpirion marked this conversation as resolved.
Show resolved Hide resolved
return this.aggregations.map(aggregation => aggregation.toProto());
}
}

abstract class AggregateField {
alias_?: string;
telpirion marked this conversation as resolved.
Show resolved Hide resolved

static count(): Count {
return new Count();
}

alias(alias: string): AggregateField {
this.alias_ = alias;
return this;
}

// eslint-disable-next-line
abstract toProto(): any;
}

class Count extends AggregateField {
// eslint-disable-next-line
toProto(): any {
const count = Object.assign({});
return Object.assign({count}, this.alias_ ? {alias: this.alias_} : null);
}
}

export {AggregateField, AggregateQuery, AGGREGATE_QUERY};
telpirion marked this conversation as resolved.
Show resolved Hide resolved
12 changes: 12 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import {
import {Transaction} from './transaction';
import {promisifyAll} from '@google-cloud/promisify';
import {google} from '../protos/protos';
import {AggregateQuery} from './aggregate';

const {grpc} = new GrpcClient();

Expand Down Expand Up @@ -509,6 +510,15 @@ class Datastore extends DatastoreRequest {
this.auth = new GoogleAuth(this.options);
}

/**
* Create an aggregation query from a Query.
*
* @param {Query} query A Query object.
*/
createAggregationQuery(query: Query): AggregateQuery {
return new AggregateQuery(query);
}

/**
* Export entities from this project to a Google Cloud Storage bucket.
*
Expand Down Expand Up @@ -1797,10 +1807,12 @@ class Datastore extends DatastoreRequest {
*/
promisifyAll(Datastore, {
exclude: [
'createAggregationQuery',
'double',
'isDouble',
'geoPoint',
'getProjectId',
'getSharedQueryOptions',
'isGeoPoint',
'index',
'int',
Expand Down
3 changes: 3 additions & 0 deletions src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {Entity} from './entity';
import {Transaction} from './transaction';
import {CallOptions} from 'google-gax';
import {RunQueryStreamOptions} from '../src/request';
import {AggregateField, AggregateQuery} from './aggregate';
telpirion marked this conversation as resolved.
Show resolved Hide resolved

export type Operator =
| '='
Expand Down Expand Up @@ -572,6 +573,8 @@ export interface RunQueryCallback {

export type RunQueryResponse = [Entity[], RunQueryInfo];

export type RunAggregateQueryResponse = any;

export interface RunQueryInfo {
endCursor?: string;
moreResults?:
Expand Down
138 changes: 114 additions & 24 deletions src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
KeyProto,
ResponseResult,
Entities,
ValueProto,
} from './entity';
import {
Query,
Expand All @@ -55,6 +56,7 @@ import {
} from './query';
import {Datastore} from '.';
import ITimestamp = google.protobuf.ITimestamp;
import {AggregateQuery} from './aggregate';

/**
* A map of read consistency values to proto codes.
Expand Down Expand Up @@ -564,6 +566,76 @@ class DatastoreRequest {
);
}

runAggregationQuery(
query: AggregateQuery,
options?: RunQueryOptions
): Promise<RunQueryResponse>;
runAggregationQuery(
query: AggregateQuery,
options: RunQueryOptions,
callback: RequestCallback
): void;
runAggregationQuery(query: AggregateQuery, callback: RequestCallback): void;
runAggregationQuery(
query: AggregateQuery,
optionsOrCallback?: RunQueryOptions | RequestCallback,
cb?: RequestCallback
): void | Promise<RunQueryResponse> {
const options =
typeof optionsOrCallback === 'object' ? optionsOrCallback : {};
const callback =
typeof optionsOrCallback === 'function' ? optionsOrCallback : cb!;

query.query = extend(true, new Query(), query.query);
let queryProto: QueryProto;
try {
queryProto = entity.queryToQueryProto(query.query);
} catch (e) {
// using setImmediate here to make sure this doesn't throw a
// synchronous error
setImmediate(callback, e as Error);
return;
}
const sharedQueryOpts = this.getSharedQueryOptions(query.query, options);
const aggregationQueryOptions: AggregationQueryOptions = {
nestedQuery: queryProto,
aggregations: query.toProto(),
};
const reqOpts: RunAggregationQueryRequest = Object.assign(sharedQueryOpts, {
aggregationQuery: aggregationQueryOptions,
});
this.request_(
{
client: 'DatastoreClient',
method: 'runAggregationQuery',
reqOpts,
gaxOpts: options.gaxOptions,
},
(err, res) => {
if (res && res.batch) {
const results = res.batch.aggregationResults;
const finalResults = results
.map(
(aggregationResult: any) => aggregationResult.aggregateProperties
)
.map((aggregateProperties: any) =>
Object.fromEntries(
new Map(
Object.keys(aggregateProperties).map(key => [
key,
entity.decodeValueProto(aggregateProperties[key]),
])
)
)
);
callback(err, finalResults);
} else {
callback(err, res);
}
}
);
}

/**
* Datastore allows you to query entities by kind, filter them by property
* filters, and sort them by a property name. Projection and pagination are
Expand Down Expand Up @@ -729,32 +801,20 @@ class DatastoreRequest {
*/
runQueryStream(query: Query, options: RunQueryStreamOptions = {}): Transform {
query = extend(true, new Query(), query);

const makeRequest = (query: Query) => {
const reqOpts = {} as RequestOptions;

let queryProto: QueryProto;
try {
reqOpts.query = entity.queryToQueryProto(query);
queryProto = entity.queryToQueryProto(query);
} catch (e) {
// using setImmediate here to make sure this doesn't throw a
// synchronous error
setImmediate(onResultSet, e as Error);
return;
}
const sharedQueryOpts = this.getSharedQueryOptions(query, options);

if (options.consistency) {
const code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()];
reqOpts.readOptions = {
readConsistency: code,
};
}

if (query.namespace) {
reqOpts.partitionId = {
namespaceId: query.namespace,
};
}

const reqOpts: RequestOptions = sharedQueryOpts;
reqOpts.query = queryProto;
this.request_(
{
client: 'DatastoreClient',
Expand Down Expand Up @@ -827,6 +887,25 @@ class DatastoreRequest {
return stream;
}

private getSharedQueryOptions(
query: Query,
options: RunQueryStreamOptions = {}
): SharedQueryOptions {
const sharedQueryOpts = {} as SharedQueryOptions;
if (options.consistency) {
const code = CONSISTENCY_PROTO_CODE[options.consistency.toLowerCase()];
sharedQueryOpts.readOptions = {
readConsistency: code,
};
}
if (query.namespace) {
sharedQueryOpts.partitionId = {
namespaceId: query.namespace,
};
}
return sharedQueryOpts;
}

/**
* Merge the specified object(s). If a key is incomplete, its associated object
* is inserted and the original Key object is updated to contain the generated ID.
Expand Down Expand Up @@ -857,7 +936,7 @@ class DatastoreRequest {
callback?: SaveCallback
): void | Promise<CommitResponse> {
const transaction = this.datastore.transaction();
transaction.run(async err => {
transaction.run(async (err: any) => {
if (err) {
try {
await transaction.rollback();
Expand Down Expand Up @@ -1066,27 +1145,36 @@ export interface RequestConfig {
prepared?: boolean;
reqOpts?: RequestOptions;
}
export interface RequestOptions {
mutations?: google.datastore.v1.IMutation[];
keys?: Entity;
export interface SharedQueryOptions {
projectId?: string;
partitionId?: google.datastore.v1.IPartitionId | null;
readOptions?: {
readConsistency?: number;
transaction?: string;
readTime?: ITimestamp;
};
partitionId?: google.datastore.v1.IPartitionId | null;
}
export interface RequestOptions extends SharedQueryOptions {
mutations?: google.datastore.v1.IMutation[];
keys?: Entity;
transactionOptions?: {
readOnly?: {};
readWrite?: {previousTransaction?: string};
} | null;
transaction?: string | null;
mode?: string;
projectId?: string;
query?: QueryProto;
filter?: string;
indexId?: string;
entityFilter?: google.datastore.admin.v1.IEntityFilter;
}
export interface RunAggregationQueryRequest extends SharedQueryOptions {
aggregationQuery: AggregationQueryOptions;
}
export interface AggregationQueryOptions {
nestedQuery: QueryProto;
aggregations: Array<any>;
}
export type RunQueryStreamOptions = RunQueryOptions;
export interface CommitCallback {
(err?: Error | null, resp?: google.datastore.v1.ICommitResponse): void;
Expand All @@ -1102,7 +1190,9 @@ export type DeleteResponse = CommitResponse;
* All async methods (except for streams) will return a Promise in the event
* that a callback is omitted.
*/
promisifyAll(DatastoreRequest);
promisifyAll(DatastoreRequest, {
exclude: ['getSharedQueryOptions'],
});

/**
* Reference to the {@link DatastoreRequest} class.
Expand Down
Loading