Skip to content
Permalink
Fetching contributors…
Cannot retrieve contributors at this time
255 lines (230 sloc) 12.4 KB
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Runtime.Remoting.Messaging;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Table;
using MvcWebRole.Models;
using WorkerRoleA.Telemetry;
namespace WorkerRoleA
{
public class WorkerRoleA : RoleEntryPoint
{
private CloudQueue sendEmailQueue;
private CloudTable mailingListTable;
private CloudTable messageTable;
private CloudTable messagearchiveTable;
private volatile bool onStopCalled = false;
private volatile bool returnedFromRunMethod = false;
private TelemetryClient aiClient = new TelemetryClient();
private static string CORRELATION_SLOT = "CORRELATION-ID";
public override void Run()
{
Trace.TraceInformation("WorkerRoleA entering Run()");
while (true)
{
Stopwatch requestTimer = Stopwatch.StartNew();
var request = RequestTelemetryHelper.StartNewRequest("ProcessMessageWorkflow", DateTimeOffset.UtcNow);
CallContext.LogicalSetData(CORRELATION_SLOT, request.Id);
//Thread.SetData(Thread.GetNamedDataSlot(CORRELATION_SLOT), request.Id);
try
{
var tomorrow = DateTime.Today.AddDays(1.0).ToString("yyyy-MM-dd");
// If OnStop has been called, return to do a graceful shutdown.
if (onStopCalled == true)
{
Trace.TraceInformation("onStopCalled WorkerRoleB");
returnedFromRunMethod = true;
return;
}
// Retrieve all messages that are scheduled for tomorrow or earlier
// and are in Pending or Queuing status.
string typeAndDateFilter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.GreaterThan, "message"),
TableOperators.And,
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.LessThan, tomorrow));
var query = (new TableQuery<Message>().Where(typeAndDateFilter));
var messagesToProcess = messageTable.ExecuteQuery(query).ToList();
TableOperation replaceOperation;
request.Metrics.Add(new KeyValuePair<string, double>("NumberOfMessages", messagesToProcess.Count));
// Process each message (queue emails to be sent).
foreach (Message messageToProcess in messagesToProcess)
{
string restartFlag = "0";
// If the message is already in Queuing status,
// set flag to indicate this is a restart.
if (messageToProcess.Status == "Queuing")
{
restartFlag = "1";
}
// If the message is in Pending status, change
// it to Queuing.
if (messageToProcess.Status == "Pending")
{
messageToProcess.Status = "Queuing";
replaceOperation = TableOperation.Replace(messageToProcess);
messageTable.Execute(replaceOperation);
}
// If the message is in Queuing status,
// process it and change it to Processing status;
// otherwise it's already in processing status, and
// in that case check if processing is complete.
if (messageToProcess.Status == "Queuing")
{
ProcessMessage(messageToProcess, restartFlag);
messageToProcess.Status = "Processing";
replaceOperation = TableOperation.Replace(messageToProcess);
messageTable.Execute(replaceOperation);
}
else
{
CheckAndArchiveIfComplete(messageToProcess);
}
}
RequestTelemetryHelper.DispatchRequest(request, requestTimer.Elapsed, true);
// Sleep to minimize query costs.
System.Threading.Thread.Sleep(1000 * 30);
}
catch (Exception ex)
{
string err = ex.Message;
if (ex.InnerException != null)
{
err += " Inner Exception: " + ex.InnerException.Message;
}
Trace.TraceError(err, ex);
RequestTelemetryHelper.DispatchRequest(request, requestTimer.Elapsed, false);
// Don't fill up Trace storage if we have a bug in queue process loop.
System.Threading.Thread.Sleep(1000 * 60);
}
}
}
private void ProcessMessage(Message messageToProcess, string restartFlag)
{
// Get Mailing List info to get the "From" email address.
var retrieveOperation = TableOperation.Retrieve<MailingList>(messageToProcess.ListName, "mailinglist");
var retrievedResult = mailingListTable.Execute(retrieveOperation);
var mailingList = retrievedResult.Result as MailingList;
if (mailingList == null)
{
Trace.TraceError("Mailing list not found: " + messageToProcess.ListName + " for message: " + messageToProcess.MessageRef);
return;
}
// Get email addresses for this Mailing List.
string filter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, messageToProcess.ListName),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.NotEqual, "mailinglist"));
var query = new TableQuery<Subscriber>().Where(filter);
var subscribers = mailingListTable.ExecuteQuery(query).ToList();
aiClient.TrackMetric("SubscriberCount", subscribers.Count);
foreach (Subscriber subscriber in subscribers)
{
// Verify that the subscriber email address has been verified.
if (subscriber.Verified == false)
{
Trace.TraceInformation("Subscriber " + subscriber.EmailAddress + " not Verified, so not queuing ");
continue;
}
// Create a SendEmail entity for this email.
var sendEmailRow = new SendEmail
{
PartitionKey = messageToProcess.PartitionKey,
RowKey = messageToProcess.MessageRef.ToString() + subscriber.EmailAddress,
EmailAddress = subscriber.EmailAddress,
EmailSent = false,
MessageRef = messageToProcess.MessageRef,
ScheduledDate = messageToProcess.ScheduledDate,
FromEmailAddress = mailingList.FromEmailAddress,
SubjectLine = messageToProcess.SubjectLine,
SubscriberGUID = subscriber.SubscriberGUID,
ListName = mailingList.ListName
};
// When we try to add the entity to the SendEmail table,
// an exception might happen if this worker role went
// down after processing some of the email addresses and then restarted.
// In that case the row might already be present, so we do an Upsert operation.
try
{
var upsertOperation = TableOperation.InsertOrReplace(sendEmailRow);
messageTable.Execute(upsertOperation);
}
catch (Exception ex)
{
string err = "Error creating SendEmail row: " + ex.Message;
if (ex.InnerException != null)
{
err += " Inner Exception: " + ex.InnerException;
}
Trace.TraceError(err, ex);
}
// Create the queue message.
string queueMessageString =
sendEmailRow.PartitionKey + "," +
sendEmailRow.RowKey + "," +
restartFlag;
var queueMessage = new CloudQueueMessage(queueMessageString);
sendEmailQueue.AddMessage(queueMessage);
}
Trace.TraceInformation("ProcessMessage end PK: "
+ messageToProcess.PartitionKey);
}
private void CheckAndArchiveIfComplete(Message messageToCheck)
{
// Get the list of emails to be sent for this message: all SendEmail rows
// for this message.
string pkrkFilter = TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, messageToCheck.PartitionKey),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.LessThan, "message"));
var query = new TableQuery<SendEmail>().Where(pkrkFilter);
var emailToBeSent = messageTable.ExecuteQuery(query).FirstOrDefault();
if (emailToBeSent != null)
{
return;
}
// All emails have been sent; copy the message row to the archive table.
// Insert the message row in the messagearchive table
var messageToDelete = new Message { PartitionKey = messageToCheck.PartitionKey, RowKey = messageToCheck.RowKey, ETag = "*" };
messageToCheck.Status = "Complete";
var insertOrReplaceOperation = TableOperation.InsertOrReplace(messageToCheck);
messagearchiveTable.Execute(insertOrReplaceOperation);
// Delete the message row from the message table.
var deleteOperation = TableOperation.Delete(messageToDelete);
messageTable.Execute(deleteOperation);
}
public override void OnStop()
{
onStopCalled = true;
while (returnedFromRunMethod == false)
{
System.Threading.Thread.Sleep(1000);
}
}
public override bool OnStart()
{
TelemetryConfiguration.Active.InstrumentationKey = RoleEnvironment.GetConfigurationSettingValue("APPINSIGHTS_INSTRUMENTATIONKEY");
TelemetryConfiguration.Active.TelemetryInitializers.Add(new ItemCorrelationTelemetryInitializer());
ServicePointManager.DefaultConnectionLimit = Environment.ProcessorCount * 12;
Trace.TraceInformation("Initializing storage account in WorkerA");
var storageAccount = Microsoft.WindowsAzure.Storage.CloudStorageAccount.Parse(RoleEnvironment.GetConfigurationSettingValue("StorageConnectionString"));
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
sendEmailQueue = queueClient.GetQueueReference("azuremailqueue");
var tableClient = storageAccount.CreateCloudTableClient();
mailingListTable = tableClient.GetTableReference("mailinglist");
messageTable = tableClient.GetTableReference("message");
messagearchiveTable = tableClient.GetTableReference("messagearchive");
// Create if not exists for queue, blob container, SentEmail table.
sendEmailQueue.CreateIfNotExists();
messageTable.CreateIfNotExists();
mailingListTable.CreateIfNotExists();
messagearchiveTable.CreateIfNotExists();
return base.OnStart();
}
}
}
You can’t perform that action at this time.