/
RedisTransaction.cs
185 lines (165 loc) · 4.96 KB
/
RedisTransaction.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
//
// https://github.com/ServiceStack/ServiceStack.Redis
// ServiceStack.Redis: ECMA CLI Binding to the Redis key-value storage system
//
// Authors:
// Demis Bellot (demis.bellot@gmail.com)
//
// Copyright 2013 ServiceStack, Inc. All Rights Reserved.
//
// Licensed under the same terms of ServiceStack.
//
using System;
using ServiceStack.Redis.Pipeline;
namespace ServiceStack.Redis;
/// <summary>
/// Adds support for Redis Transactions (i.e. MULTI/EXEC/DISCARD operations).
/// </summary>
/// <remarks>
/// This is a <c>partial</c> class: other parts may reference the private members declared here,
/// so their names are intentionally left unchanged.
/// </remarks>
public partial class RedisTransaction
    : RedisAllPurposePipeline, IRedisTransaction, IRedisQueueCompletableOperation
{
    // Number of commands the caller queued in this transaction. Set in Commit() and
    // compared against the result count handed to handleMultiDataResultCount().
    private int numCommands;

    // Selects which "expect QUEUED" reader AddCurrentQueuedOperation() registers.
    private readonly bool _isAsync;

    /// <summary>
    /// Creates a synchronous transaction on <paramref name="redisClient"/>.
    /// </summary>
    /// <param name="redisClient">The client the transaction is bound to.</param>
    public RedisTransaction(RedisClient redisClient)
        : this(redisClient, false) {}

    /// <summary>
    /// Creates a transaction on <paramref name="redisClient"/>.
    /// </summary>
    /// <param name="redisClient">The client the transaction is bound to.</param>
    /// <param name="isAsync">
    /// When true, <see cref="AddCurrentQueuedOperation"/> registers the async "QUEUED" reader
    /// instead of the synchronous one.
    /// </param>
    internal RedisTransaction(RedisClient redisClient, bool isAsync)
        : base(redisClient)
    {
        // Casting a transaction between the sync and async APIs is not supported; any
        // sync-over-async or async-over-sync that results is the caller's responsibility.
        _isAsync = isAsync;
    }

    /// <summary>
    /// Starts pipelining, issues MULTI and registers this instance as the client's
    /// current transaction.
    /// </summary>
    protected override void Init()
    {
        //start pipelining
        base.Init();

        //queue multi command
        RedisClient.Multi();

        //set transaction
        RedisClient.Transaction = this;
    }

    /// <summary>
    /// Inserts, at the FRONT of the queue (index 0), an operation that reads one "QUEUED" reply.
    /// </summary>
    /// <remarks>
    /// Commit() relies on all "QUEUED" readers preceding the readers of the actual results.
    /// </remarks>
    private void QueueExpectQueued()
    {
        QueuedCommands.Insert(0, new QueuedRedisOperation
        {
            VoidReadCommand = RedisClient.ExpectQueued
        });
    }

    /// <summary>
    /// Issue exec command (not queued) and flush the send buffer.
    /// </summary>
    private void Exec()
    {
        RedisClient.Exec();
        RedisClient.FlushAndResetSendBuffer();
    }

    /// <summary>
    /// Runs <paramref name="send"/>, then processes the result of every queued command in
    /// queue order. Regardless of outcome, the transaction is detached from the client, the
    /// pipeline is closed and <c>AddTypeIdsRegisteredDuringPipeline</c> is invoked.
    /// </summary>
    /// <param name="send">Prepares the queue (if needed) and sends the commands.</param>
    /// <returns>
    /// <c>false</c> when a <see cref="RedisTransactionFailedException"/> was raised;
    /// otherwise <c>true</c>. Any other exception propagates after the cleanup has run.
    /// </returns>
    private bool SendAndProcessResults(Action send)
    {
        try
        {
            send();

            //receive expected results
            foreach (var queuedCommand in QueuedCommands)
            {
                queuedCommand.ProcessResult();
            }

            return true;
        }
        catch (RedisTransactionFailedException)
        {
            return false;
        }
        finally
        {
            RedisClient.Transaction = null;
            ClosePipeline();
            RedisClient.AddTypeIdsRegisteredDuringPipeline();
        }
    }

    /// <summary>
    /// Sends EXEC and reads back all replies of the transaction.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the transaction was executed; <c>false</c> when it failed with a
    /// <see cref="RedisTransactionFailedException"/> (raised when EXEC reports a count of -1).
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// EXEC returned a result count different from the number of queued commands.
    /// </exception>
    public bool Commit()
    {
        return SendAndProcessResults(() =>
        {
            // Every queued command has a matching "QUEUED" reader inserted by
            // AddCurrentQueuedOperation(), hence half of the entries are user commands.
            // NOTE(review): assumes base.AddCurrentQueuedOperation() adds exactly one entry
            // per command - confirm against the base class.
            numCommands = QueuedCommands.Count / 2;

            //insert multi command at beginning
            QueuedCommands.Insert(0, new QueuedRedisCommand
            {
                VoidReturnCommand = r => Init(),
                VoidReadCommand = RedisClient.ExpectOk,
            });

            //the first half of the responses will be "QUEUED",
            // so insert reading of multiline after these responses
            QueuedCommands.Insert(numCommands + 1, new QueuedRedisOperation
            {
                IntReadCommand = RedisClient.ReadMultiDataResultCount,
                OnSuccessIntCallback = handleMultiDataResultCount
            });

            // add Exec command at end (not queued)
            QueuedCommands.Add(new RedisCommand
            {
                VoidReturnCommand = r => Exec()
            });

            //execute transaction
            Exec();
        });
    }

    /// <summary>
    /// callback for after result count is read in
    /// </summary>
    /// <param name="count">The result count read from the EXEC reply.</param>
    /// <exception cref="RedisTransactionFailedException"><paramref name="count"/> is -1.</exception>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="count"/> differs from the number of queued commands.
    /// </exception>
    private void handleMultiDataResultCount(int count)
    {
        // transaction failed due to WATCH condition
        if (count == -1)
        {
            throw new RedisTransactionFailedException();
        }

        if (count != numCommands)
        {
            throw new InvalidOperationException(
                $"Invalid results received from 'EXEC', expected '{numCommands}' received '{count}'"
                + "\nWarning: Transaction was committed");
        }
    }

    /// <summary>
    /// Detaches this transaction from the client and clears the type ids registered
    /// while it was active.
    /// </summary>
    /// <exception cref="InvalidOperationException">The client has no current transaction.</exception>
    public void Rollback()
    {
        if (RedisClient.Transaction is null)
        {
            throw new InvalidOperationException("There is no current transaction to Rollback");
        }

        RedisClient.Transaction = null;
        RedisClient.ClearTypeIdsRegisteredDuringPipeline();
    }

    /// <summary>
    /// Re-executes the queued commands and reads back their results.
    /// </summary>
    /// <returns>
    /// <c>false</c> when a <see cref="RedisTransactionFailedException"/> was raised;
    /// otherwise <c>true</c>.
    /// </returns>
    public override bool Replay()
    {
        return SendAndProcessResults(() => Execute());
    }

    /// <summary>
    /// Disposes the underlying pipeline and rolls back the transaction if it is still
    /// registered on the client (i.e. it was neither committed nor rolled back).
    /// </summary>
    public override void Dispose()
    {
        base.Dispose();

        if (RedisClient.Transaction is not null)
        {
            Rollback();
        }
    }

    /// <summary>
    /// Adds the current operation via the base implementation and then registers the
    /// matching "QUEUED" reply reader (async or sync, depending on how this instance was created).
    /// </summary>
    protected override void AddCurrentQueuedOperation()
    {
        base.AddCurrentQueuedOperation();

        if (_isAsync)
        {
            QueueExpectQueuedAsync();
        }
        else
        {
            QueueExpectQueued();
        }
    }

    // Implemented (if at all) in another part of this partial class.
    partial void QueueExpectQueuedAsync();
}