Skip to content

Commit

Permalink
Merge pull request #11860 from v-fearam/patch-5
Browse files Browse the repository at this point in the history
Update event-processing-content.md
  • Loading branch information
JamesJBarnett committed Apr 19, 2024
2 parents 08aa3d6 + c52c416 commit 863c1fb
Showing 1 changed file with 58 additions and 43 deletions.
101 changes: 58 additions & 43 deletions docs/reference-architectures/serverless/event-processing-content.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,53 +76,70 @@ When using the Event Hubs trigger with Functions, catch exceptions within your p
The following code shows how the ingestion function catches exceptions and puts unprocessed messages onto a dead-letter queue.

```csharp
[FunctionName("RawTelemetryFunction")]
[StorageAccount("DeadLetterStorage")]
public static async Task RunAsync(
[EventHubTrigger("%EventHubName%", Connection = "EventHubConnection", ConsumerGroup ="%EventHubConsumerGroup%")]EventData[] messages,
[Queue("deadletterqueue")] IAsyncCollector<DeadLetterMessage> deadLetterMessages,
ILogger logger)
{
foreach (var message in messages)
{
DeviceState deviceState = null;

try
{
deviceState = telemetryProcessor.Deserialize(message.Body.Array, logger);
}
catch (Exception ex)
{
logger.LogError(ex, "Error deserializing message", message.SystemProperties.PartitionKey, message.SystemProperties.SequenceNumber);
await deadLetterMessages.AddAsync(new DeadLetterMessage { Issue = ex.Message, EventData = message });
}

try
{
await stateChangeProcessor.UpdateState(deviceState, logger);
}
catch (Exception ex)
{
logger.LogError(ex, "Error updating status document", deviceState);
await deadLetterMessages.AddAsync(new DeadLetterMessage { Issue = ex.Message, EventData = message, DeviceState = deviceState });
}
}
}
[Function(nameof(RawTelemetryFunction))]
public async Task RunAsync([EventHubTrigger("%EventHubName%", Connection = "EventHubConnection")] EventData[] messages,
FunctionContext context)
{
_telemetryClient.GetMetric("EventHubMessageBatchSize").TrackValue(messages.Length);
DeviceState? deviceState = null;
// Create a new CosmosClient
var cosmosClient = new CosmosClient(Environment.GetEnvironmentVariable("COSMOSDB_CONNECTION_STRING"));

// Get a reference to the database and the container
var database = cosmosClient.GetDatabase(Environment.GetEnvironmentVariable("COSMOSDB_DATABASE_NAME"));
var container = database.GetContainer(Environment.GetEnvironmentVariable("COSMOSDB_DATABASE_COL"));

// Create a new QueueClient
var queueClient = new QueueClient(Environment.GetEnvironmentVariable("DeadLetterStorage"), "deadletterqueue");
await queueClient.CreateIfNotExistsAsync();

foreach (var message in messages)
{
try
{
deviceState = _telemetryProcessor.Deserialize(message.Body.ToArray(), _logger);
try
{
// Add the device state to Cosmos DB
await container.UpsertItemAsync(deviceState, new PartitionKey(deviceState.DeviceId));
}
catch (Exception ex)
{
_logger.LogError(ex, "Error saving on database", message.PartitionKey, message.SequenceNumber);
var deadLetterMessage = new DeadLetterMessage { Issue = ex.Message, MessageBody = message.Body.ToArray(), DeviceState = deviceState };
// Convert the dead letter message to a string
var deadLetterMessageString = JsonConvert.SerializeObject(deadLetterMessage);

// Send the message to the queue
await queueClient.SendMessageAsync(deadLetterMessageString);
}

}
catch (Exception ex)
{
_logger.LogError(ex, "Error deserializing message", message.PartitionKey, message.SequenceNumber);
var deadLetterMessage = new DeadLetterMessage { Issue = ex.Message, MessageBody = message.Body.ToArray(), DeviceState = deviceState };
// Convert the dead letter message to a string
var deadLetterMessageString = JsonConvert.SerializeObject(deadLetterMessage);

// Send the message to the queue
await queueClient.SendMessageAsync(deadLetterMessageString);
}
}
}
```

Notice that the function uses the [Queue storage output binding][queue-binding] to put items in the queue.

The code shown also logs exceptions to Application Insights. You can use the partition key and sequence number to correlate dead-letter messages with the exceptions in the logs.

Messages in the dead-letter queue should have enough information so that you can understand the context of the error. In this example, the `DeadLetterMessage` class contains the exception message, the original event data, and the deserialized event message (if available).
Messages in the dead-letter queue should have enough information so that you can understand the context of the error. In this example, the `DeadLetterMessage` class contains the exception message, the original event body data, and the deserialized event message (if available).

```csharp
public class DeadLetterMessage
{
public string Issue { get; set; }
public EventData EventData { get; set; }
public DeviceState DeviceState { get; set; }
}
public class DeadLetterMessage
{
public string? Issue { get; set; }
public byte[]? MessageBody { get; set; }
public DeviceState? DeviceState { get; set; }
}
```

Use [Azure Monitor][monitor] to monitor the event hub. If you see there is input but no output, it means that messages aren't being processed. In that case, go into [Log Analytics][log-analytics] and look for exceptions or other errors.
Expand All @@ -133,8 +150,6 @@ Use infrastructure as code (IaC) when possible. IaC manages the infrastructure,

When creating templates, group resources as a way to organize and isolate them per workload. A common way to think about workload is a single serverless application or a virtual network. The goal of workload isolation is to associate the resources to a team, so that the DevOps team can independently manage all aspects of those resources and perform CI/CD.

This architecture includes steps to configure the Drone Status Function App using Azure Pipelines with YAML and Azure Functions Slots.

As you deploy your services you will need to monitor them. Consider using [Application Insights][app-insights] to enable the developers to monitor performance and detect issues.

For more information, see the [DevOps checklist][AWAF-devops-checklist].
Expand Down

0 comments on commit 863c1fb

Please sign in to comment.