Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fixed up the introspection code and tests added for #9779 to run unde…

…r Python 2.3, which has neither set nor rsplit.

git-svn-id: http://code.djangoproject.com/svn/django/trunk@10392 bcc190cf-cafb-0310-a4f2-bffc1f526a37
  • Loading branch information...
commit 48e01d2b3db28be153feae7f3cecb09941a4e41b 1 parent bb31cf3
@kmtracey kmtracey authored
View
72 django/db/backends/sqlite3/introspection.py
@@ -56,54 +56,54 @@ def get_table_description(self, cursor, table_name):
info['null_ok']) for info in self._table_info(cursor, table_name)]
def get_relations(self, cursor, table_name):
- """
- Returns a dictionary of {field_index: (field_index_other_table, other_table)}
- representing all relationships to the given table. Indexes are 0-based.
- """
-
- # Dictionary of relations to return
- relations = {}
-
- # Schema for this table
- cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table_name])
- results = cursor.fetchone()[0].strip()
- results = results.split('(', 1)[1]
- results = results.rsplit(')', 1)[0]
-
+ """
+ Returns a dictionary of {field_index: (field_index_other_table, other_table)}
+ representing all relationships to the given table. Indexes are 0-based.
+ """
+
+ # Dictionary of relations to return
+ relations = {}
+
+ # Schema for this table
+ cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table_name])
+ results = cursor.fetchone()[0].strip()
+ results = results[results.index('(')+1:results.rindex(')')]
+
# Walk through and look for references to other tables. SQLite doesn't
# really have enforced references, but since it echoes out the SQL used
# to create the table we can look for REFERENCES statements used there.
- for field_index, field_desc in enumerate(results.split(',')):
- field_desc = field_desc.strip()
- if field_desc.startswith("UNIQUE"):
- continue
-
+ for field_index, field_desc in enumerate(results.split(',')):
+ field_desc = field_desc.strip()
+ if field_desc.startswith("UNIQUE"):
+ continue
+
m = re.search('references (.*) \(["|](.*)["|]\)', field_desc, re.I)
- if not m:
- continue
-
+ if not m:
+ continue
+
table, column = [s.strip('"') for s in m.groups()]
-
+
cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table])
result = cursor.fetchone()
if not result:
continue
- other_table_results = result[0].strip()
- other_table_results = other_table_results.split('(', 1)[1]
- other_table_results = other_table_results.rsplit(')', 1)[0]
-
- for other_index, other_desc in enumerate(other_table_results.split(',')):
- other_desc = other_desc.strip()
- if other_desc.startswith('UNIQUE'):
- continue
-
- name = other_desc.split(' ', 1)[0].strip('"')
- if name == column:
+ other_table_results = result[0].strip()
+ li, ri = other_table_results.index('('), other_table_results.rindex(')')
+ other_table_results = other_table_results[li+1:ri]
+
+
+ for other_index, other_desc in enumerate(other_table_results.split(',')):
+ other_desc = other_desc.strip()
+ if other_desc.startswith('UNIQUE'):
+ continue
+
+ name = other_desc.split(' ', 1)[0].strip('"')
+ if name == column:
relations[field_index] = (other_index, table)
break
-
+
return relations
-
+
def get_indexes(self, cursor, table_name):
"""
Returns a dictionary of fieldname -> infodict for the given table,
View
35 tests/regressiontests/introspection/tests.py
@@ -5,13 +5,18 @@
from models import Reporter, Article
+try:
+ set
+except NameError:
+ from sets import Set as set # Python 2.3 fallback
+
#
# The introspection module is optional, so methods tested here might raise
# NotImplementedError. This is perfectly acceptable behavior for the backend
# in question, but the tests need to handle this without failing. Ideally we'd
# skip these tests, but until #4788 is done we'll just ignore them.
#
-# The easiest way to accomplish this is to decorate every test case with a
+# The easiest way to accomplish this is to decorate every test case with a
# wrapper that ignores the exception.
#
# The metaclass is just for fun.
@@ -35,14 +40,14 @@ def __new__(cls, name, bases, attrs):
class IntrospectionTests(TestCase):
__metaclass__ = IgnoreNotimplementedError
-
+
def test_table_names(self):
tl = connection.introspection.table_names()
- self.assert_(Reporter._meta.db_table in tl,
+ self.assert_(Reporter._meta.db_table in tl,
"'%s' isn't in table_list()." % Reporter._meta.db_table)
- self.assert_(Article._meta.db_table in tl,
+ self.assert_(Article._meta.db_table in tl,
"'%s' isn't in table_list()." % Article._meta.db_table)
-
+
def test_django_table_names(self):
cursor = connection.cursor()
cursor.execute('CREATE TABLE django_introspection_test_table (id INTEGER);');
@@ -50,26 +55,26 @@ def test_django_table_names(self):
cursor.execute("DROP TABLE django_introspection_test_table;")
self.assert_('django_introspection_testcase_table' not in tl,
"django_table_names() returned a non-Django table")
-
+
def test_installed_models(self):
tables = [Article._meta.db_table, Reporter._meta.db_table]
models = connection.introspection.installed_models(tables)
self.assertEqual(models, set([Article, Reporter]))
-
+
def test_sequence_list(self):
sequences = connection.introspection.sequence_list()
expected = {'table': Reporter._meta.db_table, 'column': 'id'}
- self.assert_(expected in sequences,
+ self.assert_(expected in sequences,
'Reporter sequence not found in sequence_list()')
-
+
def test_get_table_description_names(self):
cursor = connection.cursor()
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
self.assertEqual([r[0] for r in desc],
[f.column for f in Reporter._meta.fields])
-
+
def test_get_table_description_types(self):
- cursor = connection.cursor()
+ cursor = connection.cursor()
desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
self.assertEqual([datatype(r[1]) for r in desc],
['IntegerField', 'CharField', 'CharField', 'CharField'])
@@ -78,7 +83,7 @@ def test_get_table_description_types(self):
if settings.DATABASE_ENGINE.startswith('postgresql'):
def test_postgresql_real_type(self):
cursor = connection.cursor()
- cursor.execute("CREATE TABLE django_introspection_real_test_table (number REAL);")
+ cursor.execute("CREATE TABLE django_introspection_real_test_table (number REAL);")
desc = connection.introspection.get_table_description(cursor, 'django_introspection_real_test_table')
cursor.execute('DROP TABLE django_introspection_real_test_table;')
self.assertEqual(datatype(desc[0][1]), 'FloatField')
@@ -86,19 +91,19 @@ def test_postgresql_real_type(self):
def test_get_relations(self):
cursor = connection.cursor()
relations = connection.introspection.get_relations(cursor, Article._meta.db_table)
-
+
# Older versions of MySQL don't have the chops to report on this stuff,
# so just skip it if no relations come back. If they do, though, we
# should test that the response is correct.
if relations:
# That's {field_index: (field_index_other_table, other_table)}
self.assertEqual(relations, {3: (0, Reporter._meta.db_table)})
-
+
def test_get_indexes(self):
cursor = connection.cursor()
indexes = connection.introspection.get_indexes(cursor, Article._meta.db_table)
self.assertEqual(indexes['reporter_id'], {'unique': False, 'primary_key': False})
-
+
def datatype(dbtype):
"""Helper to convert a data type into a string."""
dt = connection.introspection.data_types_reverse[dbtype]
Please sign in to comment.
Something went wrong with that request. Please try again.