Skip to content

Commit

Permalink
fix: Migrate to AWS SDK v3
Browse files Browse the repository at this point in the history
  • Loading branch information
baumandm committed Mar 17, 2023
1 parent 22c9ee7 commit f4f16e4
Show file tree
Hide file tree
Showing 9 changed files with 2,710 additions and 463 deletions.
2,971 changes: 2,609 additions & 362 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
"lint:fix": "eslint . --fix"
},
"dependencies": {
"@aws-sdk/client-s3": "3.292.0",
"@aws-sdk/client-sqs": "3.292.0",
"@elastic/elasticsearch": "7.13.0",
"@octokit/graphql": "4.8.0",
"@octokit/rest": "19.0.7",
"apollo-server-core": "3.8.1",
"apollo-server-express": "3.7.0",
"aws-sdk": "2.1317.0",
"axios": "0.27.2",
"better-queue": "3.8.10",
"chardet": "1.4.0",
Expand Down
25 changes: 13 additions & 12 deletions packages/backend/src/controllers/avatars.v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

import { NoSuchKey } from '@aws-sdk/client-s3';
import { getLogger } from '@iex/shared/logger';
import { AWSError } from 'aws-sdk';
import { Response, Request } from 'express';

import { streamFromS3 } from '../lib/storage';
Expand All @@ -31,20 +31,21 @@ export const getAvatar = async (req: Request, res: Response): Promise<void> => {
const path = `avatars/${key}`;
logger.debug(`Attempting to load avatar: ${path}`);

const readable = await streamFromS3(path);
readable.on('error', (error: AWSError) => {
if (error.code == 'NoSuchKey') {
try {
const readable = await streamFromS3(path);

if (req.query['content-type']) {
res.contentType(req.query['content-type'] as string);
} else {
res.contentType(getType(key));
}

readable.pipe(res);
} catch (error) {
if (error instanceof NoSuchKey) {
res.status(404).send();
} else {
res.status(500).send(error);
}
});

if (req.query['content-type']) {
res.contentType(req.query['content-type'] as string);
} else {
res.contentType(getType(key));
}

readable.pipe(res);
};
21 changes: 11 additions & 10 deletions packages/backend/src/controllers/drafts.v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

import { NoSuchKey } from '@aws-sdk/client-s3';
import { getLogger } from '@iex/shared/logger';
import { AWSError } from 'aws-sdk';
import { Response, Request } from 'express';

import { streamFromS3 } from '../lib/storage';
Expand All @@ -29,18 +29,19 @@ export const getDraftAttachment = async (req: Request, res: Response): Promise<v
const draftKey = `drafts/${req.params.draftKey}/files/${req.params.attachmentKey}`;
logger.debug(`Attempting to load draft attachment: ${draftKey}, ${req.params.attachmentKey}`);

const readable = await streamFromS3(draftKey);
readable.on('error', (error: AWSError) => {
if (error.code == 'NoSuchKey') {
try {
const readable = await streamFromS3(draftKey);

if (req.query['content-type']) {
res.contentType(req.query['content-type'] as string);
}

readable.pipe(res);
} catch (error) {
if (error instanceof NoSuchKey) {
res.status(404).send();
} else {
res.status(500).send(error);
}
});

if (req.query['content-type']) {
res.contentType(req.query['content-type'] as string);
}

readable.pipe(res);
};
59 changes: 31 additions & 28 deletions packages/backend/src/controllers/insights.v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
* limitations under the License.
*/

import { Readable } from 'node:stream';

import { NoSuchKey } from '@aws-sdk/client-s3';
import { getLogger } from '@iex/shared/logger';
import { AWSError } from 'aws-sdk';
import { Response, Request } from 'express';

import { getFromS3, headFromS3 } from '../lib/storage';
Expand Down Expand Up @@ -59,31 +61,32 @@ export const getInsightFile = async (req: Request, res: Response): Promise<void>

const range = Array.isArray(req.headers['range']) ? req.headers['range'][0] : req.headers['range'];

const response = getFromS3(key, range);

response
.on('httpHeaders', function (code, headers) {
if (code < 300) {
res.set('Accept-Ranges', headers['accept-ranges']);
res.set('Content-Length', headers['content-length']);

// res.set('Content-Type', headers['content-type']);
res.contentType(getType(filePath));

if (range) {
res.set('Content-Range', headers['content-range']);
res.status(206); // Partial Content success
}
}
})
.createReadStream()
.on('error', (error: AWSError) => {
if (error.code == 'NoSuchKey') {
res.status(404).send();
} else {
logger.error('S3 error: ' + error);
res.status(500).send(error);
}
})
.pipe(res);
try {
const response = await getFromS3(key, range);

if (response.AcceptRanges) {
res.set('Accept-Ranges', response.AcceptRanges);
}
if (response.ContentLength) {
res.set('Content-Length', response.ContentLength.toString());
}

//res.set('Content-Type', response.ContentType);
res.contentType(getType(filePath));

if (range) {
res.set('Content-Range', response.ContentRange);
res.status(206);
}

if (response.Body) {
(response.Body as Readable).pipe(res);
}
} catch (error) {
if (error instanceof NoSuchKey) {
res.status(404).send();
} else {
res.status(500).send(error);
}
}
};
73 changes: 33 additions & 40 deletions packages/backend/src/lib/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,34 @@
* limitations under the License.
*/

import {
GetObjectCommand,
GetObjectCommandOutput,
HeadObjectCommand,
HeadObjectCommandOutput,
NotFound,
PutObjectCommand,
S3Client,
S3ClientConfig
} from '@aws-sdk/client-s3';
import { getLogger } from '@iex/shared/logger';
import { S3 } from 'aws-sdk';
import type { AWSError, Request } from 'aws-sdk';
import type { HeadObjectOutput } from 'aws-sdk/clients/s3';
import type { ReadStream } from 'fs-extra';

const logger = getLogger('storage');

const defaultOptions: S3.Types.ClientConfiguration = {
const defaultOptions: S3ClientConfig = {
region: process.env.S3_REGION,
maxRetries: 3,
maxAttempts: 4,

endpoint: process.env.S3_ENDPOINT !== '' ? process.env.S3_ENDPOINT : undefined,

// S3 Path-style requests are deprecated
// But some S3-compatible APIs may use them (e.g. Minio)
s3ForcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true' ? true : undefined
forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true' ? true : undefined
};

export function createS3Client(options: S3.Types.ClientConfiguration = defaultOptions): S3 {
return new S3({ ...defaultOptions, ...options });
export function createS3Client(options: S3ClientConfig = defaultOptions): S3Client {
return new S3Client({ ...defaultOptions, ...options });
}

export const defaultS3Client = createS3Client();
Expand All @@ -46,12 +53,12 @@ export const defaultS3Client = createS3Client();
* @param {Buffer | String} body file content to write to S3
* @param {string} key S3 bucket key to write to
* @returns {string} S3 bucket URI to the uploaded file
* @throws {AWSError} If putobject request fails
* @throws {Error} If putobject request fails
*/
export async function writeToS3(body: Buffer | string, key: string): Promise<string> {
const bucket = process.env.S3_BUCKET!;

const response = await defaultS3Client.putObject({ Body: body, Bucket: bucket, Key: key }).promise();
const response = await defaultS3Client.send(new PutObjectCommand({ Body: body, Bucket: bucket, Key: key }));
const uri = `s3://${bucket}/${key}`;

logger.info(`S3 file successfully uploaded with Etag: ${response.ETag} and URI: ${uri}`);
Expand All @@ -66,36 +73,20 @@ export async function writeToS3(body: Buffer | string, key: string): Promise<str
* @param {number} fileSize Size of the file
* @param {string} key S3 bucket key to write to
* @returns {string} S3 bucket URI to the uploaded file
* @throws {AWSError} If putobject request fails
* @throws {Error} If putobject request fails
*/
export async function streamToS3(stream: ReadStream, fileSize: number, key: string): Promise<string> {
const bucket = process.env.S3_BUCKET!;

const response = await defaultS3Client
.upload({ Body: stream, Bucket: bucket, ContentLength: fileSize, Key: key })
.promise();
const response = await defaultS3Client.send(
new PutObjectCommand({ Body: stream, Bucket: bucket, ContentLength: fileSize, Key: key })
);
const uri = `s3://${bucket}/${key}`;

logger.info(`S3 file successfully uploaded with Etag: ${response.ETag} and URI: ${uri}`);
return uri;
}

/**
* Reads data from the Insights Explorer S3 bucket.
*
* @note The function executes a getObject() request
* @param {string} key Key to get file from bucket
* @returns {Promise<Buffer>} Returns requested S3 buffer
*/
export async function readFromS3(key: string): Promise<Buffer> {
const bucket = process.env.S3_BUCKET!;

logger.info(`Streaming from s3://${bucket}/${key}`);
const file = await defaultS3Client.getObject({ Bucket: bucket, Key: key }).promise();

return Buffer.from(file.Body as Buffer);
}

/**
* Streams data from the Insights Explorer S3 bucket.
*
Expand All @@ -109,9 +100,9 @@ export async function streamFromS3(key: string, range?: string): Promise<ReadStr

logger.info(`Streaming from s3://${bucket}/${key}`);

const response = defaultS3Client.getObject({ Bucket: bucket, Key: key, Range: range });
const response = await defaultS3Client.send(new GetObjectCommand({ Bucket: bucket, Key: key, Range: range }));

return response.createReadStream() as ReadStream;
return response.Body as unknown as ReadStream;
}

/**
Expand All @@ -120,14 +111,14 @@ export async function streamFromS3(key: string, range?: string): Promise<ReadStr
* @note The function executes a getObject() request
* @param {string} key Key to get file from bucket
* @range {string} range Optional range to retrieve
* @returns {Request<S3.Types.GetObjectOutput, AWSError>} Returns requested S3 GetObject response
* @returns {GetObjectCommandOutput} Returns requested S3 GetObject response
*/
export function getFromS3(key: string, range?: string): Request<S3.Types.GetObjectOutput, AWSError> {
export function getFromS3(key: string, range?: string): Promise<GetObjectCommandOutput> {
const bucket = process.env.S3_BUCKET!;

logger.info(`Streaming from s3://${bucket}/${key}`);

return defaultS3Client.getObject({ Bucket: bucket, Key: key, Range: range });
return defaultS3Client.send(new GetObjectCommand({ Bucket: bucket, Key: key, Range: range }));
}

/**
Expand All @@ -137,14 +128,16 @@ export function getFromS3(key: string, range?: string): Request<S3.Types.GetObje
* @param {string} key Key of file to check in bucket
* @returns {Request<S3.Types.HeadObjectOutput, AWSError>} Returns requested S3 HeadObject response
*/
export async function headFromS3(key: string): Promise<HeadObjectOutput | undefined> {
export async function headFromS3(key: string): Promise<HeadObjectCommandOutput | undefined> {
const bucket = process.env.S3_BUCKET!;

logger.info(`Checking existance of s3://${bucket}/${key}`);
try {
return await defaultS3Client.headObject({ Bucket: bucket, Key: key }).promise();
return await defaultS3Client.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
} catch (error: any) {
if (error.code == 'NotFound') return undefined;
if (error instanceof NotFound) return undefined;

logger.error(JSON.stringify(error, null, 2));
throw error;
}
}
Expand All @@ -161,10 +154,10 @@ export async function existsInS3(key: string): Promise<boolean> {

logger.info(`Checking existance of s3://${bucket}/${key}`);
try {
await defaultS3Client.headObject({ Bucket: bucket, Key: key }).promise();
await defaultS3Client.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
return true;
} catch (error: any) {
if (error.code == 'NotFound') return false;
if (error instanceof NotFound) return false;
throw error;
}
}
4 changes: 2 additions & 2 deletions packages/mq/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"dist"
],
"dependencies": {
"aws-sdk": "2.1317.0",
"sqs-consumer": "5.6.0"
"@aws-sdk/client-sqs": "3.292.0",
"sqs-consumer": "7.0.2"
}
}
15 changes: 8 additions & 7 deletions packages/mq/src/message-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
* limitations under the License.
*/

import type { Message } from '@aws-sdk/client-sqs';
import { SendMessageCommand, SQSClient } from '@aws-sdk/client-sqs';
import { getLogger } from '@iex/shared/logger';
import { SQS } from 'aws-sdk';
import { Consumer } from 'sqs-consumer';

const logger = getLogger('message-queue');
Expand All @@ -25,25 +26,25 @@ export interface SQSMessageQueueOptions {
queueUrl: string;
}
export class MessageQueue {
private sqsClient: SQS;
private sqsClient: SQSClient;

constructor(readonly options: SQSMessageQueueOptions) {
this.sqsClient = new SQS({
this.sqsClient = new SQSClient({
region: options.region
});
}

async sendMessage(body: Record<string, any>): Promise<string | undefined> {
const response = await this.sqsClient
.sendMessage({
const response = await this.sqsClient.send(
new SendMessageCommand({
MessageBody: JSON.stringify(body),
QueueUrl: this.options.queueUrl
})
.promise();
);
return response.MessageId;
}

consumeMessages<T extends Record<string, any>>(handler: (body: T, message: SQS.Message) => Promise<void>): Consumer {
consumeMessages<T extends Record<string, any>>(handler: (body: T, message: Message) => Promise<void>): Consumer {
const consumer = Consumer.create({
queueUrl: this.options.queueUrl,
region: this.options.region,
Expand Down
2 changes: 1 addition & 1 deletion packages/shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"dist"
],
"dependencies": {
"aws-sdk": "2.1317.0",
"@aws-sdk/client-s3": "3.292.0",
"fs-extra": "10.1.0",
"lodash": "4.17.21",
"parsimmon": "1.18.1",
Expand Down

0 comments on commit f4f16e4

Please sign in to comment.