/
bigchain.py
401 lines (314 loc) · 14.4 KB
/
bigchain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
"""Implementation of the `bigchaindb` command,
the command-line interface (CLI) for BigchainDB Server.
"""
import os
import sys
import logging
import argparse
import copy
import json
import builtins
import logstats
from bigchaindb.common import crypto
from bigchaindb.common.exceptions import (StartupError,
DatabaseAlreadyExists,
KeypairNotFoundException)
import bigchaindb
import bigchaindb.config_utils
from bigchaindb.models import Transaction
from bigchaindb.utils import ProcessGroup
from bigchaindb import backend
from bigchaindb.backend import schema
from bigchaindb.backend.admin import (set_replicas, set_shards, add_replicas,
remove_replicas)
from bigchaindb.backend.exceptions import OperationError
from bigchaindb.commands import utils
from bigchaindb import processes
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# We need this because `input` always prints on stdout, while it should print
# to stderr. It's a very old bug, check it out here:
# - https://bugs.python.org/issue1927
def input_on_stderr(prompt=''):
print(prompt, end='', file=sys.stderr)
return builtins.input()
def run_show_config(args):
"""Show the current configuration"""
# TODO Proposal: remove the "hidden" configuration. Only show config. If
# the system needs to be configured, then display information on how to
# configure the system.
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
config = copy.deepcopy(bigchaindb.config)
del config['CONFIGURED']
private_key = config['keypair']['private']
config['keypair']['private'] = 'x' * 45 if private_key else None
print(json.dumps(config, indent=4, sort_keys=True))
def run_configure(args, skip_if_exists=False):
"""Run a script to configure the current node.
Args:
skip_if_exists (bool): skip the function if a config file already exists
"""
config_path = args.config or bigchaindb.config_utils.CONFIG_DEFAULT_PATH
config_file_exists = False
# if the config path is `-` then it's stdout
if config_path != '-':
config_file_exists = os.path.exists(config_path)
if config_file_exists and skip_if_exists:
return
if config_file_exists and not args.yes:
want = input_on_stderr('Config file `{}` exists, do you want to '
'override it? (cannot be undone) [y/N]: '.format(config_path))
if want != 'y':
return
conf = copy.deepcopy(bigchaindb.config)
# Patch the default configuration with the new values
conf = bigchaindb.config_utils.update(
conf,
bigchaindb.config_utils.env_config(bigchaindb.config))
print('Generating keypair', file=sys.stderr)
conf['keypair']['private'], conf['keypair']['public'] = \
crypto.generate_key_pair()
# select the correct config defaults based on the backend
print('Generating default configuration for backend {}'
.format(args.backend))
conf['database'] = bigchaindb._database_map[args.backend]
if not args.yes:
for key in ('bind', ):
val = conf['server'][key]
conf['server'][key] = \
input_on_stderr('API Server {}? (default `{}`): '.format(key, val)) \
or val
for key in ('host', 'port', 'name'):
val = conf['database'][key]
conf['database'][key] = \
input_on_stderr('Database {}? (default `{}`): '.format(key, val)) \
or val
val = conf['backlog_reassign_delay']
conf['backlog_reassign_delay'] = \
input_on_stderr(('Stale transaction reassignment delay (in '
'seconds)? (default `{}`): '.format(val))) \
or val
if config_path != '-':
bigchaindb.config_utils.write_config(conf, config_path)
else:
print(json.dumps(conf, indent=4, sort_keys=True))
print('Configuration written to {}'.format(config_path), file=sys.stderr)
print('Ready to go!', file=sys.stderr)
def run_export_my_pubkey(args):
"""Export this node's public key to standard output
"""
logger.debug('bigchaindb args = {}'.format(args))
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
pubkey = bigchaindb.config['keypair']['public']
if pubkey is not None:
print(pubkey)
else:
sys.exit("This node's public key wasn't set anywhere "
"so it can't be exported")
# raises SystemExit exception
# message is sent to stderr
# exits with exit code 1 (signals tha an error happened)
def _run_init():
# Try to access the keypair, throws an exception if it does not exist
b = bigchaindb.Bigchain()
schema.init_database(connection=b.connection)
logger.info('Create genesis block.')
b.create_genesis_block()
logger.info('Done, have fun!')
def run_init(args):
"""Initialize the database"""
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
# TODO Provide mechanism to:
# 1. prompt the user to inquire whether they wish to drop the db
# 2. force the init, (e.g., via -f flag)
try:
_run_init()
except DatabaseAlreadyExists:
print('The database already exists.', file=sys.stderr)
print('If you wish to re-initialize it, first drop it.', file=sys.stderr)
def run_drop(args):
"""Drop the database"""
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
dbname = bigchaindb.config['database']['name']
if not args.yes:
response = input_on_stderr('Do you want to drop `{}` database? [y/n]: '.format(dbname))
if response != 'y':
return
conn = backend.connect()
dbname = bigchaindb.config['database']['name']
schema.drop_database(conn, dbname)
def run_start(args):
"""Start the processes to run the node"""
logger.info('BigchainDB Version {}'.format(bigchaindb.__version__))
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
if args.allow_temp_keypair:
if not (bigchaindb.config['keypair']['private'] or
bigchaindb.config['keypair']['public']):
private_key, public_key = crypto.generate_key_pair()
bigchaindb.config['keypair']['private'] = private_key
bigchaindb.config['keypair']['public'] = public_key
else:
logger.warning('Keypair found, no need to create one on the fly.')
if args.start_rethinkdb:
try:
proc = utils.start_rethinkdb()
except StartupError as e:
sys.exit('Error starting RethinkDB, reason is: {}'.format(e))
logger.info('RethinkDB started with PID %s' % proc.pid)
try:
_run_init()
except DatabaseAlreadyExists:
pass
except KeypairNotFoundException:
sys.exit("Can't start BigchainDB, no keypair found. "
'Did you run `bigchaindb configure`?')
logger.info('Starting BigchainDB main process with public key %s',
bigchaindb.config['keypair']['public'])
processes.start()
def _run_load(tx_left, stats):
logstats.thread.start(stats)
b = bigchaindb.Bigchain()
while True:
tx = Transaction.create([b.me], [([b.me], 1)])
tx = tx.sign([b.me_private])
b.write_transaction(tx)
stats['transactions'] += 1
if tx_left is not None:
tx_left -= 1
if tx_left == 0:
break
def run_load(args):
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
logger.info('Starting %s processes', args.multiprocess)
stats = logstats.Logstats()
logstats.thread.start(stats)
tx_left = None
if args.count > 0:
tx_left = int(args.count / args.multiprocess)
workers = ProcessGroup(concurrency=args.multiprocess,
target=_run_load,
args=(tx_left, stats.get_child()))
workers.start()
def run_set_shards(args):
conn = backend.connect()
try:
set_shards(conn, shards=args.num_shards)
except OperationError as e:
logger.warn(e)
def run_set_replicas(args):
conn = backend.connect()
try:
set_replicas(conn, replicas=args.num_replicas)
except OperationError as e:
logger.warn(e)
def run_add_replicas(args):
# Note: This command is specific to MongoDB
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
conn = backend.connect()
try:
add_replicas(conn, args.replicas)
except (OperationError, NotImplementedError) as e:
logger.warn(e)
else:
logger.info('Added {} to the replicaset.'.format(args.replicas))
def run_remove_replicas(args):
# Note: This command is specific to MongoDB
bigchaindb.config_utils.autoconfigure(filename=args.config, force=True)
conn = backend.connect()
try:
remove_replicas(conn, args.replicas)
except (OperationError, NotImplementedError) as e:
logger.warn(e)
else:
logger.info('Removed {} from the replicaset.'.format(args.replicas))
def create_parser():
parser = argparse.ArgumentParser(
description='Control your BigchainDB node.',
parents=[utils.base_parser])
parser.add_argument('--dev-start-rethinkdb',
dest='start_rethinkdb',
action='store_true',
help='Run RethinkDB on start')
parser.add_argument('--dev-allow-temp-keypair',
dest='allow_temp_keypair',
action='store_true',
help='Generate a random keypair on start')
# all the commands are contained in the subparsers object,
# the command selected by the user will be stored in `args.command`
# that is used by the `main` function to select which other
# function to call.
subparsers = parser.add_subparsers(title='Commands',
dest='command')
# parser for writing a config file
config_parser = subparsers.add_parser('configure',
help='Prepare the config file '
'and create the node keypair')
config_parser.add_argument('backend',
choices=['rethinkdb', 'mongodb'],
help='The backend to use. It can be either '
'rethinkdb or mongodb.')
# parsers for showing/exporting config values
subparsers.add_parser('show-config',
help='Show the current configuration')
subparsers.add_parser('export-my-pubkey',
help="Export this node's public key")
# parser for database-level commands
subparsers.add_parser('init',
help='Init the database')
subparsers.add_parser('drop',
help='Drop the database')
# parser for starting BigchainDB
subparsers.add_parser('start',
help='Start BigchainDB')
# parser for configuring the number of shards
sharding_parser = subparsers.add_parser('set-shards',
help='Configure number of shards')
sharding_parser.add_argument('num_shards', metavar='num_shards',
type=int, default=1,
help='Number of shards')
# parser for configuring the number of replicas
replicas_parser = subparsers.add_parser('set-replicas',
help='Configure number of replicas')
replicas_parser.add_argument('num_replicas', metavar='num_replicas',
type=int, default=1,
help='Number of replicas (i.e. the replication factor)')
# parser for adding nodes to the replica set
add_replicas_parser = subparsers.add_parser('add-replicas',
help='Add a set of nodes to the '
'replica set. This command '
'is specific to the MongoDB'
' backend.')
add_replicas_parser.add_argument('replicas', nargs='+',
type=utils.mongodb_host,
help='A list of space separated hosts to '
'add to the replicaset. Each host '
'should be in the form `host:port`.')
# parser for removing nodes from the replica set
rm_replicas_parser = subparsers.add_parser('remove-replicas',
help='Remove a set of nodes from the '
'replica set. This command '
'is specific to the MongoDB'
' backend.')
rm_replicas_parser.add_argument('replicas', nargs='+',
type=utils.mongodb_host,
help='A list of space separated hosts to '
'remove from the replicaset. Each host '
'should be in the form `host:port`.')
load_parser = subparsers.add_parser('load',
help='Write transactions to the backlog')
load_parser.add_argument('-m', '--multiprocess',
nargs='?',
type=int,
default=False,
help='Spawn multiple processes to run the command, '
'if no value is provided, the number of processes '
'is equal to the number of cores of the host machine')
load_parser.add_argument('-c', '--count',
default=0,
type=int,
help='Number of transactions to push. If the parameter -m '
'is set, the count is distributed equally to all the '
'processes')
return parser
def main():
utils.start(create_parser(), sys.argv[1:], globals())