76 changes: 44 additions & 32 deletions config/image-cache.js
@@ -34,7 +34,7 @@ const IMAGE_RESIZE_WIDTH = parseInt(process.env['IMAGE_CACHE_RESIZE_WIDTH'] ?? '
const GCS_BUCKET_NAME = process.env['IMAGE_CACHE_GCS_BUCKET_NAME'];
const GCS_OBJECT_NAME_PREFIX = process.env['IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX'];
const CDN_BASE_PATH = process.env['IMAGE_CACHE_CDN_BASE_PATH'];
const TIMEOUT = parseInt(process.env['METADATA_FETCH_TIMEOUT_MS'] ?? '30');
const TIMEOUT = parseInt(process.env['METADATA_FETCH_TIMEOUT_MS'] ?? '30000');
const MAX_REDIRECTIONS = parseInt(process.env['METADATA_FETCH_MAX_REDIRECTIONS'] ?? '0');
const MAX_RESPONSE_SIZE = parseInt(process.env['IMAGE_CACHE_MAX_BYTE_SIZE'] ?? '-1');

@@ -47,35 +47,29 @@ async function getGcsAuthToken() {
{
method: 'GET',
headers: { 'Metadata-Flavor': 'Google' },
throwOnError: true,
}
);
const json = await response.body.json();
if (response.statusCode === 200 && json.access_token) {
// Cache the token so we can reuse it for other images.
process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = json.access_token;
return json.access_token;
}
throw new Error(`GCS access token not found ${response.statusCode}: ${json}`);
// Cache the token so we can reuse it for other images.
process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = json.access_token;
return json.access_token;
} catch (error) {
throw new Error(`Error fetching GCS access token: ${error.message}`);
throw new Error(`GCS access token error: ${error}`);
Comment from @zone117x (Contributor) on lines +54 to +58, May 13, 2024:

The @google-cloud/storage library can be used to handle the auth and upload a lot more easily; it would be about 3-4 lines. I'm using it over here: https://github.com/hirosystems/stacks-node-crawler/blob/720d4f9f79155a84a19c3d1028f0df1a54b3fbfb/src/index.ts#L5

E.g.

import { Storage } from '@google-cloud/storage';
const bucket = new Storage().bucket(myBucket); // auth handled automatically
const uploadStream = bucket.file(myfile).createWriteStream({ contentType: 'image/png' });
stream.pipe(uploadStream);
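
Note: expanding slightly on that suggestion for context, a fuller sketch (an editor's illustration under the reviewer's assumption that default application credentials are available, not code from this PR or the linked repo) might look like:

```ts
// Sketch only: replaces the manual token fetch + REST upload with the client library.
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { Storage } from '@google-cloud/storage';

const GCS_BUCKET_NAME = process.env['IMAGE_CACHE_GCS_BUCKET_NAME'] ?? '';
const GCS_OBJECT_NAME_PREFIX = process.env['IMAGE_CACHE_GCS_OBJECT_NAME_PREFIX'] ?? '';
const CDN_BASE_PATH = process.env['IMAGE_CACHE_CDN_BASE_PATH'] ?? '';

async function uploadWithClientLibrary(stream: Readable, name: string): Promise<string> {
  // Auth is resolved automatically (metadata server or GOOGLE_APPLICATION_CREDENTIALS),
  // so the manual getGcsAuthToken()/token-caching path above becomes unnecessary.
  const file = new Storage().bucket(GCS_BUCKET_NAME).file(`${GCS_OBJECT_NAME_PREFIX}${name}`);
  await pipeline(
    stream,
    file.createWriteStream({ resumable: false, metadata: { contentType: 'image/png' } })
  );
  return `${CDN_BASE_PATH}${name}`;
}
```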

}
}

async function upload(stream, name, authToken) {
try {
const response = await request(
`https://storage.googleapis.com/upload/storage/v1/b/${GCS_BUCKET_NAME}/o?uploadType=media&name=${GCS_OBJECT_NAME_PREFIX}${name}`,
{
method: 'POST',
body: stream,
headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` },
}
);
if (response.statusCode !== 200) throw new Error(`GCS error: ${response.statusCode}`);
return `${CDN_BASE_PATH}${name}`;
} catch (error) {
throw new Error(`Error uploading ${name}: ${error.message}`);
}
await request(
`https://storage.googleapis.com/upload/storage/v1/b/${GCS_BUCKET_NAME}/o?uploadType=media&name=${GCS_OBJECT_NAME_PREFIX}${name}`,
{
method: 'POST',
body: stream,
headers: { 'Content-Type': 'image/png', Authorization: `Bearer ${authToken}` },
throwOnError: true,
}
);
return `${CDN_BASE_PATH}${name}`;
}

fetch(
@@ -86,15 +80,13 @@ fetch(
bodyTimeout: TIMEOUT,
maxRedirections: MAX_REDIRECTIONS,
maxResponseSize: MAX_RESPONSE_SIZE,
throwOnError: true,
connect: {
rejectUnauthorized: false, // Ignore SSL cert errors.
},
}),
},
({ statusCode, body }) => {
if (statusCode !== 200) throw new Error(`Failed to fetch image: ${statusCode}`);
return body;
}
({ body }) => body
)
.then(async response => {
const imageReadStream = Readable.fromWeb(response.body);
@@ -115,21 +107,41 @@ fetch(
upload(fullSizeTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}.png`, authToken),
upload(thumbnailTransform, `${CONTRACT_PRINCIPAL}/${TOKEN_NUMBER}-thumb.png`, authToken),
]);
// The API will read these strings as CDN URLs.
for (const result of results) console.log(result);
for (const r of results) console.log(r);
break;
} catch (error) {
if (
(error.message.endsWith('403') || error.message.endsWith('401')) &&
!didRetryUnauthorized
!didRetryUnauthorized &&
error.cause &&
error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' &&
(error.cause.statusCode === 401 || error.cause.statusCode === 403)
) {
// Force a dynamic token refresh and try again.
// GCS token is probably expired. Force a token refresh before trying again.
process.env['IMAGE_CACHE_GCS_AUTH_TOKEN'] = undefined;
didRetryUnauthorized = true;
} else throw error;
}
}
})
.catch(error => {
throw new Error(`Error fetching image: ${error}`);
console.error(error);
// TODO: Handle `Input buffer contains unsupported image format` error from sharp when the image
// is actually a video or another media file.
let exitCode = 1;
if (
error.cause &&
(error.cause.code == 'UND_ERR_HEADERS_TIMEOUT' ||
error.cause.code == 'UND_ERR_BODY_TIMEOUT' ||
error.cause.code == 'UND_ERR_CONNECT_TIMEOUT' ||
error.cause.code == 'ECONNRESET')
) {
exitCode = 2;
} else if (
error.cause &&
error.cause.code == 'UND_ERR_RESPONSE_STATUS_CODE' &&
error.cause.statusCode === 429
) {
exitCode = 3;
}
process.exit(exitCode);
});
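
Note: the `fullSizeTransform` and `thumbnailTransform` streams passed to `upload` above are collapsed in this view. As a rough sketch only (not the PR's actual code), a sharp-based pair driven by the `IMAGE_CACHE_RESIZE_WIDTH` setting from the top of the file could look like this; the default width is an assumption, since the real default is truncated in the hunk header:

```ts
// Illustrative sketch; the real transforms are collapsed in this diff.
import sharp from 'sharp';

// The 300px default is assumed here; the actual default is not visible above.
const IMAGE_RESIZE_WIDTH = parseInt(process.env['IMAGE_CACHE_RESIZE_WIDTH'] ?? '300', 10);

// sharp() instances are Duplex streams: the fetched image is piped in, and the
// transforms are handed to `upload` as the request bodies.
const fullSizeTransform = sharp().png();
const thumbnailTransform = sharp()
  .resize({ width: IMAGE_RESIZE_WIDTH, withoutEnlargement: true })
  .png();

// imageReadStream.pipe(fullSizeTransform);
// imageReadStream.pipe(thumbnailTransform);
```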
4 changes: 2 additions & 2 deletions src/token-processor/queue/job/process-token-job.ts
@@ -16,7 +16,7 @@ import { StacksNodeRpcClient } from '../../stacks-node/stacks-node-rpc-client';
import { StacksNodeClarityError, TooManyRequestsHttpError } from '../../util/errors';
import {
fetchAllMetadataLocalesFromBaseUri,
getFetchableUrl,
getFetchableDecentralizedStorageUrl,
getTokenSpecificUri,
} from '../../util/metadata-helpers';
import { RetryableJobError } from '../errors';
@@ -214,7 +214,7 @@ export class ProcessTokenJob extends Job {
return;
}
// Before we return the uri, check if its fetchable hostname is not already rate limited.
const fetchable = getFetchableUrl(uri);
const fetchable = getFetchableDecentralizedStorageUrl(uri);
const rateLimitedHost = await this.db.getRateLimitedHost({ hostname: fetchable.hostname });
if (rateLimitedHost) {
const retryAfter = Date.parse(rateLimitedHost.retry_after);
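
Note: the remainder of this rate-limit branch is collapsed in the diff. Purely as a sketch of the likely shape (the actual continuation is not shown, and the single-argument `RetryableJobError` constructor is an assumption), the check presumably defers the token when `retry_after` is still in the future:

```ts
// Sketch only; the real continuation is collapsed in the diff above.
if (rateLimitedHost) {
  const retryAfter = Date.parse(rateLimitedHost.retry_after);
  if (retryAfter > Date.now()) {
    // Hostname is still rate limited: retry this token later instead of fetching now.
    throw new RetryableJobError(`Hostname ${fetchable.hostname} is rate limited`);
  }
}
```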
108 changes: 77 additions & 31 deletions src/token-processor/util/image-cache.ts
@@ -1,33 +1,81 @@
import * as child_process from 'child_process';
import { ENV } from '../../env';
import { MetadataParseError } from './errors';
import { parseDataUrl, getFetchableUrl } from './metadata-helpers';
import { MetadataParseError, MetadataTimeoutError, TooManyRequestsHttpError } from './errors';
import { parseDataUrl, getFetchableDecentralizedStorageUrl } from './metadata-helpers';
import { logger } from '@hirosystems/api-toolkit';
import { PgStore } from '../../pg/pg-store';
import { errors } from 'undici';
import { RetryableJobError } from '../queue/errors';

/**
* If an external image processor script is configured, then it will process the given image URL for
* the purpose of caching on a CDN (or whatever else it may be created to do). The script is
* expected to return a new URL for the image. If the script is not configured, then the original
* URL is returned immediately. If a data-uri is passed, it is also immediately returned without
* being passed to the script.
* If an external image processor script is configured in the `METADATA_IMAGE_CACHE_PROCESSOR` ENV
* var, this function will process the given image URL for the purpose of caching on a CDN (or
* whatever else it may be created to do). The script is expected to return a new URL for the image
* via `stdout`, with an optional second line containing another URL for a thumbnail version of the same
* cached image. If the script is not configured, then the original URL is returned immediately. If
* a data-uri is passed, it is also immediately returned without being passed to the script.
*
* The Image Cache script must return a status code of `0` to mark a successful cache. Other code
* returns available are:
* * `1`: A generic error occurred. Cache should not be retried.
* * `2`: Image fetch timed out before caching was possible. Should be retried.
* * `3`: Image fetch failed due to rate limits from the remote server. Should be retried.
*/
export async function processImageUrl(
export async function processImageCache(
imgUrl: string,
contractPrincipal: string,
tokenNumber: bigint
): Promise<string[]> {
const imageCacheProcessor = ENV.METADATA_IMAGE_CACHE_PROCESSOR;
if (!imageCacheProcessor) {
return [imgUrl];
}
if (imgUrl.startsWith('data:')) {
return [imgUrl];
if (!imageCacheProcessor || imgUrl.startsWith('data:')) return [imgUrl];
logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`);
const { code, stdout, stderr } = await callImageCacheScript(
imageCacheProcessor,
imgUrl,
contractPrincipal,
tokenNumber
);
switch (code) {
case 0:
try {
const urls = stdout
.trim()
.split('\n')
.map(r => new URL(r).toString());
logger.info(urls, `ImageCache processed token ${contractPrincipal} (${tokenNumber})`);
return urls;
} catch (error) {
// The script returned a code `0` but the results are invalid. This could happen because of
// an unknown script error so we should mark it as retryable.
throw new RetryableJobError(
`ImageCache unknown error`,
new Error(`Invalid cached url for ${imgUrl}: ${stdout}, stderr: ${stderr}`)
);
}
case 2:
throw new RetryableJobError(`ImageCache fetch timed out`, new MetadataTimeoutError(imgUrl));
case 3:
throw new RetryableJobError(
`ImageCache fetch rate limited`,
new TooManyRequestsHttpError(new URL(imgUrl), new errors.ResponseStatusCodeError())
);
default:
throw new Error(`ImageCache script error (code ${code}): ${stderr}`);
}
}

logger.info(`ImageCache processing token ${contractPrincipal} (${tokenNumber}) at ${imgUrl}`);
async function callImageCacheScript(
imageCacheProcessor: string,
imgUrl: string,
contractPrincipal: string,
tokenNumber: bigint
): Promise<{
code: number;
stdout: string;
stderr: string;
}> {
const repoDir = process.cwd();
const { code, stdout, stderr } = await new Promise<{
return await new Promise<{
code: number;
stdout: string;
stderr: string;
@@ -37,26 +85,24 @@ export async function processImageUrl(
[imgUrl, contractPrincipal, tokenNumber.toString()],
{ cwd: repoDir }
);
let code = 0;
let stdout = '';
let stderr = '';
cp.stdout.on('data', data => (stdout += data));
cp.stderr.on('data', data => (stderr += data));
cp.on('close', code => resolve({ code: code ?? 0, stdout, stderr }));
cp.on('close', _ => resolve({ code, stdout, stderr }));
cp.on('exit', processCode => {
code = processCode ?? 0;
});
});
if (code !== 0) throw new Error(`ImageCache error: ${stderr}`);
const result = stdout.trim().split('\n');
logger.info(result, `ImageCache processed token ${contractPrincipal} (${tokenNumber})`);

try {
return result.map(r => new URL(r).toString());
} catch (error) {
throw new Error(
`Image processing script returned an invalid url for ${imgUrl}: ${result}, stderr: ${stderr}`
);
}
}

export function getImageUrl(uri: string): string {
/**
* Converts a raw image URI from metadata into a fetchable URL.
* @param uri - Original image URI
* @returns Normalized URL string
*/
export function normalizeImageUri(uri: string): string {
// Support images embedded in a Data URL
if (uri.startsWith('data:')) {
const dataUrl = parseDataUrl(uri);
@@ -68,7 +114,7 @@ export function getImageUrl(uri: string): string {
}
return uri;
}
const fetchableUrl = getFetchableUrl(uri);
const fetchableUrl = getFetchableDecentralizedStorageUrl(uri);
return fetchableUrl.toString();
}

@@ -81,8 +127,8 @@ export async function reprocessTokenImageCache(
const imageUris = await db.getTokenImageUris(contractPrincipal, tokenIds);
for (const token of imageUris) {
try {
const [cached, thumbnail] = await processImageUrl(
getFetchableUrl(token.image).toString(),
const [cached, thumbnail] = await processImageCache(
getFetchableDecentralizedStorageUrl(token.image).toString(),
contractPrincipal,
BigInt(token.token_number)
);
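
Note: the doc comment above defines the full contract between `processImageCache` and any `METADATA_IMAGE_CACHE_PROCESSOR` script (URLs on `stdout`, exit codes `0`/`1`/`2`/`3`). A minimal hypothetical script honoring that contract could look like the sketch below; this is not the `config/image-cache.js` shipped in this PR, the CDN path is invented, and the error-to-exit-code mapping is illustrative:

```ts
#!/usr/bin/env node
// Hypothetical METADATA_IMAGE_CACHE_PROCESSOR script (sketch, not the one in this PR).
// Arguments, per callImageCacheScript: argv[2] = image URL, argv[3] = contract principal,
// argv[4] = token number.
const [, , imgUrl, contractPrincipal, tokenNumber] = process.argv;

async function main(): Promise<void> {
  try {
    // ...fetch `imgUrl`, resize, and upload to your CDN here...
    const base = `https://cdn.example.com/${contractPrincipal}/${tokenNumber}`; // invented path
    console.log(`${base}.png`); // line 1: cached full-size image URL
    console.log(`${base}-thumb.png`); // line 2 (optional): thumbnail URL
    process.exit(0); // 0 = successful cache
  } catch (error: any) {
    // Map whatever errors the fetch/upload layer produces onto the documented codes.
    if (error?.name === 'TimeoutError') process.exit(2); // retryable: fetch timed out
    if (error?.statusCode === 429) process.exit(3); // retryable: rate limited
    process.exit(1); // generic error, not retried
  }
}
void main();
```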
14 changes: 7 additions & 7 deletions src/token-processor/util/metadata-helpers.ts
@@ -18,7 +18,7 @@ import {
TooManyRequestsHttpError,
} from './errors';
import { RetryableJobError } from '../queue/errors';
import { getImageUrl, processImageUrl } from './image-cache';
import { normalizeImageUri, processImageCache } from './image-cache';
import {
RawMetadataLocale,
RawMetadataLocalizationCType,
@@ -134,8 +134,8 @@ async function parseMetadataForInsertion(
let cachedImage: string | undefined;
let cachedThumbnailImage: string | undefined;
if (image && typeof image === 'string') {
const normalizedUrl = getImageUrl(image);
[cachedImage, cachedThumbnailImage] = await processImageUrl(
const normalizedUrl = normalizeImageUri(image);
[cachedImage, cachedThumbnailImage] = await processImageCache(
normalizedUrl,
contract.principal,
token.token_number
@@ -255,7 +255,7 @@ export async function getMetadataFromUri(token_uri: string): Promise<RawMetadata
}

// Support HTTP/S URLs otherwise
const httpUrl = getFetchableUrl(token_uri);
const httpUrl = getFetchableDecentralizedStorageUrl(token_uri);
const urlStr = httpUrl.toString();
let fetchImmediateRetryCount = 0;
let content: string | undefined;
@@ -270,7 +270,7 @@
} catch (error) {
fetchImmediateRetryCount++;
fetchError = error;
if (error instanceof MetadataTimeoutError && isUriFromDecentralizedGateway(token_uri)) {
if (error instanceof MetadataTimeoutError && isUriFromDecentralizedStorage(token_uri)) {
// Gateways like IPFS and Arweave commonly time out when a resource can't be found quickly.
// Try again later if this is the case.
throw new RetryableJobError(`Gateway timeout for ${urlStr}`, error);
@@ -314,7 +314,7 @@ function parseJsonMetadata(url: string, content?: string): RawMetadata {
* @param uri - URL to convert
* @returns Fetchable URL
*/
export function getFetchableUrl(uri: string): URL {
export function getFetchableDecentralizedStorageUrl(uri: string): URL {
try {
const parsedUri = new URL(uri);
if (parsedUri.protocol === 'http:' || parsedUri.protocol === 'https:') return parsedUri;
@@ -334,7 +334,7 @@
throw new MetadataParseError(`Unsupported uri protocol: ${uri}`);
}

function isUriFromDecentralizedGateway(uri: string): boolean {
function isUriFromDecentralizedStorage(uri: string): boolean {
return (
uri.startsWith('ipfs:') ||
uri.startsWith('ipns:') ||
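
Note: as a usage illustration of the renamed helper (a sketch; the gateway host and the `/ipfs/` path shape are assumptions based on the test fixtures, not values confirmed by this diff):

```ts
import { getFetchableDecentralizedStorageUrl } from './metadata-helpers'; // illustrative import path

// Plain http(s) URIs pass through unchanged.
getFetchableDecentralizedStorageUrl('https://example.com/meta/1.json').toString();
// -> 'https://example.com/meta/1.json'

// ipfs:// URIs are rewritten to the configured public gateway.
getFetchableDecentralizedStorageUrl('ipfs://QmSomeHash/image.png').toString();
// -> e.g. 'https://cloudflare-ipfs.com/ipfs/QmSomeHash/image.png'

// Unsupported protocols throw MetadataParseError('Unsupported uri protocol: ...').
```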
12 changes: 7 additions & 5 deletions tests/image-cache.test.ts
@@ -1,5 +1,5 @@
import { ENV } from '../src/env';
import { processImageUrl } from '../src/token-processor/util/image-cache';
import { processImageCache } from '../src/token-processor/util/image-cache';

describe('Image cache', () => {
const contract = 'SP3QSAJQ4EA8WXEDSRRKMZZ29NH91VZ6C5X88FGZQ.crashpunks-v2';
@@ -11,7 +11,7 @@ describe('Image cache', () => {
});

test('transforms image URL correctly', async () => {
const transformed = await processImageUrl(url, contract, tokenNumber);
const transformed = await processImageCache(url, contract, tokenNumber);
expect(transformed).toStrictEqual([
'http://cloudflare-ipfs.com/test/image.png?processed=true',
'http://cloudflare-ipfs.com/test/image.png?processed=true&thumb=true',
@@ -20,18 +20,20 @@

test('ignores data: URL', async () => {
const url = 'data:123456';
const transformed = await processImageUrl(url, contract, tokenNumber);
const transformed = await processImageCache(url, contract, tokenNumber);
expect(transformed).toStrictEqual(['data:123456']);
});

test('ignores empty script paths', async () => {
ENV.METADATA_IMAGE_CACHE_PROCESSOR = '';
const transformed = await processImageUrl(url, contract, tokenNumber);
const transformed = await processImageCache(url, contract, tokenNumber);
expect(transformed).toStrictEqual([url]);
});

test('handles script errors', async () => {
ENV.METADATA_IMAGE_CACHE_PROCESSOR = './tests/test-image-cache-error.js';
await expect(processImageUrl(url, contract, tokenNumber)).rejects.toThrow(/ImageCache error/);
await expect(processImageCache(url, contract, tokenNumber)).rejects.toThrow(
/ImageCache script error/
);
});
});
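
Note: the `./tests/test-image-cache-error.js` fixture referenced in the last test is not part of this diff. A minimal stand-in that would satisfy the assertion (an assumption, not the repository's actual fixture) could be:

```ts
// Hypothetical stand-in for tests/test-image-cache-error.js.
// Any exit code other than 0, 2, or 3 makes processImageCache throw the
// "ImageCache script error" that the test asserts on.
process.stderr.write('simulated failure from test fixture\n');
process.exit(1);
```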