In [1]:
import pandas as pd
import numpy as np
import sqlalchemy as sa
from datetime import datetime, timedelta

from collections import defaultdict
from json import JSONEncoder,JSONDecoder
import requests

import threading
from queue import Queue

from time import sleep

from simple_salesforce import Salesforce
from os import environ, sys
from dotenv import load_dotenv
_ = load_dotenv()

In [2]:
transformThreadCount = 3
logFileName = "TableETL_KippFoundation.log"
sqlSchema = 'etl'
saveFileBaseName = "SalesForceEduCloud_KippFound_%s.csv"
outputTableBaseName = "SalesForceEduCloud_KippFound_%s"
jsonConfigFileName = "TableETL-KippFoundation.json"

In [3]:
logQueue = Queue()


def lprint(val):
    outstr = f"[{datetime.now()}] ({threading.current_thread().name}) {val}"
    print(outstr)
    logQueue.put(outstr)
    
    
lprint("Started...")

[2024-08-26 14:22:05.935425] (MainThread) Started...


In [4]:
handleThreadError_Super = threading.excepthook

def handleThreadError(args):
    handleThreadError_Super(args)
    lprint(f"ERROR!!! {args.exc_type} \"{args.exc_value}\" in thread {args.thread}  ERROR!!!")
    
threading.excepthook = handleThreadError

In [5]:
def lprintDaemon():
    with open(logFileName,"w") as logfile:
        while True:
            logfile.write(logQueue.get())
            logfile.write("\n")
            logfile.flush()
            logQueue.task_done()

lprintDaemonThread = threading.Thread(target=lprintDaemon)
lprintDaemonThread.daemon = True
lprintDaemonThread.name = "lprintDaemon"
lprintDaemonThread.start()

In [6]:
#sfusername = environ.get('KFsfusername')
#sfpassword = environ.get('KFsfpassword')
#sfsecret = environ.get("KFsfsecret")
sfclientid = environ.get("KFsfclientid")
sfclientsecret = environ.get("KFsfclientsecret")
sfinstanceurl = environ.get("KFsfinstanceurl")
connstr = environ.get("KNOS_Datawarehouse")

In [7]:
lprint("Creating engine")
engine = sa.create_engine(connstr, fast_executemany=True, isolation_level="READ UNCOMMITTED")

[2024-08-26 14:22:05.974316] (MainThread) Creating engine


In [8]:
jsonDecoder = JSONDecoder()

In [9]:
lprint(f"Reading config file {jsonConfigFileName}")
with open(jsonConfigFileName) as f:

    jsonConfig = jsonDecoder.decode( f.read() )

lprint(jsonConfig)


[2024-08-26 14:22:06.057445] (MainThread) Reading config file TableETL-KippFoundation.json
[2024-08-26 14:22:06.059971] (MainThread) {'Tables': {'Account': {'Columns': ['Id', 'IsDeleted', 'MasterRecordId', 'Name', 'Type', 'RecordTypeId', 'ParentId', 'BillingPostalCode', 'Metro_Area__c', 'College_PowerRanking__c', 'DEP_College_PowerRanking__c', 'Long_Account_ID__c']}, 'Application__c': {'Columns': ['Id', 'IsDeleted', 'Name', 'RecordTypeId', 'Applicant__c', 'School__c', 'GPA_Type__c', 'GPA__c']}, 'Contact': {'Columns': ['Id', 'IsDeleted', 'AccountId', 'LastName', 'FirstName', 'Name', 'RecordTypeId', 'MailingPostalCode', 'Birthdate', 'Currently_Enrolled_School__c', 'High_School_Graduated_From__c', 'KIPP_Region_School__c', 'KCA_KIPP_ID__c', 'Salesforce_ID__c', 'College_Match_Display_GPA__c', 'Most_Recent_College_Application__c', 'Most_Recent_College_Enrollment_Status__c', 'Most_Recent_College_Enrollment__c']}, 'Enrollment__c': {'Columns': ['Id', 'IsDeleted', 'Name', 'RecordTypeId', 'Studen

In [10]:
lprint("Setting destired tables")
desiredTables = list(jsonConfig['Tables'].keys())
lprint(desiredTables)

[2024-08-26 14:22:06.072781] (MainThread) Setting destired tables
[2024-08-26 14:22:06.075848] (MainThread) ['Account', 'Application__c', 'Contact', 'Enrollment__c', 'Standardized_Test__c']


In [11]:
#soqlFilters = defaultdict(lambda: "where IsDeleted = false LIMIT 1000", {})
soqlFilters = defaultdict(lambda: "where IsDeleted = false", {})

In [12]:
lprint(f"Getting access token for {sfclientid} from instance {sfinstanceurl}")
sfVersion='58.0'
payload = {
    'grant_type':'client_credentials',
    'client_id':sfclientid,
    'client_secret':sfclientsecret,
    #'username':sfusername,
    #'password':sfpassword+sfsecret
}

authURL = f"{sfinstanceurl}services/oauth2/token"
#authURL = "http://127.0.0.1:55555/services/oauth2/token"
lprint(f"Getting Auth token from {authURL}")
session = requests.Session()
authResp = session.post(authURL,\
                    data=payload,)

authRespData = jsonDecoder.decode(authResp.text)

[2024-08-26 14:22:06.108475] (MainThread) Getting access token for 3MVG9CVKiXR7Ri5rJHLGnofNMvaL5BIBQWSC4w5tfDVGFdeHsnNQ4swJ40eKiZ_mO8F0.9Rl_1MKToWdgbJ9J from instance https://kippadb.my.salesforce.com/
[2024-08-26 14:22:06.109912] (MainThread) Getting Auth token from https://kippadb.my.salesforce.com/services/oauth2/token


In [13]:
authRespDataPublic = authRespData.copy()

if 'access_token' in authRespDataPublic.keys():
    authRespDataPublic['access_token'] = '*' * len(authRespDataPublic['access_token'])

lprint(authRespDataPublic)

[2024-08-26 14:22:08.042105] (MainThread) {'access_token': '****************************************************************************************************************', 'signature': '698OvwKXIiddY/ZIZfAuxnjbWIOqc5rAcsOeUD2Pglk=', 'scope': 'sfap_api api', 'instance_url': 'https://kippadb.my.salesforce.com', 'id': 'https://login.salesforce.com/id/00D80000000dTInEAM/005Qg000007xMJpIAM', 'token_type': 'Bearer', 'issued_at': '1724700128919', 'api_instance_url': 'https://api.salesforce.com'}


In [14]:
lprint("Creating Simple Salesforce Instance using sessionID...")
sf = Salesforce(instance_url=authRespData['instance_url'], session_id=authRespData['access_token'], version='58.0')
lprint("Session created!")

[2024-08-26 14:22:08.064114] (MainThread) Creating Simple Salesforce Instance using sessionID...
[2024-08-26 14:22:08.066114] (MainThread) Session created!


In [15]:
metaData = {}
metaDataLock = threading.Lock()

for tbl in desiredTables:
    fieldDescs = {}
    
    lprint("Getting metadata for %s" % tbl)
    
    tblDesc = getattr(sf, tbl).describe()
    
    for field in tblDesc['fields']:
        fieldDescs[field['name']] = {
                                'type':field['type'],
                                'length':field['length']
        }
        
    
    metaData[tbl] = fieldDescs
    


[2024-08-26 14:22:08.081561] (MainThread) Getting metadata for Account
[2024-08-26 14:22:10.352291] (MainThread) Getting metadata for Application__c
[2024-08-26 14:22:10.666320] (MainThread) Getting metadata for Contact
[2024-08-26 14:22:11.089200] (MainThread) Getting metadata for Enrollment__c
[2024-08-26 14:22:11.357224] (MainThread) Getting metadata for Standardized_Test__c


In [16]:
daemonStatusLock = threading.Lock()
daemonStatus = {}
daemonCurTaskQueue = Queue()

In [17]:
outputDataQueue = Queue()
extractDaemonStats = {}

def extractDaemon():
    daemonName = 'extractDaemon'
    while True:
        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} waiting for job"
        
        tbl = tableQueue.get()
        daemonCurTaskQueue.put(daemonName)
        
        lprint(f"Querying data for {tbl}")
        startTime = datetime.now()
        
        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} Querying data for {tbl}"

        #with metaDataLock:
        #    feilds = ", ".join(metaData[tbl].keys())

        feilds = ", ".join(jsonConfig['Tables'][tbl]['Columns'])
        
        soql = "select %s from %s %s" % (feilds, tbl, soqlFilters[tbl])

        
        lprint("Starting query:  %s" % soql)
        resp = sf.query_all(soql)
    
        lprint(f"Adding {tbl} to output queue")
        
        lprint("Finished %s" % tbl)
    
        lprint("%s totalRecords %d" % (tbl, resp['totalSize']))
        
        outputDataQueue.put( (tbl, resp) )

        extractDaemonStats[tbl] = datetime.now() - startTime
        tableQueue.task_done()
        daemonCurTaskQueue.get()
        daemonCurTaskQueue.task_done()
        
lprint("Creating extractDaemon Thread")
extractDaemonThread = threading.Thread(target=extractDaemon)
extractDaemonThread.daemon=True
extractDaemonThread.name = 'extractDaemon'

[2024-08-26 14:22:11.608753] (MainThread) Creating extractDaemon Thread


In [18]:
dataFramesQueue = Queue()
transformDaemonStats = {}
transformDaemonStatsLock = threading.Lock()
transformDaemonThreads = []

def transformDaemon(num):
    daemonName = f"transformDaemon[{num}]"
    while True:
        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} waiting for job"
            
        (tbl, resp) = outputDataQueue.get()
        daemonCurTaskQueue.put(daemonName)
        
        lprint(f"Turning {tbl} into a dataframe")
        startTime = datetime.now()
        
        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} Turning {tbl} into a dataframe"
        
        respDf = pd.DataFrame.from_dict(resp['records'])
    
        if 'attributes' in respDf.columns:
            
            lprint("Dropping attributes from %s" % tbl)
            respDf.drop(columns=['attributes'], inplace=True)
        
        lprint("%s shape %s" % (tbl, respDf.shape))


        csvTblName = saveFileBaseName % tbl
        lprint(f"Saving {tbl} to {csvTblName}")

        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} Saving {tbl} to {csvTblName}"

        respDf.to_csv(csvTblName, index=False)

        lprint("Finished saving %s!" % tbl)
        
        lprint(f"transforming columns for {tbl}...")

        for col in respDf.columns:
            with metaDataLock:
                theType = metaData[tbl][col]['type']

            with daemonStatusLock:
                daemonStatus[daemonName] = f"{daemonName} processing {tbl}->{col}..."
            
            if theType in ['date', 'datetime']:
                lprint(f"Converting {tbl}->{col} to datetime...")
                respDf[col] = pd.to_datetime(respDf[col], errors='coerce')
            
    
            for col in respDf.columns:
        
                if np.count_nonzero(respDf[col].map(lambda a: isinstance(a, dict ) or isinstance(a, list ))) > 0:
                    lprint("Ordered dict found in %s column %s converting..." % (tbl, col)) 
                    
                    respDf[col] = respDf[col].map(lambda v: jcoder.encode(v) if not pd.isna(v) else None)
                    
                    lprint("Updating type for %s column %s to JSON..." % (tbl, col)) 
                    with metaDataLock:
                        metaData[tbl][col]['type'] = 'JSON'
                    
                    fieldLen = int(respDf[col].str.len().max())
                    
                    lprint("Setting fieldlen to %d for %s" % (fieldLen, col))            
                    with metaDataLock:
                        metaData[tbl][col]['length'] = int(respDf[col].str.len().max())
            

        lprint(f"Adding response for {tbl} to dataFrame queue")

        dataFramesQueue.put( (tbl, respDf) )
        
        with transformDaemonStatsLock:
            transformDaemonStats[tbl] = datetime.now() - startTime
            
        outputDataQueue.task_done()
        daemonCurTaskQueue.get()
        daemonCurTaskQueue.task_done()

for i in range(transformThreadCount):
    lprint(f"Creating dataFrameDaemon[{i}] thread")
    threadObj = threading.Thread(target=transformDaemon,args=(i,))
    threadObj.daemon=True
    threadObj.name = f"transformDaemon[{i}]"
    transformDaemonThreads.append(threadObj)
    

[2024-08-26 14:22:11.637706] (MainThread) Creating dataFrameDaemon[0] thread
[2024-08-26 14:22:11.638728] (MainThread) Creating dataFrameDaemon[1] thread
[2024-08-26 14:22:11.638728] (MainThread) Creating dataFrameDaemon[2] thread


In [19]:
jcoder = JSONEncoder()

In [20]:
staticFields = {
                 'boolean':sa.Boolean,
                 'date':sa.DATE,
                 'datetime':sa.DATETIME,
                 'double': sa.FLOAT,
                 #'email',
                 #'id',
                 'int':sa.INT,
                 #'multipicklist',
                 #'picklist',
                 #'reference',
                 #'string',
                 'textarea':sa.TEXT,
                 #'JSON':sa.JSON
}

#this is a mess
def getSQLTypes(tbl, respDf):
    with metaDataLock:
        sqlTypes = {}
        
        lprint("Getting SQLTypes for %s" % tbl)
        
        curMeta = metaData[tbl]
            
        for field in jsonConfig['Tables'][tbl]['Columns']:

            
            
            if curMeta[field]['type'] in staticFields.keys():
                sqlTypes[field] = staticFields[curMeta[field]['type']]()
            
            
            else:
                fieldLen = curMeta[field]['length']
                
                if fieldLen <= 255:
                    sqlTypes[field] = sa.NVARCHAR(fieldLen)
                
                #this is a fix they set some of the custom field max values to weird stuff
                elif np.count_nonzero(~pd.isna(respDf[field])) > 0 \
                                and ( fieldLen := int(respDf[field].str.len().max())) <= 255:
                    sqlTypes[field] = sa.NVARCHAR(fieldLen)                
                    
                else:
                    sqlTypes[field] = sa.TEXT()
                
    return sqlTypes


In [21]:
loadDaemonStats = {}
def loadDaemon():
    daemonName = 'loadDaemon'
    while True:
        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} waiting for job"
        
        (tbl, respDf) = dataFramesQueue.get()
        daemonCurTaskQueue.put(daemonName)
        
        startTime = datetime.now()
        
        if respDf.shape[0] == 0:
            lprint("Skipping %s no data!" % tbl)
            loadDaemonStats[tbl] = "Skipped, no data!"
            dataFramesQueue.task_done()
            daemonCurTaskQueue.get()
            daemonCurTaskQueue.task_done()
            continue

        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} getting SQL types for {tbl}"
            
        sqlTypes = getSQLTypes(tbl, respDf)
        
        
        sqlTblName = outputTableBaseName % tbl
        lprint("Uploading table %s to etl.%s" % (tbl,  sqlTblName))

        with daemonStatusLock:
            daemonStatus[daemonName] = f"{daemonName} uploading {tbl}"
            
        with engine.begin() as conn:
            respDf.to_sql(sqlTblName, conn, schema=sqlSchema, if_exists='replace', index=False, dtype=sqlTypes, chunksize=1)
        
        lprint("Finished uploading %s!" % tbl)
        loadDaemonStats[tbl] = datetime.now() - startTime

        dataFramesQueue.task_done()
        daemonCurTaskQueue.get()
        daemonCurTaskQueue.task_done()

lprint("Creating loadDaemon thread")
loadDaemonThread = threading.Thread(target=loadDaemon)
loadDaemonThread.daemon=True
loadDaemonThread.name = "loadDaemon"

[2024-08-26 14:22:11.746218] (MainThread) Creating loadDaemon thread


In [22]:
tableQueue = Queue()
for tbl in desiredTables:
    lprint(f"Queuing {tbl}")
    tableQueue.put(tbl)

[2024-08-26 14:22:11.783764] (MainThread) Queuing Account
[2024-08-26 14:22:11.783764] (MainThread) Queuing Application__c
[2024-08-26 14:22:11.783764] (MainThread) Queuing Contact
[2024-08-26 14:22:11.783764] (MainThread) Queuing Enrollment__c
[2024-08-26 14:22:11.783764] (MainThread) Queuing Standardized_Test__c


In [23]:
lprint("Starting loadDaemon thread")
loadDaemonThread.start()

[2024-08-26 14:22:11.810191] (MainThread) Starting loadDaemon thread
[2024-08-26 14:22:12.477740] (loadDaemon) Getting SQLTypes for Account
[2024-08-26 14:22:12.483989] (loadDaemon) Uploading table Account to etl.SalesForceEduCloud_KippFound_Account
[2024-08-26 14:28:27.016036] (loadDaemon) Finished uploading Account!
[2024-08-26 14:28:27.019037] (loadDaemon) Getting SQLTypes for Application__c
[2024-08-26 14:28:27.019037] (loadDaemon) Uploading table Application__c to etl.SalesForceEduCloud_KippFound_Application__c
[2024-08-26 14:30:29.089296] (loadDaemon) Finished uploading Application__c!
[2024-08-26 14:30:29.090303] (loadDaemon) Getting SQLTypes for Contact
[2024-08-26 14:30:29.094973] (loadDaemon) Uploading table Contact to etl.SalesForceEduCloud_KippFound_Contact
[2024-08-26 14:35:47.988536] (loadDaemon) Finished uploading Contact!
[2024-08-26 14:35:47.988536] (loadDaemon) Getting SQLTypes for Enrollment__c
[2024-08-26 14:35:48.000101] (loadDaemon) Uploading table Enrollment__c t

In [24]:
lprint("Starting dataFrameDaemon threads")
for i,thread in enumerate(transformDaemonThreads):
    lprint(f"Starting dataFrameDaemon[{i}]")
    thread.start()

[2024-08-26 14:22:11.832577] (MainThread) Starting dataFrameDaemon threads
[2024-08-26 14:22:11.832577] (MainThread) Starting dataFrameDaemon[0]
[2024-08-26 14:22:11.834845] (MainThread) Starting dataFrameDaemon[1]
[2024-08-26 14:22:11.835920] (MainThread) Starting dataFrameDaemon[2]
[2024-08-26 14:22:12.345455] (transformDaemon[0]) Turning Account into a dataframe
[2024-08-26 14:22:12.356323] (transformDaemon[0]) Dropping attributes from Account
[2024-08-26 14:22:12.357285] (transformDaemon[0]) Account shape (1000, 12)
[2024-08-26 14:22:12.357285] (transformDaemon[0]) Saving Account to SalesForceEduCloud_KippFound_Account.csv
[2024-08-26 14:22:12.378266] (transformDaemon[0]) Finished saving Account!
[2024-08-26 14:22:12.378266] (transformDaemon[0]) transforming columns for Account...
[2024-08-26 14:22:12.477740] (transformDaemon[0]) Adding response for Account to dataFrame queue
[2024-08-26 14:22:13.217810] (transformDaemon[1]) Turning Application__c into a dataframe
[2024-08-26 14:22

In [25]:
lprint("Starting query Thread")
extractDaemonThread.start()

[2024-08-26 14:22:11.848577] (MainThread) Starting query Thread
[2024-08-26 14:22:11.849577] (extractDaemon) Querying data for Account
[2024-08-26 14:22:11.849577] (extractDaemon) Starting query:  select Id, IsDeleted, MasterRecordId, Name, Type, RecordTypeId, ParentId, BillingPostalCode, Metro_Area__c, College_PowerRanking__c, DEP_College_PowerRanking__c, Long_Account_ID__c from Account where IsDeleted = false LIMIT 1000
[2024-08-26 14:22:12.342220] (extractDaemon) Adding Account to output queue
[2024-08-26 14:22:12.345455] (extractDaemon) Finished Account
[2024-08-26 14:22:12.345455] (extractDaemon) Account totalRecords 1000
[2024-08-26 14:22:12.345455] (extractDaemon) Querying data for Application__c
[2024-08-26 14:22:12.345455] (extractDaemon) Starting query:  select Id, IsDeleted, Name, RecordTypeId, Applicant__c, School__c, GPA_Type__c, GPA__c from Application__c where IsDeleted = false LIMIT 1000
[2024-08-26 14:22:13.215780] (extractDaemon) Adding Application__c to output queue


In [26]:
lprint("All daemons started!")

[2024-08-26 14:22:11.870059] (MainThread) All daemons started!


In [27]:
comboSize = 1
while comboSize > 0:
    tqSize = tableQueue.qsize()
    odqSize = outputDataQueue.qsize()
    dfqSize = dataFramesQueue.qsize()
    runSize = daemonCurTaskQueue.qsize()
    comboSize = tqSize + odqSize + dfqSize + runSize
    
    with daemonStatusLock:
        lprint("Daemon Status:\n" + ("\n".join(daemonStatus.values())))

    lprint(f"Extract queue({tqSize})\tTransform queue({odqSize})\tLoad queue({dfqSize})\tRunning({runSize})\tTotal({comboSize})")
        
    sleep(30)

lprint("All queues are empty but uploading is most likely still continueing")
    
    

[2024-08-26 14:22:11.891257] (MainThread) Daemon Status:
loadDaemon waiting for job
transformDaemon[0] waiting for job
transformDaemon[1] waiting for job
transformDaemon[2] waiting for job
extractDaemon Querying data for Account
[2024-08-26 14:22:11.891257] (MainThread) Extract queue(4)	Transform queue(0)	Load queue(0)	Running(1)	Total(5)
[2024-08-26 14:22:41.892063] (MainThread) Daemon Status:
loadDaemon uploading Account
transformDaemon[0] waiting for job
transformDaemon[1] waiting for job
transformDaemon[2] waiting for job
extractDaemon waiting for job
[2024-08-26 14:22:41.892063] (MainThread) Extract queue(0)	Transform queue(0)	Load queue(4)	Running(1)	Total(5)
[2024-08-26 14:23:11.892869] (MainThread) Daemon Status:
loadDaemon uploading Account
transformDaemon[0] waiting for job
transformDaemon[1] waiting for job
transformDaemon[2] waiting for job
extractDaemon waiting for job
[2024-08-26 14:23:11.892869] (MainThread) Extract queue(0)	Transform queue(0)	Load queue(4)	Running(1)	To

In [28]:
tableQueue.join()
lprint("All table data extracted")
outputDataQueue.join()
lprint("All data transformed!")
dataFramesQueue.join()
lprint(f"All data loaded to {engine}")


[2024-08-26 14:42:11.960131] (MainThread) All table data extracted
[2024-08-26 14:42:11.960131] (MainThread) All data transformed!
[2024-08-26 14:42:11.960131] (MainThread) All data loaded to Engine(mssql+pyodbc://gbezet:***@KNOS_Datawarehouse)


In [29]:
daemonCurTaskQueue.join()
lprint("All running daemons have completed!")

[2024-08-26 14:42:11.977758] (MainThread) All running daemons have completed!


In [30]:
for key in sorted(extractDaemonStats.keys()):
    lprint(f"extractDaemon processed {key} in {extractDaemonStats[key]}")

[2024-08-26 14:42:11.986280] (MainThread) extractDaemon processed Account in 0:00:00.495878
[2024-08-26 14:42:11.986280] (MainThread) extractDaemon processed Application__c in 0:00:00.871338
[2024-08-26 14:42:11.986280] (MainThread) extractDaemon processed Contact in 0:00:00.483495
[2024-08-26 14:42:11.986280] (MainThread) extractDaemon processed Enrollment__c in 0:00:00.510822
[2024-08-26 14:42:11.986280] (MainThread) extractDaemon processed Standardized_Test__c in 0:00:00.362554


In [31]:
for key in sorted(transformDaemonStats.keys()):
    lprint(f"transformDaemon processed {key} in {transformDaemonStats[key]}")

[2024-08-26 14:42:12.006172] (MainThread) transformDaemon processed Account in 0:00:00.132285
[2024-08-26 14:42:12.006172] (MainThread) transformDaemon processed Application__c in 0:00:00.108887
[2024-08-26 14:42:12.006172] (MainThread) transformDaemon processed Contact in 0:00:00.334932
[2024-08-26 14:42:12.006172] (MainThread) transformDaemon processed Enrollment__c in 0:00:00.062157
[2024-08-26 14:42:12.006172] (MainThread) transformDaemon processed Standardized_Test__c in 0:00:00.063743


In [32]:
for key in sorted(loadDaemonStats.keys()):
    lprint(f"loadDaemon processed {key} in {loadDaemonStats[key]}")

[2024-08-26 14:42:12.015129] (MainThread) loadDaemon processed Account in 0:06:14.541297
[2024-08-26 14:42:12.016133] (MainThread) loadDaemon processed Application__c in 0:02:02.070259
[2024-08-26 14:42:12.016133] (MainThread) loadDaemon processed Contact in 0:05:18.898233
[2024-08-26 14:42:12.016133] (MainThread) loadDaemon processed Enrollment__c in 0:02:19.514087
[2024-08-26 14:42:12.016133] (MainThread) loadDaemon processed Standardized_Test__c in 0:03:23.726923


In [33]:
statLists = [extractDaemonStats, transformDaemonStats, loadDaemonStats]
for key in sorted(extractDaemonStats.keys()):
    if np.all([isinstance(statList[key], timedelta) for statList in statLists]):
        comboTime = np.sum([statList[key] for statList in statLists])
        lprint(f"{key} completed in {comboTime}")
    else:
        lprint(F"{key} incomplete {[str(l[key]) for l in statLists]}")

[2024-08-26 14:42:12.025432] (MainThread) Account completed in 0:06:15.169460
[2024-08-26 14:42:12.025940] (MainThread) Application__c completed in 0:02:03.050484
[2024-08-26 14:42:12.025940] (MainThread) Contact completed in 0:05:19.716660
[2024-08-26 14:42:12.025940] (MainThread) Enrollment__c completed in 0:02:20.087066
[2024-08-26 14:42:12.025940] (MainThread) Standardized_Test__c completed in 0:03:24.153220


In [34]:
lprint("=============DONE!===================")



In [35]:
sleep(5)

In [36]:
#jupyter nbconvert .\TableETL-KippFoundation.ipynb --to python