Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

changed it to process files one by one and stream the output instead of keeping it all in memory
  • Loading branch information...
commit d374e9f0b9f00576c283e905f074f251a3e74446 1 parent 292fc1a
Bruno Jouhier bjouhier authored
Showing with 100 additions and 183 deletions.
  1. +100 −67 janzip/janzip.js
  2. +0 −116 janzip/throttlefs.js
167 janzip/janzip.js
View
@@ -2,19 +2,27 @@
* node.js zipping module
*
* (c) Jan Jongboom, 2011
+ *
+ * Modified by Bruno Jouhier to support compression and streaming.
*/
-// modified by Bruno Jouhier for async compress
"use strict";
if (!require('streamline/module')(module)) return;
var flows = require('streamline/lib/util/flows');
+var streams = require('streamline/lib/streams/server/streams');
var RollingBuffer = require("./rollingbuffer");
var fs = require("fs");
var zlib = require("zlib");
-var Zip = function() {
- var files = [];
+var Zip = function(outStream, options) {
+ // auto-wrap outStream with streamline stream
+ var os = outStream.emitter ? outStream : new streams.WritableStream(outStream);
+ options = options || {};
+ var zipMethod = options.zipMethod || exports.deflate;
+ var dirBuffers = [];
+ var fileOffset = 0;
+ var totalDirLength = 0;
/**
* Get the date / time parts of the file header
* @param {date} A date object
@@ -85,82 +93,112 @@ var Zip = function() {
return header;
}
+ function add(_, file) {
+ // compress
+ var data = zipMethod.compress(file.data, _);
+
+ // write file header
+ var fileHeader = getFileHeader(file, zipMethod.indicator, data);
+ var fileBuffer = new RollingBuffer(4 + fileHeader.length + file.name.length);
+ writeBytes(fileBuffer, [0x50, 0x4b, 0x03, 0x04]); // 4
+ fileBuffer.appendBuffer(fileHeader); // hmm...
+ fileBuffer.write(file.name, "ascii");
+ os.write(_, fileBuffer.buf);
+
+ // write file data
+ os.write(_, data);
+
+ // now create dir
+ var dirBuffer = new RollingBuffer(4 + 2 + fileHeader.length + 6 + 4 + 4 + file.name.length);
+ writeBytes(dirBuffer, [0x50, 0x4b, 0x01, 0x02]);
+ writeBytes(dirBuffer, [0x14, 0x00]);
+ dirBuffer.appendBuffer(fileHeader);
+ // comment length, disk start, file attributes
+ writeBytes(dirBuffer, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
+ // external file attributes, @todo
+ writeBytes(dirBuffer, [0x00, 0x00, 0x00, 0x00]);
+ // relative offset of local header
+ dirBuffer.writeInt32(fileOffset);
+ // file name
+ dirBuffer.write(file.name, "ascii");
+ dirBuffers.push(dirBuffer.buf);
+ totalDirLength += dirBuffer.buf.length;
+
+ // update offset
+ fileOffset += fileBuffer.buf.length + data.length;
+ }
/**
* Add a file to the current file
* @param {name} The name of the file (can include folder structure)
* @param {data} A buffer containing the data
*/
- this.add = function(filename, data) {
- files.push({
+ this.addData = function(_, filename, data) {
+ add(_, {
name: filename,
- data: data
+ data: data,
+ date: new Date()
});
+ return this;
+ }
+
+ /**
+ * Add one file
+ * Pass in an entry containing { name: "", path: "" }
+ */
+ this.addFile = function(_, entry) {
+ add(_, {
+ name: entry.name,
+ data: fs.readFile(entry.path, _),
+ date: entry.date || fs.stat(entry.path, _).mtime
+ })
+ return this;
}
/**
* Add multiple files to the current archive.
* Pass in an array containing { name: "", path: "" }
*/
- this.addFiles = function(filenames) {
- filenames.forEach(function(f) {
- files.push(f);
- });
+ this.addFiles = function(_, entries) {
+ flows.each(_, entries, this.addFile.bind(this))
+ return this;
}
/**
- * Returns the ZIP archive as a buffer object
+ * Add directory and all its contents
+ * entry contains { name: "", path: "", filter: optional function }
*/
- this.toBuffer = function(zipMethod, _) {
- zipMethod = zipMethod || store;
-
- var fileBuffers = [],
- dirBuffers = [];
- var fileOffset = 0;
-
- flows.each(_, files, function(_, file) {
- var noData = !file.data;
- if (noData) file.data = fs.readFile(file.path, _);
- if (!file.date) file.date = fs.stat(file.path, _).mtime;
- var data = zipMethod.compress(file.data, _);
- var fileHeader = getFileHeader(file, zipMethod.indicator, data);
-
- // write files
- var fileBuffer = new RollingBuffer(4 + fileHeader.length + file.name.length + data.length);
- writeBytes(fileBuffer, [0x50, 0x4b, 0x03, 0x04]); // 4
- fileBuffer.appendBuffer(fileHeader); // hmm...
- fileBuffer.write(file.name, "ascii");
- fileBuffer.appendBuffer(data);
-
- // now create dir
- var dirBuffer = new RollingBuffer(4 + 2 + fileHeader.length + 6 + 4 + 4 + file.name.length);
- writeBytes(dirBuffer, [0x50, 0x4b, 0x01, 0x02]);
- writeBytes(dirBuffer, [0x14, 0x00]);
- dirBuffer.appendBuffer(fileHeader);
- // comment length, disk start, file attributes
- writeBytes(dirBuffer, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
- // external file attributes, @todo
- writeBytes(dirBuffer, [0x00, 0x00, 0x00, 0x00]);
- // relative offset of local header
- dirBuffer.writeInt32(fileOffset);
- // file name
- dirBuffer.write(file.name, "ascii");
-
- // update offset
- fileOffset += fileBuffer.length;
-
- fileBuffers.push(fileBuffer);
- dirBuffers.push(dirBuffer);
- if (noData) delete file.data;
- });
+ this.addDirectory = function(_, entry) {
+ var stat = fs.stat(entry.path, _);
+ if (stat.isDirectory()) {
+ flows.each(_, fs.readdir(entry.path, _), function(_, n) {
+ if (entry.filter && !entry.filter(n, entry)) return;
+ this.addDirectory(_, {
+ name: entry.name ? entry.name + "/" + n : n,
+ path: entry.path + "/" + n,
+ filter: entry.filter
+ });
+ }, this);
+ } else {
+ this.addFile(_, entry);
+ }
+ return this;
+ }
- var totalDirLength = getTotalBufLength(dirBuffers);
- var totalFileLength = getTotalBufLength(fileBuffers);
+ /**
+ * Finishes the stream
+ */
+ this.finish = function(_) {
+ var totalFileLength = fileOffset;
+
+ flows.each(_, dirBuffers, function(_, b) {
+ os.write(_, b);
+ });
var dirEnd = new RollingBuffer(8 + 2 + 2 + 4 + 4 + 2);
writeBytes(dirEnd, [0x50, 0x4b, 0x05, 0x06, 0x00, 0x00, 0x00, 0x00]);
// total number of entries
- dirEnd.writeInt16(fileBuffers.length);
- dirEnd.writeInt16(fileBuffers.length);
+ dirEnd.writeInt16(dirBuffers.length);
+ dirEnd.writeInt16(dirBuffers.length);
// directory lengths
dirEnd.writeInt32(totalDirLength);
// file lengths
@@ -168,16 +206,11 @@ var Zip = function() {
// and the end of file
writeBytes(dirEnd, [0x00, 0x00]);
- var buffer = new RollingBuffer(totalFileLength + totalDirLength + dirEnd.length);
- fileBuffers.forEach(function(b) {
- buffer.appendBuffer(b);
- });
- dirBuffers.forEach(function(b) {
- buffer.appendBuffer(b);
- });
- buffer.appendBuffer(dirEnd);
-
- return buffer.getInternalBuffer();
+ os.write(_, dirEnd.buf);
+ // auto-close if wrapped automatically
+ if (os !== outStream)
+ os.close(_);
+ return this;
}
};
116 janzip/throttlefs.js
View
@@ -1,116 +0,0 @@
-/**
- * File system throttling
- *
- * So when you open a lot of files, the filesystem will just say 'screw you' and stop working.
- * With this (100% fs compatible) module, you will have auto throttling, so we'll have 20 file handles max.
- */
-var fs = require("fs");
-
-// number of active handles, plus a queue
-var active = 0, processed = 0, scheduled = 0, queue = [];
-var threshold = 20;
-
-/**
- * This is the function interceptor.
- * It gets 'fs' functions, wraps them into throttle code and either executes or queues them
- */
-function wrapper (target, funcName, args) {
- var callback = args.length && args[args.length - 1];
- var self = this;
-
- if (callback && typeof callback === "function") {
- scheduled++;
-
- // finished handler
- args[args.length - 1] = function () {
- processed++;
- active--;
- callback.apply(this, arguments);
- };
-
- // kickoff code
- var kickoff = function () {
- active++;
- target[funcName].apply(self, args);
- };
-
- // depending on the number of active items, we either execute it or schedule
- queue.push(kickoff);
- onItemAdded();
- }
-	// can't find a callback function? then just execute
- else {
- return target[funcName].apply(this, args);
- }
-}
-
-// ===============================
-// extra methods outside fs
-/**
- * Throttled file copy
- */
-var augment = {
- copyFile: function (src, target, callback) {
- var srcFile = fs.createReadStream(src);
- var targetFile = fs.createWriteStream(target);
-
- targetFile.on("close", function () {
- callback(null, src);
- });
-
- srcFile.pipe(targetFile);
- }
-};
-
-// copy all the fs methods
-Object.keys(fs).filter(function (m) { return typeof fs[m] === "function"; }).forEach(function (func) {
- module.exports[func] = function () {
- return wrapper(fs, func, arguments);
- };
-});
-
-Object.keys(augment).forEach(function (func) {
- module.exports[func] = function () {
- return wrapper(augment, func, arguments);
- };
-});
-
-var monitor = 0;
-
-/**
- * When an item is added to the queue we'll create a monitoring service
- */
-var onItemAdded = function () {
- if (!monitor) {
- monitor = setInterval(intervalFn, 20);
- }
-};
-
-/**
- * Function that we'll run every X ms.
- */
-var intervalFn = function () {
- var toProcess = threshold - active;
- var ix = 0, item = null;
-
- if (processed === scheduled) {
- clearInterval(monitor);
- monitor = 0;
- }
- else {
- // can use this for debugging purposes
- // console.log("queue length", queue.length, "empty counter", queueEmptyCounter, "processed", processed, "scheduled", scheduled);
- }
-
- while (++ix <= toProcess && (item = queue.shift())) {
- item();
- }
-};
-
-// some test code
-//module.exports.readFile("./folder.js", "utf8", function (err, data) {
-// console.log(err, !!data);
-//});
-//module.exports.readFile("./example.js", "utf8", function (err, data) {
-// console.log(err, !!data);
-//});
Please sign in to comment.
Something went wrong with that request. Please try again.