Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 123 lines (97 sloc) 5.239 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
import os

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db.backends.sqlite3.creation import DatabaseCreation


class SpatiaLiteCreation(DatabaseCreation):

    def create_test_db(self, verbosity=1, autoclobber=False):
        """
Creates a test database, prompting the user for confirmation if the
database already exists. Returns the name of the test database created.

This method is overloaded to load up the SpatiaLite initialization
SQL prior to calling the `migrate` command.
"""
        # Don't import django.core.management if it isn't needed.
        from django.core.management import call_command

        test_database_name = self._get_test_db_name()

        if verbosity >= 1:
            test_db_repr = ''
            if verbosity >= 2:
                test_db_repr = " ('%s')" % test_database_name
            print("Creating test database for alias '%s'%s..." % (self.connection.alias, test_db_repr))

        self._create_test_db(verbosity, autoclobber)

        self.connection.close()
        self.connection.settings_dict["NAME"] = test_database_name

        # Need to load the SpatiaLite initialization SQL before running `migrate`.
        self.load_spatialite_sql()

        # Report migrate messages at one level lower than that requested.
        # This ensures we don't get flooded with messages during testing
        # (unless you really ask to be flooded)
        call_command('migrate',
            verbosity=max(verbosity - 1, 0),
            interactive=False,
            database=self.connection.alias,
            load_initial_data=False)

        # We need to then do a flush to ensure that any data installed by
        # custom SQL has been removed. The only test data should come from
        # test fixtures, or autogenerated from post_migrate triggers.
        # This has the side effect of loading initial data (which was
        # intentionally skipped in the migrate).
        call_command('flush',
            verbosity=max(verbosity - 1, 0),
            interactive=False,
            database=self.connection.alias)

        call_command('createcachetable', database=self.connection.alias)

        # Ensure a connection for the side effect of initializing the test database.
        self.connection.ensure_connection()

        return test_database_name

    def sql_indexes_for_field(self, model, f, style):
        "Return any spatial index creation SQL for the field."
        from django.contrib.gis.db.models.fields import GeometryField

        output = super(SpatiaLiteCreation, self).sql_indexes_for_field(model, f, style)

        if isinstance(f, GeometryField):
            gqn = self.connection.ops.geo_quote_name
            db_table = model._meta.db_table

            output.append(style.SQL_KEYWORD('SELECT ') +
                          style.SQL_TABLE('AddGeometryColumn') + '(' +
                          style.SQL_TABLE(gqn(db_table)) + ', ' +
                          style.SQL_FIELD(gqn(f.column)) + ', ' +
                          style.SQL_FIELD(str(f.srid)) + ', ' +
                          style.SQL_COLTYPE(gqn(f.geom_type)) + ', ' +
                          style.SQL_KEYWORD(str(f.dim)) + ', ' +
                          style.SQL_KEYWORD(str(int(not f.null))) +
                          ');')

            if f.spatial_index:
                output.append(style.SQL_KEYWORD('SELECT ') +
                              style.SQL_TABLE('CreateSpatialIndex') + '(' +
                              style.SQL_TABLE(gqn(db_table)) + ', ' +
                              style.SQL_FIELD(gqn(f.column)) + ');')

        return output

    def load_spatialite_sql(self):
        """
This routine loads up the SpatiaLite SQL file.
"""
        if self.connection.ops.spatial_version[:2] >= (2, 4):
            # Spatialite >= 2.4 -- No need to load any SQL file, calling
            # InitSpatialMetaData() transparently creates the spatial metadata
            # tables
            cur = self.connection._cursor()
            cur.execute("SELECT InitSpatialMetaData()")
        else:
            # Spatialite < 2.4 -- Load the initial SQL

            # Getting the location of the SpatiaLite SQL file, and confirming
            # it exists.
            spatialite_sql = self.spatialite_init_file()
            if not os.path.isfile(spatialite_sql):
                raise ImproperlyConfigured('Could not find the required SpatiaLite initialization '
                                        'SQL file (necessary for testing): %s' % spatialite_sql)

            # Opening up the SpatiaLite SQL initialization file and executing
            # as a script.
            with open(spatialite_sql, 'r') as sql_fh:
                cur = self.connection._cursor()
                cur.executescript(sql_fh.read())

    def spatialite_init_file(self):
        # SPATIALITE_SQL may be placed in settings to tell GeoDjango
        # to use a specific path to the SpatiaLite initialization SQL.
        return getattr(settings, 'SPATIALITE_SQL',
                       'init_spatialite-%s.%s.sql' %
                       self.connection.ops.spatial_version[:2])
Something went wrong with that request. Please try again.