
Proof of concept

1 parent 1c5396d commit 8990e20df50ce110fe6ddbbdfed7a98987bb5835 @akaariai committed Nov 4, 2011
@@ -280,6 +280,12 @@ def cursor(self):
        cursor = util.CursorWrapper(self._cursor(), self)
        return cursor

+    def chunked_cursor(self):
+        """
+        Returns a cursor which will try to avoid caching in the backend.
+        """
+        return self.cursor()
+
    def make_debug_cursor(self, cursor):
        return util.CursorDebugWrapper(cursor, self)
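
The base implementation simply falls back to a plain cursor, so calling code can ask for a chunked cursor unconditionally and degrade gracefully on backends without special support. A minimal caller-side sketch (the table and chunk size are placeholders, not part of this commit):

    from django.db import connection

    cursor = connection.chunked_cursor()  # just cursor() on backends without support
    cursor.execute("SELECT id, message FROM myapp_logentry")  # placeholder table
    while True:
        rows = cursor.fetchmany(100)  # placeholder chunk size
        if not rows:
            break
        for row in rows:
            pass  # process one row at a time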
@@ -298,7 +304,7 @@ class BaseDatabaseFeatures(object):
    # constraint exists, but one of the unique_together columns is NULL?
    ignores_nulls_in_unique_constraints = True

-    can_use_chunked_reads = True
+    has_safe_chunked_reads = True
    can_return_id_from_insert = False
    has_bulk_insert = False
    uses_autocommit = False
@@ -353,6 +359,7 @@ class BaseDatabaseFeatures(object):
    requires_explicit_null_ordering_when_grouping = False

    # Is there a 1000 item limit on query parameters?
+    # TODO: maybe it would be better to have query_parameter_limit = N...
    supports_1000_query_parameters = True

    # Can an object have a primary key of 0? MySQL says No.
@@ -109,6 +109,7 @@ def __init__(self, *args, **kwargs):
        self.introspection = DatabaseIntrospection(self)
        self.validation = BaseDatabaseValidation(self)
        self._pg_version = None
+        self._named_cursor_idx = 0

    def check_constraints(self, table_names=None):
        """
@@ -142,7 +143,7 @@ def _get_pg_version(self):
        return self._pg_version
    pg_version = property(_get_pg_version)

-    def _cursor(self):
+    def _cursor(self, name=None):
        new_connection = False
        set_tz = False
        settings_dict = self.settings_dict
@@ -170,14 +171,23 @@ def _cursor(self):
            self.connection.set_client_encoding('UTF8')
            self.connection.set_isolation_level(self.isolation_level)
            connection_created.send(sender=self.__class__, connection=self)
-        cursor = self.connection.cursor()
+        if name:
+            cursor = self.connection.cursor(name)
+        else:
+            cursor = self.connection.cursor()
        cursor.tzinfo_factory = None
        if new_connection:
            if set_tz:
-                cursor.execute("SET TIME ZONE %s", [settings_dict['TIME_ZONE']])
+                # Note that we can't use the above cursor if it happens to be
+                # a named cursor - using a named cursor for SET is not valid.
+                self.cursor().execute("SET TIME ZONE %s", [settings_dict['TIME_ZONE']])
            self._get_pg_version()
        return CursorWrapper(cursor)

+    def chunked_cursor(self):
+        self._named_cursor_idx += 1
+        return self._cursor(name='_django_curs_%d' % self._named_cursor_idx)
+
    def _enter_transaction_management(self, managed):
        """
        Switch the isolation level when needing transaction support, so that
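
What the name buys here: in psycopg2, passing a name to connection.cursor() declares a server-side cursor in PostgreSQL, so execute() does not pull the whole result set into client memory; each fetchmany() call streams one batch from the server. A standalone sketch outside Django (the DSN and table name are placeholders):

    import psycopg2

    conn = psycopg2.connect("dbname=example")  # placeholder DSN

    # Rows stay on the PostgreSQL server; each fetchmany() is one round trip.
    cur = conn.cursor(name="_example_curs_1")
    cur.execute("SELECT * FROM big_table")  # placeholder table

    while True:
        rows = cur.fetchmany(2000)
        if not rows:
            break
        for row in rows:
            pass  # handle one row at a time

    cur.close()
    conn.close()

This is also why the SET TIME ZONE statement above has to go through a plain cursor: a named cursor can only run the single row-returning statement it was declared for. The _named_cursor_idx counter exists because each open cursor on a connection needs a distinct name.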
@@ -53,7 +53,7 @@ class DatabaseFeatures(BaseDatabaseFeatures):
    # and then writing the same rows to the database in another cursor. This
    # setting ensures we always read result sets fully into memory all in one
    # go.
-    can_use_chunked_reads = False
+    has_safe_chunked_reads = False
    test_db_allows_multiple_connections = False
    supports_unspecified_pk = True
    supports_1000_query_parameters = False
@@ -312,6 +312,18 @@ def iterator(self):
                        setattr(obj, aggregate, row[i+aggregate_start])

                yield obj
+
+    def chunked(self):
+        """
+        Returns an iterator which will try to avoid all possible caching in
+        both the query class and in the db backend. This can mean, for
+        example, using named cursors on PostgreSQL.
+        """
+        assert not self._prefetch_related_lookups, \
+            "chunked() is not supported together with prefetch_related()"
+        clone = self._clone()
+        clone.query.chunked_fetch = True
+        return clone.iterator()

    def aggregate(self, *args, **kwargs):
        """
@@ -751,8 +751,10 @@ def execute_sql(self, result_type=MULTI):
                return empty_iter()
            else:
                return
-
-        cursor = self.connection.cursor()
+        if self.query.chunked_fetch:
+            cursor = self.connection.chunked_cursor()
+        else:
+            cursor = self.connection.cursor()
        cursor.execute(sql, params)

        if not result_type:
@@ -769,10 +771,12 @@ def execute_sql(self, result_type=MULTI):
        else:
            result = iter((lambda: cursor.fetchmany(GET_ITERATOR_CHUNK_SIZE)),
                    self.connection.features.empty_fetchmany_value)
-        if not self.connection.features.can_use_chunked_reads:
+        if (not self.connection.features.has_safe_chunked_reads
+                and not self.query.chunked_fetch):
            # If we are using non-chunked reads, we return the same data
            # structure as normally, but ensure it is all read into memory
-            # before going any further.
+            # before going any further. If the user has requested a chunked
+            # fetch, we honor that instead.
            return list(result)
        return result
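
The two-argument form of iter() above is what makes the stream lazy: it keeps calling the given function until the sentinel (the backend's empty_fetchmany_value) comes back. A self-contained illustration of the same pattern:

    # iter(callable, sentinel): keep calling until the sentinel appears.
    data = list(range(10))
    state = {"pos": 0}

    def fetchmany(chunk_size=4):
        # Mimics a DB-API cursor: up to chunk_size rows, then [] forever.
        start = state["pos"]
        state["pos"] = start + chunk_size
        return data[start:start + chunk_size]

    for batch in iter(fetchmany, []):  # [] plays empty_fetchmany_value's role
        print(batch)
    # prints [0, 1, 2, 3] then [4, 5, 6, 7] then [8, 9]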
@@ -66,7 +66,7 @@ def __iter__(self):
        # Always execute a new query for a new iterator.
        # This could be optimized with a cache at the expense of RAM.
        self._execute_query()
-        if not connections[self.using].features.can_use_chunked_reads:
+        if not connections[self.using].features.has_safe_chunked_reads:
            # If the database can't use chunked reads we need to make sure we
            # evaluate the entire query up front.
            result = list(self.cursor)
@@ -153,6 +153,7 @@ def __init__(self, model, where=WhereNode):
        # are the fields to defer, or False if these are the only fields to
        # load.
        self.deferred_loading = (set(), True)
+        self.chunked_fetch = False

    def __str__(self):
        """
@@ -297,6 +298,7 @@ def clone(self, klass=None, memo=None, **kwargs):
        else:
            obj.used_aliases = set()
        obj.filter_is_sticky = False
+        obj.chunked_fetch = self.chunked_fetch
        obj.__dict__.update(kwargs)
        if hasattr(obj, '_setup_query'):
            obj._setup_query()
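
Copying the flag in clone() matters because nearly every QuerySet operation works on a clone of the underlying Query; without the copy the setting would silently disappear. A rough illustration (hypothetical model):

    qs = LogEntry.objects.all()          # hypothetical model
    qs.query.chunked_fetch = True
    refined = qs.order_by('pk')          # clones the Query under the hood
    assert refined.query.chunked_fetch   # holds only because clone() copies it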
