
first commit

0 parents commit 27e9901cf0eeae2f70b6ff2ddb9b898e136d98b9 @hughsk committed Jan 21, 2014
Showing with 370 additions and 0 deletions.
  1. +21 −0 LICENSE.md
  2. +57 −0 README.md
  3. +133 −0 index.js
  4. +42 −0 package.json
  5. +5 −0 test-config.js
  6. +112 −0 test.js
@@ -0,0 +1,21 @@
+## The MIT License (MIT) ##
+
+Copyright (c) 2014 Hugh Kennedy
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
@@ -0,0 +1,57 @@
+# s3-write-stream [![Flattr this!](https://api.flattr.com/button/flattr-badge-large.png)](https://flattr.com/submit/auto?user_id=hughskennedy&url=http://github.com/hughsk/s3-write-stream&title=s3-write-stream&description=hughsk/s3-write-stream%20on%20GitHub&language=en_GB&tags=flattr,github,javascript&category=software)[![experimental](http://hughsk.github.io/stability-badges/dist/experimental.svg)](http://github.com/hughsk/stability-badges) #
+
+Pipe data straight to an S3 key of your choice.
+
+This is a writeable stream that takes data and uploads it to Amazon S3 using
+its [multipart upload API](http://aws.amazon.com/about-aws/whats-new/2010/11/10/Amazon-S3-Introducing-Multipart-Upload/).
+This is ideal for handling generated content without needing to know the
+content's length ahead of time, and without resorting to file system hacks or
+buffering everything before the upload.
+
+Internally, errors are handled with a fibonacci backoff that retries failed requests, so the stray failures which tend to tear apart long-running S3 uploads don't bring the whole thing down.
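+
+For a rough idea of that retry pattern, here's a sketch using the
+[`backoff`](https://github.com/MathieuTurcotte/node-backoff) module — not the
+module's exact internals, and `flakyRequest` is just a stand-in for an S3 call:
+
+``` javascript
+var backoff = require('backoff')
+
+// Stand-in for an S3 request that sometimes fails.
+function flakyRequest(callback) {
+  setTimeout(function() {
+    callback(Math.random() < 0.5 ? new Error('request failed') : null)
+  }, 100)
+}
+
+var bo = backoff.fibonacci({ initialDelay: 500, maxDelay: 10000 })
+
+bo.failAfter(10)          // give up entirely after 10 attempts
+bo.on('ready', attempt)   // each 'ready' event is another attempt
+bo.on('fail', function() {
+  console.error('giving up')
+})
+
+attempt() // first attempt happens straight away
+
+function attempt() {
+  flakyRequest(function(err) {
+    if (err) return bo.backoff() // wait a little longer, then retry
+    console.log('request succeeded')
+  })
+}
+```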
+
+The end result is that uploading files to S3 is as simple as this:
+
+``` javascript
+var fs = require('fs')
+var upload = require('s3-write-stream')({
+    accessKeyId: process.env.AWS_ACCESS_KEY
+  , secretAccessKey: process.env.AWS_SECRET_KEY
+  , Bucket: 'photo-album'
+})
+
+fs.createReadStream(__dirname + '/photo_001.jpg')
+  .pipe(upload('images/photo_001.jpg'))
+```
+
+## Usage ##
+
+[![s3-write-stream](https://nodei.co/npm/s3-write-stream.png?mini=true)](https://nodei.co/npm/s3-write-stream)
+
+### `createStream = require('s3-write-stream')(opts)` ###
+
+Initialises the `s3-write-stream` module with your AWS configuration. The
+following properties are required:
+
+* `opts.accessKeyId`: your AWS access key id.
+* `opts.secretAccessKey`: your AWS secret access key.
+
+It's also recommended that you include `opts.Bucket` to define the default
+S3 bucket you want to upload to.
+
+### `createStream(key|opts)` ###
+
+Creates and returns a writeable stream that you can pipe your upload to. You
+can either:
+
+* pass the upload's `key` as a string to determine the location you
+ want to upload to. By default, files uploaded this way will be public.
+* pass in an `opts` object, which will pass those parameters on to the
+ initial upload call via [`aws-sdk`](https://github.com/aws/aws-sdk-js).
+
+Note that if you haven't already specified a default bucket, you'll need to do
+so here and hence will need to use `opts`.
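+
+For example, if no default bucket was configured, a hypothetical upload might
+look like this (the bucket, key and `ContentType` below are made-up values):
+
+``` javascript
+var fs = require('fs')
+var createStream = require('s3-write-stream')({
+    accessKeyId: process.env.AWS_ACCESS_KEY
+  , secretAccessKey: process.env.AWS_SECRET_KEY
+})
+
+fs.createReadStream(__dirname + '/report.csv')
+  .pipe(createStream({
+      Bucket: 'my-report-bucket'
+    , Key: 'reports/report.csv'
+    , ACL: 'private'
+    , ContentType: 'text/csv'
+  }))
+```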
+
+## License ##
+
+MIT. See [LICENSE.md](http://github.com/hughsk/s3-write-stream/blob/master/LICENSE.md) for details.
@@ -0,0 +1,133 @@
+var through2 = require('through2')
+var backoff = require('backoff')
+var AWS = require('aws-sdk')
+var bl = require('bl')
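+// S3 multipart uploads require every part except the last to be at least 5 MiB,
+// so incoming data is buffered up to this size before a part is sent.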
+var maxSize = 5 * 1024 * 1024
+
+module.exports = factory
+
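+// Configures an S3 client from `config` and returns the createStream function.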
+function factory(config) {
+  config = config || {}
+  config.apiVersion = config.apiVersion || 'latest'
+
+  var s3 = new AWS.S3(config)
+  var boSettings = {
+      initialDelay: 500
+    , maxDelay: 10000
+  }
+
+  var countIn = 0
+  var countOut = 0
+
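+  // createStream(dest) opens a new multipart upload and returns a writeable stream for it.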
+  return function createStream(dest) {
+    if (typeof dest === 'string') dest = {
+        Key: dest
+      , ACL: 'public-read'
+    }
+
+    dest.Bucket = dest.Bucket || config.Bucket
+    if (!dest.Bucket) throw new Error(
+      'You must specify the default S3 bucket ' +
+      'you wish to use; either when creating the ' +
+      'stream or initialising the configuration.'
+    )
+
+    var streamClosed = false
+    var buffer = bl()
+    var uploadId = null
+    var pending = 0
+    var part = 1
+    var parts = []
+
+    var stream = through2(write, flush)
+    var bo = backoff.fibonacci(boSettings)
+    var lastErr
+
+    countIn += 1
+
+    // Open the multipart upload straight away, retrying with fibonacci backoff.
+    bo.failAfter(10)
+    bo.on('backoff', function() {
+      s3.createMultipartUpload(dest
+      , function(err, data) {
+        if (err) return (lastErr = err), bo.backoff()
+        uploadId = data.UploadId
+        stream.emit('upload started')
+      })
+    }).on('fail', function() {
+      return stream.emit('error', lastErr)
+    }).backoff()
+
+    return stream
+
+    // Collect incoming data until a full part's worth is buffered, then upload it.
+    function write(chunk, enc, next) {
+      buffer.append(chunk)
+      if (buffer.length < maxSize) return next()
+      flushChunk(next)
+    }
+
+    // When the stream ends, upload whatever is left over as the final part.
+    function flush() {
+      streamClosed = true
+      flushChunk()
+    }
+
+    // Upload one buffered chunk as the next part, retrying failed requests with backoff.
+    function flushChunk(next) {
+      var lastErr = null
+      var chunk = part++
+      var uploading = buffer.slice()
+      var bo = backoff.fibonacci(boSettings)
+
+      // The part's data has been snapshotted above, so reset the buffer for the next part.
+      buffer._bufs.length = 0
+      buffer.length = 0
+      pending += 1
+
+      // If S3 hasn't confirmed the multipart upload yet, wait for it before sending this part.
+      if (!uploadId) return stream.once('upload started', uploadPart)
+
+      uploadPart()
+      function uploadPart() {
+        bo.failAfter(5)
+        bo.on('backoff', function() {
+          s3.uploadPart({
+              Body: uploading
+            , Bucket: dest.Bucket
+            , Key: dest.Key
+            , UploadId: uploadId
+            , PartNumber: chunk
+          }, function(err, result) {
+            if (err) return (lastErr = err), bo.backoff()
+
+            parts[chunk-1] = {
+                ETag: result.ETag
+              , PartNumber: chunk
+            }
+
+            if (next) next()
+            if (!--pending && streamClosed) finish()
+          })
+        }).on('fail', function() {
+          return stream.emit('error', lastErr)
+        }).backoff()
+      }
+    }
+
+    // Once every part is uploaded and the stream has closed, ask S3 to
+    // assemble the parts into the final object.
+    function finish() {
+      var bo = backoff.fibonacci(boSettings)
+
+      bo.failAfter(10)
+      bo.on('backoff', function() {
+        s3.completeMultipartUpload({
+            Bucket: dest.Bucket
+          , Key: dest.Key
+          , UploadId: uploadId
+          , MultipartUpload: {
+              Parts: parts
+            }
+        }, function(err, result) {
+          if (err) return (lastErr = err), bo.backoff()
+          stream.emit('end')
+        })
+      }).on('fail', function() {
+        stream.emit('error', lastErr)
+      }).backoff()
+    }
+  }
+}
@@ -0,0 +1,42 @@
+{
+ "name": "s3-write-stream",
+ "description": "Pipe data straight to an S3 key of your choice",
+ "version": "0.0.0",
+ "main": "index.js",
+ "browser": "index.js",
+ "scripts": {
+ "test": "node test | faucet"
+ },
+ "dependencies": {
+ "aws-sdk": "~2.0.0-rc8",
+ "through2": "~0.4.0",
+ "backoff": "~2.3.0",
+ "bl": "~0.7.0"
+ },
+ "devDependencies": {
+ "request": "~2.33.0",
+ "faucet": "0.0.0",
+ "tape": "~2.3.2",
+ "from": "~0.1.3"
+ },
+ "author": "Hugh Kennedy <hughskennedy@gmail.com> (http://hughsk.io/)",
+ "license": "MIT",
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/hughsk/s3-write-stream"
+ },
+ "bugs": {
+ "url": "https://github.com/hughsk/s3-write-stream/issues"
+ },
+ "homepage": "https://github.com/hughsk/s3-write-stream",
+ "keywords": [
+ "s3",
+ "writeable",
+ "upload",
+ "streaming",
+ "multipart",
+ "backoff",
+ "retry",
+ "stream"
+ ]
+}
@@ -0,0 +1,5 @@
+module.exports = {
+    bucket: process.env.S3_STREAM_TEST_BUCKET || ''
+  , access: process.env.AWS_ACCESS_KEY || process.env.AWS_ACCESS_KEY_ID
+  , secret: process.env.AWS_SECRET_KEY || process.env.AWS_SECRET_ACCESS_KEY
+}
@@ -0,0 +1,112 @@
+var config = require('./test-config.js')
+var request = require('request')
+var from = require('from')
+var test = require('tape')
+var fs = require('fs')
+var bl = require('bl')
+var s3 = require('./')
+
+var testKey = 'testing-s3-write-stream'
+var largeUploadSize = 20*1024*1024
+var loremBuffer = new Buffer([
+    'Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod'
+  , 'tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam,'
+  , 'quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo'
+  , 'consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse'
+  , 'cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non'
+  , 'proident, sunt in culpa qui officia deserunt mollit anim id est laborum.'
+].join('\n'))
+
+console.error('')
+console.error('Just a quick warning: this test requires uploading')
+console.error('a 20MB file, so it could take a while.')
+console.error('')
+
+test('config sanity check', function(t) {
+  console.error('Test config:')
+  console.error(JSON.stringify(config, null, 2))
+  console.error()
+  t.plan(3)
+  t.ok(config.bucket, '"bucket" property should be defined in ./test-config.js')
+  t.ok(config.access, '"access" property should be defined in ./test-config.js')
+  t.ok(config.secret, '"secret" property should be defined in ./test-config.js')
+})
+
+// @todo: test bigger uploads
+test('basic upload', function(t) {
+  t.plan(3)
+
+  var createStream = s3({
+      secretAccessKey: config.secret
+    , accessKeyId: config.access
+    , Bucket: config.bucket
+  })
+
+  var counter = 0
+  var uploadStream = createStream(testKey)
+
+  fs.createReadStream(__filename)
+    .once('error', t.ifError.bind(t))
+    .pipe(uploadStream)
+    .once('error', t.ifError.bind(t))
+    .once('end', check)
+
+  function check() {
+    t.pass('successfully completed upload process')
+
+    request.get(
+      'https://s3.amazonaws.com/' +
+      (config.bucket + '/' + testKey).replace(/\/\//g, '/')
+    ).pipe(bl(function(err, contents) {
+      t.ifError(err, 'request finished successfully')
+      t.equal(
+          String(contents)
+        , fs.readFileSync(__filename, 'utf8')
+        , 'uploaded file has same contents as local'
+      )
+    }))
+  }
+})
+
+test('larger upload', function(t) {
+  t.plan(3)
+
+  var buffer = bl()
+  var createStream = s3({
+      secretAccessKey: config.secret
+    , accessKeyId: config.access
+    , Bucket: config.bucket
+  })
+
+  var counter = 0
+  var uploadStream = createStream(testKey)
+
+  from(function(_, next) {
+    counter += loremBuffer.length
+    if (counter > largeUploadSize)
+      return this.emit('end'), false
+    var chunk = loremBuffer.slice()
+    buffer.append(chunk)
+    this.emit('data', chunk)
+    return true
+  }).pipe(uploadStream)
+    .once('error', t.ifError.bind(t))
+    .once('end', check)
+
+  function check() {
+    t.pass('successfully completed upload process')
+
+    request.get(
+      'https://s3.amazonaws.com/' +
+      (config.bucket + '/' + testKey).replace(/\/\//g, '/')
+    ).pipe(bl(function(err, contents) {
+      t.ifError(err, 'request finished successfully')
+      t.equal(
+          String(contents)
+        , String(buffer.slice())
+        , 'uploaded file has same contents as local stream'
+      )
+      t.end()
+    }))
+  }
+})
