-
Notifications
You must be signed in to change notification settings - Fork 2k
/
StateStorageBridge.cs
162 lines (141 loc) · 5.74 KB
/
StateStorageBridge.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
using System;
using System.Diagnostics;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Runtime;
using Orleans.Storage;
namespace Orleans.Core
{
public class StateStorageBridge<TState> : IStorage<TState>
{
private readonly string name;
private readonly GrainReference grainRef;
private readonly IGrainStorage store;
private readonly GrainState<TState> grainState;
private readonly ILogger logger;
public TState State
{
get
{
GrainRuntime.CheckRuntimeContext();
return grainState.State;
}
set
{
GrainRuntime.CheckRuntimeContext();
grainState.State = value;
}
}
public string Etag => grainState.ETag;
public bool RecordExists => grainState.RecordExists;
public StateStorageBridge(string name, GrainReference grainRef, IGrainStorage store, ILoggerFactory loggerFactory)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (grainRef == null) throw new ArgumentNullException(nameof(grainRef));
if (store == null) throw new ArgumentNullException(nameof(store));
if (loggerFactory == null) throw new ArgumentNullException(nameof(loggerFactory));
this.logger = loggerFactory.CreateLogger(store.GetType());
this.name = name;
this.grainRef = grainRef;
this.store = store;
this.grainState = new GrainState<TState>(Activator.CreateInstance<TState>());
}
/// <summary>
/// Async method to cause refresh of the current grain state data from backing store.
/// Any previous contents of the grain state data will be overwritten.
/// </summary>
public async Task ReadStateAsync()
{
const string what = "ReadState";
Stopwatch sw = Stopwatch.StartNew();
try
{
GrainRuntime.CheckRuntimeContext();
await store.ReadStateAsync(name, grainRef, grainState);
StorageStatisticsGroup.OnStorageRead(name, grainRef, sw.Elapsed);
}
catch (Exception exc)
{
StorageStatisticsGroup.OnStorageReadError(name, grainRef);
string errMsg = MakeErrorMsg(what, exc);
this.logger.Error((int)ErrorCode.StorageProvider_ReadFailed, errMsg, exc);
if (!(exc is OrleansException))
{
throw new OrleansException(errMsg, exc);
}
throw;
}
finally
{
sw.Stop();
}
}
/// <summary>
/// Async method to cause write of the current grain state data into backing store.
/// </summary>
public async Task WriteStateAsync()
{
const string what = "WriteState";
try
{
GrainRuntime.CheckRuntimeContext();
Stopwatch sw = Stopwatch.StartNew();
await store.WriteStateAsync(name, grainRef, grainState);
sw.Stop();
StorageStatisticsGroup.OnStorageWrite(name, grainRef, sw.Elapsed);
}
catch (Exception exc)
{
StorageStatisticsGroup.OnStorageWriteError(name, grainRef);
string errMsgToLog = MakeErrorMsg(what, exc);
this.logger.Error((int)ErrorCode.StorageProvider_WriteFailed, errMsgToLog, exc);
// If error is not specialization of OrleansException, wrap it
if (!(exc is OrleansException))
{
throw new OrleansException(errMsgToLog, exc);
}
throw;
}
}
/// <summary>
/// Async method to cause write of the current grain state data into backing store.
/// </summary>
public async Task ClearStateAsync()
{
const string what = "ClearState";
try
{
GrainRuntime.CheckRuntimeContext();
Stopwatch sw = Stopwatch.StartNew();
// Clear (most likely Delete) state from external storage
await store.ClearStateAsync(name, grainRef, grainState);
sw.Stop();
// Reset the in-memory copy of the state
grainState.State = Activator.CreateInstance<TState>();
// Update counters
StorageStatisticsGroup.OnStorageDelete(name, grainRef, sw.Elapsed);
}
catch (Exception exc)
{
StorageStatisticsGroup.OnStorageDeleteError(name, grainRef);
string errMsg = MakeErrorMsg(what, exc);
this.logger.Error((int)ErrorCode.StorageProvider_DeleteFailed, errMsg, exc);
if (!(exc is OrleansException))
{
throw new OrleansException(errMsg, exc);
}
throw;
}
}
private string MakeErrorMsg(string what, Exception exc)
{
HttpStatusCode httpStatusCode;
string errorCode = string.Empty;
var decoder = store as IRestExceptionDecoder;
decoder?.DecodeException(exc, out httpStatusCode, out errorCode, true);
return string.Format("Error from storage provider {0} during {1} for grain Type={2} Pk={3} Id={4} Error={5}" + Environment.NewLine + " {6}",
$"{this.store.GetType().Name}.{this.name}", what, name, grainRef.GrainId.ToDetailedString(), grainRef, errorCode, LogFormatter.PrintException(exc));
}
}
}