Skip to content

Commit

Permalink
process_tracker_python-168 Init too complicated
Browse files Browse the repository at this point in the history
Cleaned up init code and moved the initialization logic of each tracker into its own function, respectively.

Closes #168
  • Loading branch information
Alex Meadows committed Jan 16, 2020
1 parent 2fbc2cd commit 1846378
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 74 deletions.
97 changes: 59 additions & 38 deletions process_tracker/extract_tracker.py
Expand Up @@ -93,21 +93,45 @@ def __init__(
self.extract_status_deleted = self.extract_status_types["deleted"]
self.extract_status_error = self.extract_status_types["error"]

if extract_id is not None:
self.compression_type = compression_type
self.compression_type_id = None
self.dataset_types = None
self.extract = None
self.extract_id = extract_id
self.extract_process = None
self.filename = filename
self.file_size = file_size
self.filetype = filetype
self.full_filename = None
self.location = location
self.location_name = location_name
self.location_path = location_path
self.sources = None
self.source_objects = None
self.status = status

self.initialize_extract_tracker()

def initialize_extract_tracker(self):
"""
Initialize extract_tracker object based on values passed on call.
:return:
"""

if self.extract_id is not None:
self.logger.info("Extract id provided. Attempting to reconstruct.")

extract = self.data_store.get_or_create_item(
model=Extract, extract_id=extract_id, create=False
self.extract = self.data_store.get_or_create_item(
model=Extract, extract_id=self.extract_id, create=False
)
self.filename = extract.extract_filename
self.location = extract.locations
self.compression_type = extract.compression_type
self.filename = self.extract.extract_filename
self.location = self.extract.locations
self.compression_type = self.extract.compression_type
if self.compression_type is None:
self.compression_type_id = None
else:
self.compression_type_id = self.compression_type.compression_type_id
self.filetype = extract.extract_filetype
self.extract = extract
self.filetype = self.extract.extract_filetype
self.full_filename = self.get_full_filename()
self.dataset_types = self.get_dataset_types()
self.extract_process = self.retrieve_extract_process()
Expand All @@ -126,60 +150,60 @@ def __init__(
self.file_size = None

else:
if filename is None:
if self.filename is None:
error_msg = "Filename must be provided."
self.logger.error(error_msg)
raise Exception(error_msg)

self.filename = filename

if location is not None:
if self.location is not None:
self.logger.info("Location object provided.")
self.location = location
elif location_path is not None:
self.location = self.location
elif self.location_path is not None:
self.logger.info("Location path provided. Creating Location object.")
self.location = LocationTracker(
location_name=location_name,
location_path=location_path,
location_name=self.location_name,
location_path=self.location_path,
data_store=self.data_store,
)
else:
raise Exception("A location object or location_path must be provided.")

if compression_type is not None:
if self.compression_type is not None:
self.logger.info("Finding compression type.")
try:
self.compression_type = self.data_store.get_or_create_item(
model=ExtractCompressionType,
create=False,
extract_compression_type=compression_type,
extract_compression_type=self.compression_type,
)
except Exception:
error_msg = "%s is not a valid compression type." % compression_type
error_msg = (
"%s is not a valid compression type." % self.compression_type
)
self.logger.error(error_msg)
raise Exception(error_msg)

self.compression_type_id = (
self.compression_type.extract_compression_type_id
)
else:
self.compression_type_id = None

if filetype is not None:
if self.filetype is not None:
self.logger.info(
"File type provided. Verifying it is a valid filetype."
)
try:
self.filetype = self.data_store.get_or_create_item(
model=ExtractFileType, create=False, extract_filetype=filetype
model=ExtractFileType,
create=False,
extract_filetype=self.filetype,
)
except Exception:
error_msg = "%s is not a valid file type." % filetype
error_msg = "%s is not a valid file type." % self.filetype
self.logger.error(error_msg)
raise Exception(error_msg)
else:
# Need to try to determine the filetype based on the extension of the filename.
file_extension = os.path.splitext(filename)[1]
file_extension = os.path.splitext(self.filename)[1]
file_extension = file_extension.replace(".", "")
self.logger.info(
"Trying to find record for file extension: %s" % file_extension
Expand All @@ -194,23 +218,23 @@ def __init__(

self.extract = self.data_store.get_or_create_item(
model=Extract,
extract_filename=filename,
extract_filename=self.filename,
extract_location_id=self.location.location.location_id,
extract_compression_type_id=self.compression_type_id,
extract_filetype_id=self.filetype.extract_filetype_id,
)

self.full_filename = self.get_full_filename(location_path=location_path)
self.full_filename = self.get_full_filename(
location_path=self.location_path
)

self.extract_process = self.retrieve_extract_process()

if self.process_run.dataset_types is not None:
self.logger.info("Associating dataset type(s) with extract.")
self.dataset_types = self.register_extract_dataset_types(
dataset_types=self.process_run.dataset_types
)
else:
self.dataset_types = None

self.extract_process = self.retrieve_extract_process()

if self.process_run.source_objects is not None:
self.logger.info(
Expand All @@ -232,23 +256,20 @@ def __init__(

else:
self.logger.info("No source system(s) to associate to.")
self.sources = None

if status is not None:
if self.status is not None:
self.logger.info("Status was provided by user.")
self.change_extract_status(new_status=status)
self.change_extract_status(new_status=self.status)
else:
self.logger.info("Status was not provided. Initializing.")
self.extract.extract_status_id = self.extract_status_initializing

if file_size is not None:
split_filesize = self.file_size_splitter(file_size=file_size)
if self.file_size is not None:
split_filesize = self.file_size_splitter(file_size=self.file_size)

self.extract.extract_filesize = split_filesize[0]
self.extract.extract_filesize_type = split_filesize[1]

self.file_size = file_size

self.session.commit()

def add_dependency(self, dependency_type, dependency):
Expand Down
98 changes: 62 additions & 36 deletions process_tracker/process_tracker.py
Expand Up @@ -129,12 +129,41 @@ def __init__(
self.process_status_failed = self.process_status_types["failed"]
self.process_status_hold = self.process_status_types["on hold"]

if process_tracking_id is not None:
self.process_name = process_name
self.process_run_name = process_run_name
self.process_type = process_type
self.actor_name = actor_name
self.tool_name = tool_name
self.sources = sources
self.targets = targets
self.source_objects = source_objects
self.target_objects = target_objects
self.source_object_attributes = source_object_attributes
self.target_object_attributes = target_object_attributes
self.dataset_types = dataset_types
self.schedule_frequency = schedule_frequency
self.process_tracking_id = process_tracking_id

self.actor = None
self.tool = None
self.process = None
self.process_tracking_run = None

self.initialize_process_tracker()

def initialize_process_tracker(self):
"""
Based on values passed through init, initialize and set up ProcessTracker object, either from scratch or
re-instantiating.
:return:
"""

if self.process_tracking_id is not None:
self.logger.info("Process run id provided. Checking if exists.")

process_run = self.data_store.get_or_create_item(
model=ProcessTracking,
process_tracking_id=process_tracking_id,
process_tracking_id=self.process_tracking_id,
create=False,
)

Expand All @@ -148,10 +177,10 @@ def __init__(

self.dataset_types = process_run.process.dataset_types
self.sources = self.determine_process_sources(
process_run_id=process_tracking_id
process_run_id=self.process_tracking_id
)
self.targets = self.determine_process_targets(
process_run_id=process_tracking_id
process_run_id=self.process_tracking_id
)

self.process_name = process_run.process.process_name
Expand All @@ -160,102 +189,99 @@ def __init__(

else:
error_msg = (
"Process run not found based on id %s." % process_tracking_id
"Process run not found based on id %s." % self.process_tracking_id
)
self.logger.error(error_msg)
raise Exception(error_msg)
else:
if process_name is None is None:
if self.process_name is None:
error_msg = "process_name must be set."
self.logger.error(error_msg)
raise Exception(error_msg)

self.actor = self.data_store.get_or_create_item(
model=Actor, actor_name=actor_name
model=Actor, actor_name=self.actor_name
)

self.tool = self.data_store.get_or_create_item(
model=Tool, tool_name=tool_name
model=Tool, tool_name=self.tool_name
)

if schedule_frequency is None:
if self.schedule_frequency is None:
self.schedule_frequency = self.data_store.get_or_create_item(
model=ScheduleFrequency, schedule_frequency_name="unscheduled"
)
else:
self.schedule_frequency = self.data_store.get_or_create_item(
model=ScheduleFrequency, schedule_frequency_name=schedule_frequency
model=ScheduleFrequency,
schedule_frequency_name=self.schedule_frequency,
)

if process_type is None:
if self.process_type is None:

self.process = self.data_store.get_or_create_item(
model=Process, process_name=process_name, create=False
model=Process, process_name=self.process_name, create=False
)

self.process_type = self.process.process_type

else:

self.process_type = self.data_store.get_or_create_item(
model=ProcessType, process_type_name=process_type
model=ProcessType, process_type_name=self.process_type
)

self.process = self.data_store.get_or_create_item(
model=Process,
process_name=process_name,
process_name=self.process_name,
process_type_id=self.process_type.process_type_id,
process_tool_id=self.tool.tool_id,
schedule_frequency_id=self.schedule_frequency.schedule_frequency_id,
)

# Dataset types should be loaded before source and target because they are also used there.

if dataset_types is not None:
if self.dataset_types is not None:
self.dataset_types = self.register_process_dataset_types(
dataset_types=dataset_types
dataset_types=self.dataset_types
)
else:
self.dataset_types = None

# sources, source_objects, or source_object_attributes should be set, not multiple. Always go with lower grain if possible.

self.source_object_attributes = None
self.source_objects = None
# sources, source_objects, or source_object_attributes should be set, not multiple. Always go with
# lower grain if possible.

if source_object_attributes is not None:
if self.source_object_attributes is not None:
self.source_object_attributes = self.register_process_sources(
source_object_attributes=source_object_attributes
source_object_attributes=self.source_object_attributes
)
self.sources = self.source_object_attributes
elif source_objects is not None:
elif self.source_objects is not None:
self.source_objects = self.register_process_sources(
source_objects=source_objects
source_objects=self.source_objects
)
self.sources = self.source_objects
elif sources is not None:
self.sources = self.register_process_sources(sources=sources)
elif self.sources is not None:
self.sources = self.register_process_sources(sources=self.sources)
else:
self.sources = None

# targets, target_objects, or target_object_attributes should be set, not multiple. Always go with lower grain if possible.
# targets, target_objects, or target_object_attributes should be set, not multiple. Always go with lower
# grain if possible.

if target_object_attributes is not None:
if self.target_object_attributes is not None:
self.targets = self.register_process_targets(
target_object_attributes=target_object_attributes
target_object_attributes=self.target_object_attributes
)
elif target_objects is not None:
elif self.target_objects is not None:
self.targets = self.register_process_targets(
target_objects=target_objects
target_objects=self.target_objects
)
elif targets is not None:
self.targets = self.register_process_targets(targets=targets)
elif self.targets is not None:
self.targets = self.register_process_targets(targets=self.targets)
else:
self.targets = None

self.process_name = process_name
self.process_run_name = process_run_name

self.process_tracking_run = self.register_new_process_run()

@staticmethod
Expand Down

0 comments on commit 1846378

Please sign in to comment.