Skip to content

Commit

Permalink
Azure SB persistence and more about principles. Ignoring Rider files.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev authored and phatboyg committed Jan 27, 2017
1 parent bebb320 commit 51b22e2
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 68 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ doc/build/*

_book
/node_modules
.vscode
.vscode
**/.idea/
212 changes: 145 additions & 67 deletions docs/advanced/sagas/persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,55 @@ Sagas are stateful event-based message consumers -- they retain state. Therefore
events is important. Without persistent state, a saga would consider each event a new event, and orchestration
of subsequent events would be meaningless.

## Specifying saga persistence

In order to store the saga state, you need to use one form of saga persistence. There are several
types of storage that MassTransit supports, all of those, which are included to the main distribution,
are listed below. There is also a in-memory unreliable storage, which allows to temporarily store
your saga state. It is useful to try things out since it does not require any infrastructure.

Simple initialization of a state machine saga with persistence looks like this:

```csharp
var sagaStateMachine = new ShoppingCartStateMachine();
var repository = new InMemorySagaRepository<ShoppingCart>();
var busControl = Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});

x.ReceiveEndpoint(host, "shopping_cart_state", e =>
{
e.StateMachineSaga(sagaStateMachine, repository);
});
});
```

It is important to notice that the saga repository object is a singleton. It does not hold any state
inside the class instance and only performs operations on the saga state objects that are send to it
to persist and retrieve.

There are two types of saga repository:
* Query repository
* Identity-only repository

Depending on the persistence mechanism, repository implementation can be either identity-only or
identity plus query.

When using identity-only repository, such as Azure Service Bus message session or Redis, you can
only use correlation by identity. This means that all events that the saga receives, must hold
the saga correlation id, and the correlation for each event can only use `CorrelateById` method
to define the correlation.

Query repository by definition support identity correlation too, but in addition support other
properties of events being received and saga state properties. Such correlations are defined using
`CorrelateBy` method and you can use any logical expression that involve the event data and
saga state data to establish such correlation. Repository implementation such as Entity Framework,
NHibernate and Marten support correlation by query. Of course, in-memory repository supports it as well.

## Identity

Saga instances are identified by a unique identifier (`Guid`), represented by the `CorrelationId` on the saga instance.
Expand Down Expand Up @@ -40,8 +89,8 @@ endpoint configuration:
```csharp
c.ReceiveEndpoint("queue", e =>
{
e.UseInMemoryOutbox();
// other endpoint configuration here
e.UseInMemoryOutbox();
// other endpoint configuration here
}
```

Expand All @@ -63,38 +112,38 @@ example below shows the basics of getting started.

```csharp
public class SagaInstance :
SagaStateMachineInstance
SagaStateMachineInstance
{
public SagaInstance(Guid correlationId)
{
CorrelationId = correlationId;
}
public SagaInstance(Guid correlationId)
{
CorrelationId = correlationId;
}

protected SagaInstance()
{
}
protected SagaInstance()
{
}

public string CurrentState { get; set; }
public string ServiceName { get; set; }
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public string ServiceName { get; set; }
public Guid CorrelationId { get; set; }
}

public class SagaInstanceMap :
SagaClassMapping<SagaInstance>
SagaClassMapping<SagaInstance>
{
public SagaInstanceMap()
{
Property(x => x.CurrentState);
Property(x => x.ServiceName, x => x.Length(40));
}
}
public SagaInstanceMap()
{
Property(x => x.CurrentState);
Property(x => x.ServiceName, x => x.Length(40));
}
}
```

The repository is then created on the context factory for the `DbContext` is available.

```csharp
SagaDbContextFactory contextFactory = () =>
new SagaDbContext<SagaInstance, SagaInstanceMap>(_connectionString);
new SagaDbContext<SagaInstance, SagaInstanceMap>(_connectionString);

var repository = new EntityFrameworkSagaRepository<SagaInstance>(contextFactory);
```
Expand All @@ -106,20 +155,20 @@ the saga instances can be persisted easily using a MongoDB collection.

```csharp
public class SagaInstance :
SagaStateMachineInstance
SagaStateMachineInstance
{
public SagaInstance(Guid correlationId)
{
CorrelationId = correlationId;
}
public SagaInstance(Guid correlationId)
{
CorrelationId = correlationId;
}

protected SagaInstance()
{
}
protected SagaInstance()
{
}

public string CurrentState { get; set; }
public string ServiceName { get; set; }
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public string ServiceName { get; set; }
public Guid CorrelationId { get; set; }
}
```

Expand All @@ -140,30 +189,30 @@ to using NHibernate for saga persistence.

```csharp
public class SagaInstance :
SagaStateMachineInstance
SagaStateMachineInstance
{
public SagaInstance(Guid correlationId)
{
CorrelationId = correlationId;
}
public SagaInstance(Guid correlationId)
{
CorrelationId = correlationId;
}

protected SagaInstance()
{
}
protected SagaInstance()
{
}

public string CurrentState { get; set; }
public string ServiceName { get; set; }
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public string ServiceName { get; set; }
public Guid CorrelationId { get; set; }
}

public class SagaInstanceMap :
SagaClassMapping<SagaInstance>
SagaClassMapping<SagaInstance>
{
public SagaInstanceMap()
{
Property(x => x.CurrentState);
Property(x => x.ServiceName, x => x.Length(40));
}
public SagaInstanceMap()
{
Property(x => x.CurrentState);
Property(x => x.ServiceName, x => x.Length(40));
}
}
```

Expand Down Expand Up @@ -192,16 +241,16 @@ value of the `CorrelationId` property:

```csharp
public class SagaInstance :
SagaStateMachineInstance, IHasGuidId
SagaStateMachineInstance, IHasGuidId
{
public Guid CorrelationId { get; set; }
public Guid Id => CorrelationId;
public string CurrentState { get; set; }
public Guid CorrelationId { get; set; }
public Guid Id => CorrelationId;
public string CurrentState { get; set; }

public string CustomData { get; set; }
public string CustomData { get; set; }
}
```

Redis saga persistence does not aquire locking on the database record when writing it so potentially
you can have write conflict in case the saga is updating its state frequently (hundreds of times per second).
To resolve this, the saga instance can implement the `IVersionedSaga` inteface and include the Version property:
Expand All @@ -219,17 +268,17 @@ For containerless initialization the code would look like:
```csharp
var redisConnectionString = "redis://localhost:6379";
var repository = new RedisSagaRepository<SagaInstance>(
new RedisManagerPool(redisConnectionString));
new RedisManagerPool(redisConnectionString));
```

If you use a container, you can use the code like this (example for Autofac):

```csharp
var redisConnectionString = "redis://localhost:6379";
builder.Register<IRedisClientsManager>(c =>
new RedisManagerPool(redisConnectionString)).SingleInstance();
new RedisManagerPool(redisConnectionString)).SingleInstance();
builder.RegisterGeneric(typeof(RedisSagaRepository<>))
.As(typeof(ISagaRepository<>)).SingleInstance();
.As(typeof(ISagaRepository<>)).SingleInstance();
```

### Marten
Expand All @@ -247,10 +296,10 @@ you inform Marten that correlationId will be used as the primary key.
```csharp
public class SampleSaga : ISaga
{
[Identity]
public Guid CorrelationId { get; set; }
public string State { get; set; }
public string SomeProperty { get; set; }
[Identity]
public Guid CorrelationId { get; set; }
public string State { get; set; }
public string SomeProperty { get; set; }
}
```

Expand All @@ -259,7 +308,7 @@ parameter.

```csharp
var connectionString =
"server=localhost;port=5432;database=test;user id=test;password=test;";
"server=localhost;port=5432;database=test;user id=test;password=test;";
var store = DocumentStore.For(connectionString);
var repository = new MartenSagaRepository<SampleSaga>(store);
```
Expand All @@ -268,17 +317,46 @@ If you use a container, you can use the code like this (example for Autofac):

```csharp
var connectionString =
"server=localhost;port=5432;database=test;user id=test;password=test;";
"server=localhost;port=5432;database=test;user id=test;password=test;";
builder.Register<IDocumentStore>(c => DocumentStore.For(connectionString);
builder.RegisterGeneric(typeof(MartenSagaRepository<>))
.As(typeof(ISagaRepository<>)).SingleInstance();
.As(typeof(ISagaRepository<>)).SingleInstance();
```

Marten will create necessary tables for you. This type of saga repository supports correlation by id and custom expressions.
Marten will create necessary tables for you. This type of saga repository
supports correlation by id and custom expressions.

### Azure Service Bus

TODO
Azure Service Bus provides a feature called *message sessions*, to process multiple messages at once and
to store some state on a temporary basis, which can be retrieved by some key.

The latter give us an ability to use this feature as saga state storage. Using message sessions
as saga persistence, you can only use Azure Service Bus for both messaging and saga persistencepurposes,
without needing any additional infrastructure.

There is a limitation for using message sessions - this feature is not supported for AMQP transport.

You have to explicitly enable message sessions when configuring the endpoint, and use parameterless
constructor to instantiate the saga repository.

Here is the basic sample of how to use the Azure Service Bus message session as saga repository:

```csharp
var sagaStateMachine = new MySagaStateMachine();
var repository = new MessageSessionSagaRepository<MySaga>();
sbc.ReceiveEndpoint(host, "test_queue", ep =>
{
ep.RequiresSession = true;
ep.StateMachineSaga(sagaStateMachine, repository);
});
```

As mentioned before, the message session allows storing and retrieving any state by some unique key.
This means that this type of saga persistence only support correlation by id. So, similar to Redis
saga persistence, you cannot use `CorralateBy` to specify how to find the saga instance, but only
`CorrelateById`.



[1]: https://www.postgresql.org/docs/9.5/static/functions-json.html
Expand Down

0 comments on commit 51b22e2

Please sign in to comment.