added exists, mkdir, rm and append

commit 3ac64acbcc7d05e904d1b9470f909b09b7345f96 (parent: 3dedbef)
authored by @horaci
Showing with 361 additions and 17 deletions.
  1. +1 −1  README
  2. +134 −7 demo/demo.js
  3. +114 −6 node-hdfs.js
  4. +5 −1 run
  5. +107 −2 src/hdfs_bindings.cc
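
For orientation, here is a minimal sketch of the four new calls as node-hdfs.js (below) exposes them; the require path and file paths are illustrative, and the client is constructed the same way demo/demo.js constructs it:

    var HDFS = require("./node-hdfs");                  // illustrative path
    var hdfs = new HDFS({host:"default", port:0});

    hdfs.exists("/tmp/test.txt", function(err, found) { // callback receives (err, bool)
      console.log(found ? "path exists" : "missing: " + err);
    });

    hdfs.mkdir("/tmp/demo-dir", function(err, ok) {     // creates intermediate directories
      if(!ok) return console.log("mkdir failed: " + err);
      // rm refuses non-empty directories unless {recursive:true} (or {force:true}) is set
      hdfs.rm("/tmp/demo-dir", {recursive:true}, function(err, ok) {
        console.log(ok ? "deleted" : "rm failed: " + err);
      });
    });

    hdfs.append("/tmp/test.txt", function(appender) {   // opens with O_WRONLY | O_APPEND
      appender.once("open", function(err, handle) {
        appender.write(new Buffer("one more line\n"));
        appender.end();
      });
      appender.once("close", function(err) { console.log("append done"); });
    });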
README
@@ -10,7 +10,7 @@ libhdfs must be compiled and installed in a path accessible by ldconfig (i.e. /u
To build libhdfs in Mac OSX a few changes need to be done in the generated 'Makefile' by './configure':
-* Search for a line that starts with 'CFLAGS = -g O2'
+* Edit Makefile and search for a line that starts with 'CFLAGS = -g O2'
** Remove the parameter "-m"
** Add parameter "-framework JavaVM"
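
To make the README step concrete, here is a hedged before/after of the CFLAGS edit; the exact flags emitted by './configure' vary by libhdfs version, so treat the "before" line as illustrative:

    # before (generated by ./configure; exact flags may differ)
    CFLAGS = -g -O2 -m
    # after: the "-m" parameter removed, "-framework JavaVM" added
    CFLAGS = -g -O2 -framework JavaVM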
demo/demo.js
@@ -3,6 +3,28 @@ var sys = require('sys')
var hdfs = new HDFS({host:"default", port:0});
+// create file (overwrite if exists)
+var writeremote = function(cb) {
+ var hdfs_path = "/tmp/test.txt"
+ var buffer = new Buffer("The path of the righteous man is beset on all sides by the iniquities of the\n" +
+ "selfish and the tyranny of evil men. Blessed is he who in the name of\n" +
+ "charity and good will shepherds the weak through the valley of darkness, for\n" +
+ "he is truly his brother's keeper and the finder of lost children. And I will\n" +
+ "strike down upon thee with great vengeance and furious anger those who\n" +
+ "attempt to poison and destroy my brothers. And you will know my name is the\n" +
+ "Lord when I lay my vengeance upon thee.\n")
+
+ hdfs.write(hdfs_path, function(writter) {
+ writter.once("open", function(err, handle) {
+ writter.write(buffer);
+ writter.end();
+ })
+ writter.once("close", function(err) {
+ cb();
+ });
+ });
+}
+
// Stat File
var statremote = function(cb) {
var hdfs_file_path = "/tmp/test.txt"
@@ -29,8 +51,8 @@ var readremote = function(cb) {
reader.on("end", function(err) {
if(!err) {
console.log("Finished reading data - Total readed: " + readed);
- cb();
}
+ cb();
});
});
}
@@ -42,26 +64,95 @@ var copylocal = function(cb) {
hdfs.copyToLocalPath(hdfs_file_path, local_out_path, function(err, readed) {
if(!err) {
console.log(readed + " bytes copied from remote hdfs path " + hdfs_file_path + " to local path " + local_out_path);
- cb();
}
+ cb();
});
}
// Write local file to HDFS path
var copyremote = function(cb) {
var local_file_path = "/tmp/test_horaci.txt";
- var hdfs_out_path = "/tmp/test.txt";
+ var hdfs_out_path = "/tmp/test_horaci.txt";
var start = new Date().getTime();
hdfs.copyFromLocalPath(local_file_path, hdfs_out_path, function(err, written) {
if(!err) {
var duration = new Date().getTime() - start;
console.log(written + " bytes copied from local path " + local_file_path + " to remote hdfs path " + hdfs_out_path + " in " + duration + " ms.");
- cb();
}
+ cb();
+ });
+}
+
+var createdirectory = function(cb) {
+ var hdfs_path = "/tmp/node-hdfs-test/a/b/c/d/e";
+ hdfs.mkdir(hdfs_path, function(err, result) {
+ if(!err && result) {
+ console.log("Directory " + hdfs_path + " created.");
+ } else {
+ console.log("Failed creating directory " + hdfs_path + ": " + err);
+ }
+ cb();
});
}
+var deletedirectory = function(cb) {
+ var hdfs_path = "/tmp/node-hdfs-test/a/b/c/d/e";
+ hdfs.rm(hdfs_path, {}, function(err, result) {
+ if(!err && result) {
+ console.log("Directory " + hdfs_path + " deleted.");
+ } else {
+ console.log("Failed deleting " + hdfs_path + ": " + err);
+ }
+
+ var hdfs_path_non_recursive = "/tmp/node-hdfs-test/a/b/c";
+ hdfs.rm(hdfs_path_non_recursive, {}, function(err, result) {
+ if(!err && result) {
+ console.log("Glups!, deleted a folder " + hdfs_path_non_recursive + " with subfolders without recursive or force flags?");
+ } else {
+ console.log("Correctly refused deleting " + hdfs_path_non_recursive + ": " + err);
+ }
+
+ var hdfs_path_recursive = "/tmp/node-hdfs-test/a/b/c";
+ hdfs.rm(hdfs_path_recursive, {recursive:true}, function(err, result) {
+ if(!err && result) {
+ console.log("Deleted folder " + hdfs_path_recursive + " with recursive flag");
+ } else {
+ console.log("Error deleting folder " + hdfs_path_recursive + ": " + err);
+ }
+
+ var hdfs_path_force = "/tmp/node-hdfs-test/a";
+ hdfs.rm(hdfs_path_force, {force:true}, function(err, result) {
+ if(!err && result) {
+ console.log("Deleted folder " + hdfs_path_force + " with force flag");
+ } else {
+ console.log("Error deleting folder " + hdfs_path_force + ": " + err);
+ }
+
+ var hdfs_path_dir = "/tmp/node-hdfs-test/";
+ hdfs.rmdir(hdfs_path_dir, function(err, result) {
+ if(!err && result) {
+ console.log("Removed empty directory " + hdfs_path_dir)
+ } else {
+ console.log("Failed removing empty directory " + hdfs_path_dir)
+ }
+
+ var hdfs_path_file = "/tmp/test.txt";
+ hdfs.rmdir(hdfs_path_file, function(err, result) {
+ if(!err && result) {
+ console.log("Glups! Removed file with rmdir: " + hdfs_path_file)
+ } else {
+ console.log("Correctly refused deleting file with rmdir " + hdfs_path_file + ": " + err)
+ }
+ cb();
+ })
+ })
+ });
+ });
+ });
+ });
+}
+
var listremote = function(cb) {
var hdfs_path = "/user/horaci"
hdfs.list(hdfs_path, function(err, files) {
@@ -72,7 +163,6 @@ var listremote = function(cb) {
});
}
-
var listremoterecursive = function(cb) {
var hdfs_path = "/user/horaci"
hdfs.list(hdfs_path, {recursive:true}, function(err, files) {
@@ -83,14 +173,51 @@ var listremoterecursive = function(cb) {
});
}
+var appendfile = function(cb) {
+ var hdfs_path = "/tmp/test-horaci2.txt"
+
+ var append_file = function(str, callback) {
+ hdfs.append(hdfs_path, function(appender) {
+ appender.on("open", function(err, handle) {
+ appender.write(new Buffer(str));
+ appender.end();
+ })
+ appender.on("close", function(err) {
+ callback();
+ });
+ })
+ }
+
+ var create_file = function(callback) {
+ hdfs.write(hdfs_path, function(writter) {
+ writter.once("open", function(err, handle) {
+ writter.write(new Buffer("Hello create file.\n"));
+ writter.end();
+ })
+ writter.on("close", function(err) {
+ callback();
+ });
+ });
+ }
+
+ create_file(function() {
+ console.log("Create file with some data: " + hdfs_path);
+ append_file("First append\n", function() {
+ append_file("Second append\n", function() {
+ console.log("Appended data to file: " + hdfs_path);
+ cb();
+ })
+ })
+ });
+}
+
//---------------
console.log("Connecting to HDFS server...");
hdfs.connect(); // (optional, first command will connect if disconnected)
console.log("Connected!");
-var ops = [statremote, readremote, copylocal, copyremote, listremote, listremoterecursive];
-//var ops = [listremote,listremoterecursive];
+var ops = [writeremote, statremote, readremote, copylocal, copyremote, listremote, listremoterecursive, createdirectory, deletedirectory, appendfile];
var nextOp = function() { if(next=ops.shift()) next(nextOp)}
process.nextTick(nextOp);
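
The driver at the end of the demo is a compact shift-and-recurse chain: each demo function receives nextOp as its completion callback, so the operations run strictly one after another. An equivalent, slightly more explicit sketch:

    var nextOp = function() {
      var next = ops.shift();   // take the next demo function, if any
      if (next) next(nextOp);   // run it; it calls nextOp back when finished
    };
    process.nextTick(nextOp);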
node-hdfs.js
@@ -35,6 +35,17 @@ module.exports = function(options) {
this.connected = false;
}
+ this.exists = function(path, cb) {
+ self.connect();
+ HDFS.exists(path, function(result) {
+ if(result==0) {
+ cb(null, true);
+ } else {
+ cb("file does not exist", false);
+ }
+ });
+ }
+
this.stat = function(path, cb) {
self.connect();
HDFS.stat(path, cb);
@@ -85,12 +96,104 @@ module.exports = function(options) {
return cb ? cb(reader) : reader;
}
- this.write = function(path, cb) {
+ this.write = function(path, mode, cb) {
+ if (!cb && typeof mode == "function") { cb = mode; mode = undefined; }
+ mode = mode || (modes.O_WRONLY | modes.O_CREAT)
self.connect();
- var writter = new HDFSWritter(path);
+ var writter = new HDFSWritter(path, mode);
return cb ? cb(writter) : writter;
}
+ this.append = function(path, cb) {
+ return self.write(path, modes.O_WRONLY | modes.O_APPEND, cb)
+ }
+
+ this.mkdir = function(path,cb) {
+ self.connect();
+ self.exists(path, function(err, result) {
+ if(!result) { // exists() calls back (err, bool); only create when the path is absent
+ HDFS.mkdir(path,function(result) {
+ if(result == 0) {
+ cb(null, true);
+ } else {
+ cb("Error creating directory", false); // generic error :p
+ }
+ });
+ } else {
+ cb("File or directory already exists", false);
+ }
+ })
+ }
+
+ this.rm = function(path,options, cb) {
+ if (!cb && typeof options == "function") { cb = options; options = undefined; }
+ options = options || {recursive:false, force:false}
+ self.connect();
+
+ if(!options.force) {
+ var slashes = path.split("/");
+ if(slashes[0] != "") {
+ return(cb("cowardly refusing to delete relative path - set force to true in options to override", false));
+ }
+ if( slashes.length < 3 || (slashes.length == 3 && slashes[2] == "")) {
+ return(cb("cowardly refusing to delete root folder or first-level folder - set force to true in options to override", false));
+ }
+ }
+
+ var delete_file = function() {
+ HDFS.rm(path,function(result) {
+ if(result == 0) {
+ cb(null, true);
+ } else {
+ cb("Error deleting file", false); // generic error :p
+ }
+ });
+ }
+
+ self.exists(path, function(err, result) {
+ if(!err && result) {
+ if(!options.recursive && !options.force) {
+ self.stat(path, function(err, data) {
+ if(err) {
+ cb("failed stating the path or file", false);
+ } else {
+ if(data.type == "directory") {
+ self.list(path, function(err, files) {
+ if(files && files.length > 0) {
+ cb("directory is not empty -- use recursive:true in options to force deletion", false);
+ } else { // directory empty
+ delete_file();
+ }
+ });
+ } else { // not a directory
+ delete_file();
+ }
+ }
+ });
+ } else { // recursive or force set
+ delete_file();
+ }
+ } else {
+ cb("File or directory does not exists", false);
+ }
+ })
+ }
+
+ // same as rm, but ensure path is a directory
+ this.rmdir = function(path, options, cb) {
+ if (!cb && typeof options == "function") { cb = options; options = undefined; }
+ options = options || {}
+ self.stat(path, function(err, data) {
+ if(err || !data) {
+ cb(err||"file not found", false);
+ } else if(data.type == "directory") {
+ self.rm(path, options, cb);
+ } else {
+ cb("not a directory", false);
+ }
+ })
+ }
+
this.copyToLocalPath = function(srcPath, dstPath, options, cb) {
if (!cb && typeof options == "function") { cb = options; options = undefined; }
options = options || {encoding: null, mode:0666};
@@ -178,12 +281,13 @@ var HDFSReader = function(path, bufferSize) {
sys.inherits(HDFSReader, EventEmitter);
-var HDFSWritter = function(path) {
+var HDFSWritter = function(path, mode) {
var self = this;
this.handle = null;
this.writting = false;
this.closeCalled = false;
this.writeBuffer = new Buffer(0);
+ mode = mode || (modes.O_WRONLY | modes.O_CREAT)
this.write = function(buffer) {
self.expandBuffer(buffer);
@@ -207,6 +311,7 @@ var HDFSWritter = function(path) {
if(self.handle >= 0) {
if(!self.writting && self.writeBuffer.length == 0) {
HDFS.close(self.handle, function() {
+ self.handle = undefined;
self.emit("close", err);
})
} else {
@@ -220,15 +325,18 @@ var HDFSWritter = function(path) {
this.expandBuffer = function(buffer) {
if(buffer) {
+ if(buffer.constructor.name != "Buffer") {
+ buffer = new Buffer(buffer.toString());
+ }
var newBuffer = new Buffer(self.writeBuffer.length + buffer.length);
self.writeBuffer.copy(newBuffer, 0, 0);
buffer.copy(newBuffer, self.writeBuffer.length, 0);
self.writeBuffer = newBuffer;
}
}
-
- HDFS.open(path, modes.O_WRONLY | modes.O_CREAT, function(err, handle) {
- if(err) {
+
+ HDFS.open(path, mode, function(err, handle) {
+ if(err || !(handle >= 0)) {
self.end(err);
} else {
self.handle = handle;
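
Two small changes above combine nicely for callers: write() now takes an optional mode (defaulting to O_WRONLY | O_CREAT, with append() passing O_WRONLY | O_APPEND), and expandBuffer() coerces non-Buffer arguments through toString(). A minimal sketch, with the path illustrative:

    hdfs.write("/tmp/out.txt", function(writter) {
      writter.once("open", function(err, handle) {
        writter.write("plain strings are wrapped in a Buffer by expandBuffer\n");
        writter.end();
      });
      writter.once("close", function(err) {
        // self.handle is reset to undefined once the underlying HDFS.close completes
      });
    });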
run
@@ -1,10 +1,14 @@
export HADOOP_HOME=/usr/local/hadoop
# Path to hadoop libs
-export CLASSPATH=$HADOOP_HOME/hadoop-core-0.20.2-cdh3u0.jar:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
+# export CLASSPATH=$HADOOP_HOME/hadoop-core-0.20.2-cdh3u0.jar:$HADOOP_HOME/lib/commons-logging-1.0.4.jar
+
+# Alternate way without having to specify a version on hadoop-core:
+export CLASSPATH=$(for i in $HADOOP_HOME/hadoop-core*.jar ; do echo -n $i: ; done):$(for i in $HADOOP_HOME/lib/commons-logging*.jar ; do echo -n $i: ; done)
# Add conf path where core-site.xml is
export CLASSPATH=$CLASSPATH:./conf
+
node-waf clean
node-waf configure
node-waf build
src/hdfs_bindings.cc
@@ -47,6 +47,9 @@ class HdfsClient : public ObjectWrap
NODE_SET_PROTOTYPE_METHOD(s_ct, "open", Open);
NODE_SET_PROTOTYPE_METHOD(s_ct, "close", Close);
NODE_SET_PROTOTYPE_METHOD(s_ct, "list", List);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "mkdir", CreateDirectory);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "exists", Exists);
+ NODE_SET_PROTOTYPE_METHOD(s_ct, "rm", Delete);
NODE_SET_PROTOTYPE_METHOD(s_ct, "disconnect", Disconnect);
target->Set(String::NewSymbol("Hdfs"), s_ct->GetFunction());
@@ -74,6 +77,13 @@ class HdfsClient : public ObjectWrap
return args.This();
}
+ struct hdfs_path_baton_t {
+ HdfsClient *client;
+ char *filePath;
+ Persistent<Function> cb;
+ int result;
+ };
+
struct hdfs_open_baton_t {
HdfsClient *client;
char *filePath;
@@ -139,6 +149,59 @@ class HdfsClient : public ObjectWrap
hdfsDisconnect(client->fs_);
return Boolean::New(true);
}
+
+ /**** GENERIC PATH OP ****/
+ static Handle<Value> genericPathOp(int (*op)(eio_req*), const Arguments &args)
+ {
+ HandleScope scope;
+ REQ_FUN_ARG(1, cb);
+
+ HdfsClient* client = ObjectWrap::Unwrap<HdfsClient>(args.This());
+
+ v8::String::Utf8Value pathStr(args[0]);
+ char* filePath = new char[strlen(*pathStr) + 1];
+ strcpy(filePath, *pathStr);
+
+ hdfs_path_baton_t *baton = new hdfs_path_baton_t();
+ baton->client = client;
+ baton->cb = Persistent<Function>::New(cb);
+ baton->filePath = filePath;
+ baton->result = -1;
+
+ client->Ref();
+
+ eio_custom(op, EIO_PRI_DEFAULT, eio_after_hdfs_generic, baton);
+ ev_ref(EV_DEFAULT_UC);
+
+ return Undefined();
+ }
+
+ static int eio_after_hdfs_generic(eio_req *req)
+ {
+ HandleScope scope;
+ hdfs_path_baton_t *baton = static_cast<hdfs_path_baton_t*>(req->data);
+
+ ev_unref(EV_DEFAULT_UC);
+ baton->client->Unref();
+
+ Local<Value> argv[1];
+ argv[0] = Local<Value>::New(Integer::New(baton->result));
+
+ TryCatch try_catch;
+
+ baton->cb->Call(Context::GetCurrent()->Global(), 1, argv);
+
+ if (try_catch.HasCaught()) {
+ FatalException(try_catch);
+ }
+
+ baton->cb.Dispose();
+
+ delete[] baton->filePath; // allocated with new char[], so delete[] rather than free
+ delete baton;
+ return 0;
+ }
+
/*********** STAT **********/
@@ -531,8 +594,6 @@ class HdfsClient : public ObjectWrap
// write(fileHandleId, buffer, cb)
static Handle<Value> Write(const Arguments& args)
{
-
-
HandleScope scope;
REQ_FUN_ARG(2, cb);
@@ -600,6 +661,50 @@ class HdfsClient : public ObjectWrap
delete baton;
return 0;
}
+
+ /*** Create Directory ***/
+
+ static Handle<Value> CreateDirectory(const Arguments& args)
+ {
+ return genericPathOp(eio_hdfs_mkdir, args);
+ }
+
+ static int eio_hdfs_mkdir(eio_req *req)
+ {
+ hdfs_path_baton_t *baton = static_cast<hdfs_path_baton_t*>(req->data);
+ baton->result = hdfsCreateDirectory(baton->client->fs_, baton->filePath);
+ return 0;
+ }
+
+ /*** exists ***/
+
+ static Handle<Value> Exists(const Arguments& args)
+ {
+ return genericPathOp(eio_hdfs_exists, args);
+ }
+
+ static int eio_hdfs_exists(eio_req *req)
+ {
+ hdfs_path_baton_t *baton = static_cast<hdfs_path_baton_t*>(req->data);
+ baton->result = hdfsExists(baton->client->fs_, baton->filePath);
+ return 0;
+ }
+
+ /*** delete ***/
+
+ static Handle<Value> Delete(const Arguments& args)
+ {
+ return genericPathOp(eio_hdfs_delete, args);
+ }
+
+ static int eio_hdfs_delete(eio_req *req)
+ {
+ hdfs_path_baton_t *baton = static_cast<hdfs_path_baton_t*>(req->data);
+ baton->result = hdfsDelete(baton->client->fs_, baton->filePath);
+ return 0;
+ }
+
+
};
Persistent<FunctionTemplate> HdfsClient::s_ct;
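
Since Exists, CreateDirectory and Delete all route through genericPathOp, the JavaScript-visible contract of the raw bindings is uniform: the callback receives a single integer, the libhdfs return code, with 0 meaning success. node-hdfs.js converts that into the (err, bool) style shown earlier; a sketch of the raw calls, assuming a connected Hdfs instance named HDFS as in node-hdfs.js:

    HDFS.exists("/tmp/test.txt", function(result) {
      console.log(result == 0 ? "exists" : "hdfsExists returned " + result);
    });
    HDFS.mkdir("/tmp/raw-dir", function(result) {
      console.log(result == 0 ? "created" : "hdfsCreateDirectory returned " + result);
    });
    HDFS.rm("/tmp/raw-dir", function(result) {
      console.log(result == 0 ? "deleted" : "hdfsDelete returned " + result);
    });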