Skip to content

Commit

Permalink
process_tracker_python-16 Add Audit Information
Browse files Browse the repository at this point in the history
✨ Added basic record counts for process and process_run.
✨ Added full_filename variable to ExtractTracker object, for usability
  • Loading branch information
OpenDataAlex committed May 17, 2019
1 parent 56f07b8 commit c0e7d2a
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 3 deletions.
5 changes: 3 additions & 2 deletions process_tracker/extract_tracker.py
@@ -1,15 +1,15 @@
# Extract Tracking
# Used in the creation and editing of extract records. Used in conjunction with process tracking.
from datetime import datetime
from os.path import basename, normpath
from os.path import basename, join, normpath

from process_tracker.data_store import DataStore

from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location


class ExtractTracker:
# TODO: Add filename/path variable

def __init__(self, process_run, filename, location_path, location_name=None):
"""
ExtractTracker is the primary engine for tracking data extracts
Expand All @@ -33,6 +33,7 @@ def __init__(self, process_run, filename, location_path, location_name=None):

self.source = self.process_run.source
self.filename = filename
self.full_filename = join(location_path, filename)

self.location = self.data_store.get_or_create(model=Location
, location_name=location_name
Expand Down
39 changes: 39 additions & 0 deletions process_tracker/process_tracker.py
Expand Up @@ -253,3 +253,42 @@ def register_new_process_run(self):
else:
raise Exception('The process %s is currently running.' % self.process_name)
exit()

def set_process_run_low_high_dates(self, low_date=None, high_date=None):
    """
    For the given process run, set the process_run_low_date_time and/or process_run_high_date_time.

    Each bound only ever widens: the stored low date is replaced only by an earlier value and the stored high
    date only by a later value.  A bound that has not been set yet (None) is always replaced.
    :param low_date: For the set of data being processed, the lowest datetime tracked. If set multiple times within
                     a run, will compare the new to old and adjust accordingly.
    :type low_date: datetime
    :param high_date: For the set of data being processed, the highest datetime tracked. If set multiple times
                      within a run, will compare the new to old and adjust accordingly.
    :type high_date: datetime
    :return: None. The session is committed whether or not a bound changed.
    """
    run = self.process_tracking_run

    previous_low_date_time = run.process_run_low_date_time
    # Read the *high* column here.  Comparing against the stored low date would let a smaller high_date
    # overwrite a larger one that was recorded earlier in the run.
    previous_high_date_time = run.process_run_high_date_time

    if low_date is not None and (previous_low_date_time is None or low_date < previous_low_date_time):
        run.process_run_low_date_time = low_date

    if high_date is not None and (previous_high_date_time is None or high_date > previous_high_date_time):
        run.process_run_high_date_time = high_date

    self.session.commit()

def set_process_run_record_count(self, num_records):
    """
    For the given process run, set the process_run_record_count for the number of records processed. Will also
    update the process' total_record_count - the total number of records ever processed by that process.

    The method may be called several times within one run.  Only the difference between the new count and the
    count previously recorded for this run is applied to the process total, so records contributed by earlier
    runs are preserved and this run's records are never counted twice.
    :param num_records: The number of records processed by this run so far (replaces any earlier value).
    :type num_records: int
    :return: None. The change is committed on the session.
    """
    # The baseline is what THIS run has already contributed, not the process-wide total.  An unset (None)
    # count is treated as zero.
    previous_run_records = self.process_tracking_run.process_run_record_count or 0
    current_total = self.process.total_record_count or 0

    self.process.total_record_count = current_total + (num_records - previous_run_records)
    self.process_tracking_run.process_run_record_count = num_records

    self.session.commit()
100 changes: 99 additions & 1 deletion tests/test_process_tracker.py
@@ -1,6 +1,6 @@
# Tests for validating process_tracking works as expected.

from datetime import datetime
from datetime import datetime, timedelta
import unittest

from sqlalchemy.orm import Session
Expand Down Expand Up @@ -441,3 +441,101 @@ def test_raise_run_error_with_fail(self):
with self.subTest():
self.assertTrue('Process halting. An error triggered the process to fail.' in str(context.exception))

def test_set_run_low_high_dates(self):
    """
    Verify that supplying both a low and a high date to a run that has neither stores both values on the
    process tracking record.
    :return:
    """
    low_date = datetime.now() - timedelta(hours=1)
    high_date = datetime.now()

    self.process_tracker.set_process_run_low_high_dates(low_date=low_date, high_date=high_date)

    # Read the two columns back for the run under test.
    run_id = self.process_tracker.process_tracking_run.process_tracking_id
    stored = self.session.query(
        ProcessTracking.process_run_low_date_time,
        ProcessTracking.process_run_high_date_time,
    ).filter(ProcessTracking.process_tracking_id == run_id)[0]

    self.assertEqual(
        [low_date, high_date],
        [stored.process_run_low_date_time, stored.process_run_high_date_time],
    )

def test_set_run_low_high_dates_lower_low_date(self):
    """
    Verify that when a second, earlier low date arrives for the same process run, process_run_low_date_time
    is replaced by that earlier value.
    :return:
    """
    low_date = datetime.now() - timedelta(hours=1)
    lower_low_date = low_date - timedelta(hours=1)

    # Apply the later low date first, then the earlier one.
    for candidate in (low_date, lower_low_date):
        self.process_tracker.set_process_run_low_high_dates(low_date=candidate)

    run_id = self.process_tracker.process_tracking_run.process_tracking_id
    stored = self.session.query(ProcessTracking.process_run_low_date_time) \
        .filter(ProcessTracking.process_tracking_id == run_id)[0]

    self.assertEqual(lower_low_date, stored.process_run_low_date_time)

def test_set_run_low_high_dates_higher_high_date(self):
    """
    Verify that when a second, later high date arrives for the same process run, process_run_high_date_time
    is replaced by that later value.
    :return:
    """
    high_date = datetime.now()
    higher_high_date = high_date + timedelta(hours=1)

    # Apply the earlier high date first, then the later one.
    for candidate in (high_date, higher_high_date):
        self.process_tracker.set_process_run_low_high_dates(high_date=candidate)

    run_id = self.process_tracker.process_tracking_run.process_tracking_id
    stored = self.session.query(ProcessTracking.process_run_high_date_time) \
        .filter(ProcessTracking.process_tracking_id == run_id)[0]

    self.assertEqual(higher_high_date, stored.process_run_high_date_time)

def test_set_process_run_record_count(self):
    """
    Verify that setting a record count on a process run stores it as process_run_record_count and is
    reflected in the owning process' total_record_count.
    :return:
    """
    initial_record_count = 1000

    self.process_tracker.set_process_run_record_count(num_records=initial_record_count)

    # Fetch the run-level count together with the process-level total.
    run_id = self.process_tracker.process_tracking_run.process_tracking_id
    counts = self.session.query(
        ProcessTracking.process_run_record_count,
        Process.total_record_count,
    ).join(Process).filter(ProcessTracking.process_tracking_id == run_id)[0]

    self.assertEqual(
        [initial_record_count, initial_record_count],
        [counts.process_run_record_count, counts.total_record_count],
    )

def test_set_process_run_record_count_twice(self):
    """
    Verify that when the record count is set more than once for the same run, both the run count and the
    process total end up at the most recent value rather than the sum of the calls.
    :return:
    """
    initial_record_count = 1000
    modified_record_count = 1500

    # Second call supersedes the first for the same run.
    for count in (initial_record_count, modified_record_count):
        self.process_tracker.set_process_run_record_count(num_records=count)

    run_id = self.process_tracker.process_tracking_run.process_tracking_id
    counts = self.session.query(
        ProcessTracking.process_run_record_count,
        Process.total_record_count,
    ).join(Process).filter(ProcessTracking.process_tracking_id == run_id)[0]

    self.assertEqual(
        [modified_record_count, modified_record_count],
        [counts.process_run_record_count, counts.total_record_count],
    )

0 comments on commit c0e7d2a

Please sign in to comment.