/
BasicRedisClientManager.cs
182 lines (147 loc) · 5.64 KB
/
BasicRedisClientManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
//
// https://github.com/ServiceStack/ServiceStack.Redis
// ServiceStack.Redis: ECMA CLI Binding to the Redis key-value storage system
//
// Authors:
// Demis Bellot (demis.bellot@gmail.com)
//
// Copyright 2013 ServiceStack, Inc. All Rights Reserved.
//
// Licensed under the same terms of ServiceStack.
//
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using ServiceStack.Logging;
using ServiceStack.Text;
namespace ServiceStack.Redis;
/// <summary>
/// A non-pooling redis clients manager: every call to <see cref="GetClient"/> or
/// <see cref="GetReadOnlyClient"/> asks the <see cref="RedisResolver"/> for a client and then
/// applies this manager's settings to it.
/// Allows the configuration of different ReadWrite and ReadOnly hosts.
/// </summary>
/// <remarks>
/// The round-robin host cursors and the client id counter are advanced with
/// <see cref="Interlocked"/> operations so concurrent callers never lose an increment.
/// </remarks>
public partial class BasicRedisClientManager
    : IRedisClientsManager, IRedisFailover, IHasRedisResolver, IHasStats
{
    public static ILog Log = LogManager.GetLogger(typeof(BasicRedisClientManager));

    /// <summary>When set, copied to <c>ConnectTimeout</c> of every client handed out.</summary>
    public int? ConnectTimeout { get; set; }

    /// <summary>When set, copied to <c>SendTimeout</c> of every client handed out.</summary>
    public int? SocketSendTimeout { get; set; }

    /// <summary>When set, copied to <c>ReceiveTimeout</c> of every client handed out.</summary>
    public int? SocketReceiveTimeout { get; set; }

    /// <summary>When set, copied to <c>IdleTimeOutSecs</c> of every client handed out.</summary>
    public int? IdleTimeOutSecs { get; set; }

    /// <summary>
    /// Gets or sets object key prefix. When non-null it is copied to every client handed out.
    /// </summary>
    public string NamespacePrefix { get; set; }

    // Cursors handed to the resolver so it can rotate over the configured hosts.
    // They are shared by all callers, so they are only mutated through Interlocked.
    private int readWriteHostsIndex;
    private int readOnlyHostsIndex;

    /// <summary>Source of the sequential <c>Id</c> assigned to each client handed out.</summary>
    protected int RedisClientCounter = 0;

    /// <summary>
    /// Optional client factory.
    /// NOTE(review): not referenced in this part of the class; presumably consumed by the
    /// resolver or another partial part - verify.
    /// </summary>
    public Func<RedisEndpoint, RedisClient> ClientFactory { get; set; }

    /// <summary>
    /// The database passed to the constructor. When non-null, every client handed out is
    /// switched to it if the client is currently on a different database.
    /// </summary>
    public long? Db { get; private set; }

    /// <summary>Assigned to <c>ConnectionFilter</c> of every client handed out (may be null).</summary>
    public Action<IRedisNativeClient> ConnectionFilter { get; set; }

    /// <summary>
    /// Failover callbacks; initialised to an empty list by the constructor.
    /// NOTE(review): nothing in this part of the class invokes these callbacks - confirm
    /// whether <see cref="FailoverTo(IEnumerable{string}, IEnumerable{string})"/> is expected to.
    /// </summary>
    public List<Action<IRedisClientsManager>> OnFailover { get; private set; }

    /// <summary>The resolver that creates master/replica clients for a requested index.</summary>
    public IRedisResolver RedisResolver { get; set; }

    /// <summary>Creates a manager that uses <c>RedisConfig.DefaultHost</c> for reads and writes.</summary>
    public BasicRedisClientManager() : this(RedisConfig.DefaultHost) { }

    /// <summary>Creates a manager that uses the same hosts for reads and writes.</summary>
    /// <param name="readWriteHosts">Hosts in the format host[:port].</param>
    public BasicRedisClientManager(params string[] readWriteHosts)
        : this(readWriteHosts, readWriteHosts) { }

    /// <summary>
    /// Creates a manager that uses the same hosts for reads and writes and an initial database.
    /// </summary>
    /// <param name="initialDb">Database every client handed out is switched to.</param>
    /// <param name="readWriteHosts">Hosts in the format host[:port].</param>
    public BasicRedisClientManager(int initialDb, params string[] readWriteHosts)
        : this(readWriteHosts, readWriteHosts, initialDb) { }

    /// <summary>
    /// Hosts can be an IP Address or Hostname in the format: host[:port]
    /// e.g. 127.0.0.1:6379
    /// default is: localhost:6379
    /// </summary>
    /// <param name="readWriteHosts">The write hosts.</param>
    /// <param name="readOnlyHosts">The read hosts.</param>
    /// <param name="initialDb">Optional database every client handed out is switched to.</param>
    public BasicRedisClientManager(
        IEnumerable<string> readWriteHosts,
        IEnumerable<string> readOnlyHosts,
        long? initialDb = null)
        : this(readWriteHosts.ToRedisEndPoints(), readOnlyHosts.ToRedisEndPoints(), initialDb) { }

    /// <summary>
    /// Creates a manager over already-parsed endpoints: stores <paramref name="initialDb"/>,
    /// builds a <see cref="RedisResolver"/> over the hosts, creates the empty
    /// <see cref="OnFailover"/> list and finally calls <see cref="OnStart"/>.
    /// </summary>
    /// <param name="readWriteHosts">The write endpoints.</param>
    /// <param name="readOnlyHosts">The read endpoints.</param>
    /// <param name="initialDb">Optional database every client handed out is switched to.</param>
    public BasicRedisClientManager(
        IEnumerable<RedisEndpoint> readWriteHosts,
        IEnumerable<RedisEndpoint> readOnlyHosts,
        long? initialDb = null)
    {
        this.Db = initialDb;

        RedisResolver = new RedisResolver(readWriteHosts, readOnlyHosts);
        this.OnFailover = new List<Action<IRedisClientsManager>>();

        JsConfig.InitStatics();

        // Virtual call from the constructor: overrides run before the derived constructor body.
        this.OnStart();
    }

    /// <summary>Startup hook invoked by the constructor; the default just calls <see cref="Start"/>.</summary>
    protected virtual void OnStart()
    {
        this.Start();
    }

    /// <summary>
    /// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
    /// </summary>
    /// <returns>A client created by the resolver and configured by this manager.</returns>
    public IRedisClient GetClient() => GetClientImpl();

    // Advances the read/write cursor and returns a configured master client.
    private RedisClient GetClientImpl()
    {
        // Interlocked.Increment returns the incremented value; subtracting one yields the same
        // pre-increment index the former `readWriteHostsIndex++` passed on, but atomically.
        // `unchecked` keeps the wrap-around behaviour regardless of project overflow settings.
        var desiredIndex = unchecked(Interlocked.Increment(ref readWriteHostsIndex) - 1);
        return InitNewClient(RedisResolver.CreateMasterClient(desiredIndex));
    }

    /// <summary>
    /// Returns a ReadOnly client using the hosts defined in ReadOnlyHosts.
    /// </summary>
    /// <returns>A client created by the resolver and configured by this manager.</returns>
    public virtual IRedisClient GetReadOnlyClient() => GetReadOnlyClientImpl();

    // Advances the read-only cursor and returns a configured replica client.
    private RedisClient GetReadOnlyClientImpl()
    {
        // Same atomic post-increment emulation as in GetClientImpl.
        var desiredIndex = unchecked(Interlocked.Increment(ref readOnlyHostsIndex) - 1);
        return InitNewClient(RedisResolver.CreateSlaveClient(desiredIndex));
    }

    // Stamps a client with a unique id and copies this manager's optional settings onto it.
    // Settings left null on the manager keep whatever value the client already has.
    private RedisClient InitNewClient(RedisClient client)
    {
        client.Id = Interlocked.Increment(ref RedisClientCounter);
        client.ConnectionFilter = ConnectionFilter;

        if (this.ConnectTimeout != null)
        {
            client.ConnectTimeout = this.ConnectTimeout.Value;
        }

        if (this.SocketSendTimeout.HasValue)
        {
            client.SendTimeout = this.SocketSendTimeout.Value;
        }

        if (this.SocketReceiveTimeout.HasValue)
        {
            client.ReceiveTimeout = this.SocketReceiveTimeout.Value;
        }

        if (this.IdleTimeOutSecs.HasValue)
        {
            client.IdleTimeOutSecs = this.IdleTimeOutSecs.Value;
        }

        if (this.NamespacePrefix != null)
        {
            client.NamespacePrefix = NamespacePrefix;
        }

        if (Db != null && client.Db != Db) //Reset database to default if changed
        {
            client.ChangeDb(Db.Value);
        }

        return client;
    }

    /// <summary>
    /// Stores every entry of <paramref name="values"/> by calling <c>Set</c> once per key/value
    /// pair (<c>Set</c> is declared outside this part of the class).
    /// </summary>
    /// <param name="values">The key/value pairs to store.</param>
    public void SetAll<T>(IDictionary<string, T> values)
    {
        foreach (var entry in values)
        {
            Set(entry.Key, entry.Value);
        }
    }

    /// <summary>Resets both round-robin host cursors to zero.</summary>
    public void Start()
    {
        // Exchange rather than a plain store, to match the Interlocked increments above.
        Interlocked.Exchange(ref readWriteHostsIndex, 0);
        Interlocked.Exchange(ref readOnlyHostsIndex, 0);
    }

    /// <summary>Fails over to <paramref name="readWriteHosts"/> for both reads and writes.</summary>
    /// <param name="readWriteHosts">The new hosts.</param>
    public void FailoverTo(params string[] readWriteHosts)
    {
        FailoverTo(readWriteHosts, readWriteHosts);
    }

    /// <summary>
    /// Replaces the resolver's master and replica hosts, bumps <c>RedisState.TotalFailovers</c>,
    /// logs the change and restarts the round-robin cursors.
    /// </summary>
    /// <param name="readWriteHosts">The new write hosts.</param>
    /// <param name="readOnlyHosts">The new read hosts.</param>
    public void FailoverTo(IEnumerable<string> readWriteHosts, IEnumerable<string> readOnlyHosts)
    {
        Interlocked.Increment(ref RedisState.TotalFailovers);

        // Materialise once: the sequences are used for both logging and the resolver reset.
        var masters = readWriteHosts.ToList();
        var replicas = readOnlyHosts.ToList();

        Log.Info($"FailoverTo: {string.Join(",", masters)} : {string.Join(",", replicas)} Total: {RedisState.TotalFailovers}");

        // Keeps the two resets together with respect to other FailoverTo calls.
        // NOTE(review): locking on `this` is kept because other partial parts, subclasses or
        // callers may synchronise on the manager instance - consider a private gate object.
        lock (this)
        {
            RedisResolver.ResetMasters(masters);
            RedisResolver.ResetSlaves(replicas);
        }

        Start();
    }

    /// <summary>Intentionally empty in this part of the class.</summary>
    public void Dispose()
    {
    }

    /// <summary>A snapshot of <c>RedisStats</c> as a dictionary.</summary>
    public Dictionary<string, long> Stats => RedisStats.ToDictionary();
}