In [0]:
import pandas as pd



In [0]:
def exec_spark_sql(query,config_data=None):
    """Run a SQL statement through the notebook's ``spark`` session.

    Args:
        query: SQL text passed unchanged to ``spark.sql``.
        config_data: Accepted for call compatibility; not used by this function.

    Returns:
        Whatever ``spark.sql(query)`` returns.

    Raises:
        Exception: when ``spark.sql`` fails; the message carries the original
            error text and the offending query.
    """
    try:
        return spark.sql(query)
    except Exception as sql_error:
        # Re-raise with the query attached so the failing statement is visible to the caller.
        failure_message = f'exception in exec_spark_sql: {sql_error} : query: {query}'
        raise Exception(failure_message)

In [0]:
# pre and post script auditing status description
def get_status_desc(status_num):
    """Translate an audit status number into its SQL-ready description.

    The returned text already includes surrounding single quotes, so it can be
    dropped straight into a SQL statement as a string literal.

    Args:
        status_num: Status code; 1 through 5 are recognised.

    Returns:
        The quoted description for a recognised code, otherwise ``"'nothing'"``.
    """
    # Codes are consecutive starting at 1, so number the descriptions in order.
    known_descriptions = dict(enumerate(
        (
            "'STARTED'",
            "'ERROR_ENCOUNTERED'",
            "'COMPLETED'",
            "'ACTIVE'",
            "'INACTIVE'",
        ),
        start=1,
    ))
    if status_num in known_descriptions:
        return known_descriptions[status_num]
    return "'nothing'"

In [0]:
# initialize job level prescript
def job_pre_script(config_data,job_num,current_batch_start_date):
    """Record the start of a job run in the audit tables and return its batch ids.

    Steps:
      1. Query ``<audit_schema>.JOB_DETAILS`` for this job to derive the
         batch_id / sub_batch_id of the new run and the start/end dates
         currently stored for the job (these become the "previous" dates).
      2. MERGE a row with status 1 into ``JOB_DETAILS`` keyed on ``job_num``.
      3. INSERT the same values into ``JOB_DETAILS_HISTORY``.

    Args:
        config_data: Mapping whose ``['source']['audit_schema_nm']`` entry names
            the schema holding the audit tables.
        job_num: Job number; interpolated unquoted into the SQL and also written
            as ``job_id``.
        current_batch_start_date: Start date of this run; interpolated into the
            SQL as a single-quoted literal.

    Returns:
        dict with the keys ``batch_id`` and ``sub_batch_id`` used for this run.

    Raises:
        Exception: propagated from ``exec_spark_sql`` when any statement fails.
    """
    # status_num = 1 indicates the job is started
    status_num = 1
    status_desc = get_status_desc(status_num)
    audit_schema = config_data.get('source').get('audit_schema_nm')

    # get batch_id / sub_batch_id / previous_batch_start_date / previous_batch_end_date
    #
    # Sub-query b:
    #   * the two date conditions are parenthesised so the JOB_NUM filter applies
    #     to both of them (AND binds tighter than OR; without the parentheses the
    #     "= 0" branch matched rows of every job);
    #   * it has no GROUP BY, so the aggregate always yields exactly one row and
    #     falls back to sub_batch_id 1 when the job has no row for the current
    #     day. With a GROUP BY an empty match produced zero rows, which emptied
    #     the whole cross join and discarded the batch_id computed by sub-query a.
    # NOTE(review): sub-query a yields a null batch_id when the stored start date
    # is more than one day in the future (no CASE branch matches) - confirm
    # whether that situation can occur.
    pre_script_query = f"""
        select * from
        (select case
        when MAX (BATCH_ID) is null then 1
        when DATEDIFF ( current_timestamp() ,current_batch_start_date ) = 0 then MAX (BATCH_ID)
        when DATEDIFF ( current_timestamp() ,current_batch_start_date ) = -1 then MAX (BATCH_ID)
        when DATEDIFF ( current_timestamp() ,current_batch_start_date ) > 0 then MAX (BATCH_ID) + 1
        end as batch_id from {audit_schema}.JOB_DETAILS where JOB_NUM = {job_num}
        group by CURRENT_BATCH_START_DATE ) a
        cross join 
        (select
        case when MAX (SUB_BATCH_ID) is null then 1
        else MAX(sub_batch_id)+ 1 end as sub_batch_id
        from {audit_schema}.JOB_DETAILS where
        (DATEDIFF ( current_timestamp() ,current_batch_start_date ) = 0
        or DATEDIFF ( current_timestamp() ,current_batch_start_date ) = -1) and JOB_NUM = {job_num}
        ) b
        cross join
        (select current_batch_start_date from {audit_schema}.JOB_DETAILS where job_num = {job_num}) c
        cross join
        (select current_batch_end_date from {audit_schema}.JOB_DETAILS where job_num = {job_num}) d
    """
    pre_script_rows = exec_spark_sql(pre_script_query).toPandas().to_dict('records')
    if not pre_script_rows:
        # Nothing came back for this job: start at batch 1 / sub-batch 1 with no previous dates.
        pre_script_rows.append({
            'batch_id': 1,
            'sub_batch_id': 1,
            'current_batch_start_date': pd.NaT,
            'current_batch_end_date': pd.NaT,
        })
    stored_run = pre_script_rows[0]
    batch_id = stored_run.get('batch_id')
    sub_batch_id = stored_run.get('sub_batch_id')

    # The dates stored on the job's row become the "previous" dates of the new run.
    previous_batch_start_date = stored_run.get('current_batch_start_date')
    previous_batch_end_date = stored_run.get('current_batch_end_date')
    # Render them as SQL literals; a missing start date falls back to '1800-01-01'
    # and a missing end date to SQL null.
    previous_batch_start_date = f"'{previous_batch_start_date}'" if not pd.isnull(previous_batch_start_date) else "'1800-01-01'"
    previous_batch_end_date = f"'{previous_batch_end_date}'" if not pd.isnull(previous_batch_end_date) else "null"

    # merge into job_details table as only one entry should be present for each job
    job_details_query = f"""
    merge INTO {audit_schema}.JOB_DETAILS t 
    using (
        select 
        {job_num} job_id,
        {batch_id} batch_id,
        {sub_batch_id} sub_batch_id,
        {job_num} job_num,
        null core_db_batch_start_date,
        null core_db_batch_end_date,
        '{current_batch_start_date}' current_batch_start_date,
        null current_batch_end_date,
        {previous_batch_start_date} previous_batch_start_date,
        {previous_batch_end_date} previous_batch_end_date,
        {status_num} status_num,
        null failure_point,
        null technical_error_code,
        {status_desc} status_desc,
        null technical_error_desc
    ) s
    ON s.job_num = t.job_num
    WHEN MATCHED THEN UPDATE SET 
        job_id = s.job_id,
        batch_id= s.batch_id,
        sub_batch_id= s.sub_batch_id,
        job_num= s.job_num,
        core_db_batch_start_date= s.core_db_batch_start_date,
        core_db_batch_end_date= s.core_db_batch_end_date,
        current_batch_start_date= s.current_batch_start_date,
        current_batch_end_date= s.current_batch_end_date,
        previous_batch_start_date= s.previous_batch_start_date,
        previous_batch_end_date= s.previous_batch_end_date,
        status_num= s.status_num,
        failure_point= s.failure_point,
        technical_error_code= s.technical_error_code,
        status_desc= s.status_desc,
        technical_error_desc= s.technical_error_desc
        WHEN NOT MATCHED THEN INSERT (
            job_id,
            batch_id,
            sub_batch_id,
            job_num,
            core_db_batch_start_date,
            core_db_batch_end_date,
            current_batch_start_date,
            current_batch_end_date,
            previous_batch_start_date,
            previous_batch_end_date,
            status_num,
            failure_point,
            technical_error_code,
            status_desc,
            technical_error_desc
          )
      VALUES (
            s.job_id,
            s.batch_id,
            s.sub_batch_id,
            s.job_num,
            s.core_db_batch_start_date,
            s.core_db_batch_end_date,
            s.current_batch_start_date,
            s.current_batch_end_date,
            s.previous_batch_start_date,
            s.previous_batch_end_date,
            s.status_num,
            s.failure_point,
            s.technical_error_code,
            s.status_desc,
            s.technical_error_desc
          )
    """
    exec_spark_sql(job_details_query)

    # create history of job_details for each job run
    job_details_history_query = (
        f"insert into {audit_schema}.JOB_DETAILS_HISTORY "
        "(job_id,batch_id,sub_batch_id,job_num,core_db_batch_start_date,core_db_batch_end_date,"
        "current_batch_start_date,current_batch_end_date,previous_batch_start_date,previous_batch_end_date,"
        "status_num,failure_point,technical_error_code,status_desc,technical_error_desc) "
        f"values({job_num},{batch_id},{sub_batch_id},{job_num},null,null,"
        f"'{current_batch_start_date}',null,{previous_batch_start_date},{previous_batch_end_date},"
        f"{status_num},null,null,{status_desc},null)"
    )
    exec_spark_sql(job_details_history_query)

    batch_sub_batch_id = {'batch_id': batch_id,'sub_batch_id' : sub_batch_id}
    return batch_sub_batch_id

In [0]:
# job level postscript - if job is completed or failed
def job_post_script(config_data,job_num,batch_sub_batch_id,status_num,current_batch_end_date="null",failure_point="null",technical_error_code="null",technical_error_desc="null"):
    """Write the final status of a job run to the audit tables.

    Issues the same UPDATE against ``<audit_schema>.JOB_DETAILS`` and
    ``<audit_schema>.JOB_DETAILS_History`` for the row identified by
    ``job_num`` / ``batch_id`` / ``sub_batch_id``.

    Args:
        config_data: Mapping whose ``['source']['audit_schema_nm']`` entry names
            the schema holding the audit tables.
        job_num: Job number used in the WHERE clause.
        batch_sub_batch_id: Mapping with ``batch_id`` and ``sub_batch_id`` keys.
        status_num: New status code; its description comes from ``get_status_desc``.
        current_batch_end_date: End date of the run. ``None`` or the text
            ``"null"`` (the default) is written as SQL NULL; any other value is
            written as a single-quoted literal.
        failure_point: Interpolated unquoted, so a string value must already
            carry its own SQL quotes; defaults to SQL null.
        technical_error_code: Same convention as ``failure_point``.
        technical_error_desc: Same convention as ``failure_point``.

    Raises:
        Exception: propagated from ``exec_spark_sql`` when an UPDATE fails.
    """
    audit_schema = config_data.get('source').get('audit_schema_nm')
    status_desc = get_status_desc(status_num)
    batch_id = batch_sub_batch_id.get('batch_id')
    sub_batch_id = batch_sub_batch_id.get('sub_batch_id')

    # Emit a real SQL NULL when no end date is given; quoting the default would
    # write the text 'null' into the column instead.
    if current_batch_end_date is None or str(current_batch_end_date).strip().lower() == "null":
        end_date_sql = "null"
    else:
        end_date_sql = f"'{current_batch_end_date}'"

    def _build_update(table_name):
        # Both audit tables receive the same SET / WHERE clauses.
        return f"""
        update {audit_schema}.{table_name} 
        set STATUS_NUM = {status_num} , CURRENT_BATCH_END_DATE= {end_date_sql} ,
        Status_Desc= {status_desc} , failure_point = {failure_point} , technical_error_code = {technical_error_code} , technical_error_desc = {technical_error_desc}
        where JOB_NUM= {job_num}  AND BATCH_ID= {batch_id}  AND 
        SUB_BATCH_ID= {sub_batch_id} 
        """

    job_details_post_script_query = _build_update("JOB_DETAILS")
    job_details_history_post_script_query = _build_update("JOB_DETAILS_History")

    # NOTE(review): `log` is not defined in this notebook - presumably supplied by
    # the notebook that runs this one; confirm.
    log.debug(f"job_details_post_script_query: {job_details_post_script_query}")
    log.debug(f"job_details_history_post_script_query: {job_details_history_post_script_query}")
    exec_spark_sql(job_details_post_script_query)
    exec_spark_sql(job_details_history_post_script_query)

In [0]:
# # Retrieve the values of the passed parameters
# function_name = dbutils.widgets.get("function_name")
# config_data = dbutils.widgets.get("config_data")
# job_num = dbutils.widgets.get("job_num")
# current_batch_start_date = dbutils.widgets.get("current_batch_start_date")

# batch_sub_batch_id = dbutils.widgets.get("batch_sub_batch_id")
# status_num = dbutils.widgets.get("status_num")

In [0]:
# # Call the function based on the function_name parameter
# if function_name == "job_pre_script":
#     return job_pre_script(config_data,job_num,current_batch_start_date)

# # Call the function based on the function_name parameter
# if function_name == "job_post_script":
#     return job_post_script(config_data,job_num,batch_sub_batch_id,status_num)