-
Notifications
You must be signed in to change notification settings - Fork 45
/
client.py
197 lines (163 loc) · 7.17 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python3
import time
import struct
from .connector import Connector
from .protocol import CanalProtocol_pb2
from .protocol import EntryProtocol_pb2
class Client(object):
    """Blocking client that speaks the Canal packet protocol over a Connector.

    Each request is serialized into a ``CanalProtocol_pb2.Packet`` and written
    with ``Connector.write_with_header``; replies are read synchronously with
    ``Connector.read_next_packet``.

    ``subscribe`` stores ``client_id`` and ``destination`` on the instance, so
    it has to be called before ``get``, ``get_without_ack``, ``ack`` or
    ``rollback``, all of which read those attributes.
    """

    def __init__(self):
        """Create the client with a fresh, not yet connected, Connector."""
        self.connector = Connector()

    # ------------------------------------------------------------------
    # Private wire helpers
    # ------------------------------------------------------------------
    def _send_packet(self, packet_type, body_message):
        """Wrap ``body_message`` in a Packet of ``packet_type`` and write it."""
        packet = CanalProtocol_pb2.Packet()
        packet.type = packet_type
        packet.body = body_message.SerializeToString()
        self.connector.write_with_header(packet.SerializeToString())

    def _read_packet(self):
        """Read the next frame from the connector and parse it as a Packet."""
        packet = CanalProtocol_pb2.Packet()
        packet.MergeFromString(self.connector.read_next_packet())
        return packet

    def _read_ack(self, wrong_type_message):
        """Read the next packet, require type ACK and return the parsed Ack.

        Raises:
            Exception: with ``wrong_type_message`` when the packet read is not
                an ACK packet.
        """
        packet = self._read_packet()
        if packet.type != CanalProtocol_pb2.PacketType.ACK:
            raise Exception(wrong_type_message)
        ack = CanalProtocol_pb2.Ack()
        ack.MergeFromString(packet.body)
        return ack

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def connect(self, host='127.0.0.1', port=11111):
        """Open the connection and consume the server's handshake packet.

        Args:
            host: Server address handed to the connector.
            port: Server port handed to the connector.

        Raises:
            Exception: if the first packet received is not a HANDSHAKE.
        """
        self.connector.connect(host, port)
        packet = self._read_packet()
        if packet.type != CanalProtocol_pb2.PacketType.HANDSHAKE:
            raise Exception('connect error')
        print('connected to %s:%s' % (host, port))

    def disconnect(self):
        """Close the underlying connector."""
        self.connector.disconnect()

    def check_valid(self, username=b'', password=b''):
        """Send a CLIENTAUTHENTICATION packet and verify the server's ACK.

        Args:
            username: Value stored in ``ClientAuth.username``.
            password: Value stored in ``ClientAuth.password``.

        Raises:
            Exception: if the reply is not an ACK or carries a positive
                ``error_code``.
        """
        client_auth = CanalProtocol_pb2.ClientAuth()
        client_auth.username = username
        client_auth.password = password
        self._send_packet(
            CanalProtocol_pb2.PacketType.CLIENTAUTHENTICATION, client_auth)

        ack = self._read_ack('Auth error')
        if ack.error_code > 0:
            raise Exception(
                'something goes wrong when doing authentication. '
                'error code:%s, error message:%s'
                % (ack.error_code, ack.error_message))
        print('Auth succed')

    def subscribe(self, client_id=b'1001', destination=b'example', filter=b'.*\\..*'):
        """Remember the subscription identity and subscribe on the server.

        A rollback for batch id 0 is sent before the SUBSCRIPTION packet.

        Args:
            client_id: Client identifier; kept on the instance for later calls.
            destination: Destination name; kept on the instance for later calls.
            filter: Value stored in ``Sub.filter``.

        Raises:
            Exception: if the reply is not an ACK or carries a positive
                ``error_code``.
        """
        # rollback() reads these attributes, so they must be set first.
        self.client_id = client_id
        self.destination = destination
        self.rollback(0)

        sub = CanalProtocol_pb2.Sub()
        sub.destination = destination
        sub.client_id = client_id
        sub.filter = filter
        self._send_packet(CanalProtocol_pb2.PacketType.SUBSCRIPTION, sub)

        ack = self._read_ack('Subscribe error.')
        if ack.error_code > 0:
            raise Exception(
                'Failed to subscribe. error code:%s, error message:%s'
                % (ack.error_code, ack.error_message))
        print('Subscribe succed')

    def unsubscribe(self):
        """Placeholder: not implemented, nothing is sent to the server."""
        pass

    def get(self, size=100):
        """Fetch up to ``size`` entries and acknowledge the batch right away.

        Returns:
            The dict produced by ``get_without_ack``.
        """
        message = self.get_without_ack(size)
        self.ack(message['id'])
        return message

    def get_without_ack(self, batch_size=10, timeout=-1, unit=-1):
        """Send a GET request and parse the reply without acknowledging it.

        Args:
            batch_size: Stored in ``Get.fetch_size``.
            timeout: Stored in ``Get.timeout``.
            unit: Stored in ``Get.unit``.

        Returns:
            ``{'id': batch_id, 'entries': [Entry, ...]}``.  ``id`` stays 0 and
            ``entries`` stays empty when the server returns no positive batch
            id or answers with an error-free ACK.

        Raises:
            Exception: if the server answers with an ACK whose ``error_code``
                is positive, or with any other unexpected packet type.
        """
        get = CanalProtocol_pb2.Get()
        get.client_id = self.client_id
        get.destination = self.destination
        get.auto_ack = False
        get.fetch_size = batch_size
        get.timeout = timeout
        get.unit = unit
        self._send_packet(CanalProtocol_pb2.PacketType.GET, get)

        packet = self._read_packet()
        message = dict(id=0, entries=[])
        if packet.type == CanalProtocol_pb2.PacketType.MESSAGES:
            messages = CanalProtocol_pb2.Messages()
            messages.MergeFromString(packet.body)
            if messages.batch_id > 0:
                message['id'] = messages.batch_id
                for item in messages.messages:
                    entry = EntryProtocol_pb2.Entry()
                    entry.MergeFromString(item)
                    message['entries'].append(entry)
        elif packet.type == CanalProtocol_pb2.PacketType.ACK:
            # Ack is a message class of the protocol module, not a member of
            # the PacketType enum.
            ack = CanalProtocol_pb2.Ack()
            ack.MergeFromString(packet.body)
            if ack.error_code > 0:
                raise Exception(
                    'get data error. error code:%s, error message:%s'
                    % (ack.error_code, ack.error_message))
        else:
            raise Exception('unexpected packet type:%s' % (packet.type))
        return message

    def ack(self, message_id):
        """Acknowledge batch ``message_id``; a falsy id is silently ignored.

        No reply is read after the CLIENTACK packet is written.
        """
        if not message_id:
            return
        clientack = CanalProtocol_pb2.ClientAck()
        clientack.destination = self.destination
        clientack.client_id = self.client_id
        clientack.batch_id = message_id
        self._send_packet(CanalProtocol_pb2.PacketType.CLIENTACK, clientack)

    def rollback(self, batch_id):
        """Send a CLIENTROLLBACK packet for ``batch_id``.

        No reply is read after the packet is written.
        """
        cb = CanalProtocol_pb2.ClientRollback()
        cb.batch_id = batch_id
        cb.client_id = self.client_id
        cb.destination = self.destination
        self._send_packet(CanalProtocol_pb2.PacketType.CLIENTROLLBACK, cb)
if __name__ == "__main__":
    # Demo: connect, authenticate, subscribe, then poll forever and print one
    # dict per changed row.
    client = Client()
    client.connect(host='172.12.0.13')

    # Entries of these types are skipped by the loop below.
    transaction_markers = (
        EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
        EntryProtocol_pb2.EntryType.TRANSACTIONEND,
    )

    try:
        client.check_valid()
        client.subscribe()

        while True:
            message = client.get(100)
            for entry in message['entries']:
                if entry.entryType in transaction_markers:
                    continue

                row_change = EntryProtocol_pb2.RowChange()
                row_change.MergeFromString(entry.storeValue)

                header = entry.header
                database = header.schemaName
                table = header.tableName
                event_type = header.eventType

                for row in row_change.rowDatas:
                    if event_type == EntryProtocol_pb2.EventType.DELETE:
                        # Collect every column of the row, not only the last.
                        format_data = {
                            column.name: column.value
                            for column in row.beforeColumns
                        }
                    elif event_type == EntryProtocol_pb2.EventType.INSERT:
                        format_data = {
                            column.name: column.value
                            for column in row.afterColumns
                        }
                    else:
                        # Two separate dicts, so the before and after images
                        # do not overwrite each other.
                        format_data = {
                            'before': {
                                column.name: column.value
                                for column in row.beforeColumns
                            },
                            'after': {
                                column.name: column.value
                                for column in row.afterColumns
                            },
                        }
                    data = dict(
                        db=database,
                        table=table,
                        event_type=event_type,
                        data=format_data,
                    )
                    print(data)
            # Pause between polls.
            time.sleep(1)
    finally:
        # The loop never ends on its own; disconnect when an exception or
        # Ctrl-C unwinds out of it.
        client.disconnect()