In [None]:
from pyspark.sql import SparkSession, DataFrame, functions as F 
from pyspark.sql.window import Window 
from pyspark import StorageLevel 
from datetime import datetime 
import logging 
from typing import Tuple 
from dataclasses import dataclass

@dataclass
class ConfigDistribuicao:
    """Configurações do processo de distribuição"""
    limite_clientes: int = 1473

def setup_logging() -> logging.Logger:
    """Configura o logger"""
    logger = logging.getLogger("distribuicao")
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger

def get_spark_session(config: ConfigDistribuicao) -> SparkSession:
    """Inicializa a SparkSession"""
    return (SparkSession.builder
        .appName("DistribuicaoClientes")
        .config("spark.sql.shuffle.partitions", "600")
        .getOrCreate())

def load_data(spark: SparkSession, logger: logging.Logger) -> Tuple[DataFrame, DataFrame]:
    """Carrega dados de clientes e gerentes"""
    logger.info("Carregando dados de clientes e gerentes...")
   
    # Carrega os dados
    clientes_df = spark.table("path.base_clientes")
    gerentes_df = spark.table("path.base_gerentes")

    # Verificação inicial dos dados
    logger.info(f"Total de clientes carregados: {clientes_df.count()}")

    clientes_df.show(10, truncate=False)
    gerentes_df.show(10, truncate=False)

    logger.info(f"Total de gerentes carregados: {gerentes_df.count()}")

    # Ajustar filtro para incluir apenas registros válidos
    clientes_df = clientes_df.filter(F.col("chave_unica").isNotNull())
    gerentes_df = gerentes_df.filter(F.col("chave_unica").isNotNull())

    # Verificação após filtro
    logger.info(f"Total de clientes após filtro: {clientes_df.count()}")
    logger.info(f"Total de gerentes após filtro: {gerentes_df.count()}")

    return clientes_df, gerentes_df

def distribuir_clientes(clientes_df: DataFrame, gerentes_df: DataFrame, config: ConfigDistribuicao, logger: logging.Logger) -> DataFrame:
    """Realiza a distribuição de clientes para gerentes"""
    logger.info("Iniciando processo de distribuição...")
    
    # Faz a combinação entre clientes e gerentes com a mesma chave `chave_unica`
    combinacao = clientes_df.join(gerentes_df, "chave_unica")
    
    # Define uma janela para balancear a distribuição entre os gerentes dentro de cada `chave_unica`
    window_dist = Window.partitionBy("nr_cli").orderBy(F.rand())
    
    # Atribui um índice de linha para cada combinação de cliente e gerente
    distribuicao = combinacao.withColumn(
        "row_number",
        F.row_number().over(window_dist)
    ).filter(F.col("row_number") == 1).drop("row_number")

    # Verifica se todos os clientes foram distribuídos
    clientes_distribuidos = distribuicao.select("nr_cli").distinct().count()
    total_clientes = clientes_df.select("nr_cli").distinct().count()

    if clientes_distribuidos != total_clientes:
        raise Exception("Erro: Nem todos os clientes foram distribuídos!")

    logger.info(f"Distribuição concluída: {distribuicao.count()} clientes atribuídos.")
    return distribuicao

def validar_distribuicao(df: DataFrame, logger: logging.Logger) -> None:
    """Valida a distribuição realizada"""
    logger.info("Validando a distribuição...")
    duplicados = df.groupBy("nr_cli").agg(F.count("*").alias("qtd")).filter(F.col("qtd") > 1)
    duplicados_count = duplicados.count()
 
    if duplicados_count > 0:
        logger.error(f"Encontrados {duplicados_count} CPFs duplicados na distribuição.")
        raise Exception("Erro: Clientes atribuídos a mais de um gerente!")
 
    logger.info("Validação concluída: cada cliente foi atribuído a um único gerente.")


def salvar_resultado(df: DataFrame, logger: logging.Logger) -> None:
    """Salva os resultados da distribuição"""
    logger.info("Salvando os resultados da distribuição...")
    
    # Selecionar os campos desejados
    tabela_final = df.select("CAMPO1","CAMPO2"...,"CAMPO-N")
    
    # Salvar os resultados
    tabela_final.write.saveAsTable('path.prenome', mode='overwrite', overwriteSchema='true', mergeSchema='true')
    logger.info("Resultados salvos com sucesso.")


def main():
    config = ConfigDistribuicao()
    spark = get_spark_session(config)
    logger = setup_logging()
 
    logger.info("=== Início do processo de distribuição ===")
    try:
        # Carregamento dos dados
        clientes_df, gerentes_df = load_data(spark, logger)
 
        # Distribuição de clientes
        resultado_df = distribuir_clientes(clientes_df, gerentes_df, config, logger)
 
        # Validação dos resultados
        validar_distribuicao(resultado_df, logger)
 
        # Salvamento dos resultados
        salvar_resultado(resultado_df, logger)
 
        logger.info("=== Processo concluído com sucesso ===")
 
    except Exception as e:
        logger.error(f"Erro crítico: {str(e)}")
        raise

if __name__ == "__main__":
    main()
    


In [None]:
DECLARE 
  MES_REF VARCHAR2(6);
  ANOMES VARCHAR2(6);
BEGIN
  
ANOMES := '202311'; 

FOR i IN 0..14
  LOOP

      MES_REF := TO_CHAR(ADD_MONTHS(TO_DATE(DATA, 'RRRRMM'),-i), 'RRRRMM');
      
      EXECUTE IMMEDIATE('
            DELETE FROM SCHEMA.TABLE
            WHERE CHAVE_DATA(A) = TO_CHAR(ADD_MONTHS(TO_DATE('||MES_REF||', ''RRRRMM''),- TO_NUMBER(REPLACE(SAFRA,''M'','''')) ), ''RRRRMM'')    
                  AND SAFRA = ''M''||MONTHS_BETWEEN(TO_DATE('||MES_REF||',''RRRRMM''), TO_DATE(CHAVE_DATA, ''RRRRMM''))
      ');      

      EXECUTE IMMEDIATE('
      INSERT INTO SCHEMA.TABLE
             SELECT /*+PARALLEL(16)*/
                 A.CAMPO_EXEMPLO,
                 ''M''||MONTHS_BETWEEN(TO_DATE(NVL(B.CHAVE_DATA, '||MES_REF||'),''RRRRMM''), TO_DATE(A.CHAVE_DATA, ''RRRRMM'') ) AS SAFRA





FROM (SELECT * FROM PATH.TABELA_PRINCIPAL) A

          LEFT JOIN PATH.TABELA_JOIN PARTITION(P_'||MES_REF||') B
          ON A.CHAVE_A= B.CHAVE_B
             AND A.CHAVE2_A = B.CHAVE2_B
             AND A.CHAVE_DATA <= B.CHAVE_DATA

In [None]:
%python
import pymsteams
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql as S
import requests

df = sqlContext.sql("""
select codigo_x AS QTD_CLI,
 date_format(atualizacao,'yyyyMM' )  AS DATA_REFERENCIA
 from --PATH.SUA_TABLE
WHERE xpto1 = 'F' 
AND xpto2 between '2024-06-01' AND '2024-06-31' 
AND xpto3 IN ('004', '005', '006', '007', '106','001', '002', '003', '304','009')
AND xpto4 IN ('N')
AND atualizacao = '2024-06-28'
""")

if (df.count() > 0):
    texto = "MONITORAMENTO XPTO - CLOUD \n"
    texto = texto + "\n"

    if (df.count() < 488000):
      texto = texto + "TEXTO " + str(df.count()) + " TEXTO \n"
    else:
      texto = texto + "TEXTO " + str(df.count()) + " TEXTO_ERRO. Verificar a query. \n"

    print(texto)

    
myTeamsMessage = pymsteams.connectorcard("conector webhook")
myTeamsMessage.text(texto)
myTeamsMessage.send()
    

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, DateType, TimestampType

class OracleClient:
    def __init__(self, host, port, sid, user, password):
        """
        Inicializa a conexão JDBC com o banco Oracle.
        
        :param host: Endereço do banco de dados Oracle.
        :param port: Porta de conexão ao banco.
        :param sid: SID do banco de dados.
        :param user: Usuário do banco de dados.
        :param password: Senha do usuário.
        """
        password = '!Petiquinho24'
        self.url = f"jdbc:oracle:thin:{user}/{password.replace('@', '%40')}@//{host}:{port}/{sid}"

        self.spark = SparkSession.builder \
            .appName("OracleClient") \
            .config("spark.jars", "/Volumes/prd/bdq/denodo/ojdbc7.jar") \
            .getOrCreate()


    def table_exists(self, schema, table):
        """
        Verifica se uma tabela existe no schema informado usando JDBC.

        :param schema: O nome do schema.
        :param table: O nome da tabela.
        :return: Retorna True se a tabela existir, caso contrário False.
        """
        query = f"""
            SELECT COUNT(*)
            FROM all_tables
            WHERE owner = '{schema.upper()}' AND table_name = '{table.upper()}'
        """
        result = self.spark.read \
            .format("jdbc") \
            .option("url", self.url) \
            .option("query", query) \
            .option("driver", "oracle.jdbc.driver.OracleDriver") \
            .option("pushDownAggregate", "true") \
            .option("pushDownPredicate", "true") \
            .option("fetchSize", 200) \
            .load()

        exists = result.collect()[0][0]
        return exists > 0

    def fetch_data(self, database, table, fields='*', filters=None, batch_size=100000):
        from pyspark.sql import DataFrame
        
        # Monta a query base
        base_query = f"SELECT {fields} FROM {database}.{table}"

        # Adiciona os filtros, se existirem
        if filters:
            base_query += f" WHERE {filters}"

        print("base_query: ", base_query)
        
        # Inicializa variáveis para paginação
        offset = 0
        final_df = None  # DataFrame acumulado

        while True:
            # Adiciona paginação à consulta
            paginated_query = f"{base_query} OFFSET {offset} ROWS FETCH NEXT {batch_size} ROWS ONLY"

            # Lê o lote atual de dados
            batch_df = self.spark.read \
                .format("jdbc") \
                .option("url", self.url) \
                .option("query", paginated_query) \
                .option("driver", "oracle.jdbc.driver.OracleDriver") \
                .option("pushDownAggregate", "true") \
                .option("pushDownPredicate", "true") \
                .option("fetchSize", 200) \
                .load()

            # Verifica se o lote retornou dados
            if batch_df.rdd.isEmpty():
                break

            # Faz union com o DataFrame acumulado
            if final_df is None:
                final_df = batch_df  # Primeiro lote
            else:
                final_df = final_df.union(batch_df)  # União incremental

            # Atualiza o offset
            offset += batch_size

        # Retorna o DataFrame final ou um DataFrame vazio se nenhum dado for encontrado
        if final_df is None:
            return self.spark.createDataFrame([], schema=None)  # DataFrame vazio
        else:
            return final_df

    def fetch_data_with_query(self, query):

        df = self.spark.read \
            .format("jdbc") \
            .option("url", self.url) \
            .option("query", query) \
            .option("driver", "oracle.jdbc.driver.OracleDriver") \
            .option("pushDownAggregate", "true") \
            .option("pushDownPredicate", "true") \
            .option("fetchSize", 1000) \
            .load()

        # Retorna o DataFrame final ou um DataFrame vazio se nenhum dado for encontrado
        if df is None:
            return self.spark.createDataFrame([], schema=None)  # DataFrame vazio
        else:
            return df

    def create_table_if_not_exists(self, schema, table, columns):
        """
        Verifica se a tabela existe e cria a tabela se ela não existir.

        :param schema: O nome do schema.
        :param table: O nome da tabela.
        :param columns: Dicionário contendo o nome das colunas e seus tipos (ex: {'id': 'NUMBER', 'nome': 'VARCHAR2(100)'}).
        """
        if not self.table_exists(schema, table):
            print(f"Tabela {schema}.{table} não existe. Criando...")
            columns_definitions = ", ".join([f"{col} {dtype}" for col, dtype in columns.items()])
            create_table_query = f"CREATE TABLE {schema}.{table} ({columns_definitions})"
            try:
                self.spark.read \
                    .format("jdbc") \
                    .option("url", self.url) \
                    .option("query", create_table_query) \
                    .option("driver", "oracle.jdbc.driver.OracleDriver") \
                    .load()
                print(f"Tabela {schema}.{table} criada com sucesso!")
            except Exception as e:
                print(f"Erro ao criar a tabela: {str(e)}")
        else:
            print(f"Tabela {schema}.{table} já existe.")

    def insert_data_into_table(self, df, schema, table):
        """
        Insere dados de um PySpark DataFrame em uma tabela Oracle.
        """
        if not self.table_exists(schema, table):
            raise ValueError(f"A tabela {schema}.{table} não existe. Crie a tabela primeiro.")

        df.write \
            .format("jdbc") \
            .option("url", self.url) \
            .option("dbtable", f"{schema}.{table}") \
            .option("driver", "oracle.jdbc.driver.OracleDriver") \
            .mode("append") \
            .save()

        print(f"Dados inseridos com sucesso na tabela {schema}.{table}.")

    def close(self):
        """Fecha a sessão do Spark."""
        self.spark.stop()

    def help_check(self):
        """Fecha a sessão do Spark."""
        print("Running...")