Permalink
Browse files

little refactoring

  • Loading branch information...
Jae committed Jul 13, 2011
1 parent b95c298 commit ceb2f5c51e7e3fea0cd4c1fe60e312980086534e
Showing with 82 additions and 41 deletions.
  1. +3 −3 examples/test.js
  2. +79 −38 node-hive.js
View
@@ -1,4 +1,4 @@
-hive = require('../node-hive').for({server:"hive.hadoop.forward.co.uk"});
+hive = require('../node-hive').for({server:"hive.hadoop.forward.co.uk", timeout:10000});
hive.fetch("SELECT * FROM weather_data where dated = '2011-07-01' limit 10", function(err, data) {
console.log("SELECT * FROM weather_data where dated = '2011-07-01' limit 10");
@@ -8,8 +8,8 @@ hive.fetch("SELECT * FROM weather_data where dated = '2011-07-01' limit 10", fun
});
var i = 1;
-hive.fetchInBatch(100, "SELECT * FROM weather_data where dated = '2011-07-02' limit 1002", function(err, data) {
- console.log("SELECT * FROM weather_data where dated = '2011-07-02' limit 1002");
+hive.fetchInBatch(5, "SELECT * FROM weather_data where dated = '2011-07-02' limit 12", function(err, data) {
+ console.log("SELECT * FROM weather_data where dated = '2011-07-02' limit 12");
console.log(i++ + "th data:", data.toTSV());
});
View
@@ -1,68 +1,109 @@
var thrift = require('thrift'),
ttransport = require('thrift/transport'),
- ThriftHive = require('gen-nodejs/ThriftHive')
- ResultSet = require('./result_set')
+ ThriftHive = require('gen-nodejs/ThriftHive'),
+ ResultSet = require('./result_set');
-var futureConnection = function(config) {
- return function(cb) {
- var connection = thrift.createConnection(config.server, config.port || 10000, {transport: ttransport.TBufferedTransport, timeout: config.timeout || 1000});
+var hiveClient = function(config) {
+ var connect = function(onError, connected) {
+ var server = config.server;
+ var port = config.port || 10000;
+ var options = {transport: ttransport.TBufferedTransport, timeout: config.timeout || 1000};
+
+ var connection = thrift.createConnection(server, port, options);
var client = thrift.createClient(ThriftHive, connection);
- cb(client, connection);
+
+ connected({
+ execute: function(query, onSuccess) {
+ client.execute(query, function(err) {
+ if (err) {
+ connection.end();
+ onError(true, err);
+ } else {
+ onSuccess();
+ }
+ });
+ },
+ getSchema: function(onSuccess) {
+ client.getSchema(function(err, schema) {
+ if (err) {
+ connection.end();
+ onError(true, err);
+ } else {
+ onSuccess(schema);
+ }
+ });
+ },
+ fetchAll: function(onSuccess) {
+ client.fetchAll(function(err, data) {
+ if (err) {
+ connection.end();
+ onError(true, err);
+ } else {
+ onSuccess(data);
+ }
+ });
+ },
+ fetchN: function(batchSize, onSuccess) {
+ client.fetchN(batchSize, function(err, data) {
+ if (err) {
+ connection.end();
+ onError(true, err);
+ } else {
+ onSuccess(data);
+ }
+ });
+ },
+ closeConnection: function() {
+ connection.end();
+ }
+ });
};
-};
-
-var hiveClient = function(futureConnection) {
+
return {
- fetch: function(query, cb) {
- futureConnection(function(client, connection) {
- client.execute(query, function(err){
- if(err) return cb(true, err);
- client.getSchema(function(err, schema) {
- if (err) return cb(true, err);
- client.fetchAll(function(err, data){
- if (err) return cb(true, err);
- cb(null, ResultSet.create(data, schema));
- connection.end();
+ fetch: function(query, onCompletion) {
+ connect(onCompletion, function(client) {
+ client.execute(query, function() {
+ client.getSchema(function(schema) {
+ client.fetchAll(function(data) {
+ client.closeConnection();
+ onCompletion(null, ResultSet.create(data, schema));
});
});
});
- })
+ });
},
- fetchInBatch: function(batchSize, query, cb) {
- futureConnection(function(client, connection) {
- client.execute(query, function(err){
- if (err) return cb(true, err);
- client.getSchema(function(err, schema) {
- if (err) return cb(true, err);
+ fetchInBatch: function(batchSize, query, onCompletion) {
+ connect(onCompletion, function(client) {
+ client.execute(query, function() {
+ client.getSchema(function(schema) {
var fetchBatch = function() {
- client.fetchN(batchSize, function(err, data){
- if (err) return cb(true, err);
+ client.fetchN(batchSize, function(data) {
if(data.length > 0) {
- cb(null, ResultSet.create(data, schema));
+ onCompletion(null, ResultSet.create(data, schema));
process.nextTick(fetchBatch);
+ } else {
+ client.closeConnection();
}
- else connection.end();
});
};
fetchBatch();
});
});
- })
+ });
},
- execute: function(query, cb){
- futureConnection(function(client, connection) {
- client.execute(query, function(err){
- if (err) return cb(true, err);
- cb(null, null);
- connection.end();
+ execute: function(query, onCompletion){
+ connect(onCompletion, function(client) {
+ client.execute(query, function(){
+ client.closeConnection();
+ onCompletion(null, null);
});
});
},
};
};
exports.for = function(config) {
- return hiveClient(futureConnection(config));
+ return hiveClient(config);
};

0 comments on commit ceb2f5c

Please sign in to comment.