Skip to content

Commit

Permalink
Workers sample, WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ajlopez committed Jan 26, 2013
1 parent f1ac64c commit aee0260
Show file tree
Hide file tree
Showing 4 changed files with 310 additions and 0 deletions.
31 changes: 31 additions & 0 deletions samples/Workers/README.md
@@ -0,0 +1,31 @@
# Distributed Web Crawler Sample using connected Workers

A distributed web crawler sample. A central server has queues with links to process. The topology can be
manually launched in many nodes. Each node is a worker that can send and receive messages from the other ones.

## Install
Execute
```
npm install
```
to install the dependencies.

## Usage
Launch the server
```
node server.js <url> [<url>... ]
```
Example
```
node server.js http://ajlopez.wordpress.com
```
Launch one or more clients
```
node stormnode.js port [host:port ...]
```
The node starts to listen other nodes at `port`. The optional additional addresses refers to already running
nodes.




9 changes: 9 additions & 0 deletions samples/Workers/package.json
@@ -0,0 +1,9 @@
{
"name": "distributed-webcrawler-example",
"version": "0.0.1",
"private": true,
"dependencies": {
"simplequeue": "0.0.2",
"simplemessages": "0.0.3"
}
}
85 changes: 85 additions & 0 deletions samples/Workers/server.js
@@ -0,0 +1,85 @@

var ss = require('../../'),
sq = require('simplequeue'),
http = require('http'),
url = require('url');

var hostnames = {};

function registerHostName(hostname)
{
if (!hostnames[hostname])
{
console.log('Host: ' + hostname);
hostnames[hostname] = true;
}
}

function isHostName(hostname)
{
return hostnames[hostname];
}

function Resolver(inputqueue, outputqueue) {
this.visited = {};
var self = this;

this.process = function(link) {
console.log('Resolving ' + link);
var urldata = url.parse(link);

if (!isHostName(urldata.hostname))
return;

if (this.visited[link])
return;

this.visited[link] = true;
outputqueue.putMessage(link);
}

this.start = function() {
process.nextTick(function() {
function processMessage(err, msg) {
if (err)
console.log(err);
else
self.process(msg);
inputqueue.getMessage(processMessage);
}

inputqueue.getMessage(processMessage);
});
}

this.registerHost = function(link)
{
var urldata = url.parse(link);
registerHostName(urldata.hostname);
}
}

// Queues

var queueserver = sq.createQueueServer();
var resolverqueue = queueserver.createQueue('qresolver');
var linksqueue = queueserver.createQueue('qlinks');

var server = sq.createRemoteServer(queueserver);

server.listen(3000);

// Objects

var resolver = new Resolver(resolverqueue, linksqueue);
resolver.start();

// Process arguments

process.argv.forEach(function(arg) {
if (arg.indexOf("http:")==0) {
resolver.registerHost(arg);
linksqueue.putMessage(arg);
}
});

185 changes: 185 additions & 0 deletions samples/Workers/stormapp.js
@@ -0,0 +1,185 @@

var ss = require('../../'),
sq = require('simplequeue'),
http = require('http'),
url = require('url'),
sm = require('simplemessages');

var hostnames = {};

function registerHostName(hostname)
{
if (!hostnames[hostname])
{
console.log('Host: ' + hostname);
hostnames[hostname] = true;
}
}

function isHostName(hostname)
{
return hostnames[hostname];
}

function Spout(queue) {
var self = this;

this.start = function(context) {
console.log('spout started');

function processMessage(err, msg) {
if (err)
console.log(err);
else {
console.log('Received ' + msg);
context.emit(msg);
}

queue.getMessage(processMessage);
}

queue.getMessage(processMessage);
}
}

function Resolver(queue) {
this.visited = {};

this.process = function(link, context) {
queue.putMessage(link);
}
}

function Downloader() {
this.process = function(link, context) {
var urldata = url.parse(link);

registerHostName(urldata.hostname);

options = {
host: urldata.hostname,
port: urldata.port,
path: urldata.path,
method: 'GET'
};

http.get(options, function(res) {
console.log('Url: ' + link);
res.setEncoding('utf8');
res.on('data', function(data) {
context.emit(data);
});
}).on('error', function(e) {
console.log('Url: ' + link);
console.log('Error: ' + e.message);
});
}
}

var match1 = /href=\s*"([^&"]*)"/i;
var match2= /href=\s*'([^&']*)'/i;

function Harvester() {
this.process = function(data, context) {
var links = match1.exec(data);

if (links)
links.forEach(function(link) {
if (link.indexOf('http:') == 0)
context.emit(link);
});

links = match2.exec(data);

if (links)
links.forEach(function(link) {
if (link.indexOf('http:') == 0)
context.emit(link);
});
}
}

// Queues

var qclient = sq.createRemoteClient();

var port = parseInt(process.argv[2]);

var nodes = [];

for (var k = 3; k < process.argv.length; k++) {
var node = makeNode(process.argv[k]);
console.dir(node);
nodes.push(node);
}

function makeNode(arg) {
var position = arg.indexOf(':');

if (position < 0)
return { host: 'localhost', port: parseInt(arg) };

var host = arg.substring(0, position);
var port = parseInt(arg.substring(position + 1));

return { host: host, port: port };
}

qclient.on('remote', function(remote) {
console.log('connected');

remote.getQueue('qlinks', function(err, linkqueue) {
if (err) {
console.log(err);
process.exit(1);
}

console.log('links queue');

remote.getQueue('qresolver', function(err, resolverqueue) {
if (err) {
console.log(err);
process.exit(1);
}

console.log('resolver queue');

// Objects

var spout = new Spout(linkqueue);
var downloader = new Downloader();
var harvester = new Harvester();
var resolver = new Resolver(resolverqueue);

// Setting Builder

var builder = ss.createTopologyBuilder();

builder.setSpout("spout", spout);
builder.setBolt("downloader", downloader).shuffleGrouping("spout");
builder.setBolt("harvester", harvester).shuffleGrouping("downloader");
builder.setBolt("resolver", resolver).shuffleGrouping("harvester");

var topology = builder.createTopology();

topology.start();

var server = sm.createServer(function (channel) {
topology.registerWorker(channel, channel);
});

server.listen(port);

nodes.forEach(function (node) {
var client = sm.createClient(node.port, node.host, function() {
topology.registerWorker(client, client);
});
});
});
});
});

qclient.connect(3000);



0 comments on commit aee0260

Please sign in to comment.