Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchingSqlJournal now preserves Sender in PersistCallback #3779

Merged
merged 1 commit into from
May 7, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public sealed class CircuitBreakerSettings
/// Maximum number of failures that can happen before the circuit opens.
/// </summary>
public int MaxFailures { get; }

/// <summary>
/// Maximum time available for operation to execute before
/// <see cref="CircuitBreaker"/> considers it a failure.
Expand Down Expand Up @@ -229,7 +229,7 @@ public abstract class BatchingSqlJournalSetup
/// </summary>
public string DefaultSerializer { get; }

/// <summary>
/// <summary>
/// Initializes a new instance of the <see cref="BatchingSqlJournalSetup" /> class.
/// </summary>
/// <param name="config">The configuration used to configure the journal.</param>
Expand Down Expand Up @@ -346,12 +346,12 @@ protected BatchingSqlJournalSetup(string connectionString, int maxConcurrentOper
/// </summary>
/// <typeparam name="TConnection">A concrete implementation of <see cref="DbConnection"/> for targeted database provider.</typeparam>
/// <typeparam name="TCommand">A concrete implementation of <see cref="DbCommand"/> for targeted database provider.</typeparam>
public abstract class BatchingSqlJournal<TConnection, TCommand> : WriteJournalBase
public abstract class BatchingSqlJournal<TConnection, TCommand> : WriteJournalBase
where TConnection : DbConnection
where TCommand : DbCommand
{
#region internal classes

private sealed class ChunkExecutionFailure : IDeadLetterSuppression
{
public Exception Cause { get; }
Expand Down Expand Up @@ -458,7 +458,7 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
/// SQL statement executed as result of <see cref="WriteMessages"/> request to journal.
/// </summary>
protected virtual string InsertEventSql { get; }

/// <summary>
/// SQL query executed as result of <see cref="GetCurrentPersistenceIds"/> request to journal.
/// It's a part of persistence query protocol.
Expand Down Expand Up @@ -509,7 +509,7 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
/// <see cref="PersistenceIdAdded"/> messages.
/// </summary>
protected bool HasAllIdsSubscribers => _allIdsSubscribers.Count != 0;

/// <summary>
/// Flag determining if incoming journal requests should be published in current actor system event stream.
/// Useful mostly for tests.
Expand Down Expand Up @@ -862,7 +862,7 @@ protected virtual void OnBufferOverflow(IJournalMessage request)
}
else if (request is ReplayTaggedMessages)
{
var r = (ReplayTaggedMessages) request;
var r = (ReplayTaggedMessages)request;
r.ReplyTo.Tell(new ReplayMessagesFailure(JournalBufferOverflowException.Instance), ActorRefs.NoSender);
}
}
Expand All @@ -887,11 +887,11 @@ private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext
using (var connection = CreateConnection(Setup.ConnectionString))
{
await connection.OpenAsync();

using (var tx = connection.BeginTransaction(Setup.IsolationLevel))
using (var command = (TCommand)connection.CreateCommand())
{
command.CommandTimeout = (int) Setup.ConnectionTimeout.TotalMilliseconds;
command.CommandTimeout = (int)Setup.ConnectionTimeout.TotalMilliseconds;
command.Transaction = tx;
try
{
Expand Down Expand Up @@ -1034,10 +1034,10 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c
var replaySettings = Setup.ReplayFilterSettings;
var replyTo = replaySettings.IsEnabled
? context.ActorOf(ReplayFilter.Props(
persistentActor: req.PersistentActor,
mode: replaySettings.Mode,
persistentActor: req.PersistentActor,
mode: replaySettings.Mode,
windowSize: replaySettings.WindowSize,
maxOldWriters: replaySettings.MaxOldWriters,
maxOldWriters: replaySettings.MaxOldWriters,
debugEnabled: replaySettings.IsDebug))
: req.PersistentActor;
var persistenceId = req.PersistenceId;
Expand Down Expand Up @@ -1086,15 +1086,15 @@ protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand c
private async Task HandleWriteMessages(WriteMessages req, TCommand command)
{
IJournalResponse summary = null;
var responses = new List<IJournalResponse>();
var responses = new List<Tuple<IJournalResponse, IActorRef>>();
var tags = new HashSet<string>();
var persistenceIds = new HashSet<string>();
var actorInstanceId = req.ActorInstanceId;

try
{
command.CommandText = InsertEventSql;

var tagBuilder = new StringBuilder(16); // magic number

foreach (var envelope in req.Messages)
Expand All @@ -1113,7 +1113,7 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
var persistent = AdaptToJournal(unadapted);
if (persistent.Payload is Tagged)
{
var tagged = (Tagged) persistent.Payload;
var tagged = (Tagged)persistent.Payload;
if (tagged.Tags.Count != 0)
{
tagBuilder.Append(';');
Expand All @@ -1130,7 +1130,7 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)

await command.ExecuteNonQueryAsync();

var response = new WriteMessageSuccess(unadapted, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new WriteMessageSuccess(unadapted, actorInstanceId), unadapted.Sender);
responses.Add(response);
persistenceIds.Add(persistent.PersistenceId);

Expand All @@ -1140,23 +1140,23 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
{
// database-related exceptions should result in failure
summary = new WriteMessagesFailed(cause);
var response = new WriteMessageFailure(unadapted, cause, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new WriteMessageFailure(unadapted, cause, actorInstanceId), unadapted.Sender);
responses.Add(response);
}
catch (Exception cause)
{
//TODO: this scope wraps atomic write. Atomic writes have all-or-nothing commits.
// so we should revert transaction here. But we need to check how this affect performance.

var response = new WriteMessageRejected(unadapted, cause, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new WriteMessageRejected(unadapted, cause, actorInstanceId), unadapted.Sender);
responses.Add(response);
}
}
}
else
{
//TODO: other cases?
var response = new LoopMessageSuccess(envelope.Payload, actorInstanceId);
var response = Tuple.Create<IJournalResponse, IActorRef>(new LoopMessageSuccess(envelope.Payload, actorInstanceId), envelope.Sender);
responses.Add(response);
}
}
Expand Down Expand Up @@ -1187,12 +1187,12 @@ private async Task HandleWriteMessages(WriteMessages req, TCommand command)
var aref = req.PersistentActor;

aref.Tell(summary);
foreach (var response in responses)
foreach (var r in responses)
{
aref.Tell(response);
aref.Tell(r.Item1, r.Item2);
}
}

/// <summary>
/// Perform write of persistent event with specified <paramref name="tags"/>
/// into database using given <paramref name="command"/>.
Expand Down