To run, use the following bash commands:

> source activate gci7043_py36

> jupyter nbconvert --to python "(filename).ipynb"

> python "(filename).py"

(Unfortunately, the --execute option for nbconvert will not work unless you can add the appropriate conda kernel to the jupyter kernelspec list... which I cannot.)

### Setup Environment

In [None]:
### Load Dependencies ###

import datetime
import os
import sys
import time

import pandas
import teradata

import config
import lockout

### Lockout Mechanism for Preventing Concurrent Instances

In [None]:
### Automation --- Program Lockout Check ###

# Used to prevent multiple instances from running
# NOTE(review): the locking itself is implemented in the project-local
# `lockout` module, which is not visible here -- confirm details there.

# Builtin default lockout file is py.lock
# lockout.lock_file = "py.lock"

# Manual lockout override (uncomment to clear an existing lock by hand)
# lockout.unlock(verbose=True)

# Check for lockout
# Passing hault=True will stop the program
lockout.islocked(verbose=True, hault=True)

# Enable lockout before continuing; safe_exit() removes it again via lockout.unlock()
lockout.lock(verbose=True)

In [None]:
### Define Exit Procedures ###

def safe_exit(session=None, hault=True, verbose=False, session_close=False):
    """Release job resources and optionally terminate the program.

    Cleanup is best-effort: a failure to close the session or to remove the
    lockout is reported on stdout and never raised.

    Parameters
    ----------
    session : object, optional
        Open database session. Only used when ``session_close`` is True.
    hault : bool
        When True, terminate the interpreter with ``sys.exit()`` once the
        cleanup steps have run.
    verbose : bool
        When True, announce that a safe exit was requested.
    session_close : bool
        When True, call ``session.close()`` before removing the lockout.

    Raises
    ------
    SystemExit
        When ``hault`` is True.
    """

    if verbose: print("\nSAFE EXIT REQUESTED")

    #Close open database session
    if session_close:
        try:
            session.close()
            print("Teradata Session Closed")
        except Exception as err:
            # Still best-effort, but no longer silent (e.g. session is None)
            print("Teradata Session Close Failed: %s" % err)

    #Remove Lockout
    print("Removing Lockout... ", end ="")
    try:
        lockout.unlock()
        print("Done.")
    except Exception:
        print("Error.")

    #if hault: raise RuntimeError("Safe Exit Requested") #Better for jupyter notebook
    # sys.exit() instead of exit(): the latter is a helper injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    if hault: sys.exit()

### Manual Override Parameters for Jupyter Notebook

In [None]:
### Class for storing manual override
# Used as a plain namespace: values are attached as class attributes and read
# straight from the class; it is never instantiated in this file.
class manual:
    # Master switch; the other manual.* values are only consulted when this is True
    override = False

In [None]:
### Enter manual override values for testing here
# None of the values below are consulted unless manual.override is True.
manual.override = False

# Replaces config.QUERY when building the staging table (None = use config.QUERY)
manual.QUERY = None #"EDWCL_Temp.clifford_base_2019_03_27 With Data And Stats"
# Replaces config.INTRAFRAME_MAX (None = use config.INTRAFRAME_MAX)
manual.INTRAFRAME_MAX = None

#Use None to indicate index auto-increment
manual.INDEX_SEQ = None
manual.KEYFRAME_SEQ = None
manual.INTRAFRAME_SEQ = None
manual.FRAME_TIMESTAMP = None #'2019-03-27 16:00:01'

In [None]:
# Stop execution whenever manual overrides are switched on, so the values
# entered above get reviewed before anything touches the database.
if manual.override:
    print("!!!Safety Check!!!")
    print("Please double check manual entries before proceeding.")
    # An argument-less `raise` here only halted by accident ("No active
    # exception to reraise"); raise the same exception type explicitly.
    raise RuntimeError("Manual override enabled - double check manual entries before proceeding.")

### 00 - Teradata Connection Setup

In [None]:
### Framework for executing Teradata queries

def exec_query(query, session, verbose = True, dryrun = False, ignore_db_error = False):
    """Execute one SQL statement on ``session`` with timestamped logging.

    Parameters
    ----------
    query : str
        SQL text; echoed to stdout before execution.
    session : object
        Open session exposing ``execute(query)``.
    verbose : bool
        Currently unused; kept for interface compatibility.
    dryrun : bool
        When True the statement is printed but not executed.
    ignore_db_error : bool
        When True a ``teradata.api.DatabaseError`` is reported and swallowed
        (used for DROP TABLE statements that are allowed to fail).

    Returns
    -------
    None

    Raises
    ------
    teradata.api.DatabaseError
        Re-raised after ``safe_exit`` when ``ignore_db_error`` is False.
    BaseException
        Any other failure is re-raised after ``safe_exit`` has released the
        lockout.
    """

    if dryrun: print('--- DRY RUN ONLY - NO EXECUTION ---')
    # datetime.datetime replaces the pandas.datetime alias, which newer
    # pandas releases no longer provide; the printed text is identical.
    print(str(datetime.datetime.now()))
    print(query)

    try:
        if not dryrun: session.execute(query)

    except teradata.api.DatabaseError as err:
        print(str(datetime.datetime.now()))
        print("DATABASE ERROR: ", err)

        if not ignore_db_error:
            safe_exit(hault=False, verbose=True, session=session)
            raise

        print("IGNORING DATABASE ERROR.")
        return None

    except BaseException:
        # Deliberately broad: the lockout must be released even on
        # KeyboardInterrupt, and the exception is always re-raised.
        print(str(datetime.datetime.now()))
        print("FATAL ERROR")
        safe_exit(hault=False, verbose=True, session=session)
        raise

    print(str(datetime.datetime.now()))
    print('DONE.')
    return None

### Setup and open Teradata connection ###
# Disabled single-attempt version: it sits inside a bare string literal, so it
# is never executed. The cell that follows contains a retrying version.
'''
try:
    os.system('echo -n "$TERADATA_LOGON_PASSWORD" | kinit "$TERADATA_LOGON_USER"');

    udaExec = teradata.UdaExec(appName="python", version="1.0", logConsole=False,
                              odbcLibPath="/opt/app/ttu1510/teradata/client/15.10/odbc_64/lib/libodbc.so")
    session = udaExec.connect(method="odbc", system="edwprod.dw.medcity.net", driver="Teradata")

except:
    
    print(sys.exc_info()[1])
    
    safe_exit(session=None)
'''

In [None]:
### Setup and open Teradata connection ###

# Try up to max_attempts times, pausing time_delay seconds between attempts,
# before aborting through safe_exit().
conn_flag = False
time_delay = 10
max_attempts = 3

for i in range(max_attempts):

    try:
        # Pipe the password from the environment into kinit before connecting
        os.system('echo -n "$TERADATA_LOGON_PASSWORD" | kinit "$TERADATA_LOGON_USER"')

        udaExec = teradata.UdaExec(appName="python", version="1.0", logConsole=False,
                                   odbcLibPath="/opt/app/ttu1510/teradata/client/15.10/odbc_64/lib/libodbc.so")

        session = udaExec.connect(method="odbc", system="edwprod.dw.medcity.net", driver="Teradata")

        print("Teradata session opened.")

        conn_flag = True

    except Exception:

        print("ATTEMPT NUMBER %i - ERROR: Connection could not be established." %(i + 1))

        print(sys.exc_info()[1])

    if conn_flag: break

    # No pause after the final attempt
    if i < (max_attempts - 1):

        # time_delay was previously defined but ignored in favour of a literal 10
        print("Retrying in %i seconds..." % time_delay)
        time.sleep(time_delay)

if not conn_flag:

    print("ERROR: Maximum number of attempts to connect.")

    safe_exit(hault=True, verbose = True)

### 01 - Load Snapshot of Table / Query into Staging

In [None]:
# Clear existing staging table
# ignore_db_error=True: a database error from the DROP is reported but does not
# stop the run (presumably the table may simply not exist yet -- confirm).

query = "DROP TABLE " + config.SNAPSHOT_STAGING + ";"

exec_query(query, session, ignore_db_error = True)

In [None]:
# Build the staging table from the configured source query; a manual override
# query, when enabled and supplied, takes precedence over config.QUERY.

create_prefix = "CREATE TABLE " + config.SNAPSHOT_STAGING + " AS "

query = create_prefix + config.QUERY + ";"

if manual.override and manual.QUERY is not None:
    query = create_prefix + manual.QUERY + ";"

exec_query(query, session)

### 02 - Move Previous Frame into Previous Frame Table

In [None]:
# Clear previous snapshot table
# A database error from the DROP is ignored (ignore_db_error=True).

query = "DROP TABLE " + config.SNAPSHOT_PREVIOUS + ";"

exec_query(query, session, ignore_db_error = True)

In [None]:
# Move data in snapshot_current into snapshot_previous
# The current table is copied (data and statistics), not renamed; it is only
# dropped later, in step 06.

query = "CREATE TABLE " + config.SNAPSHOT_PREVIOUS + " AS " + config.SNAPSHOT_CURRENT + " With Data And Stats;"

exec_query(query, session)

### 03 - Load Current Index Metadata

In [None]:
### Load Last Index Entry ###

# Ranks the index table by INDEX_SEQ descending and keeps the rank-1 row(s),
# i.e. the entry with the highest INDEX_SEQ. An empty index table yields an
# empty result, which the next cell treats as a first run.
query = """
SELECT INDEX_SEQ, KEYFRAME_SEQ, INTRAFRAME_SEQ, FRAME_TIMESTAMP

FROM (
   SELECT RANK() OVER (ORDER BY INDEX_SEQ DESC) AS LAST_ENTRY, ref.*
   FROM """ + config.INDEX_TABLE + """ ref
) idx

WHERE idx.LAST_ENTRY = 1;
"""


# On any failure release the lockout first, then re-raise the original error
try:
    results = pandas.read_sql(query, session)
except:
    safe_exit(hault=False, verbose=True, session=session)
    raise

In [None]:
# Unpack the latest index entry; an empty result means nothing has been
# indexed yet, so start from zeroed counters and flag a (tentative) first run.
INITIAL_RUN = not (len(results) > 0)

if INITIAL_RUN:
    INDEX_SEQ, KEYFRAME_SEQ, INTRAFRAME_SEQ = 0, 0, 0
    FRAME_TIMESTAMP = None

else:
    INDEX_SEQ = results.INDEX_SEQ[0].astype(int)
    KEYFRAME_SEQ = results.KEYFRAME_SEQ[0].astype(int)
    INTRAFRAME_SEQ = results.INTRAFRAME_SEQ[0].astype(int)
    FRAME_TIMESTAMP = results.FRAME_TIMESTAMP[0]

In [None]:
# Report the values read from the index table (zeros / None on an empty table)
print("Last INDEX_SEQ:        %i" %INDEX_SEQ)
print("Last KEYFRAME_SEQ:     %i" %KEYFRAME_SEQ)
print("Last INTRAFRAME_SEQ:   %i" %INTRAFRAME_SEQ)
print("Last FRAME_TIMESTAMP:  %s" %str(FRAME_TIMESTAMP))

# The actual verification happens in the following cell (step 03a)
if INITIAL_RUN:
    print("\nChecking for first run... ")

### 03a - Check for First Run

In [None]:
### Failsafe - Check for empty Key Frame Table placeholder ###

# An empty index table only counts as a first run if the key frame table is
# empty as well; otherwise the two tables disagree and the run is aborted.
if INITIAL_RUN:

    query = "SELECT COUNT(*) FROM " + config.KEYFRAME_TABLE + ";"

    try:
        results_check = pandas.read_sql(query, session)
    except BaseException:
        # Release the lockout, then let the original error propagate
        safe_exit(hault=False, verbose=True, session=session)
        raise

    # .iloc[0, 0] is purely positional (no integer lookup on a label-indexed
    # Series), and int() accepts NumPy scalars as well as plain Python numbers.
    table_check = int(results_check.iloc[0, 0])
    print("Key Frame Table record count: %i"  %table_check)

    if table_check == 0:
        print("First Run Verified - Initializing Setup Mode")
    else:
        INITIAL_RUN = False
        print("ERROR:  Index Table Error")
        # Release the lockout before aborting, as every other failure path does
        safe_exit(hault=False, verbose=True, session=session)
        raise ValueError("ERROR:  Index Table Error")

### 04 - Determine if new frame is a Key Frame

In [None]:
# Intraframe limit: the configured value unless a manual override supplies one
INTRAFRAME_MAX = config.INTRAFRAME_MAX

if manual.override == True and manual.INTRAFRAME_MAX is not None:
    INTRAFRAME_MAX = manual.INTRAFRAME_MAX

# A new key frame is due once the intraframe counter has reached the limit,
# and always on the very first run.
keyframe_flag = bool(INTRAFRAME_SEQ >= INTRAFRAME_MAX) or bool(INITIAL_RUN)

if keyframe_flag:
    print("New Key Frame")

### 05 - Increment Index

In [None]:
# Advance the frame counters for the frame about to be written
if keyframe_flag == True:
    #Auto Increment KeyFrame Sequence
    KEYFRAME_SEQ = KEYFRAME_SEQ + 1

    #Reset Intraframe Sequence
    INTRAFRAME_SEQ = 0

elif keyframe_flag == False:

    #Auto Increment Intraframe Sequence
    INTRAFRAME_SEQ = INTRAFRAME_SEQ + 1

else:
    # Only reachable if keyframe_flag compares equal to neither True nor False
    raise ValueError('Issue with keyframe_flag')

#Auto Increment Index Sequence
INDEX_SEQ = INDEX_SEQ + 1

#Auto Timestamp
# datetime.datetime replaces the pandas.datetime alias, which newer pandas
# releases no longer provide; the formatted value is unchanged.
FRAME_TIMESTAMP = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


In [None]:
# Manual overrides win over the auto-incremented values computed above;
# any manual value left as None keeps the automatic one.
if manual.override:
    if manual.INDEX_SEQ is not None:        INDEX_SEQ = manual.INDEX_SEQ
    if manual.KEYFRAME_SEQ is not None:     KEYFRAME_SEQ = manual.KEYFRAME_SEQ
    if manual.INTRAFRAME_SEQ is not None:   INTRAFRAME_SEQ = manual.INTRAFRAME_SEQ
    if manual.FRAME_TIMESTAMP is not None:  FRAME_TIMESTAMP = manual.FRAME_TIMESTAMP

In [None]:
# Report the index values that will be written for this frame
print("Current INDEX_SEQ:        %i" %INDEX_SEQ)
print("Current KEYFRAME_SEQ:     %i" %KEYFRAME_SEQ)
print("Current INTRAFRAME_SEQ:   %i" %INTRAFRAME_SEQ)
print("Current FRAME_TIMESTAMP:  %s" %str(FRAME_TIMESTAMP))

### 06 - Load data from Staging into Current Frame Table

In [None]:
### Clear current snapshot table
# Its contents were already copied into the previous-snapshot table in step 02.
# A database error from the DROP is ignored (ignore_db_error=True).

query = "DROP TABLE " + config.SNAPSHOT_CURRENT + ";"

exec_query(query, session, ignore_db_error = True)

In [None]:
# Rebuild the current snapshot from staging, prefixing every row with the
# frame's KEYFRAME_SEQ / INTRAFRAME_SEQ / FRAME_TIMESTAMP as constants plus
# the config.HASH_VALUE expression (later compared against the previous frame).
query = "CREATE TABLE " + config.SNAPSHOT_CURRENT + """ AS ( 

SELECT 
""" + str(KEYFRAME_SEQ) + """ as KEYFRAME_SEQ,
""" + str(INTRAFRAME_SEQ) + """ as INTRAFRAME_SEQ,
'""" + str(FRAME_TIMESTAMP) + """' as FRAME_TIMESTAMP,
""" + config.HASH_VALUE + """ as HASH_VALUE,
base.*

FROM """ + config.SNAPSHOT_STAGING + """  base

) WITH DATA PRIMARY INDEX NUPI (""" + config.PRIMARY_KEY + """);

"""

exec_query(query, session)

### 07 - Update Frame Index Table

In [None]:
# Record this frame in the index table.
# Values are positional: INDEX_SEQ, KEYFRAME_SEQ, INTRAFRAME_SEQ, FRAME_TIMESTAMP
# (assumes the index table's column order matches -- TODO confirm).
query = "INSERT INTO " + config.INDEX_TABLE + " VALUES( " + str(INDEX_SEQ) + ", " + str(KEYFRAME_SEQ) + ", " + \
str(INTRAFRAME_SEQ) + ", '" + str(FRAME_TIMESTAMP) + "');"

exec_query(query, session)

### 08a - Generate New KeyFrame Table On First Run

In [None]:
### Drop Key Frame Table placeholder

# First run only. Unlike the other DROP statements, errors are NOT ignored
# here, so a missing placeholder table aborts the run.
if INITIAL_RUN:
    print("\nBUILDING NEW KEYFRAME TABLE\n")
    
    query = "DROP TABLE " + config.KEYFRAME_TABLE + ";"
    exec_query(query, session)

In [None]:
### Create new Key Frame Table

# First run only: seed the key frame table from the current snapshot, then
# finish -- there is no previous frame to diff against yet.
if INITIAL_RUN:

    query = "CREATE TABLE " + config.KEYFRAME_TABLE + " AS " + config.SNAPSHOT_CURRENT + " With Data And Stats PRIMARY INDEX NUPI (KEYFRAME_SEQ, " + config.PRIMARY_KEY + ");"
    exec_query(query, session)

    # datetime.datetime replaces the pandas.datetime alias, which newer
    # pandas releases no longer provide.
    print(str(datetime.datetime.now()))

    # Clean up without halting so the completion message is actually printed;
    # previously safe_exit(hault=True) exited before it could be reached.
    safe_exit(hault=False, verbose=True, session=session, session_close=True)

    print('SlinkyDog process complete.')

    # Stop here so none of the later steps run on the first pass
    sys.exit()

### 08b - Generate Key Frame if Applicable

In [None]:
# On a key frame, append the complete current snapshot to the key frame table
if keyframe_flag == True:

    query = "INSERT INTO " + config.KEYFRAME_TABLE + " SELECT * FROM " + config.SNAPSHOT_CURRENT + ";"

    exec_query(query, session)

### 08c - Drop Frame Timestamp (There's a reason)

In [None]:
# Remove FRAME_TIMESTAMP from the current snapshot.
# NOTE(review): presumably to avoid a duplicate column name when the snapshot
# is joined to the memo table (which carries its own FRAME_TIMESTAMP) in
# step 10 -- confirm.
query = "ALTER TABLE " + config.SNAPSHOT_CURRENT + " DROP FRAME_TIMESTAMP;"

exec_query(query, session)

### 09 - Compare Current Frame to Previous Frame -> Change Memos

In [None]:
### Clear current memo table
# A database error from the DROP is ignored (ignore_db_error=True).

query = "DROP TABLE " + config.INTRAFRAME_MEMO_STAGING + ";"

exec_query(query, session, ignore_db_error = True)

In [None]:
### Calculate differences on hash_value and store memos

# Full outer join of previous (tbl_A) and current (tbl_B) snapshots on the
# primary key. Each key receives one MEMO:
#   CHANGE    - key in both frames, HASH_VALUE differs
#   ADD       - key only in the current frame
#   DROP      - key only in the previous frame
#   NO_CHANGE - everything else
# The key and sequence columns get a _KEY suffix so they do not collide with
# the snapshot's own columns when joined in step 10.
query = "CREATE TABLE " + config.INTRAFRAME_MEMO_STAGING + """ AS (

SELECT '""" + str(FRAME_TIMESTAMP) + """' as FRAME_TIMESTAMP,
COALESCE(tbl_A.""" + config.PRIMARY_KEY + """, tbl_B.""" + config.PRIMARY_KEY + """) as """ + config.PRIMARY_KEY + """_KEY,
""" + str(KEYFRAME_SEQ) + """ AS KEYFRAME_SEQ_KEY,
""" + str(INTRAFRAME_SEQ) + """ AS INTRAFRAME_SEQ_KEY,
CASE
    WHEN ((tbl_A.HASH_VALUE <> tbl_B.HASH_VALUE) AND (tbl_A.""" + config.PRIMARY_KEY + """ IS NOT NULL) AND (tbl_B.""" + config.PRIMARY_KEY + """ IS NOT NULL)) THEN 'CHANGE'
    WHEN ((tbl_A.""" + config.PRIMARY_KEY + """ IS NULL) AND (tbl_B.""" + config.PRIMARY_KEY + """ IS NOT NULL)) THEN 'ADD'
    WHEN ((tbl_A.""" + config.PRIMARY_KEY + """ IS NOT NULL) AND (tbl_B.""" + config.PRIMARY_KEY + """ IS NULL)) THEN 'DROP'
    ELSE 'NO_CHANGE'
END AS MEMO

FROM (
    SELECT """ + config.PRIMARY_KEY + """, HASH_VALUE FROM """ + config.SNAPSHOT_PREVIOUS + """
    ) tbl_A

FULL OUTER JOIN (
    SELECT """ + config.PRIMARY_KEY + """, HASH_VALUE FROM """ + config.SNAPSHOT_CURRENT + """
    ) tbl_B

ON tbl_A.""" + config.PRIMARY_KEY + """ = tbl_B.""" + config.PRIMARY_KEY + """

--WHERE memo IS NOT NULL

) WITH DATA PRIMARY INDEX NUPI (""" + config.PRIMARY_KEY + """_KEY);"""

exec_query(query, session)

### 10 - Generate Intraframe

In [None]:
### Clear intraframe staging table
# A database error from the DROP is ignored (ignore_db_error=True).

query = "DROP TABLE " + config.INTRAFRAME_STAGING + ";"

exec_query(query, session, ignore_db_error = True)

In [None]:
### Create staging table

# Join the memos to the current snapshot's rows and keep only the rows whose
# MEMO is something other than 'NO_CHANGE'.
query = "CREATE TABLE " + config.INTRAFRAME_STAGING + """ AS (
SELECT * FROM """ + config.INTRAFRAME_MEMO_STAGING + """ memos

FULL OUTER JOIN """ + config.SNAPSHOT_CURRENT + """ base

ON memos.""" + config.PRIMARY_KEY + """_KEY = base.""" + config.PRIMARY_KEY + """

WHERE MEMO not like 'NO_CHANGE'

) WITH DATA PRIMARY INDEX NUPI (""" + config.PRIMARY_KEY + """_KEY);"""

exec_query(query, session)

In [None]:
### Clean up and reorganize staging table

# Drop the snapshot-side key/sequence columns and promote the memo-side
# sequence columns (suffix _KEY) to the plain names. The key column comes from
# config.PRIMARY_KEY, matching every other statement in this job, instead of
# a hard-coded SRG_CASE_SK.
staging_alter = 'ALTER TABLE ' + config.INTRAFRAME_STAGING

queries = [
    staging_alter + ' DROP ' + config.PRIMARY_KEY + ';',
    staging_alter + ' DROP KEYFRAME_SEQ;',
    staging_alter + ' DROP INTRAFRAME_SEQ;',
    # Renaming <PRIMARY_KEY>_KEY in place does not work on the primary key;
    # that rename is done through the volatile table in the next cell.
    staging_alter + ' RENAME KEYFRAME_SEQ_KEY TO KEYFRAME_SEQ;',
    staging_alter + ' RENAME INTRAFRAME_SEQ_KEY TO INTRAFRAME_SEQ;']

for query in queries:
    exec_query(query, session)

In [None]:
### Move data into volatile table to alter primary key

# Copies staging into the volatile table slinkydog_xfer, adding the key again
# under its plain name (<PRIMARY_KEY>_KEY as <PRIMARY_KEY>) and indexing on
# (KEYFRAME_SEQ, INTRAFRAME_SEQ, <PRIMARY_KEY>). The _KEY column is still
# present via base.* and is dropped in the "Finish up" cell.
query = """
CREATE VOLATILE TABLE slinkydog_xfer ,
NO LOG AS (
    SELECT 
        base.""" + config.PRIMARY_KEY + """_KEY as """ + config.PRIMARY_KEY + """,
        base.*
    FROM """+ config.INTRAFRAME_STAGING +""" base
) WITH DATA
PRIMARY INDEX (KEYFRAME_SEQ, INTRAFRAME_SEQ,""" + config.PRIMARY_KEY + """)
ON COMMIT PRESERVE ROWS;
"""

exec_query(query, session)

In [None]:
### Clear intraframe staging table for reuse
# Its rows now live in the volatile table slinkydog_xfer.
# A database error from the DROP is ignored (ignore_db_error=True).

query = "DROP TABLE " + config.INTRAFRAME_STAGING + ";"

exec_query(query, session, ignore_db_error = True)

In [None]:
### Load cleaned up table back into staging
# Recreates the staging table as a copy (data and statistics) of slinkydog_xfer.

query = "CREATE TABLE " + config.INTRAFRAME_STAGING + " AS slinkydog_xfer With Data And Stats;"

exec_query(query, session)

In [None]:
### Finish up

# Drop the now-redundant <PRIMARY_KEY>_KEY column. The name is derived from
# config.PRIMARY_KEY, matching the statements that created it, instead of a
# hard-coded SRG_CASE_SK_KEY.
query = 'ALTER TABLE ' + config.INTRAFRAME_STAGING + ' DROP ' + config.PRIMARY_KEY + '_KEY;'

exec_query(query, session)

### 11 - Load Intraframe into Table

In [None]:
### Check to see if the table is empty

query = "SELECT COUNT(*) FROM " + config.INTRAFRAME_TABLE + ";"

try:
    results = pandas.read_sql(query, session)
except BaseException:
    # Release the lockout, then let the original error propagate
    safe_exit(hault=False, verbose=True, session=session)
    raise

# .iloc[0, 0] is purely positional (no integer lookup on a label-indexed
# Series), and int() accepts NumPy scalars as well as plain Python numbers.
table_check = int(results.iloc[0, 0])

print("Beginning record count: %i"  %table_check)

In [None]:
# Empty intraframe table: drop it and recreate it as a copy of staging, with
# the primary index (KEYFRAME_SEQ, INTRAFRAME_SEQ, <PRIMARY_KEY>).
# Otherwise simply append the staged rows.
if table_check == 0:
    print("\nBUILDING NEW INTRAFRAME TABLE\n")
    
    print("CREATING NEW TABLE " + config.INTRAFRAME_TABLE)
    
    query = "DROP TABLE " + config.INTRAFRAME_TABLE + ";"
    exec_query(query, session, ignore_db_error = True)

    query = "CREATE TABLE " + config.INTRAFRAME_TABLE + " AS " + config.INTRAFRAME_STAGING + " With Data And Stats PRIMARY INDEX (KEYFRAME_SEQ, INTRAFRAME_SEQ, " + config.PRIMARY_KEY + ");"
    exec_query(query, session)

else:
    query = "INSERT INTO " + config.INTRAFRAME_TABLE + " SELECT * FROM " + config.INTRAFRAME_STAGING + ";"
    exec_query(query, session)

In [None]:
### Validate new data was appended

# Only reports the new row count; it is not compared with the earlier count.
query = "SELECT COUNT(*) FROM " + config.INTRAFRAME_TABLE + ";"

try:
    results = pandas.read_sql(query, session)
except BaseException:
    # Release the lockout, then let the original error propagate
    safe_exit(hault=False, verbose=True, session=session)
    raise

# .iloc[0, 0] is purely positional (no integer lookup on a label-indexed
# Series), and int() accepts NumPy scalars as well as plain Python numbers.
table_check = int(results.iloc[0, 0])

print("Ending record count: %i"  %table_check)

### 99 - Close Teradata Connection

In [None]:
### Close Session ###
# datetime.datetime replaces the pandas.datetime alias, which newer pandas
# releases no longer provide.
print(str(datetime.datetime.now()))
# hault=False: close the session and remove the lockout, but keep running so
# the final completion message can be printed.
safe_exit(hault=False, verbose=True, session=session, session_close=True)

In [None]:
# Final marker: reached only when every previous step ran without raising
print('SlinkyDog process complete.')