Skip to content

Commit

Permalink
process_tracker_python-17 Audit Info for Extracts
Browse files Browse the repository at this point in the history
✨  low and high dates, number of records can now be tracked on
Extracts, both at write and at load.

Extracts can now have their data's low and high dates, as well as the number
of records, tracked both when the file is written and when the file is
loaded.  All audit fields are optional.

Added table fields to sql scripts as well.

Closes #17
  • Loading branch information
OpenDataAlex committed Jun 6, 2019
1 parent b5ef4fe commit 06ff156
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 49 deletions.
19 changes: 11 additions & 8 deletions dbscripts/mysql_process_tracker.sql
Expand Up @@ -51,30 +51,33 @@ create table location_lkup
foreign key (location_type) references location_type_lkup (location_type_id)
);

-- Tracking table for all extract/staging data files, including the optional
-- audit fields added for issue #17 (low/high dates and record counts,
-- captured both when the file is written and when it is loaded).
create table process_tracker.extract_tracking
(
    extract_id int auto_increment
        primary key,
    extract_filename varchar(750) not null,
    extract_location_id int null,
    extract_status_id int null,
    extract_registration_date_time datetime not null,
    extract_write_low_date_time datetime null comment 'The lowest datetime of the data set as noted when writing the data file.',
    extract_write_high_date_time datetime null comment 'The highest datetime of the data set as noted when writing the data file.',
    extract_write_record_count int null comment 'The record count of the data set as noted when writing the data file.',
    extract_load_low_date_time datetime null comment 'The lowest datetime of the data set as noted when loading the data file. Should match the extract_write_low_date_time.',
    extract_load_high_date_time datetime null comment 'The highest datetime of the data set as noted when loading the data file. Should match the extract_write_high_date_time.',
    extract_load_record_count int null comment 'The record count of the data set when loading the data file.',
    constraint extract_filename
        unique (extract_filename),
    constraint extract_tracking_ibfk_1
        foreign key (extract_location_id) references process_tracker.location_lkup (location_id),
    constraint extract_tracking_ibfk_2
        foreign key (extract_status_id) references process_tracker.extract_status_lkup (extract_status_id)
);

create index extract_location_id
    on process_tracker.extract_tracking (extract_location_id);

create index extract_status_id
    on process_tracker.extract_tracking (extract_status_id);

-- NOTE(review): the committed diff re-pointed this index at
-- extract_tracking (extract_status_id), which duplicates the index above and
-- silently drops the intended index on location_lkup.location_type.
-- Restored to its original target here.
create index location_type
    on process_tracker.location_lkup (location_type);

create table process_tracker.extract_dependency
(
Expand Down
44 changes: 32 additions & 12 deletions dbscripts/postgresql_process_tracker.sql
Expand Up @@ -283,37 +283,57 @@ create unique index location_lkup_udx01
create unique index location_lkup_udx02
on location_lkup (location_path);

-- Tracking table for all extract/staging data files, including the optional
-- audit fields added for issue #17 (low/high dates and record counts,
-- captured both when the file is written and when it is loaded).
create table process_tracking.extract_tracking
(
    extract_id serial not null
        constraint extract_tracking_pk
            primary key,
    extract_filename varchar(750) not null,
    extract_location_id integer not null
        constraint extract_tracking_fk01
            references process_tracking.location_lkup,
    extract_process_run_id integer
        constraint extract_tracking_fk03
            references process_tracking.process_tracking,
    extract_status_id integer
        constraint extract_tracking_fk02
            references process_tracking.extract_status_lkup,
    extract_registration_date_time timestamp not null,
    extract_write_low_date_time timestamp,
    extract_write_high_date_time timestamp,
    extract_write_record_count integer,
    extract_load_low_date_time timestamp,
    extract_load_high_date_time timestamp,
    extract_load_record_count integer
);

comment on table process_tracking.extract_tracking is 'Tracking table for all extract/staging data files.';

comment on column process_tracking.extract_tracking.extract_filename is 'The unique filename for a given extract from a given source.';

comment on column process_tracking.extract_tracking.extract_location_id is 'The location where the given extract can be found.';

comment on column process_tracking.extract_tracking.extract_process_run_id is 'The process that registered or created the extract file.';

comment on column process_tracking.extract_tracking.extract_status_id is 'The status of the extract.';

comment on column process_tracking.extract_tracking.extract_registration_date_time is 'The datetime that the extract was loaded into extract tracking.';

comment on column process_tracking.extract_tracking.extract_write_low_date_time is 'The lowest datetime of the data set as noted when writing the data file.';

comment on column process_tracking.extract_tracking.extract_write_high_date_time is 'The highest datetime of the data set as noted when writing the data file.';

comment on column process_tracking.extract_tracking.extract_write_record_count is 'The record count of the data set as noted when writing the data file.';

comment on column process_tracking.extract_tracking.extract_load_low_date_time is 'The lowest datetime of the data set as noted when loading the data file. Should match the extract_write_low_date_time.';

comment on column process_tracking.extract_tracking.extract_load_high_date_time is 'The highest datetime of the data set as noted when loading the data file. Should match the extract_write_high_date_time.';

comment on column process_tracking.extract_tracking.extract_load_record_count is 'The record count of the data set when loading the data file.';

alter table process_tracking.extract_tracking owner to pt_admin;

create table extract_process_tracking
(
Expand Down
73 changes: 73 additions & 0 deletions process_tracker/extract_tracker.py
Expand Up @@ -9,6 +9,7 @@
from process_tracker.data_store import DataStore
from process_tracker.location_tracker import LocationTracker
from process_tracker.utilities.settings import SettingsManager
from process_tracker.utilities import utilities
from process_tracker.models.extract import (
Extract,
ExtractDependency,
Expand Down Expand Up @@ -252,3 +253,75 @@ def retrieve_extract_process(self):
self.session.commit()

return extract_process

def set_extract_low_high_dates(self, low_date, high_date, audit_type="load"):
    """
    For the given extract, record the low and high date_times observed while
    writing or loading the data set.  A date only replaces the stored value
    when utilities.determine_low_high_date says it extends the range
    (an earlier low, or a later high).  Changes are committed immediately.
    :param low_date: The low date of the data set.
    :type low_date: Datetime/timestamp
    :param high_date: The high date of the data set.
    :type high_date: Datetime/timestamp.
    :param audit_type: The type of audit fields being populated. Valid types: write, load
    :type audit_type: String
    :return:
    :raises Exception: If audit_type is not one of "write" or "load".
    """

    if audit_type not in ("write", "load"):
        self.logger.error("%s is not a valid audit_type." % audit_type)
        raise Exception("%s is not a valid audit_type." % audit_type)

    # The write and load paths are identical apart from the pair of extract
    # attributes being updated, so resolve the attribute names once instead
    # of duplicating the comparison logic per branch.
    low_attr = "extract_%s_low_date_time" % audit_type
    high_attr = "extract_%s_high_date_time" % audit_type

    if utilities.determine_low_high_date(
        date=low_date,
        previous_date=getattr(self.extract, low_attr),
        date_type="low",
    ):
        setattr(self.extract, low_attr, low_date)

    if utilities.determine_low_high_date(
        date=high_date,
        previous_date=getattr(self.extract, high_attr),
        date_type="high",
    ):
        setattr(self.extract, high_attr, high_date)

    self.session.commit()

def set_extract_record_count(self, num_records, audit_type="load"):
    """
    For the given audit type, set the number of records for the given extract
    and commit the change.
    :param num_records: Number of records tracked in extract
    :type num_records: int
    :param audit_type: The type of audit being populated. Valid types: write, load.
    :type audit_type: str
    :return:
    """

    # Map each valid audit type to the extract attribute it populates.
    count_attributes = {
        "write": "extract_write_record_count",
        "load": "extract_load_record_count",
    }

    target_attribute = count_attributes.get(audit_type)

    if target_attribute is None:
        self.logger.error("%s is not a valid audit_type." % audit_type)
        raise Exception("%s is not a valid audit_type." % audit_type)

    setattr(self.extract, target_attribute, num_records)

    self.session.commit()
6 changes: 6 additions & 0 deletions process_tracker/models/extract.py
Expand Up @@ -44,6 +44,12 @@ class Extract(Base):
extract_registration_date_time = Column(
DateTime, nullable=False, default=datetime.now()
)
extract_write_low_date_time = Column(DateTime, nullable=True)
extract_write_high_date_time = Column(DateTime, nullable=True)
extract_write_record_count = Column(Integer, nullable=True)
extract_load_low_date_time = Column(DateTime, nullable=True)
extract_load_high_date_time = Column(DateTime, nullable=True)
extract_load_record_count = Column(Integer, nullable=True)

extract_process = relationship("ExtractProcess", back_populates="process_extracts")
locations = relationship("Location", foreign_keys=[extract_location_id])
Expand Down
11 changes: 7 additions & 4 deletions process_tracker/process_tracker.py
Expand Up @@ -12,6 +12,7 @@
from process_tracker.location_tracker import LocationTracker
from process_tracker.utilities.logging import console
from process_tracker.utilities.settings import SettingsManager
from process_tracker.utilities import utilities

from process_tracker.models.actor import Actor
from process_tracker.models.extract import (
Expand Down Expand Up @@ -446,14 +447,16 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None):
previous_low_date_time = self.process_tracking_run.process_run_low_date_time
previous_high_date_time = self.process_tracking_run.process_run_low_date_time

if low_date is not None and (
previous_low_date_time is None or low_date < previous_low_date_time
if utilities.determine_low_high_date(
date=low_date, previous_date=previous_low_date_time, date_type="low"
):

self.process_tracking_run.process_run_low_date_time = low_date

if high_date is not None and (
previous_high_date_time is None or high_date > previous_high_date_time
if utilities.determine_low_high_date(
date=high_date, previous_date=previous_high_date_time, date_type="high"
):

self.process_tracking_run.process_run_high_date_time = high_date

self.session.commit()
Expand Down
5 changes: 3 additions & 2 deletions process_tracker/utilities/settings.py
Expand Up @@ -23,8 +23,9 @@ def __init__(self, config_location=None):
)

else:
self.config_path = os.path.join(
config_location, "process_tracker_config.ini"
self.config_path = config_location
self.config_file = os.path.join(
self.config_path, "process_tracker_config.ini"
)

exists = os.path.isfile(self.config_file)
Expand Down
58 changes: 58 additions & 0 deletions process_tracker/utilities/utilities.py
@@ -0,0 +1,58 @@
"""
Space for generalized helpers that can be utilized across the entire framework.
"""
import logging

from process_tracker.utilities.settings import SettingsManager

# Module-wide configuration, loaded once at import time from the
# process_tracker settings file.
config = SettingsManager().config

# Module-level logger; level is driven by the config file's
# DEFAULT/log_level setting.
logger = logging.getLogger(__name__)
logger.setLevel(config["DEFAULT"]["log_level"])


def determine_low_high_date(date, previous_date, date_type):
    """
    For the given dates and date type, determine if the date replaces the previous date or not.
    A None date never replaces anything; any date replaces a None previous_date.
    :param date: The new datetime.
    :type date: Datetime/timestamp
    :param previous_date: The previous datetime that date is being compared to.
    :type previous_date: Datetime/timestamp
    :param date_type: Is the comparison for a low date or high date? Valid values: low, high
    :type date_type: str
    :return: Boolean if date replaces previous_date
    :raises Exception: If date_type is not "low" or "high".
    """

    if date_type == "low":
        # A new low only replaces the previous one when it is earlier.
        return date is not None and (previous_date is None or date < previous_date)

    elif date_type == "high":
        # Bug fix: a new high replaces the previous one when it is LATER.
        # The original compared previous_date > date, which would have let an
        # earlier date overwrite the stored high date (the inverse of the
        # inline logic this helper replaced in process_tracker.py).
        return date is not None and (previous_date is None or date > previous_date)

    else:
        logger.error("%s is not a valid date_type." % date_type)
        raise Exception("%s is not a valid date_type." % date_type)


def timestamp_converter(data_store_type, timestamp):
    """
    Helper function for when testing with data stores that have funky formats for stock dates with SQLAlchemy.
    MySQL's stock datetime type carries no sub-second precision, so
    microseconds are stripped for that data store; every other data store
    receives the timestamp unchanged.
    :param data_store_type: The type of data store
    :param timestamp: The timestamp to be created.
    :return: The timestamp, truncated to whole seconds for mysql.
    """

    if data_store_type != "mysql":
        return timestamp

    return timestamp.replace(microsecond=0)

0 comments on commit 06ff156

Please sign in to comment.