Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: deferred initialization #317

Merged
merged 2 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,13 @@ export class BigQueryStorageClient {
private _innerApiCalls: {[name: string]: Function};
private _pathTemplates: {[name: string]: gax.PathTemplate};
private _terminated = false;
private _opts: ClientOptions;
private _gaxModule: typeof gax | typeof gax.fallback;
private _gaxGrpc: gax.GrpcClient | gax.fallback.GrpcClient;
private _protos: {};
private _defaults: {[method: string]: gax.CallSettings};
auth: gax.GoogleAuth;
bigQueryStorageStub: Promise<{[name: string]: Function}>;
bigQueryStorageStub?: Promise<{[name: string]: Function}>;

/**
* Construct an instance of BigQueryStorageClient.
Expand Down Expand Up @@ -91,28 +96,31 @@ export class BigQueryStorageClient {
// If we are in browser, we are already using fallback because of the
// "browser" field in package.json.
// But if we were explicitly requested to use fallback, let's do it now.
const gaxModule = !isBrowser && opts.fallback ? gax.fallback : gax;
this._gaxModule = !isBrowser && opts.fallback ? gax.fallback : gax;

// Create a `gaxGrpc` object, with any grpc-specific options
// sent to the client.
opts.scopes = (this.constructor as typeof BigQueryStorageClient).scopes;
const gaxGrpc = new gaxModule.GrpcClient(opts);
this._gaxGrpc = new this._gaxModule.GrpcClient(opts);

// Save options to use in initialize() method.
this._opts = opts;

// Save the auth object to the client, for use by other methods.
this.auth = (gaxGrpc.auth as gax.GoogleAuth);
this.auth = (this._gaxGrpc.auth as gax.GoogleAuth);

// Determine the client header string.
const clientHeader = [
`gax/${gaxModule.version}`,
`gax/${this._gaxModule.version}`,
`gapic/${version}`,
];
if (typeof process !== 'undefined' && 'versions' in process) {
clientHeader.push(`gl-node/${process.versions.node}`);
} else {
clientHeader.push(`gl-web/${gaxModule.version}`);
clientHeader.push(`gl-web/${this._gaxModule.version}`);
}
if (!opts.fallback) {
clientHeader.push(`grpc/${gaxGrpc.grpcVersion}`);
clientHeader.push(`grpc/${this._gaxGrpc.grpcVersion}`);
}
if (opts.libName && opts.libVersion) {
clientHeader.push(`${opts.libName}/${opts.libVersion}`);
Expand All @@ -122,7 +130,7 @@ export class BigQueryStorageClient {
// For browsers, pass the JSON content.

const nodejsProtoPath = path.join(__dirname, '..', '..', 'protos', 'protos.json');
const protos = gaxGrpc.loadProto(
this._protos = this._gaxGrpc.loadProto(
opts.fallback ?
require("../../protos/protos.json") :
nodejsProtoPath
Expand All @@ -132,38 +140,56 @@ export class BigQueryStorageClient {
// identifiers to uniquely identify resources within the API.
// Create useful helper objects for these.
this._pathTemplates = {
readSessionPathTemplate: new gaxModule.PathTemplate(
readSessionPathTemplate: new this._gaxModule.PathTemplate(
'projects/{project}/locations/{location}/sessions/{session}'
),
streamPathTemplate: new gaxModule.PathTemplate(
streamPathTemplate: new this._gaxModule.PathTemplate(
'projects/{project}/locations/{location}/streams/{stream}'
),
};

// Some of the methods on this service provide streaming responses.
// Provide descriptors for these.
this._descriptors.stream = {
readRows: new gaxModule.StreamDescriptor(gax.StreamType.SERVER_STREAMING)
readRows: new this._gaxModule.StreamDescriptor(gax.StreamType.SERVER_STREAMING)
};

// Put together the default options sent with requests.
const defaults = gaxGrpc.constructSettings(
this._defaults = this._gaxGrpc.constructSettings(
'google.cloud.bigquery.storage.v1beta1.BigQueryStorage', gapicConfig as gax.ClientConfig,
opts.clientConfig || {}, {'x-goog-api-client': clientHeader.join(' ')});

// Set up a dictionary of "inner API calls"; the core implementation
// of calling the API is handled in `google-gax`, with this code
// merely providing the destination and request information.
this._innerApiCalls = {};
}

/**
* Initialize the client.
* Performs asynchronous operations (such as authentication) and prepares the client.
* This function will be called automatically when any class method is called for the
* first time, but if you need to initialize it before calling an actual method,
* feel free to call initialize() directly.
*
* You can await on this method if you want to make sure the client is initialized.
*
* @returns {Promise} A promise that resolves to an authenticated service stub.
*/
initialize() {
// If the client stub promise is already initialized, return immediately.
if (this.bigQueryStorageStub) {
return this.bigQueryStorageStub;
}

// Put together the "service stub" for
// google.cloud.bigquery.storage.v1beta1.BigQueryStorage.
this.bigQueryStorageStub = gaxGrpc.createStub(
opts.fallback ?
(protos as protobuf.Root).lookupService('google.cloud.bigquery.storage.v1beta1.BigQueryStorage') :
this.bigQueryStorageStub = this._gaxGrpc.createStub(
this._opts.fallback ?
(this._protos as protobuf.Root).lookupService('google.cloud.bigquery.storage.v1beta1.BigQueryStorage') :
// tslint:disable-next-line no-any
(protos as any).google.cloud.bigquery.storage.v1beta1.BigQueryStorage,
opts) as Promise<{[method: string]: Function}>;
(this._protos as any).google.cloud.bigquery.storage.v1beta1.BigQueryStorage,
this._opts) as Promise<{[method: string]: Function}>;

// Iterate over each of the methods that the service provides
// and create an API call method for each.
Expand All @@ -182,9 +208,9 @@ export class BigQueryStorageClient {
throw err;
});

const apiCall = gaxModule.createApiCall(
const apiCall = this._gaxModule.createApiCall(
innerCallPromise,
defaults[methodName],
this._defaults[methodName],
this._descriptors.page[methodName] ||
this._descriptors.stream[methodName] ||
this._descriptors.longrunning[methodName]
Expand All @@ -198,6 +224,8 @@ export class BigQueryStorageClient {
return apiCall(argument, callOptions, callback);
};
}

return this.bigQueryStorageStub;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this._gaxModule.createApiCall, does initialize not get called when you new a client?

}

/**
Expand Down Expand Up @@ -343,6 +371,7 @@ export class BigQueryStorageClient {
'table_reference.project_id': request.tableReference!.projectId || '',
'table_reference.dataset_id': request.tableReference!.datasetId || '',
});
this.initialize();
return this._innerApiCalls.createReadSession(request, options, callback);
}
batchCreateReadSessionStreams(
Expand Down Expand Up @@ -409,6 +438,7 @@ export class BigQueryStorageClient {
] = gax.routingHeader.fromParams({
'session.name': request.session!.name || '',
});
this.initialize();
return this._innerApiCalls.batchCreateReadSessionStreams(request, options, callback);
}
finalizeStream(
Expand Down Expand Up @@ -481,6 +511,7 @@ export class BigQueryStorageClient {
] = gax.routingHeader.fromParams({
'stream.name': request.stream!.name || '',
});
this.initialize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this this potentially be:

this.initialize().then(() => {
  return this._innerApiCalls.splitReadStream(request, options, callback);
})

It seems like we're still potentially creating promises that are unhandled, or maybe I'm missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this does not change. We won't be able to make it perfect until we stop supporting callbacks. But now it can be more predictable since users can do await client.initialize() and catch all errors there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another problem is that the method must immediately return a cancellable call that comes from Gax, not just a promise that eventually resolves to a result.

This use case:

const call = client.doStuff(request);
call.cancel();  // <---- existing feature, we don't want to break it now
...
call.then(result => ...);

return this._innerApiCalls.finalizeStream(request, options, callback);
}
splitReadStream(
Expand Down Expand Up @@ -560,6 +591,7 @@ export class BigQueryStorageClient {
] = gax.routingHeader.fromParams({
'original_stream.name': request.originalStream!.name || '',
});
this.initialize();
return this._innerApiCalls.splitReadStream(request, options, callback);
}

Expand Down Expand Up @@ -598,6 +630,7 @@ export class BigQueryStorageClient {
] = gax.routingHeader.fromParams({
'read_position.stream.name': request.readPosition!.stream!.name || '',
});
this.initialize();
return this._innerApiCalls.readRows(request, options);
}

Expand Down Expand Up @@ -709,8 +742,9 @@ export class BigQueryStorageClient {
* The client will no longer be usable and all future behavior is undefined.
*/
close(): Promise<void> {
this.initialize();
if (!this._terminated) {
return this.bigQueryStorageStub.then(stub => {
return this.bigQueryStorageStub!.then(stub => {
this._terminated = true;
stub.close();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,30 @@ describe('v1beta1.BigQueryStorageClient', () => {
});
assert(client);
});
it('has initialize method and supports deferred initialization', async () => {
const client = new bigquerystorageModule.v1beta1.BigQueryStorageClient({
credentials: { client_email: 'bogus', private_key: 'bogus' },
projectId: 'bogus',
});
assert.strictEqual(client.bigQueryStorageStub, undefined);
await client.initialize();
assert(client.bigQueryStorageStub);
});
it('has close method', () => {
const client = new bigquerystorageModule.v1beta1.BigQueryStorageClient({
credentials: { client_email: 'bogus', private_key: 'bogus' },
projectId: 'bogus',
});
client.close();
});
describe('createReadSession', () => {
it('invokes createReadSession without error', done => {
const client = new bigquerystorageModule.v1beta1.BigQueryStorageClient({
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.ICreateReadSessionRequest = {};
request.tableReference = {};
Expand All @@ -129,6 +147,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.ICreateReadSessionRequest = {};
request.tableReference = {};
Expand Down Expand Up @@ -157,6 +177,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.IBatchCreateReadSessionStreamsRequest = {};
request.session = {};
Expand All @@ -181,6 +203,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.IBatchCreateReadSessionStreamsRequest = {};
request.session = {};
Expand All @@ -207,6 +231,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.IFinalizeStreamRequest = {};
request.stream = {};
Expand All @@ -231,6 +257,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.IFinalizeStreamRequest = {};
request.stream = {};
Expand All @@ -257,6 +285,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.ISplitReadStreamRequest = {};
request.originalStream = {};
Expand All @@ -281,6 +311,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.ISplitReadStreamRequest = {};
request.originalStream = {};
Expand All @@ -307,6 +339,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.IReadRowsRequest = {};
request.readPosition = {};
Expand All @@ -331,6 +365,8 @@ describe('v1beta1.BigQueryStorageClient', () => {
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});
// Initialize client before mocking
client.initialize();
// Mock request
const request: protosTypes.google.cloud.bigquery.storage.v1beta1.IReadRowsRequest = {};
request.readPosition = {};
Expand Down
Loading