diff --git a/oim_cms/api.py b/oim_cms/api.py index c302c5b..4873cc0 100644 --- a/oim_cms/api.py +++ b/oim_cms/api.py @@ -11,7 +11,7 @@ from mudmap.models import MudMap from organisation.api import DepartmentUserResource, LocationResource, profile from organisation.models import DepartmentUser, Location, OrgUnit, CostCentre -from registers.api import ITSystemResource, ITSystemHardwareResource +from registers.api import ITSystemResource, ITSystemHardwareResource, ITSystemEventResource from registers.models import ITSystem from tracking.api import EC2InstanceResource, FreshdeskTicketResource from .utils import CSVDjangoResource @@ -178,4 +178,5 @@ def create(self): url(r'^profile/', profile, name='api_profile'), url(r'^options/', include(OptionResource.urls())), url(r'^whoami', WhoAmIResource.as_detail(), name='api_whoami'), + url(r'^events/', include(ITSystemEventResource.urls())), ] diff --git a/oim_cms/test_api.py b/oim_cms/test_api.py index 5a9b2ad..f111fba 100644 --- a/oim_cms/test_api.py +++ b/oim_cms/test_api.py @@ -1,15 +1,13 @@ -from datetime import datetime +from __future__ import unicode_literals, absolute_import from django.contrib.auth.models import User from django.test import TestCase, Client from mixer.backend.django import mixer import random import string -import json from uuid import uuid1 from organisation.models import DepartmentUser, Location, OrgUnit, CostCentre from registers.models import ITSystem -from tracking.models import FreshdeskTicket, FreshdeskContact def random_dpaw_email(): @@ -24,7 +22,7 @@ class ApiTestCase(TestCase): def setUp(self): # Generate some other DepartmentUser objects. - mixer.cycle(6).blend( + mixer.cycle(8).blend( DepartmentUser, photo=None, active=True, email=random_dpaw_email, org_unit=None, cost_centre=None, ad_guid=uuid1, o365_licence=False, in_sync=False) @@ -33,16 +31,20 @@ def setUp(self): self.loc2 = mixer.blend(Location, manager=None) # Generate a basic org structure. # NOTE: don't use mixer to create OrgUnit objects (it breaks MPTT). - self.dept = OrgUnit.objects.create(name='Department 1', unit_type=0, acronym='DEPT') + self.dept = OrgUnit.objects.create(name='Department 1', unit_type=0, acronym='DEPT', active=True) self.div1 = OrgUnit.objects.create( - name='Divison 1', unit_type=1, parent=self.dept, location=self.loc1, acronym='D1') + name='Divison 1', unit_type=1, parent=self.dept, location=self.loc1, acronym='DIV1', active=True) + self.branch1 = OrgUnit.objects.create( + name='Branch 1', unit_type=2, parent=self.div1, location=self.loc1, acronym='BRANCH1', active=True) self.cc1 = CostCentre.objects.create( name='Cost centre 1', code='001', division=self.div1, org_position=self.div1) self.div2 = OrgUnit.objects.create( - name='Divison 2', unit_type=1, parent=self.dept, location=self.loc2, acronym='D2') + name='Divison 2', unit_type=1, parent=self.dept, location=self.loc2, acronym='DIV2', active=True) + self.branch2 = OrgUnit.objects.create( + name='Branch 2', unit_type=2, parent=self.div2, location=self.loc2, acronym='BRANCH2', active=True) self.cc2 = CostCentre.objects.create( name='Cost centre 2', code='002', division=self.div2, org_position=self.div2) - # Give each of the divisions some members. + # Give each of the org units some members. users = DepartmentUser.objects.all() self.user1 = users[0] self.user1.org_unit = self.div1 @@ -56,6 +58,14 @@ def setUp(self): self.user2.save() self.div2.manager = self.user2 self.div2.save() + self.user3 = users[2] + self.user3.org_unit = self.branch1 + self.user3.cost_centre = self.cc1 + self.user3.save() + self.user4 = users[3] + self.user4.org_unit = self.branch2 + self.user4.cost_centre = self.cc2 + self.user4.save() # Mark a user as inactive and deleted in AD. self.del_user = users[2] self.del_user.active = False @@ -80,11 +90,6 @@ def setUp(self): self.user3.org_unit = self.div1 self.user3.cost_centre = self.cc1 self.user3.save() - # Generate some IT Systems. - self.it1 = mixer.blend(ITSystem, status=0, owner=self.user1) - self.it2 = mixer.blend(ITSystem, status=1, owner=self.user2) - self.it_leg = mixer.blend(ITSystem, status=2, owner=self.user2) - self.it_dec = mixer.blend(ITSystem, status=3, owner=self.user2) # Generate a test user for endpoint responses. self.testuser = User.objects.create_user( username='testuser', email='user@dpaw.wa.gov.au.com', password='pass') @@ -94,413 +99,8 @@ def setUp(self): org_unit=None, cost_centre=None, ad_guid=uuid1) # Log in testuser by default. self.client.login(username='testuser', password='pass') - - -class ProfileTestCase(ApiTestCase): - url = '/api/profile/' - - def test_profile_api_get(self): - """Test the profile API endpoint GET response - """ - response = self.client.get(self.url) - self.assertEqual(response.status_code, 200) - - def test_profile_api_post(self): - """Test the profile API endpoint GET response - """ - response = self.client.get(self.url) - j = response.json() - obj = j['objects'][0] - self.assertFalse(obj['telephone']) - tel = '9111 1111' - response = self.client.post(self.url, {'telephone': tel}) - self.assertEqual(response.status_code, 200) - j = response.json() - obj = j['objects'][0] - self.assertEqual(obj['telephone'], tel) - - def test_profile_api_anon(self): - """Test that anonymous users can't use the profile endpoint - """ - self.client.logout() - response = self.client.get(self.url) - self.assertEqual(response.status_code, 403) - - -class OptionResourceTestCase(ApiTestCase): - - def test_data_org_structure(self): - """Test the data_org_structure API endpoint - """ - url = '/api/options/?list=org_structure' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # Division 1 will be present in the response. - self.assertContains(response, self.div1.name) - # Response can be deserialised into a dict. - r = response.json() - self.assertTrue(isinstance(r, dict)) - # Deserialised response contains a list. - self.assertTrue(isinstance(r['objects'], list)) - # Make OrgUnit inactive to test exclusion. - self.div1.active = False - self.div1.save() - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # Division 1 won't be present in the response. - self.assertNotContains(response, self.div1.name) - - def test_data_cost_centre(self): - """Test the data_cost_centre API endpoint - """ - url = '/api/options/?list=cost_centre' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # 001 will be present in the response. - self.assertContains(response, self.cc1.code) - # Add 'inactive' to Division 1 name to inactivate the CC. - self.div1.name = 'Division 1 (inactive)' - self.div1.save() - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # 001 won't be present in the response. - self.assertNotContains(response, self.cc1.code) - - def test_data_org_unit(self): - """Test the data_org_unit API endpoint - """ - url = '/api/options/?list=org_unit' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # Org unit names will be present in the response. - self.assertContains(response, self.dept.name) - self.assertContains(response, self.div1.name) - self.assertContains(response, self.div2.name) - - def test_data_dept_user(self): - """Test the data_dept_user API endpoint - """ - url = '/api/options/?list=dept_user' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # User 1 will be present in the response. - self.assertContains(response, self.user1.email) - # Make a user inactive to test excludion - self.user1.active = False - self.user1.save() - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # User 1 won't be present in the response. - self.assertNotContains(response, self.user1.email) - - -class DepartmentUserResourceTestCase(ApiTestCase): - - def test_list(self): - """Test the DepartmentUserResource list responses - """ - url = '/api/users/' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - r = response.json() - self.assertTrue(isinstance(r['objects'], list)) - # Response should not contain inactive, contractors or shared accounts. - self.assertContains(response, self.user1.email) - self.assertNotContains(response, self.del_user.email) - self.assertNotContains(response, self.contract_user.email) - self.assertNotContains(response, self.shared.email) - # Test the compact response. - url = '/api/users/?compact=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # Test the minimal response. - url = '/api/users/?minimal=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - - def test_list_filtering(self): - """Test the DepartmentUserResource filtered list responses - """ - # Test the "all" response. - url = '/api/users/?all=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, self.contract_user.email) - self.assertContains(response, self.del_user.email) - self.assertContains(response, self.shared.email) - # Test filtering by ad_deleted. - url = '/api/users/?ad_deleted=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, self.del_user.email) - self.assertNotContains(response, self.user1.email) - url = '/api/users/?ad_deleted=false' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertNotContains(response, self.del_user.email) - self.assertContains(response, self.user1.email) - # Test filtering by email (should return only one object). - url = '/api/users/?email={}'.format(self.user1.email) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - j = response.json() - self.assertEqual(len(j['objects']), 1) - self.assertContains(response, self.user1.email) - self.assertNotContains(response, self.user2.email) - # Test filtering by GUID (should return only one object). - url = '/api/users/?ad_guid={}'.format(self.user1.ad_guid) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - j = response.json() - self.assertEqual(len(j['objects']), 1) - self.assertContains(response, self.user1.email) - self.assertNotContains(response, self.user2.email) - # Test filtering by cost centre (should return all, inc. inactive and contractors). - url = '/api/users/?cost_centre={}'.format(self.cc2.code) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, self.user2.email) - self.assertContains(response, self.contract_user.email) - self.assertContains(response, self.del_user.email) - self.assertNotContains(response, self.user1.email) - self.assertNotContains(response, self.shared.email) # Belongs to CC1. - # Test filtering by O365 licence status. - self.user1.o365_licence = True - self.user1.save() - url = '/api/users/?o365_licence=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, self.user1.email) - self.assertNotContains(response, self.user2.email) - - def test_detail(self): - """Test the DepartmentUserResource detail response - """ - # Test detail URL using ad_guid. - url = '/api/users/{}/'.format(self.user1.ad_guid) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # Test URL using email also. - url = '/api/users/{}/'.format(self.user1.email.lower()) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - - def test_org_structure(self): - """Test the DepartmentUserResource org_structure response - """ - url = '/api/users/?org_structure=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # User 1 will be present in the response. - self.assertContains(response, self.user1.email) - # Division 1 will be present in the response. - self.assertContains(response, self.div1.name) - - def test_org_structure_sync_0365(self): - """Test the sync_o365=true request parameter - """ - self.div1.sync_o365 = False - self.div1.save() - url = '/api/users/?org_structure=true&sync_o365=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # Division 1 won't be present in the response. - self.assertNotContains(response, self.div1.name) - - def test_org_structure_populate_groups_members(self): - """Test populate_groups=true request parameter - """ - url = '/api/users/?org_structure=true&populate_groups=true' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # User 3 will be present in the response. - self.assertContains(response, self.user3.email) - self.user3.populate_primary_group = False - self.user3.save() - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # User 3 won't be present in the response. - self.assertNotContains(response, self.user3.email) - - def test_create(self): - """Test the DepartmentUserResource create response - """ - url = '/api/users/' - # Response should be status 400 where ObjectGUID is missing. - data = {} - response = self.client.post(url, json.dumps(data), content_type='application/json') - self.assertEqual(response.status_code, 400) - # Try again with valid data. - username = str(uuid1())[:8] - data = { - 'ObjectGUID': str(uuid1()), - 'EmailAddress': '{}@dpaw.wa.gov.au'.format(username), - 'DistinguishedName': 'CN={},OU=Users,DC=domain'.format(username), - 'SamAccountName': username, - 'AccountExpirationDate': datetime.now().isoformat(), - 'Enabled': True, - 'DisplayName': 'Doe, John', - 'GivenName': 'John', - 'Surname': 'Doe', - 'Title': 'Social Media Creative', - 'Modified': datetime.now().isoformat(), - } - response = self.client.post(url, json.dumps(data), content_type='application/json') - self.assertEqual(response.status_code, 201) # Created - # A DepartmentUser with that email should now exist. - self.assertTrue(DepartmentUser.objects.filter(email=data['EmailAddress']).exists()) - - def test_update(self): - """Test the DepartmentUserResource update response - """ - self.assertFalse(self.user1.o365_licence) - surname = str(uuid1())[:8] - url = '/api/users/{}/'.format(self.user1.ad_guid) - data = { - 'Surname': surname, - 'o365_licence': True, - } - response = self.client.put(url, json.dumps(data), content_type='application/json') - self.assertEqual(response.status_code, 202) - user = DepartmentUser.objects.get(pk=self.user1.pk) # Refresh from db - self.assertEqual(user.surname, surname) - self.assertTrue(user.o365_licence) - self.assertTrue(user.in_sync) - - def test_disable(self): - """Test the DepartmentUserResource update response (set user as inactive) - """ - self.assertTrue(self.user1.active) - self.assertFalse(self.user1.ad_deleted) - url = '/api/users/{}/'.format(self.user1.ad_guid) - data = { - 'Enabled': False, - } - response = self.client.put(url, json.dumps(data), content_type='application/json') - self.assertEqual(response.status_code, 202) - user = DepartmentUser.objects.get(pk=self.user1.pk) # Refresh from db - self.assertFalse(user.ad_deleted) - self.assertFalse(user.active) - self.assertTrue(user.in_sync) - - def test_delete(self): - """Test the DepartmentUserResource update response (set user as 'AD deleted') - """ - self.assertFalse(self.user1.ad_deleted) - self.assertTrue(self.user1.active) - url = '/api/users/{}/'.format(self.user1.ad_guid) - data = { - 'Deleted': True, - } - response = self.client.put(url, json.dumps(data), content_type='application/json') - self.assertEqual(response.status_code, 202) - user = DepartmentUser.objects.get(pk=self.user1.pk) # Refresh from db - self.assertTrue(user.ad_deleted) - self.assertFalse(user.active) - self.assertTrue(user.in_sync) - - -class LocationResourceTestCase(ApiTestCase): - - def test_list(self): - """Test the LocationResource list response - """ - loc_inactive = mixer.blend(Location, manager=None, active=False) - url = '/api/locations/' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # Response should not contain the inactive Location. - self.assertNotContains(response, loc_inactive.name) - - def test_filter(self): - """Test the LocationResource filtered response - """ - url = '/api/locations/?location_id={}'.format(self.loc1.pk) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, self.loc1.name) - # We can still return inactive locations by ID - loc_inactive = mixer.blend(Location, manager=None, active=False) - url = '/api/locations/?location_id={}'.format(loc_inactive.pk) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, loc_inactive.name) - - -class ITSystemResourceTestCase(ApiTestCase): - - def test_list(self): - """Test the ITSystemResource list response - """ - url = '/api/itsystems/' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # The 'development' & decommissioned IT systems won't be in the response. - self.assertNotContains(response, self.it2.name) - self.assertNotContains(response, self.it_dec.name) - # Test all request parameter. - url = '/api/itsystems/?all' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # The 'development' IT system will be in the response. - self.assertContains(response, self.it2.name) - # Test filtering by system_id - url = '/api/itsystems/?system_id={}'.format(self.it1.system_id) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, self.it1.name) - self.assertNotContains(response, self.it2.name) - - -class ITSystemHardwareResourceTestCase(ApiTestCase): - - def test_list(self): - url = '/api/itsystem-hardware/' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - # The 'decommissioned' IT system won't be in the response. - self.assertNotContains(response, self.it_dec.name) - - -class FreshdeskTicketResourceTestCase(ApiTestCase): - - def setUp(self): - """Generate from FreshdeskTicket objects. - """ - super(FreshdeskTicketResourceTestCase, self).setUp() - mixer.cycle(5).blend( - FreshdeskContact, email=random_dpaw_email) - mixer.cycle(5).blend( - FreshdeskTicket, - subject=mixer.RANDOM, description_text=mixer.RANDOM, type='Test', - freshdesk_requester=mixer.SELECT, - it_system=mixer.SELECT, - custom_fields={ - 'support_category': None, 'support_subcategory': None}, - ) - self.ticket = FreshdeskTicket.objects.first() - - def test_list(self): - """Test the FreshdeskTicketResource list response - """ - url = '/api/freshdesk_tickets/' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertContains(response, self.ticket.subject) - - def test_list_filtering(self): - """Test the FreshdeskTicketResource filtered list response - """ - self.ticket.type = 'Incident' - self.ticket.save() - url = '/api/freshdesk_tickets/?type=Test' - response = self.client.get(url) - self.assertEqual(response.status_code, 200) - self.assertNotContains(response, self.ticket.subject) - - def test_detail(self): - """Test the FreshdeskTicketResource detail response - """ - url = '/api/freshdesk_tickets/{}/'.format(self.ticket.ticket_id) - response = self.client.get(url) - self.assertEqual(response.status_code, 200) + # Generate some IT Systems. + self.it1 = mixer.blend(ITSystem, status=0, owner=self.user1) + self.it2 = mixer.blend(ITSystem, status=1, owner=self.user2) + self.it_leg = mixer.blend(ITSystem, status=2, owner=self.user2) + self.it_dec = mixer.blend(ITSystem, status=3, owner=self.user2) diff --git a/organisation/admin.py b/organisation/admin.py index 4cfcbbd..09ed181 100644 --- a/organisation/admin.py +++ b/organisation/admin.py @@ -58,7 +58,8 @@ class DepartmentUserAdmin(VersionAdmin): readonly_fields = [ 'username', 'email', 'org_data_pretty', 'ad_data_pretty', 'active', 'in_sync', 'ad_deleted', 'date_ad_updated', - 'alesco_data_pretty', 'o365_licence', 'shared_account'] + 'alesco_data_pretty', 'o365_licence', 'shared_account', + 'azure_guid'] fieldsets = ( ('Email/username', { 'fields': ('email', 'username'), @@ -82,9 +83,10 @@ class DepartmentUserAdmin(VersionAdmin): 'secondary_locations', 'working_hours', 'extra_data', ) }), - ('AD sync and HR data (read-only, except GUID)', { + ('AD sync and HR data (read-only, except AD GUID)', { 'fields': ( 'ad_guid', + 'azure_guid', 'active', 'in_sync', 'ad_deleted', 'date_ad_updated', 'o365_licence', 'shared_account', 'org_data_pretty', 'ad_data_pretty', 'alesco_data_pretty', diff --git a/organisation/api.py b/organisation/api.py index 7ecd2c2..a176b58 100644 --- a/organisation/api.py +++ b/organisation/api.py @@ -19,7 +19,7 @@ ACCOUNT_TYPE_DICT = dict(DepartmentUser.ACCOUNT_TYPE_CHOICES) -logger = logging.getLogger('ad_sync') +LOGGER = logging.getLogger('ad_sync') def format_fileField(request, value): @@ -191,37 +191,78 @@ def detail(self, guid): @skip_prepare def create(self): - """Create view for a new DepartmentUserObject. - BUSINESS RULE: we call this endpoint from AD, and require a - complete request body that includes a GUID. + """Call this endpoint from on-prem AD or from Azure AD. + Match either AD-object key values or Departmentuser field names. """ - if 'ObjectGUID' not in self.data: - raise BadRequest('Missing ObjectGUID parameter') + user = DepartmentUser() + # Check for essential request params. + if 'EmailAddress' not in self.data and 'email' not in self.data: + raise BadRequest('Missing email parameter value') + if 'DisplayName' not in self.data and 'name' not in self.data: + raise BadRequest('Missing name parameter value') + if 'SamAccountName' not in self.data and 'username' not in self.data: + raise BadRequest('Missing account name parameter value') + # Required: email, name and sAMAccountName. + if 'EmailAddress' in self.data: + user.email = self.data['EmailAddress'].lower() + elif 'email' in self.data: + user.email = self.data['email'].lower() + if 'DisplayName' in self.data: + user.name = self.data['DisplayName'] + elif 'name' in self.data: + user.name = self.data['name'] + if 'SamAccountName' in self.data: + user.username = self.data['SamAccountName'] + elif 'username' in self.data: + user.username = self.data['username'] + # Optional fields. + if 'Enabled' in self.data: + user.active = self.data['Enabled'] + elif 'active' in self.data: + user.active = self.data['active'] + if 'ObjectGUID' in self.data: + user.ad_guid = self.data['ObjectGUID'] + elif 'ad_guid' in self.data: + user.ad_guid = self.data['ad_guid'] + if 'azure_guid' in self.data: # Exception to the if/elif rule. + user.azure_guid = self.data['azure_guid'] + if 'Distinguishedname' in self.data: + user.ad_dn = self.data['DistinguishedName'] + elif 'ad_dn' in self.data: + user.ad_dn = self.data['ad_dn'] + if 'AccountExpirationDate' in self.data: + user.expiry_date = self.data['AccountExpirationDate'] + elif 'expiry_date' in self.data: + user.expiry_date = self.data['expiry_date'] + if 'Title' in self.data: + user.title = self.data['Title'] + elif 'title' in self.data: + user.title = self.data['title'] + if 'GivenName' in self.data: + user.given_name = self.data['GivenName'] + elif 'given_name' in self.data: + user.given_name = self.data['given_name'] + if 'Surname' in self.data: + user.surname = self.data['Surname'] + elif 'given_name' in self.data: + user.surname = self.data['surname'] + if 'Modified' in self.data: + user.date_ad_updated = self.data['Modified'] + elif 'date_ad_updated' in self.data: + user.date_ad_updated = self.data['date_ad_updated'] + try: - user = DepartmentUser.objects.get_or_create( - ad_guid=self.data['ObjectGUID'], - email=self.data['EmailAddress'].lower(), - ad_dn=self.data['DistinguishedName'], - username=self.data['SamAccountName'], - expiry_date=self.data['AccountExpirationDate'], - active=self.data['Enabled'], - name=self.data['DisplayName'], - title=self.data['Title'], - given_name=self.data['GivenName'], - surname=self.data['Surname'], - date_ad_updated=self.data['Modified'], - )[0] + user.save() except Exception as e: data = self.data data['Error'] = repr(e) - logger.error(repr(e)) + LOGGER.error(repr(e)) return self.formatters.format(self.request, {'Error': repr(e)}) # Serialise the newly-created DepartmentUser. data = list(DepartmentUser.objects.filter(pk=user.pk).values(*self.VALUES_ARGS))[0] - logger.info('Created user {}'.format(user.email)) - logger.info('{} '.format(self.formatters.format(self.request, data))) - + LOGGER.info('Created user {}'.format(user.email)) + LOGGER.info('{} '.format(self.formatters.format(self.request, data))) return self.formatters.format(self.request, data) def update(self, guid): @@ -238,36 +279,60 @@ def update(self, guid): raise BadRequest('Object not found') try: - if 'ObjectGUID' in self.data and self.data['ObjectGUID']: - user.ad_guid = self.data['ObjectGUID'] if 'EmailAddress' in self.data and self.data['EmailAddress']: user.email = self.data['EmailAddress'].lower() - if 'DistinguishedName' in self.data and self.data['DistinguishedName']: - user.ad_dn = self.data['DistinguishedName'] + if 'email' in self.data and self.data['email']: + user.email = self.data['email'].lower() + if 'DisplayName' in self.data and self.data['DisplayName']: + user.name = self.data['DisplayName'] + if 'name' in self.data and self.data['name']: + user.name = self.data['name'] if 'SamAccountName' in self.data and self.data['SamAccountName']: user.username = self.data['SamAccountName'] + if 'username' in self.data and self.data['username']: + user.username = self.data['username'] + if 'ObjectGUID' in self.data and self.data['ObjectGUID']: + user.ad_guid = self.data['ObjectGUID'] + if 'ad_guid' in self.data and self.data['ad_guid']: + user.ad_guid = self.data['ad_guid'] + if 'DistinguishedName' in self.data and self.data['DistinguishedName']: + user.ad_dn = self.data['DistinguishedName'] + if 'ad_dn' in self.data and self.data['ad_dn']: + user.ad_dn = self.data['ad_dn'] if 'AccountExpirationDate' in self.data and self.data['AccountExpirationDate']: user.expiry_date = self.data['AccountExpirationDate'] + if 'expiry_date' in self.data and self.data['expiry_date']: + user.expiry_date = self.data['expiry_date'] if 'Enabled' in self.data: # Boolean; don't only work on True! user.active = self.data['Enabled'] - if 'DisplayName' in self.data and self.data['DisplayName']: - user.name = self.data['DisplayName'] + if 'active' in self.data: # Boolean; don't only work on True! + user.active = self.data['active'] if 'Title' in self.data and self.data['Title']: user.title = self.data['Title'] + if 'title' in self.data and self.data['title']: + user.title = self.data['title'] if 'GivenName' in self.data and self.data['GivenName']: user.given_name = self.data['GivenName'] + if 'given_name' in self.data and self.data['given_name']: + user.given_name = self.data['given_name'] if 'Surname' in self.data and self.data['Surname']: user.surname = self.data['Surname'] + if 'surname' in self.data and self.data['surname']: + user.surname = self.data['surname'] if 'Modified' in self.data and self.data['Modified']: user.date_ad_updated = self.data['Modified'] + if 'date_ad_updated' in self.data and self.data['date_ad_updated']: + user.date_ad_updated = self.data['date_ad_updated'] if 'o365_licence' in self.data: # Boolean; don't only work on True! user.o365_licence = self.data['o365_licence'] + if 'azure_guid' in self.data and self.data['azure_guid']: + user.azure_guid = self.data['azure_guid'] if 'Deleted' in self.data and self.data['Deleted']: user.active = False user.ad_deleted = True - user.ad_guid = '' + user.ad_guid, user.azure_guid = None, None data = list(DepartmentUser.objects.filter(pk=user.pk).values(*self.VALUES_ARGS))[0] - logger.info('Set user {} as deleted in AD'.format(user.name)) + LOGGER.info('Set user {} as deleted in AD'.format(user.name)) else: user.ad_deleted = False user.ad_data = self.data # Store the raw request data. @@ -276,12 +341,12 @@ def update(self, guid): except Exception as e: data = self.data data['Error'] = repr(e) - logger.error(repr(e)) + LOGGER.error(repr(e)) return self.formatters.format(self.request, {'Error': repr(e)}) data = list(DepartmentUser.objects.filter(pk=user.pk).values(*self.VALUES_ARGS))[0] - logger.info('Updated user {}'.format(user.email)) - logger.info('{}'.format(self.formatters.format(self.request, data))) + LOGGER.info('Updated user {}'.format(user.email)) + LOGGER.info('{}'.format(self.formatters.format(self.request, data))) return self.formatters.format(self.request, data) @@ -297,7 +362,7 @@ def org_structure(self, sync_o365=False, exclude_populate_groups=False): qs = qs.exclude(populate_primary_group=False) structure = [] if sync_o365: # Exclude certain things from populating O365/AD - orgunits = OrgUnit.objects.filter(unit_type__in=[0, 1], sync_o365=True) + orgunits = OrgUnit.objects.filter(active=True, unit_type__in=[0, 1], sync_o365=True) costcentres = [] locations = Location.objects.filter(active=True) slocations = [] diff --git a/organisation/migrations/0030_departmentuser_azure_guid.py b/organisation/migrations/0030_departmentuser_azure_guid.py new file mode 100644 index 0000000..1ca1f4c --- /dev/null +++ b/organisation/migrations/0030_departmentuser_azure_guid.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.7 on 2017-07-12 08:54 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('organisation', '0029_auto_20170707_1224'), + ] + + operations = [ + migrations.AddField( + model_name='departmentuser', + name='azure_guid', + field=models.CharField(blank=True, help_text='Azure AD GUID.', max_length=48, null=True, unique=True), + ), + ] diff --git a/organisation/migrations/0031_auto_20170713_1406.py b/organisation/migrations/0031_auto_20170713_1406.py new file mode 100644 index 0000000..dce51c8 --- /dev/null +++ b/organisation/migrations/0031_auto_20170713_1406.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.7 on 2017-07-13 06:06 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('organisation', '0030_departmentuser_azure_guid'), + ] + + operations = [ + migrations.AlterField( + model_name='departmentuser', + name='name', + field=models.CharField(help_text='Format: [Given name] [Surname]', max_length=128), + ), + ] diff --git a/organisation/models.py b/organisation/models.py index ed16084..901165f 100644 --- a/organisation/models.py +++ b/organisation/models.py @@ -59,7 +59,7 @@ class DepartmentUser(MPTTModel): (2, 'Casual'), (3, 'Other'), ) - # These fields are populated from Active Directory. + date_created = models.DateTimeField(auto_now_add=True) date_updated = models.DateTimeField(auto_now=True) cost_centre = models.ForeignKey( @@ -79,6 +79,8 @@ class DepartmentUser(MPTTModel): ad_guid = models.CharField( max_length=48, unique=True, null=True, blank=True, help_text='Locally stored GUID. This field must match GUID in the AD object for sync to be successful') + azure_guid = models.CharField( + max_length=48, unique=True, null=True, blank=True, help_text='Azure AD GUID.') ad_dn = models.CharField(max_length=512, unique=True, null=True, blank=True, editable=False) ad_data = JSONField(null=True, blank=True, editable=False) org_data = JSONField(null=True, blank=True, editable=False) @@ -89,7 +91,7 @@ class DepartmentUser(MPTTModel): username = models.CharField( max_length=128, editable=False, unique=True, help_text='Pre-Windows 2000 login username.') - name = models.CharField(max_length=128, help_text='Format: Surname, Given name') + name = models.CharField(max_length=128, help_text='Format: [Given name] [Surname]') given_name = models.CharField( max_length=128, null=True, help_text='Legal first name (matches birth certificate/password/etc.)') diff --git a/organisation/test_api.py b/organisation/test_api.py new file mode 100644 index 0000000..12dcec5 --- /dev/null +++ b/organisation/test_api.py @@ -0,0 +1,378 @@ +from __future__ import unicode_literals, absolute_import +from datetime import datetime +import json +from mixer.backend.django import mixer +from oim_cms.test_api import ApiTestCase +from uuid import uuid1 + +from organisation.models import DepartmentUser, Location + + +class ProfileTestCase(ApiTestCase): + url = '/api/profile/' + + def test_profile_api_get(self): + """Test the profile API endpoint GET response + """ + response = self.client.get(self.url) + self.assertEqual(response.status_code, 200) + + def test_profile_api_post(self): + """Test the profile API endpoint GET response + """ + response = self.client.get(self.url) + j = response.json() + obj = j['objects'][0] + self.assertFalse(obj['telephone']) + tel = '9111 1111' + response = self.client.post(self.url, {'telephone': tel}) + self.assertEqual(response.status_code, 200) + j = response.json() + obj = j['objects'][0] + self.assertEqual(obj['telephone'], tel) + + def test_profile_api_anon(self): + """Test that anonymous users can't use the profile endpoint + """ + self.client.logout() + response = self.client.get(self.url) + self.assertEqual(response.status_code, 403) + + +class OptionResourceTestCase(ApiTestCase): + + def test_data_org_structure(self): + """Test the data_org_structure API endpoint + """ + url = '/api/options/?list=org_structure' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # Division 1 will be present in the response. + self.assertContains(response, self.div1.name) + # Response can be deserialised into a dict. + r = response.json() + self.assertTrue(isinstance(r, dict)) + # Deserialised response contains a list. + self.assertTrue(isinstance(r['objects'], list)) + # Make OrgUnit inactive to test exclusion. + self.branch1.active = False + self.branch1.save() + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # Division 1 won't be present in the response. + self.assertNotContains(response, self.branch1.name) + + def test_data_cost_centre(self): + """Test the data_cost_centre API endpoint + """ + url = '/api/options/?list=cost_centre' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # 001 will be present in the response. + self.assertContains(response, self.cc1.code) + # Add 'inactive' to Division 1 name to inactivate the CC. + self.div1.name = 'Division 1 (inactive)' + self.div1.save() + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # 001 won't be present in the response. + self.assertNotContains(response, self.cc1.code) + + def test_data_org_unit(self): + """Test the data_org_unit API endpoint + """ + url = '/api/options/?list=org_unit' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # Org unit names will be present in the response. + self.assertContains(response, self.dept.name) + self.assertContains(response, self.div1.name) + self.assertContains(response, self.div2.name) + + def test_data_dept_user(self): + """Test the data_dept_user API endpoint + """ + url = '/api/options/?list=dept_user' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # User 1 will be present in the response. + self.assertContains(response, self.user1.email) + # Make a user inactive to test excludion + self.user1.active = False + self.user1.save() + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # User 1 won't be present in the response. + self.assertNotContains(response, self.user1.email) + + +class DepartmentUserResourceTestCase(ApiTestCase): + + def test_list(self): + """Test the DepartmentUserResource list responses + """ + url = '/api/users/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + r = response.json() + self.assertTrue(isinstance(r['objects'], list)) + # Response should not contain inactive, contractors or shared accounts. + self.assertContains(response, self.user1.email) + self.assertNotContains(response, self.del_user.email) + self.assertNotContains(response, self.contract_user.email) + self.assertNotContains(response, self.shared.email) + # Test the compact response. + url = '/api/users/?compact=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # Test the minimal response. + url = '/api/users/?minimal=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + + def test_list_filtering(self): + """Test the DepartmentUserResource filtered list responses + """ + # Test the "all" response. + url = '/api/users/?all=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.contract_user.email) + self.assertContains(response, self.del_user.email) + self.assertContains(response, self.shared.email) + # Test filtering by ad_deleted. + url = '/api/users/?ad_deleted=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.del_user.email) + self.assertNotContains(response, self.user1.email) + url = '/api/users/?ad_deleted=false' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertNotContains(response, self.del_user.email) + self.assertContains(response, self.user1.email) + # Test filtering by email (should return only one object). + url = '/api/users/?email={}'.format(self.user1.email) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + j = response.json() + self.assertEqual(len(j['objects']), 1) + self.assertContains(response, self.user1.email) + self.assertNotContains(response, self.user2.email) + # Test filtering by GUID (should return only one object). + url = '/api/users/?ad_guid={}'.format(self.user1.ad_guid) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + j = response.json() + self.assertEqual(len(j['objects']), 1) + self.assertContains(response, self.user1.email) + self.assertNotContains(response, self.user2.email) + # Test filtering by cost centre (should return all, inc. inactive and contractors). + url = '/api/users/?cost_centre={}'.format(self.cc2.code) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.user2.email) + self.assertContains(response, self.contract_user.email) + self.assertContains(response, self.del_user.email) + self.assertNotContains(response, self.user1.email) + self.assertNotContains(response, self.shared.email) # Belongs to CC1. + # Test filtering by O365 licence status. + self.user1.o365_licence = True + self.user1.save() + url = '/api/users/?o365_licence=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.user1.email) + self.assertNotContains(response, self.user2.email) + + def test_detail(self): + """Test the DepartmentUserResource detail response + """ + # Test detail URL using ad_guid. + url = '/api/users/{}/'.format(self.user1.ad_guid) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # Test URL using email also. + url = '/api/users/{}/'.format(self.user1.email.lower()) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + + def test_org_structure(self): + """Test the DepartmentUserResource org_structure response + """ + url = '/api/users/?org_structure=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # User 1 will be present in the response. + self.assertContains(response, self.user1.email) + # Division 1 will be present in the response. + self.assertContains(response, self.div1.name) + + def test_org_structure_sync_0365(self): + """Test the sync_o365=true request parameter + """ + self.div1.sync_o365 = False + self.div1.save() + url = '/api/users/?org_structure=true&sync_o365=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # Division 1 won't be present in the response. + self.assertNotContains(response, self.div1.name) + + def test_org_structure_populate_groups_members(self): + """Test populate_groups=true request parameter + """ + url = '/api/users/?org_structure=true&populate_groups=true' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # User 3 will be present in the response. + self.assertContains(response, self.user3.email) + self.user3.populate_primary_group = False + self.user3.save() + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # User 3 won't be present in the response. + self.assertNotContains(response, self.user3.email) + + def test_create_invalid(self): + """Test the DepartmentUserResource create response with missing data + """ + url = '/api/users/' + data = {} + username = str(uuid1())[:8] + # Response should be status 400 where essential parameters are missing. + response = self.client.post(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 400) + data['EmailAddress'] = '{}@dpaw.wa.gov.au'.format(username) + response = self.client.post(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 400) + data['DisplayName'] = 'Doe, John' + response = self.client.post(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 400) + data['SamAccountName'] = username + response = self.client.post(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 201) # Now valid. + + def test_create_valid(self): + """Test the DepartmentUserResource create response with valid data + """ + url = '/api/users/' + username = str(uuid1())[:8] + data = { + 'EmailAddress': '{}@dpaw.wa.gov.au'.format(username), + 'DisplayName': 'Doe, John', + 'SamAccountName': username, + 'DistinguishedName': 'CN={},OU=Users,DC=domain'.format(username), + 'AccountExpirationDate': datetime.now().isoformat(), + 'Enabled': True, + 'ObjectGUID': str(uuid1()), + 'GivenName': 'John', + 'Surname': 'Doe', + 'Title': 'Content Creator', + 'Modified': datetime.now().isoformat(), + } + response = self.client.post(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 201) + # A DepartmentUser with that email should now exist. + self.assertTrue(DepartmentUser.objects.filter(email=data['EmailAddress']).exists()) + + def test_create_valid_alt(self): + """Test the DepartmentUserResource create response with alternate parameter names + """ + url = '/api/users/' + username = str(uuid1())[:8] + data = { + 'email': '{}@dpaw.wa.gov.au'.format(username), + 'name': 'Doe, John', + 'username': username, + 'ad_dn': 'CN={},OU=Users,DC=domain'.format(username), + 'expiry_date': datetime.now().isoformat(), + 'active': True, + 'ad_guid': str(uuid1()), + 'given_name': 'John', + 'surname': 'Doe', + 'title': 'Content Creator', + 'date_ad_updated': datetime.now().isoformat(), + } + response = self.client.post(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 201) + self.assertTrue(DepartmentUser.objects.filter(email=data['email']).exists()) + + def test_update(self): + """Test the DepartmentUserResource update response + """ + self.assertFalse(self.user1.o365_licence) + url = '/api/users/{}/'.format(self.user1.ad_guid) + data = { + 'Surname': 'Lebowski', + 'title': 'Bean Counter', + 'o365_licence': True, + } + response = self.client.put(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 202) + user = DepartmentUser.objects.get(pk=self.user1.pk) # Refresh from db + self.assertEqual(user.surname, data['Surname']) + self.assertEqual(user.title, data['title']) + self.assertTrue(user.o365_licence) + self.assertTrue(user.in_sync) + + def test_disable(self): + """Test the DepartmentUserResource update response (set user as inactive) + """ + self.assertTrue(self.user1.active) + self.assertFalse(self.user1.ad_deleted) + url = '/api/users/{}/'.format(self.user1.ad_guid) + data = { + 'Enabled': False, + } + response = self.client.put(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 202) + user = DepartmentUser.objects.get(pk=self.user1.pk) # Refresh from db + self.assertFalse(user.ad_deleted) + self.assertFalse(user.active) + self.assertTrue(user.in_sync) + + def test_delete(self): + """Test the DepartmentUserResource update response (set user as 'AD deleted') + """ + self.assertFalse(self.user1.ad_deleted) + self.assertTrue(self.user1.active) + url = '/api/users/{}/'.format(self.user1.ad_guid) + data = {'Deleted': True} + response = self.client.put(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 202) + user = DepartmentUser.objects.get(pk=self.user1.pk) # Refresh from db + self.assertTrue(user.ad_deleted) + self.assertFalse(user.active) + self.assertTrue(user.in_sync) + # Also delete a second object, to check for silly 'empty string' collisions. + url = '/api/users/{}/'.format(self.user2.ad_guid) + response = self.client.put(url, json.dumps(data), content_type='application/json') + self.assertEqual(response.status_code, 202) + + +class LocationResourceTestCase(ApiTestCase): + + def test_list(self): + """Test the LocationResource list response + """ + loc_inactive = mixer.blend(Location, manager=None, active=False) + url = '/api/locations/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # Response should not contain the inactive Location. + self.assertNotContains(response, loc_inactive.name) + + def test_filter(self): + """Test the LocationResource filtered response + """ + url = '/api/locations/?location_id={}'.format(self.loc1.pk) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.loc1.name) + # We can still return inactive locations by ID + loc_inactive = mixer.blend(Location, manager=None, active=False) + url = '/api/locations/?location_id={}'.format(loc_inactive.pk) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, loc_inactive.name) diff --git a/registers/admin.py b/registers/admin.py index 1d46d48..0035862 100644 --- a/registers/admin.py +++ b/registers/admin.py @@ -1,7 +1,7 @@ from __future__ import unicode_literals, absolute_import from django import forms from django.conf.urls import url -from django.contrib.admin import register +from django.contrib.admin import register, ModelAdmin from django.http import HttpResponse from django.template.response import TemplateResponse from reversion.admin import VersionAdmin @@ -13,7 +13,8 @@ from .models import ( UserGroup, ITSystemHardware, Platform, ITSystem, ITSystemDependency, Backup, BusinessService, BusinessFunction, BusinessProcess, - ProcessITSystemRelationship) + ProcessITSystemRelationship, ITSystemEvent) +from .utils import smart_truncate @register(UserGroup) @@ -71,10 +72,15 @@ def export(self, request): @register(Platform) class PlatformAdmin(VersionAdmin): - list_display = ('name', 'category') + list_display = ('name', 'category', 'it_systems') list_filter = ('category',) search_fields = ('name',) + def it_systems(self, obj): + # Exclude decommissioned systems from the count. + return obj.itsystem_set.all().exclude(status=3).count() + it_systems.short_description = 'IT Systems' + class ITSystemForm(forms.ModelForm): @@ -305,3 +311,26 @@ class ProcessITSystemRelationshipAdmin(VersionAdmin): list_display = ('process', 'itsystem', 'importance') list_filter = ('importance', 'process', 'itsystem') search_fields = ('process__name', 'itsystem__name') + + +@register(ITSystemEvent) +class ITSystemEventAdmin(ModelAdmin): + filter_horizontal = ('it_systems', 'locations') + list_display = ( + 'id', 'event_type', 'description_trunc', 'start', 'duration', 'end', + 'it_systems_affected', 'locations_affected') + list_filter = ('event_type', 'planned', 'current') + search_fields = ('description', 'it_systems__name', 'locations__name') + date_hierarchy = 'start' + + def description_trunc(self, obj): + return smart_truncate(obj.description) + description_trunc.short_description = 'description' + + def it_systems_affected(self, obj): + return ', '.join([i.name for i in obj.it_systems.all()]) + it_systems_affected.short_description = 'IT Systems' + + def locations_affected(self, obj): + return ', '.join([i.name for i in obj.locations.all()]) + locations_affected.short_description = 'locations' diff --git a/registers/api.py b/registers/api.py index edbfb80..f570c2e 100644 --- a/registers/api.py +++ b/registers/api.py @@ -1,10 +1,14 @@ from __future__ import unicode_literals, absolute_import from babel.dates import format_timedelta from django.conf import settings +from django.conf.urls import url import itertools from oim_cms.utils import CSVDjangoResource +from restless.dj import DjangoResource +from restless.preparers import FieldsPreparer +from restless.resources import skip_prepare -from .models import ITSystem, ITSystemHardware +from .models import ITSystem, ITSystemHardware, ITSystemEvent class ITSystemResource(CSVDjangoResource): @@ -204,3 +208,59 @@ def prepare(self, data): def list(self): return ITSystemHardware.objects.all() + + +class ITSystemEventResource(DjangoResource): + def __init__(self, *args, **kwargs): + super(ITSystemEventResource, self).__init__(*args, **kwargs) + self.http_methods.update({ + 'current': {'GET': 'current'} + }) + + preparer = FieldsPreparer(fields={ + 'id': 'id', + 'description': 'description', + 'planned': 'planned', + 'start': 'start', + 'end': 'end', + 'current': 'current', + }) + + def prepare(self, data): + prepped = super(ITSystemEventResource, self).prepare(data) + prepped['event_type'] = data.get_event_type_display() + if data.duration: + prepped['duration_sec'] = data.duration.seconds + else: + prepped['duration_sec'] = None + if data.it_systems: + prepped['it_systems'] = [i.name for i in data.it_systems.all()] + else: + prepped['it_systems'] = None + if data.locations: + prepped['locations'] = [i.name for i in data.locations.all()] + else: + prepped['locations'] = None + return prepped + + @skip_prepare + def current(self): + # Slightly-expensive query: iterate over each 'current' event and call save(). + # This should automatically expire any events that need to be non-current. + for i in ITSystemEvent.objects.filter(current=True): + i.save() + # Return prepared data. + return {'objects': [self.prepare(data) for data in ITSystemEvent.objects.filter(current=True)]} + + def list(self): + return ITSystemEvent.objects.all() + + def detail(self, pk): + return ITSystemEvent.objects.get(pk=pk) + + @classmethod + def urls(self, name_prefix=None): + urlpatterns = super(ITSystemEventResource, self).urls(name_prefix=name_prefix) + return [ + url(r'^current/$', self.as_view('current'), name=self.build_url_name('current', name_prefix)), + ] + urlpatterns diff --git a/registers/migrations/0019_itsystemevent.py b/registers/migrations/0019_itsystemevent.py new file mode 100644 index 0000000..afe26fe --- /dev/null +++ b/registers/migrations/0019_itsystemevent.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.7 on 2017-07-10 07:29 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('registers', '0018_auto_20170530_1308'), + ] + + operations = [ + migrations.CreateModel( + name='ITSystemEvent', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('event_type', models.PositiveSmallIntegerField(choices=[(1, 'Incident'), (2, 'Maintenance'), (3, 'Information')])), + ('description', models.TextField()), + ('planned', models.BooleanField(default=False, help_text='Was this event planned?')), + ('start', models.DateTimeField(help_text='Event start (date & time)')), + ('duration', models.DurationField(blank=True, help_text='Optional: duration of the event (hh:mm:ss).', null=True)), + ('end', models.DateTimeField(blank=True, help_text='Optional: event end (date & time)', null=True)), + ('current', models.BooleanField(default=True, editable=False)), + ], + options={ + 'verbose_name': 'IT System event', + }, + ), + ] diff --git a/registers/migrations/0020_auto_20170713_1406.py b/registers/migrations/0020_auto_20170713_1406.py new file mode 100644 index 0000000..ad5e0a0 --- /dev/null +++ b/registers/migrations/0020_auto_20170713_1406.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.7 on 2017-07-13 06:06 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('organisation', '0031_auto_20170713_1406'), + ('registers', '0019_itsystemevent'), + ] + + operations = [ + migrations.AddField( + model_name='itsystemevent', + name='it_systems', + field=models.ManyToManyField(blank=True, help_text='IT System(s) affect by this event', to='registers.ITSystem'), + ), + migrations.AddField( + model_name='itsystemevent', + name='locations', + field=models.ManyToManyField(blank=True, help_text='Location(s) affect by this event', to='organisation.Location'), + ), + ] diff --git a/registers/models.py b/registers/models.py index 91164cd..eff4a67 100644 --- a/registers/models.py +++ b/registers/models.py @@ -8,8 +8,9 @@ from django.utils.encoding import python_2_unicode_compatible from django.utils.safestring import mark_safe -from organisation.models import DepartmentUser +from organisation.models import DepartmentUser, Location from tracking.models import CommonFields, Computer +from .utils import smart_truncate CRITICALITY_CHOICES = ( @@ -576,3 +577,41 @@ class Meta: def __str__(self): return '{} - {} ({})'.format( self.itsystem.name, self.process.name, self.get_importance_display()) + + +@python_2_unicode_compatible +class ITSystemEvent(models.Model): + """Represents information about an event that affects one or more IT Systems + or networked locations. + """ + EVENT_TYPE_CHOICES = ( + (1, 'Incident'), + (2, 'Maintenance'), + (3, 'Information'), + ) + event_type = models.PositiveSmallIntegerField(choices=EVENT_TYPE_CHOICES) + description = models.TextField() + planned = models.BooleanField(default=False, help_text='Was this event planned?') + start = models.DateTimeField(help_text='Event start (date & time)') + duration = models.DurationField(null=True, blank=True, help_text='Optional: duration of the event (hh:mm:ss).') + end = models.DateTimeField(null=True, blank=True, help_text='Optional: event end (date & time)') + current = models.BooleanField(default=True, editable=False) + it_systems = models.ManyToManyField(ITSystem, blank=True, help_text='IT System(s) affect by this event') + locations = models.ManyToManyField(Location, blank=True, help_text='Location(s) affect by this event') + # TODO: incident type (optional: P1, P2, P3, P4) + # TODO: FD ticket (optional) + + class Meta: + verbose_name = 'IT System event' + + def __str__(self): + return '{}: {}'.format(self.get_event_type_display(), smart_truncate(self.description)) + + def save(self, *args, **kwargs): + # On save, set the `current` boolean field value correctly for the this instant. + # An event needs either an end datestamp and/or a duration to set `current`. + if self.end and self.end < timezone.now(): + self.current = False + elif self.duration and (self.start + self.duration) < timezone.now(): + self.current = False + super(ITSystemEvent, self).save(*args, **kwargs) diff --git a/registers/test_api.py b/registers/test_api.py new file mode 100644 index 0000000..9b9d093 --- /dev/null +++ b/registers/test_api.py @@ -0,0 +1,70 @@ +from __future__ import unicode_literals, absolute_import +from datetime import timedelta +from django.utils import timezone +from mixer.backend.django import mixer +from oim_cms.test_api import ApiTestCase + +from .models import ITSystemEvent + + +class ITSystemResourceTestCase(ApiTestCase): + + def test_list(self): + """Test the ITSystemResource list response + """ + url = '/api/itsystems/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # The 'development' & decommissioned IT systems won't be in the response. + self.assertNotContains(response, self.it2.name) + self.assertNotContains(response, self.it_dec.name) + # Test all request parameter. + url = '/api/itsystems/?all' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # The 'development' IT system will be in the response. + self.assertContains(response, self.it2.name) + # Test filtering by system_id + url = '/api/itsystems/?system_id={}'.format(self.it1.system_id) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.it1.name) + self.assertNotContains(response, self.it2.name) + + +class ITSystemHardwareResourceTestCase(ApiTestCase): + + def test_list(self): + url = '/api/itsystem-hardware/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # The 'decommissioned' IT system won't be in the response. + self.assertNotContains(response, self.it_dec.name) + + +class ITSystemEventResourceTestCase(ApiTestCase): + + def setUp(self): + super(ITSystemEventResourceTestCase, self).setUp() + # Create some events + self.event_current = mixer.blend(ITSystemEvent, planned=False, start=timezone.now()) + self.event_past = mixer.blend( + ITSystemEvent, planned=True, start=timezone.now() - timedelta(hours=1), + end=timezone.now(), current=False) + + def test_list(self): + url = '/api/events/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + + def test_detail(self): + url = '/api/events/{}/'.format(self.event_current.pk) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + + def test_current(self): + url = '/api/events/current/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + # The 'non-current' event won't be in the response. + self.assertNotContains(response, self.event_past.description) diff --git a/registers/utils.py b/registers/utils.py index 0d33a62..e8ec111 100644 --- a/registers/utils.py +++ b/registers/utils.py @@ -1,4 +1,5 @@ from django.contrib.admin import ModelAdmin +from django.utils.encoding import smart_text class OimModelAdmin(ModelAdmin): @@ -14,3 +15,14 @@ def has_module_permission(self, request): return True return False + + +def smart_truncate(content, length=100, suffix='....(more)'): + """Small function to truncate a string in a sensible way, sourced from: + http://stackoverflow.com/questions/250357/smart-truncate-in-python + """ + content = smart_text(content) + if len(content) <= length: + return content + else: + return ' '.join(content[:length + 1].split(' ')[0:-1]) + suffix diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..fd83856 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,7 @@ +[pep8] +max-line-length = 160 + + +[flake8] +ignore = E501 + diff --git a/tracking/test_api.py b/tracking/test_api.py new file mode 100644 index 0000000..2f6854e --- /dev/null +++ b/tracking/test_api.py @@ -0,0 +1,49 @@ +from __future__ import unicode_literals, absolute_import +from oim_cms.test_api import ApiTestCase, random_dpaw_email +from mixer.backend.django import mixer + +from .models import FreshdeskTicket, FreshdeskContact + + +class FreshdeskTicketResourceTestCase(ApiTestCase): + + def setUp(self): + """Generate from FreshdeskTicket objects. + """ + super(FreshdeskTicketResourceTestCase, self).setUp() + mixer.cycle(5).blend( + FreshdeskContact, email=random_dpaw_email) + mixer.cycle(5).blend( + FreshdeskTicket, + subject=mixer.RANDOM, description_text=mixer.RANDOM, type='Test', + freshdesk_requester=mixer.SELECT, + it_system=mixer.SELECT, + custom_fields={ + 'support_category': None, 'support_subcategory': None}, + ) + self.ticket = FreshdeskTicket.objects.first() + + def test_list(self): + """Test the FreshdeskTicketResource list response + """ + url = '/api/freshdesk_tickets/' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertContains(response, self.ticket.subject) + + def test_list_filtering(self): + """Test the FreshdeskTicketResource filtered list response + """ + self.ticket.type = 'Incident' + self.ticket.save() + url = '/api/freshdesk_tickets/?type=Test' + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertNotContains(response, self.ticket.subject) + + def test_detail(self): + """Test the FreshdeskTicketResource detail response + """ + url = '/api/freshdesk_tickets/{}/'.format(self.ticket.ticket_id) + response = self.client.get(url) + self.assertEqual(response.status_code, 200)