Skip to content

Commit

Permalink
First cut of @aws-lite/s3 PutObject method
Browse files Browse the repository at this point in the history
Specifically just the chunked / streamed upload part
  • Loading branch information
ryanblock committed Sep 27, 2023
1 parent 1b39179 commit 2cb10db
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 13 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
"src"
],
"workspaces": [
"plugins/dynamodb"
"plugins/dynamodb",
"plugins/s3"
],
"eslintConfig": {
"extends": "@architect/eslint-config"
Expand Down
21 changes: 21 additions & 0 deletions plugins/s3/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "@aws-lite/s3",
"version": "0.0.0",
"description": "Official `aws-lite` plugin for S3",
"homepage": "https://github.com/architect/aws-lite",
"repository": {
"type": "git",
"url": "https://github.com/architect/aws-lite",
"directory": "plugins/s3"
},
"bugs": "https://github.com/architect/aws-lite/issues",
"main": "src/index.mjs",
"engines": {
"node": ">=16"
},
"author": "@architect",
"license": "Apache-2.0",
"files": [
"src"
]
}
17 changes: 17 additions & 0 deletions plugins/s3/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# `@aws-lite/s3`

> Official `aws-lite` plugin for S3
> Maintained by: [@architect](https://github.com/architect)

## Install

```sh
npm i @aws-lite/s3
```


## Learn more

Please see the [main `aws-lite` readme](https://github.com/architect/aws-lite) for more information about `aws-lite` plugins.
14 changes: 14 additions & 0 deletions plugins/s3/src/index.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import PutObject from './put-object.mjs'

const service = 's3'

/**
* Plugin maintained by: @architect
*/
export default {
service,
methods: {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
PutObject
}
}
153 changes: 153 additions & 0 deletions plugins/s3/src/put-object.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import aws4 from 'aws4'
import crypto from 'node:crypto'
import { readFile, stat } from 'node:fs/promises'
import { Readable } from 'node:stream'

const required = true

const minSize = 1024 * 1024 * 5
const intToHexString = int => String(Number(int).toString(16))
const algo = 'sha256', utf8 = 'utf8', hex = 'hex'
const hash = str => crypto.createHash(algo).update(str, utf8).digest(hex)
const hmac = (key, str, enc) => crypto.createHmac(algo, key).update(str, utf8).digest(enc)

let chunkBreak = `\r\n`
function payloadMetadata (chunkSize, signature) {
// Don't forget: after the signature + break would normally follow the body + one more break
return intToHexString(chunkSize) + `;chunk-signature=${signature}` + chunkBreak
}

// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
const PutObject = {
validate: {
Bucket: { type: 'string', required },
Key: { type: 'string', required },
file: { type: 'string', required },
headers: { type: 'object' },
minChunkSize: { type: 'number' },
},
request: async (params, utils) => {
let { Bucket, Key, file, headers = {}, minChunkSize } = params
let { credentials, region } = utils
minChunkSize = minChunkSize || minSize

let dataSize
try {
let stats = await stat(file)
dataSize = stats.size
}
catch (err) {
console.log(`Error reading file: ${file}`)
throw err
}

// TODO non-streaming upload
if (dataSize > minChunkSize) {
// We'll assemble file indices of chunks here
let chunks = [
// Reminder: no payload is sent with the canonical request
{ canonicalRequest: true },
]

// We'll need to compute all chunk sizes (including metadata) so that we can get the total content-length for the canonical request
let totalRequestSize = dataSize
let dummySig = 'a'.repeat(64)
let emptyHash = hash('')

// Multipart uploading requires an extra zero-data chunk to denote completion
let chunkAmount = Math.ceil(dataSize / minChunkSize) + 1

for (let i = 0; i < chunkAmount; i++) {
// Get start end byte position for streaming
let start = i === 0 ? 0 : i * minChunkSize
let end = (i * minChunkSize) + minChunkSize

let chunk = {}, chunkSize
// The last real chunk
if (end > dataSize) {
end = dataSize
}
// The 0-byte trailing chunk
if (start > dataSize) {
chunkSize = 0
chunk.finalRequest = true
}
// Normal
else {
chunkSize = end - start
chunk.start = start
chunk.end = end
}

totalRequestSize += payloadMetadata(chunkSize, dummySig).length +
chunkBreak.length
chunks.push({ ...chunk, chunkSize })
}

headers = {
...headers,
'content-encoding': 'aws-chunked',
'content-length': totalRequestSize,
'x-amz-content-sha256': 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD',
'x-amz-decoded-content-length': dataSize,
}
let canonicalReq = aws4.sign({
service: 's3',
region,
method: 'PUT',
path: `/${Bucket}/${Key}`,
headers,
}, credentials)
let seedSignature = canonicalReq.headers.Authorization.split('Signature=')[1]
chunks[0].signature = seedSignature

let date = canonicalReq.headers['X-Amz-Date'] ||
canonicalReq.headers['x-amz-date']
let yyyymmdd = date.split('T')[0]
let payloadSigHeader = `AWS4-HMAC-SHA256-PAYLOAD\n` +
`${date}\n` +
`${yyyymmdd}/${canonicalReq.region}/s3/aws4_request\n`

// TODO make this streamable
let data = await readFile(file)
let stream = new Readable()
chunks.forEach((chunk, i) => {
if (chunk.canonicalRequest) return

// Ideally we'd use start/end with fs.createReadStream
let { start, end } = chunk
let body = chunk.finalRequest ? '' : data.slice(start, end)
let chunkHash = chunk.finalRequest ? emptyHash : hash(body)

let payloadSigValues = [
chunks[i - 1].signature, // Previous chunk signature
emptyHash, // Hash of an empty line ¯\_(ツ)_/¯
chunkHash, // Hash of the current chunk
].join('\n')
let signing = payloadSigHeader + payloadSigValues

// lol at this cascade of hmacs
let kDate = hmac('AWS4' + credentials.secretAccessKey, yyyymmdd)
let kRegion = hmac(kDate, region)
let kService = hmac(kRegion, 's3')
let kCredentials = hmac(kService, 'aws4_request')
let chunkSignature = hmac(kCredentials, signing, hex)

// Important: populate the signature for the next chunk down the line
chunks[i].signature = chunkSignature

// Now add the chunk to the stream
let part = payloadMetadata(chunk.chunkSize, chunkSignature) + body + chunkBreak
stream.push(part)

if (chunk.finalRequest) {
stream.push(null)
}
})
canonicalReq.stream = stream
return canonicalReq
}
},
}
export default PutObject
1 change: 1 addition & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ export default {
<!-- ! Do not remove plugins_start / plugins_end ! -->
<!-- plugins_start -->
- [DynamoDB](https://www.npmjs.com/package/@aws-lite/dynamodb)
- [S3](https://www.npmjs.com/package/@aws-lite/s3)
<!-- plugins_end -->
Expand Down
1 change: 1 addition & 0 deletions scripts/generate-plugins/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const cwd = process.cwd()
// - maintainers: array of GitHub handles of the individual(s) or org(s) responsible for maintaining the plugin
const plugins = [
{ name: 'dynamodb', service: 'DynamoDB', maintainers: [ '@architect' ] },
{ name: 's3', service: 'S3', maintainers: [ '@architect' ] },
].sort()
const pluginTmpl = readFileSync(join(__dirname, '_plugin-tmpl.mjs')).toString()
const readmeTmpl = readFileSync(join(__dirname, '_readme-tmpl.md')).toString()
Expand Down
34 changes: 24 additions & 10 deletions src/client-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ let { awsjson } = require('./lib')
let { marshall, unmarshall } = require('./_vendor')
let errorHandler = require('./error')

let copy = obj => JSON.parse(JSON.stringify(obj))

// Never autoload these `@aws-lite/*` packages:
let ignored = [ 'client', 'arc' ]

Expand Down Expand Up @@ -82,7 +84,19 @@ module.exports = async function clientFactory (config, creds, region) {
throw ReferenceError(`All plugin error methods must be a function: ${service}`)
}
})
let pluginUtils = { awsjsonMarshall: marshall, awsjsonUnmarshall: unmarshall }

let credentials = copy(creds)
Object.defineProperty(config, 'secretAccessKey', { enumerable: false })
Object.defineProperty(config, 'secretAccessKey', { enumerable: false })
Object.defineProperty(credentials, 'sessionToken', { enumerable: false })
Object.defineProperty(credentials, 'sessionToken', { enumerable: false })
let pluginUtils = {
awsjsonMarshall: marshall,
awsjsonUnmarshall: unmarshall,
config: copy(config),
credentials,
region,
}
let clientMethods = {}
Object.entries(methods).forEach(([ name, method ]) => {
// For convenient error reporting (and jic anyone wants to enumerate everything) try to ensure the AWS API method names pass through
Expand All @@ -92,38 +106,38 @@ module.exports = async function clientFactory (config, creds, region) {

// Run plugin.request()
try {
var result = await method.request(input, pluginUtils)
result = result || {}
var req = await method.request(input, pluginUtils)
req = req || {}
}
catch (methodError) {
errorHandler({ error: methodError, metadata })
}

// Hit plugin.validate
let params = { ...input, ...result }
let params = { ...input, ...req }
if (method.validate) {
validateInput(method.validate, params, metadata)
}

// Make the request
try {
let response = await request({ ...params, ...result, service }, creds, selectedRegion, config, metadata)
let response = await request({ ...params, service }, creds, selectedRegion, config, metadata)

// Run plugin.response()
/* istanbul ignore next */ // TODO remove as soon as plugin.response() API settles
if (method.response) {
try {
var result = await method.response(response, pluginUtils)
if (result && result.response === undefined) {
var pluginRes = await method.response(response, pluginUtils)
if (pluginRes && pluginRes.response === undefined) {
throw TypeError('Response plugins must return a response property')
}
}
catch (methodError) {
errorHandler({ error: methodError, metadata })
}
response = result?.awsjson
? awsjson.unmarshall(result.response, result.awsjson)
: result?.response || response
response = pluginRes?.awsjson
? awsjson.unmarshall(pluginRes.response, pluginRes.awsjson)
: pluginRes?.response || response
}
return response
}
Expand Down
7 changes: 5 additions & 2 deletions src/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ module.exports = function request (params, creds, region, config, metadata) {
return new Promise((resolve, reject) => {

// Path
params.path = params.endpoint || '/'
// Note: params.path may be passed if the request is coming from a plugin that pre-signed with aws4
params.path = params.endpoint || params.path || '/'
if (!params.path.startsWith('/')) {
params.path = '/' + params.path
}
Expand Down Expand Up @@ -147,6 +148,8 @@ module.exports = function request (params, creds, region, config, metadata) {
port: options.port,
}
}))
req.end(options.body || '')
/* istanbul ignore next */ // TODO remove and test
if (options.stream) options.stream.pipe(req)
else req.end(options.body || '')
})
}

0 comments on commit 2cb10db

Please sign in to comment.