-
Notifications
You must be signed in to change notification settings - Fork 25
/
update_unique_null.py
161 lines (137 loc) · 6.09 KB
/
update_unique_null.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# update_unique_null.py - adapt composite unique constraints and nullables
from __future__ import unicode_literals
# revision identifiers, used by Alembic.
revision = ''
down_revision = ''
# INSTRUCTIONS:
# - cd into the clld app directory
# - $ alembic revision -m "update assoc tables"
# - edit migrations/versions/<...>_update_assoc_tables.py
# - replace the content after 'down_revision = <...>' with the content below
# - $ alembic upgrade head
# - check the output of your "update assoc tables" migration (will end with a RuntimeError)
# - if the changes are as intended, change 'dry=True' to 'dry=False' below
# - run alembic upgrade head once again to apply them
from alembic import op
import sqlalchemy as sa
# (tablename, unique_columns, nullable_columns) triples describing the target
# schema per association table: enforce UNIQUE(unique_columns); columns listed
# in nullable_columns may remain NULL, every other unique column is altered to
# NOT NULL (rows violating either rule are deleted first by upgrade()).
UNIQUE_NULL = [
    ('contributioncontributor',
        ['contribution_pk', 'contributor_pk'], []),
    ('contributionreference',
        ['contribution_pk', 'source_pk', 'description'],
        ['description']),
    ('domainelement',
        ['parameter_pk', 'name'],
        ['name']),
    ('domainelement',
        ['parameter_pk', 'number'],
        ['number']),
    ('editor',
        ['dataset_pk', 'contributor_pk'], []),
    ('languageidentifier',
        ['language_pk', 'identifier_pk'], []),
    ('languagesource',
        ['language_pk', 'source_pk'], []),
    ('sentencereference',
        ['sentence_pk', 'source_pk', 'description'],
        ['description']),
    ('unit',
        ['language_pk', 'id'],  # FIXME: or rather name?
        ['id']),
    ('unitdomainelement',
        ['unitparameter_pk', 'name'],
        ['name']),
    ('unitdomainelement',
        ['unitparameter_pk', 'ord'],
        ['ord']),
    ('unitvalue',  # NOTE: <unit, unitparameter, contribution> can have multiple values and also multiple unitdomainelements
        ['unit_pk', 'unitparameter_pk', 'contribution_pk', 'name', 'unitdomainelement_pk'],
        ['contribution_pk', 'name', 'unitdomainelement_pk']),
    ('value',  # NOTE: <language, parameter, contribution> can have multiple values and also multiple domainelements
        ['valueset_pk', 'name', 'domainelement_pk'],
        ['name', 'domainelement_pk']),
    ('valuesentence',
        ['value_pk', 'sentence_pk'], []),
    ('valueset',
        ['language_pk', 'parameter_pk', 'contribution_pk'],
        ['contribution_pk']),
    ('valuesetreference',
        ['valueset_pk', 'source_pk', 'description'],
        ['description']),
]
def upgrade(dry=True, verbose=True):
    """Normalize composite UNIQUE constraints and NOT NULL flags per UNIQUE_NULL.

    For each (table, unique, null) triple:
      1. delete rows holding NULL in a column that must become NOT NULL,
      2. delete duplicate rows over the unique columns (keeping the lowest pk),
      3. SET NOT NULL on the required columns,
      4. drop any mismatching unique constraint over the same column set and
         create the canonical one.

    PostgreSQL only (queries pg_catalog and casts bind params to regclass).
    With dry=True (the default) a RuntimeError is raised at the very end so
    the migration transaction rolls back; set dry=False to actually apply.
    """
    conn = op.get_bind()
    # The pg_attribute/pg_constraint queries and '::regclass' casts below
    # only work on PostgreSQL.
    assert conn.dialect.name == 'postgresql'
    def delete_null_duplicates(tablename, columns, notnull, returning=sa.text('*')):
        # Generator yielding two DELETE ... RETURNING statements:
        # first removes rows with NULL in any `notnull` column, second
        # removes duplicates over `columns`, keeping the smallest pk.
        assert columns
        table = sa.table(tablename, *map(sa.column, ['pk'] + columns))
        any_null = sa.or_(table.c[n] == sa.null() for n in notnull)
        yield table.delete(bind=conn).where(any_null).returning(returning)
        other = table.alias()
        # NOTE: .where() after .returning() is fine -- these calls are
        # generative and order-independent.
        yield table.delete(bind=conn).where(~any_null).returning(returning)\
            .where(sa.exists()
                   .where(sa.and_(table.c[c] == other.c[c] for c in columns))
                   .where(table.c.pk > other.c.pk))
    def print_rows(rows, verbose=verbose):
        # Echo deleted rows so a dry run can be inspected before applying.
        if not verbose:
            return
        for r in rows:
            print(' %r' % dict(r))
    class regclass(sa.types.UserDefinedType):
        # Minimal custom type so bind parameters can be cast to pg regclass.
        def get_col_spec(self):
            return 'regclass'
    pga = sa.table('pg_attribute', *map(sa.column, ['attrelid', 'attname', 'attnum', 'attnotnull']))
    # Columns of :table among :notnull that are currently still nullable.
    select_nullable = sa.select([pga.c.attname], bind=conn)\
        .where(pga.c.attrelid == sa.cast(sa.bindparam('table'), regclass))\
        .where(pga.c.attname == sa.func.any(sa.bindparam('notnull')))\
        .where(~pga.c.attnotnull)\
        .order_by(pga.c.attnum)
    pgco = sa.table('pg_constraint', *map(sa.column,
        ['oid', 'conname', 'contype', 'conrelid', 'conkey']))
    # All UNIQUE constraints on :table, with their SQL definition and the
    # array of column names they cover.
    sq = sa.select([
        pgco.c.conname.label('name'),
        sa.func.pg_get_constraintdef(pgco.c.oid).label('definition'),
        sa.func.array(
            sa.select([sa.cast(pga.c.attname, sa.Text)])
            .where(pga.c.attrelid == pgco.c.conrelid)
            .where(pga.c.attnum == sa.func.any(pgco.c.conkey))
            .as_scalar()).label('names'),
        ]).where(pgco.c.contype == 'u')\
        .where(pgco.c.conrelid == sa.cast(sa.bindparam('table'), regclass))\
        .alias()
    # Constraints whose column set equals :cols exactly (mutual containment
    # via the array operators @> and <@).
    select_const = sa.select([sq.c.name, sq.c.definition], bind=conn)\
        .where(sq.c.names.op('@>')(sa.bindparam('cols')))\
        .where(sq.c.names.op('<@')(sa.bindparam('cols')))
    for table, unique, null in UNIQUE_NULL:
        print(table)
        notnull = [u for u in unique if u not in null]
        delete_null, delete_duplicates = delete_null_duplicates(table, unique, notnull)
        nulls = delete_null.execute().fetchall()
        if nulls:
            print('%s delete %d row(s) violating NOT NULL(%s)' % (table, len(nulls), ', '.join(notnull)))
            print_rows(nulls)
        duplicates = delete_duplicates.execute().fetchall()
        if duplicates:
            print('%s delete %d row(s) violating UNIQUE(%s)' % (table, len(duplicates), ', '.join(unique)))
            print_rows(duplicates)
        for col, in select_nullable.execute(table=table, notnull=notnull):
            print('%s alter column %s NOT NULL' % (table, col))
            op.alter_column(table, col, nullable=False)
        constraint = 'UNIQUE (%s)' % ', '.join(unique)
        matching = select_const.execute(table=table, cols=unique).fetchall()
        if matching:
            # At most one constraint can cover exactly these columns.
            assert len(matching) == 1
            (name, definition), = matching
            if definition == constraint:
                print('%s keep constraint %s %s\n' % (table, name, definition))
                continue
            # Same column set but different definition (e.g. column order):
            # drop it so the canonical constraint can be created below.
            print('%s drop constraint %s %s' % (table, name, definition))
            op.drop_constraint(name, table)
        name = '%s_%s_key' % (table, '_'.join(unique))
        print('%s create constraint %s %s' % (table, name, constraint))
        op.create_unique_constraint(name, table, unique)
        print('')
    if dry:
        # Raising here aborts the migration transaction, rolling back every
        # change printed above; the user flips dry=False to commit.
        raise RuntimeError('set dry=False to apply these changes')
def downgrade():
    """Intentional no-op: deleted duplicate/NULL rows cannot be restored."""