Skip to content
This repository
Browse code

Initial test with EM::Connection#backpressure_level=(value). Just tes…

…ting for now.
  • Loading branch information...
commit 197213eccf7dee3abb069954fb722429e2daa5e6 1 parent 218435d
Iñaki Baz Castillo authored
24 ext/cmain.cpp
@@ -643,6 +643,30 @@ extern "C" int evma_set_pending_connect_timeout (const unsigned long binding, fl
643 643 }
644 644
645 645
  646 +/**************************
  647 +evma_get_backpressure_level
  648 +***************************/
  649 +
  650 +extern "C" int evma_get_backpressure_level (const unsigned long binding)
  651 +{
  652 + ensure_eventmachine("evma_get_backpressure_level");
  653 + EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
  654 + return ed ? ed->GetBackPressureLevel() : 0;
  655 +}
  656 +
  657 +
  658 +/**************************
  659 +evma_set_backpressure_level
  660 +***************************/
  661 +
  662 +extern "C" int evma_set_backpressure_level (const unsigned long binding, int level)
  663 +{
  664 + ensure_eventmachine("evma_set_backpressure_level");
  665 + EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (binding));
  666 + return ed ? ed->SetBackPressureLevel(level) : 0;
  667 +}
  668 +
  669 +
646 670 /**********************
647 671 evma_set_timer_quantum
648 672 **********************/
31 ext/ed.cpp
@@ -66,6 +66,7 @@ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em, bool autoc
66 66 MyEventMachine (em),
67 67 PendingConnectTimeout(20000000),
68 68 InactivityTimeout (0),
  69 + BackPressureLevel(32768), // Default value.
69 70 bPaused (false)
70 71 {
71 72 /* There are three ways to close a socket, all of which should
@@ -338,6 +339,28 @@ int EventableDescriptor::SetPendingConnectTimeout (uint64_t value)
338 339 }
339 340
340 341
  342 +/****************************************
  343 +EventableDescriptor::GetBackPressureLevel
  344 +*****************************************/
  345 +
  346 +int EventableDescriptor::GetBackPressureLevel()
  347 +{
  348 + return BackPressureLevel;
  349 +}
  350 +
  351 +
  352 +/****************************************
  353 +EventableDescriptor::SetBackPressureLevel
  354 +*****************************************/
  355 +
  356 +int EventableDescriptor::SetBackPressureLevel(int level)
  357 +{
  358 + // TODO: check value, but better in rubymain.cpp or connection.rb.
  359 + BackPressureLevel = level;
  360 + return level;
  361 +}
  362 +
  363 +
341 364 /*************************************
342 365 EventableDescriptor::GetNextHeartbeat
343 366 *************************************/
@@ -648,6 +671,14 @@ bool ConnectionDescriptor::SelectForRead()
648 671 return false;
649 672 else if (bWatchOnly)
650 673 return bNotifyReadable ? true : false;
  674 + // TODO: When this occurs, the connection remains open but it's not closed after comm_inactivity_timeout value.
  675 + // TODO: When this occurs, the connection does not read data anymore.
  676 + // Explanation by Aman: The other edge case this introduces is that dead connections will not be discovered,
  677 + // as the reactor never attempts a read() and thus never realizes that the other end has terminated the connection.
  678 + else if (GetOutboundDataSize() > BackPressureLevel) {
  679 + //printf("******* GetOutboundDataSize() > BackPressureLevel !!!!!!\n");
  680 + return false;
  681 + }
651 682 else
652 683 return true;
653 684 }
3  ext/ed.h
@@ -80,6 +80,8 @@ class EventableDescriptor: public Bindable_t
80 80 virtual int SetCommInactivityTimeout (uint64_t value) {return 0;}
81 81 uint64_t GetPendingConnectTimeout();
82 82 int SetPendingConnectTimeout (uint64_t value);
  83 + int GetBackPressureLevel();
  84 + int SetBackPressureLevel(int level);
83 85
84 86 #ifdef HAVE_EPOLL
85 87 struct epoll_event *GetEpollEvent() { return &EpollEvent; }
@@ -128,6 +130,7 @@ class EventableDescriptor: public Bindable_t
128 130 EventMachine_t *MyEventMachine;
129 131 uint64_t PendingConnectTimeout;
130 132 uint64_t InactivityTimeout;
  133 + int BackPressureLevel;
131 134 uint64_t LastActivity;
132 135 uint64_t NextHeartbeat;
133 136 bool bPaused;
2  ext/eventmachine.h
@@ -89,6 +89,8 @@ extern "C" {
89 89 float evma_get_pending_connect_timeout (const unsigned long binding);
90 90 int evma_set_pending_connect_timeout (const unsigned long binding, float value);
91 91 int evma_get_outbound_data_size (const unsigned long binding);
  92 + int evma_set_backpressure_level (const unsigned long binding, int level);
  93 + int evma_get_backpressure_level (const unsigned long binding);
92 94 int evma_send_file_data_to_connection (const unsigned long binding, const char *filename);
93 95
94 96 void evma_close_connection (const unsigned long binding, int after_writing);
23 ext/rubymain.cpp
@@ -465,6 +465,27 @@ static VALUE t_set_pending_connect_timeout (VALUE self, VALUE signature, VALUE t
465 465 return Qfalse;
466 466 }
467 467
  468 +
  469 +/*****************************
  470 +t_get_backpressure_level
  471 +*****************************/
  472 +
  473 +static VALUE t_get_backpressure_level (VALUE self, VALUE signature)
  474 +{
  475 + return INT2NUM(evma_get_backpressure_level(NUM2ULONG(signature)));
  476 +}
  477 +
  478 +
  479 +/*****************************
  480 +t_set_backpressure_level
  481 +*****************************/
  482 +
  483 +static VALUE t_set_backpressure_level (VALUE self, VALUE signature, VALUE level)
  484 +{
  485 + return INT2NUM(evma_set_backpressure_level(NUM2ULONG(signature), NUM2INT(level)));
  486 +}
  487 +
  488 +
468 489 /***************
469 490 t_send_datagram
470 491 ***************/
@@ -1239,6 +1260,8 @@ extern "C" void Init_rubyeventmachine()
1239 1260 rb_define_module_function (EmModule, "set_pending_connect_timeout", (VALUE(*)(...))t_set_pending_connect_timeout, 2);
1240 1261 rb_define_module_function (EmModule, "set_rlimit_nofile", (VALUE(*)(...))t_set_rlimit_nofile, 1);
1241 1262 rb_define_module_function (EmModule, "get_connection_count", (VALUE(*)(...))t_get_connection_count, 0);
  1263 + rb_define_module_function (EmModule, "get_backpressure_level", (VALUE(*)(...))t_get_backpressure_level, 1);
  1264 + rb_define_module_function (EmModule, "set_backpressure_level", (VALUE(*)(...))t_set_backpressure_level, 2);
1242 1265
1243 1266 rb_define_module_function (EmModule, "epoll", (VALUE(*)(...))t__epoll, 0);
1244 1267 rb_define_module_function (EmModule, "epoll=", (VALUE(*)(...))t__epoll_set, 1);
12 lib/em/connection.rb
@@ -724,5 +724,17 @@ def resume
724 724 def paused?
725 725 EventMachine::connection_paused? @signature
726 726 end
  727 +
  728 + # ibc
  729 + # TODO: doc
  730 + def backpressure_level= level
  731 + EventMachine::set_backpressure_level @signature, level
  732 + end
  733 +
  734 + # TODO: doc
  735 + def backpressure_level
  736 + EventMachine::get_backpressure_level @signature
  737 + end
  738 +
727 739 end
728 740 end
130 test_backpressure_level/client_backpressure_level.rb
... ... @@ -0,0 +1,130 @@
  1 +#!/usr/bin/ruby
  2 +
  3 +$0 = "client_blackpressure_level.rb"
  4 +
  5 +lib_dir = File.expand_path(File.join(File.dirname(__FILE__), "../", "lib"))
  6 +$LOAD_PATH.insert(0, lib_dir)
  7 +
  8 +require "eventmachine-le"
  9 +begin
  10 + require "iobuffer"
  11 +rescue LoadError
  12 + $stderr.puts "ERROR: iobuffer gem is required:\n gem install iobuffer"
  13 + exit 1
  14 +end
  15 +
  16 +
  17 +DATA = ARGV[0]
  18 +TIMES = (ARGV[1].to_i > 0) ? ARGV[1].to_i : 1
  19 +CONCURRENCY = ARGV[2] ? ARGV[2].to_i : 1
  20 +
  21 +
  22 +
  23 +def show_usage
  24 + puts "
  25 +USAGE: ./em_client_test_leak.rb DATA TIMES CONCURRENCY
  26 +
  27 + DATA must be \"A\" or \"B\" or \"C\"
  28 + TIMES number of messages to send (default 1)
  29 + CONCURRENCY messages to send together upon receipt of a response (default 1)
  30 +
  31 +"
  32 +end
  33 +
  34 +if DATA != "A" and DATA != "B" and DATA != "C"
  35 + $stderr.puts "ERROR: DATA must be \"A\" or \"B\" or \"C\""
  36 + show_usage
  37 + exit 1
  38 +end
  39 +
  40 +
  41 +def calculate_num_requests(sent)
  42 + if (TIMES - sent) < CONCURRENCY
  43 + return TIMES - sent
  44 + else
  45 + return CONCURRENCY
  46 + end
  47 +end
  48 +
  49 +
  50 +
  51 +
  52 +class TestLeakClient < EventMachine::Connection
  53 +
  54 + def initialize
  55 + super
  56 + @buffer = IO::Buffer.new
  57 + @state = :init
  58 +
  59 + @num_request = 0
  60 + @num_response = 0
  61 + end
  62 +
  63 + def send_requests
  64 + calculate_num_requests(@num_request).times do
  65 + @num_request += 1
  66 + send_data DATA
  67 + end
  68 + end
  69 +
  70 + def post_init
  71 + puts "DEBUG: connected"
  72 + send_requests
  73 + end
  74 +
  75 + def unbind
  76 + puts "\n\nWARN: *** disconnected ***"
  77 + end
  78 +
  79 + def receive_data(data)
  80 + @buffer << data
  81 +
  82 + while case @state
  83 + when :init
  84 + @state = :response
  85 + true
  86 + when :response
  87 + parse_response
  88 + when :invalid
  89 + $stderr.puts "FATAL: state invalid"
  90 + raise "invalid state!!!"
  91 + else
  92 + raise RuntimeError, "invalid state: #{@state}"
  93 + end
  94 + end
  95 + end
  96 +
  97 +
  98 +
  99 + def parse_response
  100 + return false if @buffer.size < 1
  101 +
  102 + @response = @buffer.read 1
  103 + @num_response += 1
  104 +
  105 + puts "requests = #{@num_request} - responses = #{@num_response} => diff = #{@num_request-@num_response}"
  106 +
  107 + if @num_response == TIMES
  108 + puts "\n\nINFO: all the #{TIMES} responses received."
  109 + close_connection_after_writing
  110 + sleep 0.2
  111 + exit 0
  112 + end
  113 +
  114 + @state = :init
  115 +
  116 + # Send more requests.
  117 + send_requests
  118 + true
  119 + end
  120 +
  121 +end
  122 +
  123 +
  124 +EM.run do
  125 + puts "INFO: connecting to 127.0.0.1:6666..."
  126 + EventMachine::connect("127.0.0.1", 6666, TestLeakClient) do |c|
  127 + #c.backpressure_level = 2000
  128 + end
  129 +end
  130 +
150 test_backpressure_level/server_backpressure_level.rb
... ... @@ -0,0 +1,150 @@
  1 +#!/usr/bin/ruby
  2 +
  3 +$0 = "server_blackpressure_level.rb"
  4 +
  5 +lib_dir = File.expand_path(File.join(File.dirname(__FILE__), "../", "lib"))
  6 +$LOAD_PATH.insert(0, lib_dir)
  7 +
  8 +require "eventmachine-le"
  9 +begin
  10 + require "iobuffer"
  11 +rescue LoadError
  12 + $stderr.puts "ERROR: iobuffer gem is required:\n gem install iobuffer"
  13 + exit 1
  14 +end
  15 +require "socket"
  16 +
  17 +
  18 +class Message
  19 + attr_accessor :connection, :data, :source_ip, :source_port
  20 +
  21 + def initialize(data)
  22 + @data = data
  23 + end
  24 +end
  25 +
  26 +
  27 +
  28 +class TestLeakServer < EM::Connection
  29 +
  30 + attr_reader :source_ip, :source_port
  31 +
  32 + def initialize
  33 + super
  34 + @buffer = IO::Buffer.new
  35 + @state = :init
  36 + @num_request = 0
  37 + @num_response = 0
  38 + end
  39 +
  40 + def post_init
  41 + @source_port, @source_ip = ::Socket.unpack_sockaddr_in(get_peername)
  42 + puts "post_init: self.object_id = #{self.object_id}"
  43 + end
  44 +
  45 + def unbind cause=nil
  46 + puts "unbind(#{cause.inspect})"
  47 + exit 1
  48 + end
  49 +
  50 + def receive_data(data)
  51 + @buffer << data
  52 +
  53 + while case @state
  54 + when :init
  55 + @state = :message
  56 + when :message
  57 + parse_message
  58 + when :finished
  59 + process_request(@msg)
  60 + @state = :init
  61 + when :invalid
  62 + $stderr.puts "FATAL: state invalid"
  63 + raise "invalid state!!!"
  64 + else
  65 + raise RuntimeError, "invalid state: #{@state}"
  66 + end
  67 + end
  68 + end
  69 +
  70 + def parse_message
  71 + return false if @buffer.size < 1
  72 +
  73 + @msg = Message.new(@buffer.read(1))
  74 + @num_request += 1
  75 + @state = :finished
  76 +
  77 + return true
  78 + end
  79 +
  80 + def process_request(msg)
  81 + puts "requests = #{@num_request} - responses = #{@num_response} => diff = #{@num_request-@num_response}"
  82 +
  83 + msg.source_ip = @source_ip
  84 + msg.source_port = @source_port
  85 +
  86 + if msg.data == "A"
  87 + send_data "a"
  88 + @num_response += 1
  89 +
  90 + elsif msg.data == "B"
  91 + EM.next_tick do
  92 + send_data "b"
  93 + @num_response += 1
  94 + end
  95 +
  96 + elsif msg.data == "C"
  97 + msg.connection = self
  98 + operation = proc { msg }
  99 + callback = proc do |result|
  100 + result.connection.send_data "c"
  101 + @num_response += 1
  102 + end
  103 + EM.defer( operation, callback )
  104 +
  105 + else
  106 + raise "msg.data is not \"A\" or \"B\" or \"C\" !!!"
  107 + end
  108 +
  109 + return true
  110 + end
  111 +
  112 +end
  113 +
  114 +
  115 +EM.threadpool_size = 10
  116 +EM.run do
  117 +
  118 + EM.start_server("127.0.0.1", 6666, TestLeakServer) do |c|
  119 + #c.backpressure_level = 1000
  120 + end
  121 + puts "INFO: TCP server listening on 0.0.0.0:6666"
  122 +
  123 + EM.add_periodic_timer(3) do
  124 + $stderr.puts
  125 + $stderr.puts "[[[ #{Time.now} ]]]"
  126 + GC.start
  127 + $stderr.puts "GC.start executed"
  128 + $stderr.puts "ObjectSpace.each_object:"
  129 + $stderr.puts "TOTAL objets: #{ObjectSpace.each_object() {} }"
  130 + $stderr.puts "TOTAL EM::Connection: #{ObjectSpace.each_object(EM::Connection) {} }"
  131 + $stderr.puts "TOTAL TestLeakServer: #{ObjectSpace.each_object(TestLeakServer) {} }"
  132 + num_conn = 0
  133 + ObjectSpace.each_object(TestLeakServer) do |conn|
  134 + if conn.source_port and (num_conn+=1) <= 20
  135 + $stderr.puts "- connection: source port = #{conn.source_port} | error? = #{conn.error?} | object_id = #{conn.object_id}"
  136 + end
  137 + end
  138 + $stderr.puts "TOTAL Message: #{ObjectSpace.each_object(Message) {} }"
  139 + num_msg = 0
  140 + ObjectSpace.each_object(Message) do |msg|
  141 + $stderr.puts "- message source port: #{msg.source_port}"
  142 + if (num_msg+=1) > 20
  143 + $stderr.puts "- [more...]"
  144 + break
  145 + end
  146 + end
  147 + end
  148 +
  149 +end
  150 +
37 tests/test_blackpressure_level.rb
... ... @@ -0,0 +1,37 @@
  1 +require 'em_test_helper'
  2 +
  3 +class TestBlackPressureLevel < Test::Unit::TestCase
  4 +
  5 + def test_udp_set_and_get_blackpressure_level
  6 + setup_timeout(2)
  7 +
  8 + EM.run do
  9 + EM.open_datagram_socket("127.0.0.1", next_port) do |c|
  10 + # Check default value.
  11 + assert_equal 32768, c.backpressure_level
  12 + # Modify and check value.
  13 + c.backpressure_level = 123456
  14 + assert_equal 123456, c.backpressure_level
  15 + EM.stop
  16 + end
  17 + end
  18 + end
  19 +
  20 + def test_tcp_set_and_get_blackpressure_level
  21 + setup_timeout(2)
  22 +
  23 + EM.run do
  24 + EM.start_server("127.0.0.1", port = next_port) do |c|
  25 + # Check default value.
  26 + assert_equal 32768, c.backpressure_level
  27 + c.backpressure_level = 123456
  28 + # Modify and check value.
  29 + assert_equal 123456, c.backpressure_level
  30 + EM.stop
  31 + end
  32 +
  33 + EM.connect("127.0.0.1", port)
  34 + end
  35 + end
  36 +
  37 +end

0 comments on commit 197213e

Please sign in to comment.
Something went wrong with that request. Please try again.