Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public class IntegrationPipelineWorker : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
await _consumer.SubscribeAsync<object>(
topic: _options.InputTopic,
await _consumer.SubscribeAsync<JsonElement>(
topic: _options.InboundSubject,
consumerGroup: _options.ConsumerGroup,
handler: async envelope =>
{
Expand Down Expand Up @@ -251,10 +251,15 @@ The pipeline is configured via `PipelineOptions`:
```csharp
public class PipelineOptions
{
public string InputTopic { get; set; } // Where to listen
public string ConsumerGroup { get; set; } // Consumer group name
public int WorkerConcurrency { get; set; } // Parallel processing
public TimeSpan ProcessingTimeout { get; set; } // Per-message timeout
public string NatsUrl { get; set; } // NATS server URL
public string InboundSubject { get; set; } // Where to listen
public string AckSubject { get; set; } // Ack notification subject
public string NackSubject { get; set; } // Nack notification subject
public string ConsumerGroup { get; set; } // Consumer group name
public string TemporalServerAddress { get; set; } // Temporal gRPC address
public string TemporalNamespace { get; set; } // Temporal namespace
public string TemporalTaskQueue { get; set; } // Temporal task queue
public TimeSpan WorkflowTimeout { get; set; } // Workflow timeout
}
```

Expand Down
4 changes: 2 additions & 2 deletions EnterpriseIntegrationPlatform/tutorials/26-message-replay.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public interface IMessageReplayer
// src/Processing.Replay/IMessageReplayStore.cs
public interface IMessageReplayStore
{
Task StoreForReplayAsync<T>(IntegrationEnvelope<T> envelope, string topic, CancellationToken ct = default);
IAsyncEnumerable<IntegrationEnvelope<object>> GetMessagesForReplayAsync(string topic, ReplayFilter filter, int maxMessages, CancellationToken ct = default);
Task StoreForReplayAsync<T>(IntegrationEnvelope<T> envelope, string topic, CancellationToken ct);
IAsyncEnumerable<IntegrationEnvelope<object>> GetMessagesForReplayAsync(string topic, ReplayFilter filter, int maxMessages, CancellationToken ct);
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
- How `IMessageThrottle` controls message flow rate inside the pipeline
- The `TokenBucketThrottle` algorithm: steady refill rate with burst capacity
- `IThrottleRegistry` for managing multiple throttle policies per partition key
- `ThrottlePolicy` with partition strategies: by Source, Recipient, CorrelationId, or Global
- `ThrottlePolicy` with partition strategies: by TenantId, Queue, Endpoint, or Global
- `ThrottleMetrics` for monitoring throttle pressure and wait times
- The difference between **rate limiting** (HTTP 429) and **throttling** (pipeline delays)
- Per-tenant partitioning for fair resource sharing
Expand Down
12 changes: 6 additions & 6 deletions EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ public interface IEventStore
string streamId,
IReadOnlyList<EventEnvelope> events,
long expectedVersion,
CancellationToken ct = default);
CancellationToken cancellationToken = default);

Task<IReadOnlyList<EventEnvelope>> ReadStreamAsync(
string streamId,
long fromVersion,
int count,
CancellationToken ct = default);
CancellationToken cancellationToken = default);

Task<IReadOnlyList<EventEnvelope>> ReadStreamBackwardAsync(
string streamId,
long fromVersion,
int count,
CancellationToken ct = default);
CancellationToken cancellationToken = default);
}
```

Expand Down Expand Up @@ -107,7 +107,7 @@ public static class TemporalQuery
DateTimeOffset pointInTime,
TState initialState,
int maxEventsPerRead = 1000,
CancellationToken ct = default) where TState : notnull;
CancellationToken cancellationToken = default) where TState : notnull;
}
```

Expand All @@ -117,8 +117,8 @@ public static class TemporalQuery
// src/EventSourcing/ISnapshotStore.cs
public interface ISnapshotStore<TState>
{
Task SaveAsync(string streamId, TState state, long version, CancellationToken ct = default);
Task<(TState? State, long Version)> LoadAsync(string streamId, CancellationToken ct = default);
Task SaveAsync(string streamId, TState state, long version, CancellationToken cancellationToken = default);
Task<(TState? State, long Version)> LoadAsync(string streamId, CancellationToken cancellationToken = default);
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,18 @@ deploy/helm/eip/
├── Chart.yaml # Chart metadata and version
├── values.yaml # Default configuration values
└── templates/
├── deployment.yaml
├── service.yaml
├── ingress.yaml
├── _helpers.tpl
├── admin-api.yaml
├── configmap.yaml
├── demo-pipeline.yaml
├── grafana-dashboards-configmap.yaml
├── hpa.yaml
└── configmap.yaml
├── ingestion-kafka.yaml
├── namespace.yaml
├── networkpolicy.yaml
├── openclaw-web.yaml
├── serviceaccount.yaml
└── workflow-temporal.yaml
```

**Chart.yaml** declares the chart:
Expand Down
13 changes: 11 additions & 2 deletions EnterpriseIntegrationPlatform/tutorials/49-testing-integrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,26 @@ Test Temporal workflows without infrastructure:
[TestFixture]
public class IntegrationPipelineWorkflowTests
{
private ITestWorkflowEnvironment _env;
private WorkflowEnvironment? _env;

[SetUp]
public async Task SetUp()
{
_env = await TestWorkflowEnvironment.StartLocalAsync();
try
{
_env = await WorkflowEnvironment.StartLocalAsync();
}
catch (Exception)
{
// Temporal local dev server not available
}
}

[Test]
public async Task Pipeline_CompletesAllSteps()
{
if (_env == null) return;

var worker = _env.Client.CreateWorker(/*...*/);
var result = await _env.Client.ExecuteWorkflowAsync(
(IntegrationPipelineWorkflow wf) => wf.RunAsync(testInput));
Expand Down
Loading