Skip to content

Commit

Permalink
migrate database:get to new apiv2 client (with stream support!) (#2781)
Browse files Browse the repository at this point in the history
* add support for returning response streams

* use the new apiv2 client for database:get

* remove lodash

* fix writing to a file

* create streamToString, fix get error handling

* throw -> log

* always require resolving http errors while streaming

* fix error handling
  • Loading branch information
bkendall committed Nov 6, 2020
1 parent 250bee9 commit fea6258
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 87 deletions.
27 changes: 22 additions & 5 deletions src/apiv2.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import fetch, { Response, RequestInit } from "node-fetch";
import { Readable } from "stream";

import { FirebaseError } from "./error";
import * as logger from "./logger";
Expand All @@ -12,7 +13,7 @@ interface RequestOptions<T> extends VerbOptions<T> {
method: HttpMethod;
path: string;
json?: T;
responseType?: "json";
responseType?: "json" | "stream";
}

interface VerbOptions<T> {
Expand Down Expand Up @@ -143,6 +144,15 @@ export class Client {
reqOptions.responseType = "json";
}

// TODO(bkendall): stream + !resolveOnHTTPError makes for difficult handling.
// Figure out if there's a better way to handle streamed >=400 responses.
if (reqOptions.responseType === "stream" && !reqOptions.resolveOnHTTPError) {
throw new FirebaseError(
"apiv2 will not handle HTTP errors while streaming and you must set `resolveOnHTTPError` and check for res.status >= 400 on your own",
{ exit: 2 }
);
}

reqOptions = this.addRequestHeaders(reqOptions);

if (this.opts.auth) {
Expand Down Expand Up @@ -238,6 +248,8 @@ export class Client {
let body: ResT;
if (options.responseType === "json") {
body = await res.json();
} else if (options.responseType === "stream") {
body = (res.body as unknown) as ResT;
} else {
// This is how the linter wants the casting to T to work.
body = ((await res.text()) as unknown) as ResT;
Expand Down Expand Up @@ -283,10 +295,15 @@ export class Client {

let logBody = "[omitted]";
if (!options.skipLog?.resBody) {
try {
logBody = JSON.stringify(body);
} catch (_) {
logBody = `${body}`;
if (body instanceof Readable) {
// Don't attempt to read any stream type, in case the caller needs it.
logBody = "[stream]";
} else {
try {
logBody = JSON.stringify(body);
} catch (_) {
logBody = `${body}`;
}
}
}
logger.debug(`<<< [apiv2][body] ${options.method} ${logURL} ${logBody}`);
Expand Down
148 changes: 67 additions & 81 deletions src/commands/database-get.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,50 @@
import * as _ from "lodash";
import * as fs from "fs";
import * as url from "url";

import { Client } from "../apiv2";
import { Command } from "../command";
import * as requireInstance from "../requireInstance";
import { requirePermissions } from "../requirePermissions";
import * as request from "request";
import * as api from "../api";
import * as responseToError from "../responseToError";
import * as logger from "../logger";
import { FirebaseError } from "../error";
import { Emulators } from "../emulator/types";
import { printNoticeIfEmulated } from "../emulator/commandUtils";
import { FirebaseError } from "../error";
import { populateInstanceDetails } from "../management/database";
import { printNoticeIfEmulated } from "../emulator/commandUtils";
import { realtimeOriginOrEmulatorOrCustomUrl } from "../database/api";
import { requirePermissions } from "../requirePermissions";
import * as logger from "../logger";
import * as requireInstance from "../requireInstance";
import * as responseToError from "../responseToError";
import * as utils from "../utils";

function applyStringOpts(dest: any, src: any, keys: string[], jsonKeys: string[]): void {
_.forEach(keys, (key) => {
/**
* Copies any `keys` from `src` to `dest`. Then copies any `jsonKeys` from
* `src` as JSON strings to `dest`. Modifies `dest`.
* @param dest destination object.
* @param src source to read from for `keys` and `jsonKeys`.
* @param keys keys to copy from `src`.
* @param jsonKeys keys to copy as JSON strings from `src`.
*/
function applyStringOpts(
dest: { [key: string]: string },
src: { [key: string]: string },
keys: string[],
jsonKeys: string[]
): void {
for (const key of keys) {
if (src[key]) {
dest[key] = src[key];
}
});

}
// some keys need JSON encoding of the querystring value
_.forEach(jsonKeys, (key) => {
for (const key of jsonKeys) {
let jsonVal;
try {
jsonVal = JSON.parse(src[key]);
} catch (e) {
} catch (_) {
jsonVal = src[key];
}

if (src[key]) {
dest[key] = JSON.stringify(jsonVal);
}
});
}
}

export default new Command("database:get <path>")
Expand All @@ -60,13 +69,14 @@ export default new Command("database:get <path>")
.before(requireInstance)
.before(populateInstanceDetails)
.before(printNoticeIfEmulated, Emulators.DATABASE)
.action((path, options) => {
if (!_.startsWith(path, "/")) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.action(async (path: string, options: any) => {
if (!path.startsWith("/")) {
return utils.reject("Path must begin with /", { exit: 1 });
}

const dbHost = realtimeOriginOrEmulatorOrCustomUrl(options);
let dbUrl = utils.getDatabaseUrl(dbHost, options.instance, path + ".json");
const dbUrl = utils.getDatabaseUrl(dbHost, options.instance, path + ".json");
const query: { [key: string]: string } = {};
if (options.shallow) {
query.shallow = "true";
Expand All @@ -91,70 +101,46 @@ export default new Command("database:get <path>")
);

const urlObj = new url.URL(dbUrl);
Object.keys(query).forEach((key) => {
urlObj.searchParams.set(key, query[key]);
const client = new Client({
urlPrefix: urlObj.origin,
auth: true,
});
const res = await client.request<unknown, NodeJS.ReadableStream>({
method: "GET",
path: urlObj.pathname,
queryParams: query,
responseType: "stream",
resolveOnHTTPError: true,
});

dbUrl = urlObj.href;

logger.debug("Query URL: ", dbUrl);
const reqOptions = {
url: dbUrl,
};
const fileOut = !!options.output;
const outStream = fileOut ? fs.createWriteStream(options.output) : process.stdout;

return api.addRequestHeaders(reqOptions).then((reqOptionsWithToken) => {
return new Promise((resolve, reject) => {
const fileOut = !!options.output;
const outStream = fileOut ? fs.createWriteStream(options.output) : process.stdout;
const writeOut = (s: Buffer | string, cb?: Function): void => {
if (outStream === process.stdout) {
outStream.write(s, cb);
} else if (outStream instanceof fs.WriteStream) {
outStream.write(s, (err) => {
if (cb) {
cb(err);
}
});
}
};
let erroring = false;
let errorResponse = "";
let response: any;
if (res.status >= 400) {
// TODO(bkendall): consider moving stream-handling logic to responseToError.
const r = await res.response.text();
let d;
try {
d = JSON.parse(r);
} catch (e) {
throw new FirebaseError("Malformed JSON response", { original: e, exit: 2 });
}
throw responseToError({ statusCode: res.status }, d);
}

request
.get(reqOptionsWithToken)
.on("response", (res) => {
response = res;
if (response.statusCode >= 400) {
erroring = true;
}
})
.on("data", (chunk) => {
if (erroring) {
errorResponse += chunk;
} else {
writeOut(chunk);
}
})
.on("end", () => {
writeOut("\n", () => {
resolve();
});
if (erroring) {
try {
const data = JSON.parse(errorResponse);
return reject(responseToError(response, data));
} catch (e) {
return reject(
new FirebaseError("Malformed JSON response", {
exit: 2,
original: e,
})
);
}
}
})
.on("error", reject);
});
res.body.pipe(outStream);
// Tack on a single newline at the end of the stream.
res.body.once("end", () => {
if (outStream === process.stdout) {
// `stdout` can simply be written to.
outStream.write("\n");
} else if (outStream instanceof fs.WriteStream) {
// .pipe closes the output file stream, so we need to re-open the file.
const s = fs.createWriteStream(options.output, { flags: "a" });
s.write("\n");
s.close();
} else {
logger.debug("[database:get] Could not write line break to outStream");
}
});
});
80 changes: 80 additions & 0 deletions src/test/apiv2.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as sinon from "sinon";

import { Client } from "../apiv2";
import { FirebaseError } from "../error";
import { streamToString } from "../utils";
import * as helpers from "./helpers";

describe("apiv2", () => {
Expand Down Expand Up @@ -71,6 +72,85 @@ describe("apiv2", () => {
expect(nock.isDone()).to.be.true;
});

it("should not allow resolving on http error when streaming", async () => {
const c = new Client({ urlPrefix: "https://example.com" });
const r = c.request<unknown, NodeJS.ReadableStream>({
method: "GET",
path: "/path/to/foo",
responseType: "stream",
resolveOnHTTPError: false,
});
await expect(r).to.eventually.be.rejectedWith(FirebaseError, /streaming.+resolveOnHTTPError/);
});

it("should be able to stream a GET request", async () => {
nock("https://example.com")
.get("/path/to/foo")
.reply(200, "ablobofdata");

const c = new Client({ urlPrefix: "https://example.com" });
const r = await c.request<unknown, NodeJS.ReadableStream>({
method: "GET",
path: "/path/to/foo",
responseType: "stream",
resolveOnHTTPError: true,
});
const data = await streamToString(r.body);
expect(data).to.deep.equal("ablobofdata");
expect(nock.isDone()).to.be.true;
});

it("should resolve a 400 GET request", async () => {
nock("https://example.com")
.get("/path/to/foo")
.reply(400, "who dis?");

const c = new Client({ urlPrefix: "https://example.com" });
const r = await c.request<unknown, NodeJS.ReadableStream>({
method: "GET",
path: "/path/to/foo",
responseType: "stream",
resolveOnHTTPError: true,
});
expect(r.status).to.equal(400);
expect(await streamToString(r.body)).to.equal("who dis?");
expect(nock.isDone()).to.be.true;
});

it("should resolve a 404 GET request", async () => {
nock("https://example.com")
.get("/path/to/foo")
.reply(404, "not here");

const c = new Client({ urlPrefix: "https://example.com" });
const r = await c.request<unknown, NodeJS.ReadableStream>({
method: "GET",
path: "/path/to/foo",
responseType: "stream",
resolveOnHTTPError: true,
});
expect(r.status).to.equal(404);
expect(await streamToString(r.body)).to.equal("not here");
expect(nock.isDone()).to.be.true;
});

it("should be able to resolve a stream on a 404 GET request", async () => {
nock("https://example.com")
.get("/path/to/foo")
.reply(404, "does not exist");

const c = new Client({ urlPrefix: "https://example.com" });
const r = await c.request<unknown, NodeJS.ReadableStream>({
method: "GET",
path: "/path/to/foo",
responseType: "stream",
resolveOnHTTPError: true,
});
const data = await streamToString(r.body);
expect(data).to.deep.equal("does not exist");
expect(nock.isDone()).to.be.true;
});

it("should make a basic GET request if path didn't include a leading slash", async () => {
nock("https://example.com")
.get("/path/to/foo")
Expand Down
10 changes: 10 additions & 0 deletions src/test/utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,14 @@ describe("utils", () => {
);
});
});

describe("streamToString/stringToStream", () => {
it("should be able to create and read streams", async () => {
const stream = utils.stringToStream("hello world");
if (!stream) {
throw new Error("stream came back undefined");
}
await expect(utils.streamToString(stream)).to.eventually.equal("hello world");
});
});
});
18 changes: 17 additions & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ export function explainStdin(): void {
}

/**
* Convert text input to a Readable stream.
* Converts text input to a Readable stream.
* @param text string to turn into a stream.
* @return Readable stream, or undefined if text is empty.
*/
export function stringToStream(text: string): Readable | undefined {
if (!text) {
Expand All @@ -227,6 +229,20 @@ export function stringToStream(text: string): Readable | undefined {
return s;
}

/**
* Converts a Readable stream into a string.
* @param s a readable stream.
* @return a promise resolving to the string'd contents of the stream.
*/
export function streamToString(s: NodeJS.ReadableStream): Promise<string> {
return new Promise((resolve, reject) => {
let b = "";
s.on("error", reject);
s.on("data", (d) => (b += `${d}`));
s.once("end", () => resolve(b));
});
}

/**
* Sets the active project alias or id in the specified directory.
*/
Expand Down

0 comments on commit fea6258

Please sign in to comment.