Skip to content

Commit

Permalink
add get and store functions and related tests
Browse files Browse the repository at this point in the history
  • Loading branch information
higs4281 committed Jan 18, 2017
1 parent 03f25f8 commit 96ed34f
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 16 deletions.
36 changes: 24 additions & 12 deletions countylimits/data_collection/county_data_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@
LAST_CHANGELOG = '{}/last_changelog.html'.format(BASE_DIR)
CHANGELOG_ID = 'tab_2010'

changelog_response = requests.get(CENSUS_CHANGELOG)
soup = bs(changelog_response.text, 'lxml')
current_changelog = soup.find("div", {"id": CHANGELOG_ID}).text

with open(LAST_CHANGELOG, 'r') as f:
base_changelog = f.read()
def get_current_log():
changelog_response = requests.get(CENSUS_CHANGELOG)
soup = bs(changelog_response.text, 'lxml')
return soup.find("div", {"id": CHANGELOG_ID}).text


def get_base_log():
with open(LAST_CHANGELOG, 'r') as f:
base_log = f.read()
return base_log


def store_change_log(newlog):
with open(LAST_CHANGELOG, 'w') as f:
f.write(newlog)


def get_lines(changelog):
Expand All @@ -29,23 +39,23 @@ def check_for_county_changes(email=None):
to see whether updates have been added. If changes are detected,
note the change and update our local 'last_changelog.html' file.
"""
base_lines = get_lines(base_changelog)
current_changelog = get_current_log()
current_lines = get_lines(current_changelog)
base_lines = get_lines(get_base_log())
if base_lines == current_lines:
msg = 'No county changes found, no emails sent.'
return msg
else:
msg = ('County changes need to be checked at {}\n'
'These changes were detected:'.format(CENSUS_CHANGELOG))
with open(LAST_CHANGELOG, 'w') as f:
f.write(current_changelog)
diffsets = []
diffset = ndiff(base_lines, current_lines)
diffsets.append(
d for d in diffset if d.startswith('- ') or d.startswith('+ '))
for diffsett in diffsets:
for diff in diffsett:
msg += '\n{}'.format(diff)
store_change_log(current_changelog)
msg += "\n\nOur 'last_changelog.html' file has been updated."
if email:
send_mail(
Expand All @@ -56,7 +66,9 @@ def check_for_county_changes(email=None):
fail_silently=False
)

return (
"Emails were sent to {} with the following message: \n\n"
"{}".format(", ".join(email), msg)
)
return (
"Emails were sent to {} with the following message: \n\n"
"{}".format(", ".join(email), msg)
)
else:
return msg
127 changes: 123 additions & 4 deletions countylimits/tests.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,138 @@
import os
import unittest

import mock
from mock import mock_open, patch
from rest_framework import status

from django.test import TestCase
from django.utils.six import StringIO
from django.core.management import call_command
from django.core.management.base import CommandError

from countylimits.models import CountyLimit, County, State
from countylimits.management.commands.load_county_limits import Command
from django.core.management.base import CommandError
from countylimits.management.commands import load_county_limits
from countylimits.data_collection.county_data_monitor import (
check_for_county_changes,
store_change_log,
get_current_log,
get_base_log,
get_lines)


try:
BASE_PATH = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))) + '/'
except:
except: # pragma: no cover
BASE_PATH = ''


class CheckCountyChangesCommand(unittest.TestCase):

def setUp(self):
stdout_patch = mock.patch('sys.stdout')
stdout_patch.start()
self.addCleanup(stdout_patch.stop)

@mock.patch(
'countylimits.management.commands.oah_check_county_changes.'
'check_for_county_changes')
def test_check_county_without_email(self, mock_check):
mock_check.return_value = 'OK'
call_command('oah_check_county_changes')
self.assertEqual(mock_check.call_count, 1)

@mock.patch(
'countylimits.management.commands.oah_check_county_changes.'
'check_for_county_changes')
def test_check_county_with_email(self, mock_check):
mock_check.return_value = 'Emails were sent'
call_command('oah_check_county_changes', '--email', 'fake@example.com')
self.assertEqual(mock_check.call_count, 1)


class DataCollectionTest(unittest.TestCase):
"""Test data automation functions"""

def test_get_lines(self):
lines_in = "\n\nline 1\nline 2\n\n\nline 3\n\n"
expected_result = ['line 1', 'line 2', 'line 3']
lines_out = get_lines(lines_in)
self.assertEqual(lines_out, expected_result)

def test_get_base_log(self):
text = get_base_log()
self.assertIn('2010', text)

def test_store_changelog(self):
m = mock_open()
with patch("__builtin__.open", m, create=True):
store_change_log('fake log text')
self.assertTrue(m.call_count == 1)

@mock.patch('countylimits.data_collection.county_data_monitor'
'.requests.get')
@mock.patch('countylimits.data_collection.county_data_monitor.bs')
def test_get_current_log(self, mock_bs, mock_requests):
get_current_log()
self.assertEqual(mock_bs.call_count, 1)
self.assertEqual(mock_requests.call_count, 1)

@mock.patch(
'countylimits.data_collection.county_data_monitor.get_current_log')
@mock.patch(
'countylimits.data_collection.county_data_monitor.get_base_log')
def test_county_data_monitor_no_change(self, mock_base, mock_current):
with open("{}/countylimits/data_collection/"
"changelog_2017.html".format(BASE_PATH)) as f:
mock_base.return_value = mock_current.return_value = f.read()
self.assertIn(
'No county changes found',
check_for_county_changes())

@mock.patch(
'countylimits.data_collection.county_data_monitor.get_current_log')
@mock.patch(
'countylimits.data_collection.county_data_monitor.get_base_log')
@mock.patch(
'countylimits.data_collection.county_data_monitor.store_change_log')
def test_county_data_monitor_with_change_no_email(
self, mock_store_log, mock_base, mock_current):
with open("{}/countylimits/data_collection/"
"changelog_2017.html".format(BASE_PATH)) as f:
mock_base.return_value = f.read()
mock_current.return_value = (
mock_base.return_value + 'When dolphins fly.\n')
self.assertIn(
'When dolphins fly',
check_for_county_changes())
self.assertEqual(mock_current.call_count, 1)
self.assertEqual(mock_base.call_count, 1)
self.assertEqual(mock_store_log.call_count, 1)

@mock.patch(
'countylimits.data_collection.county_data_monitor.get_current_log')
@mock.patch(
'countylimits.data_collection.county_data_monitor.get_base_log')
@mock.patch(
'countylimits.data_collection.county_data_monitor.send_mail')
@mock.patch(
'countylimits.data_collection.county_data_monitor.store_change_log')
def test_county_data_monitor_with_change_and_email(
self, mock_store_log, mock_mail, mock_base, mock_current):
with open("{}/countylimits/data_collection/"
"changelog_2017.html".format(BASE_PATH)) as f:
mock_base.return_value = f.read()
mock_current.return_value = (
mock_base.return_value + 'When dolphins fly.\n')
msg = check_for_county_changes(email='fakemail@example.com')
self.assertIn('When dolphins fly', msg)
self.assertEqual(mock_mail.call_count, 1)
self.assertEqual(mock_current.call_count, 1)
self.assertEqual(mock_base.call_count, 1)
self.assertEqual(mock_store_log.call_count, 1)


class CountyLimitTest(TestCase):

url = '/oah-api/county/'
Expand Down Expand Up @@ -63,7 +182,7 @@ def test_unicode(self):

class LoadCountyLimitsTestCase(TestCase):

c = Command()
c = load_county_limits.Command()
out = StringIO()

test_csv = '{}data/test/test.csv'.format(BASE_PATH)
Expand Down

0 comments on commit 96ed34f

Please sign in to comment.