Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'release/1.1.1'

  • Loading branch information...
commit af2a18f1b94efec7684b06b89461353026d429f8 2 parents cc4c43c + 4916d32
Darcy Laycock authored
1  Guardfile
View
@@ -0,0 +1 @@
+guard 'coffeescript', :input => 'src', :output => 'lib'
41 README.md
View
@@ -58,7 +58,33 @@ You define a setup method that starts doing processing, configure a name (and op
for events and `configNamespace` for the configuration json namespace). This simple yet flexible architecture
makes it possible to build a wide variety of publishers.
-To use a publisher in your.
+To use a publisher in your app, just require the file and use:
+
+```coffeescript
+runner_instance.add YourPublisher
+```
+
+Like is seen above.
+
+### Different Modes & Broadcasters
+
+Pebble can be run in two modes (`client` (the default) and `server`, set via `pebble.mode`) as well as with two different broadcaster options
+(`pubsub` or `direct`, the default).
+
+In `direct` broadcaster mode, everything will be run in process where as in the `pubsub` mode, you typically run
+socket.io and a front end in the `client` and any publishers (e.g. twitter) in the server process, allowing 1 publisher
+setup broadcasting to many clients. One key thing of this approach is that it allows some flexibility in regards
+to dealing with apps, ultimately making it simpler to build out (e.g. not running everything in a single process).
+
+By default, any `add`-ed publishers are only run when the app is using the `direct` broadcaster or is running in `server` mode.
+This can be changed on a per-publisher based by overriding the `shouldBeRun` function on your publisher, allowing conditional logic.
+
+E.g. to run something on whatever app runs the web server, one would replace it with:
+
+```coffeescript
+class MyPublisher extends Base
+ shouldBeRun: -> @runner.shouldRunWeb
+```
### Configuration
@@ -71,6 +97,17 @@ Pebble uses a standard `config.json` file which uses nested keys according to:
An example of this can be seen in the `config.example.json` file which contains **all** options.
Please note that most options are optional.
+Likewise, if you pass a second parameter before the callback to `Pebble.run` with a value of true,
+you can pass command line arguments to your program - e.g.:
+
+```coffeescript
+Pebble.run config, true, (runner) ->
+ runner.addBuiltin 'twitter'
+```
+
+And then invoke your program with --help. This lets you set mode, broadcaster, host and port for the
+app in a content-sensitive manner.
+
Lastly, in the case of the following, they can also be overridden by an environment variable:
* `pebble.listen.host` (by `HOST`)
@@ -79,6 +116,8 @@ Lastly, in the case of the following, they can also be overridden by an environm
* `pebble.redis.port` (by `REDIS_HOST`)
* `pebble.redis.password` (by `REDIS_PASSWORD`)
* `pebble.redis.maxHistory` (by `REDIS_MAXHISTORY`)
+* `pebble.broadcaster` (by `PEBBLE_BROADCASTER`)
+* `pebble.mode` (by `PEBBLE_MODE`)
### The Public JavaScript Portion
102 lib/pebble.js
View
@@ -1,16 +1,17 @@
(function() {
- var Pebble, fs, io, path, redis, sys, web;
+ var Pebble, broadcaster, fs, path, redis, sys, web;
path = require('path');
sys = require('sys');
fs = require('fs');
- io = require('socket.io');
redis = require('./pebble/redis');
web = require('./pebble/web');
+ broadcaster = require('./pebble/broadcaster');
Pebble = (function() {
function Pebble(config) {
this.config = config;
this.publishers = [];
this.version = this.packageMetadata().version;
+ this.configure();
}
Pebble.prototype.packageMetadata = function() {
var contents, file;
@@ -34,24 +35,36 @@
}
};
Pebble.prototype.run = function() {
- var publisher, _i, _len, _ref;
+ var publisher, runningPublishers, _i, _len, _ref;
sys.puts("Starting pebble...");
this.redis = new redis(this);
this.web = new web(this);
- this.broadcast = io.listen(this.web.app);
- this.broadcast.set('log level', 0);
+ this.broadcast = new broadcaster(this);
+ this.broadcast.run();
+ runningPublishers = ["broadcaster (" + this.mode + ", " + this.broadcaster + ")"];
_ref = this.publishers;
for (_i = 0, _len = _ref.length; _i < _len; _i++) {
publisher = _ref[_i];
- publisher.run();
+ if (publisher.shouldBeRun()) {
+ publisher.run();
+ runningPublishers.push(publisher.name || publisher.namespace);
+ }
+ }
+ if (this.shouldRunWeb) {
+ this.web.run();
+ runningPublishers.push('web');
+ }
+ if (runningPublishers.length) {
+ return sys.puts("Running the following publishers: " + (runningPublishers.join(', ')));
+ } else {
+ return sys.puts('Not running any publishers.');
}
- return this.web.run();
};
Pebble.prototype.visitableURL = function() {
var port;
if (this._visitableURL == null) {
- this._visitableURL = "http://" + (this.host());
- port = this.port();
+ this._visitableURL = "http://" + this.host;
+ port = this.port;
if ((port != null) && port !== 80) {
this._visitableURL += ":" + port;
}
@@ -59,11 +72,12 @@
}
return this._visitableURL;
};
- Pebble.prototype.host = function() {
- return process.env.HOST || this.get('pebble.listen.host', 'localhost');
- };
- Pebble.prototype.port = function() {
- return process.env.PORT || this.get('pebble.listen.port', 3003);
+ Pebble.prototype.configure = function() {
+ this.host = process.env.HOST || this.get('pebble.listen.host', 'localhost');
+ this.port = process.env.PORT || this.get('pebble.listen.port', 3003);
+ this.mode = process.env.PEBBLE_MODE || this.get('pebble.mode', 'client');
+ this.broadcaster = process.env.PEBBLE_BROADCASTER || this.get('pebble.broadcaster', 'direct');
+ return this.shouldRunWeb = this.mode === 'client' || this.broadcaster === 'direct';
};
Pebble.prototype.get = function(key, defaultValue) {
var config, key_parts, part;
@@ -78,9 +92,18 @@
}
return config;
};
- Pebble.run = function(config_path, callback) {
- var config, runner;
- config = JSON.parse(fs.readFileSync(config_path));
+ Pebble.run = function(configPath, includeCLI, callback) {
+ var config;
+ config = JSON.parse(fs.readFileSync(configPath));
+ return this.runWithConfig(config, includeCLI, callback);
+ };
+ Pebble.runWithConfig = function(config, includeCLI, callback) {
+ var runner;
+ if (typeof includeCLI === 'function') {
+ callback = includeCLI;
+ } else if (includeCLI) {
+ this.parseCommandLine(config);
+ }
runner = new this(config);
if (callback instanceof Function) {
callback(runner);
@@ -88,6 +111,51 @@
runner.run();
return runner;
};
+ Pebble.parseCommandLine = function(config) {
+ var argv, pebble, _ref, _ref2;
+ if (config == null) {
+ config = {};
+ }
+ argv = require('optimist').argv;
+ pebble = (_ref = config.pebble) != null ? _ref : config.pebble = {};
+ if ((_ref2 = pebble.listen) != null) {
+ _ref2;
+ } else {
+ pebble.listen = {};
+ };
+ if (argv.host) {
+ pebble.listen.host = argv.host;
+ }
+ if (argv.port) {
+ pebble.listen.port = argv.port;
+ }
+ if (argv.direct) {
+ pebble.broadcaster = 'direct';
+ } else if (argv.pubsub) {
+ pebble.broadcaster = 'pubsub';
+ } else if (argv.broadcaster) {
+ pebble.broadcaster = argv.broadcaster;
+ }
+ if (argv.client) {
+ pebble.mode = 'client';
+ } else if (argv.server) {
+ pebble.mode = 'server';
+ } else if (argv.mode) {
+ pebble.mode = argv.mode;
+ }
+ if (argv.help) {
+ this.printOptions();
+ process.exit(1);
+ }
+ return config;
+ };
+ Pebble.printOptions = function() {
+ sys.puts("Known options:");
+ sys.puts("--server, --client, --mode [server,client] - sets the mode of the current pebble-based application.");
+ sys.puts("--direct, --pubsub, --broadcaster [direct,pubsub] - sets the broadcaster for the current pebble-based application.");
+ sys.puts("--port [port] - sets the http port for the current application.");
+ return sys.puts("--host [host] - sets the http host for the current application.");
+ };
return Pebble;
})();
Pebble.Base = require('./pebble/base');
5 lib/pebble/base.js
View
@@ -39,9 +39,12 @@
message = key;
key = this.namespace;
}
- this.runner.broadcast.sockets.emit(key, message);
+ this.runner.broadcast.broadcast(key, message);
return this.runner.redis.addHistory(key, JSON.stringify(message));
};
+ Base.prototype.shouldBeRun = function() {
+ return this.runner.mode === 'server' || this.runner.broadcaster === 'direct';
+ };
return Base;
})();
module.exports = Base;
38 lib/pebble/broadcaster.js
View
@@ -0,0 +1,38 @@
+(function() {
+ var Broadcaster, io;
+ io = require('socket.io');
+ Broadcaster = (function() {
+ function Broadcaster(runner) {
+ this.runner = runner;
+ this.broadcaster = this.runner.broadcaster;
+ this.mode = this.runner.mode;
+ }
+ Broadcaster.prototype.run = function() {
+ if (this.broadcaster === 'direct') {
+ this.broadcast = this.broadcastDirect;
+ return this.setupClient();
+ } else if (this.broadcaster === 'pubsub') {
+ this.broadcast = this.broadcastViaPub;
+ return this.setupPubSub();
+ }
+ };
+ Broadcaster.prototype.setupClient = function() {
+ this.io = io.listen(this.runner.web.app);
+ return this.io.set('log level', 0);
+ };
+ Broadcaster.prototype.setupPubSub = function() {
+ if (this.mode === 'client') {
+ this.setupClient();
+ return this.runner.redis.subscribe();
+ }
+ };
+ Broadcaster.prototype.broadcastDirect = function(channel, data) {
+ return this.io.sockets.emit(channel, data);
+ };
+ Broadcaster.prototype.broadcastViaPub = function(channel, data) {
+ return this.runner.redis.publish(channel, data);
+ };
+ return Broadcaster;
+ })();
+ module.exports = Broadcaster;
+}).call(this);
11 lib/pebble/broadcasters.js
View
@@ -0,0 +1,11 @@
+(function() {
+ var Broadcaster;
+ Broadcaster = (function() {
+ function Broadcaster(runner, mode) {
+ this.runner = runner;
+ this.mode = mode;
+ }
+ return Broadcaster;
+ })();
+ module.exports = Broadcaster;
+}).call(this);
37 lib/pebble/redis.js
View
@@ -11,13 +11,17 @@
this.password = process.env.REDIS_PASSWORD || this.runner.get('redis.password');
this.namespace = this.runner.get('redis.namespace', 'juggernaut');
this.maxHistory = this.runner.get('redis.maxHistory', 100);
- sys.puts("Connecting to Redis at " + this.host + ":" + this.port);
- this.redis = redis.createClient(this.port, this.host);
+ this.pubsubChannel = "" + this.namespace + ":" + (this.runner.get('redis.pubsub_channel', 'pubsub'));
+ this.redis = this.createConnection();
+ }
+ RedisWrapper.prototype.createConnection = function() {
+ var r;
+ r = redis.createClient(this.port, this.host);
if (this.password != null) {
- sys.puts('Authing with redis password.');
- this.redis.auth(this.password);
+ r.auth(this.password);
}
- }
+ return r;
+ };
RedisWrapper.prototype.publish = function(key, message) {
if (message != null) {
key = "" + this.namespace + ":" + key;
@@ -58,9 +62,8 @@
var key;
key = this.historyKeyFor(channel);
return this.redis.lrange(key, 0, this.maxHistory - 1, __bind(function(err, result) {
- sys.puts("Getting history for " + key + " - " + (this.maxHistory - 1));
if (err) {
- sys.puts("Hadd error: " + err);
+ sys.puts("Had error getting history for " + key + ": " + err);
return callback(err);
} else if (!result) {
return callback(true);
@@ -73,6 +76,26 @@
sys.puts("Error: " + (sys.inspect(err)));
return sys.puts("Result: " + (sys.inspect(result)));
};
+ RedisWrapper.prototype.publish = function(channel, message) {
+ var encoded;
+ encoded = JSON.stringify({
+ channel: channel,
+ message: message
+ });
+ return this.redis.publish(this.pubsubChannel, encoded);
+ };
+ RedisWrapper.prototype.subscribe = function() {
+ var broadcaster;
+ broadcaster = this.runner.broadcast;
+ this.subredis = this.createConnection();
+ sys.puts("Subscribing to " + this.pubsubChannel);
+ this.subredis.on("message", function(channel, message) {
+ var parsed;
+ parsed = JSON.parse(message);
+ return broadcaster.broadcastDirect(parsed.channel, parsed.message);
+ });
+ return this.subredis.subscribe(this.pubsubChannel);
+ };
return RedisWrapper;
})();
module.exports = RedisWrapper;
3  lib/pebble/web.js
View
@@ -72,8 +72,9 @@
}
};
Web.prototype.run = function() {
+ sys.puts("Starting and running on port " + this.runner.port);
this.configure();
- return this.app.listen(this.runner.port());
+ return this.app.listen(this.runner.port);
};
return Web;
})();
25 package.json
View
@@ -1,24 +1,29 @@
{
"name": "pebble",
- "version": "1.0.1",
+ "version": "1.1.1",
"description": "pebble is a series of tools for building real time event notifiers / streams.",
"keys": ["realtime", "events", "stream"],
"author": "Darcy Laycock <sutto@sutto.net> (http://blog.ninjahideout.com/)",
"main": "./lib/pebble",
+ "repository": {
+ "type": "git",
+ "url": "https://github.com/Sutto/Pebble"
+ },
"directories": {
"lib": "./lib"
},
"dependencies": {
- "express": "~> 2.4.3",
- "irc": "~> 0.2.0",
- "ntwitter": "~> 0.2.1",
- "socket.io": "~> 0.7.7",
- "hiredis": "~> 0.1.12",
- "redis": "~> 0.6.0",
- "node-static": "~> 0.5.7",
- "ejs": "~> 0.4.3"
+ "express": "~2.4.3",
+ "irc": "~0.2.0",
+ "ntwitter": "~0.2.1",
+ "socket.io": "~0.7.7",
+ "hiredis": "~0.1.12",
+ "redis": "~0.6.0",
+ "node-static": "~0.5.7",
+ "ejs": "~0.4.3",
+ "optimist": "~0.2.6"
},
"devDependencies": {
- "coffee-script": "~> 1.1.1"
+ "coffee-script": "~1.1.1"
}
}
100 src/pebble.coffee
View
@@ -1,15 +1,16 @@
-path = require 'path'
-sys = require 'sys'
-fs = require 'fs'
-io = require 'socket.io'
-redis = require './pebble/redis'
-web = require './pebble/web'
+path = require 'path'
+sys = require 'sys'
+fs = require 'fs'
+redis = require './pebble/redis'
+web = require './pebble/web'
+broadcaster = require './pebble/broadcaster'
class Pebble
constructor: (@config) ->
@publishers = []
@version = @packageMetadata().version
+ @configure()
packageMetadata: ->
unless @_packageMetadata
@@ -21,7 +22,7 @@ class Pebble
# Adds a publisher to the current runner.
add: (publisher) ->
@publishers.push new publisher(@)
-
+
addBuiltin: (name) ->
try
builtin = require "./pebble/#{name}"
@@ -31,25 +32,40 @@ class Pebble
run: ->
sys.puts "Starting pebble..."
- @redis = new redis @
- @web = new web @
- @broadcast = io.listen @web.app
- @broadcast.set 'log level', 0
+ @redis = new redis @
+ @web = new web @
+ @broadcast = new broadcaster @
+ @broadcast.run()
+ runningPublishers = ["broadcaster (#{@mode}, #{@broadcaster})"]
for publisher in @publishers
- publisher.run()
- @web.run()
+ if publisher.shouldBeRun()
+ publisher.run()
+ runningPublishers.push(publisher.name or publisher.namespace)
+ if @shouldRunWeb
+ @web.run()
+ runningPublishers.push 'web'
+
+ if runningPublishers.length
+ sys.puts "Running the following publishers: #{runningPublishers.join ', '}"
+ else
+ sys.puts 'Not running any publishers.'
visitableURL: ->
unless @_visitableURL?
- @_visitableURL = "http://#{@host()}"
- port = @port()
+ @_visitableURL = "http://#{@host}"
+ port = @port
if port? and port isnt 80
@_visitableURL += ":#{port}"
@_visitableURL += "/"
@_visitableURL
-
- host: -> process.env.HOST or @get 'pebble.listen.host', 'localhost'
- port: -> process.env.PORT or @get 'pebble.listen.port', 3003
+
+
+ configure: ->
+ @host = process.env.HOST or @get 'pebble.listen.host', 'localhost'
+ @port = process.env.PORT or @get 'pebble.listen.port', 3003
+ @mode = process.env.PEBBLE_MODE or @get 'pebble.mode', 'client'
+ @broadcaster = process.env.PEBBLE_BROADCASTER or @get 'pebble.broadcaster', 'direct'
+ @shouldRunWeb = @mode is 'client' or @broadcaster is 'direct'
get: (key, defaultValue) ->
key_parts = key.split "."
@@ -61,12 +77,54 @@ class Pebble
config
- @run: (config_path, callback) ->
- config = JSON.parse fs.readFileSync(config_path)
- runner = new @(config)
+
+ @run: (configPath, includeCLI, callback) ->
+ config = JSON.parse fs.readFileSync configPath
+ @runWithConfig config, includeCLI, callback
+
+
+ @runWithConfig: (config, includeCLI, callback) ->
+ if typeof includeCLI is 'function'
+ callback = includeCLI
+ else if includeCLI
+ @parseCommandLine config
+ runner = new @ config
callback runner if callback instanceof Function
runner.run()
runner
+
+ @parseCommandLine: (config = {}) ->
+ argv = require('optimist').argv
+ pebble = config.pebble ?= {}
+ pebble.listen ?= {}
+ # First, accept --host and --port
+ pebble.listen.host = argv.host if argv.host
+ pebble.listen.port = argv.port if argv.port
+ # Next, accept the broadcaster options
+ if argv.direct
+ pebble.broadcaster = 'direct'
+ else if argv.pubsub
+ pebble.broadcaster = 'pubsub'
+ else if argv.broadcaster
+ pebble.broadcaster = argv.broadcaster
+ # And finally also accept mode options
+ if argv.client
+ pebble.mode = 'client'
+ else if argv.server
+ pebble.mode = 'server'
+ else if argv.mode
+ pebble.mode = argv.mode
+ if argv.help
+ @printOptions()
+ process.exit 1
+ config
+
+ @printOptions: ->
+ sys.puts "Known options:"
+ sys.puts "--server, --client, --mode [server,client] - sets the mode of the current pebble-based application."
+ sys.puts "--direct, --pubsub, --broadcaster [direct,pubsub] - sets the broadcaster for the current pebble-based application."
+ sys.puts "--port [port] - sets the http port for the current application."
+ sys.puts "--host [host] - sets the http host for the current application."
Pebble.Base = require './pebble/base'
module.exports = Pebble
9 src/pebble/base.coffee
View
@@ -1,4 +1,5 @@
sys = require 'sys'
+uuid = require 'node-uuid'
class Base
@@ -26,7 +27,11 @@ class Base
else
message = key
key = @namespace
- @runner.broadcast.sockets.emit key, message
- @runner.redis.addHistory key, JSON.stringify(message)
+ # Generate a unique uuid for each outgoing message.
+ message._id = uuid()
+ @runner.broadcast.broadcast key, message
+ @runner.redis.addHistory key, JSON.stringify(message)
+
+ shouldBeRun: -> @runner.mode is 'server' or @runner.broadcaster is 'direct'
module.exports = Base
43 src/pebble/broadcaster.coffee
View
@@ -0,0 +1,43 @@
+io = require 'socket.io'
+
+# Broadcasters are two different methods of broadcasting messages.
+# Namely:
+# A) Redis pub/sub based (similar to Juggernaut 2)
+# B) In process - Using a single parent process.
+
+class Broadcaster
+
+ # Config options include direct and pubsub.
+ # Note that mode only matters when running as pub sub.
+ constructor: (@runner) ->
+ @broadcaster = @runner.broadcaster
+ @mode = @runner.mode
+
+ run: ->
+ if @broadcaster is 'direct'
+ @broadcast = @broadcastDirect
+ @setupClient()
+ else if @broadcaster is 'pubsub'
+ @broadcast = @broadcastViaPub
+ @setupPubSub()
+
+ setupClient: ->
+ @io = io.listen @runner.web.app
+ @io.set 'log level', 0
+ @io.configure 'production', =>
+ @io.enable 'browser client minification'
+ @io.enable 'browser client etag'
+
+
+ setupPubSub: ->
+ # When it's indirect, we need to do some type conditional setup.
+ if @mode is 'client'
+ @setupClient()
+ @runner.redis.subscribe()
+
+ broadcastDirect: (channel, data) -> @io.sockets.emit channel, data
+
+ broadcastViaPub: (channel, data) -> @runner.redis.publish channel, data
+
+module.exports = Broadcaster
+
45 src/pebble/redis.coffee
View
@@ -4,16 +4,19 @@ sys = require 'sys'
class RedisWrapper
constructor: (@runner) ->
- @host = process.env.REDIS_HOST || @runner.get 'redis.host', 'localhost'
- @port = process.env.REDIS_PORT || @runner.get 'redis.port', 6379
- @password = process.env.REDIS_PASSWORD || @runner.get 'redis.password'
- @namespace = @runner.get 'redis.namespace', 'juggernaut'
- @maxHistory = @runner.get 'redis.maxHistory', 100
- sys.puts "Connecting to Redis at #{@host}:#{@port}"
- @redis = redis.createClient @port, @host
+ @host = process.env.REDIS_HOST || @runner.get 'redis.host', 'localhost'
+ @port = process.env.REDIS_PORT || @runner.get 'redis.port', 6379
+ @password = process.env.REDIS_PASSWORD || @runner.get 'redis.password'
+ @namespace = @runner.get 'redis.namespace', 'juggernaut'
+ @maxHistory = @runner.get 'redis.maxHistory', 100
+ @pubsubChannel = "#{@namespace}:#{@runner.get 'redis.pubsub_channel', 'pubsub'}"
+ @redis = @createConnection()
+
+ createConnection: ->
+ r = redis.createClient @port, @host
if @password?
- sys.puts 'Authing with redis password.'
- @redis.auth @password
+ r.auth @password
+ r
publish: (key, message) ->
if message?
@@ -48,9 +51,8 @@ class RedisWrapper
getHistory: (channel, callback) ->
key = @historyKeyFor channel
@redis.lrange key, 0, @maxHistory - 1, (err, result) =>
- sys.puts "Getting history for #{key} - #{@maxHistory - 1}"
if err
- sys.puts "Hadd error: #{err}"
+ sys.puts "Had error getting history for #{key}: #{err}"
callback err
else if not result
callback true
@@ -61,5 +63,26 @@ class RedisWrapper
sys.puts "Error: #{sys.inspect err}"
sys.puts "Result: #{sys.inspect result}"
+ publish: (channel, message) ->
+ encoded = JSON.stringify
+ channel: channel
+ message: message
+ # Publish the given method to the pubsub channel.
+ @redis.publish @pubsubChannel, encoded
+
+ subscribe: ->
+ # Get a reference to the broadcaster so we don't have to look it up on every message.
+ broadcaster = @runner.broadcast
+ # create a new redis instance just for subscribing
+ @subredis = @createConnection()
+ sys.puts "Subscribing to #{@pubsubChannel}"
+ @subredis.on "message", (channel, message) ->
+ # We use JSON because there are issues with using the channel name on it's own
+ # initially. It still needs some extra work to clean it up.
+ parsed = JSON.parse message
+ broadcaster.broadcastDirect parsed.channel, parsed.message
+ @subredis.subscribe @pubsubChannel
+
+
module.exports = RedisWrapper
3  src/pebble/web.coffee
View
@@ -52,8 +52,9 @@ class Web extends EventEmitter
run: ->
+ sys.puts "Starting and running on port #{@runner.port}"
@configure()
- @app.listen @runner.port()
+ @app.listen @runner.port
module.exports = Web
Please sign in to comment.
Something went wrong with that request. Please try again.