This repository has been archived by the owner on Oct 16, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
DocumentDBStorageProvider.cs
141 lines (115 loc) · 4.34 KB
/
DocumentDBStorageProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Orleans.Runtime;
using Orleans.Storage;
using System;
using System.Threading.Tasks;
namespace Orleans.StorageProvider.DocumentDB
{
// TODO: Ensure collection exists
// TODO: CI
// TODO: Testing
// TODO: Nuget
// TODO: readme
// TODO: extension method for registering the provider
class DocumentDBStorageProvider : IStorageProvider
{
private string databaseName;
private string collectionName;
public string Name { get; set; }
public Logger Log { get; set; }
private DocumentClient Client { get; set; }
public async Task Init(string name, Providers.IProviderRuntime providerRuntime, Providers.IProviderConfiguration config)
{
try
{
this.Name = name;
var url = config.Properties["Url"];
var key = config.Properties["Key"];
this.databaseName = config.Properties["Database"];
this.collectionName = config.Properties["Collection"];
this.Client = new DocumentClient(new Uri(url), key);
await this.Client.CreateDatabaseAsync(new Database { Id = this.databaseName });
var myCollection = new DocumentCollection
{
Id = this.collectionName
};
await this.Client.CreateDocumentCollectionAsync(
UriFactory.CreateDatabaseUri(this.databaseName),
myCollection,
new RequestOptions { /*OfferThroughput = 20000 */});
}
catch (Exception ex)
{
Log.Error(0, "Error in Init.", ex);
throw;
}
}
public Task Close()
{
if (null != this.Client) this.Client.Dispose();
return TaskDone.Done;
}
Uri GenerateUri(string grainType, GrainReference reference)
{
return UriFactory.CreateDocumentUri(databaseName, collectionName, reference.ToKeyString());
}
public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
{
try
{
var uri = GenerateUri(grainType, grainReference);
Document readDoc = await this.Client.ReadDocumentAsync(uri);
if (null != readDoc)
{
grainState.ETag = readDoc.ETag;
grainState.State = readDoc.GetPropertyValue<object>("state");
}
}
catch (Exception ex)
{
Log.Error(0, "Error in ReadStateAsync", ex);
throw;
}
}
public async Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
{
try
{
var uri = GenerateUri(grainType, grainReference);
var document = new Document();
document.SetPropertyValue("state", grainState.State);
if (null != grainState.ETag)
{
var ac = new AccessCondition { Condition = grainState.ETag, Type = AccessConditionType.IfMatch };
await this.Client.ReplaceDocumentAsync(uri, document, new RequestOptions { AccessCondition = ac });
}
else
{
Document newDoc = await this.Client.CreateDocumentAsync(uri, document);
grainState.ETag = newDoc.ETag;
}
}
catch (Exception ex)
{
Log.Error(0, "Error in WriteStateAsync", ex);
throw;
}
}
public async Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
{
try
{
var uri = GenerateUri(grainType, grainReference);
await this.Client.DeleteDocumentAsync(uri);
grainState.State = null;
grainState.ETag = null;
}
catch (Exception ex)
{
Log.Error(0, "Error in ClearStateAsync", ex);
throw;
}
}
}
}