Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made get_indexes() consistent across backends. #25

Merged
merged 1 commit into from Apr 30, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions django/db/backends/__init__.py
Expand Up @@ -997,6 +997,17 @@ def get_primary_key_column(self, cursor, table_name):
"""
raise NotImplementedError

def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of indexed fieldname -> infodict for the given
table, where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}

Only single-column indexes are introspected.
"""
raise NotImplementedError

class BaseDatabaseClient(object):
"""
This class encapsulates all backend-specific methods for opening a
Expand Down
18 changes: 11 additions & 7 deletions django/db/backends/mysql/introspection.py
Expand Up @@ -85,15 +85,19 @@ def get_primary_key_column(self, cursor, table_name):
return None

def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
cursor.execute("SHOW INDEX FROM %s" % self.connection.ops.quote_name(table_name))
# Do a two-pass search for indexes: on first pass check which indexes
# are multicolumn, on second pass check which single-column indexes
# are present.
rows = list(cursor.fetchall())
multicol_indexes = set()
for row in rows:
if row[3] > 1:
multicol_indexes.add(row[2])
indexes = {}
for row in cursor.fetchall():
for row in rows:
if row[2] in multicol_indexes:
continue
indexes[row[4]] = {'primary_key': (row[2] == 'PRIMARY'), 'unique': not bool(row[1])}
return indexes

57 changes: 26 additions & 31 deletions django/db/backends/oracle/introspection.py
Expand Up @@ -72,51 +72,46 @@ def get_relations(self, cursor, table_name):
FROM user_constraints, USER_CONS_COLUMNS ca, USER_CONS_COLUMNS cb,
user_tab_cols ta, user_tab_cols tb
WHERE user_constraints.table_name = %s AND
ta.table_name = %s AND
ta.table_name = user_constraints.table_name AND
ta.column_name = ca.column_name AND
ca.table_name = %s AND
ca.table_name = ta.table_name AND
user_constraints.constraint_name = ca.constraint_name AND
user_constraints.r_constraint_name = cb.constraint_name AND
cb.table_name = tb.table_name AND
cb.column_name = tb.column_name AND
ca.position = cb.position""", [table_name, table_name, table_name])
ca.position = cb.position""", [table_name])

relations = {}
for row in cursor.fetchall():
relations[row[0]] = (row[2], row[1].lower())
return relations

def get_indexes(self, cursor, table_name):
sql = """
SELECT LOWER(uic1.column_name) AS column_name,
CASE user_constraints.constraint_type
WHEN 'P' THEN 1 ELSE 0
END AS is_primary_key,
CASE user_indexes.uniqueness
WHEN 'UNIQUE' THEN 1 ELSE 0
END AS is_unique
FROM user_constraints, user_indexes, user_ind_columns uic1
WHERE user_constraints.constraint_type (+) = 'P'
AND user_constraints.index_name (+) = uic1.index_name
AND user_indexes.uniqueness (+) = 'UNIQUE'
AND user_indexes.index_name (+) = uic1.index_name
AND uic1.table_name = UPPER(%s)
AND uic1.column_position = 1
AND NOT EXISTS (
SELECT 1
FROM user_ind_columns uic2
WHERE uic2.index_name = uic1.index_name
AND uic2.column_position = 2
)
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
# This query retrieves each index on the given table, including the
# first associated field name
# "We were in the nick of time; you were in great peril!"
sql = """\
SELECT LOWER(all_tab_cols.column_name) AS column_name,
CASE user_constraints.constraint_type
WHEN 'P' THEN 1 ELSE 0
END AS is_primary_key,
CASE user_indexes.uniqueness
WHEN 'UNIQUE' THEN 1 ELSE 0
END AS is_unique
FROM all_tab_cols, user_cons_columns, user_constraints, user_ind_columns, user_indexes
WHERE all_tab_cols.column_name = user_cons_columns.column_name (+)
AND all_tab_cols.table_name = user_cons_columns.table_name (+)
AND user_cons_columns.constraint_name = user_constraints.constraint_name (+)
AND user_constraints.constraint_type (+) = 'P'
AND user_ind_columns.column_name (+) = all_tab_cols.column_name
AND user_ind_columns.table_name (+) = all_tab_cols.table_name
AND user_indexes.uniqueness (+) = 'UNIQUE'
AND user_indexes.index_name (+) = user_ind_columns.index_name
AND all_tab_cols.table_name = UPPER(%s)
"""
cursor.execute(sql, [table_name])
indexes = {}
for row in cursor.fetchall():
indexes[row[0]] = {'primary_key': row[1], 'unique': row[2]}
indexes[row[0]] = {'primary_key': bool(row[1]),
'unique': bool(row[2])}
return indexes
6 changes: 0 additions & 6 deletions django/db/backends/postgresql_psycopg2/introspection.py
Expand Up @@ -65,12 +65,6 @@ def get_relations(self, cursor, table_name):
return relations

def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
# This query retrieves each index on the given table, including the
# first associated field name
cursor.execute("""
Expand Down
16 changes: 5 additions & 11 deletions django/db/backends/sqlite3/introspection.py
Expand Up @@ -133,28 +133,22 @@ def get_key_columns(self, cursor, table_name):
return key_columns

def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of fieldname -> infodict for the given table,
where each infodict is in the format:
{'primary_key': boolean representing whether it's the primary key,
'unique': boolean representing whether it's a unique index}
"""
indexes = {}
for info in self._table_info(cursor, table_name):
indexes[info['name']] = {'primary_key': info['pk'] != 0,
'unique': False}
if info['pk'] != 0:
indexes[info['name']] = {'primary_key': True,
'unique': False}
cursor.execute('PRAGMA index_list(%s)' % self.connection.ops.quote_name(table_name))
# seq, name, unique
for index, unique in [(field[1], field[2]) for field in cursor.fetchall()]:
if not unique:
continue
cursor.execute('PRAGMA index_info(%s)' % self.connection.ops.quote_name(index))
info = cursor.fetchall()
# Skip indexes across multiple fields
if len(info) != 1:
continue
name = info[0][2] # seqno, cid, name
indexes[name]['unique'] = True
indexes[name] = {'primary_key': False,
'unique': unique}
return indexes

def get_primary_key_column(self, cursor, table_name):
Expand Down
5 changes: 4 additions & 1 deletion tests/regressiontests/introspection/models.py
Expand Up @@ -7,6 +7,9 @@ class Reporter(models.Model):
email = models.EmailField()
facebook_user_id = models.BigIntegerField(null=True)

class Meta:
unique_together = ('first_name', 'last_name')

def __unicode__(self):
return u"%s %s" % (self.first_name, self.last_name)

Expand All @@ -19,4 +22,4 @@ def __unicode__(self):
return self.headline

class Meta:
ordering = ('headline',)
ordering = ('headline',)
9 changes: 9 additions & 0 deletions tests/regressiontests/introspection/tests.py
Expand Up @@ -137,6 +137,15 @@ def test_get_indexes(self):
indexes = connection.introspection.get_indexes(cursor, Article._meta.db_table)
self.assertEqual(indexes['reporter_id'], {'unique': False, 'primary_key': False})

def test_get_indexes_multicol(self):
"""
Test that multicolumn indexes are not included in the introspection
results.
"""
cursor = connection.cursor()
indexes = connection.introspection.get_indexes(cursor, Reporter._meta.db_table)
self.assertNotIn('first_name', indexes)
self.assertIn('id', indexes)

def datatype(dbtype, description):
"""Helper to convert a data type into a string."""
Expand Down