Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to create rules via scripting #68

Merged
merged 7 commits into from Jul 31, 2019
Merged
Changes from 5 commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -36,5 +36,29 @@ public static async Task Create(ManagementClient client, CommandArgument name, C
Console.WriteLine("Subscription already exists, skipping creation");
}
}

public static async Task Subscribe(ManagementClient client, CommandArgument name, CommandOption topicName, CommandOption subscriptionName, CommandArgument eventType, CommandOption ruleName)
{
try
{
await Rule.Create(client, name, topicName, subscriptionName, eventType, ruleName);
}
catch (MessagingEntityAlreadyExistsException)
{
Console.WriteLine($"Rule already exists, skipping creation. Verify SQL filter matches '[NServiceBus.EnclosedMessageTypes] LIKE '%{eventType.Value}%'.");
}
}

public static async Task Unsubscribe(ManagementClient client, CommandArgument name, CommandOption topicName, CommandOption subscriptionName, CommandArgument eventType, CommandOption ruleName)
{
try
{
await Rule.Delete(client, name, topicName, subscriptionName, eventType, ruleName);
}
catch (MessagingEntityAlreadyExistsException)
This conversation was marked as resolved by danielmarbach

This comment has been minimized.

Copy link
@bording

bording Jul 29, 2019

Member

Is this the right exception to to be catching here? I think it needs to be MessagingEntityNotFoundException, but it would be worth verifying (add a test?)

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jul 29, 2019

Author Member

I think you're right.

This comment has been minimized.

Copy link
@danielmarbach

danielmarbach Jul 30, 2019

Author Member

Fixed

{
Console.WriteLine("Rule does not exist, skipping deletion");
}
}
}
}
@@ -56,6 +56,44 @@ static int Main(string[] args)
Console.WriteLine($"Endpoint '{name.Value}' is ready.");
});
});

endpointCommand.Command("subscribe", subscribeCommand =>
{
subscribeCommand.Description = "Subscribes an endpoint to an event.";
var name = subscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var eventType = subscribeCommand.Argument("event-type", "Full name of the event to subscribe to (e.g. MyNamespace.MyMessage) (required)").IsRequired();

subscribeCommand.Options.Add(connectionString);
var topicName = subscribeCommand.Option("-t|--topic", "Topic name (defaults to 'bundle-1')", CommandOptionType.SingleValue);
var subscriptionName = subscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = subscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

subscribeCommand.OnExecute(async () =>
{
await CommandRunner.Run(connectionString, client => Endpoint.Subscribe(client, name, topicName, subscriptionName, eventType, shortenedRuleName));

Console.WriteLine($"Endpoint '{name.Value}' subscribed to '{eventType.Value}'.");
});
});

endpointCommand.Command("unsubscribe", unsubscribeCommand =>
{
unsubscribeCommand.Description = "Unsubscribes an endpoint from an event.";
var name = unsubscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var eventType = unsubscribeCommand.Argument("event-type", "Full name of the event to unsubscribe from (e.g. MyNamespace.MyMessage) (required)").IsRequired();

unsubscribeCommand.Options.Add(connectionString);
var topicName = unsubscribeCommand.Option("-t|--topic", "Topic name (defaults to 'bundle-1')", CommandOptionType.SingleValue);
var subscriptionName = unsubscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = unsubscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

unsubscribeCommand.OnExecute(async () =>
{
await CommandRunner.Run(connectionString, client => Endpoint.Unsubscribe(client, name, topicName, subscriptionName, eventType, shortenedRuleName));

Console.WriteLine($"Endpoint '{name.Value}' unsubscribed from '{eventType.Value}'.");
});
});
});

app.Command("queue", queueCommand =>
@@ -7,12 +7,25 @@

static class Rule
{
public static Task Delete(ManagementClient client, CommandArgument endpointName, CommandOption topicName, CommandOption subscriptionName, string ruleName = RuleDescription.DefaultRuleName)
public static Task Create(ManagementClient client, CommandArgument endpointName, CommandOption topicName, CommandOption subscriptionName, CommandArgument eventType, CommandOption ruleName)
{
var topicNameToUse = topicName.HasValue() ? topicName.Value() : Topic.DefaultTopicName;
var subscriptionNameToUse = subscriptionName.HasValue() ? subscriptionName.Value() : endpointName.Value;
var eventToSubscribeTo = eventType.Value;
var ruleNameToUse = ruleName.HasValue() ? ruleName.Value() : eventToSubscribeTo;
var description = new RuleDescription(ruleNameToUse, new SqlFilter($"[NServiceBus.EnclosedMessageTypes] LIKE '%{eventToSubscribeTo}%'"));

return client.DeleteRuleAsync(topicNameToUse, subscriptionNameToUse, ruleName);
return client.CreateRuleAsync(topicNameToUse, subscriptionNameToUse, description);
}

public static Task Delete(ManagementClient client, CommandArgument endpointName, CommandOption topicName, CommandOption subscriptionName, CommandArgument eventType, CommandOption ruleName)
{
var topicNameToUse = topicName.HasValue() ? topicName.Value() : Topic.DefaultTopicName;
var subscriptionNameToUse = subscriptionName.HasValue() ? subscriptionName.Value() : endpointName.Value;
var eventToSubscribeTo = eventType.Value;
var ruleNameToUse = ruleName.HasValue() ? ruleName.Value() : eventToSubscribeTo;

return client.DeleteRuleAsync(topicNameToUse, subscriptionNameToUse, ruleNameToUse);
}
}
}
@@ -28,11 +28,46 @@ public async Task Create_endpoint_when_there_are_no_entities()
Assert.IsTrue(error == string.Empty);
Assert.IsFalse(output.Contains("skipping"));

await VerifyQueue(QueueName);
await VerifyTopic(TopicName);
await VerifySubscriptionContainsOnlyDefaultRule(TopicName, SubscriptionName);
}

[Test]
public async Task Subscribe_endpoint()
{
await DeleteQueue(QueueName);
await DeleteTopic(TopicName);

await Execute($"endpoint create {EndpointName} --topic {TopicName}");

await Execute($"endpoint subscribe {EndpointName} MyMessage1 --topic {TopicName}");
await Execute($"endpoint subscribe {EndpointName} MyNamespace1.MyMessage2 --topic {TopicName}");
await Execute($"endpoint subscribe {EndpointName} MyNamespace1.MyMessage3 --topic {TopicName} --rule-name CustomRuleName");

await VerifyQueue(QueueName);
await VerifyTopic(TopicName);
await VerifySubscription(TopicName, SubscriptionName, QueueName);
}

[Test]
public async Task Unsubscribe_endpoint()
{
await DeleteQueue(QueueName);
await DeleteTopic(TopicName);

await Execute($"endpoint create {EndpointName} --topic {TopicName}");
await Execute($"endpoint subscribe {EndpointName} MyMessage1 --topic {TopicName}");
await Execute($"endpoint subscribe {EndpointName} MyNamespace1.MyMessage2 --topic {TopicName}");
await Execute($"endpoint subscribe {EndpointName} MyNamespace1.MyMessage3 --topic {TopicName} --rule-name CustomRuleName");

await Execute($"endpoint unsubscribe {EndpointName} MyMessage1 --topic {TopicName}");
await Execute($"endpoint unsubscribe {EndpointName} MyNamespace1.MyMessage2 --topic {TopicName}");
await Execute($"endpoint unsubscribe {EndpointName} MyNamespace1.MyMessage3 --topic {TopicName} --rule-name CustomRuleName");

await VerifySubscriptionContainsOnlyDefaultRule(TopicName, SubscriptionName);
}

[Test]
public async Task Create_queue_when_it_does_not_exist()
{
@@ -118,13 +153,36 @@ async Task VerifySubscription(string topicName, string subscriptionName, string
Assert.AreEqual(true, actual.EnableBatchedOperations);
Assert.AreEqual(queueName, actual.UserMetadata);

// rules
var rules = await client.GetRulesAsync(topicName, subscriptionName);
Assert.IsTrue(rules.Count == 4);

var defaultRule = rules.ElementAt(0);
Assert.AreEqual("$default", defaultRule.Name);
Assert.AreEqual(new FalseFilter().SqlExpression, ((FalseFilter)defaultRule.Filter).SqlExpression);

var customRuleNameRule = rules.ElementAt(1);
Assert.AreEqual("CustomRuleName", customRuleNameRule.Name);
Assert.AreEqual(new SqlFilter("[NServiceBus.EnclosedMessageTypes] LIKE '%MyNamespace1.MyMessage3%'").SqlExpression, ((SqlFilter)customRuleNameRule.Filter).SqlExpression);

var myMessage1Rule = rules.ElementAt(2);
Assert.AreEqual("MyMessage1", myMessage1Rule.Name);
Assert.AreEqual(new SqlFilter("[NServiceBus.EnclosedMessageTypes] LIKE '%MyMessage1%'").SqlExpression, ((SqlFilter)myMessage1Rule.Filter).SqlExpression);

var myMessage2WithNamespace = rules.ElementAt(3);
Assert.AreEqual("MyNamespace1.MyMessage2", myMessage2WithNamespace.Name);
Assert.AreEqual(new SqlFilter("[NServiceBus.EnclosedMessageTypes] LIKE '%MyNamespace1.MyMessage2%'").SqlExpression, ((SqlFilter)myMessage2WithNamespace.Filter).SqlExpression);
}

async Task VerifySubscriptionContainsOnlyDefaultRule(string topicName, string subscriptionName)
{
// rules
var rules = await client.GetRulesAsync(topicName, subscriptionName);
Assert.IsTrue(rules.Count == 1);

var rule = rules.First();
Assert.AreEqual("$default", rule.Name);
Assert.AreEqual(new FalseFilter().SqlExpression, ((FalseFilter)rule.Filter).SqlExpression);
var defaultRule = rules.ElementAt(0);
Assert.AreEqual("$default", defaultRule.Name);
Assert.AreEqual(new FalseFilter().SqlExpression, ((FalseFilter)defaultRule.Filter).SqlExpression);
}

async Task VerifyQueueExists(bool queueShouldExist)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.