-
Notifications
You must be signed in to change notification settings - Fork 0
/
cassandra-test-ha.py
214 lines (176 loc) · 8.1 KB
/
cassandra-test-ha.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/python
import random
import re
import sys
import time

from cassandra.cluster import Cluster
import paramiko

# Cassandra high-availability write test.
#
# To run this script, execute:
#     $ python cassandra-test-ha.py <data_amount>
# Parameter <data_amount> is the number of rows to write to Cassandra.
# A value greater than 100 also triggers the node-down job, to test HA.

#################################
# Here are your configurations. #
#################################
CLUSTER_ADDRESSES = ["192.168.122.122",
                     "192.168.122.123",
                     "192.168.122.124",
                     # "192.168.122.125",
                     "192.168.122.126"]
# Seed nodes get lower priority when main() picks a node to stop.
SEED_NODES = ["192.168.122.122", "192.168.122.123"]
KEYSPACE = "bigboy"
TABLE = KEYSPACE + ".info"
# Replicas per row; main() stops Cassandra on at most REPLICATION_FACTOR - 1 nodes.
REPLICATION_FACTOR = 3
# DDL executed statement by statement (main() splits it on ';').
# It is built from KEYSPACE / TABLE / REPLICATION_FACTOR so the keyspace's
# replication factor cannot drift from the value the node-down logic uses.
SCHEMA_SCRIPT = ("drop table if exists %(table)s;"
                 "drop schema if exists %(keyspace)s;"
                 "create schema %(keyspace)s"
                 " with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %(rf)d };"
                 "use %(keyspace)s;"
                 "create table %(table)s("
                 " boy_id text,"
                 " time_stamp timestamp,"
                 " primary key (boy_id, time_stamp)"
                 ");") % {"table": TABLE, "keyspace": KEYSPACE, "rf": REPLICATION_FACTOR}
# Extra pause in seconds after each inserted row (0 = no extra pause).
INSERT_INTERVAL = 0
try:
    INSERT_AMOUNT = int(sys.argv[1])
except (IndexError, ValueError):
    # Fail with a usage message instead of a bare traceback.
    sys.exit("Usage: python %s <data_amount>  (<data_amount> must be an integer)"
             % (sys.argv[0] if sys.argv else "cassandra-test-ha.py"))
#################################
#################################
def get_ring_distribute(host, username="root", password="password"):
    """Return the token ranges of KEYSPACE as reported by ``host``.

    Runs ``nodetool describering <KEYSPACE>`` on ``host`` over SSH. Each range
    in its output looks like::

        TokenRange(start_token:5940577727476728623, end_token:5943584674441571405,
        endpoints:[192.168.122.111], rpc_endpoints:[192.168.122.111],
        endpoint_details:[EndpointDetails(host:192.168.122.111, ...)])

    and is converted to a ``(start_token, end_token, endpoints)`` tuple, e.g.::

        [(-78030903380813158, -74533601447532126, "192.168.122.113"),
         (4218276711791353687, 4219841154467858666, "192.168.122.111"),
         ...]

    ``endpoints`` is the raw text between the brackets, so with several
    replicas it is a comma-separated string such as
    "192.168.122.111, 192.168.122.112".

    Returns:
        A list (not a one-shot iterator), so callers can scan it repeatedly.

    Raises:
        RuntimeError: if no token range could be parsed from the command's
            standard output (for instance when nodetool printed nothing).
        Any exception raised by paramiko if the SSH connection or command fails.
    """
    # One match per TokenRange; requiring the "end_token:..., " prefix keeps
    # "rpc_endpoints:[" from being mistaken for "endpoints:[".
    token_range = re.compile(
        r"start_token:(-?\d+), end_token:(-?\d+), endpoints:\[([^\]]*)\]")
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(host, username=username, password=password)
        _, stdout, _ = ssh.exec_command("nodetool describering %s" % KEYSPACE)
        ring = "".join(stdout.readlines())
    finally:
        # Release the connection even when connect/exec fails.
        ssh.close()
    ring_distribute = [(int(start), int(end), endpoints)
                       for start, end, endpoints in token_range.findall(ring)]
    if not ring_distribute:
        raise RuntimeError("No token ranges for keyspace %s in nodetool output from %s"
                           % (KEYSPACE, host))
    return ring_distribute
def start_cassandra_service(host, username="root", password="password"):
    """Run ``service cassandra start`` on ``host`` over SSH.

    ``exec_command`` returns as soon as the command is launched, so this waits
    for the command to exit before closing the connection; closing right away
    could tear down the session while the start command is still running.

    Returns:
        True if the remote command exited with status 0, False otherwise.
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(host, username=username, password=password)
        _, stdout, _ = ssh.exec_command("service cassandra start")
        # Drain output first so a full channel window cannot block the exit status.
        stdout.read()
        return stdout.channel.recv_exit_status() == 0
    finally:
        ssh.close()
def stop_cassandra_service(host, username="root", password="password", settle_seconds=10):
    """Stop Cassandra on ``host`` over SSH and report whether it is now down.

    Runs ``service cassandra stop``, waits for that command to exit, pauses
    ``settle_seconds`` more, then runs ``service cassandra status``.

    Args:
        host: address to SSH into.
        username, password: SSH credentials.
        settle_seconds: extra delay after the stop command exits, before
            checking status (defaults to the original fixed 10 seconds).

    Returns:
        True if the status output contains "Active: inactive" (the systemd
        status line format — presumably the target hosts run systemd),
        False otherwise.
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(host, username=username, password=password)
        _, stdout, _ = ssh.exec_command("service cassandra stop")
        # exec_command returns immediately; block until the stop command exits.
        stdout.read()
        stdout.channel.recv_exit_status()
        time.sleep(settle_seconds)
        _, stdout, _ = ssh.exec_command("service cassandra status")
        return 'Active: inactive' in ''.join(stdout.readlines())
    finally:
        ssh.close()
def _create_schema(session):
    """Execute each ';'-separated statement of SCHEMA_SCRIPT; exit(1) on failure."""
    try:
        for script in SCHEMA_SCRIPT.split(';'):
            if script.strip():
                session.execute(script)
                # Pause between DDL statements, presumably so each schema
                # change settles across the cluster before the next one.
                time.sleep(2)
    except Exception as e:
        print(e)
        print("Error occurs during creating schema for test.")
        sys.exit(1)


def _fetch_ring_distribute():
    """Return the token ring from the first node that answers; exit(1) if none does."""
    for host in CLUSTER_ADDRESSES:
        try:
            # list() so the ring can be rescanned for every inserted row even
            # if get_ring_distribute returns a one-shot iterator.
            return list(get_ring_distribute(host))
        except Exception as e:
            print(e)
            print("Error connecting to %s to get ring info." % host)
    print("Could not get ring info from any of %s." % CLUSTER_ADDRESSES)
    sys.exit(1)


def _owning_endpoints(token, ring_distribute):
    """Return the endpoints string of the range containing ``token``, or None.

    Cassandra token ranges are ``(start_token, end_token]``: start exclusive,
    end inclusive. A range with ``start_token >= end_token`` wraps around the
    ring and covers tokens above ``start_token`` or at most ``end_token``.
    """
    for start, end, endpoints in ring_distribute:
        if start < end:
            if start < token <= end:
                return endpoints
        elif token > start or token <= end:
            return endpoints
    return None


def _write_rows(session, ring_distribute):
    """Insert INSERT_AMOUNT rows, stopping nodes part-way; return rows per endpoint.

    Every node stopped here is restarted before returning, even if the write
    loop raises.
    """
    ring_collect = {}
    down_nodes_number = REPLICATION_FACTOR - 1
    # Copy: nodes are removed from these lists as they go down, and aliasing
    # SEED_NODES directly would mutate the module-level constant.
    seed_node_list = list(SEED_NODES)
    stor_node_list = [node for node in CLUSTER_ADDRESSES if node not in SEED_NODES]
    down_nodes = []
    # Values are bound by the driver (%s placeholders), not pasted into the CQL.
    insert_cql = "INSERT INTO %s (boy_id, time_stamp) VALUES (%%s, %%s)" % TABLE
    token_cql = "SELECT token(boy_id) FROM %s WHERE boy_id=%%s" % TABLE
    try:
        # Step4: Start to insert data.
        # TODO: Enhance to asynchronous style such as using a thread pool.
        for idx in range(INSERT_AMOUNT):
            boy_id = str(idx)
            # 4-1: Write data; CQL integer timestamps are milliseconds since the epoch.
            session.execute(insert_cql, (boy_id, int(time.time() * 1000)))
            time.sleep(0.05)
            rows = list(session.execute(token_cql, (boy_id,)))
            token = rows[0].system_token_boy_id
            # 4-2: Find which nodes the row was written to. endpoints is
            # "123.123.123.123" or "123.123.123.123, 123.123.123.124, ...".
            endpoints = _owning_endpoints(token, ring_distribute)
            if endpoints is not None:
                print(" Write data %s to %s ..." % (idx, endpoints))
                for ep in endpoints.split(','):
                    ep = ep.strip()
                    ring_collect[ep] = ring_collect.get(ep, 0) + 1
            # 4-3: Stop Cassandra on some nodes, touching seed nodes only once
            # no non-seed node is left. With k nodes still to stop, the next
            # one goes down at row INSERT_AMOUNT // (k + 1) (integer division,
            # so the comparison with idx can actually be true on Python 3).
            if (down_nodes_number > 0 and INSERT_AMOUNT > 100
                    and idx == INSERT_AMOUNT // (down_nodes_number + 1)):
                up_nodes = stor_node_list or seed_node_list
                if up_nodes:
                    down_node = random.choice(up_nodes)
                    print("#####################################################################")
                    print(" Plan to stop Cassandra service on node: %s ..." % down_node)
                    if stop_cassandra_service(down_node):
                        down_nodes.append(down_node)
                        # up_nodes is stor_node_list or seed_node_list itself,
                        # so this removes the node from the list it came from.
                        up_nodes.remove(down_node)
                        down_nodes_number -= 1
                        print("#####################################################################")
                        print(" Cassandra service is stopped on node: %s" % down_node)
                        print(" Cassandra service DOWN: %s" % down_nodes)
                        print(" Cassandra service UP: %s" % (stor_node_list + seed_node_list))
                        print("#####################################################################\n")
                        time.sleep(2)
            time.sleep(INSERT_INTERVAL)
    finally:
        # Step5: Restart Cassandra service, even if the write loop failed.
        for node in down_nodes:
            start_cassandra_service(node)
    return ring_collect


def main():
    """Write INSERT_AMOUNT rows while stopping Cassandra on some nodes.

    Steps:
      1. Connect to the cluster.
      2. (Re)create the test keyspace/table from SCHEMA_SCRIPT.
      3. Fetch the token ring from the first node that answers.
      4. Insert rows one by one, counting the replica endpoints of each row
         and, when INSERT_AMOUNT > 100, stopping Cassandra on up to
         REPLICATION_FACTOR - 1 nodes (non-seed nodes first) part-way through.
      5. Restart every stopped node, then print the per-node row counts.
    Exits with status 1 if the cluster, the schema or the ring is unavailable.
    """
    # Step1: Connect to Cassandra server.
    try:
        cluster = Cluster(CLUSTER_ADDRESSES)
        session = cluster.connect()
    except Exception as e:
        print(e)
        print("Error to connect to Cassandra server.")
        sys.exit(1)
    try:
        # Step2: Create schema for test.
        _create_schema(session)
        # Step3: Get ring partition details for the schema above.
        ring_distribute = _fetch_ring_distribute()
        # Step4 + Step5: Insert data while taking nodes down, then restart them.
        ring_collect = _write_rows(session, ring_distribute)
    finally:
        cluster.shutdown()
    # Print result.
    print("******************************************************************")
    print(" Amount of queries: %s. Count of data on each node: \n" % INSERT_AMOUNT)
    for endpoint in sorted(ring_collect):
        print(" %s: %s \n" % (endpoint, ring_collect[endpoint]))


if __name__ == "__main__":
    main()