Permalink
Browse files

add EM::Connection#proxied_bytes for byte transfer stats with #proxy_…

…incoming_to
  • Loading branch information...
1 parent 6ae0e1c commit df92251adfdbf2c0e8adb9c819bbd962924e7c90 @tmm1 tmm1 committed Mar 5, 2012
Showing with 55 additions and 1 deletion.
  1. +14 −0 ext/cmain.cpp
  2. +4 −0 ext/ed.cpp
  3. +2 −0 ext/ed.h
  4. +1 −0 ext/eventmachine.h
  5. +15 −0 ext/rubymain.cpp
  6. +6 −0 lib/em/connection.rb
  7. +13 −1 tests/test_proxy_connection.rb
View
@@ -813,6 +813,20 @@ extern "C" void evma_stop_proxy (const unsigned long from)
ed->StopProxy();
}
+/******************
+evma_proxied_bytes
+*******************/
+
+extern "C" unsigned long evma_proxied_bytes (const unsigned long from)
+{
+ ensure_eventmachine("evma_proxied_bytes");
+ EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (from));
+ if (ed)
+ return ed->GetProxiedBytes();
+ else
+ return 0;
+}
+
/***************************
evma_get_heartbeat_interval
View
@@ -61,6 +61,7 @@ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
UnbindReasonCode (0),
ProxyTarget(NULL),
ProxiedFrom(NULL),
+ ProxiedBytes(0),
MaxOutboundBufSize(0),
MyEventMachine (em),
PendingConnectTimeout(20000000),
@@ -247,6 +248,7 @@ void EventableDescriptor::StartProxy(const unsigned long to, const unsigned long
StopProxy();
ProxyTarget = ed;
BytesToProxy = length;
+ ProxiedBytes = 0;
ed->SetProxiedFrom(this, bufsize);
return;
}
@@ -293,6 +295,7 @@ void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size)
if (BytesToProxy > 0) {
unsigned long proxied = min(BytesToProxy, (unsigned long) size);
ProxyTarget->SendOutboundData(buf, proxied);
+ ProxiedBytes += (unsigned long) proxied;
BytesToProxy -= proxied;
if (BytesToProxy == 0) {
StopProxy();
@@ -303,6 +306,7 @@ void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size)
}
} else {
ProxyTarget->SendOutboundData(buf, size);
+ ProxiedBytes += (unsigned long) size;
}
} else {
(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf, size);
View
@@ -86,6 +86,7 @@ class EventableDescriptor: public Bindable_t
virtual void StartProxy(const unsigned long, const unsigned long, const unsigned long);
virtual void StopProxy();
+ virtual unsigned long GetProxiedBytes(){ return ProxiedBytes; };
virtual void SetProxiedFrom(EventableDescriptor*, const unsigned long);
virtual int SendOutboundData(const char*,int){ return -1; }
virtual bool IsPaused(){ return bPaused; }
@@ -116,6 +117,7 @@ class EventableDescriptor: public Bindable_t
unsigned long BytesToProxy;
EventableDescriptor *ProxyTarget;
EventableDescriptor *ProxiedFrom;
+ unsigned long ProxiedBytes;
unsigned long MaxOutboundBufSize;
View
@@ -109,6 +109,7 @@ extern "C" {
void evma_start_proxy(const unsigned long, const unsigned long, const unsigned long, const unsigned long);
void evma_stop_proxy(const unsigned long);
+ unsigned long evma_proxied_bytes(const unsigned long);
int evma_set_rlimit_nofile (int n_files);
View
@@ -1094,6 +1094,20 @@ static VALUE t_stop_proxy (VALUE self, VALUE from)
return Qnil;
}
+/***************
+t_proxied_bytes
+****************/
+
+static VALUE t_proxied_bytes (VALUE self, VALUE from)
+{
+ try{
+ return ULONG2NUM(evma_proxied_bytes(NUM2ULONG (from)));
+ } catch (std::runtime_error e) {
+ rb_raise (EM_eConnectionError, e.what());
+ }
+ return Qnil;
+}
+
/************************
t_get_heartbeat_interval
@@ -1193,6 +1207,7 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 4);
rb_define_module_function (EmModule, "stop_proxy", (VALUE (*)(...))t_stop_proxy, 1);
+ rb_define_module_function (EmModule, "get_proxied_bytes", (VALUE (*)(...))t_proxied_bytes, 1);
rb_define_module_function (EmModule, "watch_filename", (VALUE (*)(...))t_watch_filename, 1);
rb_define_module_function (EmModule, "unwatch_filename", (VALUE (*)(...))t_unwatch_filename, 1);
@@ -238,6 +238,12 @@ def stop_proxying
EventMachine::disable_proxy(self)
end
+ # The number of bytes proxied to another connection. Reset to zero when
+ # EventMachine::Connection#proxy_incoming_to is called, and incremented whenever data is proxied.
+ def proxied_bytes
+ EventMachine::get_proxied_bytes(self.signature)
+ end
+
# EventMachine::Connection#close_connection is called only by user code, and never
# by the event loop. You may call this method against a connection object in any
# callback handler, whether or not the callback was made against the connection
@@ -24,6 +24,7 @@ def proxy_target_unbound
end
def unbind
+ $proxied_bytes = proxied_bytes
@client.close_connection_after_writing
end
end
@@ -94,7 +95,7 @@ def initialize port
end
def receive_data(data)
- EM.connect("127.0.0.1", @port, ProxyConnection, self, data)
+ @proxy = EM.connect("127.0.0.1", @port, ProxyConnection, self, data)
end
end
@@ -134,6 +135,17 @@ def test_proxy_connection
assert_equal("I know!", $client_data)
end
+ def test_proxied_bytes
+ EM.run {
+ EM.start_server("127.0.0.1", @port, Server)
+ EM.start_server("127.0.0.1", @proxy_port, ProxyServer, @port)
+ EM.connect("127.0.0.1", @proxy_port, Client)
+ }
+
+ assert_equal("I know!", $client_data)
+ assert_equal("I know!".bytesize, $proxied_bytes)
+ end
+
def test_partial_proxy_connection
EM.run {
EM.start_server("127.0.0.1", @port, Server)

0 comments on commit df92251

Please sign in to comment.