Permalink
Browse files

let streams and pipe() read and write files. fs.exists() and other cl…

…eanups.
  • Loading branch information...
1 parent 18955cd commit 3b8cc13daac71dbbb0f1c748cf356a0991de0cb4 @alanszlosek committed Dec 16, 2012
Showing with 61 additions and 98 deletions.
  1. +61 −98 ftpd.js
View
159 ftpd.js
@@ -124,13 +124,12 @@ function createServer(host, sandbox) {
} else {
logIf(1, "Opening data connection to " + socket.dataHost + ":" + socket.dataPort, socket);
var dataSocket = new net.Socket();
- dataSocket.buffers = [];
- // Since data may arrive once the connection is made, buffer it
+ // Since data may arrive once the connection is made, pause it right away
dataSocket.on("data", function(data) {
logIf(3, dataSocket.remoteAddress + ' event: data ; ' + (Buffer.isBuffer(data) ? 'buffer' : 'string'));
- dataSocket.buffers.push(data);
});
dataSocket.addListener("connect", function() {
+ dataSocket.pause(); // Pause until the data listeners are in place
socket.dataSocket = dataSocket;
logIf(3, "Data connection succeeded", socket);
callback(dataSocket);
@@ -219,7 +218,7 @@ function createServer(host, sandbox) {
// Change working directory.
if (!authenticated()) break;
var path = PathModule.join(socket.sandbox, PathModule.resolve(socket.fs.cwd(), commandArg));
- PathModule.exists(path, function(exists) {
+ fs.exists(path, function(exists) {
if (!exists) {
socket.write("550 Folder not found.\r\n");
return;
@@ -280,7 +279,7 @@ function createServer(host, sandbox) {
// Returns information of a file or directory if specified, else information of the current working directory is returned.
if (!authenticated()) break;
- whenDataWritable( function(pasvconn) {
+ whenDataWritable( function(dataSocket) {
var leftPad = function(text, width) {
var out = '';
for (var j = text.length; j < width; j++) out += ' ';
@@ -290,18 +289,18 @@ function createServer(host, sandbox) {
// This will be called once data has ACTUALLY written out ... socket.write() is async!
var success = function() {
socket.write("226 Transfer OK\r\n");
- pasvconn.end();
+ dataSocket.end();
};
var failure = function() {
- pasvconn.end();
+ dataSocket.end();
};
- var path = PathModule.join(socket.sandbox, socket.fs.cwd());
- if (pasvconn.readable) pasvconn.resume();
+ var path = PathModule.join(socket.sandbox, socket.fs.cwd());
+ if (dataSocket.readable) dataSocket.resume();
logIf(3, "Sending file list", socket);
fs.readdir(path, function(err, files) {
if (err) {
logIf(0, "While sending file list, reading directory: " + err, socket);
- pasvconn.write("", failure);
+ dataSocket.write("", failure);
} else {
// Wait until acknowledged!
socket.write("150 Here comes the directory listing\r\n", function() {
@@ -310,7 +309,7 @@ function createServer(host, sandbox) {
var file = files[ i ];
var s = fs.statSync( PathModule.join(path, file) );
var line = s.isDirectory() ? 'd' : '-';
- if (i > 0) pasvconn.write("\r\n");
+ if (i > 0) dataSocket.write("\r\n");
line += (0400 & s.mode) ? 'r' : '-';
line += (0200 & s.mode) ? 'w' : '-';
line += (0100 & s.mode) ? 'x' : '-';
@@ -325,10 +324,10 @@ function createServer(host, sandbox) {
var d = new Date(s.mtime);
line += leftPad(d.format('M d H:i'), 12) + ' '; // need to use a date string formatting lib
line += file;
- pasvconn.write(line);
+ dataSocket.write(line);
}
// write the last bit, so we can know when it's finished
- pasvconn.write("\r\n", success);
+ dataSocket.write("\r\n", success);
});
}
});
@@ -391,14 +390,14 @@ function createServer(host, sandbox) {
The server may reject the LIST or NLST request (with code 450 or 550) without first responding with a mark. In this case the server does not touch the data connection.
*/
- whenDataWritable( function(pasvconn) {
+ whenDataWritable( function(dataSocket) {
// This will be called once data has ACTUALLY written out ... socket.write() is async!
var success = function() {
socket.write("226 Transfer OK\r\n");
- pasvconn.end();
+ dataSocket.end();
};
var failure = function() {
- pasvconn.end();
+ dataSocket.end();
};
// Use temporary filesystem path maker since a path might be sent with NLST
var temp = '';
@@ -411,21 +410,21 @@ function createServer(host, sandbox) {
temp = PathModule.join(socket.sandbox, socket.fs.cwd(), commandArg);
}
} else temp = PathModule.join(socket.sandbox, socket.fs.cwd());
- if (pasvconn.readable) pasvconn.resume();
+ if (dataSocket.readable) dataSocket.resume();
logIf(3, "Sending file list", socket);
glob.glob(temp, function(err, files) {
//fs.readdir(socket.sandbox + temp.cwd(), function(err, files) {
if (err) {
logIf(0, "During NLST, error globbing files: " + err, socket);
socket.write("451 Read error\r\n");
- pasvconn.write("", failure);
+ dataSocket.write("", failure);
return;
}
// Wait until acknowledged!
socket.write("150 Here comes the directory listing\r\n", function() {
logIf(3, "Directory has " + files.length + " files", socket);
- pasvconn.write( files.map(PathModule.basename).join("\015\012") + "\015\012", success);
+ dataSocket.write( files.map(PathModule.basename).join("\015\012") + "\015\012", success);
});
});
});
@@ -463,17 +462,13 @@ function createServer(host, sandbox) {
if (socket.dataSocket) socket.dataSocket.end(); // close any existing connections
socket.dataListener = null;
socket.dataSocket = null;
- // Passive listener needs to pause data because sometimes commands come before a data connection,
- // othertime afterwards ... depends on the client and threads
- socket.pause();
+ socket.pause(); // Pause processing of further commands
var pasv = net.createServer(function(psocket) {
logIf(1, "Incoming passive data connection", socket);
- psocket.pause();
- psocket.buffers = [];
+ psocket.pause(); // Pause until data listeners are in place
psocket.on("data", function(data) {
// should watch out for malicious users uploading large amounts of data outside protocol
- logIf(3, 'Data event: received ' + (Buffer.isBuffer(data) ? 'buffer' : 'string'), socket);
- psocket.buffers.push(data);
+ logIf(4, 'Data event: received ' + (Buffer.isBuffer(data) ? 'buffer' : 'string'), socket);
});
psocket.on("connect", function() {
logIf(1, "Passive data event: connect", socket);
@@ -566,47 +561,33 @@ function createServer(host, sandbox) {
break;
case "RETR":
// Retrieve (download) a remote file.
- whenDataWritable( function(pasvconn) {
- pasvconn.setEncoding(socket.mode);
+ whenDataWritable( function(dataSocket) {
+ dataSocket.setEncoding(socket.mode);
var filename = PathModule.resolve(socket.fs.cwd(), commandArg);
if(filename != socket.filename)
{
socket.totsize = 0;
socket.filename = filename;
}
- fs.open( PathModule.join(socket.sandbox, socket.filename), "r", function (err, fd) {
- console.trace("DATA file " + socket.filename + " opened");
- socket.write("150 Opening " + socket.mode.toUpperCase() + " mode data connection\r\n");
- function readChunk() {
- fs.read(fd, 4096, socket.totsize, socket.mode, function(err, chunk, bytes_read) {
- if(err) {
- console.trace("Erro reading chunk");
- throw err;
- return;
- }
- if(chunk) {
- socket.totsize += bytes_read;
- if(pasvconn.readyState == "open") pasvconn.write(chunk, socket.mode);
- readChunk();
- }
- else {
- console.trace("DATA file " + socket.filename + " closed");
- pasvconn.end();
- socket.write("226 Closing data connection, sent " + socket.totsize + " bytes\r\n");
- fs.close(fd);
- socket.totsize = 0;
- }
- });
- }
- if(err) {
- dotrace("Error at read");
- throw err;
- }
- else {
- readChunk();
- }
+ var from = fs.createReadStream( PathModule.join(socket.sandbox, socket.filename), {flags:"r"});
+ from.on("error", function () {
+ logIf(2, "Error reading file");
});
+ from.on("end", function() {
+ logIf(3, "DATA file " + socket.filename + " closed");
+ dataSocket.end();
+ socket.write("226 Closing data connection, sent " + socket.totsize + " bytes\r\n");
+ socket.totsize = 0;
+ });
+
+ logIf(3, "DATA file " + socket.filename + " opened");
+ socket.write("150 Opening " + socket.mode.toUpperCase() + " mode data connection\r\n");
+
+ if (dataSocket.readable) {
+ dataSocket.resume();
+ from.pipe(dataSocket);
+ }
});
break;
case "RMD":
@@ -626,7 +607,7 @@ function createServer(host, sandbox) {
if (!authenticated()) break;
socket.filefrom = PathModule.resolve(socket.fs.cwd(), commandArg);
logIf(3, "Rename from " + socket.filefrom, socket);
- path.exists( PathModule.join(socket.sandbox, socket.filefrom), function(exists) {
+ fs.exists( PathModule.join(socket.sandbox, socket.filefrom), function(exists) {
if (exists) socket.write("350 File exists, ready for destination name\r\n");
else socket.write("350 Command failed, file does not exist\r\n");
});
@@ -687,46 +668,28 @@ function createServer(host, sandbox) {
whenDataWritable( function(dataSocket) {
// dataSocket comes to us paused, so we have a chance to create the file before accepting data
filename = PathModule.resolve(socket.fs.cwd(), commandArg);
- fs.open( PathModule.join(socket.sandbox, filename), 'w', 0644, function(err, fd) {
- if(err) {
- logIf(0, 'Error opening/creating file: ' + filename, socket);
- socket.write("553 Could not create file\r\n");
- dataSocket.end();
- return;
- }
- logIf(3, "File opened/created: " + filename, socket);
+ var destination = fs.createWriteStream( PathModule.join(socket.sandbox, filename), {flags: 'w+', mode:0644});
+ destination.on("error", function(err) {
+ logIf(0, 'Error opening/creating file: ' + filename, socket);
+ socket.write("553 Could not create file\r\n");
+ dataSocket.end();
+ });
+
+ logIf(3, "File opened/created: " + filename, socket);
- dataSocket.addListener("end", function () {
- var writtenToFile = 0;
- var doneCallback = function() {
- fs.close(fd, function() {
- socket.write("226 Closing data connection\r\n"); //, recv " + writtenToFile + " bytes\r\n");
- });
- };
- var writeCallback = function(err, written) {
- var buf;
- if (err) {
- logIf(0, "Error writing " + PathModule.join(socket.sandbox, filename) + ": " + err, socket);
- return;
- }
- writtenToFile += written;
- if (!dataSocket.buffers.length) {
- doneCallback();
- return;
- }
- buf = dataSocket.buffers.shift();
- fs.write(fd, buf, 0, buf.length, null, writeCallback);
- };
- writeCallback();
- });
- dataSocket.addListener("error", function(err) {
- logIf(0, "Error transferring " + filename + ": " + err, socket);
- // close file handle
- });
- logIf(3, "Told client ok to send file data", socket);
- socket.write("150 Ok to send data\r\n"); // don't think resume() needs to wait for this to succeed
- if (dataSocket.readable) dataSocket.resume();
+ dataSocket.addListener("end", function () {
+ socket.write("226 Data connection closed\r\n");
});
+ dataSocket.addListener("error", function(err) {
+ logIf(0, "Error transferring " + filename + ": " + err, socket);
+ });
+ logIf(3, "Told client ok to send file data", socket);
+ socket.write("150 Ok to send data\r\n"); // don't think resume() needs to wait for this to succeed
+ if (dataSocket.readable) {
+ dataSocket.resume();
+ // Let pipe() do the dirty work ... it'll keep both streams in sync
+ dataSocket.pipe(destination);
+ }
});
break;
case "STOU":

0 comments on commit 3b8cc13

Please sign in to comment.