Skip to content
Browse files

initial

  • Loading branch information...
0 parents commit b100cce803e049c626b6e401eb8320654ca29ad5 @indutny committed Sep 6, 2012
Showing with 1,221 additions and 0 deletions.
  1. +3 −0 .gitignore
  2. +1 −0 README.md
  3. +20 −0 binding.gyp
  4. +17 −0 example/server.js
  5. +13 −0 keys/server.crt
  6. +15 −0 keys/server.key
  7. +11 −0 lib/tlsnappy.js
  8. +100 −0 lib/tlsnappy/api.js
  9. +14 −0 package.json
  10. +106 −0 src/ngx-queue.h
  11. +117 −0 src/ring.h
  12. +649 −0 src/tlsnappy.cc
  13. +116 −0 src/tlsnappy.h
  14. +39 −0 test/server-test.js
3 .gitignore
@@ -0,0 +1,3 @@
+build/
+node_modules/
+npm-debug.log
1 README.md
@@ -0,0 +1 @@
+# TLSnappy
20 binding.gyp
@@ -0,0 +1,20 @@
+{
+ "variables": {
+ "node_shared_openssl%": "true"
+ },
+ "targets": [
+ {
+ "target_name": "tlsnappy",
+ "sources": [
+ "src/tlsnappy.cc"
+ ],
+ "conditions": [
+ ["node_shared_openssl=='false'", {
+ "include_dirs": [
+ "<(node_root_dir)/deps/openssl/openssl/include"
+ ]
+ }]
+ ]
+ }
+ ]
+}
17 example/server.js
@@ -0,0 +1,17 @@
+var fs = require('fs'),
+ tlsnappy = require('..');
+
+var options = {
+ key: fs.readFileSync(__dirname + '/../keys/server.key'),
+ cert: fs.readFileSync(__dirname + '/../keys/server.crt')
+};
+
+tlsnappy.createServer(options, function(c) {
+ c.on('data', function(data) {
+ console.log(data.toString());
+ });
+ c.write('HTTP/1.1 200 Ok\r\n\r\nhello');
+ c.end();
+}).listen(44300, function() {
+ console.log('listening');
+});
13 keys/server.crt
@@ -0,0 +1,13 @@
+-----BEGIN CERTIFICATE-----
+MIICATCCAWoCCQCR+ZtDTiYbcTANBgkqhkiG9w0BAQUFADBFMQswCQYDVQQGEwJB
+VTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0
+cyBQdHkgTHRkMB4XDTEyMDkwNTEzMDM1MFoXDTEzMDkwNTEzMDM1MFowRTELMAkG
+A1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0
+IFdpZGdpdHMgUHR5IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAzBoj
+JrqRXuN7q1et2nXn/8lWR7xhGIukPCRR0mIHJHFFAot8rsmHKiKoRL2EKtOT2Kb2
+d14wTFn9lUSgsawpMlqRiKICNZysRiJ/2218LtAmV/vyMFsUcee0TJxkJGET6v1P
+ObWlZGACT1UNCFo0MmjYLuQKD/jjXoeDjRyamkUCAwEAATANBgkqhkiG9w0BAQUF
+AAOBgQBoTncNvLOTj/X0m+f9bpzpvmoxbfWI10rrv2nJhgCKAyLMjJ8MjO2qY5ds
+PDBL36KwunuApYtt2tl2y4s1wEMN7Fza1Urg6h4SwDicjSBcLX9qTkvz1AR4D37q
+IivHDL04Ykgn/FvJXIQtXtDCtY1GV6Clu8uM1q0J4EzXDakXPw==
+-----END CERTIFICATE-----
15 keys/server.key
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICXgIBAAKBgQDMGiMmupFe43urV63adef/yVZHvGEYi6Q8JFHSYgckcUUCi3yu
+yYcqIqhEvYQq05PYpvZ3XjBMWf2VRKCxrCkyWpGIogI1nKxGIn/bbXwu0CZX+/Iw
+WxRx57RMnGQkYRPq/U85taVkYAJPVQ0IWjQyaNgu5AoP+ONeh4ONHJqaRQIDAQAB
+AoGAWDDbC3sGIXguEcVbC9BypKW8sFaHpfGAz7Pp3vwYlfiTlxvqjiPqUZyUFM/G
+8jJl6fGLAd0jBXrFBeCXhu4bBi9hV9bYAbZ+R1uv2Au/fZs76fMpywqvOz6cuje2
+ke9HWjxYlMAGToLwzALZrWZQThQp2El0sVD/edb49UDsKOkCQQDpuW26usLmDeAV
++lqLteYJrCvWzvKi23q4zLTwOT1q1TQlgqseELHjgSk5xSkqVhfP64fXsKuDXdYi
+Y5F9m7aLAkEA34326/s7+sOE27GRVrj7o0wTKpUJf8qMoxJBMKvoqa5YyqWxPyiE
+eoWbKX6nMTCsZKw/BSTzzZV9fQ5e3YXcbwJBAIXg9X3UpPdqU0XDlkIY+5/mw2XU
+PSAUYIOinrJL12ZuoeQqqSIbpgoPXjH9QctCbXloDPP7+wLQqWwhZKFypGkCQQDW
+s+O4FFwwpGMPFGT/oz2aQdAOj4DGCOQ1Ia0Wb6ROgrGoPqYSgHlrqrGwkoFEjvii
+34LPEARPKAmTd5/IpW1lAkEAwiyIlhcG9lIQsGXidyAVOmI0UO82MMmtrDCxWYRP
+PiJYZg+ahIoGHdoz/1OBLyWFS0N0DfAWYWJRulMwhPB2Bg==
+-----END RSA PRIVATE KEY-----
11 lib/tlsnappy.js
@@ -0,0 +1,11 @@
+var tlsnappy = exports;
+
+// Binding
+
+tlsnappy.binding = require('bindings')('tlsnappy');
+
+// API
+
+tlsnappy.Socket = require('./tlsnappy/api').Socket;
+tlsnappy.Server = require('./tlsnappy/api').Server;
+tlsnappy.createServer = require('./tlsnappy/api').createServer;
100 lib/tlsnappy/api.js
@@ -0,0 +1,100 @@
+var tlsnappy = require('../tlsnappy'),
+ util = require('util'),
+ net = require('net'),
+ Stream = require('stream').Stream;
+
+function Server(options, listener) {
+ net.Server.call(this, this.onconnection.bind(this));
+
+ this.context = new tlsnappy.binding.Context();
+ if (options.key) this.context.setKey(options.key);
+ if (options.cert) this.context.setCert(options.cert);
+
+ if (listener) this.on('secureConnection', listener);
+};
+util.inherits(Server, net.Server);
+exports.Server = Server;
+
+exports.createServer = function createServer(options, listener) {
+ return new Server(options, listener);
+};
+
+Server.prototype.onconnection = function onconnection(c) {
+ var self = this,
+ snappy = new Socket(c, this.context);
+
+ self.emit('secureConnection', snappy);
+};
+
+function Socket(c, context) {
+ var self = this;
+
+ Stream.call(this);
+
+ this.binding = new tlsnappy.binding.Socket(context);
+ this.binding.onedata = this.onedata.bind(this);
+ this.binding.oncdata = this.oncdata.bind(this);
+ this.binding.onhandshake = this.onhandshake.bind(this);
+ this.binding.onclose = this.onclose.bind(this);
+
+ this.c = c;
+ this.c.on('data', function(data) {
+ if (self.binding.encIn(data) === false) c.pause();
+ });
+ this.c.on('end', function() {
+ self.end();
+ });
+ this.c.on('error', function(err) {
+ });
+
+ this.writable = true;
+ this.readable = true;
+ this.closed = false;
+ this.paused = false;
+};
+util.inherits(Socket, Stream);
+exports.Socket = Socket;
+
+Socket.prototype.pause = function pause() {
+ if (this.paused) return;
+ this.paused = true;
+};
+
+Socket.prototype.resume = function resume() {
+ if (!this.paused) return;
+ this.paused = false;
+};
+
+Socket.prototype.write = function write(data, enc) {
+ // Data from user to binding
+ if (!Buffer.isBuffer(data)) {
+ data = new Buffer(data, enc);
+ }
+ return this.binding.clearIn(data);
+};
+
+Socket.prototype.onedata = function onedata(data) {
+ // Data from binding to socket
+ return this.c.write(data);
+};
+
+Socket.prototype.oncdata = function oncdata(data) {
+ // Data from binding to user
+ this.emit('data', data);
+};
+
+Socket.prototype.onhandshake = function onhandshake() {
+ this.emit('handshake');
+};
+
+Socket.prototype.onclose = function onclose() {
+ this.c.destroy();
+ this.emit('end');
+};
+
+Socket.prototype.end = function end() {
+ if (this.closed) return;
+ this.closed = true;
+
+ this.binding.close();
+};
14 package.json
@@ -0,0 +1,14 @@
+{
+ "name": "tlsnappy",
+ "version": "0.0.0",
+ "main": "lib/tlsnappy",
+ "dependencies": {
+ "bindings": "~1.0.0"
+ },
+ "devDependencies": {
+ "mocha": "~1.4.2"
+ },
+ "scripts": {
+ "test": "mocha --timeout 10000 --reporter spec --growl test/*-test.js"
+ }
+}
106 src/ngx-queue.h
@@ -0,0 +1,106 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ */
+
+
+#ifndef NGX_QUEUE_H_INCLUDED_
+#define NGX_QUEUE_H_INCLUDED_
+
+
+typedef struct ngx_queue_s ngx_queue_t;
+
+struct ngx_queue_s {
+ ngx_queue_t *prev;
+ ngx_queue_t *next;
+};
+
+
+#define ngx_queue_init(q) \
+ (q)->prev = q; \
+ (q)->next = q
+
+
+#define ngx_queue_empty(h) \
+ (h == (h)->prev)
+
+
+#define ngx_queue_insert_head(h, x) \
+ (x)->next = (h)->next; \
+ (x)->next->prev = x; \
+ (x)->prev = h; \
+ (h)->next = x
+
+
+#define ngx_queue_insert_after ngx_queue_insert_head
+
+
+#define ngx_queue_insert_tail(h, x) \
+ (x)->prev = (h)->prev; \
+ (x)->prev->next = x; \
+ (x)->next = h; \
+ (h)->prev = x
+
+
+#define ngx_queue_head(h) \
+ (h)->next
+
+
+#define ngx_queue_last(h) \
+ (h)->prev
+
+
+#define ngx_queue_sentinel(h) \
+ (h)
+
+
+#define ngx_queue_next(q) \
+ (q)->next
+
+
+#define ngx_queue_prev(q) \
+ (q)->prev
+
+
+#if (NGX_DEBUG)
+
+#define ngx_queue_remove(x) \
+ (x)->next->prev = (x)->prev; \
+ (x)->prev->next = (x)->next; \
+ (x)->prev = NULL; \
+ (x)->next = NULL
+
+#else
+
+#define ngx_queue_remove(x) \
+ (x)->next->prev = (x)->prev; \
+ (x)->prev->next = (x)->next
+
+#endif
+
+
+#define ngx_queue_split(h, q, n) \
+ (n)->prev = (h)->prev; \
+ (n)->prev->next = n; \
+ (n)->next = q; \
+ (h)->prev = (q)->prev; \
+ (h)->prev->next = h; \
+ (q)->prev = n;
+
+
+#define ngx_queue_add(h, n) \
+ (h)->prev->next = (n)->next; \
+ (n)->next->prev = (h)->prev; \
+ (h)->prev = (n)->prev; \
+ (h)->prev->next = h;
+
+
+#define ngx_queue_data(q, type, link) \
+ (type *) ((unsigned char *) q - offsetof(type, link))
+
+
+#define ngx_queue_foreach(q, h) \
+ for ((q) = ngx_queue_head(h); (q) != (h); (q) = ngx_queue_next(q))
+
+
+#endif /* NGX_QUEUE_H_INCLUDED_ */
117 src/ring.h
@@ -0,0 +1,117 @@
+#ifndef _SRC_RING_H_
+#define _SRC_RING_H_
+
+#include <assert.h>
+#include <string.h>
+
+class Ring{
+ public:
+ struct Buffer {
+ char data[32 * 1024];
+ int offset;
+ Buffer* next;
+ };
+
+ Ring() : total_(0) {
+ head_ = new Buffer();
+ head_->offset = 0;
+ head_->next = head_;
+
+ tail_ = head_;
+ }
+
+ ~Ring() {
+ while (head_ != tail_) {
+ Buffer* next = head_->next;
+ delete head_;
+ head_ = next;
+ }
+ delete head_;
+ }
+
+ inline void Write(char* data, int size) {
+ total_ += size;
+ while (size > 0) {
+ int left = sizeof(tail_->data) - tail_->offset;
+ int bytes = left > size ? size : left;
+
+ memcpy(tail_->data + tail_->offset, data, bytes);
+ tail_->offset += bytes;
+
+ data += bytes;
+ size -= bytes;
+
+ // tail_ is full now - create new buffer or use next one
+ if (left == bytes) {
+ if (tail_->next != head_) {
+ tail_ = tail_->next;
+ } else {
+ Buffer* b = new Buffer();
+ b->offset = 0;
+ b->next = head_;
+
+ // And replace tail_ with it
+ tail_->next = b;
+ tail_ = b;
+ }
+ }
+ }
+ }
+
+ inline int Size() {
+ return total_;
+ }
+
+ inline int Read(char* data, int size) {
+ int left = size;
+
+ while (left > 0 && Size() > 0) {
+ int bytes = head_->offset > left ? left : head_->offset;
+
+ if (data != NULL) memcpy(data, head_->data, bytes);
+ if (bytes < head_->offset) {
+ memmove(head_->data, head_->data + head_->offset - bytes, bytes);
+ }
+
+ head_->offset -= bytes;
+
+ data += bytes;
+ total_ -= bytes;
+ left -= bytes;
+
+ if (head_->offset == 0) {
+ head_ = head_->next;
+ }
+ }
+
+ return size - left;
+ }
+
+ inline int Peek(char* data, int size) {
+ int left = size;
+ Buffer* current = head_;
+
+ while (left > 0) {
+ int bytes = current->offset > left ? left : current->offset;
+
+ memcpy(data, current->data, bytes);
+
+ data += bytes;
+ left -= bytes;
+
+ current = current->next;
+
+ // Do not loop
+ if (current == head_) break;
+ }
+
+ return size - left;
+ }
+
+ private:
+ Buffer* head_;
+ Buffer* tail_;
+ int total_;
+};
+
+#endif // _SRC_RING_H_
649 src/tlsnappy.cc
@@ -0,0 +1,649 @@
+#include "tlsnappy.h"
+
+#include "openssl/ssl.h"
+#include "openssl/err.h"
+#include "node.h"
+#include "node_buffer.h"
+#include "node_object_wrap.h"
+#include "ngx-queue.h"
+#include "ring.h"
+
+#ifndef offset_of
+// g++ in strict mode complains loudly about the system offsetof() macro
+// because it uses NULL as the base address.
+# define offset_of(type, member) \
+ ((intptr_t) ((char *) &(((type *) 8)->member) - 8))
+#endif
+
+#ifndef container_of
+# define container_of(ptr, type, member) \
+ ((type *) ((char *) (ptr) - offset_of(type, member)))
+#endif
+
+namespace tlsnappy {
+
+using namespace v8;
+using namespace node;
+
+static Persistent<String> onedata_sym;
+static Persistent<String> oncdata_sym;
+static Persistent<String> onhandshake_sym;
+static Persistent<String> onclose_sym;
+
+Handle<Value> Context::New(const Arguments& args) {
+ HandleScope scope;
+
+ // XXX Multi-thread pool doesn't work atm
+ Context* ctx = new Context(1);
+ ctx->Wrap(args.Holder());
+
+ return scope.Close(args.This());
+}
+
+
+Context::Context(int worker_count) : status_(kRunning),
+ worker_count_(worker_count) {
+ ctx_ = SSL_CTX_new(SSLv23_method());
+ assert(ctx_ != NULL);
+
+ // Mitigate BEAST attacks
+ SSL_CTX_set_options(ctx_, SSL_OP_CIPHER_SERVER_PREFERENCE);
+
+ if (uv_sem_init(&event_, 0)) abort();
+ if (uv_mutex_init(&queue_mtx_)) abort();
+ ngx_queue_init(&queue_);
+
+ assert(worker_count_ <= kMaxWorkers);
+ for (int i = 0; i < worker_count_; i++) {
+ if (uv_thread_create(&workers_[i], Context::Loop, this)) abort();
+ }
+}
+
+
+Context::~Context() {
+ SSL_CTX_free(ctx_);
+ ctx_ = NULL;
+
+ // Notify each worker about stop event
+ status_ = kStopped;
+ for (int i = 0; i < worker_count_; i++) {
+ uv_sem_post(&event_);
+ }
+
+ // And stop them
+ for (int i = 0; i < worker_count_; i++) {
+ uv_thread_join(&workers_[i]);
+ }
+
+ uv_sem_destroy(&event_);
+ uv_mutex_destroy(&queue_mtx_);
+}
+
+
+void Context::Enqueue(Socket* s) {
+ uv_mutex_lock(&queue_mtx_);
+ // Prevent double insertions
+ if (ngx_queue_empty(&s->member_)) {
+ ngx_queue_insert_tail(&queue_, &s->member_);
+ }
+ uv_mutex_unlock(&queue_mtx_);
+
+ uv_sem_post(&event_);
+}
+
+
+inline SSL* Context::GetSSL() {
+ SSL* ssl = SSL_new(ctx_);
+ assert(ssl != NULL);
+
+ return ssl;
+}
+
+
+void Context::Loop(void* arg) {
+ Context* ctx = reinterpret_cast<Context*>(arg);
+
+ while (ctx->RunLoop()) {
+ }
+}
+
+
+bool Context::RunLoop() {
+ uv_sem_wait(&event_);
+ if (status_ == kStopped) return false;
+
+ Socket* socket = NULL;
+
+ uv_mutex_lock(&queue_mtx_);
+
+ ngx_queue_t* member = ngx_queue_head(&queue_);
+ for (; member != &queue_; member = ngx_queue_next(member)) {
+ socket = container_of(member, Socket, member_);
+
+ // Only one worker can operate on socket at one time
+ if (uv_mutex_trylock(&socket->mtx_) == 0) {
+ ngx_queue_remove(member);
+ ngx_queue_init(member);
+ break;
+ } else {
+ socket = NULL;
+ }
+ }
+
+ uv_mutex_unlock(&queue_mtx_);
+
+ if (socket != NULL) {
+ socket->OnEvent();
+ uv_mutex_unlock(&socket->mtx_);
+ }
+
+ return true;
+}
+
+
+BIO* LoadBIO(Handle<Value> v) {
+ HandleScope scope;
+
+ BIO* bio = BIO_new(BIO_s_mem());
+ assert(bio != NULL);
+
+ int r = -1;
+
+ r = BIO_write(bio,
+ Buffer::Data(v.As<Object>()),
+ Buffer::Length(v.As<Object>()));
+ assert(r > 0);
+
+ return bio;
+}
+
+
+Handle<Value> Context::SetKey(const Arguments& args) {
+ if (args.Length() < 1 ||!Buffer::HasInstance(args[0])) {
+ return ThrowException(String::New("First argument should be Buffer"));
+ }
+
+ BIO *bio = LoadBIO(args[0]);
+
+ String::Utf8Value passphrase(args[1]);
+
+ EVP_PKEY* key = PEM_read_bio_PrivateKey(bio, NULL, NULL,
+ args.Length() == 1 ?
+ NULL
+ :
+ *passphrase);
+
+ if (!key) {
+ BIO_free(bio);
+ unsigned long err = ERR_get_error();
+ if (!err) {
+ return ThrowException(Exception::Error(
+ String::New("PEM_read_bio_PrivateKey")));
+ }
+ char string[120];
+ ERR_error_string_n(err, string, sizeof string);
+ return ThrowException(Exception::Error(String::New(string)));
+ }
+
+ Context* ctx = ObjectWrap::Unwrap<Context>(args.This());
+
+ SSL_CTX_use_PrivateKey(ctx->ctx_, key);
+ EVP_PKEY_free(key);
+ BIO_free(bio);
+
+ return Null();
+}
+
+// Read a file that contains our certificate in "PEM" format,
+// possibly followed by a sequence of CA certificates that should be
+// sent to the peer in the Certificate message.
+//
+// Taken from OpenSSL - editted for style.
+int SSL_CTX_use_certificate_chain(SSL_CTX *ctx, BIO *in) {
+ int ret = 0;
+ X509 *x = NULL;
+
+ x = PEM_read_bio_X509_AUX(in, NULL, NULL, NULL);
+
+ if (x == NULL) {
+ SSLerr(SSL_F_SSL_CTX_USE_CERTIFICATE_CHAIN_FILE, ERR_R_PEM_LIB);
+ goto end;
+ }
+
+ ret = SSL_CTX_use_certificate(ctx, x);
+
+ if (ERR_peek_error() != 0) {
+ // Key/certificate mismatch doesn't imply ret==0 ...
+ ret = 0;
+ }
+
+ if (ret) {
+ // If we could set up our certificate, now proceed to
+ // the CA certificates.
+ X509 *ca;
+ int r;
+ unsigned long err;
+
+ if (ctx->extra_certs != NULL) {
+ sk_X509_pop_free(ctx->extra_certs, X509_free);
+ ctx->extra_certs = NULL;
+ }
+
+ while ((ca = PEM_read_bio_X509(in, NULL, NULL, NULL))) {
+ r = SSL_CTX_add_extra_chain_cert(ctx, ca);
+
+ if (!r) {
+ X509_free(ca);
+ ret = 0;
+ goto end;
+ }
+ // Note that we must not free r if it was successfully
+ // added to the chain (while we must free the main
+ // certificate, since its reference count is increased
+ // by SSL_CTX_use_certificate).
+ }
+
+ // When the while loop ends, it's usually just EOF.
+ err = ERR_peek_last_error();
+ if (ERR_GET_LIB(err) == ERR_LIB_PEM &&
+ ERR_GET_REASON(err) == PEM_R_NO_START_LINE) {
+ ERR_clear_error();
+ } else {
+ // some real error
+ ret = 0;
+ }
+ }
+
+end:
+ if (x != NULL) X509_free(x);
+ return ret;
+}
+
+
+Handle<Value> Context::SetCert(const Arguments& args) {
+ HandleScope scope;
+ Context* ctx = ObjectWrap::Unwrap<Context>(args.This());
+
+ if (args.Length() < 1 ||!Buffer::HasInstance(args[0])) {
+ return ThrowException(String::New("First argument should be Buffer"));
+ }
+
+ BIO *bio = LoadBIO(args[0]);
+
+ int rv = SSL_CTX_use_certificate_chain(ctx->ctx_, bio);
+
+ BIO_free(bio);
+
+ if (!rv) {
+ unsigned long err = ERR_get_error();
+ if (!err) {
+ return ThrowException(Exception::Error(
+ String::New("SSL_CTX_use_certificate_chain")));
+ }
+ char string[120];
+ ERR_error_string_n(err, string, sizeof string);
+ return ThrowException(Exception::Error(String::New(string)));
+ }
+
+ return Null();
+}
+
+
+Handle<Value> Socket::New(const Arguments& args) {
+ HandleScope scope;
+
+ if (args.Length() < 1) {
+ return ThrowException(String::New("First argument should be Context"));
+ }
+
+ Context* ctx = ObjectWrap::Unwrap<Context>(args[0].As<Object>());
+
+ Socket* s = new Socket(ctx);
+ s->Wrap(args.Holder());
+ s->Ref();
+
+ return scope.Close(args.This());
+}
+
+
+Socket::Socket(Context* ctx) : status_(kRunning), ctx_(ctx) {
+ ctx_->Ref();
+ ssl_ = ctx_->GetSSL();
+
+ rbio_ = BIO_new(BIO_s_mem());
+ wbio_ = BIO_new(BIO_s_mem());
+ assert(rbio_ != NULL);
+ assert(wbio_ != NULL);
+ SSL_set_bio(ssl_, rbio_, wbio_);
+ SSL_set_accept_state(ssl_);
+
+ if (uv_mutex_init(&mtx_)) abort();
+ if (uv_mutex_init(&enc_in_mtx_)) abort();
+ if (uv_mutex_init(&enc_out_mtx_)) abort();
+ if (uv_mutex_init(&clear_in_mtx_)) abort();
+ if (uv_mutex_init(&clear_out_mtx_)) abort();
+
+ uv_async_init(uv_default_loop(), &enc_out_cb_, EncOut);
+ uv_async_init(uv_default_loop(), &clear_out_cb_, ClearOut);
+ uv_async_init(uv_default_loop(), &close_cb_, OnClose);
+
+ ngx_queue_init(&member_);
+}
+
+
+Socket::~Socket() {
+ // Ensure that all workers will release socket
+ uv_mutex_lock(&mtx_);
+ uv_mutex_unlock(&mtx_);
+
+ SSL_free(ssl_);
+ ctx_->Unref();
+
+ uv_mutex_destroy(&mtx_);
+ uv_mutex_destroy(&enc_in_mtx_);
+ uv_mutex_destroy(&enc_out_mtx_);
+ uv_mutex_destroy(&clear_in_mtx_);
+ uv_mutex_destroy(&clear_out_mtx_);
+
+ uv_close(reinterpret_cast<uv_handle_t*>(&enc_out_cb_), NULL);
+ uv_close(reinterpret_cast<uv_handle_t*>(&clear_out_cb_), NULL);
+ uv_close(reinterpret_cast<uv_handle_t*>(&close_cb_), NULL);
+}
+
+
+Handle<Value> Socket::ClearIn(const Arguments& args) {
+ if (args.Length() < 1 || !Buffer::HasInstance(args[0])) {
+ return ThrowException(String::New("First argument should be Buffer"));
+ }
+
+ Socket* s = ObjectWrap::Unwrap<Socket>(args.This());
+
+ if (s->status_ >= kHalfClosed) return Null();
+
+ uv_mutex_lock(&s->clear_in_mtx_);
+ s->clear_in_.Write(Buffer::Data(args[0].As<Object>()),
+ Buffer::Length(args[0].As<Object>()));
+ uv_mutex_unlock(&s->clear_in_mtx_);
+
+ s->ctx_->Enqueue(s);
+
+ return Null();
+}
+
+
+Handle<Value> Socket::EncIn(const Arguments& args) {
+ if (args.Length() < 1 || !Buffer::HasInstance(args[0])) {
+ return ThrowException(String::New("First argument should be Buffer"));
+ }
+
+ Socket* s = ObjectWrap::Unwrap<Socket>(args.This());
+
+ if (s->status_ >= kHalfClosed) return Null();
+
+ uv_mutex_lock(&s->enc_in_mtx_);
+ s->enc_in_.Write(Buffer::Data(args[0].As<Object>()),
+ Buffer::Length(args[0].As<Object>()));
+ uv_mutex_unlock(&s->enc_in_mtx_);
+
+ s->ctx_->Enqueue(s);
+
+ return Null();
+}
+
+
+Handle<Value> Socket::Close(const Arguments& args) {
+ Socket* s = ObjectWrap::Unwrap<Socket>(args.This());
+
+ if (s->status_ != kRunning) {
+ return ThrowException(String::New("Socket is already closed"));
+ }
+
+ s->status_ = kClosing;
+ s->ctx_->Enqueue(s);
+
+ return Null();
+}
+
+
+void Socket::ClearOut(uv_async_t* handle, int status) {
+ HandleScope scope;
+ Socket* s = container_of(handle, Socket, clear_out_cb_);
+
+ if (s->status_ == kClosed) return;
+
+ uv_mutex_lock(&s->clear_out_mtx_);
+ int size = s->clear_out_.Size();
+ Buffer* b = Buffer::New(size);
+
+ int read = s->clear_out_.Read(Buffer::Data(b->handle_), size);
+ assert(read == size);
+ uv_mutex_unlock(&s->clear_out_mtx_);
+
+ Handle<Value> argv[1] = { b->handle_ };
+ MakeCallback(s->handle_, oncdata_sym, 1, argv);
+
+ if (s->status_ == kHalfClosed) uv_async_send(&s->close_cb_);
+}
+
+
+void Socket::EncOut(uv_async_t* handle, int status) {
+ HandleScope scope;
+ Socket* s = container_of(handle, Socket, enc_out_cb_);
+
+ if (s->status_ == kClosed) return;
+
+ uv_mutex_lock(&s->enc_out_mtx_);
+ int size = s->enc_out_.Size();
+ Buffer* b = Buffer::New(size);
+
+ int read = s->enc_out_.Read(Buffer::Data(b->handle_), size);
+ assert(read == size);
+ uv_mutex_unlock(&s->enc_out_mtx_);
+
+ Handle<Value> argv[1] = { b->handle_ };
+ MakeCallback(s->handle_, onedata_sym, 1, argv);
+
+ if (s->status_ == kHalfClosed) uv_async_send(&s->close_cb_);
+}
+
+
+void Socket::OnClose(uv_async_t* handle, int status) {
+ HandleScope scope;
+ Socket* s = container_of(handle, Socket, close_cb_);
+
+ if (s->status_ == kClosed) return;
+
+ int bytes = 0;
+ uv_mutex_lock(&s->clear_in_mtx_);
+ bytes += s->clear_in_.Size();
+ uv_mutex_unlock(&s->clear_in_mtx_);
+
+ uv_mutex_lock(&s->enc_in_mtx_);
+ bytes += s->enc_in_.Size();
+ uv_mutex_unlock(&s->enc_in_mtx_);
+
+ uv_mutex_lock(&s->enc_out_mtx_);
+ bytes += s->enc_out_.Size();
+ uv_mutex_unlock(&s->enc_out_mtx_);
+
+ uv_mutex_lock(&s->clear_out_mtx_);
+ bytes += s->clear_out_.Size();
+ uv_mutex_unlock(&s->clear_out_mtx_);
+
+ if (bytes != 0) return;
+
+ s->status_ = kClosed;
+ MakeCallback(s->handle_, onclose_sym, 0, NULL);
+ s->Unref();
+}
+
+
+void Socket::OnEvent() {
+ int r;
+ int err;
+ int enc_bytes;
+ char enc_data[10240];
+ int bytes;
+ char data[10240];
+
+ // Do nothing with closed socket
+ if (status_ == kClosed) return;
+
+ // Ignore all events if socket is half-closed
+ if (status_ == kHalfClosed) goto emit_data;
+
+ do {
+ // Read data from rings
+ uv_mutex_lock(&enc_in_mtx_);
+ enc_bytes = enc_in_.Read(enc_data, sizeof(enc_data));
+ uv_mutex_unlock(&enc_in_mtx_);
+
+ // Write encrypted data
+ if (enc_bytes > 0) {
+ r = BIO_write(rbio_, enc_data, enc_bytes);
+ assert(r == enc_bytes);
+ }
+ } while (enc_bytes == sizeof(enc_data));
+
+ do {
+ // Write clear data
+ uv_mutex_lock(&clear_in_mtx_);
+ bytes = clear_in_.Peek(data, sizeof(data));
+ uv_mutex_unlock(&clear_in_mtx_);
+
+ if (bytes > 0) {
+ r = SSL_write(ssl_, data, bytes);
+ if (r > 0) {
+ // Flush data
+ uv_mutex_lock(&clear_in_mtx_);
+ clear_in_.Read(NULL, r);
+ uv_mutex_unlock(&clear_in_mtx_);
+
+ // Loop until all data will be written
+ if (r < bytes) continue;
+ } else {
+ err = SSL_get_error(ssl_, r);
+ if (err == SSL_ERROR_ZERO_RETURN) {
+ // Ignore
+ } else if (err == SSL_ERROR_WANT_READ) {
+ // Ignore
+ break;
+ } else if (err == SSL_ERROR_WANT_WRITE) {
+ // Ignore
+ break;
+ } else {
+ fprintf(stdout, "SSL error: %d\n", err);
+ abort();
+ }
+ }
+ }
+ } while (bytes == sizeof(data));
+
+emit_data:
+
+ do {
+ // Read clear data
+ bytes = SSL_read(ssl_, data, sizeof(data));
+ if (bytes > 0) {
+ uv_mutex_lock(&clear_out_mtx_);
+ clear_out_.Write(data, bytes);
+ uv_mutex_unlock(&clear_out_mtx_);
+
+ uv_async_send(&clear_out_cb_);
+ } else {
+ err = SSL_get_error(ssl_, bytes);
+ if (err == SSL_ERROR_ZERO_RETURN) {
+ // Ignore
+ } else if (err == SSL_ERROR_WANT_READ) {
+ break;
+ } else if (err == SSL_ERROR_WANT_WRITE) {
+ break;
+ } else {
+ fprintf(stdout, "SSL error: %d\n", err);
+ abort();
+ }
+ }
+ } while (bytes == sizeof(data));
+
+ // Read encrypted data
+ do {
+ bytes = BIO_read(wbio_, data, sizeof(data));
+ if (bytes > 0) {
+ uv_mutex_lock(&enc_out_mtx_);
+ enc_out_.Write(data, bytes);
+ uv_mutex_unlock(&enc_out_mtx_);
+
+ uv_async_send(&enc_out_cb_);
+ }
+ } while (bytes == sizeof(data));
+
+ if (status_ == kClosing) {
+ bytes = 0;
+ uv_mutex_lock(&clear_in_mtx_);
+ bytes += clear_in_.Size();
+ uv_mutex_unlock(&clear_in_mtx_);
+
+ uv_mutex_lock(&enc_in_mtx_);
+ bytes += enc_in_.Size();
+ uv_mutex_unlock(&enc_in_mtx_);
+
+ // All remaining data was written to socket
+ if (bytes == 0) {
+ status_ = kHalfClosed;
+ uv_async_send(&close_cb_);
+ } else {
+ // Try again in next tick
+ ctx_->Enqueue(this);
+ }
+ }
+}
+
+
+void Context::Init(Handle<Object> target) {
+ Local<FunctionTemplate> t = FunctionTemplate::New(Context::New);
+
+ t->InstanceTemplate()->SetInternalFieldCount(1);
+ t->SetClassName(String::NewSymbol("Context"));
+
+ NODE_SET_PROTOTYPE_METHOD(t, "setKey", Context::SetKey);
+ NODE_SET_PROTOTYPE_METHOD(t, "setCert", Context::SetCert);
+
+ target->Set(String::NewSymbol("Context"), t->GetFunction());
+}
+
+
+void Socket::Init(Handle<Object> target) {
+ Local<FunctionTemplate> t = FunctionTemplate::New(Socket::New);
+
+ t->InstanceTemplate()->SetInternalFieldCount(1);
+ t->SetClassName(String::NewSymbol("Socket"));
+
+ NODE_SET_PROTOTYPE_METHOD(t, "clearIn", Socket::ClearIn);
+ NODE_SET_PROTOTYPE_METHOD(t, "encIn", Socket::EncIn);
+ NODE_SET_PROTOTYPE_METHOD(t, "close", Socket::Close);
+
+ target->Set(String::NewSymbol("Socket"), t->GetFunction());
+}
+
+
+void Init(Handle<Object> target) {
+ HandleScope scope;
+
+ onedata_sym = Persistent<String>::New(String::NewSymbol("onedata"));
+ oncdata_sym = Persistent<String>::New(String::NewSymbol("oncdata"));
+ onhandshake_sym = Persistent<String>::New(String::NewSymbol("onhandshake"));
+ onclose_sym = Persistent<String>::New(String::NewSymbol("onclose"));
+
+ SSL_library_init();
+ OpenSSL_add_all_algorithms();
+ OpenSSL_add_all_digests();
+ SSL_load_error_strings();
+ ERR_load_crypto_strings();
+
+ Context::Init(target);
+ Socket::Init(target);
+}
+
+} // namespace tlsnappy
+
+NODE_MODULE(tlsnappy, tlsnappy::Init)
116 src/tlsnappy.h
@@ -0,0 +1,116 @@
+#ifndef _SRC_TLSNAPPY_H_
+#define _SRC_TLSNAPPY_H_
+
+#include "openssl/ssl.h"
+#include "openssl/err.h"
+#include "node.h"
+#include "node_buffer.h"
+#include "node_object_wrap.h"
+#include "ngx-queue.h"
+#include "ring.h"
+
+namespace tlsnappy {
+
+using namespace v8;
+using namespace node;
+
+// Forward declaration
+class Socket;
+
+class Context : public ObjectWrap {
+ public:
+ enum Status {
+ kRunning,
+ kStopped
+ };
+
+ Context(int worker_count);
+ ~Context();
+
+ static void Init(Handle<Object> target);
+
+ void Enqueue(Socket* s);
+ inline SSL* GetSSL();
+
+ protected:
+ static const int kMaxWorkers = 16;
+
+ static Handle<Value> New(const Arguments& args);
+ static Handle<Value> SetKey(const Arguments& args);
+ static Handle<Value> SetCert(const Arguments& args);
+
+ static void Loop(void* arg);
+ bool RunLoop();
+
+ // Worker data
+ volatile Status status_;
+ uv_sem_t event_;
+ uv_mutex_t queue_mtx_;
+ ngx_queue_t queue_;
+ uv_thread_t workers_[kMaxWorkers];
+ int worker_count_;
+
+ SSL_CTX* ctx_;
+
+ friend class Socket;
+};
+
+class Socket : public ObjectWrap {
+ public:
+ enum Status {
+ kRunning = 0,
+ kClosing = 1, // Accept incoming data
+ kHalfClosed = 2, // Stop accepting incoming data, but emit data events
+ kClosed = 3 // No input and output, socket is closed
+ };
+
+ Socket(Context* context);
+ ~Socket();
+
+ static void Init(Handle<Object> target);
+
+ protected:
+ static Handle<Value> New(const Arguments& args);
+
+ static Handle<Value> ClearIn(const Arguments& args);
+ static Handle<Value> EncIn(const Arguments& args);
+ static Handle<Value> Close(const Arguments& args);
+
+ static void ClearOut(uv_async_t* handle, int status);
+ static void EncOut(uv_async_t* handle, int status);
+ static void OnClose(uv_async_t* handle, int status);
+
+ volatile Status status_;
+
+ Ring enc_in_;
+ Ring enc_out_;
+ Ring clear_in_;
+ Ring clear_out_;
+ uv_async_t clear_out_cb_;
+ uv_async_t enc_out_cb_;
+ uv_async_t close_cb_;
+
+ void OnEvent();
+
+ uv_mutex_t mtx_;
+ uv_mutex_t enc_in_mtx_;
+ uv_mutex_t enc_out_mtx_;
+ uv_mutex_t clear_in_mtx_;
+ uv_mutex_t clear_out_mtx_;
+
+ uv_async_t cdata_cb_;
+ uv_async_t edata_cb_;
+
+ Context* ctx_;
+ BIO* rbio_;
+ BIO* wbio_;
+ SSL* ssl_;
+
+ ngx_queue_t member_;
+
+ friend class Context;
+};
+
+} // namespace tlsnappy
+
+#endif // _SRC_TLSNAPPY_H_
39 test/server-test.js
@@ -0,0 +1,39 @@
+var assert = require('assert'),
+ fs = require('fs'),
+ tls = require('tls'),
+ tlsnappy = require('..');
+
+describe('TLSnappy', function() {
+ var options = {
+ key: fs.readFileSync(__dirname + '/../keys/server.key'),
+ cert: fs.readFileSync(__dirname + '/../keys/server.crt')
+ },
+ server;
+
+ beforeEach(function() {
+ server = tlsnappy.createServer(options).listen(44300);
+ });
+
+ afterEach(function() {
+ server.close();
+ });
+
+ it('should accept tls connection', function(callback) {
+ server.on('secureConnection', function(c) {
+ var data = '';
+ c.on('data', function(chunk) {
+ data += chunk;
+ });
+
+ c.on('end', function() {
+ assert.equal(data, 'hello');
+ callback();
+ });
+ });
+
+ var client = tls.connect(44300, options, function() {
+ client.write('hello');
+ client.end();
+ });
+ });
+});

0 comments on commit b100cce

Please sign in to comment.
Something went wrong with that request. Please try again.