Skip to content

Commit

Permalink
Merge 69af17e into c8650a4
Browse files Browse the repository at this point in the history
  • Loading branch information
OpenDataAlex committed Dec 17, 2019
2 parents c8650a4 + 69af17e commit fc972cd
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 9 deletions.
2 changes: 2 additions & 0 deletions dbscripts/mysql_process_tracker.sql
Expand Up @@ -338,6 +338,8 @@ create table process_tracking
process_run_actor_id int null,
is_latest_run tinyint(1) not null,
process_run_name varchar(250) null,
process_run_insert_count int default 0 not null,
process_run_update_count int default 0 not null,
constraint process_tracking_process_run_name_uindex
unique (process_run_name),
constraint process_tracking_ibfk_1
Expand Down
4 changes: 3 additions & 1 deletion dbscripts/postgresql_process_tracker.sql
Expand Up @@ -365,7 +365,9 @@ create table process_tracking
constraint process_tracking_fk03
references actor_lkup,
is_latest_run boolean default false,
process_run_name varchar(250) null
process_run_name varchar(250) null,
process_run_insert_count int default 0 not null,
process_run_update_count int default 0 not null
);

comment on table process_tracking is 'Tracking table of process runs.';
Expand Down
2 changes: 2 additions & 0 deletions process_tracker/models/process.py
Expand Up @@ -508,6 +508,8 @@ class ProcessTracking(Base):
process_run_start_date_time = Column(DateTime, nullable=False)
process_run_end_date_time = Column(DateTime, nullable=True)
process_run_record_count = Column(Integer, nullable=False, default=0)
process_run_insert_count = Column(Integer, nullable=False, default=0)
process_run_update_count = Column(Integer, nullable=False, default=0)
process_run_actor_id = Column(
Integer, ForeignKey("process_tracker.actor_lkup.actor_id"), nullable=False
)
Expand Down
47 changes: 39 additions & 8 deletions process_tracker/process_tracker.py
Expand Up @@ -1336,22 +1336,53 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None):

self.session.commit()

def set_process_run_record_count(self, num_records):
def record_count_manager(self, original_count, num_records):
"""
Given two record counts, one the overall count and the other the current count, process and return the adjusted
amount.
:param original_count: The original overall count of records
:type original_count: int
:param num_records: The adjusted count of records
:type num_records: int
"""
if original_count == 0 or original_count is None:
return_count = num_records
else:
return_count = original_count + (num_records - original_count)

return return_count

def set_process_run_record_count(self, num_records, processing_type=None):
"""
For the given process run, set the process_run_record_count for the number of records processed. Will also
update the process' total_record_count - the total number of records ever processed by that process
:param num_records:
:param num_records: Count of number of records processed.
:type num_records: int
:param processing_type: Type of records being processed. Valid values: None, insert, update. None will only
update overall counts
:return:
"""
process_run_records = self.process.total_record_count
process_run_inserts = self.process_tracking_run.process_run_insert_count
process_run_updates = self.process_tracking_run.process_run_update_count

if process_run_records == 0 or process_run_records is None:

self.process.total_record_count = num_records
else:
self.process.total_record_count = self.process.total_record_count + (
num_records - process_run_records
if processing_type == "insert":
self.process_tracking_run.process_run_insert_count = self.record_count_manager(
original_count=process_run_inserts, num_records=num_records
)
elif processing_type == "update":
self.process_tracking_run.process_run_update_count = self.record_count_manager(
original_count=process_run_updates, num_records=num_records
)
elif processing_type is not None:
error_msg = "Processing type not recognized."
self.logger.error(error_msg)
raise Exception(error_msg)

self.process.total_record_count = self.record_count_manager(
original_count=process_run_records, num_records=num_records
)

self.process_tracking_run.process_run_record_count = num_records

self.session.commit()
74 changes: 74 additions & 0 deletions tests/test_process_tracker.py
Expand Up @@ -1877,6 +1877,80 @@ def test_set_process_run_record_count_twice(self):

self.assertEqual(expected_result, given_result)

def test_set_process_run_record_count_insert(self):
"""
Testing that if record counts are provided for a given process_run, and the processing type is insert,
update the insert count for the process run.
"""
initial_record_count = 1000

self.process_tracker.set_process_run_record_count(
num_records=initial_record_count, processing_type="insert"
)

given_count = self.process_tracker.process_tracking_run.process_run_insert_count
expected_count = 1000

self.assertEqual(expected_count, given_count)

def test_set_process_run_record_count_insert_twice(self):
"""
Testing that if record counts are provided for a given process_run multiple times, and the processing type is insert,
update the insert count for the process run.
"""
initial_record_count = 1000
modified_record_count = 1500

self.process_tracker.set_process_run_record_count(
num_records=initial_record_count, processing_type="insert"
)

self.process_tracker.set_process_run_record_count(
num_records=modified_record_count, processing_type="insert"
)

given_count = self.process_tracker.process_tracking_run.process_run_insert_count
expected_count = 1500

self.assertEqual(expected_count, given_count)

def test_set_process_run_record_count_update(self):
"""
Testing that if record counts are provided for a given process_run, and the processing type is update,
update the update count for the process run.
"""
initial_record_count = 1000

self.process_tracker.set_process_run_record_count(
num_records=initial_record_count, processing_type="update"
)

given_count = self.process_tracker.process_tracking_run.process_run_update_count
expected_count = 1000

self.assertEqual(expected_count, given_count)

def test_set_process_run_record_count_update_twice(self):
"""
Testing that if record counts are provided for a given process_run multiple times, and the processing type is update,
update the update count for the process run.
"""
initial_record_count = 1000
modified_record_count = 1500

self.process_tracker.set_process_run_record_count(
num_records=initial_record_count, processing_type="update"
)

self.process_tracker.set_process_run_record_count(
num_records=modified_record_count, processing_type="update"
)

given_count = self.process_tracker.process_tracking_run.process_run_update_count
expected_count = 1500

self.assertEqual(expected_count, given_count)

def test_register_source_dataset_type(self):
"""
When both a source and dataset_type are provided, the source is registered to the dataset_type.
Expand Down

0 comments on commit fc972cd

Please sign in to comment.