Skip to content

Commit

Permalink
fix: move from fetch to request to fix ENOBUFS
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelcr committed Jan 25, 2023
1 parent 6ce6454 commit 9b26439
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 96 deletions.
8 changes: 6 additions & 2 deletions src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ interface Env {
* before declaring the failure as a non-retryable error.
*/
METADATA_MAX_IMMEDIATE_URI_RETRIES: number;
/** Timeout period for a token metadata URL fetch (milliseconds) */
/**
* Timeout period for a token metadata URL fetch in milliseconds. You should not make this
* timeout very short as usually IPFS and other gateways take a few seconds to respond with the
* requested resource. Defaults to 30 seconds.
*/
METADATA_FETCH_TIMEOUT_MS: number;
/**
* The maximum number of bytes of metadata to fetch. If the fetch encounters more bytes than this
Expand Down Expand Up @@ -204,7 +208,7 @@ export function getEnvVars(): Env {
},
METADATA_FETCH_TIMEOUT_MS: {
type: 'number',
default: 10_000,
default: 30_000,
},
METADATA_MAX_PAYLOAD_BYTE_SIZE: {
type: 'number',
Expand Down
2 changes: 1 addition & 1 deletion src/token-processor/util/image-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export async function processImageUrl(imgUrl: string): Promise<string> {

export function getImageUrl(uri: string): string {
// Support images embedded in a Data URL
if (new URL(uri).protocol === 'data:') {
if (uri.startsWith('data:')) {
const dataUrl = parseDataUrl(uri);
if (!dataUrl) {
throw new MetadataParseError(`Data URL could not be parsed: ${uri}`);
Expand Down
89 changes: 37 additions & 52 deletions src/token-processor/util/metadata-helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as querystring from 'querystring';
import { fetch } from 'undici';
import { Agent, errors, request } from 'undici';
import {
DbMetadataAttributeInsert,
DbMetadataInsert,
Expand All @@ -8,7 +8,6 @@ import {
DbToken,
} from '../../pg/types';
import { ENV } from '../../env';
import { TextDecoder } from 'util';
import {
HttpError,
MetadataParseError,
Expand Down Expand Up @@ -180,54 +179,36 @@ async function parseMetadataForInsertion(
* Fetches metadata while monitoring timeout and size limits. Throws if any is reached.
* Taken from https://github.com/node-fetch/node-fetch/issues/1149#issuecomment-840416752
* @param httpUrl - URL to fetch
* @returns JSON result string
* @returns Response text
*/
export async function performSizeAndTimeLimitedMetadataFetch(
httpUrl: URL
): Promise<string | undefined> {
const url = httpUrl.toString();
const ctrl = new AbortController();
let abortReason: Error | undefined;

const timer = setTimeout(() => {
abortReason = new MetadataTimeoutError(url);
ctrl.abort();
}, ENV.METADATA_FETCH_TIMEOUT_MS);
try {
const networkResult = await fetch(url, {
const result = await request(url, {
method: 'GET',
signal: ctrl.signal,
throwOnError: true,
dispatcher:
// Disable during tests so we can inject a global mock agent.
process.env.NODE_ENV === 'test'
? undefined
: new Agent({
headersTimeout: ENV.METADATA_FETCH_TIMEOUT_MS,
bodyTimeout: ENV.METADATA_FETCH_TIMEOUT_MS,
maxResponseSize: ENV.METADATA_MAX_PAYLOAD_BYTE_SIZE,
}),
});
if (networkResult.status >= 400) {
if (networkResult.status === 429) {
throw new TooManyRequestsHttpError(url);
}
throw new HttpError(`${url} (${networkResult.status})`);
}
if (networkResult.body) {
const decoder = new TextDecoder();
let responseText: string = '';
let bytesWritten = 0;
const reportedContentLength = Number(networkResult.headers.get('content-length') ?? 0);
if (reportedContentLength > ENV.METADATA_MAX_PAYLOAD_BYTE_SIZE) {
abortReason = new MetadataSizeExceededError(url);
ctrl.abort();
}
for await (const chunk of networkResult.body) {
bytesWritten += chunk.byteLength;
if (bytesWritten > ENV.METADATA_MAX_PAYLOAD_BYTE_SIZE) {
abortReason = new MetadataSizeExceededError(url);
ctrl.abort();
}
responseText += decoder.decode(chunk as ArrayBuffer, { stream: true });
}
responseText += decoder.decode(); // flush the remaining bytes
clearTimeout(timer);
return responseText;
}
return (await result.body.text()) as string;
} catch (error) {
clearTimeout(timer);
throw abortReason ?? error;
if (error instanceof errors.HeadersTimeoutError || error instanceof errors.BodyTimeoutError) {
throw new MetadataTimeoutError(url);
} else if (error instanceof errors.ResponseExceededMaxSizeError) {
throw new MetadataSizeExceededError(url);
} else if (error instanceof errors.ResponseStatusCodeError && error.statusCode === 429) {
throw new TooManyRequestsHttpError(url);
}
throw new HttpError(`${url}: ${error}`, error);
}
}

Expand Down Expand Up @@ -313,17 +294,21 @@ export async function getMetadataFromUri(token_uri: string): Promise<RawMetadata
* @returns Fetchable URL
*/
export function getFetchableUrl(uri: string): URL {
const parsedUri = new URL(uri);
if (parsedUri.protocol === 'http:' || parsedUri.protocol === 'https:') return parsedUri;
if (parsedUri.protocol === 'ipfs:') {
const host = parsedUri.host === 'ipfs' ? 'ipfs' : `ipfs/${parsedUri.host}`;
return new URL(`${ENV.PUBLIC_GATEWAY_IPFS}/${host}${parsedUri.pathname}`);
}
if (parsedUri.protocol === 'ipns:') {
return new URL(`${ENV.PUBLIC_GATEWAY_IPFS}/${parsedUri.host}${parsedUri.pathname}`);
}
if (parsedUri.protocol === 'ar:') {
return new URL(`${ENV.PUBLIC_GATEWAY_ARWEAVE}/${parsedUri.host}${parsedUri.pathname}`);
try {
const parsedUri = new URL(uri);
if (parsedUri.protocol === 'http:' || parsedUri.protocol === 'https:') return parsedUri;
if (parsedUri.protocol === 'ipfs:') {
const host = parsedUri.host === 'ipfs' ? 'ipfs' : `ipfs/${parsedUri.host}`;
return new URL(`${ENV.PUBLIC_GATEWAY_IPFS}/${host}${parsedUri.pathname}`);
}
if (parsedUri.protocol === 'ipns:') {
return new URL(`${ENV.PUBLIC_GATEWAY_IPFS}/${parsedUri.host}${parsedUri.pathname}`);
}
if (parsedUri.protocol === 'ar:') {
return new URL(`${ENV.PUBLIC_GATEWAY_ARWEAVE}/${parsedUri.host}${parsedUri.pathname}`);
}
} catch (error) {
throw new MetadataParseError(`Invalid uri: ${uri}`);
}
throw new MetadataParseError(`Unsupported uri protocol: ${uri}`);
}
Expand Down
41 changes: 0 additions & 41 deletions tests/metadata-helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,47 +32,6 @@ describe('Metadata Helpers', () => {
expect(result).toBe('hello');
});

test('reject large responses', async () => {
const yugeBuffer = Buffer.alloc(ENV.METADATA_MAX_PAYLOAD_BYTE_SIZE + 100);
const url = new URL('http://test.io/1.json');

const agent = new MockAgent();
agent.disableNetConnect();
agent
.get('http://test.io')
.intercept({
path: '/1.json',
method: 'GET',
})
.reply(200, yugeBuffer);
setGlobalDispatcher(agent);

await expect(performSizeAndTimeLimitedMetadataFetch(url)).rejects.toThrow(
MetadataSizeExceededError
);
});

test('reject timed out requests', async () => {
const prevTimeout = ENV.METADATA_FETCH_TIMEOUT_MS;
ENV.METADATA_FETCH_TIMEOUT_MS = 100;
const url = new URL('http://test.io/1.json');

const agent = new MockAgent();
agent.disableNetConnect();
agent
.get('http://test.io')
.intercept({
path: '/1.json',
method: 'GET',
})
.reply(200, '')
.delay(150);
setGlobalDispatcher(agent);

await expect(performSizeAndTimeLimitedMetadataFetch(url)).rejects.toThrow(MetadataTimeoutError);
ENV.METADATA_FETCH_TIMEOUT_MS = prevTimeout;
});

test('throws on incorrect raw metadata schema', async () => {
const agent = new MockAgent();
agent.disableNetConnect();
Expand Down

0 comments on commit 9b26439

Please sign in to comment.