-
Notifications
You must be signed in to change notification settings - Fork 0
/
Replication.hpp
265 lines (160 loc) · 8.68 KB
/
Replication.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/*++
Program name:
Апостол CRM
Module Name:
Replication.hpp
Notices:
Replication process
Author:
Copyright (c) Prepodobny Alen
mailto: alienufo@inbox.ru
mailto: ufocomp@gmail.com
--*/
#ifndef APOSTOL_REPLICATION_PROCESS_HPP
#define APOSTOL_REPLICATION_PROCESS_HPP
//----------------------------------------------------------------------------------------------------------------------
#include "Replication/ReplicationClient.hpp"
//----------------------------------------------------------------------------------------------------------------------
extern "C++" {
namespace Apostol {
namespace Replication {
//--------------------------------------------------------------------------------------------------------------
//-- CReplicationActionHandler ---------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------------------
typedef std::function<void (CCustomWebSocketClient *Sender, const CWSMessage &Request, CWSMessage &Response)> COnReplicationActionHandlerEvent;
//--------------------------------------------------------------------------------------------------------------
class CReplicationActionHandler: CObject {
private:
bool m_Allow;
COnReplicationActionHandlerEvent m_Handler;
public:
CReplicationActionHandler(bool Allow, COnReplicationActionHandlerEvent && Handler): CObject(), m_Allow(Allow), m_Handler(Handler) {
};
bool Allow() const { return m_Allow; };
void Handler(CCustomWebSocketClient *Sender, const CWSMessage &Request, CWSMessage &Response) {
if (m_Allow && m_Handler)
m_Handler(Sender, Request, Response);
}
};
//--------------------------------------------------------------------------------------------------------------
//-- CReplicationHandler ---------------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------------------
class CReplicationProcess;
class CReplicationHandler;
//--------------------------------------------------------------------------------------------------------------
typedef std::function<void (CReplicationHandler *Handler)> COnReplicationHandlerEvent;
//--------------------------------------------------------------------------------------------------------------
class CReplicationHandler: public CPollConnection {
private:
CReplicationProcess *m_pModule;
unsigned long m_ReplicationId;
bool m_Allow;
COnReplicationHandlerEvent m_Handler;
int AddToQueue();
void RemoveFromQueue();
protected:
void SetAllow(bool Value) { m_Allow = Value; }
public:
CReplicationHandler(CReplicationProcess *AModule, unsigned long ReplicationId, COnReplicationHandlerEvent && Handler);
~CReplicationHandler() override;
unsigned long ReplicationId() const { return m_ReplicationId; }
bool Allow() const { return m_Allow; };
void Allow(bool Value) { SetAllow(Value); };
bool Handler();
void Close() override;
};
//--------------------------------------------------------------------------------------------------------------
//-- CReplicationProcess ---------------------------------------------------------------------------------------
//--------------------------------------------------------------------------------------------------------------
typedef CPollManager CQueueManager;
//--------------------------------------------------------------------------------------------------------------
enum CReplicationMode { rmSlave = 0, rmProxy, rmMaster };
//--------------------------------------------------------------------------------------------------------------
class CReplicationProcess: public CApplicationProcess, public CModuleProcess {
typedef CApplicationProcess inherited;
private:
CLocation m_Origin;
CProcessStatus m_Status;
CReplicationMode m_Mode;
size_t m_RelayId;
int m_ApplyCount;
bool m_NeedCheckReplicationLog;
CString m_Session;
CString m_Secret;
CString m_Source;
CString m_Server;
uint32_t m_ErrorCount;
CStringList m_Config;
CDateTime m_CheckDate;
CDateTime m_FixedDate;
CDateTime m_ApplyDate;
CProviders m_Providers;
CStringListPairs m_Tokens;
CReplicationClientManager m_ClientManager;
CQueue m_Queue;
CQueueManager m_QueueManager;
size_t m_Progress;
size_t m_MaxQueue;
void BeforeRun() override;
void AfterRun() override;
void CheckListen();
void InitListen();
void Apply();
void ApplyRelay(CWebSocketClientConnection *AConnection, size_t RelayId);
void CheckRelayLog(CWebSocketClientConnection *AConnection);
void Replication(CWebSocketClientConnection *AConnection) const;
void InitActions(CReplicationClient *AClient);
void InitServer();
CReplicationClient *GetReplicationClient();
void CreateReplicationClient();
void UnloadQueue();
void DeleteHandler(CReplicationHandler *AHandler);
void CheckProviders();
void Heartbeat(CDateTime Now);
void CreateAccessToken(CProvider &Provider, const CString &Application, CStringList &Tokens);
void OnReplication(CObject *Sender, const CWSMessage &Request, CWSMessage &Response);
protected:
void DoTimer(CPollEventHandler *AHandler) override;
void DoReplication(CReplicationHandler *AHandler);
void DoError(const Delphi::Exception::Exception &E);
void DoDataBaseError(const Delphi::Exception::Exception &E);
void DoWebSocketError(CTCPConnection *AConnection);
void DoClientConnected(CObject *Sender) override;
void DoClientDisconnected(CObject *Sender) override;
void DoClientHeartbeat(CObject *Sender);
void DoClientTimeOut(CObject *Sender);
void DoClientMessage(CObject *Sender, const CWSMessage &Message);
void DoClientError(CObject *Sender, int Code, const CString &Message);
void DoClientReplicationLog(CObject *Sender, const CJSON &Payload);
void DoClientReplicationCheckLog(CObject *Sender, unsigned long Id);
void DoClientReplicationCheckRelay(CObject *Sender, unsigned long RelayId);
void DoException(CTCPConnection *AConnection, const Delphi::Exception::Exception &E);
bool DoExecute(CTCPConnection *AConnection) override;
void DoPostgresNotify(CPQConnection *AConnection, PGnotify *ANotify);
void DoPostgresQueryExecuted(CPQPollQuery *APollQuery);
void DoPostgresQueryException(CPQPollQuery *APollQuery, const Delphi::Exception::Exception &E);
public:
explicit CReplicationProcess(CCustomProcess* AParent, CApplication *AApplication);
~CReplicationProcess() override = default;
static class CReplicationProcess *CreateProcess(CCustomProcess *AParent, CApplication *AApplication) {
return new CReplicationProcess(AParent, AApplication);
}
void Run() override;
void Reload() override;
void IncProgress() { m_Progress++; }
void DecProgress() { m_Progress--; }
int AddToQueue(CReplicationHandler *AHandler);
void InsertToQueue(int Index, CReplicationHandler *AHandler);
void RemoveFromQueue(CReplicationHandler *AHandler);
CQueue &Queue() { return m_Queue; }
const CQueue &Queue() const { return m_Queue; }
CPollManager *ptrQueueManager() { return &m_QueueManager; }
CPollManager &QueueManager() { return m_QueueManager; }
const CPollManager &QueueManager() const { return m_QueueManager; }
};
//--------------------------------------------------------------------------------------------------------------
}
}
using namespace Apostol::Replication;
}
#endif //APOSTOL_REPLICATION_PROCESS_HPP