/
shell.py
153 lines (125 loc) · 4.29 KB
/
shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
'''
A Python REPL for Starbelly.
This "shell" imports useful modules and sets up the application
configuration, database pool, and other useful features. This shell
is intended for use with Python's interactive flag, i.e.:
$ python3 -im tools.shell
>>> config['database']['user']
'starbelly-app'
You can also load this in Jupyter Notebook by running this in the first cell:
from tools.shell import *
The shell is handy for development and debugging in order to execute
sections of Starbelly without running the entire server.
'''
import functools
import logging
import os
import sys
from IPython.terminal.embed import InteractiveShellEmbed
from rethinkdb import RethinkDB
import trio
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
import starbelly.config
from starbelly.version import __version__
# Globals exposed in the shell:
r = RethinkDB()
r.set_loop_type('trio')
logger = None
config = None
def run_query(query, super_user=False):
''' Run ``query`` on RethinkDB and return result. '''
async def async_query():
db_config = config['database']
kwargs = {
'host': db_config['host'],
'port': db_config['port'],
'db': db_config['db'],
'user': db_config['user'],
'password': db_config['password'],
}
if super_user:
kwargs['user'] = db_config['super_user']
kwargs['password'] = db_config['super_password']
async with trio.open_nursery() as nursery:
kwargs['nursery'] = nursery
connect_db = functools.partial(r.connect, **kwargs)
conn = await connect_db()
try:
result = await query.run(conn)
finally:
await conn.close()
return result
return trio.run(async_query)
def list_results(results):
'''
Convert query results to list, even if the query resulted in a cursor. '''
async def async_list_results():
if isinstance(results, list):
return results.copy()
elif isinstance(results, r.Cursor):
l = list()
async with results:
async for item in results:
l.append(item)
return l
else:
type_ = type(results)
logger.error(f'RethinkDB UNKNOWN TYPE: {type_}')
return None
return trio.run(async_list_results)
def print_results(results):
'''
Pretty print RethinkDB query results.
This method correctly handles different types of results, such as a
cursor, list, etc.
'''
async def async_print_results():
MAX_ITEMS = 100
INDENT = ' '
if isinstance(results, list):
len_ = len(results)
print(f'RethinkDB List (len={len_}): [')
for item in results[:MAX_ITEMS]:
print(f'{INDENT}{item},')
if len_ > MAX_ITEMS:
print(f'{INDENT}...')
print(']')
elif isinstance(results, r.Cursor):
print('RethinkDB Cursor: [')
item_count = 0
try:
async for item in results:
if item_count > MAX_ITEMS:
print(f'{INDENT}...')
print(f'{INDENT}{item},')
item_count += 1
finally:
await results.close()
print(']')
else:
type_ = type(results)
logger.error(f'RethinkDB UNKNOWN TYPE: {type_}')
trio.run(async_print_results)
def setup():
''' Set up configuration and logging. '''
global config, logger
log_format = '%(asctime)s [%(name)s] %(levelname)s: %(message)s'
log_date_format = '%H:%M:%S'
log_formatter = logging.Formatter(log_format, log_date_format)
log_handler = logging.StreamHandler(sys.stdout)
log_handler.setFormatter(log_formatter)
logger = logging.getLogger('tools.shell')
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)
config = starbelly.config.get_config()
def main():
''' Run IPython shell. '''
ipy_shell = InteractiveShellEmbed(
banner1=f'IPython Shell: Starbelly v{__version__}')
ipy_shell.magic('autoawait trio')
ipy_shell()
setup()
if __name__ == '__main__':
main()
else:
print(f'Starbelly v{__version__} Shell')