// index_replication_thread.hpp
#pragma once
#ifndef __FL_NOMOS_INDEX_REPLICATION_THREAD_HPP
#define __FL_NOMOS_INDEX_REPLICATION_THREAD_HPP
///////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2014 Final Level
// Author: Denys Misko <gdraal@gmail.com>
// Distributed under BSD (3-Clause) License (See
// accompanying file LICENSE)
//
// Description: Index's replication threads implementation
///////////////////////////////////////////////////////////////////////////////
#include "thread.hpp"
#include "socket.hpp"
#include "mutex.hpp"
#include "types.hpp"
#include "buffer.hpp"
#include "bstring.hpp"
#include "file.hpp"
#include <vector>
namespace fl {
namespace nomos {
using fl::network::Socket;
using fl::network::TDescriptor;
using fl::network::TIPv4;
using fl::threads::Mutex;
using fl::utils::Buffer;
using fl::strings::BString;
// A (log number, seek) pair. Both replication thread classes declared in this
// header keep one instance of it in a member named _readBinLogPacket.
// NOTE(review): the name suggests this is the request asking a peer to read
// its binary log from a given position - confirm against the implementation.
// NOTE(review): if this struct is sent over the socket as raw bytes, the field
// order and widths are part of the protocol; verify before changing them.
struct ReadBinLogRequest
{
// Replication log number (presumably identifies which log to read - TODO confirm).
TReplicationLogNumber number;
// 32-bit seek value (presumably an offset inside that log - TODO confirm).
uint32_t seek;
};
// Carries the same two fields as ReadBinLogRequest plus a 32-bit size.
// NOTE(review): presumably the reply header to a ReadBinLogRequest - confirm
// against the implementation.
// NOTE(review): if this struct is sent over the socket as raw bytes, the field
// order and widths are part of the protocol; verify before changing them.
struct ReadBinLogAnswer
{
// Replication log number.
TReplicationLogNumber number;
// 32-bit seek value (presumably an offset inside that log - TODO confirm).
uint32_t seek;
// 32-bit size (presumably the byte count of the data that follows - TODO confirm).
uint32_t size;
};
// Thread object constructed from a remote host address (IPv4 + port) and the
// Index it operates on.
// NOTE(review): judging by the member names, this is the side that connects to
// a replication source and pulls bin-log packets from it - confirm against the
// implementation file.
class ReplicationActiveThread : public fl::threads::Thread
{
public:
    // hostIp / hostPort: address of the remote host; index: the Index instance
    // this thread is attached to (stored as a raw pointer).
    ReplicationActiveThread(TIPv4 hostIp, const uint32_t hostPort, class Index *index);
    virtual ~ReplicationActiveThread() {}

private:
    // Internal steps; each reports its outcome as a bool.
    bool _handshake();
    bool _openReplicationInfo();
    bool _saveReplicationInfo();
    bool _getReplicationPacket();

    // Thread body (presumably the fl::threads::Thread entry point - TODO confirm).
    virtual void run();

    // Remote host address as passed to the constructor.
    TIPv4 _hostIp;
    uint32_t _hostPort;

    // Server id and log position bookkeeping.
    TServerID _fromServer;
    TReplicationLogNumber _startNumber;
    uint32_t _startSeek;
    ReadBinLogRequest _readBinLogPacket;

    Socket _socket;
    class Index *_index;

    // Working buffers and the file handle.
    // NOTE(review): _infoData / _fd look like they back the
    // _openReplicationInfo / _saveReplicationInfo pair - confirm.
    Buffer _buffer;
    Buffer _data;
    BString _infoData;
    fl::fs::File _fd;
};
// Thread object bound to an already-open descriptor, the peer's IPv4 address,
// the Index, and the ReplicationAcceptThread it belongs to.
// NOTE(review): judging by the names, this serves one inbound replication
// connection - confirm against the implementation file.
class ReplicationReceiverThread : public fl::threads::Thread
{
public:
    // descr / ip: connection descriptor and peer address;
    // index: the Index instance (raw pointer);
    // acceptThread: the accept thread associated with this receiver (raw pointer).
    ReplicationReceiverThread(
        const TDescriptor descr,
        const TIPv4 ip,
        class Index *index,
        class ReplicationAcceptThread *acceptThread);
    virtual ~ReplicationReceiverThread();

    // Takes a descriptor / address pair, like the first two constructor arguments.
    // NOTE(review): presumably lets an existing receiver be pointed at a new
    // connection (see ReplicationAcceptThread::addFreeReceiver) - confirm.
    void setSocket(const TDescriptor descr, const TIPv4 ip);

private:
    // Internal steps; each reports its outcome as a bool.
    bool _doHandShake();
    bool _recvPacket();
    bool _getReplicationPacket();

    // Thread body (presumably the fl::threads::Thread entry point - TODO confirm).
    virtual void run();

    Socket _socket;
    TIPv4 _ip;
    class Index *_index;
    class ReplicationAcceptThread *_acceptThread;
    TServerID _toServer;
    ReadBinLogRequest _readBinLogPacket;

    // Working buffers.
    Buffer _buffer;
    Buffer _data;
};
class ReplicationAcceptThread : public fl::threads::Thread
{
public:
ReplicationAcceptThread(Socket *listenTo, class Index *index);
virtual ~ReplicationAcceptThread();
void stop();
void addFreeReceiver(ReplicationReceiverThread *thread);
private:
virtual void run();
Socket *_listenTo;
class Index *_index;
typedef std::vector<ReplicationReceiverThread*> TReceiverThreadVector;
Mutex _sync;
TReceiverThreadVector _freeReceiver;
TReceiverThreadVector _receivers;
};
};
};
#endif // __FL_NOMOS_INDEX_REPLICATION_THREAD_HPP