Skip to content

Commit

Permalink
Merge pull request #2254 from cloudfoundry/170044379-switch-to-nats-pure
Browse files Browse the repository at this point in the history
Switch nats.rb to nats-pure.rb

The purpose of switching is to avoid problems caused by the EventMachine-based gem in large-scale foundations.
  • Loading branch information
h4xnoodle committed Apr 2, 2020
2 parents 2f34853 + 715c720 commit 29421a6
Show file tree
Hide file tree
Showing 24 changed files with 217 additions and 209 deletions.
3 changes: 2 additions & 1 deletion src/Gemfile
Expand Up @@ -63,7 +63,8 @@ group :development, :test do
gem 'factory_bot'

# for root level specs
gem 'nats', '~>0.9.2'
gem 'nats-pure', '~>0.6.2'
gem 'openssl'
gem 'rest-client'

gem 'blue-shell'
Expand Down
15 changes: 10 additions & 5 deletions src/Gemfile.lock
Expand Up @@ -36,8 +36,9 @@ PATH
httpclient (~> 2.8.3)
logging (~> 2.2.2)
membrane (~> 1.1.0)
nats (~> 0.9.2)
nats-pure (~> 0.6.2)
netaddr (~> 1.5.0)
openssl
prometheus-client (~> 1.0.0)
puma (~> 3.12)
rack-test (~> 0.6.2)
Expand All @@ -62,7 +63,8 @@ PATH
eventmachine (~> 1.2.0)
httpclient (~> 2.8.3)
logging (~> 2.2.2)
nats (~> 0.9.2)
nats-pure (~> 0.6.2)
openssl
riemann-client (~> 0.2.6)
sinatra (~> 2.0.2)
thin (~> 1.7.0)
Expand Down Expand Up @@ -1068,6 +1070,7 @@ GEM
httpclient (2.8.3)
i18n (1.8.2)
concurrent-ruby (~> 1.0)
ipaddr (1.2.2)
ipaddress (0.8.3)
jaro_winkler (1.5.4)
jmespath (1.4.0)
Expand All @@ -1090,11 +1093,12 @@ GEM
mustermann (1.1.1)
ruby2_keywords (~> 0.0.1)
mysql2 (0.5.3)
nats (0.9.2)
eventmachine (~> 1.2, >= 1.2)
nats-pure (0.6.2)
net-ssh (5.2.0)
netaddr (1.5.1)
netrc (0.11.0)
openssl (2.1.2)
ipaddr
parallel (1.19.1)
parallel_tests (2.31.0)
parallel
Expand Down Expand Up @@ -1232,8 +1236,9 @@ DEPENDENCIES
minitar
mono_logger
mysql2
nats (~> 0.9.2)
nats-pure (~> 0.6.2)
net-ssh
openssl
parallel_tests (~> 2.0)
pg
pry-byebug
Expand Down
30 changes: 18 additions & 12 deletions src/bosh-dev/lib/bosh/dev/sandbox/main.rb
Expand Up @@ -383,15 +383,23 @@ def nats_certificate_paths
end

def director_nats_config
tls_context = OpenSSL::SSL::SSLContext.new
tls_context.ssl_version = :TLSv1_2
tls_context.verify_mode = OpenSSL::SSL::VERIFY_PEER

tls_context.key = OpenSSL::PKey::RSA.new(File.open(nats_certificate_paths['clients']['test_client']['private_key_path']))
tls_context.cert = OpenSSL::X509::Certificate.new(File.open(nats_certificate_paths['clients']['test_client']['certificate_path']))
tls_context.ca_file = nats_certificate_paths['ca_path']

{
uri: "nats://localhost:#{nats_port}",
ssl: true,
tls: {
:private_key_file => nats_certificate_paths['clients']['test_client']['private_key_path'],
:cert_chain_file => nats_certificate_paths['clients']['test_client']['certificate_path'],
:verify_peer => true,
:ca_file => nats_certificate_paths['ca_path'],
}
servers: Array.new(1, "nats://localhost:#{nats_port}"),
dont_randomize_servers: true,
max_reconnect_attempts: 4,
reconnect_time_wait: 2,
reconnect: true,
tls: {
context: tls_context,
},
}
end

Expand Down Expand Up @@ -456,10 +464,8 @@ def do_reset

load_db_and_populate_blobstore(@test_initial_state) unless @test_initial_state.nil?

unless @nats_process.running?
@nats_process.stop
start_nats
end
stop_nats if @nats_process.running?
start_nats

@config_server_service.restart(@with_config_server_trusted_certs) if @config_server_enabled

Expand Down
3 changes: 2 additions & 1 deletion src/bosh-director/bosh-director.gemspec
Expand Up @@ -48,7 +48,8 @@ Gem::Specification.new do |spec|
spec.add_dependency 'httpclient', '~>2.8.3'
spec.add_dependency 'logging', '~>2.2.2'
spec.add_dependency 'membrane', '~>1.1.0'
spec.add_dependency 'nats', '~>0.9.2'
spec.add_dependency 'nats-pure', '~>0.6.2'
spec.add_dependency 'openssl'
spec.add_dependency 'netaddr', '~>1.5.0'
spec.add_dependency 'prometheus-client','~>1.0.0'
spec.add_dependency 'puma', '~>3.12'
Expand Down
3 changes: 2 additions & 1 deletion src/bosh-director/lib/bosh/director.rb
Expand Up @@ -31,7 +31,8 @@ module Director
require 'sequel'
require 'sinatra/base'
require 'securerandom'
require 'nats/client'
require 'nats/io/client'
require 'openssl'
require 'securerandom'
require 'delayed_job_sequel'

Expand Down
92 changes: 46 additions & 46 deletions src/bosh-director/lib/bosh/director/nats_rpc.rb
Expand Up @@ -17,22 +17,31 @@ def initialize(nats_uri, nats_server_ca_path, nats_client_private_key_path, nats
@handled_response = false
end

# Returns a lazily connected NATS client
#
# Calls init_nats when @nats is nil. If the client then reports that it is not
# connected, calls @nats.connect(@nats_options) while holding @lock.
#
# Returns @nats. When Errno::ECONNREFUSED is raised the error is only logged,
# so the returned client may still be disconnected.
# Raises RuntimeError (with the original error interpolated into the message)
# for any other exception raised while initializing or connecting.
#
# NOTE(review): this listing looks like a rendered diff in which removed and
# added lines are interleaved; `@nats ||= connect` is presumably the
# pre-change line and `init_nats` its replacement -- confirm against the
# actual file before relying on this description.
def nats
init_nats if @nats.nil?
begin
@nats ||= connect
# connected? is checked again inside the lock before calling connect.
unless @nats.connected?
@lock.synchronize do
@nats.connect(@nats_options) unless @nats.connected?
end
end
rescue Errno::ECONNREFUSED
# Logged and not re-raised: execution falls through and returns @nats.
@logger.error("NATS connection refused")
# NOTE(review): Exception is broader than StandardError (it also covers
# SignalException and SystemExit) -- confirm that breadth is intended.
rescue Exception => e
raise "An error has occurred while connecting to NATS: #{e}"
end
@nats
end

# Publishes a payload (encoded as JSON) without expecting a response
def send_message(client, payload)
message = JSON.generate(payload)
@logger.debug("SENT: #{client} #{message}")

EM.schedule do
nats.publish(client, message)
if nats.connected?
@nats.publish(client, message)
else
@logger.error("SENT failed because nats isn't connected: #{client} #{message}")
end
end

Expand All @@ -49,15 +58,11 @@ def send_request(subject_name, client_id, request, options, &callback)

@logger.debug("SENT: #{subject_name} #{sanitized_log_message}") unless options['logging'] == false

EM.schedule do
if nats.connected?
subscribe_inbox
if @handled_response
nats.publish(subject_name, request_body)
else
nats.flush do
nats.publish(subject_name, request_body)
end
end
@nats.publish(subject_name, request_body)
else
@logger.error("SENT failed because nats isn't connected: #{subject_name} #{sanitized_log_message}")
end
request_id
end
Expand All @@ -73,41 +78,37 @@ def generate_request_id

private

def connect
# double-check locking to reduce synchronization
if @nats.nil?
@lock.synchronize do
if @nats.nil?
NATS.on_error do |e|
password = @nats_uri[/nats:\/\/.*:(.*)@/, 1]
redacted_message = password.nil? ? "NATS client error: #{e}" : "NATS client error: #{e}".gsub(password, '*******')
@logger.error(redacted_message)
end
options = {
# The NATS client library has built-in reconnection logic.
# That logic only works when a cluster of servers is provided as a list:
# the library will not retry a server after receiving an error from it
# (for example, a timeout). We work around this by passing the same URI
# multiple times so that the library retries the connection. This adds
# retry logic to the director's NATS connections by relying on the
# library's built-in behavior.
:uris => Array.new(MAX_RECONNECT_ATTEMPTS, @nats_uri),
:max_reconnect_attempts => MAX_RECONNECT_ATTEMPTS,
:reconnect_time_wait => 2,
:reconnect => true,
:ssl => true,
:tls => {
:private_key_file => @nats_client_private_key_path,
:cert_chain_file => @nats_client_certificate_path,
:verify_peer => true,
:ca_file => @nats_server_ca_path
}
}
@nats = NATS.connect(options)
def init_nats
@lock.synchronize do
if @nats.nil?
@nats = NATS::IO::Client.new

tls_context = OpenSSL::SSL::SSLContext.new
tls_context.ssl_version = :TLSv1_2
tls_context.verify_mode = OpenSSL::SSL::VERIFY_PEER

tls_context.key = OpenSSL::PKey::RSA.new(File.open(@nats_client_private_key_path))
tls_context.cert = OpenSSL::X509::Certificate.new(File.open(@nats_client_certificate_path))
tls_context.ca_file = @nats_server_ca_path

@nats_options = {
servers: Array.new(MAX_RECONNECT_ATTEMPTS, @nats_uri),
dont_randomize_servers: true,
max_reconnect_attempts: MAX_RECONNECT_ATTEMPTS,
reconnect_time_wait: 2,
reconnect: true,
tls: {
context: tls_context,
},
}

@nats.on_error do |e|
password = @nats_uri[%r{nats://.*:(.*)@}, 1]
redacted_message = password.nil? ? "NATS client error: #{e}" : "NATS client error: #{e}".gsub(password, '*******')
@logger.error(redacted_message)
end
end
end
@nats
end

# subscribe to an inbox, if not already subscribed
Expand All @@ -123,7 +124,7 @@ def subscribe_inbox
handle_response(message, subject)
end
end
end
end if client.connected?
end
end

Expand Down Expand Up @@ -151,6 +152,5 @@ def sanitize_log_message(request)
JSON.generate(request)
end
end

end
end

0 comments on commit 29421a6

Please sign in to comment.