diff --git a/spec/policies_spec.cr b/spec/policies_spec.cr index 90a807e5f..fa6be5d93 100644 --- a/spec/policies_spec.cr +++ b/spec/policies_spec.cr @@ -2,7 +2,8 @@ require "./spec_helper" describe AvalancheMQ::VHost do log = Logger.new(STDOUT) - # log.level = Logger::DEBUG + log.level = Logger::ERROR + FileUtils.rm_rf("/tmp/spec") vhost = AvalancheMQ::VHost.new("add_policy", "/tmp/spec", log) definitions = { "max-length" => 10_i64, @@ -11,14 +12,15 @@ describe AvalancheMQ::VHost do it "should be able to add policy" do vhost.add_policy("test", "^.*$", "all", definitions, -10_i8) vhost.policies.size.should eq 1 - vhost.remove_policy("test") + vhost.delete_policy("test") end it "should be able to list policies" do - vhost = AvalancheMQ::VHost.new("add_remove_policy", "/tmp/spec", log) - vhost.add_policy("test", "^.*$", "all", definitions, -10_i8) - vhost.remove_policy("test") - vhost.policies.size.should eq 0 + vhost2 = AvalancheMQ::VHost.new("add_remove_policy", "/tmp/spec_lp", log) + vhost2.add_policy("test", "^.*$", "all", definitions, -10_i8) + vhost2.delete_policy("test") + vhost2.policies.size.should eq 0 + vhost2.delete end it "should overwrite policy with same name" do @@ -26,21 +28,21 @@ describe AvalancheMQ::VHost do vhost.add_policy("test", "^.*$", "exchanges", definitions, 10_i8) vhost.policies.size.should eq 1 vhost.policies["test"].apply_to.should eq "exchanges" - vhost.remove_policy("test") + vhost.delete_policy("test") end it "should validate pattern" do expect_raises(ArgumentError) do vhost.add_policy("test", "(bad", "all", definitions, -10_i8) end - vhost.remove_policy("test") + vhost.delete_policy("test") end it "should validate appy_to" do expect_raises(ArgumentError) do vhost.add_policy("test", "^.*$", "bad", definitions, -10_i8) end - vhost.remove_policy("test") + vhost.delete_policy("test") end it "should apply policy" do @@ -49,7 +51,7 @@ describe AvalancheMQ::VHost do vhost.add_policy("ml", "^.*$", "queues", definitions, 11_i8) Fiber.yield vhost.queues["test"].policy.not_nil!.name.should eq "ml" - vhost.remove_policy("ml") + vhost.delete_policy("ml") end it "should respect priroty" do diff --git a/spec/vhost_spec.cr b/spec/vhost_spec.cr new file mode 100644 index 000000000..b2d0935da --- /dev/null +++ b/spec/vhost_spec.cr @@ -0,0 +1,23 @@ +require "./spec_helper" + +describe AvalancheMQ::Server do + s = AvalancheMQ::Server.new("/tmp/spec", Logger::ERROR) + + it "should be able to create vhosts" do + s.vhosts.create("test") + s.vhosts["test"]?.should_not be_nil + end + + it "should be able to delete vhosts" do + s.vhosts.create("test") + s.vhosts.delete("test") + s.vhosts["test"]?.should be_nil + end + + it "should be able to persist vhosts" do + s.vhosts.create("test") + s.close + s = AvalancheMQ::Server.new("/tmp/spec", Logger::ERROR) + s.vhosts["test"]?.should_not be_nil + end +end diff --git a/src/avalanchemq/http_server.cr b/src/avalanchemq/http_server.cr index 43d125f0a..df227ff4e 100644 --- a/src/avalanchemq/http_server.cr +++ b/src/avalanchemq/http_server.cr @@ -12,6 +12,8 @@ module AvalancheMQ get(context) when "POST" post(context) + when "DELETE" + delete(context) else context.response.content_type = "text/plain" context.response.status_code = 405 @@ -33,6 +35,8 @@ module AvalancheMQ @amqp_server.vhosts.flat_map { |_, v| v.queues.values }.to_json(context.response) when "/api/policies" @amqp_server.vhosts.flat_map { |_, v| v.policies.values }.to_json(context.response) + when "/api/vhosts" + @amqp_server.vhosts.to_json(context.response) when "/" context.response.content_type = "text/plain" context.response.print "AvalancheMQ" @@ -43,11 +47,14 @@ module AvalancheMQ def post(context) case context.request.path - when "/api/policies" - body = JSON.parse(context.request.body.not_nil!) + when "/api/policies/" + body = parse_body(context) vhost = @amqp_server.vhosts[body["vhost"].as_s]? - raise NotFoundError.new("No vhost named #{body["vhost"]}") unless vhost + raise NotFoundError.new("No vhost named #{body["vhost"].as_s}") unless vhost vhost.add_policy(Policy.from_json(vhost, body)) + when "/api/vhosts" + body = parse_body(context) + @amqp_server.create_vhost(body["name"].as_s) else not_found(context) end @@ -56,6 +63,21 @@ module AvalancheMQ { error: "#{e.class}: #{e.message}" }.to_json(context.response) end + def delete(context) + case context.request.path + when "/api/policies" + body = parse_body(context) + vhost = @amqp_server.vhosts[body["vhost"].as_s]? + raise NotFoundError.new("No vhost named #{body["vhost"].as_s}") unless vhost + vhost.delete_policy(body["name"].as_s) + when "/api/vhosts" + body = parse_body(context) + @amqp_server.delete_vhost(body["name"].as_s) + else + not_found(context) + end + end + def not_found(context, message = nil) context.response.content_type = "text/plain" context.response.status_code = 404 @@ -63,6 +85,11 @@ module AvalancheMQ context.response.print message end + def parse_body(context) + raise ExpectedBodyError.new if context.request.body.nil? + JSON.parse(context.request.body) + end + def listen server = @http.bind print "HTTP API listening on ", server.local_address, "\n" @@ -74,5 +101,6 @@ module AvalancheMQ end class NotFoundError < Exception; end + class ExpectedBodyError < ArgumentError; end end end diff --git a/src/avalanchemq/policy.cr b/src/avalanchemq/policy.cr index 6fc10f18e..c163312dc 100644 --- a/src/avalanchemq/policy.cr +++ b/src/avalanchemq/policy.cr @@ -69,7 +69,7 @@ module AvalancheMQ protected def delete @log.info "Deleting" - @vhost.remove_policy(name) + @vhost.delete_policy(name) end class InvalidDefinitionError < ArgumentError; end diff --git a/src/avalanchemq/server.cr b/src/avalanchemq/server.cr index da8e6a53b..e54c61dc6 100644 --- a/src/avalanchemq/server.cr +++ b/src/avalanchemq/server.cr @@ -3,7 +3,7 @@ require "logger" require "openssl" require "./amqp" require "./client" -require "./vhost" +require "./vhost_store" require "./exchange" require "./queue" require "./durable_queue" @@ -19,15 +19,11 @@ module AvalancheMQ @log.formatter = Logger::Formatter.new do |severity, datetime, progname, message, io| io << progname << ": " << message end + Dir.mkdir_p @data_dir @listeners = Array(TCPServer).new(1) @connections = Array(Client).new @connection_events = Channel(Tuple(Client, Symbol)).new(16) - Dir.mkdir_p @data_dir - @vhosts = { - "/" => VHost.new("/", @data_dir, @log), - "default" => VHost.new("default", @data_dir, @log), - "bunny_testbed" => VHost.new("bunny_testbed", @data_dir, @log) - } + @vhosts = VHostStore.new(@data_dir, @log) spawn handle_connection_events, name: "Server#handle_connection_events" end @@ -81,7 +77,7 @@ module AvalancheMQ @log.debug "Closing connections" @connections.each &.close @log.debug "Closing vhosts" - @vhosts.each_value &.close + @vhosts.close end private def handle_connection(socket : TCPSocket, ssl_client : OpenSSL::SSL::Socket? = nil) diff --git a/src/avalanchemq/vhost.cr b/src/avalanchemq/vhost.cr index b393bc88b..8d2cc32f0 100644 --- a/src/avalanchemq/vhost.cr +++ b/src/avalanchemq/vhost.cr @@ -24,7 +24,8 @@ module AvalancheMQ @queues = Hash(String, Queue).new @policies = Hash(String, Policy).new @save = Channel(AMQP::Frame).new(32) - @data_dir = File.join(@server_data_dir, Digest::SHA1.hexdigest(@name)) + @dir = Digest::SHA1.hexdigest(@name) + @data_dir = File.join(@server_data_dir, @dir) Dir.mkdir_p @data_dir @segment = last_segment @wfile = open_wfile @@ -81,6 +82,13 @@ module AvalancheMQ wfile end + def to_json(json : JSON::Builder) + { + name: @name, + dir: @dir + }.to_json(json) + end + def apply(f, loading = false) @save.send f unless loading case f @@ -141,7 +149,7 @@ module AvalancheMQ spawn apply_policies end - def remove_policy(name) + def delete_policy(name) @policies.delete(name) save_policies! spawn apply_policies @@ -152,6 +160,12 @@ module AvalancheMQ @save.close end + def delete + close + Fiber.yield + FileUtils.rm_rf(@data_dir) + end + private def apply_policies(resources : Array(Queue | Exchange) | Nil = nil) itr = if resources resources.each @@ -187,16 +201,20 @@ module AvalancheMQ private def load_policies! file = File.join(@data_dir, "policies.json") - return unless File.exists?(file) - policies = File.read(File.join(@data_dir, "policies.json")) - data = JSON.parse(policies) - return unless data.is_a?(Array) - data.each do |p| - next unless p.is_a?(Hash) - policy = Policy.from_json(self, p) - @policies[policy.name] = policy + if File.exists?(file) + @log.debug("File exists") + File.open(File.join(@data_dir, "policies.json"), "r") do |f| + data = JSON.parse(f) + data.each do |p| + policy = Policy.from_json(self, p) + @policies[policy.name] = policy + end + end end + @log.debug { "#{@policies.size} policies loaded" } spawn apply_policies + rescue e : Exception + @log.error("Can't load policies: #{e.inspect}") end private def load_default_definitions @@ -285,10 +303,11 @@ module AvalancheMQ private def save_policies! @log.debug "Saving #{@policies.size} policies" - File.open(File.join(@data_dir, "policies.json.tmp"), "w") do |f| + tmpfile = File.join(@data_dir, "policies.json.tmp") + File.open(tmpfile, "w") do |f| @policies.values.to_json(f) end - File.rename File.join(@data_dir, "policies.json.tmp"), File.join(@data_dir, "policies.json") + File.rename tmpfile, File.join(@data_dir, "policies.json") end private def last_segment : UInt32 diff --git a/src/avalanchemq/vhost_store.cr b/src/avalanchemq/vhost_store.cr new file mode 100644 index 000000000..5db34162e --- /dev/null +++ b/src/avalanchemq/vhost_store.cr @@ -0,0 +1,74 @@ +require "json" +require "./vhost" + +module AvalancheMQ + class VHostStore + def initialize(@data_dir : String, @log : Logger) + @vhosts = Hash(String, VHost).new + load! + end + + def [](name) + @vhosts[name] + end + + def []?(name) + @vhosts[name]? + end + + def size + @vhosts.size + end + + def create(name, save = true) + return if @vhosts.has_key?(name) + vhost = VHost.new(name, @data_dir, @log) + @vhosts[name] = vhost + save! if save + vhost + end + + def delete(name) : VHost? + if vhost = @vhosts.delete name + vhost.delete + save! + vhost + end + end + + def close + @vhosts.each_value &.close + save! + end + + def to_json(json : JSON::Builder) + @vhosts.keys.to_json(json) + end + + private def load! + path = File.join(@data_dir, "vhosts.json") + if File.exists? path + @log.debug "Loading vhosts from file" + File.open(path) do |f| + Array(String).from_json(f) do |name| + @vhosts[name] = VHost.new(name, @data_dir, @log) + end + end + else + @log.debug "Loading default vhosts" + create("/", save: false) + create("default", save: false) + create("bunny_testbed", save: false) + save! + end + @log.debug("#{@vhosts.size} vhosts loaded") + end + + private def save! + @log.debug "Saving vhosts to file" + tmpfile = File.join(@data_dir, "vhosts.json.tmp") + File.open(tmpfile, "w") { |f| self.to_json(f) } + File.rename tmpfile, File.join(@data_dir, "vhosts.json") + end + end +end diff --git a/src/avalanchemqctl.cr b/src/avalanchemqctl.cr index 0ad3bdb94..f71e07fc2 100644 --- a/src/avalanchemqctl.cr +++ b/src/avalanchemqctl.cr @@ -24,12 +24,13 @@ parser.on("-h", "--help", "Show this help") { puts parser; exit 1 } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } parser.parse! -abort banner unless (entity = ARGV.shift?) && ["queues", "policies", "exchanges"].includes?(entity) +ENTITIES = ["queues", "policies", "exchanges", "vhosts"] +abort banner unless (entity = ARGV.shift?) && ENTITIES.includes?(entity) headers = HTTP::Headers{"Content-Type" => "application/json"} begin if name = options["remove"]? - resp = HTTP::Client.delete "#{options["host"]}/api/#{entity}", headers, name.to_s + resp = HTTP::Client.delete "#{options["host"]}/api/#{entity}", headers, { name: name.to_s, vhost: options["vhost"]? }.to_json if resp.status_code == 200 exit 0 else