# Headers/Startup

In [None]:
# IMPORTS
import logging
import traceback
from os import environ as os_environ
from sys import stdout

import dotenv

dotenv.load_dotenv()

from datetime import datetime, timedelta

import pandas as pd

# for verifying DB updates
from db_engines import rprt_db, wh_conn_str, wh_db as db

# strftime pattern for full timestamps (used when logging file mtimes below)
tmstmp_fmt: str = r'%Y-%m-%d %H:%M:%S'
# strftime pattern for date-only strings (used to build `today`)
query_date_fmt: str = r'%Y-%m-%d'

import re
from pathlib import Path
from threading import Thread

import pandas as pd
from pandas import DataFrame as Df


In [None]:
# OTHER CONSTANTS
# Root directory that is globbed for source files; a missing environment
# variable raises KeyError here.
repos_path = Path(os_environ['PRMDIA_EVAN_LOCAL_LAKEPATH'])
# Date on which this cell was executed, rendered with the date-only pattern.
today: str = format(datetime.now(), query_date_fmt)


In [None]:
# LOAD TOLL-NUMBER MAP


In [None]:
# LOGGING SETUP
# Console logger for this notebook: records go to stdout with a green
# (ANSI 32) timestamp/name/level prefix.
log_fmt_date_strm = r'%y%m%d|%H%M'
# The two *_file formats are defined for a file handler, but no file handler
# is attached in this cell.
log_fmt_date_file = r'%Y-%m-%d %H:%M:%S'
log_fmt_file =\
    '%(asctime)s [%(name)s,%(funcName)s,%(module)s::%(levelname)s]>>%(message)s'
log_fmt_strm =\
    '\x1b[32m%(asctime)s[%(name)s %(levelname)s]\x1b[0m >> %(message)s'

# Logger name comes from the environment; a missing variable raises KeyError.
logger = logging.getLogger(os_environ['PRMDIA_MM_LOGNAME'])
hdlr = logging.StreamHandler(stdout)
hdlr.setFormatter(
    logging.Formatter(
        fmt=log_fmt_strm, datefmt=log_fmt_date_strm))
# hdlr.setLevel(logging.DEBUG)

# logging.getLogger() returns the same object for the same name, so
# re-running this cell would otherwise stack one more stdout handler per run
# and print every record multiple times. Drop earlier stdout handlers first.
for stale in [
    h for h in logger.handlers
    if isinstance(h, logging.StreamHandler)
    and getattr(h, 'stream', None) is stdout
]:
    logger.removeHandler(stale)

logger.addHandler(hdlr)
# INFO threshold: logger.debug(...) calls are filtered out unless this is
# lowered.
logger.setLevel(logging.INFO)

In [None]:
# LOAD PM PHONE NUMBERS
# Pull the phone_dir column of dim_phone from the reporting DB.
with rprt_db.connect() as conn:
    phone_frame: Df = pd.read_sql_query(
        con=conn,
        sql="""--sql
            SELECT phone_dir FROM dim_phone
        """.replace('--sql\n', ''),
    )
pm_phone: list[int] = list(phone_frame['phone_dir'])
del phone_frame

# Comma-separated rendering of the numbers, for use inside a query.
# NOTE(review): this is an empty string when dim_phone has no rows - confirm
# the consuming query tolerates that.
ph_not_in: str = ', '.join(map(str, pm_phone))

logger.debug(f"PM phone nums excluded: \n{ph_not_in}")

# Run the master-join SQL script against the warehouse connection.
# NOTE(review): the script is passed as a plain string; if `db` is a
# SQLAlchemy 2.x engine this needs wrapping in `sqlalchemy.text()` - confirm.
with db.connect() as conn:
    conn.execute(Path('master_join.pgsql').read_text())


In [None]:
# check for active connections, else raise exception and bail
# NOTE(review): cells above already open connections on both engines before
# this check runs - consider moving this cell earlier in the notebook.
from db_engines import MySQL_OpErr, check_connection

for d in db, rprt_db:
    try:
        # presumably raises MySQL_OpErr when the engine is unreachable;
        # verify against db_engines
        check_connection(d)
    except MySQL_OpErr as err:
        # Chain explicitly so the traceback shows the connection error as the
        # cause rather than as a second failure inside this handler.
        raise Exception("\x1b[91mSEE BELOW/ABOVE\x1b[0m\n") from err

# the helpers are only needed for this one-off check
del MySQL_OpErr, check_connection


In [None]:
# DATA LAKE VINTAGE CHECK
# For each configured source glob, log the most recently modified matching
# files under `repos_path`, colour-coded by age, so that a stale data lake is
# visible before the ETL runs.
from table_config import AF_CFGS, ATT_FILE_CFG
rng = 5  # maximum number of newest files reported per data source

ansi = '\x1b[{clr}m'
good_ansi = ansi.format(clr='93')    # bright yellow: file is fresh
bad_ansi = ansi.format(clr='1;91')   # bold bright red: file is stale
ansi_rst = '\x1b[0m'
rpo_chk_prstr = (
    "❇️{an}{nm}{anr}, source or top of glob for ({ds}) "
    + "Repos Vintage: {an}{ts}{anr}"
)


af_glob: str = AF_CFGS['src_label']
att_glob: str = ATT_FILE_CFG['src_label']
del AF_CFGS, ATT_FILE_CFG

# get recent mtimes
af_files: list[Path]
att_files: list[Path]
af_files, att_files = (
    list(repos_path.rglob(glob))
    for glob in (af_glob, att_glob)
)

# A file modified within this window is reported as fresh.
max_age = timedelta(hours=16)

for files, src_name in (
    (af_files, 'af_message_data'),
    (att_files, 'att_data'),
):
    # newest first
    files.sort(reverse=True, key=lambda p: p.stat().st_mtime)

    if not files:
        logger.warning("No files matched the glob for %s", src_name)
        continue

    # Slice instead of indexing so fewer than `rng` matches cannot raise
    # IndexError.
    for path in files[:rng]:
        # Kept in its own name: the source label must stay intact so that
        # `{ds}` in the message shows the data source, not the timestamp.
        mtime = datetime.fromtimestamp(path.stat().st_mtime)
        clr = good_ansi if (datetime.now() - mtime < max_age) else bad_ansi

        logger.info(rpo_chk_prstr.format(
            an=clr, anr=ansi_rst, nm=path.name, ds=src_name,
            ts=mtime.strftime(tmstmp_fmt)))

        del clr, mtime

del max_age


# ETL Scripts

In [None]:
# ETL FROM REPOS
# Run the three ETL entry points concurrently, one thread each, and block
# until all of them have finished.
from etl_att_repos import main as att
from etl_af_repos import main as af
from etl_client_key import main as client

# Threads are created in att, af, client order ...
att_thr, af_thr, client_thr = (
    Thread(target=entry_point) for entry_point in (att, af, client)
)
# ... but started and joined in af, att, client order.
threads = (af_thr, att_thr, client_thr)

# Start every thread before joining any, so the jobs overlap.
for t in threads:
    t.start()
for t in threads:
    t.join()

del att_thr, af_thr, threads, att, af

In [None]:
# create view of master join
# Re-applies master_join.pgsql through the psql command-line client.
# `.name` keeps only the final path component, which here is the bare file
# name, so psql looks it up relative to the shell's working directory.
xtra_sql_file = Path('master_join.pgsql').name
# NOTE(review): wh_conn_str is interpolated unquoted into a shell command;
# if it contains credentials or shell-special characters this may break or
# leak them into process listings - confirm.
psql_cmd: str = f"psql --file={xtra_sql_file} {wh_conn_str}"
logger.info(f"Re-instating master join view.")
# IPython shell escape: runs the command string in a subshell (notebook only,
# not valid in a plain .py script).
!{psql_cmd} 