-
Notifications
You must be signed in to change notification settings - Fork 2
/
CosmosDb.cs
127 lines (112 loc) · 5.04 KB
/
CosmosDb.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
namespace Canaan
{
/// <summary>
/// Wrapper around a <see cref="CosmosClient"/> that is bound to a single database id and
/// exposes point-read, query, create and upsert helpers for containers in that database.
/// </summary>
/// <remarks>
/// Every operation first calls the inherited <c>ThrowIfNotInitialized()</c> guard and passes the
/// inherited <c>CancellationToken</c> to the SDK. Exceptions raised by the SDK are not caught here.
/// </remarks>
public class CosmosDB : Api
{
    #region Constructors
    /// <summary>
    /// Creates the underlying <see cref="CosmosClient"/> and marks this instance as initialized.
    /// </summary>
    /// <param name="endpointUrl">Account endpoint passed to the <see cref="CosmosClient"/> constructor.</param>
    /// <param name="authKey">Account key passed to the <see cref="CosmosClient"/> constructor.</param>
    /// <param name="databaseId">Id of the database all container operations are issued against.</param>
    /// <param name="ct">Cancellation token forwarded to the <c>Api</c> base constructor.</param>
    public CosmosDB(string endpointUrl, string authKey, string databaseId, CancellationToken ct) : base(ct)
    {
        EndpointUrl = endpointUrl;
        AuthKey = authKey;
        // Direct connectivity, session consistency, and up to 30 retries / 30 seconds of
        // waiting when requests are rate limited.
        Client = new CosmosClient(EndpointUrl, AuthKey,
            new CosmosClientOptions()
            {
                ApplicationName = "Canaan",
                ConnectionMode = ConnectionMode.Direct,
                MaxRetryAttemptsOnRateLimitedRequests = 30,
                MaxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds(30),
                ConsistencyLevel = ConsistencyLevel.Session
            });
        DatabaseId = databaseId;
        Info("Created client for CosmosDB database {0} at {1}.", DatabaseId, Client.Endpoint);
        Initialized = true;
    }

    /// <summary>
    /// Creates a client using the <c>CosmosDB:EndpointUrl</c> and <c>CosmosDB:AuthKey</c> values
    /// returned by the inherited <c>Config</c> lookup.
    /// </summary>
    /// <param name="databaseId">Id of the database all container operations are issued against.</param>
    /// <param name="ct">Cancellation token forwarded to the <c>Api</c> base constructor.</param>
    public CosmosDB(string databaseId, CancellationToken ct) : this(Config("CosmosDB:EndpointUrl"), Config("CosmosDB:AuthKey"), databaseId, ct) {}

    /// <summary>
    /// Creates a client from configuration using the inherited <c>Cts.Token</c> as cancellation token.
    /// </summary>
    /// <param name="databaseId">Id of the database all container operations are issued against.</param>
    public CosmosDB(string databaseId) : this(databaseId, Cts.Token) {}
    #endregion

    #region Properties
    /// <summary>Account endpoint this instance was constructed with.</summary>
    public string EndpointUrl { get; }

    /// <summary>Account key this instance was constructed with.</summary>
    public string AuthKey { get; }

    /// <summary>Id of the database used for every container lookup.</summary>
    public string DatabaseId { get; }

    /// <summary>The SDK client created in the constructor.</summary>
    public CosmosClient Client { get; protected set; }
    #endregion

    #region Methods
    /// <summary>
    /// Reads a single item by id from one logical partition.
    /// </summary>
    /// <typeparam name="T">Type the item is deserialized to.</typeparam>
    /// <param name="containerId">Id of the container within <see cref="DatabaseId"/>.</param>
    /// <param name="partitionKey">Partition key value of the item.</param>
    /// <param name="itemId">Id of the item to read.</param>
    /// <returns>The resource returned by the read.</returns>
    public async Task<T> GetAsync<T>(string containerId, string partitionKey, string itemId)
    {
        ThrowIfNotInitialized();
        Container container = Client.GetContainer(DatabaseId, containerId);
        ItemResponse<T> response = await container.ReadItemAsync<T>(itemId, new PartitionKey(partitionKey), cancellationToken: CancellationToken);
        return response.Resource;
    }

    /// <summary>
    /// Runs a query scoped to one partition key and returns the items from every result page.
    /// </summary>
    /// <typeparam name="T">Type each result is deserialized to.</typeparam>
    /// <param name="containerId">Id of the container within <see cref="DatabaseId"/>.</param>
    /// <param name="partitionKey">Partition key value the query is restricted to.</param>
    /// <param name="query">Query text; may reference the names in <paramref name="parameters"/>.</param>
    /// <param name="parameters">Parameter names and values to bind, or <c>null</c> for none.</param>
    /// <returns>All items read from the iterator, in page order.</returns>
    public async Task<IEnumerable<T>> GetAsync<T>(string containerId, string partitionKey, string query, Dictionary<string, object> parameters)
    {
        ThrowIfNotInitialized();
        FeedIterator<T> iterator = CreateQueryIterator<T>(containerId, partitionKey, query, parameters);
        var items = new List<T>();
        while (iterator.HasMoreResults)
        {
            FeedResponse<T> page = await iterator.ReadNextAsync(CancellationToken);
            items.AddRange(page.Resource);
        }
        return items;
    }

    /// <summary>
    /// Fetches the items in one partition whose <c>id</c> is in <paramref name="itemIds"/>.
    /// </summary>
    /// <typeparam name="T">Type each result is deserialized to.</typeparam>
    /// <param name="containerId">Id of the container within <see cref="DatabaseId"/>.</param>
    /// <param name="partitionKey">Partition key value the query is restricted to.</param>
    /// <param name="itemIds">Ids to look up. An empty sequence yields an empty result without querying.</param>
    /// <returns>The items matched by the <c>in</c> query.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="itemIds"/> is <c>null</c>.</exception>
    public async Task<IEnumerable<T>> GetAsync<T>(string containerId, string partitionKey, IEnumerable<string> itemIds)
    {
        ThrowIfNotInitialized();
        if (itemIds == null)
        {
            throw new ArgumentNullException(nameof(itemIds));
        }

        // Bind every id as a query parameter instead of splicing it into the SQL text, so an
        // id containing a quote character can neither break nor alter the query.
        var parameters = new Dictionary<string, object>();
        var parameterNames = new List<string>();
        foreach (string id in itemIds)
        {
            string parameterName = "@id" + parameterNames.Count;
            parameterNames.Add(parameterName);
            parameters.Add(parameterName, id);
        }

        // "in ()" is not a valid query, and no id can match anyway.
        if (parameterNames.Count == 0)
        {
            return new List<T>();
        }

        string query = $"select * from c where c.id in ({string.Join(",", parameterNames)})";
        return await GetAsync<T>(containerId, partitionKey, query, parameters);
    }

    /// <summary>
    /// Runs a query scoped to one partition key and returns the single value in its first result page.
    /// </summary>
    /// <typeparam name="T">Type the value is deserialized to.</typeparam>
    /// <param name="containerId">Id of the container within <see cref="DatabaseId"/>.</param>
    /// <param name="partitionKey">Partition key value the query is restricted to.</param>
    /// <param name="query">Query text; may reference the names in <paramref name="parameters"/>.</param>
    /// <param name="parameters">Parameter names and values to bind, or <c>null</c> for none.</param>
    /// <returns>The only element of the first page.</returns>
    /// <exception cref="InvalidOperationException">The first page does not contain exactly one element.</exception>
    public async Task<T> GetScalarAsync<T>(string containerId, string partitionKey, string query, Dictionary<string, object> parameters)
    {
        ThrowIfNotInitialized();
        FeedIterator<T> iterator = CreateQueryIterator<T>(containerId, partitionKey, query, parameters);
        // Only the first page is read; the token is passed so this call is cancellable like the others.
        FeedResponse<T> page = await iterator.ReadNextAsync(CancellationToken);
        return page.Resource.Single();
    }

    /// <summary>
    /// Creates a new item in the given container and partition.
    /// </summary>
    /// <typeparam name="T">Type of the item to store.</typeparam>
    /// <param name="containerId">Id of the container within <see cref="DatabaseId"/>.</param>
    /// <param name="partitionKey">Partition key value for the item.</param>
    /// <param name="item">The item to create.</param>
    public async Task CreateAsync<T>(string containerId, string partitionKey, T item)
    {
        ThrowIfNotInitialized();
        Container container = Client.GetContainer(DatabaseId, containerId);
        await container.CreateItemAsync(item, partitionKey: new PartitionKey(partitionKey), cancellationToken: CancellationToken);
    }

    /// <summary>
    /// Upserts an item in the given container and partition.
    /// </summary>
    /// <typeparam name="T">Type of the item to store.</typeparam>
    /// <param name="containerId">Id of the container within <see cref="DatabaseId"/>.</param>
    /// <param name="partitionKey">Partition key value for the item.</param>
    /// <param name="item">The item to upsert.</param>
    public async Task UpsertAsync<T>(string containerId, string partitionKey, T item)
    {
        ThrowIfNotInitialized();
        Container container = Client.GetContainer(DatabaseId, containerId);
        await container.UpsertItemAsync(item, partitionKey: new PartitionKey(partitionKey), cancellationToken: CancellationToken);
    }

    /// <summary>
    /// Builds a <see cref="QueryDefinition"/> from query text and optional named parameters.
    /// </summary>
    private static QueryDefinition BuildQueryDefinition(string query, Dictionary<string, object> parameters)
    {
        QueryDefinition definition = new QueryDefinition(query);
        if (parameters != null)
        {
            foreach (KeyValuePair<string, object> parameter in parameters)
            {
                definition = definition.WithParameter(parameter.Key, parameter.Value);
            }
        }
        return definition;
    }

    /// <summary>
    /// Creates a query iterator over a container, restricted to a single partition key value.
    /// </summary>
    private FeedIterator<T> CreateQueryIterator<T>(string containerId, string partitionKey, string query, Dictionary<string, object> parameters)
    {
        Container container = Client.GetContainer(DatabaseId, containerId);
        return container.GetItemQueryIterator<T>(
            BuildQueryDefinition(query, parameters),
            requestOptions: new QueryRequestOptions()
            {
                PartitionKey = new PartitionKey(partitionKey)
            });
    }
    #endregion
}
}