forked from django/django
-
Notifications
You must be signed in to change notification settings - Fork 2
/
schema.py
131 lines (117 loc) · 6.06 KB
/
schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import psycopg2
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s USING %(column)s::%(type)s"
sql_create_sequence = "CREATE SEQUENCE %(sequence)s"
sql_delete_sequence = "DROP SEQUENCE IF EXISTS %(sequence)s CASCADE"
sql_set_sequence_max = "SELECT setval('%(sequence)s', MAX(%(column)s)) FROM %(table)s"
sql_create_index = "CREATE INDEX %(name)s ON %(table)s%(using)s (%(columns)s)%(extra)s"
sql_create_varchar_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s varchar_pattern_ops)%(extra)s"
sql_create_text_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s text_pattern_ops)%(extra)s"
sql_delete_index = "DROP INDEX IF EXISTS %(name)s"
# Setting the constraint to IMMEDIATE runs any deferred checks to allow
# dropping it in the same transaction.
sql_delete_fk = "SET CONSTRAINTS %(name)s IMMEDIATE; ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
def quote_value(self, value):
return psycopg2.extensions.adapt(value)
def _field_indexes_sql(self, model, field):
output = super()._field_indexes_sql(model, field)
like_index_statement = self._create_like_index_sql(model, field)
if like_index_statement is not None:
output.append(like_index_statement)
return output
def _create_like_index_sql(self, model, field):
"""
Return the statement to create an index with varchar operator pattern
when the column type is 'varchar' or 'text', otherwise return None.
"""
db_type = field.db_type(connection=self.connection)
if db_type is not None and (field.db_index or field.unique):
# Fields with database column types of `varchar` and `text` need
# a second index that specifies their operator class, which is
# needed when performing correct LIKE queries outside the
# C locale. See #12234.
#
# The same doesn't apply to array fields such as varchar[size]
# and text[size], so skip them.
if '[' in db_type:
return None
if db_type.startswith('varchar'):
return self._create_index_sql(model, [field], suffix='_like', sql=self.sql_create_varchar_index)
elif db_type.startswith('text'):
return self._create_index_sql(model, [field], suffix='_like', sql=self.sql_create_text_index)
return None
def _alter_column_type_sql(self, model, old_field, new_field, new_type):
"""Make ALTER TYPE with SERIAL make sense."""
table = model._meta.db_table
if new_type.lower() in ("serial", "bigserial"):
column = new_field.column
sequence_name = "%s_%s_seq" % (table, column)
col_type = "integer" if new_type.lower() == "serial" else "bigint"
return (
(
self.sql_alter_column_type % {
"column": self.quote_name(column),
"type": col_type,
},
[],
),
[
(
self.sql_delete_sequence % {
"sequence": self.quote_name(sequence_name),
},
[],
),
(
self.sql_create_sequence % {
"sequence": self.quote_name(sequence_name),
},
[],
),
(
self.sql_alter_column % {
"table": self.quote_name(table),
"changes": self.sql_alter_column_default % {
"column": self.quote_name(column),
"default": "nextval('%s')" % self.quote_name(sequence_name),
}
},
[],
),
(
self.sql_set_sequence_max % {
"table": self.quote_name(table),
"column": self.quote_name(column),
"sequence": self.quote_name(sequence_name),
},
[],
),
],
)
else:
return super()._alter_column_type_sql(model, old_field, new_field, new_type)
def _alter_field(self, model, old_field, new_field, old_type, new_type,
old_db_params, new_db_params, strict=False):
# Drop indexes on varchar/text columns that are changing to a different
# type.
if (old_field.db_index or old_field.unique) and (
(old_type.startswith('varchar') and not new_type.startswith('varchar')) or
(old_type.startswith('text') and not new_type.startswith('text'))
):
index_name = self._create_index_name(model, [old_field.column], suffix='_like')
self.execute(self._delete_constraint_sql(self.sql_delete_index, model, index_name))
super()._alter_field(
model, old_field, new_field, old_type, new_type, old_db_params,
new_db_params, strict,
)
# Added an index? Create any PostgreSQL-specific indexes.
if ((not (old_field.db_index or old_field.unique) and new_field.db_index) or
(not old_field.unique and new_field.unique)):
like_index_statement = self._create_like_index_sql(model, new_field)
if like_index_statement is not None:
self.execute(like_index_statement)
# Removed an index? Drop any PostgreSQL-specific indexes.
if old_field.unique and not (new_field.db_index or new_field.unique):
index_to_remove = self._create_index_name(model, [old_field.column], suffix='_like')
self.execute(self._delete_constraint_sql(self.sql_delete_index, model, index_to_remove))