## Instalação da biblioteca holidays (feriados) e import de todas as bibliotecas utilizadas no código

In [2]:

#!pip install holidays #--index-url http://artifactory.santanderbr.corp/artifactory/api/pypi/pypi-all/simple --trusted-host artifactory.santanderbr.corp
#!pip install holidays
import pandas as pd
import requests, json, holidays
from pyspark.sql.functions import to_timestamp, get_json_object, current_timestamp
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F, SparkSession
from datetime import datetime, timedelta

spark = SparkSession.builder.appName("ValidadorTabelas").getOrCreate()


ModuleNotFoundError: No module named 'pyspark'

## Criando as variáveis de data

In [10]:

dt_hj = datetime.today()
d_sem = dt_hj.weekday()
dt_hj_str = datetime.today().strftime("%Y-%m-%d")
exec_datetime = datetime.now()
exec_timestamp_str = exec_datetime.strftime('%Y-%m-%d %H:%M:%S')
ano_mes_corr = exec_datetime.strftime('%Y-%m')
dia_20 = f"{ano_mes_corr}-20"
dt_inic_mes = f"{ano_mes_corr}-01"

## Função para obter qual o dia útil do mês estamos

In [11]:
feriados_br = holidays.Brazil()

def business_days(start_date, end_date):
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    num_business_days = 0
    current_date = start_date

    while current_date <= end_date:
        if current_date.weekday() < 5 and current_date not in feriados_br:
            num_business_days += 1
        current_date += timedelta(days=1)

    if end_date.weekday() < 5 and end_date in feriados_br:
        num_business_days += 1

    return num_business_days

dia_util = business_days(dt_inic_mes, dt_hj_str)


## Obtendo todas as tabelas automaticamente pelos schemas 
(Isso faz com que você faça a validação de todas as tabelas disponiveis nos schemas)

In [None]:
lista_schemas = ["schema_1", "schema_2", "schema_3"] #Coloque os schemas das tabelas que deseja validar

df_schemas = None
for i in lista_schemas:
    df_sch = spark.sql(f"SHOW TABLES IN {i}")
    if df_schemas is None:
        df_schemas = df_sch
    else:
        df_schemas = df_schemas.union(df_sch).dropDuplicates(["tableName"])

### Selecionando apenas as tabelas que precisam ser verificadas no dia
Aqui conseguimos retirar da validação do dia aquelas tabelas que precisam ser validadas apenas em dias específicosx por exemplo tabelas que são populadas apenas uma vez por mês ou uma vez por semana, ou tabelas que não são populadas no final de semana etc.

In [None]:
df_tabelas = df_schemas \
    .withColumn("nome_tabela", 
                F.concat(
                    F.col("database"), 
                    F.lit("."), 
                    F.col("tableName"))
    ) \
    .withColumn("exec_hj", #Criamos uma coluna para saber as tabelas que devem ser populadas hoje, a coluna terá apenas "SIM" ou "NÃO" para cada tabela
        F.when(
            F.col("nome_tabela").isin("tabela_1", "tabela_2", "tabela_3") & #coloque aqui as tabelas que só rodam de sábado e domingo
            (~F.lit(d_sem).isin(5, 6)), F.lit("Não")
        ).when(
            F.col("nome_tabela").isin("tabela_1", "tabela_2", "tabela_3") & #coloque aqui as tabelas que só rodam de sexta-feira
            (~F.lit(d_sem).isin(4)), F.lit("Não")
        ).when(
            F.col("nome_tabela").isin("tabela_1", "tabela_2", "tabela_3") & #coloque aqui as tabelas que só rodam todo dia 20 de cada mês
            (~F.lit(dt_hj_str) == F.lit(dia_20)), F.lit("Não")
        ).when(
            F.col("nome_tabela").isin("tabela_1", "tabela_2", "tabela_3") & #coloque aqui as tabelas que NÃO rodam de sábado e domingo
            (F.lit(d_sem).isin(5, 6)), F.lit("Não")
        ).when(
            F.col("nome_tabela").isin("tabela_1", "tabela_2", "tabela_3") & #tabelas que só rodam no 4º dia útil
            (~F.lit(dia_util).isin(4)), F.lit("Não")
        #Aqui você pode configurar da forma que precisar, coloquei alguns exemplos porém fica à critério de cada um
        ).otherwise(F.lit("Sim"))) \
    .filter(
        (~F.col("nome_tabela").isin("tabela_1", "tabela_2", "tabela_3")) & #tabelas que não quer fazer a validação por algum motivo, coloque aqui para retirar do validador
        (F.col("exec_hj").isin("Sim"))) \
    .select("nome_tabela") #fazemos o filtro acima para no final o dataframe ficar apenas com o nome das tabelas que precisam ser validadas no dia da execução

### Aqui é feito um teste para ver se as tabelas tem permissão de acesso.
Se tiver alguma tabela que não possui acesso ela será retirada do validador para não quebrar o código e será colocada em uma lista de tabelas sem permissão e no final vai na mensagem informando que não tem acesso

In [None]:
tab_sem_per = []
for tb_name in df_tabelas.collect():
    try:
        spark.sql(f"select * from {tb_name['nome_tabela']} limit 1")
    except:
        tab_sem_per.append(tb_name['nome_tabela'])
df_tabelas = df_tabelas.filter(~F.col("nome_tabela").isin(tab_sem_per))


### Verificando se o job executou ou não
Aqui coletamos os dados da tabela através do "describe history" pegando a ultima atualização e trazendo dados como data e horário da execução, quantidade de linhas inseridas, nome da tabela e do job.
Aqui é feito um loop no dataframe de tabelas para pegar os dados de todas as tabelas que estão para ser validadas no dia da execução

In [None]:
df_final = None
for table_name in df_tabelas.collect():
    history_df = spark.sql(f"""DESCRIBE HISTORY `{table_name['nome_tabela']}`""") \
        .filter( #Pegando apenas escrita de tabela e desconsiderando outras operações
            (~F.col("operation").isin("VACUUM START", "VACUUM END", "MERGE"))) \
        .withColumn("data_exec", 
                    F.to_date(F.col("timestamp"), "yyyy-MM-dd HH:mm:ss")) \
        .withColumn("hr_exec", 
                    F.date_format(
                        (F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss") 
                        - F.expr("INTERVAL 3 HOURS")), "HH:mm:ss")) \
        .withColumn("nome_tabela_Desc", 
                    F.lit(table_name["nome_tabela"]))

    # Obtendo apenas a última execução de cada tabela
    recent_update_df = history_df.orderBy(F.col("timestamp").desc()).limit(1)

    recent_update_df = recent_update_df \
        .withColumn("operationMetricsStr", 
                    F.to_json(F.col("operationMetrics"))) \
        .withColumn("job", F.to_json(F.col("job")))

    #obtendo o nome do job e a quantidade de linhas inseridas
    recent_update_df = recent_update_df \
        .withColumn("qtd_linhas", 
                    F.get_json_object(F.col("operationMetricsStr"), "$.numOutputRows").cast(IntegerType())) \
        .withColumn("job_name",
                    F.get_json_object(F.col("job"), "$.jobName"))

    recent_update_df = recent_update_df \
        .select(
            "nome_tabela_Desc",
            "qtd_linhas",
            "data_exec",
            "hr_exec",            
            "job_name"
        )

    if df_final is None: # Concatenando os dados no DataFrame final
        df_final = recent_update_df
    else:
        df_final = df_final.union(recent_update_df)

### Aplicando status final
Aqui é criada a coluna "status" que vai mostrar se o job deu "Sucesso", "Falha" ou "Executou hoje porém não escreveu dado"

In [None]:
join_final = df_tabelas.alias("a") \
    .join(df_final.alias("b"), 
          on=F.col("a.nome_tabela") == F.col("b.nome_tabela_Desc"), 
          how="left") \
    .withColumn("status", 
        F.when((F.col("qtd_linhas") > 0) & (F.col("data_exec") == dt_hj_str), "Sucesso") \
        .otherwise(F.lit("Falha"))) \
    .withColumn("status", 
        F.when(
            (F.col("qtd_linhas").isNull()) | (F.col("qtd_linhas") == 0) & 
            (F.col("data_exec") == dt_hj_str), "Executou hoje porém não escreveu dado") \
        .otherwise(F.col("status"))) \
    .withColumn("DT_REFE",
        F.lit(dt_hj_str)) \
    .select(
        "nome_tabela",
        "job_name",
        "qtd_linhas",
        "data_exec",
        "hr_exec",      
        "status",
        "DT_REFE"
    )

### Obtendo o horário atual e filtrando o dataframe final
Como o job roda de 1 em 1 hora, fiz essa validação para ele pegar apenas os job's que rodam da hora que está sendo executado para baixo para não ter perigo de aplicar "Falha" a um job que só roda algum tempo depois do horário que está sendo feita a validação

In [None]:
current_hour = str(spark.sql("SELECT hour(current_timestamp()) as current_hour").collect()[0][0] - 3).zfill(2)
hr_1 = f"{current_hour}:00:00"
join_final = join_final.filter(
    F.to_timestamp("hr_exec", "HH:mm:ss") <= F.to_timestamp(F.lit(hr_1), "HH:mm:ss"))

### Criando o dataframe de falhas
Aqui filtramos o dataframe final para ter apenas "Falha" ou "Executou hoje porém não escreveu dado"

In [None]:
df_falhas = join_final.filter((F.col("status") == "Falha") | (F.col("status") == "Executou hoje porém não escreveu dado"))

### Envio da notificação pelo Teams
Aqui é feita a verificação se o df_falhas tiver alguma linha enviamos a notificação para o Teams com o dataframe de falhas para que possa ser feita uma ação o quanto antes, caso contrário a notificação informará que não teve falha em job's no horário da verificação

In [None]:
if df_falhas.count() >= 1:
    webhook_url = 'coloque aqui a URL do webhook Teams'

    tb_dash_html = df_falhas.toPandas().to_html(index=False) #transformando o dataframe pyspark em html pra enviar via teams
    if len(tab_sem_per) > 0:
        message = {
            "title": f"FALHA DE JOB DAS {current_hour}:00 - Timestamp validado: {exec_timestamp_str}",
            "text": f"ATENÇÃO! ALGUM JOB EXECUTOU ATÉ AS (current_hour):59:00 E EXECUTOU COM FALHA OU NÃO ESCREVEU DADO NA DATA DE HOJE ({dt_hj_str})***<br>"
                    f"{tb_dash_html}<br>"
                    f"Essas tabelas sem permissão de acesso conforme abaixo:<br>"
                    f"{tab_sem_per}<br>"
                    f"FAVOR PEDIR PARA LIBERAR O ACESSO AS TABELAS<br>"
        }
    else:
        message = {
            "title": f"FALHA DE JOB - Timestamp validado: {exec_timestamp_str}",
            "text": f"ATENÇÃO! ALGUM JOB EXECUTOU ATÉ AS {current_hour}:59:00 E EXECUTOU COM FALHA OU NÃO ESCREVEU DADO NA DATA DE HOJE ({dt_hj_str})***<br>"
                    f"{tb_dash_html}<br>"
        }

    proxies = {"http": "coloque aqui seu proxy", "https": "coloque aqui seu proxy"}

    try:
        response = requests.post(
            webhook_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(message),
            timeout=30,
            verify=False,
            proxies=proxies
        )

        if response.status_code == 200:
            print("Mensagem enviada com sucesso!")
        else:
            print(f"Falha ao enviar mensagem. Status code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Erro ao enviar mensagem: {e}")

else:
    webhook_url = 'coloque aqui a URL do webhook Teams'

    tb_dash_html = df_falhas.toPandas().to_html(index=False)
    if len(tab_sem_per) > 0:
        message = {
            "title": f"VALIDAÇÃO DE JOB'S DAS {current_hour}:00 - Timestamp validado: {exec_timestamp_str}",
            "text": f"*****ATENÇÃO TODOS OS JOB'S QUE RODAM ATÉ AS {current_hour}:59:00 EXECUTARAM COM SUCESSO!!!*****<br>"
                    f"<br>Existem tabelas sem permissão de acesso conforme abaixo:<br>"
                    f"{tab_sem_per}<br>"
                    f"<br>FAVOR PEDIR PARA LIBERAR O ACESSO AS TABELAS<br>*****"
        }
    else:
        message = {
            "title": f"VALIDAÇÃO DE JOB'S DAS {current_hour}:00 - Timestamp validado: {exec_timestamp_str}",
            "text": f"*****ATENÇÃO TODOS OS JOB'S QUE RODAM ATÉ AS {current_hour}:59:00 EXECUTARAM COM SUCESSO!!!*****<br>"
        }

    proxies = {"http": "coloque aqui seu proxy", "https": "coloque aqui seu proxy"}

    try:
        response = requests.post(
            webhook_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(message),
            timeout=30,
            verify=False,
            proxies=proxies
        )

        if response.status_code == 200:
            print("Mensagem enviada com sucesso!")
        else:
            print(f"Falha ao enviar mensagem. Status code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Erro ao enviar mensagem: {e}")