This repository has been archived by the owner on Jan 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 128
/
Program.cs
148 lines (120 loc) · 5.33 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
using System;
using System.Configuration;
using System.Diagnostics;
using System.IO;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
namespace MiscOperations
{
class Program
{
private static string _servicesBusConnectionString;
private static NamespaceManager _namespaceManager;
private static CloudQueue _testQueue;
static void Main()
{
CreateTestQueues();
CreateServiceBusQueues();
CreateServiceBusTestMessage();
// This test message kicks off the sample on how to perform graceful
// shutdown. It will shut down the host, so if you want to run other
// samples, comment this out.
//CreateShutdownTestMessage();
JobHostConfiguration config = new JobHostConfiguration()
{
NameResolver = new ConfigNameResolver(),
};
// Demonstrates the global queue processing settings that can
// be configured
config.Queues.MaxPollingInterval = TimeSpan.FromSeconds(3);
config.Queues.MaxDequeueCount = 3;
config.Queues.BatchSize = 16;
config.Queues.NewBatchThreshold = 20;
// Demonstrates how queue processing can be customized further
// by defining a custom QueueProcessor Factory
config.Queues.QueueProcessorFactory = new CustomQueueProcessorFactory();
// Demonstrates how the console trace level can be customized
config.Tracing.ConsoleLevel = TraceLevel.Verbose;
// Demonstrates how a custom TraceWriter can be plugged into the
// host to capture all logging/traces.
config.Tracing.Tracers.Add(new CustomTraceWriter(TraceLevel.Info));
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration
{
ConnectionString = _servicesBusConnectionString,
// demonstrates global customization of the default OnMessageOptions
// that will be used by MessageReceivers
MessageOptions = new OnMessageOptions
{
MaxConcurrentCalls = 10
}
};
// demonstrates use of a custom MessagingProvider to perform deeper
// customizations of the message processing pipeline
serviceBusConfig.MessagingProvider = new CustomMessagingProvider(serviceBusConfig);
config.UseServiceBus(serviceBusConfig);
try
{
SetEnvironmentVariable(Functions.ShutDownFilePath);
JobHost host = new JobHost(config);
host.RunAndBlock();
}
finally
{
ClearEnvironmentVariable();
}
Console.WriteLine("\nDone");
Console.ReadLine();
}
private static void CreateServiceBusQueues()
{
_servicesBusConnectionString = AmbientConnectionStringProvider.Instance.GetConnectionString(ConnectionStringNames.ServiceBus);
_namespaceManager = NamespaceManager.CreateFromConnectionString(_servicesBusConnectionString);
}
private static void CreateTestQueues()
{
string connectionString = AmbientConnectionStringProvider.Instance.GetConnectionString(ConnectionStringNames.Storage);
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("singleton-test");
queue.CreateIfNotExists();
_testQueue = queueClient.GetQueueReference("testqueue");
_testQueue.CreateIfNotExists();
CloudQueue testQueue2 = queueClient.GetQueueReference("testqueue2");
testQueue2.CreateIfNotExists();
}
private static void CreateShutdownTestMessage()
{
_testQueue.AddMessage(new CloudQueueMessage("GO!"));
}
private static void CreateServiceBusTestMessage()
{
if (!_namespaceManager.QueueExists(Functions.ServiceBusTestQueueName))
{
_namespaceManager.CreateQueue(Functions.ServiceBusTestQueueName);
}
QueueClient queueClient = QueueClient.CreateFromConnectionString(_servicesBusConnectionString, Functions.ServiceBusTestQueueName);
using (Stream stream = new MemoryStream())
using (TextWriter writer = new StreamWriter(stream))
{
writer.Write("Test");
writer.Flush();
stream.Position = 0;
queueClient.Send(new BrokeredMessage(stream) { ContentType = "text/plain" });
}
queueClient.Close();
}
private static void SetEnvironmentVariable(string path)
{
Environment.SetEnvironmentVariable("WEBJOBS_SHUTDOWN_FILE", path);
}
private static void ClearEnvironmentVariable()
{
Environment.SetEnvironmentVariable("WEBJOBS_SHUTDOWN_FILE", null);
}
}
}