-
Notifications
You must be signed in to change notification settings - Fork 2k
/
user.py
146 lines (121 loc) · 4.31 KB
/
user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# encoding: utf-8
import logging
import sys
from pprint import pprint
import six
import click
from six import text_type
import ckan.logic as logic
import ckan.plugins as plugin
from ckan.cli import error_shout
log = logging.getLogger(__name__)
@click.group(name=u'user', short_help=u'Manage user commands')
@click.help_option(u'-h', u'--help')
def user():
    # Container group for the user sub-commands defined in this module
    # (add, list, remove, show, setpass); it does no work of its own.
    # Kept as a comment on purpose: click would turn a docstring into
    # the group's --help text.
    pass
@user.command(u'add', short_help=u'Add new user')
@click.argument(u'username')
@click.argument(u'args', nargs=-1)
@click.pass_context
def add_user(ctx, username, args):
    u'''Add a new user (used by `ckan user add` and `ckan sysadmin add`).

    Extra ARGS are <field>=<value> pairs that are passed on to the
    user_create action, e.g. email=..., password=..., fullname=...
    Email and password are prompted for when they are not given.
    '''
    # ctx: click context; ctx.meta must carry the Flask app under
    #      'flask_app'.
    # Raises ValueError for an argument without '=' and click.Abort when
    # user_create rejects the data with a ValidationError.

    # Parse the "<field>=<value>" arguments into the action's data_dict.
    data_dict = {u'name': username}
    for arg in args:
        field, separator, value = arg.partition(u'=')
        if not separator:
            raise ValueError(
                u'Could not parse arg: %r (expected "<option>=<value>")' % arg
            )
        data_dict[field] = value

    # Required
    if u'email' not in data_dict:
        data_dict[u'email'] = click.prompt(u'Email address ').strip()
    if u'password' not in data_dict:
        data_dict[u'password'] = click.prompt(
            u'Password ', hide_input=True, confirmation_prompt=True
        )

    # Optional
    # Only byte strings need decoding; text (always the case on
    # Python 3) has no .decode() and is passed through untouched.
    fullname = data_dict.get(u'fullname')
    if isinstance(fullname, six.binary_type):
        data_dict[u'fullname'] = fullname.decode(sys.getfilesystemencoding())

    # The model is imported lazily, as in the other commands of this module.
    import ckan.model as model

    try:
        site_user = logic.get_action(u'get_site_user')(
            {u'model': model, u'ignore_auth': True},
            {}
        )
        context = {
            u'model': model,
            u'session': model.Session,
            u'ignore_auth': True,
            u'user': site_user[u'name'],
        }
        flask_app = ctx.meta[u'flask_app']
        # Current user is tested against sysadmin role during model
        # dictization, thus we need request context
        with flask_app.test_request_context():
            user_dict = logic.get_action(u'user_create')(context, data_dict)
    except logic.ValidationError as e:
        error_shout(e)
        raise click.Abort()
    else:
        click.secho(u"Successfully created user: %s" % user_dict[u'name'],
                    fg=u'green', bold=True)
def get_user_str(user):
    u'''Return a one-line description of ``user``.

    The result is ``name=<name>``, followed by ``display=<display_name>``
    only when the display name differs from the name.
    '''
    name, display_name = user.name, user.display_name
    if name == display_name:
        return u'name=%s' % name
    return u'name=%s' % name + u' display=%s' % display_name
@user.command(u'list', short_help=u'List all users')
def list_users():
    # Print a header, the number of users whose state is 'active', and
    # then one get_user_str() line per such user.
    # (A comment, not a docstring: click would show a docstring as the
    # command's --help text.)
    import ckan.model as model

    click.secho(u'Users:')
    active_users = model.Session.query(model.User).filter_by(
        state=u'active'
    )
    total = active_users.count()
    click.secho(u'count = %i' % total)
    for account in active_users:
        line = get_user_str(account)
        click.secho(line)
@user.command(u'remove', short_help=u'Remove user')
@click.argument(u'username')
@click.pass_context
def remove_user(ctx, username):
    # Delete the user called ``username`` through the user_delete action,
    # acting as the site user.
    #
    # ctx: click context; ctx.meta must carry the Flask app under
    #      'flask_app'.
    # Raises click.Abort when the action reports that the user does not
    # exist.
    # (A comment, not a docstring: click would show a docstring as the
    # command's --help text.)
    if not username:
        error_shout(u'Please specify the username to be removed')
        return

    site_user = logic.get_action(u'get_site_user')({u'ignore_auth': True}, {})
    context = {u'user': site_user[u'name']}
    try:
        # The action is run inside a request context, as add_user does.
        with ctx.meta[u'flask_app'].test_request_context():
            plugin.toolkit.get_action(u'user_delete')(
                context, {u'id': username}
            )
    except logic.NotFound:
        # An unknown user is expected to surface as logic.NotFound; report
        # it like add_user reports its errors instead of dumping a
        # traceback.
        error_shout(u'User not found: %s' % username)
        raise click.Abort()
    click.secho(u'Deleted user: %s' % username, fg=u'green', bold=True)
@user.command(u'show', short_help=u'Show user')
@click.argument(u'username')
def show_user(username):
    # Look up ``username`` with model.User.get() and print the resulting
    # user object; an empty or unknown username is reported through
    # error_shout instead.
    # (A comment, not a docstring: click would show a docstring as the
    # command's --help text.)
    import ckan.model as model

    if not username:
        error_shout(u'Please specify the username for the user')
        return

    found = model.User.get(text_type(username))
    if found is None:
        # Same message as set_password uses, rather than printing
        # "User: None" as if the lookup had succeeded.
        error_shout(u"User not found!")
        return
    click.secho(u'User: %s' % found)
@user.command(u'setpass', short_help=u'Set password for the user')
@click.argument(u'username')
def set_password(username):
    # Prompt (with confirmation, input hidden) for a new password and
    # store it on the user called ``username``; an empty or unknown
    # username is reported through error_shout and nothing is changed.
    # (A comment, not a docstring: click would show a docstring as the
    # command's --help text.)
    import ckan.model as model

    if not username:
        error_shout(u'Need name of the user.')
        return

    account = model.User.get(username)
    if not account:
        error_shout(u"User not found!")
        return

    click.secho(u'Editing user: %r' % account.name, fg=u'yellow')
    account.password = click.prompt(
        u'Password', hide_input=True, confirmation_prompt=True
    )
    model.repo.commit_and_remove()
    click.secho(u'Password updated!', fg=u'green', bold=True)