# DWH Job monitor

## Loading

In [1]:
import sys
import os
import datetime
import time
import traceback
import logging
import pandas as pd
import tkinter
import tkinter.messagebox

BASE_DIR = r'C:\Users\leo.zhangzs\AppData\Roaming\Code\User\VScode\py3\UAT_Test'
sys.path.append(BASE_DIR)

from string import Template
from core.configadmin import ConfigAdmin
from core.color_me import ColorMe
from core import log_ctrl
from core.outlook import send_email
from core.cron import Cron
from core.sqlplus import SqlPlus

config_path = os.path.join(BASE_DIR, 'conf')
config_name = 'setting.ini'
configs = ConfigAdmin(config_path, config_name)

logs = log_ctrl.de8ug_log(logger_name='Main_DWH', log_file=os.path.join(BASE_DIR, 'log', f'{time.strftime("%Y%m",time.localtime())}-run.log'), level = logging.INFO)
# , level = logging.INFO)
                                                                        
Temp1 = os.path.join(BASE_DIR, 'conf', 'CSS', configs.read_config('CSS_Style','dwhtemplate'))
TempError = os.path.join(BASE_DIR, 'conf', 'CSS', configs.read_config('CSS_Style','errortemplate'))

## Class

In [3]:
class DWH_Jobs(object):
    """
    管理DWH Schedule Jobs（SQL):
    1.读取Excel中的任务列表（到达执行时间，且为激活状态的工作任务）
    2.执行SQL脚本
    3.发送执行结果通知
    """
    def __init__(self):
        """
        db_user: 数据库用户名
        db_password：数据库密码
        db_service：数据库地址
        JobList： Excel数据表
        logs：日志
        """
        self.Sql_Processer = SqlPlus(configs.read_config("SAS_db","dwh_user"), configs.read_config("SAS_db","dwh_password"), configs.read_config("SAS_db","dwh_service"))
        self.df = pd.read_excel(os.path.join(BASE_DIR, 'db', 'cache', configs.read_config("FILE_NM","dwh_list")), 'dwh_job', convert_float=False)
        self.df.EXCUTION_TIME = pd.to_datetime(self.df.EXCUTION_TIME, format="%Y/%m/%d %H:%M:%S")
        self.Job_List = self.df[(self.df['CAMPAIGN_STATUS'] == 'A') & 
            (self.df['SCRIPTS'].isna() == False) & 
            (datetime.datetime.now() >= self.df.SCHEDULE_TIME.apply(lambda x: Cron(x).out_time())) & 
            (self.df.SCRIPTS.apply(lambda x: os.path.exists(os.path.join(BASE_DIR, 'db','DWH_Jobs',str(x))) == True))]
        
        self.Produce_list = self.Job_List[self.Job_List.apply(lambda x:(datetime.datetime.now() - x['EXCUTION_TIME']).total_seconds()/3600 - x['INTERVAL_HOURS'],axis=1) > 0]
        self.log = logs
        self.log.debug('Initial Completed!')
        
    def start(self):
        """
        开始执行SQL Scripts
        """
        if len(self.Produce_list) == 0 :
            print(ColorMe("Warning: ").yellow(),"There are no more jobs to processing right now.")
            self.log.debug("No more jobs.")
            return None
        # self.log.info(self.Produce_list)
        for i in self.Produce_list.index:
            self.log.info(self.Produce_list[self.Produce_list.index == i]['SCRIPTS'])
            sqlName = self.Produce_list[self.Produce_list.index == i]['SCRIPTS'].iloc[0]
            sqlfile = os.path.join(BASE_DIR, 'db', 'DWH_Jobs', sqlName)
            CAMPAIGN_NM = self.Produce_list[self.Produce_list.index == i]['CAMPAIGN_NM'].iloc[0]
            SCRIPTS_TYPE = self.Produce_list[self.Produce_list.index == i]['SCRIPTS_TYPE'].iloc[0]
            result = {'Status': None, 'msg': None}
            self.log.debug(f'Start to process : {CAMPAIGN_NM}')
            try:
                # 判断SQL脚本类型，选择对应执行方法
                if SCRIPTS_TYPE == 'EXECUTE':
                    result = self.Sql_Processer.excute(sqlfile)
                elif SCRIPTS_TYPE == 'PRODUCE':
                    result = self.Sql_Processer.produce(sqlfile)
                else:
                    raise ValueError("Scripts Type Error !!!")
                if result['Status'] == True:
                    self.df.loc[i, 'EXCUTION_TIME'] = datetime.datetime.strftime(datetime.datetime.now(), '%Y/%m/%d %X')
                    print(ColorMe("Success: ").green(),f"Process - {sqlName} success.")
                else:
                    print(ColorMe("Filled: ").red(),f"Process - {sqlName} Failed!!!")
                senter = ""
                # 判断是否需要发送结果通知
                if self.Produce_list[self.Produce_list.index == i]['IS_NOTIFY'].iloc[0] == 'Y':
                    senter = self.Produce_list[self.Produce_list.index == i]['SENTER'].to_string(na_rep=None, index=False)
                # 发送结果通知
                self.notify(senter, CAMPAIGN_NM, result, sqlfile)
            except Exception as e:
                # 程序异常，发送异常警号邮件至相关责任人。
                self.log.critical(traceback.format_exc())
                sender = 'Leo.ZhangZS@homecredit.cn'
                self.notify(sender, None, traceback.format_exc(), sqlfile)
            finally:
                writer = pd.ExcelWriter(JobList)
                self.df.to_excel(writer, 'dwh_job', index=0)
                writer.save()
                self.log.debug(f"End to process : {CAMPAIGN_NM}")
                
    def notify(self, sent_to, Camp_nm = None, result = None, attachement = None):
        """
        发送结果通知
        """
        copyer = 'Leo.ZhangZS@homecredit.cn;Jason.JiJJ@homecreditcfc.cn;Caroline.Cui@homecredit.cn;Iris.Ji@homecreditcfc.cn;'
        content_html = ""
        subject = "DWH Process Resume："
         # 判断是否存在发送人地址
        if len(sent_to) < 5:
            # 联系人地址不合法，自动填充默认联系人
            print(ColorMe("Warning: ").yellow(),f"No Senters, sent to me only.")
            # self.log.warning(f'No Senters.')
            sent_to = 'Leo.ZhangZS@homecredit.cn'
        # 如果联系人只有我，为测试用例或异常值，清空抄送人
        if sent_to == 'Leo.ZhangZS@homecredit.cn':
            copyer = None
        # 如果Camp_nm 为空，则为异常值邮件。
        if Camp_nm != None:
            # 根据返回，判断执行结果，发送对应的结果通知。
            if result['Status'] == True:
                CampStatus = '<p><strong>Status :&nbsp;</strong><span style="color:#99BB00;">Successful</span></p>'
                attachement = None
                subject = subject + 'Successful'
                with open(Temp1, 'r') as f:
                    t1 = Template(f.read())
                    content_html = t1.substitute(CampName=Camp_nm, CampStatus=CampStatus, DetailInfo=result['msg'])
            else:
                CampStatus = '<p><strong>Status :&nbsp;</strong><span style="color:#E53333;">Failed</span></p>'
                subject = subject + 'Failed'
                ProcessName = os.path.basename(attachement)
                EffectLevel = 'Medium'
                ContactNotice = sent_to
                RescueOperation = "Manually execute the attachment script"
                EorrorTime = datetime.datetime.strftime(datetime.datetime.now(), "%Y/%m/%d %X")
                with open(TempError, 'r') as f:
                    t1 = Template(f.read())
                    content_html = t1.substitute(ProcName=ProcessName, EffectLevel=EffectLevel, ContactNotice=ContactNotice, ResOpt=RescueOperation, EorrTime=EorrorTime, EorrInfo=result['msg'])
        else:
            subject = subject + 'Error'
            ProcessName = 'Main_DWH.py'
            EffectLevel = 'High'
            ContactNotice = 'Leo.ZhangZS'
            RescueOperation = 'Check the process log'
            EorrorTime = datetime.datetime.strftime(datetime.datetime.now(), "%Y/%m/%d %X")
            with open(TempError, 'r') as f:
                t1 = Template(f.read())
                content_html = t1.substitute(ProcName=ProcessName, EffectLevel=EffectLevel, ContactNotice=ContactNotice, ResOpt=RescueOperation, EorrTime=EorrorTime, EorrInfo=result)
        # 如果邮件发送失败，触发异常
        if not send_email.outlook(sent_to, subject, content_html, copyer, attachement):
            raise Exception('Send E-mail Failed!')

## Test

In [14]:
def main():
    try:
        dwh_jobs = DWH_Jobs()
        dwh_jobs.start()
        return True
    except Exception as e:
        sent_to = 'Leo.ZhangZS@homecredit.cn'
        subject = 'Program Error'
        ProcessName = 'Main_DWH.py'
        EffectLevel = 'High'
        ContactNotice = 'Leo.ZhangZS'
        RescueOperation = 'Eorror Info'
        EorrorTime = datetime.datetime.strftime(datetime.datetime.now(), "%Y/%m/%d %X")
        with open(TempError, 'r') as f:
            t1 = Template(f.read())
            content_html = t1.substitute(ProcName=ProcessName, EffectLevel=EffectLevel, ContactNotice=ContactNotice, ResOpt=RescueOperation, EorrTime=EorrorTime, EorrInfo=traceback.format_exc())
        logs.critical(traceback.format_exc())
        send_email.outlook(sent_to, subject, content_html, None, None)
    #     tkinter.messagebox.showerror("ERROR", e)
        return False

In [16]:
main()

2019-12-30 15:40:56,473 <ipython-input-13-d1fcc1d9d9af> [line:38] Main_DWH INFO 1.0    SMS_POS_Visit_Feedback_Tipper_SCR-931.sql
Name: SCRIPTS, dtype: object
2019-12-30 15:40:56,473 <ipython-input-13-d1fcc1d9d9af> [line:38] Main_DWH INFO 1.0    SMS_POS_Visit_Feedback_Tipper_SCR-931.sql
Name: SCRIPTS, dtype: object
2019-12-30 15:40:57,680 sqlplus.py [line:69] sqlplus DEBUG 数据传输成功


[32;1mSuccess: [0m Process - SMS_POS_Visit_Feedback_Tipper_SCR-931.sql success.


True

In [8]:
df = pd.read_excel(os.path.join(BASE_DIR, 'db', 'cache', configs.read_config("FILE_NM","dwh_list")), 'dwh_job', convert_float=False)
df.EXCUTION_TIME = pd.to_datetime(df.EXCUTION_TIME, format="%Y/%m/%d %H:%M:%S")
df

Unnamed: 0,CAMPAIGN_NM,CAMPAIGN_CD,COMMUNICATION_CD,CAMPAIGN_STATUS,SCHEDULE_TIME,SCRIPTS_TYPE,EXCUTION_TIME,INTERVAL_HOURS,SCRIPTS,IS_NOTIFY,...,PARM3,PARM4,PARM5,PARM6,PARM7,PARM8,PARM9,PARM10,PARM11,PARM12
0,CALL_SA_document_error_SCR-808,CAMP1305,"COMM5220,COMM5221",A,0 10 17 0 0 3,PRODUCE,2019-11-24 10:00:00,24.0,CALL_SA_document_error_SCR-808.sql,Y,...,,,,,,,,,,
1,SMS_POS_Visit_Feedback_Tipper_SCR-931,CAMP1444,COMM6126,A,0 59 10 * * ?,PRODUCE,2019-12-30 15:40:57,24.0,SMS_POS_Visit_Feedback_Tipper_SCR-931.sql,Y,...,,,,,,,,,,


In [15]:
df[(df['CAMPAIGN_STATUS'] == 'A') & 
(df['SCRIPTS'].isnull() == False) & 
(datetime.datetime.now() >= df.SCHEDULE_TIME.apply(lambda x: Cron(x).out_time())) & 
(df.SCRIPTS.apply(lambda x: os.path.exists(os.path.join(BASE_DIR, 'db','DWH_Jobs',str(x))) == True))]

Unnamed: 0,CAMPAIGN_NM,CAMPAIGN_CD,COMMUNICATION_CD,CAMPAIGN_STATUS,SCHEDULE_TIME,SCRIPTS_TYPE,EXCUTION_TIME,INTERVAL_HOURS,SCRIPTS,IS_NOTIFY,...,PARM3,PARM4,PARM5,PARM6,PARM7,PARM8,PARM9,PARM10,PARM11,PARM12
1,SMS_POS_Visit_Feedback_Tipper_SCR-931,CAMP1444,COMM6126,A,0 59 10 * * ?,PRODUCE,2019-12-30 15:40:57,24.0,SMS_POS_Visit_Feedback_Tipper_SCR-931.sql,Y,...,,,,,,,,,,
