Skip to content

Commit

Permalink
process_tracker_python-28 Process Extract Association
Browse files Browse the repository at this point in the history
✨ Bulk Extract status change for ProcessTracker

ProcessTracker can now bulk change status of extracts retrieved from
all extract finders.

Closes:#28
  • Loading branch information
OpenDataAlex committed Jun 7, 2019
1 parent de7841c commit 65f984b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
2 changes: 1 addition & 1 deletion process_tracker/data_store.py
Expand Up @@ -28,7 +28,7 @@
]
preload_process_status_types = ["running", "completed", "failed"]
preload_process_types = ["extract", "load"]
preload_system_keys = [{"key": "version", "value": "0.1.0"}]
preload_system_keys = [{"key": "version", "value": "0.2.0"}]

relational_stores = ["postgresql", "mysql", "oracle", "mssql", "snowflake"]
nonrelational_stores = []
Expand Down
17 changes: 17 additions & 0 deletions process_tracker/process_tracker.py
Expand Up @@ -98,6 +98,20 @@ def __init__(

self.process_tracking_run = self.register_new_process_run()

@staticmethod
def bulk_change_extract_status(extracts, extract_status):
"""
Given a set of extract objects, update the extract process record to reflect the association and updated status
as well as the extract record's' status.
:param extracts: List of Extract SQLAlchemy objects to be bulk updated.
:param extract_status: The status to change the extract files to.
:type extract_status: str
:return:
"""

for extract in extracts:
extract.change_extract_status(new_status=extract_status)

def change_run_status(self, new_status, end_date=None):
"""
Change a process tracking run record from 'running' to another status.
Expand Down Expand Up @@ -155,6 +169,8 @@ def find_ready_extracts_by_filename(self, filename):
.all()
)

self.logger.info("Returning extract files by filename.")

return process_files

def find_ready_extracts_by_location(self, location_name=None, location_path=None):
Expand Down Expand Up @@ -195,6 +211,7 @@ def find_ready_extracts_by_location(self, location_name=None, location_path=None
"A location name or path must be provided. Please try again."
)

self.logger.info("Returning extract files by location.")
return process_files

def find_ready_extracts_by_process(self, extract_process_name):
Expand Down
40 changes: 40 additions & 0 deletions tests/test_process_tracker.py
Expand Up @@ -112,6 +112,46 @@ def tearDown(self):
self.session.query(ErrorType).delete()
self.session.commit()

def test_bulk_change_extract_status(self):
"""
Testing that bulk change occurs when extracts provided.
:return:
"""
extract = ExtractTracker(
process_run=self.process_tracker,
filename="test_extract_filename2.csv",
location_name="Test Location",
location_path="/home/test/extract_dir",
)

extract2 = ExtractTracker(
process_run=self.process_tracker,
filename="test_extract_filename3.csv",
location_name="Test Location",
location_path="/home/test/extract_dir",
)

extracts = [extract, extract2]

self.process_tracker.bulk_change_extract_status(
extracts=extracts, extract_status="loading"
)

given_result = (
self.session.query(ExtractProcess)
.join(ExtractStatus)
.filter(
ExtractProcess.process_tracking_id
== self.process_tracker.process_tracking_run.process_tracking_id
)
.filter(ExtractStatus.extract_status_name == "loading")
.count()
)

expected_result = 2

self.assertEqual(expected_result, given_result)

def test_change_status_invalid_type(self):
"""
Testing that if an invalid process status type is passed, it will trigger an exception.
Expand Down

0 comments on commit 65f984b

Please sign in to comment.