diff --git a/process_tracker/extract_tracker.py b/process_tracker/extract_tracker.py index 7942b67..5e9f63e 100755 --- a/process_tracker/extract_tracker.py +++ b/process_tracker/extract_tracker.py @@ -1,7 +1,7 @@ # Extract Tracking # Used in the creation and editing of extract records. Used in conjunction with process tracking. from datetime import datetime -from os.path import basename, normpath +from os.path import basename, join, normpath from process_tracker.data_store import DataStore @@ -9,7 +9,7 @@ class ExtractTracker: -# TODO: Add filename/path variable + def __init__(self, process_run, filename, location_path, location_name=None): """ ExtractTracker is the primary engine for tracking data extracts @@ -33,6 +33,7 @@ def __init__(self, process_run, filename, location_path, location_name=None): self.source = self.process_run.source self.filename = filename + self.full_filename = join(location_path, filename) self.location = self.data_store.get_or_create(model=Location , location_name=location_name diff --git a/process_tracker/process_tracker.py b/process_tracker/process_tracker.py index adf60b5..3bd89d9 100755 --- a/process_tracker/process_tracker.py +++ b/process_tracker/process_tracker.py @@ -253,3 +253,42 @@ def register_new_process_run(self): else: raise Exception('The process %s is currently running.' % self.process_name) exit() + + def set_process_run_low_high_dates(self, low_date=None, high_date=None): + """ + For the given process run, set the process_run_low_date_time and/or process_run_high_date_time. + :param low_date: For the set of data being processed, the lowest datetime tracked. If set multiple times within + a run, will compare the new to old and adjust accordingly. + :type low_date: datetime + :param high_date: For the set of data being processed, the highest datetime tracked. + :type high_date: datetime + :return: + """ + previous_low_date_time = self.process_tracking_run.process_run_low_date_time + previous_high_date_time = self.process_tracking_run.process_run_low_date_time + + if low_date is not None and (previous_low_date_time is None or low_date < previous_low_date_time): + self.process_tracking_run.process_run_low_date_time = low_date + + if high_date is not None and (previous_high_date_time is None or high_date > previous_high_date_time): + self.process_tracking_run.process_run_high_date_time = high_date + + self.session.commit() + + def set_process_run_record_count(self, num_records): + """ + For the given process run, set the process_run_record_count for the number of records processed. Will also + update the process' total_record_count - the total number of records ever processed by that process + :param num_records: + :return: + """ + process_run_records = self.process.total_record_count + + if process_run_records == 0: + + self.process.total_record_count += num_records + else: + self.process.total_record_count = self.process.total_record_count + (num_records - process_run_records) + + self.process_tracking_run.process_run_record_count = num_records + self.session.commit() diff --git a/tests/test_process_tracker.py b/tests/test_process_tracker.py index fac9431..9fd132a 100755 --- a/tests/test_process_tracker.py +++ b/tests/test_process_tracker.py @@ -1,6 +1,6 @@ # Tests for validating process_tracking works as expected. -from datetime import datetime +from datetime import datetime, timedelta import unittest from sqlalchemy.orm import Session @@ -441,3 +441,101 @@ def test_raise_run_error_with_fail(self): with self.subTest(): self.assertTrue('Process halting. An error triggered the process to fail.' in str(context.exception)) + def test_set_run_low_high_dates(self): + """ + Testing that if low and high date are not set, the process_tracking_record low/high dates are set. + :return: + """ + low_date = datetime.now() - timedelta(hours=1) + high_date = datetime.now() + + self.process_tracker.set_process_run_low_high_dates(low_date=low_date, high_date=high_date) + + given_dates = self.session.query(ProcessTracking.process_run_low_date_time, ProcessTracking.process_run_high_date_time)\ + .filter(ProcessTracking.process_tracking_id == self.process_tracker.process_tracking_run.process_tracking_id) + + expected_result = [low_date, high_date] + given_result = [given_dates[0].process_run_low_date_time, given_dates[0].process_run_high_date_time] + + self.assertEqual(expected_result, given_result) + + def test_set_run_low_high_dates_lower_low_date(self): + """ + Testing that if a new low date comes in for a given process_run, set the process_run_low_date_time to the new + low date. + :return: + """ + low_date = datetime.now() - timedelta(hours=1) + lower_low_date = low_date - timedelta(hours=1) + + self.process_tracker.set_process_run_low_high_dates(low_date=low_date) + + self.process_tracker.set_process_run_low_high_dates(low_date=lower_low_date) + + given_dates = self.session.query(ProcessTracking.process_run_low_date_time) \ + .filter(ProcessTracking.process_tracking_id == self.process_tracker.process_tracking_run.process_tracking_id) + + expected_result = lower_low_date + given_result = given_dates[0].process_run_low_date_time + + self.assertEqual(expected_result, given_result) + + def test_set_run_low_high_dates_higher_high_date(self): + """ + Testing that if a new low date comes in for a given process_run, set the process_run_low_date_time to the new + low date. + :return: + """ + high_date = datetime.now() + higher_high_date = high_date + timedelta(hours=1) + + self.process_tracker.set_process_run_low_high_dates(high_date=high_date) + + self.process_tracker.set_process_run_low_high_dates(high_date=higher_high_date) + + given_dates = self.session.query(ProcessTracking.process_run_high_date_time) \ + .filter(ProcessTracking.process_tracking_id == self.process_tracker.process_tracking_run.process_tracking_id) + + expected_result = higher_high_date + given_result = given_dates[0].process_run_high_date_time + + self.assertEqual(expected_result, given_result) + + def test_set_process_run_record_count(self): + """ + Testing that if record counts are provided for a given process_run, set the process_run_record_count and process' + total_record_counts correctly. + :return: + """ + initial_record_count = 1000 + + self.process_tracker.set_process_run_record_count(num_records=initial_record_count) + + given_counts = self.session.query(ProcessTracking.process_run_record_count, Process.total_record_count) \ + .join(Process)\ + .filter(ProcessTracking.process_tracking_id == self.process_tracker.process_tracking_run.process_tracking_id) + + expected_result = [initial_record_count, initial_record_count] + given_result = [given_counts[0].process_run_record_count, given_counts[0].total_record_count] + + self.assertEqual(expected_result, given_result) + + def test_set_process_run_record_count_twice(self): + """ + Testing that if record counts get set multiple times, then the process total record count will be set correctly. + :return: + """ + initial_record_count = 1000 + modified_record_count = 1500 + + self.process_tracker.set_process_run_record_count(num_records=initial_record_count) + self.process_tracker.set_process_run_record_count(num_records=modified_record_count) + + given_counts = self.session.query(ProcessTracking.process_run_record_count, Process.total_record_count) \ + .join(Process)\ + .filter(ProcessTracking.process_tracking_id == self.process_tracker.process_tracking_run.process_tracking_id) + + expected_result = [modified_record_count, modified_record_count] + given_result = [given_counts[0].process_run_record_count, given_counts[0].total_record_count] + + self.assertEqual(expected_result, given_result) \ No newline at end of file