-
Notifications
You must be signed in to change notification settings - Fork 2k
/
db.py
134 lines (111 loc) · 3.95 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# encoding: utf-8
import os
import logging
import ckan.migration as migration_repo
import click
from itertools import groupby
from ckan.cli import error_shout
import ckan.model as model
# Logger shared by the commands in this module; named after the module
# itself through __name__.
log = logging.getLogger(__name__)
# Root click group registered under the name u'db'. The commands defined
# in this module attach themselves to it with @db.command(...).
# The callback body is intentionally empty: the group does no work itself.
# NOTE: documentation is kept in comments rather than a docstring, because
# click would surface a docstring as the group's help text.
@click.group(name=u'db', short_help=u'Database commands')
def db():
    pass
@db.command(u'init', short_help=u'Initialize the database')
def initdb():
    u'''Initialising the database'''
    # Hands the work to model.repo.init_db(). Any exception is reported
    # through error_shout instead of being propagated, and the green
    # success line is printed only when nothing went wrong.
    log.info(u"Initialize the Database")
    try:
        import ckan.model as model
        model.repo.init_db()
    except Exception as exc:
        error_shout(exc)
        return
    click.secho(u'Initialising DB: SUCCESS', fg=u'green', bold=True)
# Confirmation text passed to click.confirmation_option on the `clean`
# command below.
PROMPT_MSG = u'This will delete all of your data!\nDo you want to continue?'
@db.command(u'clean', short_help=u'Clean the database')
@click.confirmation_option(prompt=PROMPT_MSG)
def cleandb():
    u'''Cleaning the database'''
    # Guarded by a confirmation prompt (PROMPT_MSG). Calls
    # model.repo.clean_db(); an exception is passed to error_shout and
    # suppresses the success message.
    try:
        import ckan.model as model
        model.repo.clean_db()
    except Exception as exc:
        error_shout(exc)
        return
    click.secho(u'Cleaning DB: SUCCESS', fg=u'green', bold=True)
@db.command(u'upgrade', short_help=u'Upgrade the database')
@click.option(u'-v', u'--version', help=u'Migration version', default=u'head')
def updatedb(version):
    u'''Upgrading the database'''
    # `version` comes from -v/--version and defaults to u'head'; it is
    # forwarded unchanged to model.repo.upgrade_db(). An exception is
    # passed to error_shout and suppresses the success message.
    try:
        import ckan.model as model
        model.repo.upgrade_db(version)
    except Exception as exc:
        error_shout(exc)
        return
    click.secho(u'Upgrading DB: SUCCESS', fg=u'green', bold=True)
@db.command(u'downgrade', short_help=u'Downgrade the database')
@click.option(u'-v', u'--version', help=u'Migration version', default=u'base')
def downgradedb(version):
    u'''Downgrading the database'''
    # `version` comes from -v/--version and defaults to u'base'; it is
    # forwarded unchanged to model.repo.downgrade_db(). An exception is
    # passed to error_shout and suppresses the success message.
    try:
        import ckan.model as model
        model.repo.downgrade_db(version)
    except Exception as exc:
        error_shout(exc)
        return
    click.secho(u'Downgrading DB: SUCCESS', fg=u'green', bold=True)
@db.command(u'version', short_help=u'Returns current version of data schema')
@click.option(u'--hash', is_flag=True)
def version(hash):
    u'''Return current version'''
    # The parameter has to be called `hash` because click binds the
    # --hash flag to a keyword argument of that name.
    log.info(u"Returning current DB version")
    import ckan.model as model
    repo = model.repo
    repo.setup_migration_version_control()
    revision = repo.current_version()
    # With --hash the revision is printed as reported by the repository;
    # otherwise it is converted by _version_hash_to_ordinal first.
    shown = revision if hash else _version_hash_to_ordinal(revision)
    message = u'Current DB version: {}'.format(shown)
    click.secho(message, fg=u'green', bold=True)
@db.command(u'duplicate_emails', short_help=u'Check users email for duplicate')
def duplicate_emails():
    u'''Check users email for duplicate'''
    # Lists every e-mail address shared by more than one active user,
    # together with the names of those users.
    #
    # Output:
    #   * one line per duplicated address;
    #   * u'Duplicate email search: SUCCESS' when at least one duplicate
    #     was reported;
    #   * u'Duplicate email search: NOT FOUND' when no address is shared.
    # An exception raised while grouping is reported through error_shout
    # and suppresses the final status line.
    log.info(u"Searching for accounts with duplicate emails.")

    # Execute the query exactly once and work on the resulting rows, so
    # the check and the report always see the same data.
    rows = model.Session.query(model.User.email,
                               model.User.name) \
        .filter(model.User.state == u"active") \
        .filter(model.User.email != u"") \
        .order_by(model.User.email) \
        .all()

    duplicates_found = False
    try:
        # groupby only merges adjacent items; the ORDER BY on email above
        # is what puts equal addresses next to each other.
        for email, group in groupby(rows, key=lambda row: row[0]):
            names = [row[1] for row in group]
            if len(names) > 1:
                duplicates_found = True
                s = u'{} appears {} time(s). Users: {}'
                click.secho(s.format(email, len(names), u', '.join(names)))
    except Exception as e:
        error_shout(e)
    else:
        # Report on whether duplicates exist, not on whether any user
        # rows were returned.
        if duplicates_found:
            click.secho(u'Duplicate email search: SUCCESS',
                        fg=u'green', bold=True)
        else:
            click.secho(u'Duplicate email search: NOT FOUND',
                        fg=u'green', bold=True)
def _version_hash_to_ordinal(version):
if u'base' == version:
return 0
versions_dir = os.path.join(
os.path.dirname(migration_repo.__file__), u'versions'
)
versions = sorted(os.listdir(versions_dir))
# latest version looks like `123abc (head)`
if version.endswith(u'(head)'):
return int(versions[-1].split(u'_')[0])
for name in versions:
if version in name:
return int(name.split(u'_')[0])
error_shout(u'Version `{}` was not found in {}'.format(
version, versions_dir
))