Permalink
Browse files

initial

  • Loading branch information...
1 parent 6955508 commit 76067cae3001ea506f39dc28ce9ef61aebc8f65f @dominictarr committed Jun 21, 2012
Showing with 237 additions and 0 deletions.
  1. +12 −0 cli.js
  2. +163 −0 index.js
  3. +46 −0 index2.js
  4. +16 −0 package.json
  5. 0 readme.markdown
View
@@ -0,0 +1,12 @@
+#! /usr/bin/env node
+
+var argv = require('optimist')
+ .alias('h', 'host')
+ .alias('f', 'file')
+ .alias('i', 'id')
+ .argv
+
+var peer = require('./')
+
+peer(argv, console.log)
+
View
@@ -0,0 +1,163 @@
+var crdt = require('crdt')
+var fs = require('fs')
+var parse = require('url').parse
+var net = require('net')
+var es = require('event-stream')
+
+/*
+ approaching things from the other end here.
+
+ read persisted state, and connect to init server,
+ and/or known servers.
+
+*/
+
+function address (row, op) {
+ var address = {}
+ if(op) {
+ var host = (''+op).split(':')
+ return {port: +host.pop(), host: host.pop() || 'localhost'}
+ }
+ return {port: row.get('port'), host: row.get('host')}
+}
+
+function sync(doc, file, synced) {
+
+ if(!file) {
+ doc.sync = true
+ return synced()
+ } else
+ doc.once('sync', synced)
+
+ function write(hist) {
+ console.log('HIST', hist, hist != null)
+ doc.sync = true
+ doc.emit('sync')
+ doc.createReadStream({end: false, history: hist != null })
+// .pipe(es.log('TO FILE>>'))
+ .pipe(es.stringify())
+ .pipe(fs.createWriteStream(file, {flags: 'a'}))
+ .on('end', function () {
+ console.log('ON END')
+ })
+ }
+
+ fs.stat(file, function (err) {
+ console.log(err)
+ if(err) return write(true)
+ else
+ fs.createReadStream(file)
+ .on('end', write)
+ .pipe(es.split())
+ .pipe(es.parse())
+ .pipe(doc.createWriteStream())
+ })
+}
+/*
+ okay, I paused this a week, gotta think it through again.
+ each peer has an address, and may connect to any other peer if it has thier
+ address and that peer is up.
+
+ so to create a peer, need setup a server, have a connect function,
+ and assign/detect an address.
+
+
+ hmm, looks like the most straightforward way to config this is
+ to load from /etc/peers, or ~/.peersrc or ~/.config/peers/config.json
+ or ~/.peers/config.json
+
+ or search fro a relative peers.json, which could be more feasible on
+ some PaaS.
+
+ that could all be bundled into a module.
+
+ or, wherever a --config option points.
+*/
+
+module.exports =
+function (opts, ready) {
+
+ console.log('HELLO1')
+ var doc = new crdt.Doc(), server
+ if(opts.id)
+ doc.id = opts.id
+ sync(doc, opts.file, function () {
+ //create a record for this node if there is not one already.
+ var peer = doc.get(doc.id)
+ var peers = doc.createSet('type', 'peer')
+ if(!peer.get('type'))
+ peer.set({type: 'peer'})
+ function connect(con, doc) {
+ con
+ .pipe(es.split())
+ .pipe(es.parse())
+ .pipe(doc.createStream())
+ .pipe(es.stringify())
+ .pipe(con)
+ }
+
+ var a = address(peer, opts.host)
+
+ if(!opts.client)
+ server = net.createServer(function (con) {
+ console.log('remote connection')
+ connect(con, doc)
+ //copy data for 3 seconds.
+ var timeout = setTimeout(function () {
+ con.end()
+ }, 3000)
+
+ con.on('error', function (err) {
+ console.error(err)
+ clearTimeout(timeout)
+ })
+
+ }).listen(a.port, a.host, function () {
+ console.log('peer:', opts.id, 'listening on', a.host,':', a.port)
+ peer.set({online: Date.now(), host: a.host, port: a.port})
+ })
+
+ ready(null, {peers: peers, peer: peer, server: server})
+ //also, start connecting to other nodes...
+ //every second, connect to a random server & exchange data.
+ setInterval(function () {
+ var l = peers.asArray().length
+ var n = peers.asArray()[~~(Math.random()*l)]
+ //console.log('attempt connect to', n ? n.toJSON() : n)
+ var con
+ if(n && n !== peer) {
+ con = net.connect(n.get('port'), n.get('host'))
+ } else if (opts.init) {
+ var host = (''+opts.init).split(':')
+ var port = +host.pop() //should be a number
+ host = host.pop() || 'localhost'
+ con = net.connect(port, host)
+ }
+ if (con) {
+ connect(con, doc)
+ //when the stream timesout, ends', or errors.
+ //look for another connection.
+ //node will dispose of this connection,
+ //but peers will connect to another server in the next round
+ con.once('error', function (err) {
+ console.error(err.code, 'connecting to:', n.get('id'))
+ })
+ con.once('connect', function () {
+ console.log('replicating',doc.id, '->', n.get('id'))
+ })
+ con.on('end', function () {
+ console.log('disconnect',doc.id, '->', n.get('id'))
+ })
+ }
+ else if (!l)
+ console.log('no peers :(')
+ }, 1000)
+
+ })
+
+
+ //if(/([\w\d\.]+(:?\d+)/.test(opts.host)) //test if host is a correct format.
+
+}
+
+
View
@@ -0,0 +1,46 @@
+/*
+ just thinking out loud
+
+ABSTRACT CLASS
+*/
+
+var inherits = require('util').inherits
+var es = require('es')
+var crdt = require('crdt')
+
+inherits(Peer, crdt.Row)
+
+function Peer () {
+ //where do I set the address?
+ //do I pass the address in?
+ //or do I detect the address?
+ //it will probably be in a config file.
+ //and I can't detect the port,
+ //because it isn't assigned yet
+ //and it may change.
+
+ //so, it must get passed in.
+}
+//@remote {Peer} the remote peer to connect to.
+//lets say that connect should return an object stream.
+Peer.prototype.connect = function (address) {
+
+ return es.connect(es.stringify(), net.connect(address), es.split(), es.parse())
+}
+
+Peer.prototype.serve = function (onConnect, ready) {
+ var address = this.get('address')
+ var port = address.port
+ var host = address.host
+
+ return net.createServer(function (con) {
+ onConnect(
+ es.connect(
+ es.stringify()
+ , con
+ , es.split()
+ , es.parse()
+ )
+ )
+ }).listen(port, host, ready)
+}
View
@@ -0,0 +1,16 @@
+{
+ "name": "peers",
+ "version": "0.0.0",
+ "description": "peer2peer nodejs applications",
+ "homepage": "http://github.com/dominictarr/peers",
+ "repository": {
+ "type": "git",
+ "url": "https://github.com/dominictarr/peers.git"
+ },
+ "dependencies": {
+ "crdt": "~1.6.1",
+ "optimist": "~0.3.4"
+ },
+ "devDependencies": {},
+ "author": "Dominic Tarr <dominic.tarr@gmail.com> (dominictarr.com)"
+}
View
No changes.

0 comments on commit 76067ca

Please sign in to comment.