Skip to content

Commit

Permalink
refactor: ♻️ HitCountViewMixin
Browse files Browse the repository at this point in the history
- also, respect HITCOUNT_HITS_PER_IP_LIMIT before counting a hit.
  • Loading branch information
abhiabhi94 committed Nov 5, 2021
1 parent 3f593e9 commit d6c2ceb
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 43 deletions.
2 changes: 2 additions & 0 deletions hitcount/conf/defaults.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
HITCOUNT_KEEP_HIT_ACTIVE = {'days': 7}

HITCOUNT_HITS_PER_IP_LIMIT = 0

HITCOUNT_HITS_PER_SESSION_LIMIT = 0

HITCOUNT_EXCLUDE_USER_GROUP = ()
Expand Down
13 changes: 9 additions & 4 deletions hitcount/managers/blockers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@


class BlockedIPManager(models.Manager):
def filter_ip(self, ip):
return self.filter(ip__exact=ip)
def is_blocked(self, ip=None):
if not ip:
return False
return self.filter(ip__exact=ip).exists()


class BlockedUserAgentManager(models.Manager):
def filter_user_agent(self, user_agent):
return self.filter(user_agent__exact=user_agent)
def is_blocked(self, user_agent=None):
if not user_agent:
return False

return self.filter(user_agent__exact=user_agent).exists()
14 changes: 14 additions & 0 deletions hitcount/managers/hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@ def filter_active(self, *args, **kwargs):
grace = settings.HITCOUNT_KEEP_HIT_ACTIVE
period = timezone.now() - timedelta(**grace)
return self.filter(created__gte=period).filter(*args, **kwargs)

def has_limit_reached_by_ip(self, ip=None):
hits_per_ip_limit = settings.HITCOUNT_HITS_PER_IP_LIMIT
if not ip or not hits_per_ip_limit:
return False

return self.filter_active(ip=ip).count() >= hits_per_ip_limit

def has_limit_reached_by_session(self, session, hitcount):
hits_per_session_limit = settings.HITCOUNT_HITS_PER_SESSION_LIMIT
if not hits_per_session_limit:
return False

return self.filter_active(session=session, hitcount=hitcount).count() >= hits_per_session_limit
40 changes: 13 additions & 27 deletions hitcount/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ def hit_count(request, hitcount):
user_agent = request.META.get('HTTP_USER_AGENT', '')[:255]

# first, check our request against the IP blocked
if ip:
if BlockedIP.objects.filter_ip(ip).exists():
return UpdateHitCountResponse(False, 'Not counted: user IP has been blocked')
if BlockedIP.objects.is_blocked(ip):
return UpdateHitCountResponse(False, 'Not counted: user IP has been blocked')

# second, check our request against the user agent blocked
if BlockedUserAgent.objects.filter_user_agent(user_agent).exists():
if BlockedUserAgent.objects.is_blocked(user_agent):
return UpdateHitCountResponse(False, 'Not counted: user agent has been blocked')

# third, see if we are excluding a specific user group or not
Expand All @@ -85,43 +84,30 @@ def hit_count(request, hitcount):
# eliminated first three possible exclusions, now on to checking our database of
# active hits to see if we should count another one

# start with a fresh active query set (HITCOUNT_KEEP_HIT_ACTIVE)
active_hits_qs = Hit.objects.filter_active()

# check limit on hits from a unique ip address (HITCOUNT_HITS_PER_IP_LIMIT)
hits_per_ip_limit = settings.HITCOUNT_HITS_PER_IP_LIMIT
if ip and hits_per_ip_limit:
if active_hits_qs.filter(ip=ip).count() >= hits_per_ip_limit:
return UpdateHitCountResponse(
False, 'Not counted: hits per IP address limit reached')
if Hit.objects.has_limit_reached_by_ip(ip):
return UpdateHitCountResponse(
False, 'Not counted: hits per IP address limit reached')

session_key = request.session.session_key
# create a generic Hit object with request data
if Hit.objects.has_limit_reached_by_session(session_key, hitcount):
return UpdateHitCountResponse(
False, 'Not counted: hits per session limit reached.')

hit = Hit(
session=session_key,
hitcount=hitcount,
ip=ip,
user_agent=request.META.get('HTTP_USER_AGENT', '')[:255],
user_agent=user_agent,
)

# first, use a user's authentication to see if they made an earlier hit
hits_per_session_limit = settings.HITCOUNT_HITS_PER_SESSION_LIMIT
if (
hits_per_session_limit
and
active_hits_qs.filter(session=session_key).filter(hitcount=hitcount).count() >= hits_per_session_limit
):
return UpdateHitCountResponse(
False, 'Not counted: hits per session limit reached.')

if request.user.is_authenticated:
hit.user = user # associate this hit with a user
hit.save()
hit.user = user

response = UpdateHitCountResponse(
True, 'Hit counted: user authentication')
else:
hit.save()
response = UpdateHitCountResponse(True, 'Hit counted: session key')

hit.save()
return response
22 changes: 12 additions & 10 deletions tests/test_managers/test_blockers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@

class TestBlockIPManager(TestCase):

def test_filter_ip(self):
blocked = BlockedIP.objects.create(ip='10.1.1.1')
def test_is_blocked(self):
# without IP.
self.assertIs(BlockedIP.objects.is_blocked(), False)

BlockedIP.objects.create(ip='10.1.2.1')

self.assertQuerysetEqual(BlockedIP.objects.filter_ip(ip='10.1.1.1'), {blocked}, transform=lambda x: x)
self.assertIs(BlockedIP.objects.is_blocked('10.1.2.1'), True)
self.assertIs(BlockedIP.objects.is_blocked('10.1.1.1'), False)


class TestBlockUserAgentManager(TestCase):

def test_filter_user_agent(self):
def test_is_blocked(self):
# without an agent.
self.assertIs(BlockedUserAgent.objects.is_blocked(), False)

user_agent_windows = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0'
user_agent_mac = 'Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:42.0) Gecko/20100101 Firefox/42.0'

blocked = BlockedUserAgent.objects.create(user_agent=user_agent_mac)
BlockedUserAgent.objects.create(user_agent=user_agent_windows)

self.assertQuerysetEqual(
BlockedUserAgent.objects.filter_user_agent(user_agent=user_agent_mac),
{blocked},
transform=lambda x: x,
)
self.assertIs(BlockedUserAgent.objects.is_blocked(user_agent_windows), True)
self.assertIs(BlockedUserAgent.objects.is_blocked(user_agent_mac), False)
43 changes: 41 additions & 2 deletions tests/test_managers/test_hits.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import timedelta
from importlib import import_module
from unittest.mock import patch

from django.test import RequestFactory
from django.test import TestCase
from django.utils import timezone

Expand All @@ -10,11 +12,12 @@
from hitcount.models import HitCount


@patch.object(settings, 'HITCOUNT_USE_IP', True)
class TestHitManager(TestCase):
def setUp(self):
post = Post.objects.create(title='my title', content='my text')
hit_count = HitCount.objects.create(content_object=post)
self.hit = Hit.objects.create(hitcount=hit_count)
self.hitcount = HitCount.objects.create(content_object=post)
self.hit = Hit.objects.create(hitcount=self.hitcount)

@patch.object(settings, 'HITCOUNT_KEEP_HIT_ACTIVE', {'days': 7})
def test_filter_active(self):
Expand All @@ -35,6 +38,42 @@ def test_filter_active(self):
self.assertEqual(Hit.objects.all().count(), 10)
self.assertEqual(Hit.objects.filter_active().count(), 7)

def test_has_limit_reached_by_ip(self):
ip = '127.0.0.1'
Hit.objects.create(hitcount=self.hitcount, ip='127.0.0.1')

# all hits are counted.
with patch.object(settings, 'HITCOUNT_HITS_PER_IP_LIMIT', 0):
self.assertIs(Hit.objects.has_limit_reached_by_ip(), False)

Hit.objects.create(hitcount=self.hitcount, ip=ip)

self.assertIs(Hit.objects.has_limit_reached_by_ip(ip), False)

with patch.object(settings, 'HITCOUNT_HITS_PER_IP_LIMIT', 2):
self.assertIs(Hit.objects.has_limit_reached_by_ip(ip), True)

def test_has_limit_reached_by_session(self):
request = RequestFactory()
engine = import_module(settings.SESSION_ENGINE)
session = engine.SessionStore()
session.save()
request.session = session
session_key = request.session.session_key

Hit.objects.create(hitcount=self.hitcount, session=session_key)

# all hits are counted.
with patch.object(settings, 'HITCOUNT_HITS_PER_SESSION_LIMIT', 0):
self.assertIs(Hit.objects.has_limit_reached_by_session(session_key, self.hitcount), False)

Hit.objects.create(hitcount=self.hitcount, session=session_key)

self.assertIs(Hit.objects.has_limit_reached_by_session(session_key, self.hitcount), False)

with patch.object(settings, 'HITCOUNT_HITS_PER_SESSION_LIMIT', 2):
self.assertIs(Hit.objects.has_limit_reached_by_session(session_key, self.hitcount), True)


class TestHitCountManager(TestCase):
def setUp(self):
Expand Down

0 comments on commit d6c2ceb

Please sign in to comment.