Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create and delete vhosts #14

Merged
merged 5 commits into from
Apr 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 12 additions & 10 deletions spec/policies_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ require "./spec_helper"

describe AvalancheMQ::VHost do
log = Logger.new(STDOUT)
# log.level = Logger::DEBUG
log.level = Logger::ERROR
FileUtils.rm_rf("/tmp/spec")
vhost = AvalancheMQ::VHost.new("add_policy", "/tmp/spec", log)
definitions = {
"max-length" => 10_i64,
Expand All @@ -11,36 +12,37 @@ describe AvalancheMQ::VHost do
it "should be able to add policy" do
vhost.add_policy("test", "^.*$", "all", definitions, -10_i8)
vhost.policies.size.should eq 1
vhost.remove_policy("test")
vhost.delete_policy("test")
end

it "should be able to list policies" do
vhost = AvalancheMQ::VHost.new("add_remove_policy", "/tmp/spec", log)
vhost.add_policy("test", "^.*$", "all", definitions, -10_i8)
vhost.remove_policy("test")
vhost.policies.size.should eq 0
vhost2 = AvalancheMQ::VHost.new("add_remove_policy", "/tmp/spec_lp", log)
vhost2.add_policy("test", "^.*$", "all", definitions, -10_i8)
vhost2.delete_policy("test")
vhost2.policies.size.should eq 0
vhost2.delete
end

it "should overwrite policy with same name" do
vhost.add_policy("test", "^.*$", "all", definitions, -10_i8)
vhost.add_policy("test", "^.*$", "exchanges", definitions, 10_i8)
vhost.policies.size.should eq 1
vhost.policies["test"].apply_to.should eq "exchanges"
vhost.remove_policy("test")
vhost.delete_policy("test")
end

it "should validate pattern" do
expect_raises(ArgumentError) do
vhost.add_policy("test", "(bad", "all", definitions, -10_i8)
end
vhost.remove_policy("test")
vhost.delete_policy("test")
end

it "should validate appy_to" do
expect_raises(ArgumentError) do
vhost.add_policy("test", "^.*$", "bad", definitions, -10_i8)
end
vhost.remove_policy("test")
vhost.delete_policy("test")
end

it "should apply policy" do
Expand All @@ -49,7 +51,7 @@ describe AvalancheMQ::VHost do
vhost.add_policy("ml", "^.*$", "queues", definitions, 11_i8)
Fiber.yield
vhost.queues["test"].policy.not_nil!.name.should eq "ml"
vhost.remove_policy("ml")
vhost.delete_policy("ml")
end

it "should respect priroty" do
Expand Down
23 changes: 23 additions & 0 deletions spec/vhost_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
require "./spec_helper"

describe AvalancheMQ::Server do
s = AvalancheMQ::Server.new("/tmp/spec", Logger::ERROR)

it "should be able to create vhosts" do
s.vhosts.create("test")
s.vhosts["test"]?.should_not be_nil
end

it "should be able to delete vhosts" do
s.vhosts.create("test")
s.vhosts.delete("test")
s.vhosts["test"]?.should be_nil
end

it "should be able to persist vhosts" do
s.vhosts.create("test")
s.close
s = AvalancheMQ::Server.new("/tmp/spec", Logger::ERROR)
s.vhosts["test"]?.should_not be_nil
end
end
34 changes: 31 additions & 3 deletions src/avalanchemq/http_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ module AvalancheMQ
get(context)
when "POST"
post(context)
when "DELETE"
delete(context)
else
context.response.content_type = "text/plain"
context.response.status_code = 405
Expand All @@ -33,6 +35,8 @@ module AvalancheMQ
@amqp_server.vhosts.flat_map { |_, v| v.queues.values }.to_json(context.response)
when "/api/policies"
@amqp_server.vhosts.flat_map { |_, v| v.policies.values }.to_json(context.response)
when "/api/vhosts"
@amqp_server.vhosts.to_json(context.response)
when "/"
context.response.content_type = "text/plain"
context.response.print "AvalancheMQ"
Expand All @@ -43,11 +47,14 @@ module AvalancheMQ

def post(context)
case context.request.path
when "/api/policies"
body = JSON.parse(context.request.body.not_nil!)
when "/api/policies/"
body = parse_body(context)
vhost = @amqp_server.vhosts[body["vhost"].as_s]?
raise NotFoundError.new("No vhost named #{body["vhost"]}") unless vhost
raise NotFoundError.new("No vhost named #{body["vhost"].as_s}") unless vhost
vhost.add_policy(Policy.from_json(vhost, body))
when "/api/vhosts"
body = parse_body(context)
@amqp_server.create_vhost(body["name"].as_s)
else
not_found(context)
end
Expand All @@ -56,13 +63,33 @@ module AvalancheMQ
{ error: "#{e.class}: #{e.message}" }.to_json(context.response)
end

def delete(context)
case context.request.path
when "/api/policies"
body = parse_body(context)
vhost = @amqp_server.vhosts[body["vhost"].as_s]?
raise NotFoundError.new("No vhost named #{body["vhost"].as_s}") unless vhost
vhost.delete_policy(body["name"].as_s)
when "/api/vhosts"
body = parse_body(context)
@amqp_server.delete_vhost(body["name"].as_s)
else
not_found(context)
end
end

def not_found(context, message = nil)
context.response.content_type = "text/plain"
context.response.status_code = 404
context.response.print "Not found\n"
context.response.print message
end

def parse_body(context)
raise ExpectedBodyError.new if context.request.body.nil?
JSON.parse(context.request.body)
end

def listen
server = @http.bind
print "HTTP API listening on ", server.local_address, "\n"
Expand All @@ -74,5 +101,6 @@ module AvalancheMQ
end

class NotFoundError < Exception; end
class ExpectedBodyError < ArgumentError; end
end
end
2 changes: 1 addition & 1 deletion src/avalanchemq/policy.cr
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ module AvalancheMQ

protected def delete
@log.info "Deleting"
@vhost.remove_policy(name)
@vhost.delete_policy(name)
end

class InvalidDefinitionError < ArgumentError; end
Expand Down
12 changes: 4 additions & 8 deletions src/avalanchemq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ require "logger"
require "openssl"
require "./amqp"
require "./client"
require "./vhost"
require "./vhost_store"
require "./exchange"
require "./queue"
require "./durable_queue"
Expand All @@ -19,15 +19,11 @@ module AvalancheMQ
@log.formatter = Logger::Formatter.new do |severity, datetime, progname, message, io|
io << progname << ": " << message
end
Dir.mkdir_p @data_dir
@listeners = Array(TCPServer).new(1)
@connections = Array(Client).new
@connection_events = Channel(Tuple(Client, Symbol)).new(16)
Dir.mkdir_p @data_dir
@vhosts = {
"/" => VHost.new("/", @data_dir, @log),
"default" => VHost.new("default", @data_dir, @log),
"bunny_testbed" => VHost.new("bunny_testbed", @data_dir, @log)
}
@vhosts = VHostStore.new(@data_dir, @log)
spawn handle_connection_events, name: "Server#handle_connection_events"
end

Expand Down Expand Up @@ -81,7 +77,7 @@ module AvalancheMQ
@log.debug "Closing connections"
@connections.each &.close
@log.debug "Closing vhosts"
@vhosts.each_value &.close
@vhosts.close
end

private def handle_connection(socket : TCPSocket, ssl_client : OpenSSL::SSL::Socket? = nil)
Expand Down
43 changes: 31 additions & 12 deletions src/avalanchemq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ module AvalancheMQ
@queues = Hash(String, Queue).new
@policies = Hash(String, Policy).new
@save = Channel(AMQP::Frame).new(32)
@data_dir = File.join(@server_data_dir, Digest::SHA1.hexdigest(@name))
@dir = Digest::SHA1.hexdigest(@name)
@data_dir = File.join(@server_data_dir, @dir)
Dir.mkdir_p @data_dir
@segment = last_segment
@wfile = open_wfile
Expand Down Expand Up @@ -81,6 +82,13 @@ module AvalancheMQ
wfile
end

def to_json(json : JSON::Builder)
{
name: @name,
dir: @dir
}.to_json(json)
end

def apply(f, loading = false)
@save.send f unless loading
case f
Expand Down Expand Up @@ -141,7 +149,7 @@ module AvalancheMQ
spawn apply_policies
end

def remove_policy(name)
def delete_policy(name)
@policies.delete(name)
save_policies!
spawn apply_policies
Expand All @@ -152,6 +160,12 @@ module AvalancheMQ
@save.close
end

def delete
close
Fiber.yield
FileUtils.rm_rf(@data_dir)
end

private def apply_policies(resources : Array(Queue | Exchange) | Nil = nil)
itr = if resources
resources.each
Expand Down Expand Up @@ -187,16 +201,20 @@ module AvalancheMQ

private def load_policies!
file = File.join(@data_dir, "policies.json")
return unless File.exists?(file)
policies = File.read(File.join(@data_dir, "policies.json"))
data = JSON.parse(policies)
return unless data.is_a?(Array)
data.each do |p|
next unless p.is_a?(Hash)
policy = Policy.from_json(self, p)
@policies[policy.name] = policy
if File.exists?(file)
@log.debug("File exists")
File.open(File.join(@data_dir, "policies.json"), "r") do |f|
data = JSON.parse(f)
data.each do |p|
policy = Policy.from_json(self, p)
@policies[policy.name] = policy
end
end
end
@log.debug { "#{@policies.size} policies loaded" }
spawn apply_policies
rescue e : Exception
@log.error("Can't load policies: #{e.inspect}")
end

private def load_default_definitions
Expand Down Expand Up @@ -285,10 +303,11 @@ module AvalancheMQ

private def save_policies!
@log.debug "Saving #{@policies.size} policies"
File.open(File.join(@data_dir, "policies.json.tmp"), "w") do |f|
tmpfile = File.join(@data_dir, "policies.json.tmp")
File.open(tmpfile, "w") do |f|
@policies.values.to_json(f)
end
File.rename File.join(@data_dir, "policies.json.tmp"), File.join(@data_dir, "policies.json")
File.rename tmpfile, File.join(@data_dir, "policies.json")
end

private def last_segment : UInt32
Expand Down
74 changes: 74 additions & 0 deletions src/avalanchemq/vhost_store.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
require "json"
require "./vhost"

module AvalancheMQ
class VHostStore
def initialize(@data_dir : String, @log : Logger)
@vhosts = Hash(String, VHost).new
load!
end

def [](name)
@vhosts[name]
end

def []?(name)
@vhosts[name]?
end

def size
@vhosts.size
end

def create(name, save = true)
return if @vhosts.has_key?(name)
vhost = VHost.new(name, @data_dir, @log)
@vhosts[name] = vhost
save! if save
vhost
end

def delete(name) : VHost?
if vhost = @vhosts.delete name
vhost.delete
save!
vhost
end
end

def close
@vhosts.each_value &.close
save!
end

def to_json(json : JSON::Builder)
@vhosts.keys.to_json(json)
end

private def load!
path = File.join(@data_dir, "vhosts.json")
if File.exists? path
@log.debug "Loading vhosts from file"
File.open(path) do |f|
Array(String).from_json(f) do |name|
@vhosts[name] = VHost.new(name, @data_dir, @log)
end
end
else
@log.debug "Loading default vhosts"
create("/", save: false)
create("default", save: false)
create("bunny_testbed", save: false)
save!
end
@log.debug("#{@vhosts.size} vhosts loaded")
end

private def save!
@log.debug "Saving vhosts to file"
tmpfile = File.join(@data_dir, "vhosts.json.tmp")
File.open(tmpfile, "w") { |f| self.to_json(f) }
File.rename tmpfile, File.join(@data_dir, "vhosts.json")
end
end
end
5 changes: 3 additions & 2 deletions src/avalanchemqctl.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ parser.on("-h", "--help", "Show this help") { puts parser; exit 1 }
parser.invalid_option { |arg| abort "Invalid argument: #{arg}" }
parser.parse!

abort banner unless (entity = ARGV.shift?) && ["queues", "policies", "exchanges"].includes?(entity)
ENTITIES = ["queues", "policies", "exchanges", "vhosts"]
abort banner unless (entity = ARGV.shift?) && ENTITIES.includes?(entity)
headers = HTTP::Headers{"Content-Type" => "application/json"}

begin
if name = options["remove"]?
resp = HTTP::Client.delete "#{options["host"]}/api/#{entity}", headers, name.to_s
resp = HTTP::Client.delete "#{options["host"]}/api/#{entity}", headers, { name: name.to_s, vhost: options["vhost"]? }.to_json
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't vhost and name be in the path? DELETE /api/vhost/exchange/my-exchange ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I think we should use kemal for route parsing etc. then. I'll save that improvement for another branch

if resp.status_code == 200
exit 0
else
Expand Down