Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
680 lines (582 sloc) 25.7 KB
"""
Models and managers for generic tagging.
"""
# Python 2.3 compatibility: the try/except below falls back to sets.Set when the ``set`` builtin is missing
from django.template.defaultfilters import slugify
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.utils.safestring import mark_safe
try:
set
except NameError:
from sets import Set as set
from django.contrib.contenttypes import generic
from django.contrib.contenttypes.models import ContentType
from django.db import connection, models
from django.db.models.query import QuerySet
from django.utils.translation import ugettext_lazy as _
from tagging import settings
from tagging.utils import calculate_cloud, get_tag_list, get_queryset_and_model, parse_tag_input
from tagging.utils import LOGARITHMIC
qn = connection.ops.quote_name
############
# Managers #
############
class TagManager(models.Manager):
    """
    Manager for ``Tag`` objects.

    Keeps the tag <-> object associations (``TaggedItem`` rows) in sync
    and runs the hand-written SQL behind the usage, related-tag and
    tag-cloud queries.
    """

    def update_tags(self, obj, tag_names, user=None):
        """
        Update tags associated with an object.

        ``tag_names`` is split into individual names by
        ``parse_tag_input``; the parsed names are lower-cased when
        ``settings.FORCE_LOWERCASE_TAGS`` is set.  Associations whose tag
        slug is no longer present are deleted, and associations for new
        slugs are created (creating the ``Tag`` itself on demand).

        user - the user adding the new tags
        """
        ctype = ContentType.objects.get_for_model(obj)
        current_tags = list(self.filter(items__content_type__pk=ctype.pk,
                                        items__object_id=obj.pk))
        updated_tag_names = parse_tag_input(tag_names)
        if settings.FORCE_LOWERCASE_TAGS:
            updated_tag_names = [t.lower() for t in updated_tag_names]

        # Several names can collapse to one slug (e.g. "Foo Bar" and
        # "foo-bar").  Keep each slug once, in first-seen order, so the
        # same tag is never attached twice to ``obj``: a second TaggedItem
        # would violate the (tag, content_type, object_id) uniqueness
        # declared on TaggedItem.  As before, the last name seen for a
        # slug is the one used if the Tag has to be created.
        slug_to_name = {}
        updated_slugs = []
        for name in updated_tag_names:
            slug = slugify(name)
            if slug not in slug_to_name:
                updated_slugs.append(slug)
            slug_to_name[slug] = name

        # Remove tags which no longer apply
        tags_for_removal = [tag for tag in current_tags
                            if tag.slug not in slug_to_name]
        if tags_for_removal:
            TaggedItem._default_manager.filter(
                content_type__pk=ctype.pk,
                object_id=obj.pk,
                tag__in=tags_for_removal).delete()

        # Add new tags
        current_slugs = set([tag.slug for tag in current_tags])
        for tag_slug in updated_slugs:
            if tag_slug in current_slugs:
                continue
            tag, created = self.get_or_create(
                slug=tag_slug, defaults=dict(name=slug_to_name[tag_slug]))
            TaggedItem._default_manager.create(tag=tag, object=obj, user=user)

    def get_or_create_by_tag_names(self, tag_names):
        """
        Return the ``Tag`` objects for every name in ``tag_names``,
        creating the tags that do not exist yet.

        Tags are matched by the slug of each parsed name; the result
        holds one tag per distinct slug, in no particular order.
        """
        tag_names = parse_tag_input(tag_names)
        tag_slugs = [slugify(name) for name in tag_names]
        tags = dict((t.slug, t) for t in self.filter(slug__in=tag_slugs))
        for tag_slug, tag_name in zip(tag_slugs, tag_names):
            if tag_slug not in tags:
                tags[tag_slug] = self.get_or_create(
                    slug=tag_slug, defaults=dict(name=tag_name))[0]
        return tags.values()

    def add_tag(self, obj, tag_name, user=None):
        """
        Associates the given object with a tag.

        ``tag_name`` must parse to exactly one tag name, otherwise
        ``AttributeError`` is raised.  Returns a ``(tag, tagged_item)``
        pair; ``tagged_item.created`` tells whether the association was
        newly created by this call.
        """
        tag_names = parse_tag_input(tag_name)
        if not tag_names:
            raise AttributeError(_('No tags were given: "%s".') % tag_name)
        if len(tag_names) > 1:
            raise AttributeError(_('Multiple tags were given: "%s".') % tag_name)
        tag_name = tag_names[0]
        tag_slug = slugify(tag_name)
        if settings.FORCE_LOWERCASE_TAGS:
            tag_name = tag_name.lower()
        tag, created = self.get_or_create(slug=tag_slug,
                                          defaults=dict(name=tag_name))
        ctype = ContentType.objects.get_for_model(obj)
        tagged_item, created = TaggedItem._default_manager.get_or_create(
            tag=tag, content_type=ctype, object_id=obj.pk,
            defaults=dict(user=user))
        # Ad-hoc attribute so callers can tell "new" from "already tagged".
        tagged_item.created = created
        return tag, tagged_item

    def get_for_object(self, obj):
        """
        Create a queryset matching all tags associated with the given
        object.
        """
        ctype = ContentType.objects.get_for_model(obj)
        return self.filter(items__content_type__pk=ctype.pk,
                           items__object_id=obj.pk)

    def _get_usage(self, model, counts=False, min_count=None, extra_joins=None,
                   extra_criteria=None, params=None, limit=20):
        """
        Perform the custom SQL query for ``usage_for_model`` and
        ``usage_for_queryset``.

        ``extra_joins`` / ``extra_criteria`` are SQL fragments taken from
        the caller's queryset and ``params`` are their query parameters.
        When both fragments are empty the per-model aggregate is skipped
        and the ``usage_count`` column on the tag table is read instead;
        that shortcut does not look at ``model`` at all.

        ``limit`` caps the number of rows; ``None`` means no ``LIMIT``
        clause.  Returns a list of tag instances, each carrying a
        ``count`` attribute.
        """
        # ``counts`` is accepted for interface compatibility but is always
        # forced on: every returned tag gets a ``count`` attribute.
        counts = True

        # Work on a private list so the caller's ``params`` is never
        # mutated, and so ``None`` / tuple inputs are accepted.
        if params is None:
            params = []
        else:
            params = list(params)
        # ``None`` fragments would otherwise be rendered as the text "None".
        extra_joins = extra_joins or ''
        extra_criteria = extra_criteria or ''
        # ``limit=None`` used to render "LIMIT None", which is invalid SQL.
        if limit is None:
            limit_sql = ''
        else:
            limit_sql = 'LIMIT %d' % int(limit)

        tag_table = qn(self.model._meta.db_table)
        if min_count is not None:
            # Bound to the single min-count placeholder, which in both
            # queries comes after every placeholder of ``extra_criteria``.
            params.append(min_count)

        if not extra_criteria and not extra_joins:
            # Shortcut: no filtering requested, use the stored counter.
            if min_count is not None:
                min_count_sql = 'WHERE %s.usage_count >= %%s' % tag_table
            else:
                min_count_sql = ''
            query = """
                SELECT
                    %(tag)s.id,
                    %(tag)s.name,
                    %(tag)s.slug,
                    %(tag)s.type_id,
                    %(tag)s.usage_count
                FROM
                    %(tag)s
                /* these parameters are all unused, but we have to keep them
                 * for compatibility with the rest of the code */
                %%s %%s %%s
                ORDER BY
                    %(tag)s.usage_count DESC,
                    %(tag)s.name ASC
                %(limit_sql)s
            """ % {
                'tag': tag_table,
                'limit_sql': limit_sql,
            }
        else:
            model_table = qn(model._meta.db_table)
            model_pk = '%s.%s' % (model_table, qn(model._meta.pk.column))
            if min_count is not None:
                min_count_sql = 'HAVING COUNT(%s) >= %%s' % model_pk
            else:
                min_count_sql = ''
            if counts:
                count_sql = ', COUNT(%s)' % model_pk
            else:
                count_sql = ''
            # The three ``%%s`` slots are filled below with, in order:
            # extra joins, extra WHERE criteria, HAVING clause.
            query = """
                SELECT DISTINCT %(tag)s.id, %(tag)s.name, MAX(%(tag)s.slug) as slug, MAX(%(tag)s.type_id) as type_id%(count_sql)s
                FROM
                    %(tag)s
                    INNER JOIN %(tagged_item)s
                        ON %(tag)s.id = %(tagged_item)s.tag_id
                    INNER JOIN %(model)s
                        ON %(tagged_item)s.object_id = %(model_pk)s
                    %%s
                WHERE %(tagged_item)s.content_type_id = %(content_type_id)s
                    %%s
                GROUP BY %(tag)s.id, %(tag)s.name
                %%s
                ORDER BY %(count_ordering)s, %(tag)s.name ASC %(limit_sql)s""" % {
                'tag': tag_table,
                'count_sql': count_sql,
                'count_ordering': 'COUNT(%s) DESC ' % model_pk,
                'tagged_item': qn(TaggedItem._meta.db_table),
                'model': model_table,
                'model_pk': model_pk,
                'content_type_id': ContentType.objects.get_for_model(model).pk,
                'limit_sql': limit_sql,
            }

        cursor = connection.cursor()
        cursor.execute(query % (extra_joins, extra_criteria, min_count_sql),
                       params)
        tags = []
        for row in cursor.fetchall():
            # First four columns are id, name, slug, type_id; the last
            # column is the usage count in both query shapes.
            t = self.model(*row[:4])
            if counts:
                t.count = row[-1]
            tags.append(t)
        return tags

    def usage_for_model(self, model, counts=False, min_count=None, filters=None, **kwargs):
        """
        Obtain a list of tags associated with instances of the given
        Model class.

        If ``counts`` is True, a ``count`` attribute will be added to
        each tag, indicating how many times it has been used against
        the Model class in question.

        If ``min_count`` is given, only tags which have a ``count``
        greater than or equal to ``min_count`` will be returned.
        Passing a value for ``min_count`` implies ``counts=True``.

        To limit the tags (and counts, if specified) returned to those
        used by a subset of the Model's instances, pass a dictionary
        of field lookups to be applied to the given Model as the
        ``filters`` argument.

        Extra keyword arguments (e.g. ``limit``) are passed through to
        ``usage_for_queryset``.
        """
        if filters is None:
            filters = {}
        queryset = model._default_manager.filter()
        for lookup in filters.items():
            queryset.query.add_filter(lookup)
        return self.usage_for_queryset(queryset, counts, min_count, **kwargs)

    def usage_for_queryset(self, queryset, counts=False, min_count=None, **kwargs):
        """
        Obtain a list of tags associated with instances of a model
        contained in the given queryset.

        If ``counts`` is True, a ``count`` attribute will be added to
        each tag, indicating how many times it has been used against
        the Model class in question.

        If ``min_count`` is given, only tags which have a ``count``
        greater than or equal to ``min_count`` will be returned.
        Passing a value for ``min_count`` implies ``counts=True``.

        Extra keyword arguments (e.g. ``limit``) are passed through to
        ``_get_usage``.
        """
        if getattr(queryset.query, 'get_compiler', None):
            # Django 1.2+
            # NOTE(review): the compiler is always built for the
            # 'default' database alias, whatever database the queryset
            # targets.
            compiler = queryset.query.get_compiler(using='default')
            extra_joins = ' '.join(compiler.get_from_clause()[0][1:])
            where, params = queryset.query.where.as_sql(
                compiler.quote_name_unless_alias, compiler.connection
            )
        else:
            # Django pre-1.2
            extra_joins = ' '.join(queryset.query.get_from_clause()[0][1:])
            where, params = queryset.query.where.as_sql()

        if where:
            extra_criteria = 'AND %s' % where
        else:
            extra_criteria = ''
        return self._get_usage(queryset.model, counts, min_count,
                               extra_joins, extra_criteria, params, **kwargs)

    def related_for_model(self, tags, model, counts=False, min_count=None,
                          order_by_count=True, limit=40):
        """
        Obtain a list of tags related to a given list of tags - that
        is, other tags used by items which have all the given tags.

        If ``counts`` is True, a ``count`` attribute will be added to
        each tag, indicating the number of items which have it in
        addition to the given list of tags.

        If ``min_count`` is given, only tags which have a ``count``
        greater than or equal to ``min_count`` will be returned.
        Passing a value for ``min_count`` implies ``counts=True``.

        If ``order_by_count`` is true the result is ordered by
        descending count, then by name; otherwise by name only.
        ``limit`` caps the number of tags; ``None`` means no limit.
        An empty ``tags`` list yields an empty result.
        """
        if min_count is not None:
            counts = True
        tags = get_tag_list(tags)
        tag_count = len(tags)
        if not tag_count:
            # The query below would contain ``IN ()``, which most
            # databases reject; with no reference tags nothing is related.
            return []

        tag_table = qn(self.model._meta.db_table)
        tagged_item_table = qn(TaggedItem._meta.db_table)

        if counts:
            count_sql = ', COUNT(%s.object_id)' % tagged_item_table
        else:
            count_sql = ''
        if min_count is not None:
            min_count_sql = 'HAVING COUNT(%s.object_id) >= %%s' % tagged_item_table
        else:
            min_count_sql = ''
        order_sql = '%s.name ASC' % tag_table
        if order_by_count:
            order_sql = 'COUNT(%s.object_id) DESC, %s' % (tagged_item_table,
                                                          order_sql)
        if limit is None:
            limit_sql = ''
        else:
            limit_sql = 'LIMIT %s' % str(limit)

        query = """
            SELECT %(tag)s.id, %(tag)s.name, MAX(%(tag)s.slug) as slug, MAX(%(tag)s.type_id) as type_id%(count_sql)s
            FROM %(tagged_item)s INNER JOIN %(tag)s ON %(tagged_item)s.tag_id = %(tag)s.id
            WHERE %(tagged_item)s.content_type_id = %(content_type_id)s
              AND %(tagged_item)s.object_id IN
              (
                  SELECT %(tagged_item)s.object_id
                  FROM %(tagged_item)s, %(tag)s
                  WHERE %(tagged_item)s.content_type_id = %(content_type_id)s
                    AND %(tag)s.id = %(tagged_item)s.tag_id
                    AND %(tag)s.id IN (%(tag_id_placeholders)s)
                  GROUP BY %(tagged_item)s.object_id
                  HAVING COUNT(%(tagged_item)s.object_id) = %(tag_count)s
              )
              AND %(tag)s.id NOT IN (%(tag_id_placeholders)s)
            GROUP BY %(tag)s.id, %(tag)s.name
            %(min_count_sql)s
            ORDER BY %(order_sql)s %(limit_sql)s""" % {
            'tag': tag_table,
            'count_sql': count_sql,
            'tagged_item': tagged_item_table,
            'content_type_id': ContentType.objects.get_for_model(model).pk,
            'tag_id_placeholders': ','.join(['%s'] * tag_count),
            'tag_count': tag_count,
            'min_count_sql': min_count_sql,
            'order_sql': order_sql,
            'limit_sql': limit_sql,
        }

        # The tag ids are bound twice: once for ``IN``, once for ``NOT IN``.
        params = [tag.pk for tag in tags] * 2
        if min_count is not None:
            params.append(min_count)

        cursor = connection.cursor()
        cursor.execute(query, params)
        related = []
        for row in cursor.fetchall():
            tag = self.model(*row[:4])
            if counts:
                tag.count = row[-1]
            related.append(tag)
        return related

    def cloud_for_model(self, model, steps=4, distribution=LOGARITHMIC,
                        filters=None, min_count=None, limit=None):
        """
        Obtain a list of tags associated with instances of the given
        Model, giving each tag a ``count`` attribute indicating how
        many times it has been used and a ``font_size`` attribute for
        use in displaying a tag cloud.

        ``steps`` defines the range of font sizes - ``font_size`` will
        be an integer between 1 and ``steps`` (inclusive).

        ``distribution`` defines the type of font size distribution
        algorithm which will be used - logarithmic or linear. It must
        be either ``tagging.utils.LOGARITHMIC`` or
        ``tagging.utils.LINEAR``.

        To limit the tags displayed in the cloud to those associated
        with a subset of the Model's instances, pass a dictionary of
        field lookups to be applied to the given Model as the
        ``filters`` argument.

        To limit the tags displayed in the cloud to those with a
        ``count`` greater than or equal to ``min_count``, pass a value
        for the ``min_count`` argument.

        ``limit`` caps the number of tags in the cloud; the default
        ``None`` applies no cap.
        """
        tags = list(self.usage_for_model(model, counts=True, filters=filters,
                                         min_count=min_count, limit=limit))
        return calculate_cloud(tags, steps, distribution)
class TaggedItemManager(models.Manager):
    """
    FIXME There's currently no way to get the ``GROUP BY`` and ``HAVING``
    SQL clauses required by many of this manager's methods into
    Django's ORM.

    For now, we manually execute a query to retrieve the PKs of
    objects we're interested in, then use the ORM's ``__in``
    lookup to return a ``QuerySet``.

    Now that the queryset-refactor branch is in the trunk, this can be
    tidied up significantly.
    """

    def get_by_model(self, queryset_or_model, tags):
        """
        Create a ``QuerySet`` containing instances of the specified
        model associated with a given tag or list of tags.

        With no tags an empty queryset is returned; with several tags
        the work is delegated to ``get_intersection_by_model``.
        """
        tags = get_tag_list(tags)
        tag_count = len(tags)
        if tag_count == 0:
            # No existing tags were given
            model = get_queryset_and_model(queryset_or_model)[1]
            return model._default_manager.none()
        if tag_count > 1:
            return self.get_intersection_by_model(queryset_or_model, tags)

        # Optimisation for single tag - a plain join against the tagged
        # item table, no GROUP BY needed.
        tag = tags[0]
        queryset, model = get_queryset_and_model(queryset_or_model)
        content_type = ContentType.objects.get_for_model(model)
        opts = self.model._meta
        tagged_item_table = qn(opts.db_table)
        return queryset.extra(
            tables=[opts.db_table],
            where=[
                '%s.content_type_id = %%s' % tagged_item_table,
                '%s.tag_id = %%s' % tagged_item_table,
                '%s.%s = %s.object_id' % (qn(model._meta.db_table),
                                          qn(model._meta.pk.column),
                                          tagged_item_table)
            ],
            params=[content_type.pk, tag.pk],
        )

    @classmethod
    def get_tagged_item_joins(cls, tags, tagged_item_table, model):
        """
        Build one ``JOIN`` against the tagged item table per tag.

        ``tagged_item_table`` is the already-quoted table name.  Returns
        ``(joins, params)``: the SQL fragment and the parameters for its
        placeholders (content type id and tag id for each join, in join
        order).  A tag that appears more than once in ``tags`` is joined
        only once.
        """
        content_type_id = ContentType.objects.get_for_model(model).pk
        model_pk = '%s.%s' % (qn(model._meta.db_table),
                              qn(model._meta.pk.column))

        # The join alias is derived from the tag's pk, so a repeated tag
        # would produce two joins with the same alias (invalid SQL).
        seen_pks = set()
        unique_tags = []
        for tag in tags:
            if tag.pk not in seen_pks:
                seen_pks.add(tag.pk)
                unique_tags.append(tag)

        # Sort the tags by usage, this makes the query quite a bit faster
        unique_tags = sorted(unique_tags, key=lambda t: t.usage_count)

        fragments = []
        params = []
        for tag in unique_tags:
            params += [content_type_id, tag.id]
            fragments.append('''
                JOIN %(tagged_item)s %(alias)s
                    ON %(alias)s.content_type_id = %%s
                    AND %(alias)s.tag_id = %%s
                    AND %(alias)s.object_id = %(model_pk)s
            ''' % {
                'tagged_item': tagged_item_table,
                'alias': 'taggeditem_%d' % tag.pk,
                'model_pk': model_pk,
            })
        return ''.join(fragments), params

    def get_intersection_by_model(self, queryset_or_model, tags):
        """
        Create a ``QuerySet`` containing instances of the specified
        model associated with *all* of the given list of tags.
        """
        tags = get_tag_list(tags)
        queryset, model = get_queryset_and_model(queryset_or_model)
        if not tags:
            return model._default_manager.none()

        model_table = qn(model._meta.db_table)
        tagged_item_table = qn(self.model._meta.db_table)
        # This query selects the ids of all objects which have all the
        # given tags.
        joins, params = self.get_tagged_item_joins(tags, tagged_item_table,
                                                   model)
        query = '''
            SELECT %(model_pk)s
            FROM %(model)s
            %(joins)s
            GROUP BY %(model_pk)s
        ''' % {
            'model': model_table,
            'model_pk': '%s.%s' % (model_table, qn(model._meta.pk.column)),
            'joins': joins,
        }

        cursor = connection.cursor()
        cursor.execute(query, params)
        object_ids = [row[0] for row in cursor.fetchall()]
        if object_ids:
            return queryset.filter(pk__in=object_ids)
        return model._default_manager.none()

    def get_union_by_model(self, queryset_or_model, tags):
        """
        Create a ``QuerySet`` containing instances of the specified
        model associated with *any* of the given list of tags.
        """
        tags = get_tag_list(tags)
        tag_count = len(tags)
        queryset, model = get_queryset_and_model(queryset_or_model)
        if not tag_count:
            return model._default_manager.none()

        model_table = qn(model._meta.db_table)
        # This query selects the ids of all objects which have any of
        # the given tags.
        query = """
            SELECT %(model_pk)s
            FROM %(model)s, %(tagged_item)s
            WHERE %(tagged_item)s.content_type_id = %(content_type_id)s
              AND %(tagged_item)s.tag_id IN (%(tag_id_placeholders)s)
              AND %(model_pk)s = %(tagged_item)s.object_id
            GROUP BY %(model_pk)s""" % {
            'model_pk': '%s.%s' % (model_table, qn(model._meta.pk.column)),
            'model': model_table,
            'tagged_item': qn(self.model._meta.db_table),
            'content_type_id': ContentType.objects.get_for_model(model).pk,
            'tag_id_placeholders': ','.join(['%s'] * tag_count),
        }

        cursor = connection.cursor()
        cursor.execute(query, [tag.pk for tag in tags])
        object_ids = [row[0] for row in cursor.fetchall()]
        if object_ids:
            return queryset.filter(pk__in=object_ids)
        return model._default_manager.none()

    def get_related(self, obj, queryset_or_model, num=None):
        """
        Retrieve a list of instances of the specified model which share
        tags with the model instance ``obj``, ordered by the number of
        shared tags in descending order.

        If ``num`` is given, a maximum of ``num`` instances will be
        returned.
        """
        queryset, model = get_queryset_and_model(queryset_or_model)
        model_table = qn(model._meta.db_table)
        content_type = ContentType.objects.get_for_model(obj)
        related_content_type = ContentType.objects.get_for_model(model)

        query = """
            SELECT %(model_pk)s, COUNT(related_tagged_item.object_id) AS %(count)s
            FROM %(model)s, %(tagged_item)s, %(tag)s, %(tagged_item)s related_tagged_item
            WHERE %(tagged_item)s.object_id = %%s
              AND %(tagged_item)s.content_type_id = %(content_type_id)s
              AND %(tag)s.id = %(tagged_item)s.tag_id
              AND related_tagged_item.content_type_id = %(related_content_type_id)s
              AND related_tagged_item.tag_id = %(tagged_item)s.tag_id
              AND %(model_pk)s = related_tagged_item.object_id"""
        if content_type.pk == related_content_type.pk:
            # Exclude the given instance itself if determining related
            # instances for the same model.
            query += """
              AND related_tagged_item.object_id != %(tagged_item)s.object_id"""
        query += """
            GROUP BY %(model_pk)s
            ORDER BY %(count)s DESC
            %(limit_offset)s"""

        params = [obj.pk]
        if num is not None:
            # Hardcoding this for now just to get tests working again - this
            # should now be handled by the query object.
            limit_offset = 'LIMIT %s'
            params.append(num)
        else:
            limit_offset = ''

        query = query % {
            'model_pk': '%s.%s' % (model_table, qn(model._meta.pk.column)),
            'count': qn('count'),
            'model': model_table,
            'tagged_item': qn(self.model._meta.db_table),
            'tag': qn(self.model._meta.get_field('tag').rel.to._meta.db_table),
            'content_type_id': content_type.pk,
            'related_content_type_id': related_content_type.pk,
            'limit_offset': limit_offset,
        }

        cursor = connection.cursor()
        cursor.execute(query, params)
        object_ids = [row[0] for row in cursor.fetchall()]
        if not object_ids:
            return []
        # Use in_bulk here instead of an id__in lookup, because id__in would
        # clobber the ordering.
        object_dict = queryset.in_bulk(object_ids)
        return [object_dict[object_id] for object_id in object_ids
                if object_id in object_dict]
##########
# Models #
##########
from framework.utils.orm import CreatedAtAbstractBase
class TagType(CreatedAtAbstractBase):
    """
    A named type that a ``Tag`` can be assigned to.
    """
    name = models.CharField(
        _('name'),
        max_length=50,
        unique=True,
        db_index=True,
    )
    slug = models.SlugField(
        unique=True,
        db_index=True,
    )
    # NOTE(review): presumably marks tags of this type as "do not index"
    # for search engines - confirm against the templates that read it.
    no_index = models.BooleanField(
        default=True,
    )
# Module-level cache of TagType instances keyed by primary key; it is
# filled on demand by ``Tag.type_cached``.
_tag_type_dict = {}


class Tag(models.Model):
    """
    A tag.
    """
    name = models.CharField(_('name'), max_length=50, unique=True, db_index=True)
    slug = models.SlugField(unique=True, db_index=True)
    type = models.ForeignKey('tagging.TagType', blank=True, null=True, default=1)
    usage_count = models.IntegerField(default=0)

    objects = TagManager()

    class Meta:
        ordering = ('slug',)
        verbose_name = _('tag')
        verbose_name_plural = _('tags')

    def __unicode__(self):
        return self.name

    @property
    def type_cached(self):
        """
        The tag's ``TagType``, served from the module-level cache.

        Tags without a type use the type with id 1.  The cache is
        (re)loaded whenever the wanted id is missing from it, so types
        created after the first load are found; ``KeyError`` is raised
        only if the type does not exist even after reloading.
        """
        type_id = self.type_id or 1
        if type_id not in _tag_type_dict:
            _tag_type_dict.update((t.id, t) for t in TagType.objects.all())
        return _tag_type_dict[type_id]

    @property
    def no_index(self):
        """The ``no_index`` flag of this tag's type."""
        return self.type_cached.no_index

    def save(self, *args, **kwargs):
        """
        Save the tag, always regenerating ``slug`` from ``name`` and
        defaulting a missing type to the type with id 1.
        """
        self.slug = slugify(self.name)
        if not self.type_id:
            self.type_id = 1
        super(Tag, self).save(*args, **kwargs)

    def _change_search(self, url, func='remove'):
        """
        Return the search URL obtained by applying the list operation
        ``func`` with this tag's slug to the tag slugs found in ``url``.

        ``url`` must contain ``/search/``; the slash-separated slugs
        following it are the current search.  Removing a slug that is not
        part of the search, or appending one that already is, leaves the
        search unchanged.
        """
        current = url.split('/search/')[1].strip('/')
        tag_slugs = [t for t in current.split('/') if t]
        if func == 'remove':
            # list.remove raises ValueError for a missing item.
            if self.slug in tag_slugs:
                tag_slugs.remove(self.slug)
        elif func == 'append':
            # Do not list the same tag twice in the resulting URL.
            if self.slug not in tag_slugs:
                tag_slugs.append(self.slug)
        else:
            getattr(tag_slugs, func)(self.slug)
        tag_slugs.sort()
        if tag_slugs:
            arg_slug = '/%s' % ('/'.join(tag_slugs),)
        else:
            arg_slug = ''
        return reverse('search', args=[arg_slug])

    def remove_from_search(self, url):
        """Return the search URL for ``url`` without this tag."""
        return self._change_search(url)

    def add_to_search(self, url):
        """Return the search URL for ``url`` with this tag added."""
        return self._change_search(url, 'append')

    def url_link(self):
        """
        Return an HTML link to the tag's URL, labelled with its name.

        Name and URL are HTML-escaped before the result is marked safe:
        the tag name is free text and must not be able to inject markup.
        """
        from django.utils.html import escape
        return mark_safe(u'<a href="%s">%s</a>' % (escape(unicode(self.url)),
                                                   escape(self.name)))
    url_link.allow_tags = True

    @property
    def url(self, **kwargs):
        """
        ``UrlObject`` for this tag; both configured entries point at the
        ``search`` view with this tag's slug.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid an import cycle - confirm.
        from framework.url.utils import UrlObject
        tag_slug = '/%s' % self.slug
        config = {
            'default': ['search', {'tag_slugs': tag_slug}],
            'search': ['search', {'tag_slugs': tag_slug}],
        }
        return UrlObject(self, config)

    def get_absolute_url(self):
        """Return the tag's URL as a plain string."""
        return str(self.url)
class TaggedItem(models.Model):
    """
    Holds the relationship between a tag and the item being tagged.
    """
    tag = models.ForeignKey('tagging.Tag', verbose_name=_('tag'), related_name='items')
    content_type = models.ForeignKey(ContentType, verbose_name=_('content type'))
    object_id = models.PositiveIntegerField(_('object id'), db_index=True)
    object = generic.GenericForeignKey('content_type', 'object_id')
    user = models.ForeignKey('auth.User', blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True, blank=True, null=True)

    objects = TaggedItemManager()

    class Meta:
        # Enforce unique tag association per object
        unique_together = (('tag', 'content_type', 'object_id'),)
        verbose_name = _('tagged item')
        verbose_name_plural = _('tagged items')

    def __unicode__(self):
        return u'%s [%s]' % (self.object, self.tag)

    def find_user(self):
        """
        Return the ``created_by`` attribute of the tagged object, or
        ``None`` when there is no object or it has no such attribute.
        """
        target = self.object
        if target and hasattr(target, 'created_by'):
            return target.created_by
        return None

    def object_link(self):
        """
        Return an HTML link to the tagged object, or ``None`` when there
        is no object or it has no ``url`` attribute.

        The object's URL and text are HTML-escaped before the result is
        marked safe, so neither can inject markup.
        """
        from django.utils.html import escape
        target = self.object
        if target and hasattr(target, 'url'):
            return mark_safe(u'<a href="%s">%s</a>' % (escape(target.url),
                                                       escape(target)))
        return None
    object_link.allow_tags = True

    @property
    def url(self, **kwargs):
        """
        ``UrlObject`` for this association; both configured entries point
        at the ``remove_tag`` view for the tagged object and this row.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid an import cycle - confirm.
        from framework.url.utils import UrlObject
        config = {
            'default': ['remove_tag', {'item_id': self.object.id, 'tag_item_id': self.id}],
            'remove': ['remove_tag', {'item_id': self.object.id, 'tag_item_id': self.id}],
        }
        return UrlObject(self, config)