Commit

Merge pull request #11600 from MicrosoftDocs/pipes-and-filters-example
[MAINT] Update Pipes and Filters cloud design pattern example
American-Dipper committed Feb 9, 2024
2 parents 176ed48 + b60642c commit 3384d4c
Showing 3 changed files with 51 additions and 179 deletions.
211 changes: 39 additions & 172 deletions docs/patterns/pipes-and-filters-content.md
@@ -42,7 +42,7 @@ Consider the following points when you decide how to implement this pattern:

- **Repeated messages**. If a filter in a pipeline fails after it posts a message to the next stage of the pipeline, another instance of the filter might be run, and it would post a copy of the same message to the pipeline. This scenario could cause two instances of the same message to be passed to the next filter. To avoid this problem, the pipeline should detect and eliminate duplicate messages.

> [!NOTE]
> [!NOTE]
> If you implement the pipeline by using message queues (like Azure Service Bus queues), the message queuing infrastructure might provide automatic duplicate message detection and removal.
- **Context and state**. In a pipeline, each filter essentially runs in isolation and shouldn't make any assumptions about how it was invoked. Therefore, each filter should be provided with sufficient context to perform its work. This context could include a significant amount of state information.
@@ -55,7 +55,7 @@ Use this pattern when:

- The processing steps performed by an application have different scalability requirements.

> [!NOTE]
> [!NOTE]
> You can group filters that should scale together in the same process. For more information, see the [Compute Resource Consolidation pattern](./compute-resource-consolidation.yml).
- You require the flexibility to allow reordering of the processing steps that are performed by an application, or to allow the capability to add and remove steps.
@@ -72,203 +72,70 @@ This pattern might not be useful when:

## Example

You can use a sequence of message queues to provide the infrastructure that's required to implement a pipeline. An initial message queue receives unprocessed messages. A component that's implemented as a filter task listens for a message on this queue, performs its work, and then posts the transformed message to the next queue in the sequence. Another filter task can listen for messages on this queue, process them, post the results to another queue, and so on, until the fully transformed data appears in the final message in the queue. This diagram illustrates a pipeline that uses message queues:
You can use a sequence of message queues to provide the infrastructure that's required to implement a pipeline. An initial message queue receives unprocessed messages that become the start of the pipes and filters pattern implementation. A component that's implemented as a filter task listens for a message on this queue, performs its work, and then posts a new or transformed message to the next queue in the sequence. Another filter task can listen for messages on this queue, process them, post the results to another queue, and so on, until the final step that ends the pipes and filters process. This diagram illustrates a pipeline that uses message queues:

![Diagram showing a pipeline that uses message queues.](./_images/pipes-and-filters-message-queues.png)
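
To make the queue-and-filter flow concrete, here is a minimal in-memory sketch. It is not the sample's implementation: `InMemoryPipeline`, `RunAsync`, and `RunFilterAsync` are hypothetical names, and `System.Threading.Channels` stands in for real message queues so that the example stays self-contained. Each channel plays the role of a pipe, and each `Func<string, string>` plays the role of a filter.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a queue-based pipeline.
// Each Channel<string> acts as a pipe; each Func<string, string> acts as a filter.
public static class InMemoryPipeline
{
    // Reads every message from the input pipe, applies the filter, and writes
    // the result to the output pipe. The output pipe is always completed so
    // that downstream stages can finish, even if the filter throws.
    private static async Task RunFilterAsync(
        ChannelReader<string> input,
        ChannelWriter<string> output,
        Func<string, string> filter)
    {
        try
        {
            await foreach (var message in input.ReadAllAsync())
            {
                await output.WriteAsync(filter(message));
            }
        }
        finally
        {
            output.Complete();
        }
    }

    // Chains the filters with one pipe between each pair, feeds the initial
    // messages into the first pipe, and collects whatever leaves the last pipe.
    public static async Task<List<string>> RunAsync(
        IEnumerable<string> messages,
        params Func<string, string>[] filters)
    {
        var first = Channel.CreateUnbounded<string>();
        var current = first;
        var stages = new List<Task>();

        foreach (var filter in filters)
        {
            var next = Channel.CreateUnbounded<string>();
            stages.Add(RunFilterAsync(current.Reader, next.Writer, filter));
            current = next;
        }

        foreach (var message in messages)
        {
            await first.Writer.WriteAsync(message);
        }
        first.Writer.Complete();

        var results = new List<string>();
        await foreach (var message in current.Reader.ReadAllAsync())
        {
            results.Add(message);
        }

        // Surface any exception thrown by a filter stage.
        await Task.WhenAll(stages);
        return results;
    }
}
```

Because each filter only knows about its input and output pipe, reordering, adding, or removing steps is a matter of changing the `filters` argument. A durable queue service would replace the channels in a real deployment, which this sketch doesn't attempt to model.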

If you're building a solution on Azure, you can use Service Bus queues to provide a reliable and scalable queuing mechanism. The `ServiceBusPipeFilter` class shown in the following C# code demonstrates how you can implement a filter that receives input messages from a queue, processes the messages, and posts the results to another queue.
An image processing pipeline could be implemented using this pattern. If your workload takes an image, the image could pass through a series of largely independent and reorderable filters to perform actions such as:

> [!NOTE]
> The `ServiceBusPipeFilter` class is defined in the PipesAndFilters.Shared project, which is available on [GitHub](https://github.com/mspnp/cloud-design-patterns/tree/master/pipes-and-filters).
- content moderation
- resizing
- watermarking
- reorientation
- Exif metadata removal
- content delivery network (CDN) publication

```csharp
public class ServiceBusPipeFilter
{
...
private readonly string inQueuePath;
private readonly string outQueuePath;
...
private QueueClient inQueue;
private QueueClient outQueue;
...

public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)
{
...
this.inQueuePath = inQueuePath;
this.outQueuePath = outQueuePath;
}

public void Start()
{
...
// Create the outbound filter queue if it doesn't exist.
...
this.outQueue = QueueClient.CreateFromConnectionString(...);

...
// Create the inbound and outbound queue clients.
this.inQueue = QueueClient.CreateFromConnectionString(...);
}

public void OnPipeFilterMessageAsync(
Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)
{
...

this.inQueue.OnMessageAsync(
async (msg) =>
{
...
// Process the filter and send the output to the
// next queue in the pipeline.
var outMessage = await asyncFilterTask(msg);

// Send the message from the filter processor
// to the next queue in the pipeline.
if (outQueue != null)
{
await outQueue.SendAsync(outMessage);
}

// Note: There's a chance that the same message could be sent twice
// or that a message could be processed by an upstream or downstream
// filter at the same time.
// This would happen in a situation where processing of a message was
// completed, it was sent to the next pipe/queue, and it then failed
// to complete when using the PeekLock method.
// In a real-world implementation, you should consider idempotent message
// processing and concurrency.
},
options);
}

public async Task Close(TimeSpan timespan)
{
// Pause the processing threads.
this.pauseProcessingEvent.Reset();

// There's no clean approach for waiting for the threads to complete
// the processing. This example simply stops any new processing, waits
// for the existing thread to complete, closes the message pump,
// and finally returns.
Thread.Sleep(timespan);

this.inQueue.Close();
...
}

...
}
```

The `Start` method in the `ServiceBusPipeFilter` class connects to a pair of input and output queues, and the `Close` method disconnects from the input queue. The `OnPipeFilterMessageAsync` method performs the actual processing of messages, and the `asyncFilterTask` parameter of this method specifies the processing to be performed. The `OnPipeFilterMessageAsync` method waits for incoming messages on the input queue, runs the code specified by the `asyncFilterTask` parameter over each message as it arrives, and posts the results to the output queue. The queues are specified by the constructor.
In this example, the filters could be implemented as individually deployed Azure Functions or even a single Azure Function app that contains each filter as an isolated deployment. The use of Azure Function triggers, input bindings, and output bindings can simplify the filter code and work automatically with a queue-based pipe using a [claim check](./claim-check.yml) to the image to process.

The sample solution implements filters in a set of worker roles. Each worker role can be scaled independently, depending on the complexity of the business processing that it performs or the resources that are required for processing. Additionally, multiple instances of each worker role can be run in parallel to improve throughput.
:::image type="complex" source="./_images/pipes-and-filters-image-processing-example.svg" alt-text="Diagram showing an image processing pipeline that uses Azure Queue Storage between a series of Azure Functions." lightbox="./_images/pipes-and-filters-image-processing-example.svg":::
This diagram shows three unprocessed images of various file types on the left. To the right of those images is an Azure Queue Storage pipe with claim check messages for each image, followed by an Azure Function that performs content moderation on the image as a filter. All the images are stored in an Azure Blob Storage account. Another queue (pipe) and function (filter) follow the first to handle image resizing. Then there is an ellipsis (…) that represents pipes and filters that aren't shown. The last pipe and filter are responsible for publishing the final, fully processed image to its destination.
:::image-end:::

The following code shows an Azure worker role named `PipeFilterARoleEntry`, which is defined in the PipeFilterA project in the sample solution.
Here's an example of what one filter might look like. It's implemented as an Azure Function that's triggered from a Queue Storage pipe with a claim check to the image, and it writes a new claim check to another Queue Storage pipe. The implementation has been replaced with pseudocode in comments for brevity. More code like this can be found in the [demonstration of the Pipes and Filters pattern](https://github.com/mspnp/cloud-design-patterns/tree/main/pipes-and-filters#readme) available on GitHub.

```csharp
public class PipeFilterARoleEntry : RoleEntryPoint
// This is the "Resize" filter. It handles claim checks from the input pipe, performs the
// resize work, and places a claim check in the next pipe for another filter to handle.
[Function(nameof(ResizeFilter))]
[QueueOutput("pipe-fjur", Connection = "pipe")] // Destination pipe claim check
public async Task<string> RunAsync(
[QueueTrigger("pipe-xfty", Connection = "pipe")] string imageFilePath, // Source pipe claim check
[BlobInput("{QueueTrigger}", Connection = "pipe")] BlockBlobClient imageBlob) // Image to process
{
...
private ServiceBusPipeFilter pipeFilterA;

public override bool OnStart()
{
...
this.pipeFilterA = new ServiceBusPipeFilter(
...,
Constants.QueueAPath,
Constants.QueueBPath);

this.pipeFilterA.Start();
...
}

public override void Run()
{
this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>
{
// Clone the message and update it.
// Properties set by the broker (Delivery count, enqueue time, ...)
// aren't cloned and must be copied over if required.
var newMsg = msg.Clone();

await Task.Delay(500); // DOING WORK
Trace.TraceInformation("Filter A processed message:{0} at {1}",
msg.MessageId, DateTime.UtcNow);
_logger.LogInformation("Processing image {uri} for resizing.", imageBlob.Uri);

newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");
// Idempotency checks
// ...
return newMsg;
});
// Download image based on claim check in queue message body
// ...
// Resize the image
// ...
...
}
// Write resized image back to storage
// ...
...
}
```

This role contains a `ServiceBusPipeFilter` object. The `OnStart` method in the role connects to the queues that receive input messages and post output messages. (The names of the queues are defined in the `Constants` class.) The `Run` method invokes the `OnPipeFilterMessageAsync` method to perform processing on each message that's received. (In this example, the processing is simulated by waiting for a short time.) When processing is complete, a new message is constructed that contains the results (in this case, a custom property is added to the input message), and this message is posted to the output queue.

The sample code contains another worker role named `PipeFilterBRoleEntry`. It's in the PipeFilterB project. This role is similar to `PipeFilterARoleEntry`, but it performs different processing in the `Run` method. In the example solution, these two roles are combined to construct a pipeline. The output queue for the `PipeFilterARoleEntry` role is the input queue for the `PipeFilterBRoleEntry` role.

The sample solution also provides two other roles named `InitialSenderRoleEntry` (in the InitialSender project) and `FinalReceiverRoleEntry` (in the FinalReceiver project). The `InitialSenderRoleEntry` role provides the initial message in the pipeline. The `OnStart` method connects to a single queue, and the `Run` method posts a message to that queue. The queue is the input queue that's used by the `PipeFilterARoleEntry` role, so posting a message to it causes the message to be received and processed by the `PipeFilterARoleEntry` role. The processed message then passes through the `PipeFilterBRoleEntry` role.

The input queue for the `FinalReceiverRoleEntry` role is the output queue for the `PipeFilterBRoleEntry` role. The `Run` method in the `FinalReceiverRoleEntry` role, shown in the following code, receives the message and performs some final processing. It then writes the values of the custom properties added by the filters in the pipeline to the trace output.

```csharp
public class FinalReceiverRoleEntry : RoleEntryPoint
{
...
// Final queue/pipe in the pipeline to process data from.
private ServiceBusPipeFilter queueFinal;

public override bool OnStart()
{
...
// Set up the queue.
this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);
this.queueFinal.Start();
...
}

public override void Run()
{
this.queueFinal.OnPipeFilterMessageAsync(
async (msg) =>
{
await Task.Delay(500); // DOING WORK
// The pipeline message was received.
Trace.TraceInformation(
"Pipeline Message Complete - FilterA:{0} FilterB:{1}",
msg.Properties[Constants.FilterAMessageKey],
msg.Properties[Constants.FilterBMessageKey]);

return null;
});
...
}

...
// Create claim check for image and place in the next pipe
// ...
_logger.LogInformation("Image resizing done or not needed. Adding image {filePath} into the next pipe.", imageFilePath);
return imageFilePath;
}
```
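
The `// Idempotency checks` step in the filter above is left as pseudocode. As a rough sketch only, one way to skip a redelivered claim check is to record each one the first time it's handled. `ProcessedClaimChecks` and `TryBegin` are hypothetical names that don't come from the sample, and the in-memory dictionary is an assumption made to keep the example self-contained. It wouldn't survive a restart or be shared across filter instances, so a real filter would need a durable, shared record instead.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, not part of the sample: remembers which claim checks
// a filter has already handled so that a repeated message can be skipped.
public sealed class ProcessedClaimChecks
{
    private readonly ConcurrentDictionary<string, bool> _seen =
        new ConcurrentDictionary<string, bool>(StringComparer.Ordinal);

    // Returns true the first time a claim check is seen and false for a repeat.
    // TryAdd is atomic, so two concurrent callers can't both get true.
    public bool TryBegin(string claimCheck)
    {
        if (claimCheck == null)
        {
            throw new ArgumentNullException(nameof(claimCheck));
        }

        return _seen.TryAdd(claimCheck, true);
    }
}
```

A filter could call `TryBegin(imageFilePath)` before it does any work and pass the claim check along unchanged when the call returns `false`. Whether a repeat should be forwarded or dropped depends on how downstream filters handle duplicates.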

## Next steps

You might find the following resources helpful when you implement this pattern:

- [A sample that demonstrates this pattern, on GitHub](https://github.com/mspnp/cloud-design-patterns/tree/master/pipes-and-filters)
- A [demonstration of the Pipes and Filters Pattern](https://github.com/mspnp/cloud-design-patterns/tree/main/pipes-and-filters#readme) using the image processing scenario is available on GitHub.
- [Idempotency patterns](https://blog.jonathanoliver.com/idempotency-patterns), on Jonathan Oliver's blog

## Related resources

The following patterns might also be relevant when you implement this pattern:

- [Claim-Check pattern](./claim-check.yml). A pipeline implemented using a queue may not hold the actual item being sent through the filters, but instead a pointer to the data that needs to be processed. The example uses a claim check in Azure Queue Storage for images stored in Azure Blob Storage.
- [Competing Consumers pattern](./competing-consumers.yml). A pipeline can contain multiple instances of one or more filters. This approach is useful for running parallel instances of slow filters. It enables the system to spread the load and improve throughput. Each instance of a filter competes for input with the other instances, but two instances of a filter shouldn't be able to process the same data. This article explains the approach.
- [Compute Resource Consolidation pattern](./compute-resource-consolidation.yml). It might be possible to group filters that should scale together into a single process. This article provides more information about the benefits and tradeoffs of this strategy.
- [Compensating Transaction pattern](./compensating-transaction.yml). You can implement a filter as an operation that can be reversed, or that has a compensating operation that restores the state to a previous version if there's a failure. This article explains how you can implement this pattern to maintain or achieve eventual consistency.
16 changes: 9 additions & 7 deletions docs/patterns/pipes-and-filters.yml
@@ -1,19 +1,21 @@
### YamlMime:Architecture
metadata:
title: Pipes and Filters pattern
description: Break down a task that performs complex processing into a series of separate elements that can be reused.
description: Break down a task that performs complex processing into a series of separate elements that can be reused or reordered.
author: martinekuan
ms.author: architectures
ms.date: 07/28/2022
ms.topic: conceptual
ms.topic: design-pattern
ms.service: architecture-center
ms.subservice: azure-guide
ms.subservice: design-pattern
azureCategories:
- devops
- compute
products:
- azure-devops
- azure-blob-storage
- azure-functions
- azure-queue-storage
name: Pipes and Filters pattern
summary: Break down a task that performs complex processing into a series of separate elements that can be reused.
summary: Break down a task that performs complex processing into a series of separate elements that can be reused or reordered.
thumbnailUrl: /azure/architecture/browse/thumbs/pipes-filters-solution.png
content: |
[!include[](pipes-and-filters-content.md)]
[!INCLUDE[](pipes-and-filters-content.md)]
