# **Módulos de Cálculo de Limites**

# **Parcelado**

## Calculos

### CalculaGrupoCreditoDiretoAoConsumidor

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaGrupoCDC(df):
    '''
        Funcao para processar os calculos do Agrupamento Credito Direto ao Consumidor
        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes            
            
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def ProcessaParcelado
                def calculaPrtfcdc
                def initCalculoAgrup
            
            '''
    
    # 1 - Realiza o calculo do agrupamento Conta Corrente
    # 2 - Avalia as condições logicas e atribuiu ao valor verdadeiro o valor da caluna correspondente  
    # 3 - Calcula os tipos [83,115,116,119,129,130,131,142,156,156,159,160,161,163,173] tanto de CALCULADO E APROVADO(apvd e clcd)

    start_time = time.time()
    


    EXPRESSAO_INDICADOR = expr("in_calc_gp_cdc")
    
    # Inicialização 
    calculoGrupoCdcDF = df
    
    # Este trecho é responsavel por calcular o valor grupo 45 calculado e aprovado
    for tipo in ['apvd', 'clcd']:
        calculoGrupoCdcDF = calculoGrupoCdcDF.withColumn(f"maior_slim_cdc_{tipo}", F.when(EXPRESSAO_INDICADOR,lit(0))\
                                                                                    .otherwise(col(f"maior_slim_cdc_{tipo}"))\
                                                        )
                                                         
    for prd in [83,115,116,119,129,130,131,142,156,156,159,160,161,163,173]:
        for tipo in ['apvd', 'clcd']:
            calculoGrupoCdcDF = calculoGrupoCdcDF.withColumn(f"maior_slim_cdc_{tipo}",F.when(EXPRESSAO_INDICADOR & (col(f"vl_slim_{str(prd)}_{tipo}") > col(f"maior_slim_cdc_{tipo}")),col(f"vl_slim_{str(prd)}_{tipo}"))\
                                                                                       .otherwise(col(f"maior_slim_cdc_{tipo}"))\
                                                            )\
    
    for tipo in ['apvd', 'clcd']:
        calculoGrupoCdcDF = calculoGrupoCdcDF.withColumn(f"vl_grupo_45_{tipo}", F.when(EXPRESSAO_INDICADOR,col(f"maior_slim_cdc_{tipo}"))\
                                                                                 .otherwise(col(f"vl_grupo_45_{tipo}"))\
                                                        )
        
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.
    calculoGrupoCdcDF = calculoGrupoCdcDF.withColumn("vl_grupo_45_apvd", F.when(EXPRESSAO_INDICADOR & (col("in_bnf_astl") == 'S') & (col("in_pgt") == 0), lit(0))\
                                                                          .otherwise(col("vl_grupo_45_apvd"))\
                                                    )
    
        
    print(f"Saindo da função calculaGrupoCDC tempo de excução: {time.time() - start_time} ")

    return calculoGrupoCdcDF 
            

### CalculaGrupoFuncionario

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaGrupoFunc(df):
    '''
        Funcao para processar os calculos do Agrupamento Funcionario

        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes            
            
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def ProcessaParcelado
                def calculaPrtfcdc
                def initCalculoAgrup
    '''
    
    # 1 - Realiza o calculo do agrupamento Funcionario 
    # 2 - Avalia as condições logicas e atribuiu ao valor verdadeiro o valor da caluna correspondente  
    # 3 - Denota o valor calculado e aprovado dos produtos 140, 228, 162, 157 e atribui nas variáveis do valor de grupo 64
    # 4 - O código zera os maiores sublimites do grupo 64 aprovado e calculado
    # 5 - O código compara o valor calculado e aprovado do produto 157
    
    start_time = time.time()
    
    # expressao logica
    
    EXPR_CALC_IN_GP_FUNC = expr(" in_calc_gp_func ")
    
    # Inicialização 
        

    for tipo in ['apvd', 'clcd']:
        
        calculoGrupoFuncDF = df.withColumn(f"maior_slim_funci_{tipo}", F.when(EXPR_CALC_IN_GP_FUNC,lit(0))\
                                                                        .otherwise(col(f"maior_slim_funci_{tipo}"))\
                                          )
    for prd in [140, 157, 162, 228]:
        for tipo in ['apvd', 'clcd']:
            
            calculoGrupoFuncDF = calculoGrupoFuncDF.withColumn(f"maior_slim_funci_{tipo}", F.when(EXPR_CALC_IN_GP_FUNC & (col(f"vl_slim_{str(prd)}_{tipo}") > col(f"maior_slim_funci_{tipo}")), col(f"vl_slim_{str(prd)}_{tipo}")\
                                                                                                 )\
                                                                                            .otherwise(col(f"maior_slim_funci_{tipo}"))\
                                                              )\
                                                   .withColumn(f"vl_grupo_64_{tipo}", F.when(EXPR_CALC_IN_GP_FUNC, col(f"maior_slim_funci_{tipo}"))\
                                                                                       .otherwise(col(f"vl_grupo_64_{tipo}"))\
                                                              )
                                       
    
    print(f"Saindo da função calculoGrupoFunc tempo de excução: {time.time() - start_time} ")  
    return calculoGrupoFuncDF

### CalculaGrupoConsignado

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaGrupoECF(df):
    '''
        Funcao para processar os calculos do Agrupamento Consignado

        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes            
            
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def ProcessaParcelado
                def calculaPrtfcdc
                def initCalculoAgrup
    '''
    
    # 1 - Realiza o calculo do agrupamento ECF
    # 2 - Avalia as condições logicas e atribuiu ao valor verdadeiro o valor da caluna correspondente  
    # 3 - Calcular o valor grupo 55 calculado e aprovado
    # 4 - Inicializa as variaveis 'maior_slim_ecf_apvd' e 'maior_slim_ecf_clcd'
    # 5 - Compara o valor calculado e aprovado do produto 155 e 179
    # 6 - Soma nas variaveis 'maior_slim_ecf_apvd' e 'maior_slim_ecf_clcd', o valor do aprovado e calculado do produto 171

    # expressao logicas
    
    start_time = time.time()
       
    # Inicialização
    
    calculoGrupoECFDF = df.withColumn("vl_grupo_55_clcd", f.greatest('vl_slim_137_clcd', 'vl_slim_155_clcd', 'vl_slim_179_clcd') + f.col('vl_slim_171_clcd') )\
                          .withColumn("vl_grupo_55_apvd", f.greatest('vl_slim_137_apvd', 'vl_slim_155_apvd', 'vl_slim_179_apvd') + f.col('vl_slim_171_apvd') )                                                                     
  
    print(f"Saindo da função calculaGrupoECF tempo de excução: {time.time() - start_time} ")
    
    return calculoGrupoECFDF 

### CalculaGrupoRao

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaGrupoRao(df):
    '''
        Funcao para processar os calculos do Agrupamento Rao

        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes            
            
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def ProcessaParcelado
                def calculaPrtfcdc
                def initCalculoAgrup
    '''
    
    # 1 - Realiza o calculo do agrupamento RAO
    # 2 - Avalia as condições logicas e atribuiu ao valor verdadeiro o valor da caluna correspondente 
    # 3 - Atribui novos valores conforme condicao e zera os valores nas colunas especificas tambem conforme condicao

    start_time = time.time()
    
    # expressao logica
    
    EXPR_IN_CALC_GP_RAO = expr(" in_calc_gp_rao ")
    EXPR_VL_PSTC_RAO = expr(" in_bnf_astl = 'S' and in_pgt = 0 ")


    
    
    # Inicialização
        
    calculoGrupoRaoDF = df.withColumn("vl_pstc_rao_apvd", F.when(EXPR_IN_CALC_GP_RAO,col("vl_ttl_pcl_rsct"))\
                                                           .otherwise(col("vl_pstc_rao_apvd"))\
                                     )\
                          .withColumn("vl_pstc_rao_clcd", F.when(EXPR_IN_CALC_GP_RAO,lit(0))\
                                                           .otherwise(col("vl_pstc_rao_clcd"))\
                                     )\
                          .withColumn("vl_slim_111_apvd", F.when(EXPR_IN_CALC_GP_RAO,col("vl_slim_111_utzd"))\
                                                           .otherwise(col("vl_slim_111_apvd"))\
                                     )\
                          .withColumn("vl_slim_111_clcd", F.when(EXPR_IN_CALC_GP_RAO,lit(0))\
                                                           .otherwise(col("vl_slim_111_clcd"))\
                                     )\
                          .withColumn("vl_grupo_46_apvd", F.when(EXPR_IN_CALC_GP_RAO,col("vl_slim_111_utzd"))\
                                                           .otherwise(col("vl_grupo_46_apvd"))\
                                     )\
                          .withColumn("vl_grupo_46_clcd", F.when(EXPR_IN_CALC_GP_RAO,lit(0))\
                                                           .otherwise(col("vl_grupo_46_clcd"))\
                                     )\
    
    calculoGrupoRaoDF = calculoGrupoRaoDF.withColumn("vl_grupo_46_apvd",F.when(EXPR_IN_CALC_GP_RAO & EXPR_VL_PSTC_RAO,lit(0))\
                                                                         .otherwise(col("vl_grupo_46_apvd"))\
                                                    )\
                                         .withColumn("vl_pstc_rao_apvd",F.when(EXPR_IN_CALC_GP_RAO & EXPR_VL_PSTC_RAO,lit(0))\
                                                                         .otherwise(col("vl_pstc_rao_apvd"))\
                                                    )\
                                         .withColumn("vl_slim_111_apvd",F.when(EXPR_IN_CALC_GP_RAO & EXPR_VL_PSTC_RAO,lit(0))\
                                                                         .otherwise(col("vl_slim_111_apvd"))\
                                                    )\
                                                                                    
       
    print(f"Saindo da função calculoGrupoRao tempo de excução: {time.time() - start_time} ")
    return calculoGrupoRaoDF



### CalculaGrupoRedeExterna

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaGrupoRDEX(df):
    '''
        Funcao para processar os calculos do Agrupamento RDEX 

        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes            
            
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def ProcessaParcelado
                def calculaPrtfcdc
                def initCalculoAgrup
    '''
    
    # 1 - Realiza o calculo do agrupamento RDEX
    # 2 - Avalia as condições logicas e atribuiu ao valor verdadeiro o valor da caluna correspondente
    # 3 - Atribui novos valores conforme condicao e zera os valores nas colunas especificas tambem conforme condicao 
    
    start_time = time.time()
    
    # expressoes condicionais utilizadas na conta 
        
    EXPR_GRUPO_60_APVD = expr(" in_bnf_astl = 'S' and in_pgt = 0 ")
    EXPR_GRUPO_65_APVD = expr(" in_bnf_astl = 'S' and in_pgt = 0 ")
    EXPR_IN_CALC_GP_RDEX = expr(" in_calc_gp_rdex ")
    
    # expressoes calculos 
    
    CALC_VL_GP_65_APVD = expr(" vl_slim_180_apvd + vl_slim_181_apvd ")
    CALC_VL_GP_65_CLCD = expr(" vl_slim_180_clcd + vl_slim_181_clcd ")
    
    # Inicialização
    
    for prd in [180, 181]:
        calculoGrupoRDEXDF = df.withColumn(f"vl_slim_{str(prd)}_apvd", F.when(EXPR_IN_CALC_GP_RDEX,col(f"vl_slim_{str(prd)}_utzd"))\
                                                                        .otherwise(col(f"vl_slim_{str(prd)}_apvd"))\
                                          )\
                               .withColumn(f"vl_slim_{str(prd)}_clcd", F.when(EXPR_IN_CALC_GP_RDEX,lit(0))\
                                                                        .otherwise(col(f"vl_slim_{str(prd)}_clcd"))\
                                          )\
                                  
    for tipo in ['apvd', 'clcd']:
        calculoGrupoRDEXDF = calculoGrupoRDEXDF.withColumn(f"vl_grupo_65_{tipo}", F.when(EXPR_IN_CALC_GP_RDEX,col(f"vl_slim_180_{tipo}") + col(f"vl_slim_181_{tipo}"))\
                                                                                   .otherwise(col(f"vl_grupo_65_{tipo}"))\
                                                          )\
            
                
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.
    calculoGrupoRDEXDF = calculoGrupoRDEXDF.withColumn("vl_grupo_65_apvd", F.when(EXPR_IN_CALC_GP_RDEX & EXPR_GRUPO_65_APVD, lit(0))\
                                                                            .otherwise(col("vl_grupo_65_apvd"))\
                                                      )\
                                           .withColumn("vl_slim_180_apvd", F.when(EXPR_IN_CALC_GP_RDEX & EXPR_GRUPO_65_APVD, lit(0))\
                                                                            .otherwise(col("vl_slim_180_apvd"))\
                                                      )\
                                           .withColumn("vl_slim_181_apvd", F.when(EXPR_IN_CALC_GP_RDEX & EXPR_GRUPO_65_APVD, lit(0))\
                                                                            .otherwise(col("vl_slim_181_apvd"))\
                                                      )
    
    print(f"Saindo da função calculoGrupoRDEX tempo de excução: {time.time() - start_time} ")
    
    return calculoGrupoRDEXDF

### CalculaGrupoVeiculo

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaGrupoVCLO(df):
    '''
        Funcao para processar os calculos do Agrupamento VCLO 

        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes            
            
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def ProcessaParcelado
                def calculaPrtfcdc
                def initCalculoAgrup
    '''
    
    # expressoa logica
    
    start_time = time.time()

    EXPR_VL_GP_60_APVD = expr(" vl_slim_21_apvd > vl_slim_117_apvd ")
    EXPR_VL_GP_60_CLCD = expr(" vl_slim_21_clcd > vl_slim_117_clcd ")
    EXPR_IN_BNF_ASTL = expr(" in_bnf_astl = 'S' and in_pgt = 0 ")
    
    
    # Inicialização
    

    calculoGrupoVCLODF = df.withColumn("vl_grupo_60_clcd", F.when(EXPR_VL_GP_60_CLCD, col("vl_slim_21_clcd"))\
                                                               .otherwise(col("vl_slim_117_clcd"))\
                                      )\
    
    calculoGrupoVCLODF = calculoGrupoVCLODF.withColumn("vl_grupo_60_apvd", F.when(EXPR_VL_GP_60_APVD, col("vl_slim_21_apvd"))\
                                                                   .otherwise(col("vl_slim_117_apvd"))\
                                          )\
    
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.
    
    calculoGrupoVCLODF = calculoGrupoVCLODF.withColumn("vl_grupo_60_apvd", F.when(EXPR_IN_BNF_ASTL, lit(0))\
                                                                            .otherwise(col("vl_grupo_60_apvd"))\
                                                      )
    
    print(f"Saindo da função calculoGrupoVCLO tempo de excução: {time.time() - start_time} ")

    return calculoGrupoVCLODF

### CalculaBloco

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaBloco(df):
    '''
        O cálculo de blocos no método calcularBlocos inicia zerando as variáveis valor bloco rotativo aprovado e valor bloco rotativo calculado. No cálculo do bloco é atribuído o maior 
        valor dos grupamentos O valor bloco rotativo calculado recebe o valor entre o valor agrupamento cheque especial calculado e valor agrupamento cartão de crédito calculado.

        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes            
            
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def ProcessaParcelado
    '''
    
    # 1 - Inicialização das variaveis auxiliar
    # 2 - Realiza a soma dos valores dos grupos 55 e 64 e armazena na variavel vl_grupo_5564_clcd e vl_grupo_5564_apvd
    # 3 - Compara se o valor de grupo calculado ou aprovado é maior que o maior valor de grupo calculado ou aprovado
    # 4 - Somariza os valores dos Grupos e armazena na variavel vl_bloco_14_clcd e vl_bloco_14_apvd
    
    start_time = time.time()
    
    # expressao logica
    
    EXPR_IN_CALC_BLOCO = expr(" in_calc_bloco ")
    
    # expressao matematica
    CALC_GP_5564_APVD = expr(" vl_grupo_55_apvd + vl_grupo_64_apvd ")
    CALC_GP_5564_CLCD = expr(" vl_grupo_55_clcd + vl_grupo_64_clcd ")
    CALC_VL_BLOCO_14_APVD = expr(" maior_grupo_apvd + vl_grupo_46_apvd + vl_grupo_65_apvd ")
    CALC_VL_BLOCO_14_CLCD = expr(" maior_grupo_clcd + vl_grupo_46_clcd + vl_grupo_65_clcd ")

    
    
    # Inicialização 
        
        
    calculoBlocoDF = df.withColumn("maior_grupo_apvd",lit(0))\
                       .withColumn("maior_grupo_clcd",lit(0))\
        
        
    calculoBlocoDF = calculoBlocoDF.withColumn("vl_grupo_5564_apvd",F.when(EXPR_IN_CALC_BLOCO,CALC_GP_5564_APVD)\
                                                                     .otherwise(col("vl_grupo_5564_apvd"))\
                                              )\
                                   .withColumn("vl_grupo_5564_clcd",F.when(EXPR_IN_CALC_BLOCO,CALC_GP_5564_CLCD)\
                                                                     .otherwise(col("vl_grupo_5564_clcd"))\
                                              )\

    
    calculoBlocoDF = calculoBlocoDF.withColumn("maior_grupo_apvd", f.greatest('vl_grupo_45_apvd', 'vl_grupo_5564_apvd', 'vl_grupo_60_apvd'))\
                                   .withColumn("maior_grupo_clcd", f.greatest('vl_grupo_45_clcd', 'vl_grupo_5564_clcd', 'vl_grupo_60_clcd'))
    
    
                                   
    calculoBlocoDF = calculoBlocoDF.withColumn("vl_bloco_14_apvd",F.when(EXPR_IN_CALC_BLOCO,CALC_VL_BLOCO_14_APVD)\
                                                                   .otherwise(col("vl_bloco_14_apvd"))\
                                              )\
                                   .withColumn("vl_bloco_14_clcd",F.when(EXPR_IN_CALC_BLOCO,CALC_VL_BLOCO_14_CLCD)\
                                                                   .otherwise(col("vl_bloco_14_clcd"))\
                                              )
    
    print(f"Saindo da função calculoBloco tempo de excução: {time.time() - start_time} ")

    return calculoBlocoDF
            

### CalculaPrestacaoCreditoDiretoConsumidor

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def calculaPrtfcdc(df):
    '''
            Valor da parcela do cliente é recalculado através da multiplicação entre o valor da prestação cdc calculada, uma porcentagem que pode ser 80% ou 20% dependendo do 
            perfil do cliente e o valor fator de ajuste prestação parcelado
    
            Arguments:
                df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes
                
            Keyword Arguments:
                prdt = Nome do Produto
            Returns:
                {DataFrame}
            See:
                funcao acionada: 
                    def slimApvdCdc
                    def calculaGrupoVCLO
                    def calculaGrupoRDEX
                    def calculaGrupoRao
                    def calculaGrupoECF
                    def calculaGrupoFunc
                    def calculaGrupoCDC
                chamada pela funcao:
                    def verifGTFraude
        '''
    
    # 1 - Calcula o valor da Prestacação CDC aprovado
    # 2 - Chamada para a aplicação de regras do Sublimite Aprovado CDC
    # 3 - Chamada para o calculo dos Grupos
    # 4 - Chamada para o calculo do Bloco
    
    
    # expressoes logicas
    
    EXPR_IN_CALC_PRT_CDC = expr(" in_calc_Prt_cdc ")
    EXPR_IN_CALC_SLIM_APVD = expr(" in_SlimApvdCdc ")
    EXPR_CD_TP_CPVT_REN = expr(" cd_tip_cpvt_ren = 'C' ")
    
    # expressoes matematicas 
    
    CALC_PRTCDC = expr(" cast(vl_pstc_cdc_clcd as int) * (waux_pc_cdc/100) * vl_ftr_ajst_pstc_pcld ")
       
    start_time = time.time()    
        
        
    # Inicialização
            
    calculoPrtcdcDF = df.withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_CALC_PRT_CDC,CALC_PRTCDC).otherwise(col("vl_pstc_cdc_apvd")))\
                        .withColumn("in_SlimApvdCdc",   F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))\
                        .withColumn("in_calc_gp_vclo",  F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))\
                        .withColumn("in_calc_gp_rdex",  F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))\
                        .withColumn("in_calc_gp_rao",   F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))\
                        .withColumn("in_calc_gp_ecf",   F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))\
                        .withColumn("in_calc_gp_func",  F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))\
                        .withColumn("in_calc_gp_cdc",   F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))\
                        .withColumn("in_calc_bloco",    F.when(EXPR_IN_CALC_PRT_CDC, lit(True)).otherwise(lit(False)))
  
    
    calculoPrtcdcDF.checkpoint()
    # Chamada para a aplicação de regras do Sublimite Aprovado CDC
    
    print('Entrando na slimApvdCdc ')
    
    calculoPrtcdcDF = slimApvdCdc(calculoPrtcdcDF).checkpoint()
      
    spark.catalog.clearCache()
    
    calculoPrtcdcDF = calculoPrtcdcDF.withColumn("vl_slim_21_apvd",  F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_21_apvd"))\
                                                )\
                                     .withColumn("vl_slim_111_apvd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_111_apvd"))\
                                                )\
                                     .withColumn("vl_slim_117_apvd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_117_apvd"))\
                                                )\
                                     .withColumn("vl_slim_179_apvd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_179_apvd"))\
                                                )\
                                     .withColumn("vl_slim_180_apvd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_180_apvd"))\
                                                )\
                                     .withColumn("vl_slim_181_apvd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_181_apvd"))\
                                                )\
                                     .withColumn("vl_slim_21_clcd",  F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_21_clcd"))\
                                                )\
                                     .withColumn("vl_slim_111_clcd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_111_clcd"))\
                                                )\
                                     .withColumn("vl_slim_117_clcd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_117_clcd"))\
                                                )\
                                     .withColumn("vl_slim_179_clcd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_179_clcd"))\
                                                )\
                                     .withColumn("vl_slim_180_clcd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_180_clcd"))\
                                                )\
                                     .withColumn("vl_slim_181_clcd", F.when(EXPR_IN_CALC_PRT_CDC & EXPR_CD_TP_CPVT_REN,lit(0))\
                                                                      .otherwise(col("vl_slim_181_clcd"))\
                                                )
               
    #Chamada para o calculo dos Grupos
    
    
    print('Entrando na calculaGrupoRDEX')
    calculoPrtcdcDF = calculaGrupoRDEX(calculoPrtcdcDF).checkpoint()
    
    spark.catalog.clearCache()
    
    print('Entrando na calculaGrupoRao')
    calculoPrtcdcDF = calculaGrupoRao(calculoPrtcdcDF).cache()
    
    print('Entrando na calculaGrupoECF')
    calculoPrtcdcDF = calculaGrupoECF(calculoPrtcdcDF).cache()
    
    print('Entrando na calculaGrupoFunc')
    calculoPrtcdcDF = calculaGrupoFunc(calculoPrtcdcDF).checkpoint()
    
    print('Entrando na calculaGrupoVCLO')
    calculoPrtcdcDF = calculaGrupoVCLO(calculoPrtcdcDF).cache()
    
    spark.catalog.clearCache()
    
    print('Entrando na calculaGrupoCDC')
    calculoPrtcdcDF = calculaGrupoCDC(calculoPrtcdcDF).cache()
        
    # Chamada para o calculo do Bloco
    
    print('Entrando na calculaBloco')
    calculoPrtcdcDF = calculaBloco(calculoPrtcdcDF).cache()
        
    print(f"Saindo da função calculoPrtcdcDF tempo de excução: {time.time() - start_time} ")
    
    return calculoPrtcdcDF
         

### CalculaPrestacoes

In [None]:

%%spark

import pyspark.sql.functions as F
import time

def calculaPrestacao(df, ativo):
    '''
        Funcao para processar os calculos das prestações
        
        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            ativo = tipo de prestacao a ser calculado no processo 
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def aplicaRegraPstcCorren
                def aplicaRegraPstcProven
                def aplicaRegraPstcInss
                def aplicaRegraPstcParc
                def aplicarRegraPstcVclo
            funcoes acionadas: 
                None
    '''
    
    # 1 - Metodo para processar os calculos das prestações conforme o tipo de prestacao
    start_time = time.time()

    
    
    # expressoes logicas utulizadas nas contas
    
    EXPR_IN_CALC_PSTC = expr(" in_calc_pstc ")
    EXPR_ATIVO_CORREN = expr(f" '{ativo}' = 'Corren' ")
    EXPR_ATIVO_INSS = expr(f" '{ativo}' = 'Inss' ")
    EXPR_ATIVO_PARC = expr(f" '{ativo}' = 'Parc' ")
    EXPR_ATIVO_PROVEN = expr(f" '{ativo}' = 'Proven' ")
    EXPR_ATIVO_VCLO = expr(f" '{ativo}' = 'Vclo' ")
    
    # expressoes matematicas utulizadas nas contas conforme condicao
    
    CALC_ATIVO_CORREN = expr(" cast(vl_ftr_demp  * cd_ftr_rlc * vl_cfct_045 * vl_smat_ren_pres_trans as int) ")
    CALC_ATIVO_INSS =   expr(" cast(vl_cfct_045  * vl_smat_ren_pres_trans as int) ")
    CALC_ATIVO_PARC =   expr(" cast(vl_ftr_demp  * vl_cfct_045 * vl_ren_brto * waux_coef_ren as int) ")
    CALC_ATIVO_VCLO =   expr(" cast(vl_ftr_demp  * cd_ftr_rlc * vl_cfct_060 * vl_rend_lqdo_ttl as int) ")
    CALC_ATIVO_PROVEN = expr(" cast(vl_ftr_demp  * cd_ftr_rlc * vl_cfct_x_prtt * vl_smat_ren_clcd as int) ")
    
    
    
    
    # Inicialização
    prestacaoCalculadaDF = df.withColumn("vl_pstc_cdc_clcd",F.when(EXPR_IN_CALC_PSTC & EXPR_ATIVO_CORREN,CALC_ATIVO_CORREN)\
                                                             .when(EXPR_IN_CALC_PSTC & EXPR_ATIVO_INSS,CALC_ATIVO_INSS)\
                                                             .when(EXPR_IN_CALC_PSTC & EXPR_ATIVO_PARC,CALC_ATIVO_PARC)
                                                             .otherwise(col("vl_pstc_cdc_clcd")))\
                             .withColumn("vl_pstc_cdc_prtt", F.when(EXPR_IN_CALC_PSTC & EXPR_ATIVO_PROVEN,CALC_ATIVO_PROVEN)\
                                                              .otherwise(col("vl_pstc_cdc_prtt"))\
                                        )\
                             .withColumn("vl_pstc_vclo_clcd",F.when(EXPR_IN_CALC_PSTC & EXPR_ATIVO_VCLO,CALC_ATIVO_VCLO)\
                                                              .otherwise(col("vl_pstc_vclo_clcd"))\
                                        )
    
    
    print(f"Saindo da função prestacaoCalculada tempo de excução: {time.time() - start_time} ")

    return prestacaoCalculadaDF 


### CalculaSublimite

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def calculaSlim(df, regra):
    '''
        Funcao para calcular o valor do Sub Limite. 
             
        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            regra = tipo de produto a ser usado como parametro para realizar os calculos 
            
            1: ECF
            2: Veiculo
            None: Demais produtos
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def processaCalculo
                def slim_Vclo
                def slim_Funci
                def slim_ecf
                def slimApvdCdc
                def slimClcdCdc
            funcoes acionadas: 
                None
    '''
    
    # 1 - Calculo sublimite ECF
    # 2 - Calculo sublimite de Veiculos
    # 3 - Calculos sublimite - demais produtos
    # 4 - O código compara se a variável auxiliar de cordialidade de sublimite é igual a 161 para calcular o valor da variável auxiliar de valor de sublimite
    # 5 - Se a variável auxiliar do valor de sublimite for maior que a variável auxiliar de sublimite do valor
    # 6 - limite referencia e se o caso for de pessoa física, a variável auxiliar do valor de sublimite recebe
    # 7 - o valor da variável do valor de limite referencia
        
    # constantes
    
    
    start_time = time.time()

    tp_regra = regra
        
    # expressoes condicionais utulizadas nas contas conforme condicao 
        
    EXPR_AUX_VL_SLIM = expr(" aux_vl_slim > aux_vl_lim_ref and cd_fuc = 1")
    EXPR_WIX_CRIT = expr(" aux_mtp_slim > gda_maior_mtp and aux_vl_slim > 0")
    EXPR_AUX_CD_SLIM = expr(" aux_cd_slim = 161 ")
    EXPR_IN_CALC_SLIM_REGRA_1 = expr("in_CalculaSlim_regra1")
    EXPR_IN_CALC_SLIM_REGRA_2 = expr("in_CalculaSlim_regra2")
    EXPR_IN_CALC_SLIM_REGRA_0 = expr("in_CalculaSlim_regra_outros")
    
    # expressoes matematicas utilizadas nas contas conforme condicao
    
    CALC_REGRA_1 = expr(" aux_pstc_slim * aux_mtp_slim ")
    CALC_REGRA_2_APVD_21 = expr(" cast(vl_pstc_vclo_apvd as int) * vl_mtp_clc_slim_21 ")
    CALC_REGRA_2_CLCD_21 = expr(" cast(vl_pstc_vclo_clcd as int) * vl_mtp_clc_slim_21 ")
    CALC_REGRA_2_APVD_117 = expr(" cast(vl_pstc_vclo_apvd as int) * vl_mtp_clc_slim_117 ")
    CALC_REGRA_2_CLCD_117 = expr(" cast(vl_pstc_vclo_clcd as int) * vl_mtp_clc_slim_117 ")
    CALC_REGRA_NONE_161 = expr("aux_pstc_slim * aux_mtp_slim * aux_red_slim * aux_far_slim ")
    CALC_REGRA_NONE_OUTROS = expr(" aux_pstc_slim * aux_mtp_slim * aux_red_slim * aux_far_slim * aux_redt_vcli  ")
         
        
        
    # Inicialização
    
    if tp_regra == 1:
        
        calculaSlimDF = df.withColumn("aux_vl_slim",F.when(EXPR_IN_CALC_SLIM_REGRA_1,CALC_REGRA_1)\
                                                     .otherwise(lit(0))\
                                     )
                         
        
        calculaSlimDF = calculaSlimDF.withColumn("aux_vl_slim",F.when(EXPR_IN_CALC_SLIM_REGRA_1 & EXPR_AUX_VL_SLIM,col("aux_vl_lim_ref"))\
                                                                .otherwise(col("aux_vl_slim"))\
                                                )

                                        
            
    elif tp_regra == 2:
        calculaSlimDF = df.withColumn("vl_slim_21_apvd",F.when(EXPR_IN_CALC_SLIM_REGRA_2,CALC_REGRA_2_APVD_21)\
                                                         .otherwise(col("vl_slim_21_apvd"))\
                                     )\
                          .withColumn("vl_slim_21_clcd",F.when(EXPR_IN_CALC_SLIM_REGRA_2,CALC_REGRA_2_CLCD_21)\
                                                         .otherwise(col("vl_slim_21_clcd"))\
                                     )\
                          .withColumn("vl_slim_117_apvd",F.when(EXPR_IN_CALC_SLIM_REGRA_2,CALC_REGRA_2_APVD_117)\
                                                          .otherwise(col("vl_slim_117_apvd"))\
                                     )\
                          .withColumn("vl_slim_117_clcd",F.when(EXPR_IN_CALC_SLIM_REGRA_2,CALC_REGRA_2_CLCD_117)\
                                                          .otherwise(col("vl_slim_117_clcd"))\
                                     )

        
       
    elif tp_regra == None or tp_regra == 0:

        calculaSlimDF = df.withColumn("aux_red_slim",lit(1))
                

        calculaSlimDF = calculaSlimDF.withColumn("aux_vl_slim",F.when(EXPR_AUX_CD_SLIM & EXPR_IN_CALC_SLIM_REGRA_0,CALC_REGRA_NONE_161)\
                                                                .otherwise(lit(0))\
                                                )\
                                     .withColumn("aux_vl_slim",F.when(~EXPR_AUX_CD_SLIM & EXPR_IN_CALC_SLIM_REGRA_0,CALC_REGRA_NONE_OUTROS)\
                                                                .otherwise(col("aux_vl_slim"))\
                                                )\
                                     .withColumn("aux_vl_slim",F.when(~EXPR_AUX_CD_SLIM & EXPR_AUX_VL_SLIM & EXPR_IN_CALC_SLIM_REGRA_0,col("aux_vl_lim_ref"))\
                                                                .otherwise(col("aux_vl_slim"))\
                                                )\
                                     .withColumn("wix_crit",F.when(~EXPR_AUX_CD_SLIM & EXPR_WIX_CRIT & EXPR_IN_CALC_SLIM_REGRA_0, lit(0))\
                                                             .otherwise(col("wix_crit"))\
                                                )
                
        calculaSlimDF = calculaSlimDF.withColumn('aux_cd_slim_aux', col('aux_cd_slim').cast('bigint'))
        calculaSlimDF = calculaSlimDF.withColumn('cd_slim_rnvc_aux', col('cd_slim_rnvc_aux').cast('array<int>'))

        
        calculaSlimDF = calculaSlimDF.withColumn('gda_maior_mtp', F.when(~EXPR_AUX_CD_SLIM & EXPR_WIX_CRIT & EXPR_IN_CALC_SLIM_REGRA_0 & (F.array_contains(col('cd_slim_rnvc_aux'), col('aux_cd_slim_aux'))),\
                                                                                                              col('aux_mtp_slim'))\
                                                                   .otherwise(col('gda_maior_mtp')))
            
    
    print(f"Saindo da função calculaSlim tempo de excução: {time.time() - start_time} ")
        
    return calculaSlimDF

### CalculaValorTeto

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def calculaVlTeto(df):
    '''
    Funcao para processar o calculo do Valor Teto

    Arguments:
        df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado

    Keyword Arguments:
        None
    Returns:
        dataframe
    See:
        chamada pela funcao:
            def apuraTeto 
        funcoes acionadas: 
            None
    '''
    
    # 1 - Metodo para processar o calculo do Valor Teto
    
    start_time = time.time()


    # constantes 
    PC1_51 = 0.10 
    PC2_51 = 0.15
    PC3_51 = 0.20
    
    PC1_52 = 0.07
    PC2_52 = 0.10
    PC3_52 = 0.15
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_LOGICA_INCALCULO = expr(" in_calculoVlTeto ")
    EXPR_LOGICA_51_PC1 = expr(" wgda_vlr_rndm_lqdo_total <= wgda_vl_pc40_rnd_parm_teto and cd_mtdl_anl_crd = 51")
    EXPR_LOGICA_51_PC2 = expr(" wgda_vlr_rndm_lqdo_total <= wgda_vl_pc80_rnd_parm_teto and cd_mtdl_anl_crd = 51")
    EXPR_LOGICA_51_PC3 = expr(" wgda_vlr_rndm_lqdo_total > wgda_vl_pc80_rnd_parm_teto and cd_mtdl_anl_crd = 51")
    EXPR_LOGICA_52_PC1 = expr(" wgda_vlr_rndm_lqdo_total <= wgda_vl_pc40_rnd_parm_teto and cd_mtdl_anl_crd in (52,53)")
    EXPR_LOGICA_52_PC2 = expr(" wgda_vlr_rndm_lqdo_total <= wgda_vl_pc80_rnd_parm_teto and cd_mtdl_anl_crd in (52,53)")
    EXPR_LOGICA_52_PC3 = expr(" wgda_vlr_rndm_lqdo_total > wgda_vl_pc80_rnd_parm_teto and cd_mtdl_anl_crd in (52,53)")
    
    # expressoes matematicas utulizadas nas contas
    CALC_51_PC1 =      expr(f" wgda_vlr_rndm_lqdo_total * {PC1_51}")
    CALC_51_PC2 =      expr(f" wgda_vlr_rndm_lqdo_total * {PC2_51}")
    CALC_51_PC3 =      expr(f" wgda_vlr_rndm_lqdo_total * {PC3_51}")
    CALC_52_PC1 =      expr(f" wgda_vlr_rndm_lqdo_total * {PC1_52}")
    CALC_52_PC2 =      expr(f" wgda_vlr_rndm_lqdo_total * {PC2_52}")
    CALC_52_PC3 =      expr(f" wgda_vlr_rndm_lqdo_total * {PC3_52}")
    
    
    
    
    # Inicialização 

    valorTetoCalculadoDF = df.withColumn("wgda_teto_pstc_cdc",F.when(EXPR_LOGICA_INCALCULO & EXPR_LOGICA_51_PC1,CALC_51_PC1)\
                                                               .when(EXPR_LOGICA_INCALCULO & EXPR_LOGICA_51_PC2,CALC_51_PC2)\
                                                               .when(EXPR_LOGICA_INCALCULO & EXPR_LOGICA_51_PC3,CALC_51_PC3)\
                                                               .when(EXPR_LOGICA_INCALCULO & EXPR_LOGICA_52_PC1,CALC_52_PC1)\
                                                               .when(EXPR_LOGICA_INCALCULO & EXPR_LOGICA_52_PC2,CALC_52_PC2)\
                                                               .when(EXPR_LOGICA_INCALCULO & EXPR_LOGICA_52_PC3,CALC_52_PC3)\
                                                               .otherwise(lit(0))\
                                        )
    
    
    print(f"Saindo da função calculaVlTeto tempo de excução: {time.time() - start_time} ")

    return valorTetoCalculadoDF
        

## Regras

### ApuraTeto

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def apuraTeto(df):
    '''
        O código tem por objetivo realizar o cálculo do teto da prestação (wgda_teto_pstc_cdc) que posteriormente é utilizada como base de comparação para o processo de cálculo
        do valor da prestação aprovada e calculada

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def recompRendaLqda 
            funcoes acionadas: 
                def calculaVlTeto
    '''
    
    # 1 - O código calcula as variáveis auxiliares de teto de prestação de crédito direto e teto de prestação de crédito direto do banco postal de acordo 
    # 2 - Com o valor código de segmento de análise e indicador de banco postal
    # 3 - O valor de código de natureza de ocupação, indicador de banco postal e teto de prestação de crédito direto do banco postal 
    # 4 - Definem as condições para calcular o valor do teto de prestação de crédito
        
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()
    
    EXPR_IND_APURA_TETO = expr(" in_apuraTeto ")
    EXPR_WGDA_TETO = expr("cd_sgm_anl = 39 or in_bco_pstl = 'S' and vl_pstc_cdc_clcd * 0.5 > vl_pstc_ant_045 * 0.5")
    EXPR_WGDA_TETO_PSTC_CDC = expr("cd_sgm_anl = 39 or in_bco_pstl = 'S' and wgda_teto_pstc_cdc_bco_pstl < wgda_teto_pstc_cdc ")
    EXPR_WGDA_TETO_PSTC_CDC_1 = expr("wgda_teto_pstc_cdc = 0")
    EXPR_CALCULO_VLTETO = expr("cd_ntz_ocp in (4, 5, 6, 8) or (cd_ntz_ocp = 13 and cd_ocp_ppl in (271, 272, 273, 310, 330))")
    
    # expressoes matematicas utulizadas nas contas
    
    CALC_WGDA_TETO =  expr(" vl_pstc_cdc_clcd * 0.5 * vl_ftr_ajst_pstc_pcld ")
    CALC_WGDA_TETO_1 =  expr(" vl_pstc_ant_045 * 0.5 * vl_ftr_ajst_pstc_pcld ")  
    

    # Inicialização
    

                                 
    regraApuraTetoDF = df.withColumn("wgda_teto_pstc_cdc_bco_pstl",F.when(EXPR_IND_APURA_TETO & EXPR_WGDA_TETO, CALC_WGDA_TETO)\
                                                                    .otherwise(CALC_WGDA_TETO_1)\
                                    )\
                         .withColumn("in_calculoVlTeto",F.when(EXPR_IND_APURA_TETO & EXPR_CALCULO_VLTETO,lit(True))\
                                                         .otherwise(lit(False))\
                                    )
                     
    print("Entrando na função calculaVlTeto ")                           
                                
    regraApuraTetoDF = calculaVlTeto(regraApuraTetoDF)
    
    
    
    regraApuraTetoDF = regraApuraTetoDF.withColumn("wgda_teto_pstc_cdc", F.when(EXPR_IND_APURA_TETO & EXPR_WGDA_TETO_PSTC_CDC, col("wgda_teto_pstc_cdc_bco_pstl"))\
                                                                          .when(EXPR_IND_APURA_TETO & EXPR_WGDA_TETO_PSTC_CDC_1, col("wgda_teto_pstc_cdc_bco_pstl"))\
                                                                          .otherwise(lit(0))\
                                                  )
    
    print(f"Saindo da função regraApuraTeto tempo de excução: {time.time() - start_time} ")

    return regraApuraTetoDF

### RecomposicaoRendaLiquida

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def RecompRendaLqda(df):
    '''
        Procede o cálculo da variável auxiliar de valor de rendimento líquido total e valor utilizado consignação Banco do Brasil. Por fim, compara o valor da variável auxiliar 
        e chama a apuraTeto.py para calcular as variáveis auxiliares de teto de prestação de crédito

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def calcPstcCorren
                def calcPstcInss
                def calcPstcParc
            funcoes acionadas: 
                def apurateto
    '''
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()

    EXPR_IND_APURATETO = expr(" wgda_vlr_rndm_lqdo_total <= vl_ren_prtz_teto  ")
    EXPR_IN_FUN = expr("in_fun = 1")
    EXPR_IN_PGT = expr("in_pgt = 1")
    EXPR_IN_INSS = expr("in_inss = 1")
    EXPR_IN_RENDA = expr(" in_recomp_renda ")
    
    # expressoes matematicas utulizadas nas contas
    
    CALC_WGDA_VLR_RNDM =  expr(" vl_rend_lqdo_ttl + vl_utzd_consig_sfn + vl_utzd_consig_bb  ")
    
    
    
    # Inicialização
    
    regraRecompRendaLqdaDF = df.withColumn('wgda_vlr_rndm_lqdo_total',F.when(EXPR_IN_RENDA,col('vl_rend_lqdo_ttl'))\
                                                                       .otherwise(col('wgda_vlr_rndm_lqdo_total'))\
                                          )

                                       
                                       
    regraRecompRendaLqdaDF =  regraRecompRendaLqdaDF.withColumn("vl_utzd_consig_bb",       F.when(EXPR_IN_RENDA & EXPR_IN_FUN,lit(0))\
                                                                                            .otherwise(col("vl_utzd_consig_bb"))\
                                                                )\
                                                    .withColumn("wgda_vlr_rndm_lqdo_total",F.when(EXPR_IN_RENDA & EXPR_IN_PGT | EXPR_IN_INSS,CALC_WGDA_VLR_RNDM)\
                                                                                            .otherwise(col("wgda_vlr_rndm_lqdo_total"))\
                                                                )\
                                                    .withColumn("in_apuraTeto",            F.when(EXPR_IN_RENDA & EXPR_IND_APURATETO,lit(True))\
                                                                                            .otherwise(lit(False))\
                                                                )      
    print("Entrando na funcao apuraTeto")                          
    
    regraRecompRendaLqdaDF = apuraTeto(regraRecompRendaLqdaDF)
    
    
    print(f"Saindo da função regraRecompRendaLqda tempo de excução: {time.time() - start_time} ")

    
    return regraRecompRendaLqdaDF

### RegraPrestacaoFuncionario

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def RegraPstcEcfFunci(df):
    '''
        Atribui o valor prestação cadastro emitentes cheque sem fundo aprovado, calculado e demais, valor prestação funcionário bb aprovado e calculado
        redutor convênio prestação cadastro emitentes cheque sem fundo.

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ProcessaParcelado
            funcoes acionadas:
                def processar
                
    '''
    
    # expressoes logicas
    
    start_time = time.time()
    EXPR_IN_REGRAPSTC = expr("in_RegraPstcEcfFunci")
    
    # Chamada para o ANCSF159
    
    print("Entrando na função processar")
    
    RegraPstcEcfFunciDF = processar(df)
    
    RegraPstcEcfFunciDF = RegraPstcEcfFunciDF.withColumn("vl_pstc_ecf_clcd", col("vl_pstc_ttl_ecf_clcd"))\
                                             .withColumn("vl_pstc_ecf_apvd", col("vl_pstc_ttl_ecf_apvd"))\
                                             .withColumn("vl_pstc_fun_bb_clcd", col("vl_pstc_funci_clcd"))\
                                             .withColumn("vl_pstc_fun_bb_apvd", col("vl_pstc_funci_apvd"))\
                                             .withColumn("vl_pstc_ecf_demais", col("vl_pstc_demais"))\
                                             .withColumn("vl_mtp_clc_slim_137", col("ic_mtp_cvn_ecf"))\
                                             .withColumn("vl_mtp_clc_slim_171", col("ic_mtp_cvn_crt"))\
                                             .withColumn("vl_renda_pres", col("vl_renda_pres"))\
                                             .withColumn("vl_renda_funci", col("vl_renda_funci"))\
                                             .withColumn("vl_renda_liq_ttl", col("vl_renda_liq_ttl"))\
                                             .withColumn("vl_ren_utzd_pstc_ecf", col("ren_utzd_pstc_ecf"))\
                                             .withColumn("ic_ajst_cvn_137", col("ic_ajst_cvn_ecf"))\
                                             .withColumn("ic_ajst_cvn_171", col("ic_ajst_cvn_crt"))\
                                             .withColumn("vl_pstc_137", col("vl_pstc_ttl_ecf_clcd"))\
                                             .withColumn("vl_renda_funci", col("vl_renda_funci"))\
                                             .withColumn("vl_max_cvn_083", col("vl_max_cvn_sal"))\
                                             .withColumn("vl_max_cvn_137", col("vl_max_cvn_ecf"))\
                                             .withColumn("vl_max_cvn_171", col("vl_max_cvn_crt"))\

    
    print(f"Sainda da função RegraPstcEcfFunci tempo de execução {time.time() - start_time} ")
    
    return RegraPstcEcfFunciDF

### TrataSinalAlerta

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def trata_Snl_Alerta(df):
    '''
        Proceder o tratamento do sinal de alerta que determina o valor de prestação de crédito direto ao consumidor aprovado e calculado

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def aplicaRegraPstcCorren
                def aplicaRegraPstcInss
                def aplicaRegraPstcParc
                def aplicarRegraPstcVclo
            funcoes acionadas:
                None
                
    '''

    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()

    
    EXPR_IN_TRATA_SN_ALERTA = expr(" in_trata_sn_alerta ")
    EXPR_VL_PSTC_ANT_045 = expr(" cd_snl_avs_pcld = 0 and in_lim_ant = 'N'  ")
    EXPR_VL_CD_SNL_AVS = expr(" cd_snl_avs_pcld = 0 ")
    EXPR_VL_PSTC_CDC_APVD_SN1 = expr(" cd_snl_avs_pcld = 0 and vl_pstc_ant_045 > vl_pstc_cdc_clcd ")
    EXPR_VL_PSTC_CDC_APVD_SN1_ELSE = expr(" vl_pstc_ant_045 > vl_pstc_cdc_clcd ")
    EXPR_VL_PSTC_CDC_APVD_SN1_FAR = expr(" ws_vl_pstc_far_apvd_045 > ws_vl_pstc_far_clcd_045 ")
    EXPR_VL_CD_SNL_AVS_SN2 = expr(" cd_snl_avs_pcld = 1 ")
    EXPR_VL_PSTC_CDC_APVD_SN2 = expr(" cd_snl_avs_pcld = 1 and in_lim_ant = 'N' ")
    EXPR_VL_PSTC_CDC_APVD_SN2_ELSE = expr(" cd_snl_avs_pcld = 1 and in_lim_ant <> 'N' ")
    EXPR_VL_PSTC_CDC_APVD_SN2_1 = expr(" cd_snl_avs_pcld = 1 and ws_vl_pstc_far_apvd_045 > ws_vl_pstc_far_clcd_045 and ws_vl_pstc_far_apvd_045 < vl_pstc_cdc_apvd ")
    EXPR_VL_PSTC_CDC_APVD_SN2_2 = expr(" cd_snl_avs_pcld = 1 and ws_vl_pstc_far_apvd_045 <= ws_vl_pstc_far_clcd_045 and ws_vl_pstc_far_clcd_045 < vl_pstc_cdc_apvd ")
    EXPR_VL_PSTC_CDC_APVD_SN3 = expr(" cd_snl_avs_pcld in (2,5,6) and in_lim_ant = 'N' ")
    EXPR_VL_PSTC_CDC_APVD_SN3_ELSE = expr(" cd_snl_avs_pcld in (2,5,6) and in_lim_ant <> 'N' and vl_pstc_ant_045 < vl_pstc_cdc_clcd ")
    EXPR_VL_PSTC_CDC_APVD_SN3_ELSE_1 = expr(" cd_snl_avs_pcld in (2,5,6) and in_lim_ant <> 'N' and vl_pstc_ant_045 >= vl_pstc_cdc_clcd ")
    EXPR_VL_PSTC_CDC_APVD_SN3_2 = expr(" cd_snl_avs_pcld in (2,5,6) and ws_vl_pstc_far_apvd_045 > ws_vl_pstc_far_clcd_045 and ws_vl_pstc_far_apvd_045 < vl_pstc_cdc_apvd ")
    EXPR_VL_PSTC_CDC_APVD_SN3_3_ELSE = expr(" cd_snl_avs_pcld in (2,5,6) and ws_vl_pstc_far_apvd_045 <= ws_vl_pstc_far_clcd_045 and ws_vl_pstc_far_clcd_045 < vl_pstc_cdc_apvd")
    EXPR_CALC_FAR_SN3 = expr("cd_snl_avs_pcld in (2,5,6)")

    # expressoes matematicas utulizadas nas contas conforme condicao 
        
    CALC_CLCD_045 = expr(" vl_pstc_cdc_clcd * vl_ftr_ajst_pstc_pcld ")
    CALC_APVD_045 = expr(" vl_pstc_cdc_apvd * vl_ftr_ajst_pstc_pcld ")
    CALC_VL_PSTC_CDC_APVD_SN_4 =  expr(" vl_pstc_cdc_clcd * vl_pc_red_pstc_clcd *  vl_ftr_ajst_pstc_pcld ")

    
    # Inicialização
    
    regraTrataSnAlertaDF = df.withColumn("vl_pstc_ant_045",F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_ANT_045,lit(0))\
                                                            .otherwise(col("vl_pstc_ant_045"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd",F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN1, col("vl_pstc_ant_045"))\
                                                             .when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_CD_SNL_AVS & ~EXPR_VL_PSTC_CDC_APVD_SN1_ELSE, col("vl_pstc_cdc_clcd"))\
                                                             .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )
        
    regraTrataSnAlertaDF = regraTrataSnAlertaDF.withColumn("ws_vl_pstc_far_clcd_045"  ,F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_CD_SNL_AVS, CALC_CLCD_045)\
                                                                                        .otherwise(col("ws_vl_pstc_far_clcd_045"))\
                                                          )\
                                               .withColumn("ws_vl_pstc_far_apvd_045",F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_CD_SNL_AVS, CALC_APVD_045)\
                                                                                      .otherwise(col("ws_vl_pstc_far_apvd_045"))\
                                                          )\
                                               .withColumn("vl_pstc_cdc_apvd",F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_CD_SNL_AVS & EXPR_VL_PSTC_CDC_APVD_SN1_FAR, col("ws_vl_pstc_far_apvd_045"))\
                                                                               .when(EXPR_IN_TRATA_SN_ALERTA &  EXPR_VL_CD_SNL_AVS & ~EXPR_VL_PSTC_CDC_APVD_SN1_FAR, col("ws_vl_pstc_far_clcd_045"))\
                                                                               .otherwise(col("vl_pstc_cdc_apvd"))\
                                                          )
    
    regraTrataSnAlertaDF.cache()
    
    # Regra 2 - Sinal de Alerta igual a 0
    
    regraTrataSnAlertaDF =  regraTrataSnAlertaDF.withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN2 , col("vl_pstc_cdc_clcd"))\
                                                                                 .when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN2_ELSE, col("vl_pstc_ant_045"))\
                                                                                 .otherwise(col("vl_pstc_cdc_apvd"))\
                                                           )
    
    regraTrataSnAlertaDF = regraTrataSnAlertaDF.withColumn("ws_vl_pstc_far_clcd_045"  ,F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_CD_SNL_AVS_SN2,CALC_CLCD_045)\
                                                                                        .otherwise(col("ws_vl_pstc_far_clcd_045"))\
                                                          )\
                                               .withColumn("ws_vl_pstc_far_apvd_045"  ,F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_CD_SNL_AVS_SN2,CALC_APVD_045)\
                                                                                        .otherwise(col("ws_vl_pstc_far_apvd_045"))\
                                                          )\
    
    
    
    regraTrataSnAlertaDF = regraTrataSnAlertaDF.withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN2_1, col("ws_vl_pstc_far_apvd_045"))\
                                                                                .when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN2_2, col("ws_vl_pstc_far_clcd_045"))\
                                                                                .otherwise(col("vl_pstc_cdc_apvd"))\
                                                          )
    
    # Regra 3 - Sinal de Alerta igual a 2, 5 e 6
    
    regraTrataSnAlertaDF.cache()
        
    regraTrataSnAlertaDF =  regraTrataSnAlertaDF.withColumn("vl_pstc_cdc_apvd",F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN3, col("vl_pstc_cdc_clcd"))\
                                                                                .when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN3_ELSE, col("vl_pstc_ant_045"))\
                                                                                .when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN3_ELSE_1, col("vl_pstc_cdc_clcd"))\
                                                                                .otherwise(col("vl_pstc_cdc_apvd"))\
                                                           )
    
    regraTrataSnAlertaDF =  regraTrataSnAlertaDF.withColumn("ws_vl_pstc_far_clcd_045"  ,F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_CALC_FAR_SN3,CALC_CLCD_045)\
                                                                                         .otherwise(col("ws_vl_pstc_far_clcd_045"))\
                                                           )\
                                                .withColumn("ws_vl_pstc_far_apvd_045"  ,F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_CALC_FAR_SN3,CALC_APVD_045)\
                                                                                         .otherwise(col("ws_vl_pstc_far_apvd_045"))\
                                                           )\
    
    
    regraTrataSnAlertaDF =  regraTrataSnAlertaDF.withColumn("vl_pstc_cdc_apvd",  F.when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN3_2, col("ws_vl_pstc_far_apvd_045"))\
                                                                                  .when(EXPR_IN_TRATA_SN_ALERTA & EXPR_VL_PSTC_CDC_APVD_SN3_3_ELSE, col("ws_vl_pstc_far_clcd_045"))\
                                                                                  .otherwise(col("vl_pstc_cdc_apvd"))\
                                                           )

    # Regra 4 - Sinal de Alerta igual a 4    
    regraTrataSnAlertaDF = regraTrataSnAlertaDF.withColumn("vl_pstc_cdc_apvd",F.when(EXPR_IN_TRATA_SN_ALERTA & (col("cd_snl_avs_pcld") == 4), CALC_VL_PSTC_CDC_APVD_SN_4)\
                                                                               .otherwise(col("vl_pstc_cdc_apvd"))\
                                                          )
    
    regraTrataSnAlertaDF.unpersist()
    
    print(f"Saindo da função regraTrataSnAlerta tempo de excução: {time.time() - start_time} ")
                                         
    return regraTrataSnAlertaDF

### TrataSinalAlertaVeiculo

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def trata_Snl_Alerta_Vclo(df):
    '''
        Proceder o tratamento do sinal de alerta que determina o valor de prestação de crédito direto ao consumidor aprovado e calculado

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def aplicaRegraPstcCorren
                def aplicaRegraPstcInss
                def aplicaRegraPstcParc
                def aplicarRegraPstcVclo
            funcoes acionadas:
                None
                
    '''

        
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()

        
    EXPR_VL_PSTC_ANT_060 = expr(" cd_snl_avs_pcld = 0 and in_lim_ant = 'N' ")
    EXPR_BLOCO_1 = expr("cd_snl_avs_pcld = 0")
    EXPR_VL_PSTC_VCLO_APVD = expr("vl_ttl_pcl_fin > vl_pstc_ant_060")
    EXPR_VL_PSTC_VCLO_APVD_1 = expr("vl_pstc_vclo_apvd < vl_pstc_vclo_clcd")
    EXPR_IN_TRATA_SN = expr("in_trata_sn_alerta_vclo")
    EXPR_VL_PSTC_VCLO_APVD_2 = expr("ws_vl_pstc_far_apvd_021 > ws_vl_pstc_far_clcd_021")
    EXPR_BLOCO_2 = expr("cd_snl_avs_pcld = 1")
    EXPR_VL_PSTC_VCLO_APVD_3 = expr("cd_snl_avs_pcld = 1 and vl_ttl_pcl_fin > vl_pstc_vclo_clcd")
    EXPR_VL_PSTC_VCLO_APVD_3_ELSE = expr("cd_snl_avs_pcld = 1 and vl_ttl_pcl_fin <= vl_pstc_vclo_clcd")
    EXPR_VL_PSTC_VCLO_APVD_4 = expr("cd_snl_avs_pcld = 1 and in_lim_ant <> 'N' and vl_pstc_ant_060 < vl_pstc_vclo_apvd  ")
    EXPR_VL_PSTC_VCLO_APVD_5 = expr("cd_snl_avs_pcld = 1 and ws_vl_pstc_far_apvd_021 > ws_vl_pstc_far_clcd_021 and ws_vl_pstc_far_apvd_021 < vl_pstc_vclo_apvd  ")
    EXPR_VL_PSTC_VCLO_APVD_5_ELSE = expr("cd_snl_avs_pcld = 1 and ws_vl_pstc_far_apvd_021 <= ws_vl_pstc_far_clcd_021 and ws_vl_pstc_far_clcd_021 < vl_pstc_vclo_apvd ")
    EXPR_BLOCO_3 = expr("cd_snl_avs_pcld in (2, 5, 6)")
    EXPR_VL_PSTC_VCLO_APVD_6 = expr(" cd_snl_avs_pcld in (2, 5, 6) and in_lim_ant = 'N' ")
    EXPR_VL_PSTC_VCLO_APVD_6_ELSE = expr(" cd_snl_avs_pcld in (2, 5, 6) and in_lim_ant <> 'N' and vl_pstc_vclo_clcd < vl_pstc_ant_060")
    EXPR_VL_PSTC_VCLO_APVD_6_MAIOR = expr(" cd_snl_avs_pcld in (2, 5, 6) and in_lim_ant <> 'N' and vl_pstc_vclo_clcd >= vl_pstc_ant_060")
    EXPR_VL_PSTC_VCLO_APVD_7 = expr("cd_snl_avs_pcld in (2, 5, 6) and ws_vl_pstc_far_apvd_021 > ws_vl_pstc_far_clcd_021 and ws_vl_pstc_far_apvd_021 < vl_pstc_vclo_apvd")
    EXPR_VL_PSTC_VCLO_APVD_7_ELSE = expr("cd_snl_avs_pcld in (2, 5, 6) and ws_vl_pstc_far_apvd_021 <= ws_vl_pstc_far_clcd_021 and ws_vl_pstc_far_clcd_021 < vl_pstc_vclo_apvd")
    EXPR_BLOCO_4 = expr("cd_snl_avs_pcld = 4")

    # expressoes matematicas utulizadas nas contas conforme condicao 
    
    CALC_BLOCO_4 = expr("vl_pstc_vclo_clcd * vl_pc_red_pstc_clcd * vl_ftr_ajst_pstc_vclo")
    CALC_CLCD_021 = expr(" cast(vl_pstc_vclo_clcd as int) * vl_ftr_ajst_pstc_vclo ")
    CALC_APVD_021 = expr(" cast(vl_pstc_vclo_apvd as int) * vl_ftr_ajst_pstc_vclo ")

        
    
    
    # Inicialização
    # Bloco 1
     
    regraTrataSnAlerta_VcloDF = df.withColumn("vl_pstc_ant_060", F.when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_ANT_060, lit(0))\
                                                                  .otherwise(col('vl_pstc_ant_060')))\
                                  .withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_1 & EXPR_VL_PSTC_VCLO_APVD,col("vl_ttl_pcl_fin"))\
                                                                   .when(EXPR_IN_TRATA_SN & EXPR_BLOCO_1 & ~EXPR_VL_PSTC_VCLO_APVD,col("vl_pstc_ant_060"))\
                                                                   .otherwise(col("vl_pstc_vclo_apvd")))
                                              
    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_1 & EXPR_VL_PSTC_VCLO_APVD_1,col("vl_pstc_vclo_clcd")\
                                                                                                )\
                                                                                          .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                    )\
                                                         .withColumn("ws_vl_pstc_far_clcd_021", F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_1,CALC_CLCD_021)\
                                                                                                 .otherwise(lit(0))\
                                                                    )\
                                                         .withColumn("ws_vl_pstc_far_apvd_021", F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_1,CALC_APVD_021)\
                                                                                                 .otherwise(lit(0))\
                                                                    )
                                
    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_1 & EXPR_VL_PSTC_VCLO_APVD_2,col("ws_vl_pstc_far_apvd_021"))\
                                                                                          .when(EXPR_IN_TRATA_SN & EXPR_BLOCO_1 & ~EXPR_VL_PSTC_VCLO_APVD_2,col("ws_vl_pstc_far_clcd_021"))\
                                                                                          .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                    )\
        
    # Bloco 2 
    
    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_3,col("vl_ttl_pcl_fin"))\
                                                                                          .when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_3_ELSE,col('vl_pstc_vclo_clcd'))\
                                                                                          .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                    )\
                                              
    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_4,col("vl_pstc_ant_060"))\
                                                                                           .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                    )\
                                                          .withColumn("ws_vl_pstc_far_clcd_021", F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_2,CALC_CLCD_021)\
                                                                                                 .otherwise(col("ws_vl_pstc_far_clcd_021"))\
                                                                   )\
                                                          .withColumn("ws_vl_pstc_far_apvd_021", F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_2,CALC_APVD_021)\
                                                                                                 .otherwise(col("ws_vl_pstc_far_apvd_021"))\
                                                                    )
    
    
    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_5,col("ws_vl_pstc_far_apvd_021"))\
                                                                                          .when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_5_ELSE,col("ws_vl_pstc_far_clcd_021"))\
                                                                                          .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                    )\
                                                                   
    # Bloco 3

    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_6, col("vl_pstc_vclo_clcd"))\
                                                                                          .when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_6_ELSE, col("vl_pstc_vclo_clcd"))\
                                                                                          .when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_6_MAIOR, col("vl_pstc_ant_060"))\
                                                                                          .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                    )\
                                                         .withColumn("ws_vl_pstc_far_clcd_021", F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_3, CALC_CLCD_021)\
                                                                                                .otherwise(col("ws_vl_pstc_far_clcd_021"))\
                                                                   )\
                                                         .withColumn("ws_vl_pstc_far_apvd_021", F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_3,CALC_APVD_021)\
                                                                                                 .otherwise(col("ws_vl_pstc_far_apvd_021"))\
                                                                    )
        
    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd",F.when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_7, col("ws_vl_pstc_far_apvd_021"))\
                                                                                          .when(EXPR_IN_TRATA_SN & EXPR_VL_PSTC_VCLO_APVD_7_ELSE, col("ws_vl_pstc_far_clcd_021"))\
                                                                                          .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                   )\
                                                    
    # Bloco 4

    regraTrataSnAlerta_VcloDF = regraTrataSnAlerta_VcloDF.withColumn("vl_pstc_vclo_apvd", F.when(EXPR_IN_TRATA_SN & EXPR_BLOCO_4, CALC_BLOCO_4)\
                                                                                           .otherwise(col("vl_pstc_vclo_apvd")))
        
    print(f"Saindo da função regraTrataSnAlerta_Vclo tempo de excução: {time.time() - start_time} ")
      
    return regraTrataSnAlerta_VcloDF

### ZeraPrestacao

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def zera_Prestacoes(df):
    '''
        A funcao zeraPrestacoes contém dois métodos para zerar as variáveis: zerar e zera_Prestacoes. O método zerar é chamado para atribui o valor zero nas variáveis que satisfazem 
        as condições delimitadas no método zera_Prestacoes. O método zera_Prestacoes contém as condições que zeram as variáveis de saída

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def aplicaRegraPstcCorren
            funcoes acionadas:
                
    '''
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()
        
    EXPR_COND_1 = expr(" in_cli_anot_348 = 'S' and in_cli_adpc_rhbl <> 1  ")
    EXPR_COND_2 = expr(" cd_ntz_ocp in (7, 10, 11, 16) ")
    EXPR_COND_3 = expr(" in_ctr_rsct = 1 and in_ptdd = 'N' ")
    EXPR_COND_4 = expr(" in_anot = 0 and in_ptdd <> 'N' ")
    EXPR_COND_5 = expr(" in_ctra_52_18 = 1 or in_anot in (3, 4, 5) ")
    EXPR_COND_6 = expr(" in_ctra_349 = 1 or in_ctra_349_12 = 1 ")
    EXPR_COND_7 = expr(" in_anot in (4, 5) and in_ctr_rsct = 0 ")
    EXPR_COND_8 = expr(" cd_ocp_ppl in (179, 200) or cd_cpcd_cvil in (2, 3)  and cd_grau_inst < 5")
    EXPR_COND_9 = expr(" cd_mtdl_anl_crd = 51 and cd_sgm_anl = 19 and in_bco_pstl = 'S'  ")
    EXPR_COND_10 = expr(" cd_tip_bnf = 4 and cd_mtdl_anl_crd in (52, 53) and in_bco_pstl = 'S' ")
    EXPR_COND_11 = expr(" qt_idd > 85 ")

    # expressoes matematicas utulizadas nas contas conforme condicao 
        
    CALC_CLCD_021 = expr(" vl_pstc_vclo_clcd * vl_ftr_ajst_pstc_vclo ")
    
    
    
    
    # Incialização 
    
    regraZeraPrestacaoDF = df.withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_1,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_1,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_2,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_2,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_3,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_3,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("in_ctra_52_18",    F.when(EXPR_COND_4,lit(0))\
                                                              .otherwise(col("in_ctra_52_18"))\
                                        )\
                             .withColumn("in_ctra_52_18",    F.when(EXPR_COND_4,lit(0))\
                                                              .otherwise(col("in_ctra_52_18"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_5,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_5,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_6,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_6,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_7,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_7,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_8,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_8,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_9,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_9,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_10,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_10,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_COND_11,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                        )\
                             .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_COND_11,lit(0))\
                                                              .otherwise(col("vl_pstc_cdc_clcd"))\
       
                                        )
    
    print(f"Saindo da função regraZeraPrestacao tempo de excução: {time.time() - start_time} ")

    return regraZeraPrestacaoDF

### AplicaRegraPrestacaoCorren

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def aplicaRegraPstcCorren(df):
    '''
            Aplica as regras referente a prestação dos correntistas 

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado

        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ancs0160
            funcoes acionadas:
                def calculaPrestacao
                def RecompRendaLqda
                def trata_Snl_Alerta
                def zera_Prestacoes
                
    '''
    # constantes
    
    start_time = time.time()

    
    TAXA_ACRESCIMO = 1.25
    FATOR_MULTIPLICACAO = 2 
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
        
    EXPR_VL_PSTC_CDC_CLCD = expr(" vl_pstc_cdc_clcd > 9999999.99  ")
    EXPR_VL_PSTC_CDC_CLCD_1 = expr(" vl_pstc_cdc_clcd > vl_lim_ref_045  ")
    EXPR_WGDA_VL_REN_BRTO = expr(" cd_sgm_anl <> 25 and in_bco_pstl <> 'N' ")
    EXPR_IN_RECOMP_RENDA = expr(" wgda_vl_ren_brto <= vl_ren_prtz_teto and vl_rend_lqdo_ttl <= vl_ren_prtz_teto ")
    EXPR_VL_PSTC_CDC_APVD = expr(" vl_pstc_cdc_apvd > wgda_teto_pstc_cdc and wgda_teto_pstc_cdc > 0 ")
    EXPR_VL_PSTC_CDC_APVD_1 = expr(" vl_pstc_cdc_apvd > vl_lim_ref_045 ")
    EXPR_IN_REGRA_PSTC_CORREN = expr(" in_regra_pstc_corren ")
    EXPR_IN_CALC_PSTC = expr(" in_calc_pstc ")
    EXPR_IN_PGT = expr("in_pgt = 1")
    EXPR_IN_INSS = expr("in_inss = 1")
    EXPR_VLR_BRUTO = expr("vl_ren_brto = 0")

    # expressoes matematicas utulizadas nas contas
        
    CALC_VL_PSTC_CDC_CLCD =  expr(" vl_pstc_cdc_clcd + vl_pstc_cdc_prtt ")
    CALC_WGDA_VL_REN_BRTO =  expr(f" vl_rend_lqdo_ttl * {TAXA_ACRESCIMO}  ")

    
    
    #Inicialização
    
    RegraPstcCorrenDF = df.withColumn("waux_vl_2slr_min", F.when(EXPR_IN_REGRA_PSTC_CORREN,col("vl_slr_min") * f"{FATOR_MULTIPLICACAO}")\
                                                           .otherwise(lit(0))\
                                     )\
                          .withColumn("in_calc_pstc", F.when(EXPR_IN_REGRA_PSTC_CORREN,lit(True))\
                                                       .otherwise(lit(False))\
                                     )
    
    
    print("Entrando na calculaPrestacao")
               
    RegraPstcCorrenDF = calculaPrestacao(RegraPstcCorrenDF,"Corren")
    
    
    RegraPstcCorrenDF = RegraPstcCorrenDF.withColumn("vl_pstc_cdc_clcd", F.when(EXPR_IN_REGRA_PSTC_CORREN & EXPR_VL_PSTC_CDC_CLCD, lit(9999999.99))\
                                                                          .when(EXPR_IN_REGRA_PSTC_CORREN & EXPR_VL_PSTC_CDC_CLCD_1, col("vl_lim_ref_045"))\
                                                                          .otherwise(col("vl_pstc_cdc_clcd"))\
                                                    )\
    
    RegraPstcCorrenDF = RegraPstcCorrenDF.withColumn("vl_pstc_cdc_clcd",F.when(EXPR_IN_REGRA_PSTC_CORREN,CALC_VL_PSTC_CDC_CLCD)\
                                                                         .otherwise(col("vl_pstc_cdc_clcd"))\
                                                    )\
                                         .withColumn("wgda_vl_ren_brto",F.when(EXPR_IN_REGRA_PSTC_CORREN & EXPR_WGDA_VL_REN_BRTO,col("vl_ren_brto"))\
                                                                         .otherwise(col("wgda_vl_ren_brto"))\
                                                    )\
                                         .withColumn("wgda_vl_ren_brto",F.when(EXPR_IN_REGRA_PSTC_CORREN\
                                                                                 & EXPR_WGDA_VL_REN_BRTO\
                                                                                   & (EXPR_IN_PGT | EXPR_IN_INSS)\
                                                                                     & EXPR_VLR_BRUTO,col("vl_rend_lqdo_ttl") * 1.25\
                                                                              )\
                                                                         .otherwise(col("wgda_vl_ren_brto"))\
                                                    )\
 
    
    RegraPstcCorrenDF = RegraPstcCorrenDF.select("*",
                                                  (F.when(EXPR_IN_REGRA_PSTC_CORREN & EXPR_WGDA_VL_REN_BRTO & EXPR_IN_RECOMP_RENDA,lit(True))\
                                                     .otherwise(lit(False))\
                                                   ).alias("in_recomp_renda"),
                                                  (F.when(EXPR_IN_REGRA_PSTC_CORREN, lit(True))\
                                                    .otherwise(lit(False))\
                                                  ).alias("in_trata_sn_alerta"),
                                               )
                                         
                                         
    print("Entrando na funcao RecompRendaLqda")                                             
                                                 
    RegraPstcCorrenDF = RecompRendaLqda(RegraPstcCorrenDF)
    
    print("Entrando na funcao trata_Snl_Alerta")
    
    RegraPstcCorrenDF = trata_Snl_Alerta(RegraPstcCorrenDF)
            
    RegraPstcCorrenDF = RegraPstcCorrenDF.withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_REGRA_PSTC_CORREN & EXPR_VL_PSTC_CDC_APVD, col("wgda_teto_pstc_cdc"))\
                                                                          .otherwise(col("vl_pstc_cdc_apvd"))\
                                                    )
    RegraPstcCorrenDF = RegraPstcCorrenDF.withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_REGRA_PSTC_CORREN & EXPR_VL_PSTC_CDC_APVD_1, col("vl_lim_ref_045"))\
                                                                          .otherwise(col("vl_pstc_cdc_apvd"))\
                                                    )
    
    RegraPstcCorrenDF.cache()
    
    print("Entrando na funcao zera_Prestacoes")
    
    RegraPstcCorrenDF = zera_Prestacoes(RegraPstcCorrenDF)
    
    RegraPstcCorrenDF.unpersist()
    
    print(f"Saindo da função RegraPstcCorren tempo de excução: {time.time() - start_time} ")

                                                                            
    return RegraPstcCorrenDF
    

### AplicaRegraPrestacaoProven

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def aplicaRegraPstcProven(df):
    '''
        Calcula a prestação de crédito direto ao consumidor proventista

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado

                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ancs0160
            funcoes acionadas:
                def calculaPrestacao
                
    '''
    
    # 1 - O código calcula o numeral inteiro do valor da prestação de crédito direto ao consumidor proventista
    # 2 - Se a variável de valor da prestação de crédito direto ao consumidor proventista for maior que 9999999,99, a variável recebe 9999999,99
    # 3 - Se o valor da prestação de crédito direto ao consumidor proventista for maior que o limite referência X, a variável recebe o valor do limite referência X
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()

        
    EXPR_PSTC_CDC_PRTT = expr(" vl_pstc_cdc_prtt > 9999999.99 ")
    EXPR_PSTC_CDC_PRTT_1 = expr(" vl_pstc_cdc_prtt > vl_lim_ref_045 ")
    EXPR_IN_ANCS0160 = expr("in_regra_pstc_corren")
    
    
    

    # Inicialização 
    
    RegraPstcProvenDF = df.withColumn("in_calc_pstc",F.when(EXPR_IN_ANCS0160,lit(True))\
                                                      .otherwise(lit(False))\
                                     )
    
    print("Entrando na funcao calculaPrestacao")
    
    RegraPstcProvenDF = calculaPrestacao(RegraPstcProvenDF, "Proven")

    RegraPstcProvenDF = RegraPstcProvenDF.withColumn("vl_pstc_cdc_prtt", F.when(EXPR_IN_ANCS0160 & EXPR_PSTC_CDC_PRTT, lit(9999999.99))\
                                                                          .when(EXPR_IN_ANCS0160 & EXPR_PSTC_CDC_PRTT_1, col("vl_lim_ref_045"))\
                                                                          .otherwise(col("vl_pstc_cdc_prtt"))\
                                                    )
    
    print(f"Saindo da função RegraPstcProven tempo de excução: {time.time() - start_time} ")


    return RegraPstcProvenDF

### AplicaRegraPrestacaoVeiculo

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def aplicarRegraPstcVclo(df):
    '''
        O código calcula valor do sublimite de veiculo calculado (vl_pstc_vclo_clcd) e aprovado  (vl_pstc_vclo_apvd) 
        As variaveis ws_vl_pstc_clcd_021 e ws_vl_pstc_far_apvd_021 são calculadas e servem de insumos para o vl_pstc_vclo_apvd 


        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ANCS0160
                def ProcessaParcelado
            funcoes acionadas:
                def recomp_Renda_Lqda
                def CalculoPstcCdc
                def TrataSnlAlerta
                
    '''
    
    # 1 - O código calcula valor do sublimite de veiculo calculado (vl_pstc_vclo_clcd) e aprovado  (vl_pstc_vclo_apvd) 
    # 2 - As variaveis ws_vl_pstc_clcd_021 e ws_vl_pstc_far_apvd_021 são calculadas e servem de insumos para o vl_pstc_vclo_apvd
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()
        
    EXPR_VL_PSTC_VCLO_CLCD = expr(" in_cli_anot_348 = 'S' and in_cli_adpc_rhbl <> 1 ")
    EXPR_VL_PSTC_VCLO_CLCD_1 = expr(" (in_ctr_rsct = 1 and in_ptdd = 'N') or in_ctra_52_18 = 1 or in_ctra_349 = 1 or in_ctra_349_12 = 1")
    EXPR_VL_PSTC_VCLO_CLCD_2 = expr(" in_bco_pstl = 'S' and ((cd_sgm_anl = 19) or (cd_mtdl_anl_crd = 52 and cd_tip_bnf = 4)) ")
    EXPR_VL_PSTC_VCLO_CLCD_3 = expr(" vl_pstc_vclo_clcd > vl_lim_ref_060 ")
    EXPR_VL_PSTC_VCLO_CLCD_4 = expr(" vl_ttl_pcl_fin > vl_lim_ref_060 and vl_pstc_vclo_apvd > vl_ttl_pcl_fin ")    
    EXPR_VL_PSTC_VCLO_APVD = expr(" (in_ctr_rsct = 1 and in_ptdd = 'N') or in_ctra_52_18 = 1 or in_ctra_349 = 1 or in_ctra_349_12 = 1")
    EXPR_VL_PSTC_VCLO_APVD_2 = expr(" in_bco_pstl = 'S' and ((cd_sgm_anl = 19) or (cd_mtdl_anl_crd = 52 and cd_tip_bnf = 4)) ")
    EXPR_VL_PSTC_VCLO_APVD_3 = expr(" vl_pstc_vclo_apvd > vl_lim_ref_060 ")
    EXPR_VL_PSTC_VCLO_APVD_4 = expr(" vl_ttl_pcl_fin > vl_lim_ref_060 and vl_pstc_vclo_apvd > vl_ttl_pcl_fin ")
    EXPR_ELSE_PSTC_VCLO_APVD_1 = expr(" vl_ttl_pcl_fin > vl_lim_ref_060 ")
    EXPR_ELSE_PSTC_VCLO_APVD_2 = expr(" vl_pstc_vclo_apvd > vl_ttl_pcl_fin ")
    EXPR_ELSE_PSTC_VCLO_APVD_3 = expr(" vl_pstc_vclo_apvd > vl_lim_ref_060 ")
    EXPR_IN_TRATA_SN_ALERTA = expr(" cd_status_cli = 2 ")
    EXPR_STATUS = expr(" cd_status_cli = 2 ")
    EXPR_CD_NTZ_OCP = expr(" cd_ntz_ocp in (7,10,11,16) ")
    EXPR_IN_PTDD = expr(" in_ptdd = 'S' ")
    EXPR_RECONSIDERACAO = expr(" cd_crct_1682 = 1 ")
    EXPR_ANKCCF1 = expr(" in_bnf_astl = 'S' and in_pgt = 0 ")
    EXPR_REGRAVCLO = expr(" in_RegraPstcVclo ")


    
    
    
    # Inicialização 
    
    RegraPstcVcloDF = df.withColumn("vl_pstc_vclo_clcd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                          .otherwise(col("vl_pstc_vclo_clcd"))\
                                   )\
                        .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                          .otherwise(col("vl_pstc_vclo_apvd"))
                                   )\
                        .withColumn("in_trata_sn_alerta_vclo",F.when(EXPR_REGRAVCLO & EXPR_IN_TRATA_SN_ALERTA & ~EXPR_VL_PSTC_VCLO_CLCD, lit(True))\
                                                               .otherwise(lit(False))\
                                   )\
                        .withColumn("in_calc_pstc",      F.when(EXPR_REGRAVCLO & EXPR_IN_TRATA_SN_ALERTA & ~EXPR_VL_PSTC_VCLO_CLCD,lit(True))\
                                                          .otherwise(lit(False))
                                   )
    
    print("Entrando na funcao calculaPrestacaoVCLO")
    
    RegraPstcVcloDF = calculaPrestacao(RegraPstcVcloDF, "Vclo")
    
    print("Entrando na funcao calculaPrestacaoVCLO")

    RegraPstcVcloDF = trata_Snl_Alerta_Vclo(RegraPstcVcloDF)
        
    RegraPstcVcloDF = RegraPstcVcloDF.withColumn("vl_pstc_vclo_clcd", F.when(EXPR_REGRAVCLO & EXPR_CD_NTZ_OCP & EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_clcd"))\
                                                )\
                                     .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & EXPR_CD_NTZ_OCP & EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_apvd"))\
                                                )\
                                     .withColumn("in_ctr_rsct",       F.when(EXPR_REGRAVCLO & EXPR_IN_PTDD & EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                       .otherwise(col("in_ctr_rsct"))\
                                                )\
                                     .withColumn("in_ctra_52_18",     F.when(EXPR_REGRAVCLO & EXPR_IN_PTDD & EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                       .otherwise(col("in_ctra_52_18"))\
                                                )\
                                     .withColumn("vl_pstc_vclo_clcd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_CLCD_1 & EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_clcd"))\
                                                )\
                                     .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_APVD & EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_apvd"))\
                                                )
        
    RegraPstcVcloDF = RegraPstcVcloDF.withColumn("vl_pstc_vclo_clcd", F.when(EXPR_REGRAVCLO & ~EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                        .otherwise(col("vl_pstc_vclo_clcd"))\
                                                 )\
                                      .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & ~EXPR_STATUS & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                        .otherwise(col("vl_pstc_vclo_apvd"))\
                                                                                  )\
        
        
    RegraPstcVcloDF = RegraPstcVcloDF.withColumn("vl_pstc_vclo_clcd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_CLCD_2 & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                        .otherwise(col("vl_pstc_vclo_clcd"))\
                                                 )\
                                      .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_APVD_2 & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                        .otherwise(col("vl_pstc_vclo_apvd"))\
                                                 )\
            
    RegraPstcVcloDF = RegraPstcVcloDF.withColumn("vl_pstc_vclo_clcd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_CLCD_3 & ~EXPR_VL_PSTC_VCLO_CLCD, col("vl_lim_ref_060"))\
                                                                        .otherwise(col("vl_pstc_vclo_clcd"))\
                                                 )\
        
        
    RegraPstcVcloDF = RegraPstcVcloDF.withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & EXPR_VL_PSTC_VCLO_APVD_4 & ~EXPR_VL_PSTC_VCLO_CLCD, col("vl_ttl_pcl_fin"))\
                                                                       .otherwise(col("vl_pstc_vclo_apvd"))\
                                                )\
                                     .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & ~EXPR_ELSE_PSTC_VCLO_APVD_1 & EXPR_ELSE_PSTC_VCLO_APVD_3 & ~EXPR_VL_PSTC_VCLO_CLCD, col("vl_lim_ref_060"))\
                                                                       .otherwise(col("vl_pstc_vclo_apvd"))\
                                                )\
                                                 
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.
    
    RegraPstcVcloDF = RegraPstcVcloDF.withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & EXPR_RECONSIDERACAO & ~EXPR_VL_PSTC_VCLO_CLCD, lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_apvd"))\
                                                )
        
    RegraPstcVcloDF = RegraPstcVcloDF.withColumn("vl_pstc_vclo_apvd", F.when(EXPR_REGRAVCLO & EXPR_ANKCCF1 & ~EXPR_VL_PSTC_VCLO_CLCD,lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_apvd"))\
                                                )
    
    print(f"Saindo da função RegraPstcVclo tempo de excução: {time.time() - start_time} ")


    return RegraPstcVcloDF

### ProcessaCalculo

In [None]:
%%spark

import pyspark.sql.functions as F
import time 

def processaCalculo(df,prd,tp):
    '''
        Possuem as regras especificas para seus produtos, atribuindo valores referência, zerando ou enviando para o processa 
        cálculo (processaCalculo) e posteriormente enviando para o cálculo do valor do sublimite (CalculoSlim)   
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            prd = produto 
            tp = pode ser calculado ou aprovado se vim vazio será os dois  
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def  processaParcelado
                def  CalculoPrtfCdc
            funcoes acionadas:
                def  calculaSLim
                
    '''
    # recebimento dos parametros
    
    start_time = time.time()

    
    tipo = tp
    
    ecf = [137, 140, 155, 162, 171, 179, 228]    

    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_IN_PROCESSA_CALCULO = expr("in_ProcessaCalculo")
    # expressoes matematicas utulizadas nas contas conforme condicao 
        
    
    # Inicialização 
                                            
    
   
    if prd in [137, 155, 179]:
        if tipo == 'apvd':
            processaCalculoDF = df.withColumn("aux_pstc_slim",F.when(EXPR_IN_PROCESSA_CALCULO,col('vl_pstc_ecf_demais').cast(IntegerType()) * col('vl_ftr_ajst_pstc_cgn_flh'))\
                                                               .otherwise(col("aux_pstc_slim"))\
                                             )
        else:
            processaCalculoDF = df.withColumn("aux_pstc_slim",F.when(EXPR_IN_PROCESSA_CALCULO, col("vl_pstc_ecf_demais").cast(IntegerType()))\
                                                               .otherwise(col("aux_pstc_slim"))\
                                             )
            
    elif prd == 171:
        processaCalculoDF = df.withColumn("aux_pstc_slim",F.when(EXPR_IN_PROCESSA_CALCULO,col(f'vl_pstc_crt_{tipo}').cast(IntegerType()))\
                                                           .otherwise(col("aux_pstc_slim"))\
                                         )
        
    elif prd in [140, 157, 162, 228]:
         processaCalculoDF = df.withColumn("aux_pstc_slim",F.when(EXPR_IN_PROCESSA_CALCULO,col(f'vl_pstc_fun_bb_{tipo}').cast(IntegerType()))\
                                                            .otherwise(col("aux_pstc_slim"))\
                                          )
            
    
    elif prd in [116, 119, 131, 142, 156, 159, 160, 161, 163, 173]:
        processaCalculoDF = df.withColumn("aux_pstc_slim",F.when(EXPR_IN_PROCESSA_CALCULO,col(f'vl_pstc_cdc_{tipo}').cast(IntegerType()))\
                                                           .otherwise(col("aux_pstc_slim"))\
                                         )
        
    elif prd in [83, 115]:
        processaCalculoDF = df.withColumn("aux_pstc_slim",F.when(EXPR_IN_PROCESSA_CALCULO,col(f'ws_psct_cdc_{str(prd)}_c').cast(IntegerType()))\
                                                           .otherwise(col("aux_pstc_slim"))\
                                         )
        
    
    processaCalculoDF = processaCalculoDF.withColumn("aux_mtp_slim", F.when(EXPR_IN_PROCESSA_CALCULO,col(f"vl_mtp_clc_slim_{str(prd)}"))\
                                                                      .otherwise(col("aux_mtp_slim"))\
                                                    )\
                                         .withColumn("aux_cd_slim", F.when(EXPR_IN_PROCESSA_CALCULO,str(prd))\
                                                                     .otherwise(col("aux_cd_slim"))\
                                                    )\
                                         .withColumn("aux_far_slim", F.when(EXPR_IN_PROCESSA_CALCULO,lit(1))\
                                                                      .otherwise(col("aux_far_slim"))\
                                                    )\
                                         .withColumn("aux_vl_lim_ref", F.when(EXPR_IN_PROCESSA_CALCULO,col(f"vl_lim_ref_{str(prd)}"))\
                                                                        .otherwise(col("aux_vl_lim_ref"))\
                                                    )\
    
    if prd == 160 and tipo == 'apvd':
        processaCalculoDF = processaCalculoDF.withColumn("aux_redt_vcli", F.when(EXPR_IN_PROCESSA_CALCULO,col(f"redt_slim_{str(prd)}"))\
                                                                           .otherwise(col("aux_redt_vcli"))\
                                                        )
    elif prd in [83, 115, 116, 142] and tipo == 'apvd':
        processaCalculoDF = processaCalculoDF.withColumn("aux_redt_vcli", F.when(EXPR_IN_PROCESSA_CALCULO,col(f"redt_slim_{str(prd)}_vs_cli"))\
                                                                           .otherwise(col("aux_redt_vcli"))\
                                                        )
    else:
        processaCalculoDF = processaCalculoDF.withColumn("aux_redt_vcli", F.when(EXPR_IN_PROCESSA_CALCULO, lit(1))\
                                                                           .otherwise(col("aux_redt_vcli"))\
                                                        )
        
    if prd in ecf:
        
        print("Entrando na funcao calculaSlim - regra 1 ")
        processaCalculoDF = processaCalculoDF.withColumn("in_CalculaSlim_regra1", F.when(EXPR_IN_PROCESSA_CALCULO,lit(True))\
                                                                                   .otherwise(lit(False))\
                                                        )
        processaCalculoDF = calculaSlim(processaCalculoDF,1)
        
        
    else:
                
        print("Entrando na funcao calculaSlim - regra none ")
        processaCalculoDF = processaCalculoDF.withColumn("in_CalculaSlim_regra_outros", F.when(EXPR_IN_PROCESSA_CALCULO,lit(True))\
                                                                                         .otherwise(lit(False))\
                                                        )
        processaCalculoDF = calculaSlim(processaCalculoDF,0)
        
    if (prd in [137, 140, 155, 179, 228]) and (tipo == 'apvd'):
        
        processaCalculoDF = processaCalculoDF.withColumn(f"vl_slim_{str(prd)}_{tipo}", F.when(EXPR_IN_PROCESSA_CALCULO,col("aux_vl_slim") * col(f"vl_ftr_ajst_slim_{str(prd)}"))\
                                                                                        .otherwise(col(f"vl_slim_{str(prd)}_{tipo}"))\
                                                        )
    else:
        processaCalculoDF = processaCalculoDF.withColumn(f"vl_slim_{str(prd)}_{tipo}", F.when(EXPR_IN_PROCESSA_CALCULO,col("aux_vl_slim"))\
                                                                                        .otherwise(col(f"vl_slim_{str(prd)}_{tipo}"))\
                                                        )
        
    print(f"Saindo da função processaCalculo tempo de excução: {time.time() - start_time} ")

                                            
    return processaCalculoDF

### SublimiteVeiculo

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time

def slim_Vclo(df):
    '''
        Funcao especifica para tratar do sublimite do produto carro possuem as regras especificas para seus produtos, atribuindo valores referência, zerando ou enviando 
        para o processa cálculo (“processaCalculo”) e posteriormente enviando para o cálculo do valor do sublimite (“CalculoSlim”)
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def  ProcessaParcelado
            funcoes acionadas:
                def  calculaSLim
                
    '''
    
    # 1 - Para cliente banco postal, slim veículo igual a zero
    # 2 - Regra enviada no dia 22/10/2014. Liberar os valores Veiculos
    # 3 - Nao considerando a regra de bco postal (565)
    # 4 - OBS No book a variavel self.dicio['in_rcdr'] está setada com 12
    # 5 - Aqui ele faz uma comparação para saber se a execução é em Batch

    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_VL_SLIM_APVD =          expr("in_bco_pstl = 'S' and (cd_sgm_anl = 19 or (cd_mtdl_anl_crd = 52 and cd_tip_bnf = 4)) ")
    EXPR_VL_SLIM_21_CLCD =       expr("vl_slim_21_clcd > vl_lim_ref_21")
    EXPR_VL_SLIM_117_CLCD =      expr("vl_slim_117_clcd > vl_lim_ref_117")
    EXPR_VL_SLIM_21_APVD =       expr("vl_slim_21_utzd > vl_slim_21_apvd")
    EXPR_VL_SLIM_117_APVD =      expr("vl_slim_117_utzd > vl_slim_117_apvd")
    EXPR_VL_SLIM_21_APVD_1 =     expr("vl_slim_21_apvd > vl_lim_ref_21")
    EXPR_VL_SLIM_117_APVD_1 =    expr("vl_slim_117_apvd > vl_lim_ref_117")
    EXPR_VL_SLIM_21_UTZD =       expr("vl_slim_21_utzd > vl_lim_ref_21")
    EXPR_VL_SLIM_117_UTZD =      expr("vl_slim_117_utzd > vl_lim_ref_117")
    EXPR_VL_SLIM_UTZD_APVD_21 =  expr(" vl_slim_21_utzd > vl_lim_ref_21 and vl_slim_21_apvd > vl_slim_21_utzd ")
    EXPR_VL_SLIM_UTZD_APVD_117 = expr(" vl_slim_117_utzd > vl_lim_ref_117 and vl_slim_117_apvd > vl_slim_117_utzd ")
    EXPR_CD_RSCO =               expr(" cd_rsco_05 = 'E' and cd_rsco_5_prvr = 'D' ")
    EXPR_IDADE =                 expr("qt_idd > 85")
    EXPR_FUTURAMENTE =           expr(" in_bnf_astl = 'S' and in_pgt = 0 ")
    EXPR_VL_SLIM_ELSE_21 =       expr(" vl_slim_21_apvd > vl_slim_21_utzd ")
    EXPR_VL_SLIM_ELSE_117 =      expr(" vl_slim_117_apvd > vl_slim_117_utzd ")


    
    start_time = time.time()

    
    # Inicialização 
    
    slim_VcloDF = df.withColumn("vl_slim_21_apvd",   F.when(EXPR_VL_SLIM_APVD, lit(0))\
                                                      .when(EXPR_IDADE,        lit(0))\
                                                      .otherwise(col("vl_slim_21_apvd"))\
                               )\
                    .withColumn("vl_slim_21_clcd",   F.when(EXPR_VL_SLIM_APVD, lit(0))\
                                                      .when(EXPR_IDADE,        lit(0))\
                                                      .otherwise(col("vl_slim_21_clcd"))\
                               )\
                    .withColumn("vl_slim_117_apvd",  F.when(EXPR_VL_SLIM_APVD, lit(0))\
                                                      .when(EXPR_IDADE,        lit(0))\
                                                      .otherwise(col("vl_slim_117_apvd"))\
                               )\
                    .withColumn("vl_slim_117_clcd",  F.when(EXPR_VL_SLIM_APVD, lit(0))\
                                                      .when(EXPR_IDADE,        lit(0))\
                                                      .otherwise(col("vl_slim_117_clcd"))\
                               )\
                    .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_VL_SLIM_APVD, lit(0))\
                                                      .when(EXPR_IDADE,        lit(0))\
                                                      .otherwise(col("vl_pstc_vclo_apvd"))\
                               )\
                    .withColumn("vl_pstc_vclo_clcd", F.when(EXPR_VL_SLIM_APVD, lit(0))\
                                                      .when(EXPR_IDADE,        lit(0))\
                                                      .otherwise(col("vl_pstc_vclo_clcd"))\
                               )\
                    .withColumn("in_CalculaSlim_regra2",lit(True)\
                               )
    
    print("Entrando na função calculaSlim ")


    slim_VcloDF = calculaSlim(slim_VcloDF,2)
    slim_VcloDF.cache()
    
    slim_VcloDF= slim_VcloDF.withColumn("vl_slim_21_clcd",  F.when(EXPR_VL_SLIM_21_CLCD,col("vl_lim_ref_21"))\
                                                             .otherwise(col("vl_slim_21_clcd"))\
                                       )\
                            .withColumn("vl_slim_117_clcd", F.when(EXPR_VL_SLIM_117_CLCD,col("vl_lim_ref_117"))\
                                                             .otherwise(col("vl_slim_117_clcd"))\

                                       )\
                            .withColumn("vl_slim_21_apvd",  F.when(EXPR_VL_SLIM_21_APVD,col("vl_slim_21_utzd"))\
                                                             .otherwise(col("vl_slim_21_apvd"))\

                                       )\
                            .withColumn("vl_slim_117_apvd", F.when(EXPR_VL_SLIM_117_APVD,col("vl_slim_117_utzd"))\
                                                             .otherwise(col("vl_slim_117_apvd"))\

                                       )\
                            .withColumn("vl_slim_21_apvd",  F.when(EXPR_VL_SLIM_UTZD_APVD_21,col("vl_slim_21_utzd"))\
                                                             .when(~EXPR_VL_SLIM_21_UTZD & EXPR_VL_SLIM_21_APVD_1, col("vl_lim_ref_21"))\
                                                             .otherwise(col("vl_slim_21_apvd"))\

                                       )\
                            .withColumn("vl_slim_117_apvd", F.when(EXPR_VL_SLIM_UTZD_APVD_117,col("vl_slim_117_utzd"))\
                                                             .when(~EXPR_VL_SLIM_117_UTZD & EXPR_VL_SLIM_117_APVD_1, col("vl_lim_ref_117"))\
                                                             .otherwise(col("vl_slim_117_apvd"))\

                                       )\
                            .withColumn("vl_slim_21_apvd",  F.when(EXPR_CD_RSCO,lit(0))\
                                                             .otherwise(col("vl_slim_21_apvd"))\
                                       )\
                            .withColumn("vl_slim_117_apvd", F.when(EXPR_CD_RSCO,lit(0))\
                                                             .otherwise(col("vl_slim_117_apvd"))\
                                       )\
                            .withColumn("vl_slim_21_clcd",  F.when(EXPR_CD_RSCO,lit(0))\
                                                             .otherwise(col("vl_slim_21_clcd"))\
                                       )\
                            .withColumn("vl_slim_117_clcd", F.when(EXPR_CD_RSCO,lit(0))
                                                             .otherwise(col("vl_slim_117_clcd"))\
                                       )


    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.
    
    slim_VcloDF = slim_VcloDF.withColumn("vl_pstc_vclo_apvd" ,F.when(EXPR_FUTURAMENTE, lit(0))\
                                                               .otherwise(col("vl_pstc_vclo_apvd"))\
                                        )\
                             .withColumn("vl_slim_21_apvd" ,  F.when(EXPR_FUTURAMENTE, lit(0))\
                                                               .otherwise(col("vl_slim_21_apvd"))\

                                        )\
                             .withColumn("vl_slim_117_apvd" , F.when(EXPR_FUTURAMENTE, lit(0))\
                                                               .otherwise(col("vl_slim_117_apvd"))\

                                        )
    
    print(f"Saindo da função slim_Vclo tempo de excução: {time.time() - start_time} ")
    
    return slim_VcloDF

### SublimiteFuncionario

In [None]:
%%time
%%spark

import pyspark.sql.functions as F
import time


def slim_Funci(df):
    '''
        Funcao especifica para tratar do sublimite do produto funcionario possuem as regras especificas para seus produtos, atribuindo valores referência, zerando ou enviando 
        para o processa cálculo (“processaCalculo”) e posteriormente enviando para o cálculo do valor do sublimite (“CalculoSlim”)  
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def  processaParcelado
            funcoes acionadas:
                def  calculaSLim
                
    '''

    
    # Inicialização 
    
    start_time = time.time()
    
    slim_FunciDF = df.drop("vl_ftr_ajst_slim_228")\
                     .select("*",
                             col("vl_lim_ref_140").alias("vl_lim_ref_228"),
                             col("vl_mtp_clc_slim_140").alias("vl_mtp_clc_slim_228"),
                             col("vl_ftr_ajst_slim_140").alias("vl_ftr_ajst_slim_228"),
                             )
    

    slim_FunciDF = processaCalculo(slim_FunciDF,140,"apvd")
    slim_FunciDF = processaCalculo(slim_FunciDF,140,"clcd")
    slim_FunciDF = processaCalculo(slim_FunciDF,157,"apvd")
    slim_FunciDF = processaCalculo(slim_FunciDF,157,"clcd")
    slim_FunciDF.cache()
    slim_FunciDF = processaCalculo(slim_FunciDF,162,"apvd")
    slim_FunciDF = processaCalculo(slim_FunciDF,162,"clcd")
    slim_FunciDF = processaCalculo(slim_FunciDF,228,"apvd")
    slim_FunciDF = processaCalculo(slim_FunciDF,228,"clcd")


    
            
    print(f"Saindo da função slim_Funci tempo de excução: {time.time() - start_time} ")
    
    return slim_FunciDF




### SublimiteConsignado

In [None]:
%%spark

import pyspark.sql.functions as F
import time


def slim_ecf(df):
    '''
        Possuem as regras especificas para seus produtos, atribuindo valores referência, zerando ou enviando para o processa 
        cálculo (processaCalculo) e posteriormente enviando para o cálculo do valor do sublimite (CalculoSlim)   
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado

                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def  processaParcelado
            funcoes acionadas:
                def  calculaSLim
                
    '''
    
    start_time = time.time()
    
    # constantes usadas nos calculos 
    
    FATOR_MULTI_VL80 = 0.80
    
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_VL_SLIM_155_UTZD_1 = expr(" vl_slim_155_utzd > vl_slim_155_apvd ")
    EXPR_IN_PROCESSA_CALCULO_171 = expr(" ic_mtp_cvn_crt > 0 ")
    EXPR_IN_CTR = expr(" in_ctr_rsct = 1 and in_ptdd = 'S' ")
    EXPR_QT_ID_E_VL_REND = expr(" qt_idd > 85 and vl_rend_lqdo_ttl < 20000 ")
    EXPR_VL_PSTC_ECF = expr(" (in_ctr_rsct = 1 and cd_prd_rsct = 338 and cd_mdld_rsct = 18) or in_ctra_349 = 1 or in_ctra_349_12 = 1 ")
    EXPR_VL_PSTC_ECF_REC = expr(" vl_pstc_ecf_rec > 0 and vl_pstc_ecf_rec = 0.01 ")
    EXPR_VL_PSTC_ECF_ELSE = expr(" vl_pstc_ecf_rec > 0 and vl_pstc_ecf_rec <> 0.01 ")
    EXPR_VL_SLIM_155_APVD = expr(" vl_slim_155_apvd = 0 ")
    EXPR_VL_SLIM_155_UTZD = expr(" vl_slim_155_apvd = 0 and vl_slim_137_apvd = 0 and vl_slim_171_apvd = 0 and vl_slim_179_apvd = 0 ")
    EXPR_VL_SLIM_137_APVD = expr(" vl_slim_137_apvd > 0 ")
    EXPR_VL_SLIM_171_APVD = expr(" vl_slim_171_apvd > 0 ")
    EXPR_VL_SLIM_179_UTZD = expr(" vl_slim_179_utzd > vl_slim_179_apvd ")
    
    # expressoes matematicas utulizadas nas contas conforme condicao 
    
    CALC_PSTC_ECF_080 = expr(f" vl_pstc_ecf_apvd * {FATOR_MULTI_VL80} ")


    # Inicialização 
       
    slim_ecfDF = df.withColumn("vl_mtp_clc_slim_155",col("vl_mtp_clc_slim_137"))\
                   .withColumn("vl_lim_ref_155",col("vl_lim_ref_137"))\
                   .withColumn("vl_mtp_clc_slim_179",col("vl_mtp_clc_slim_137"))\
                   .withColumn("vl_lim_ref_179",col("vl_lim_ref_137"))\
                   .withColumn("vl_ftr_ajst_slim_179",col("vl_ftr_ajst_slim_137"))\
                   .withColumn("in_ctr_52_18",lit(0))\
    
                   
    slim_ecfDF = slim_ecfDF.withColumn("in_ProcessaCalculo", lit(True)\
                                      )
    
    # Calculando o sublimite calculado e aprovado para os produtos 137 e 155
    
    print("Entrando na processaCalculo - 137 apvd")
        
    slim_ecfDF = processaCalculo(slim_ecfDF,137,"apvd")
    
    print("Entrando na processaCalculo - 137 clcd")

    slim_ecfDF = processaCalculo(slim_ecfDF,137,"clcd")
    
    slim_ecfDF.cache()
    
    print("Entrando na processaCalculo - 155 apvd")
    
    slim_ecfDF = processaCalculo(slim_ecfDF,155,"apvd")
    
    print("Entrando na processaCalculo - 155 clcd")
    
    slim_ecfDF = processaCalculo(slim_ecfDF,155,"clcd")
        
    slim_ecfDF = slim_ecfDF.drop("in_ProcessaCalculo")\
                           .withColumn("vl_slim_155_apvd",F.when(EXPR_VL_SLIM_155_UTZD_1,col("vl_slim_155_utzd"))\
                                                           .otherwise(col("vl_slim_155_apvd"))\
                                      )\
                           .select("*",
                                   (F.when(EXPR_IN_PROCESSA_CALCULO_171,lit(True))\
                                     .otherwise(lit(False))\
                                   ).alias("in_ProcessaCalculo")
                                  )
    
    print("Entrando na processaCalculo - 171 apvd")

    slim_ecfDF = processaCalculo(slim_ecfDF,171,"apvd")
    
    
    print("Entrando na processaCalculo - 171 clcd")

    
    slim_ecfDF = processaCalculo(slim_ecfDF,171,"clcd")
    
    slim_ecfDF.cache()
    
    slim_ecfDF = slim_ecfDF.drop("vl_pstc_ttl_ecf_org")\
                           .withColumn("in_ctr_rsct",F.when(EXPR_IN_CTR,lit(0))\
                                                      .otherwise(col("in_ctr_rsct"))\
                                      )\
                           .withColumn("vl_pstc_ttl_ecf_org",col("vl_pstc_ttl_ecf_apvd")\
                                      )\
                           .withColumn("in_ctr_52_18",F.when(EXPR_IN_CTR,lit(0))\
                                                       .otherwise(col("in_ctr_52_18"))\
                                      )
        
    slim_ecfDF = slim_ecfDF.withColumn("vl_pstc_ecf_clcd",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF,col("vl_pstc_ttl_ecf_org"))\
                                                           .otherwise(col("vl_pstc_ecf_clcd"))\
                                      )\
                           .withColumn("vl_pstc_ecf_apvd",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF,col("vl_pstc_ttl_ecf_org"))\
                                                           .otherwise(col("vl_pstc_ecf_apvd"))\
                                      )\
                           .withColumn("vl_pstc_fun_bb_clcd",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF,col("vl_pstc_bb_org"))\
                                                           .otherwise(col("vl_pstc_bb_org"))\
                                      )\
                           .withColumn("vl_pstc_ecf_demais",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF,col("vl_pstc_demais_org"))\
                                                           .otherwise(col("vl_pstc_demais_org"))\
                                      )\
                           .withColumn("vl_pstc_ecf_clcd",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF & EXPR_VL_PSTC_ECF_REC,lit(0))\
                                                           .otherwise(col("vl_pstc_ecf_clcd"))\
                                      )\
                           .withColumn("vl_pstc_fun_bb_clcd",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF & EXPR_VL_PSTC_ECF_REC,lit(0))\
                                                           .otherwise(col("vl_pstc_fun_bb_clcd"))\
                                      )\
                           .withColumn("vl_pstc_ecf_demais",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF & EXPR_VL_PSTC_ECF_REC,lit(0))\
                                                           .otherwise(col("vl_pstc_ecf_demais"))\
                                      )\
                           .withColumn("vl_pstc_ecf_clcd",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF & EXPR_VL_PSTC_ECF_ELSE,col("vl_pstc_ecf_rec"))\
                                                           .otherwise(col("vl_pstc_ecf_clcd"))\
                                      )\
                           .withColumn("vl_pstc_fun_bb_clcd",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF & EXPR_VL_PSTC_ECF_ELSE,col("vl_pstc_ecf_rec"))\
                                                           .otherwise(col("vl_pstc_fun_bb_clcd"))\
                                      )\
                           .withColumn("vl_pstc_ecf_demais",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF & EXPR_VL_PSTC_ECF_ELSE,col("vl_pstc_ecf_rec"))\
                                                           .otherwise(col("vl_pstc_ecf_demais"))\
                                      )\
    
    
    slim_ecfDF = slim_ecfDF.withColumn("in_ProcessaCalculo",F.when(~EXPR_QT_ID_E_VL_REND & EXPR_VL_PSTC_ECF, lit(True))\
                                                             .otherwise(lit(False))\
                                      )
    
    
    print("Entrando na processaCalculo - 155 apvd")

    slim_ecfDF = processaCalculo(slim_ecfDF,155,"apvd")    
    
    print("Entrando na processaCalculo - 155 clcd")

    slim_ecfDF = processaCalculo(slim_ecfDF,155,"clcd")
    
    slim_ecfDF.cache()
                      
    slim_ecfDF = slim_ecfDF.withColumn("vl_pstc_ecf_apvd",F.when(EXPR_VL_SLIM_155_APVD & EXPR_VL_SLIM_155_UTZD ,lit(0))\
                                                           .otherwise(col("vl_pstc_ecf_apvd"))\
                                      )\
                           .withColumn("vl_slim_155_apvd",F.when(EXPR_VL_SLIM_155_APVD ,col("vl_slim_155_utzd"))\
                                                           .otherwise(col("vl_slim_155_apvd"))\
                                      )\
                           .withColumn("vl_pstc_ecf_clcd",F.when(EXPR_VL_SLIM_155_APVD & EXPR_VL_SLIM_155_UTZD ,lit(0))\
                                                           .otherwise(col("vl_pstc_ecf_clcd"))\
                                                             )\
                           .withColumn("vl_pstc_ecf_demais",F.when(EXPR_VL_SLIM_155_APVD & EXPR_VL_SLIM_155_UTZD ,lit(0))\
                                                           .otherwise(col("vl_pstc_ecf_demais"))\
                                      )\
                           .select("*",
                                   CALC_PSTC_ECF_080.alias("waux_vl_80pc_pstc_ecf_apvd")
                                  )
    
    
    slim_ecfDF = slim_ecfDF.withColumn("in_grava_cvn",F.when(EXPR_VL_SLIM_137_APVD,col("in_grava_cvn") + 3)\
                                                       .otherwise(col('in_grava_cvn'))\
                                      )\
                           .withColumn("in_grava_cvn",F.when(EXPR_VL_SLIM_171_APVD,col("in_grava_cvn") + 5)\
                                                       .otherwise(col('in_grava_cvn'))\
                                      )
    
    slim_ecfDF = slim_ecfDF.withColumn("in_ProcessaCalculo", lit(True)\
                                      )
                                       
    print("Entrando na funcao processaCalculo - 179 apvd")
    
    slim_ecfDF = processaCalculo(slim_ecfDF,179,"apvd")
    
    print("Entrando na funcao processaCalculo - 179 clcd")

    slim_ecfDF = processaCalculo(slim_ecfDF,179,"clcd")
    
    
    slim_ecfDF = slim_ecfDF.withColumn("vl_slim_179_apvd",F.when(EXPR_VL_SLIM_179_UTZD,col("vl_slim_179_utzd"))\
                                                           .otherwise(col("vl_slim_179_apvd"))\
                                      )

    print(f"Saindo da função slim_ecf tempo de excução: {time.time() - start_time} ")
    
    return slim_ecfDF

### SublimiteCreditoDiretoConsumidor

In [None]:
%%spark

import pyspark.sql.functions as F
import time 

def slimApvdCdc(df):
    '''
        Possuem as regras especificas para seus produtos aprovado conta corrente, atribuindo valores referência, zerando ou enviando para o processa 
        cálculo (processaCalculo) e posteriormente enviando para o cálculo do valor do sublimite (CalculoSlim)   
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def  processaParcelado
            funcoes acionadas:
                def  calculaSLim
                
    '''
    
    start_time = time.time()

    # expressoes condicionais utulizadas nas contas conforme condicao 
    

    EXPR_VL_MTP_CLC_SLIM_83_1 = expr(" ic_mtp_cvn_sal <> 0 and in_grava_cvn = 1")
    EXPR_VL_SLIM_83_APVD = expr("in_bnf_astl = 'S' and in_pgt = 0 ") 
    EXPR_IN_CALC_SLIM_CDC = expr("in_SlimApvdCdc")
    EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_1 = expr("in_518 = 'S' and cd_crct_1682 = 1 ")
    EXPRESSAO_CALCULA_INDICADOR_PROCESSA_CALCULO = expr("in_518 = 'S' and cd_crct_1682 <> 1")
    EXPRESSAO_CALCULA_INDICADOR_PROCESSA_CALCULO_1 = expr("vl_slim_173_apvd = 0 and cd_fuc <> 2 and cd_crct_1682 <> 1")
    EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2 = expr("in_bnf_astl = 'S' and in_pgt = 0")
    EXPRESSAO_VALOR_LIMITE_REFERENCIA_131 = expr("vl_slim_173_apvd = 0 and cd_fuc <> 2 and cd_crct_1682 = 1")
    EXPRESSAO_VALOR_SUBILIMITE_131_APVD = EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2
    EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO = expr("cd_crct_1682 = 1")
    EXPRESSAO_WS_PRESTACAO_CREDITOAOCONSUMIDOR_83 = expr("cd_crct_1682 = 1")
    EXPRESSAO_WS_PRESTACAO_CREDITODIRETOCONSUMIR_83 = expr("vl_pstc_cdc_prtt > vl_pstc_cdc_apvd")
    EXPRESSAO_IC_MTP_CONVENIO_SALARIO = expr("(in_pgt = 1 and in_fun = 0 and ws_psct_cdc_83_c > 0) or (in_pgt = 1 and cd_mtdl_anl_crd in (70,71,72) and ic_mtp_cvn_sal <> 0) ")
    EXPRESSAO_WAUX_RED_MTP_SUBILIMITE_083 = expr(" (cd_ntz_ocp in (4, 5, 6, 8) or (cd_ntz_ocp = 13) and cd_ocp_ppl in (271, 272, 273, 310, 330))")
    EXPRESSAO_CODIGO_CRCT_1682 = expr("cd_crct_1682 = 1")
    EXPRESSAO_WS_PRESTACAO_CREDITOCONSUMIDOR_115 = expr(" cd_crct_1682 <> 1 and ws_psct_cdc_115_c > vl_pstc_cdc_apvd")
    EXPRESSAO_INDICADOR_ANL_PMT = expr("in_anl_pmt = 'S'")
    EXPRESSAO_VALOR_SUBILIMITE_159_APROVADO = expr("cd_fuc <> 2 and cd_crct_1682 = 0 and vl_slim_159_apvd > vl_slim_15_20_apvd")
    EXPRESSAO_VALOR_SUBILIMITE_161_APROVADO = expr("vl_ren_brto <= waux_vl_10slr_min and cd_sgm_anl in (9,19,25,21,30,31,55) and (in_cli_novo = 'N' or in_cli_novo_prtt = 'S' )")
    EXPRESSAO_VALOR_MTP_CALCULADO_SUBILIMITE_142 = expr("gda_maior_mtp > vl_mtp_clc_slim_142")
    EXPRESSAO_VALOR_161_APROVADO = expr("vl_slim_161_apvd > vl_teto_slim_161")
    EXPR_VL_SLIM_15_APVD = expr(" vl_slim_15_apvd > vl_slim_20_apvd ")


    # expressoes matematicas  nas contas conforme condicao 
    
    CALCULO_WS_PRESTACAO_CREDITOAOCONSUMIDOR_83 = expr("ws_psct_cdc_83_c * vl_ftr_ajst_pstc_pcld")
    CALCULO_VALOR_MTP_CALCULADO_SUBILIMITE_83 = expr("vl_mtp_clc_slim_83 * waux_red_mtp_slim_083")
    CALCULO_WS_PRESTACAO_CREDITOCONSUMIDOR_115 = expr("ws_psct_cdc_115_c * vl_ftr_ajst_pstc_pcld")
    
    
    # Inicialização 

    slimApvdCdcDF = df.withColumn("ws_psct_cdc_83_c", F.when(EXPR_IN_CALC_SLIM_CDC,lit(0))\
                                                       .otherwise(col("ws_psct_cdc_83_c"))\
                                 )\
                      .withColumn("ws_psct_cdc_115_c", F.when(EXPR_IN_CALC_SLIM_CDC,lit(0))\
                                                        .otherwise(col("ws_psct_cdc_115_c"))\
                                 )\
    
    
                        
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_173_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_1,lit(0))\
                                                                 .otherwise(col("vl_slim_173_apvd"))\
                                            )\
                                 .withColumn("in_ProcessaCalculo", F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_CALCULA_INDICADOR_PROCESSA_CALCULO, lit(True))\
                                                                    .otherwise(lit(False))\
                                            )\
                                 
                                            
    print("Entrando na funcao processaCalculo - 173 apvd")                                                                  
                                                                    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,173,"apvd")
    
    
    slimApvdCdcDF.cache()
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_173_apvd", F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_CALCULA_INDICADOR_PROCESSA_CALCULO,col("aux_vl_slim").cast(IntegerType()))\
                                                                  .otherwise(col("vl_slim_173_apvd"))\
                                            )
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_173_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2, lit(0))\
                                                                  .otherwise(col("vl_slim_173_apvd"))\
                                              )
    
    
    
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.

    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_lim_ref_131",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_LIMITE_REFERENCIA_131,lit(0))\
                                                               .otherwise(col("vl_lim_ref_131"))\
                                            )\
                                 .withColumn("in_ProcessaCalculo", F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_CALCULA_INDICADOR_PROCESSA_CALCULO_1, lit(True))\
                                                                    .otherwise(lit(False))\
                                            )                                            
    print("Entrando na funcao processaCalculo - 131 apvd")      
                    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,131,"apvd")

    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.

    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_131_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_131_APVD,lit(0))\
                                                                 .otherwise(col("vl_slim_131_apvd"))\
                                             )\
                                 .withColumn("vl_slim_130_apvd",lit(0)\
                                             )\
                                 .withColumn("vl_slim_130_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_131_APVD,lit(0))\
                                                                 .otherwise(col("vl_slim_130_apvd"))\
                                             )\
                                 .withColumn("vl_slim_116_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,lit(0))\
                                                                 .otherwise(col("vl_slim_116_apvd"))\
                                            )\
   
        
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.        
        
    slimApvdCdcDF = slimApvdCdcDF.withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,lit(True))\
                                                                   .otherwise(lit(False))\
                                             )\
    
    print("Entrando na funcao processaCalculo - 116 apvd")    
        
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,116,"apvd")
    
    slimApvdCdcDF.cache()

    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.        

    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_116_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2,lit(0))\
                                                                 .otherwise(col("vl_slim_116_apvd"))\
                                 )\

    print("Entrando na funcao vrf_Rnd_Slr_Bnf")
    
    slimApvdCdcDF = vrf_Rnd_Slr_Bnf(slimApvdCdcDF)
    slimApvdCdcDF.cache()
                
    slimApvdCdcDF = slimApvdCdcDF.withColumn("ws_psct_cdc_83_c",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_WS_PRESTACAO_CREDITOAOCONSUMIDOR_83,lit(0))\
                                                                 .otherwise(CALCULO_WS_PRESTACAO_CREDITOAOCONSUMIDOR_83)\
                                            )\
                                 .withColumn("ws_psct_cdc_83_c",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_WS_PRESTACAO_CREDITODIRETOCONSUMIR_83,col("vl_pstc_cdc_apvd"))\
                                                                 .otherwise(col("vl_pstc_cdc_prtt"))\
                                            )


    # Teste incluido pelo Fernando para ajuste na informação do VSAM e valor 
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_mtp_clc_slim_83",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_IC_MTP_CONVENIO_SALARIO,col("ic_mtp_cvn_sal"))\
                                                                   .otherwise(col("vl_mtp_clc_slim_83"))\
                                            )\
                                 .withColumn("vl_pstc_083",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_IC_MTP_CONVENIO_SALARIO,col("ws_psct_cdc_83_c"))\
                                                            .otherwise(col("vl_pstc_083"))\
                                            )\
                                 .withColumn("waux_red_mtp_slim_083", F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_IC_MTP_CONVENIO_SALARIO,1 - col("redt_mtp_slim_083"))\
                                                                       .otherwise(col("waux_red_mtp_slim_083"))\
                                            )\

     
    slimApvdCdcDF = slimApvdCdcDF.withColumn("waux_red_mtp_slim_083",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_IC_MTP_CONVENIO_SALARIO & EXPRESSAO_WAUX_RED_MTP_SUBILIMITE_083,lit(1))\
                                                                      .otherwise(col("waux_red_mtp_slim_083"))\
                                            )\
                                 .withColumn("vl_mtp_clc_slim_83",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_IC_MTP_CONVENIO_SALARIO,CALCULO_VALOR_MTP_CALCULADO_SUBILIMITE_83)\
                                                                   .otherwise(col("vl_mtp_clc_slim_83"))\
                                            )
                                            

          
    # Conforme correio DICRE, para clientes sem convenio com FEi      
    # maior que 0,135335283 , zerar o multiplicador                 
    # Retirado tratamento abaixo conforme ideia 56720
    
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_IC_MTP_CONVENIO_SALARIO,lit(True))\
                                                                    .otherwise(lit(False))\
                                             )\
    
    print("Entrando na funcao processaCalculo - 83 apvd")
    

    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,83,"apvd")
    slimApvdCdcDF.cache()
    
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.   

    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_83_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_IC_MTP_CONVENIO_SALARIO,col("aux_vl_slim").cast(IntegerType()))\
                                                                .otherwise(col("vl_slim_83_apvd"))\
                                            )\
                                 .withColumn("vl_slim_83_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2,lit(0))\
                                                                .otherwise(col("vl_slim_83_apvd"))\
                                            )\
                                 .withColumn("ws_psct_cdc_115_c",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_CODIGO_CRCT_1682,lit(0))\
                                                                  .otherwise(CALCULO_WS_PRESTACAO_CREDITOCONSUMIDOR_115)\
                                            )\
                
    slimApvdCdcDF = slimApvdCdcDF.withColumn("ws_psct_cdc_115_c",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_WS_PRESTACAO_CREDITOCONSUMIDOR_115,col("vl_pstc_cdc_apvd"))\
                                                                  .otherwise(col("ws_psct_cdc_115_c"))\
                                            )
    

    slimApvdCdcDF = slimApvdCdcDF.withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_SLIM_CDC,lit(True))\
                                                                   .otherwise(lit(False))\
                                            )
    
    # prd = 115
    
    print("Entrando na funcao processaCalculo - 115 apvd") 
    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,115,"apvd")
    
    
    
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_115_apvd", f.when(EXPR_IN_CALC_SLIM_CDC, col("aux_vl_slim").cast(IntegerType()))
                                                                  .otherwise(col('vl_slim_115_apvd')))\
                                 .withColumn("aux_redt_vcli",lit(1))\

    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_119_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,lit(0))\
                                                                 .otherwise(col("vl_slim_119_apvd"))\
                                             )\
                                 .withColumn("vl_slim_157_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,lit(0))\
                                                                 .otherwise(col("vl_slim_157_apvd"))\
                                             )\
                                 .withColumn("vl_slim_156_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,lit(0))\
                                                                 .otherwise(col("aux_vl_slim"))\
                                             )\
                                 .withColumn("vl_slim_159_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,lit(0))\
                                                                 .otherwise(col("vl_slim_159_apvd"))\
                                             )\
                                 .withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,lit(True))\
                                                                   .otherwise(lit(False))\
                                             )\
    
    slimApvdCdcDF.cache()
    # prd = 119
    print("Entrando na funcao processaCalculo - 119 apvd") 
    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,119,"apvd")
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_119_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,col("aux_vl_slim").cast(IntegerType()))\
                                                                 .otherwise(col("vl_slim_119_apvd"))\
                                            )
    # prd = 156

    print("Entrando na funcao processaCalculo - 156 apvd") 
    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,156,"apvd")
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_156_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO,col("aux_vl_slim").cast(IntegerType()))\
                                                                 .otherwise(col("vl_slim_156_apvd"))\
                                            )
    
    # prd = 159
    
    print("Entrando na funcao processaCalculo - 159 apvd") 
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,159,"apvd")
    
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_159_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO & EXPRESSAO_INDICADOR_ANL_PMT,col("vl_slim_159_clcd"))\
                                                                 .otherwise(col("vl_slim_159_apvd"))\
                                             )\
                                 .withColumn("vl_slim_159_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_VALOR_SUBILIMITE_116_APROVADO & ~EXPRESSAO_INDICADOR_ANL_PMT,col("aux_vl_slim").cast(IntegerType()))\
                                                                 .otherwise(col("vl_slim_159_apvd"))\
                                            )\
    
                                             
                                           
    # Limitado valor do Credito rotativo
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_159_apvd", F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_159_APROVADO,col("vl_slim_15_20_apvd"))\
                                                                  .otherwise(col("vl_slim_159_apvd"))\
                                            )\
                                 .withColumn("vl_slim_115_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2,lit(0))\
                                                                 .otherwise(col("vl_slim_115_apvd"))\
                                             )\
                                 .withColumn("vl_slim_119_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2,lit(0))\
                                                                 .otherwise(col("vl_slim_119_apvd"))\
                                             )\
                                 .withColumn("vl_slim_156_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2,lit(0))\
                                                                 .otherwise(col("vl_slim_156_apvd"))\
                                             )\
                                 .withColumn("vl_slim_159_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2,lit(0))\
                                                                 .otherwise(col("vl_slim_159_apvd"))\
                                             )\
                                 .withColumn("vl_slim_157_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_VALOR_SUBILIMITE_APROVADO_173_2,lit(0))\
                                                                 .otherwise(col("vl_slim_157_apvd"))\
                                             )\
                                 .withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_SLIM_CDC,lit(True))\
                                                                   .otherwise(lit(False))\
                                            )
                
               
    slimApvdCdcDF.cache()
    
    # prd = 160
    print("Entrando na funcao processaCalculo - 160 apvd") 
    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,160,"apvd")
       
    slimApvdCdcDF = slimApvdCdcDF.withColumn("aux_redt_vcli",lit(1))\
                                 .withColumn("vl_slim_160_apvd", f.when(EXPR_IN_CALC_SLIM_CDC, col("aux_vl_slim").cast(IntegerType()))\
                                                                  .otherwise(col('vl_slim_160_apvd')))
    
    

    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_161_apvd", F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_CODIGO_CRCT_1682,lit(0))\
                                                                  .otherwise(col("vl_slim_161_apvd"))\
                                             )\
                                 .withColumn("vl_slim_163_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_CODIGO_CRCT_1682,lit(0))\
                                                                 .otherwise(col("vl_slim_163_apvd"))\
                                             )\
                                 .withColumn("vl_slim_142_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & EXPRESSAO_CODIGO_CRCT_1682,lit(0))\
                                                                 .otherwise(col("vl_slim_142_apvd"))\
                                             )
    


    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_CODIGO_CRCT_1682 & EXPRESSAO_VALOR_SUBILIMITE_161_APROVADO,lit(True))\
                                                                   .otherwise(lit(False))\
                                            )\
    
    #  prd = 161 
    print("Entrando na funcao processaCalculo - 161 apvd") 
    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,161,"apvd")

    slimApvdCdcDF.cache()
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_161_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_CODIGO_CRCT_1682 & EXPRESSAO_VALOR_SUBILIMITE_161_APROVADO,col("aux_vl_slim").cast(IntegerType())\
                                                                      )\
                                                                 .otherwise(col("vl_slim_161_apvd"))\
                                            )
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_161_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_CODIGO_CRCT_1682 & EXPRESSAO_VALOR_161_APROVADO,col("vl_teto_slim_161"))\
                                                                 .otherwise(col("vl_slim_161_apvd"))\
                                            )
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_CODIGO_CRCT_1682,lit(True))\
                                                                   .otherwise(lit(False))\
                                            )\
        
    # prd = 163
    
    print("Entrando na funcao processaCalculo - 163 apvd") 
    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,163,"apvd")
    
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_163_apvd",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_CODIGO_CRCT_1682,col("aux_vl_slim").cast(IntegerType()))\
                                                                 .otherwise(col("vl_slim_163_apvd"))\
                                            )\
    
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_mtp_clc_slim_142",F.when(EXPR_IN_CALC_SLIM_CDC & ~EXPRESSAO_CODIGO_CRCT_1682 & EXPRESSAO_VALOR_MTP_CALCULADO_SUBILIMITE_142,col("gda_maior_mtp"))\
                                                                    .otherwise(col("vl_mtp_clc_slim_142"))\
                                            )\
        
    # prd = 142
    
    print("Entrando na funcao processaCalculo - 142 apvd") 
    
    slimApvdCdcDF = processaCalculo(slimApvdCdcDF,142,"apvd")
    
    
    #INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.        

            
    slimApvdCdcDF = slimApvdCdcDF.withColumn("vl_slim_142_apvd",   F.when(EXPR_IN_CALC_SLIM_CDC & EXPR_VL_SLIM_83_APVD, lit(0))\
                                                                    .otherwise(col("vl_slim_142_apvd"))\
                                            )\
                                 .withColumn("vl_slim_161_apvd",   F.when(EXPR_IN_CALC_SLIM_CDC & EXPR_VL_SLIM_83_APVD, lit(0))\
                                                                    .otherwise(col("vl_slim_161_apvd"))\
                                            )\
                                 .withColumn("vl_slim_163_apvd",   F.when(EXPR_IN_CALC_SLIM_CDC & EXPR_VL_SLIM_83_APVD, lit(0))\
                                                                    .otherwise(col("vl_slim_163_apvd"))\
                                            )\
                                 .withColumn("vl_mtp_clc_slim_83", F.when(EXPR_IN_CALC_SLIM_CDC & EXPR_VL_MTP_CLC_SLIM_83_1, col("ic_mtp_cvn_sal"))\
                                                                    .otherwise(col("vl_mtp_clc_slim_83"))\
                                            )
    
                                            
    print(f"Saindo da função slimApvdCdc tempo de excução: {time.time() - start_time} ")
    
    return slimApvdCdcDF

### SublimiteCalculadoCreditoDiretoConsumidor

In [None]:
%%spark

import pyspark.sql.functions as F
import time


def slimClcdCdc(df):
    '''
        Possuem as regras especificas para seus produtos, calculado conta corrente, atribuindo valores referência, zerando ou enviando para o processa 
        cálculo (processaCalculo) e posteriormente enviando para o cálculo do valor do sublimite (CalculoSlim)   
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def  processaParcelado
            funcoes acionadas:
                def  calculaSLim
                def  vrf_Rnd_Slr_Bnf
                
    '''
    
    start_time = time.time()

    # 1 - O código tem como objetivo preparar as variaveis que serão utilizadas para o calculo da prestação calculada
    # 2 - Regra geral = prestacao * respectivo multiplicador *  fator de idade calculado na função 960000
    # 3 - Prepara as variaveis para todos os sublimites e salva o valor do sublimite calculado após passar pela função 960000
    # 4 - Preparar a formula (prestacao x fator de idade) -aplicar nos calculos dos sublimites 
    # 5 - Sublimites 173 (DRS), 130 (benef. 2 sal.) e 131 (pronto) sao excludentes entre si
    # 6 - BB microcrédito DRS - caracteristica especial 518

    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_IN_CLI_NOVO = expr(" (cd_mtdl_anl_crd = 52 and cd_sgm_anl in (21, 31)) or cd_mtdl_anl_crd = 53   ")
    EXPR_IN_CALC_173 = expr(" in_518 = 'S' ")
    EXPR_IN_CALC_131 = expr(" vl_slim_173_clcd = 0 and not cd_fuc = 2 ")
    EXPR_VL_PSTC_CDC_PRTT = expr(" vl_pstc_cdc_prtt > vl_pstc_cdc_clcd ")
    EXPR_VL_PSTC_083 = expr(" (in_pgt = 1 and in_fun = 0) or (in_pgt = 1 and cd_mtdl_anl_crd in (70, 71, 72))")
    EXPR_IC_MTP_CVN_SAL = expr(" ic_mtp_cvn_sal <> 0  ")
    EXPR_IN_GRAVA_CSV = expr("vl_pstc_083 <> 0 ")
    EXPR_VL_MTP_CLC_SLIM_83 = expr(" ic_mtp_cvn_sal = 0 and vl_fei_cli > 0.135335283 ")
    EXPR_WS_PSCT_CDC_115 = expr(" ws_psct_cdc_115_c > vl_pstc_cdc_clcd ")
    EXPR_VL_SLIM_159_CLCD = expr(" cd_fuc <> 2 and vl_slim_159_clcd > vl_slim_15_20_clcd ")
    EXPR_IN_CALC_161 = expr(" vl_ren_brto < waux_vl_10slr_min and (cd_sgm_anl = 9 or cd_sgm_anl in (19, 21, 25, 30, 31, 55)) and (in_cli_novo = 'N' or in_cli_novo_prtt = 'S') ")
    EXPR_VL_SLIM_161_CLCD = expr(" vl_slim_161_clcd > vl_teto_slim_161 ")
    EXPR_VL_MTP_CLC_SLIM_142 = expr(" gda_maior_mtp > vl_mtp_clc_slim_142 ")
    EXPR_REDMTPSLIM83 = expr(" cd_ntz_ocp in (4, 5, 6, 8) or (cd_ntz_ocp = 13 and cd_ocp_ppl in (271, 272, 273, 310, 330)) ")


    # expressoes matematicas  nas contas conforme condicao 
    
    CALC_AUX_PSTC_SLIM = expr(" vl_pstc_ecf_demais * vl_ftr_ajst_pstc_cgn_flh ")
    CALC_VL_MTP_SLIM_83 = expr("  vl_mtp_clc_slim_83 * waux_red_mtp_slim_083 ")
       
    # Inicialização 
    
    slimClcdCdcDF = df.withColumn("aux_far_slim",lit(1))\
                      .withColumn("ws_psct_cdc_115_c",lit(0))\
                      .withColumn("ws_psct_cdc_83_c",lit(0))\
                      .withColumn("in_cli_novo",lit('N'))\
                      .withColumn("cd_slim_rnvc_aux", keep_non_zero_values_udf(col('cd_slim_rnvc')))\
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("in_cli_novo",F.when(EXPR_IN_CLI_NOVO,lit("S"))\
                                                            .otherwise(col("in_cli_novo"))\
                                            )\
                                 .withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_173,lit(True))\
                                                                   .otherwise(lit(False))\
                                            )\
    
    print("Entrando na funcao processaCalculo - 173 clcd")
    
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,173,"clcd")
     
     
     
    slimClcdCdcDF = slimClcdCdcDF.drop("in_ProcessaCalculo")\
                                  .select("*",
                                         (F.when(EXPR_IN_CALC_131, lit(True))\
                                           .otherwise(lit(False))\
                                         ).alias("in_ProcessaCalculo")
                                         )
              
        
    print("Entrando na funcao processaCalculo - 131 clcd")
                                               
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,131,"clcd")
    
    slimClcdCdcDF.cache()
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("in_ProcessaCalculo",lit(True))\
                                 .withColumn("vl_slim_130_clcd",lit(0))\

    
    print("Entrando na funcao processaCalculo - 116 clcd")

    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,116,"clcd")
    
    
    print("Entrando na funcao vrf_Rnd_Slr_Bnf")

    slimClcdCdcDF = vrf_Rnd_Slr_Bnf(slimClcdCdcDF).cache()
    
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("ws_psct_cdc_83_c",F.when(EXPR_VL_PSTC_CDC_PRTT,col("vl_pstc_cdc_clcd"))\
                                                                 .otherwise(col("vl_pstc_cdc_prtt"))\
                                            )\
                                 .withColumn("vl_mtp_clc_slim_83", F.when(EXPR_VL_PSTC_083 & EXPR_IC_MTP_CVN_SAL,col("ic_mtp_cvn_sal"))\
                                                                    .otherwise(col("vl_mtp_clc_slim_83"))\
                                            )\
                                 .withColumn("vl_pstc_083", F.when(EXPR_VL_PSTC_083 & EXPR_IC_MTP_CVN_SAL, col("ws_psct_cdc_83_c"))\
                                                             .otherwise(col("vl_pstc_083"))\
                                            )\
                                 .withColumn("aux_cd_cliente", F.when(EXPR_VL_PSTC_083 & EXPR_IC_MTP_CVN_SAL, col("cd_cli"))\
                                                                .otherwise(col("aux_cd_cliente"))\
                                            )\
                                 .withColumn("waux_red_mtp_slim_083", F.when(EXPR_VL_PSTC_083 & EXPR_IC_MTP_CVN_SAL, 1 - col("redt_mtp_slim_083"))\
                                                                        .otherwise(lit(0))\
                                            )\
                                 .withColumn("waux_red_mtp_slim_083", F.when(EXPR_VL_PSTC_083 & EXPR_IC_MTP_CVN_SAL & EXPR_REDMTPSLIM83,lit(1)\
                                                                            )\
                                                                       .otherwise(col("waux_red_mtp_slim_083"))\
                                            )\
                                 .withColumn("vl_mtp_clc_slim_83", F.when(EXPR_VL_PSTC_083 & EXPR_IC_MTP_CVN_SAL,col("vl_mtp_clc_slim_83") * col("waux_red_mtp_slim_083"))\
                                                                    .otherwise(col("vl_mtp_clc_slim_83"))\
                                            )\
                                 .withColumn("in_grava_cvn", F.when(EXPR_VL_PSTC_083 & EXPR_IC_MTP_CVN_SAL & EXPR_IN_GRAVA_CSV,lit(1))\
                                                               .otherwise(lit(0))\
                                            )
                                             

    slimClcdCdcDF = slimClcdCdcDF.withColumn("vl_mtp_clc_slim_83",F.when(EXPR_VL_PSTC_083 & EXPR_VL_MTP_CLC_SLIM_83,lit(0))\
                                                                   .otherwise(col("vl_mtp_clc_slim_83"))\
                                            )\
                                 .withColumn("in_ProcessaCalculo",F.when(EXPR_VL_PSTC_083,lit(True))\
                                                                   .otherwise(lit(False))\
                                            )
    
    print("Entrando na funcao processaCalculo - 83 clcd")

    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,83,"clcd")
    
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("ws_psct_cdc_115_c",F.when(EXPR_WS_PSCT_CDC_115, col("vl_pstc_cdc_clcd"))\
                                                                  .otherwise(col("ws_psct_cdc_115_c"))\
                                            )\
                                 .withColumn("in_ProcessaCalculo", lit(True))
    
    
    print("Entrando na funcao processaCalculo - 115 clcd")
        
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,115,"clcd")
    
    print("Entrando na funcao processaCalculo - 119 clcd")

    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,119,"clcd")
    
    print("Entrando na funcao processaCalculo - 156 clcd")
    
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,156,"clcd")
    
    print("Entrando na funcao processaCalculo - 159 clcd")
    
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,159,"clcd")
    
    
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("vl_slim_159_clcd", F.when(EXPR_VL_SLIM_159_CLCD, col("vl_slim_15_20_clcd"))\
                                                                  .otherwise(col("vl_slim_159_clcd"))\
                                            )\
                                 .withColumn("in_ProcessaCalculo", lit(True))
                                     
    
    print("Entrando na funcao processaCalculo - 160 clcd")

    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,160,"clcd")
    
    slimClcdCdcDF.cache()
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("in_ProcessaCalculo",F.when(EXPR_IN_CALC_161,lit(True)).otherwise(lit(False)))
    
    
    print("Entrando na funcao processaCalculo - 161 clcd")

    
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,161,"clcd")
    
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("vl_slim_161_clcd", F.when(EXPR_VL_SLIM_161_CLCD, col("vl_teto_slim_161"))\
                                                                  .otherwise(col("vl_slim_161_clcd"))\
                                            )\
                                 .withColumn("in_ProcessaCalculo",lit(True))
    
    
    print("Entrando na funcao processaCalculo - 163 clcd")
    
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,163,"clcd")
    
    
    
    slimClcdCdcDF = slimClcdCdcDF.withColumn("vl_mtp_clc_slim_142", F.when(EXPR_VL_MTP_CLC_SLIM_142, col("gda_maior_mtp"))\
                                                                     .otherwise(col("vl_mtp_clc_slim_142"))\
                                            )
                                             
    print("Entrando na funcao processaCalculo - 142 clcd")
    
    slimClcdCdcDF = processaCalculo(slimClcdCdcDF,142,"clcd")

    slimClcdCdcDF.unpersist()

    print(f"Saindo da função slimClcdCdc tempo de excução: {time.time() - start_time} ")

    return slimClcdCdcDF

### VerificaFraude

In [None]:
%%spark

import pyspark.sql.functions as F
import time


def verifGTFraude(df):
    '''
        verificar se o público é GT-Fraude e separa o seu cálculo pela comprovação de renda, podendo ela ser a comprovação de renda do cliente formal (próprio punho) 
        ou comprovada (proventista ou não).        
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado

        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ProcessaParcelado
            funcoes acionadas:
                def calculaPrtfCdc
                
    '''
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()
    
    EXPR_WAUX_PC_CDC = expr(" cd_fuc = 1 and in_frd in ('A', 'a') and in_pbco_gr_frd = 'S' and cd_tip_cpvt_ren = 'F' and in_rlc_cli in ('I', 'P', '', ' ', '  ')  ")
    EXPR_WAUX_PC_CDC_1 = expr(" cd_fuc = 1 and in_frd in ('A', 'a') and in_pbco_gr_frd = 'S'and cd_tip_cpvt_ren = 'F' and in_rlc_cli = 'C' ")
    EXPR_WAUX_PC_CDC_2 = expr(" cd_fuc = 1 and in_frd in ('A', 'a') and in_pbco_gr_frd = 'S' and cd_tip_cpvt_ren = 'C' and in_rlc_cli in ('I', '', ' ', '  ') ")
    EXPR_WAUX_PC_CDC_3 = expr(" cd_fuc = 1 and in_frd in ('A', 'a')  and in_pbco_gr_frd = 'S' and cd_tip_cpvt_ren = 'C' and in_rlc_cli = 'P' ")


    
    
    
    # Inicialização        
    
    verifGTFraudeDF = df.withColumn("in_calc_Prt_cdc", F.when(EXPR_WAUX_PC_CDC,lit(True))\
                                                        .when(EXPR_WAUX_PC_CDC_1,lit(True))\
                                                        .when(EXPR_WAUX_PC_CDC_2,lit(True))\
                                                        .when(EXPR_WAUX_PC_CDC_3,lit(True))\
                                                        .otherwise(lit(False))\
                                   )\
                        .withColumn("waux_pc_cdc", F.when(EXPR_WAUX_PC_CDC,lit(20))\
                                                    .when(EXPR_WAUX_PC_CDC_1,col("vl_pc_cdc_rlc_c"))\
                                                    .when(EXPR_WAUX_PC_CDC_2,lit(80))\
                                                    .when(EXPR_WAUX_PC_CDC_3,col("vl_pc_cdc_rlc_p"))\
                                                    .otherwise(col("waux_pc_cdc"))\
                                   )
    
    verifGTFraudeDF = calculaPrtfcdc(verifGTFraudeDF)
    
    
    verifGTFraudeDF = verifGTFraudeDF.withColumn("vl_pstc_vclo_clcd", F.when(EXPR_WAUX_PC_CDC_3,lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_clcd"))\
                                                )\
                                     .withColumn("vl_pstc_vclo_apvd", F.when(EXPR_WAUX_PC_CDC_3,lit(0))\
                                                                       .otherwise(col("vl_pstc_vclo_apvd"))\
                                                )\
    

    print(f"Saindo da função verifGTFraude tempo de excução: {time.time() - start_time} ")

    return verifGTFraudeDF

### VerificaSalarioBeneficiario

In [None]:
%%spark

import pyspark.sql.functions as F
import time


def vrf_Rnd_Slr_Bnf(df):
    '''
        verificar natureza da ocupação, a fim de computar a renda utilizada no cálculo dos sublimites de crédito salário (83) e crédito benefício (115).        
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado

        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def slimApvdCdc
                def slimClcdCdc
            funcoes acionadas:
                def verifEngajado
                
    '''
    
    # constantes
    start_time = time.time()
    ambiente = os.environ["AMBIENTE"]

    # expressoes condicionais utulizadas nas contas conforme condicao 

    EXPR_WS_PSCT_CDC_83_C  = expr(" (in_ctr_rsct = 1 and in_ptdd = 'N') or in_ctra_52_18 = 1 or in_ctra_349 = 1 or in_ctra_349_12 = 1 ")
    EXPR_WS_PSCT_CDC_115_C = expr(" (in_ctr_rsct = 1 and in_ptdd = 'N') or in_ctra_52_18 = 1 or in_ctra_349 = 1 or in_ctra_349_12 = 1 ")

    # expressoes matematicas

    CALC_WS_PSCT_CDC_83_C = expr(" vl_cfct_045 * waux_vl_rnd_slr * cd_ftr_rlc * vl_ftr_demp ")
    CALC_WS_PSCT_CDC_115_C = expr(" vl_cfct_045 * waux_vl_rnd_bnf * cd_ftr_rlc * vl_ftr_demp ")

    # Inicialização 

    vrf_Rnd_Slr_BnfDF = df.withColumn("gda_epgr_ppl_cli",F.array())\
                          .withColumn("waux_vl_rnd_slr",lit(0))\
                          .withColumn("waux_vl_rnd_bnf",lit(0))\
                          .withColumn("in_cli_novo_prtt",lit('N'))\
    
    vrf_Rnd_Slr_BnfDF = vrf_Rnd_Slr_BnfDF.withColumn("waux_mci_inss",  F.when(lit(ambiente).contains("DESENVOLVIMENTO"),F.array(lit(208961759)).cast(ArrayType(IntegerType())))\
                                                                        .when(lit(ambiente == "HOMOLOGACAO"),F.array(lit(208961759)).cast(ArrayType(IntegerType())))\
                                                                        .otherwise(F.array(lit(103824079),lit(512203746)).cast(ArrayType(IntegerType()))\
                                                                                  )\
                                                    )\
                                         .withColumn("waux_mci_bb",    F.when(lit(ambiente).contains("DESENVOLVIMENTO"),lit(903485186))\
                                                                        .when(lit(ambiente == "HOMOLOGACAO"),lit(130965148))\
                                                                        .otherwise(lit(903485186))\
                                                    )\
                                         .withColumn("waux_mci_previ", F.when(lit(ambiente).contains("DESENVOLVIMENTO"),lit(602907665))\
                                                                        .when(lit(ambiente == "HOMOLOGACAO"),lit(178998134))\
                                                                        .otherwise(lit(100186582))\
                                                    )
    
    vrf_Rnd_Slr_BnfDF = vrf_Rnd_Slr_BnfDF.withColumn("in_cli_novo_prtt",F.when((F.array_contains(col("cd_rgr_rend_lqdo"), 'C')) &  (col("in_cli_novo") == 'S'), lit('S'))\
                                                                         .otherwise(col("in_cli_novo_prtt"))\
                                                    )\
                                         .withColumn("in_rlc_cli", F.when((F.array_contains(col("cd_rgr_rend_lqdo"),'C')), lit('C'))\
                                                                    .otherwise(col("in_rlc_cli"))\
                                                    )

    
    
    
    aux_df = vrf_Rnd_Slr_BnfDF.selectExpr('cd_cli', 'in_cli_novo', 'vl_bnf', 'in_rlc_cli', 'cd_epgr_engaj', 'waux_vl_rnd_slr', 'waux_vl_rnd_bnf', 'waux_mci_inss', 'inline(arrays_zip(cd_ntz, in_ocp_ppl_scdr, cd_rgr_rend_lqdo, cd_epgr_cli, vl_rend_lqdo))').where('cd_ntz <> 0')
    
    
    aux_df = aux_df.withColumn("in_vrf_eng",F.when(((col("cd_rgr_rend_lqdo") == 'C') | (col("vl_bnf") > 0)) & ((col("cd_ntz").contains(19)) | (col("cd_ntz").contains(20))) ,lit(True))\
                                             .otherwise(lit(False)))
    
    # alteração devido erro de type com cd_epgr_cli
    
    aux_df = aux_df.withColumn("cd_epgr_engaj", col("cd_epgr_engaj").cast("array<int>"))

    aux_df = aux_df.withColumn("waux_engajado",F.when(F.array_contains(col("cd_epgr_engaj"), col("cd_epgr_cli")) & col("in_vrf_eng") == True, lit('S'))\
                                                          .otherwise(lit('N')))

    aux_df = aux_df.withColumn("waux_vl_rnd_slr",F.when((col("in_vrf_eng"))\
                                                            & (col("waux_engajado") == 'S')\
                                                            & (col("cd_rgr_rend_lqdo") == 'C')\
                                                                ,col("vl_rend_lqdo")\
                                                       )\
                                                  .when(
                                                        ((col("cd_rgr_rend_lqdo") == 'S') | (col("vl_bnf") > 0))\
                                                        & (~col("cd_ntz").isin([2,3,7,10,11,12,14,16,17,18,19,20]))\
                                                        & ~(F.array_contains(col('waux_mci_inss'), col('cd_epgr_cli')) & (col("cd_ntz") == 8))\
                                                        & (col("cd_rgr_rend_lqdo") == 'C')\
                                                        ,col("vl_rend_lqdo")\
                                                       )\
                               .otherwise(col('waux_vl_rnd_slr'))\
                              )
    

    aux_df = aux_df.withColumn("waux_vl_rnd_bnf",F.when(((col("cd_rgr_rend_lqdo") == 'C') | (col("vl_bnf") > 0))\
                                                            & (~col("cd_ntz").isin([2,3,7,10,11,12,14,16,17,18,19,20]))\
                                                                & (F.array_contains(col('waux_mci_inss'), col('cd_epgr_cli')))\
                                                                  & (col("cd_ntz") == 8)\
                                                                  ,col("vl_rend_lqdo")\
                                                       )\
                                                  .otherwise(col('waux_vl_rnd_bnf'))\
                              )

    
    aux_df = aux_df.groupby('cd_cli').agg(F.sum('waux_vl_rnd_slr').alias('waux_vl_rnd_slr2'),
                                          F.sum('waux_vl_rnd_bnf').alias('waux_vl_rnd_bnf2'),
                                          F.last('in_rlc_cli').alias('in_rlc_cli2'))
    
    vrf_Rnd_Slr_BnfDF = vrf_Rnd_Slr_BnfDF.join(aux_df, ['cd_cli'], 'left')
    
    
    vrf_Rnd_Slr_BnfDF = vrf_Rnd_Slr_BnfDF.withColumn('waux_vl_rnd_slr',F.when(col('waux_vl_rnd_slr2').isNull(),col('waux_vl_rnd_slr'))\
                                                                        .otherwise(col('waux_vl_rnd_slr2'))\
                                                     )\
                                         .withColumn('waux_vl_rnd_bnf',F.when(col('waux_vl_rnd_bnf2').isNull(),col('waux_vl_rnd_bnf'))\
                                                                        .otherwise(col('waux_vl_rnd_bnf2'))\
                                                    )\
                                         .withColumn('in_rlc_cli',F.when(col('in_rlc_cli2').isNull(),col('in_rlc_cli'))\
                                                                   .otherwise(col('in_rlc_cli2'))\
                                                    )\
                                  
    
    vrf_Rnd_Slr_BnfDF = vrf_Rnd_Slr_BnfDF.drop('waux_vl_rnd_slr2','waux_vl_rnd_bnf2','in_rlc_cli2')

    vrf_Rnd_Slr_BnfDF = vrf_Rnd_Slr_BnfDF.withColumn("ws_psct_cdc_83_c", CALC_WS_PSCT_CDC_83_C)\
                                         .withColumn("ws_psct_cdc_115_c", CALC_WS_PSCT_CDC_115_C)\
                                         .withColumn("ws_psct_cdc_83_c", F.when(EXPR_WS_PSCT_CDC_83_C, lit(0))\
                                                                          .otherwise(col("ws_psct_cdc_83_c"))\
                                                     )\
                                         .withColumn("ws_psct_cdc_115_c", F.when(EXPR_WS_PSCT_CDC_115_C, lit(0))\
                                                                           .otherwise(col("ws_psct_cdc_115_c"))\
                                                     )\
                                         .withColumn("vl_ren_slr_bnf", col("waux_vl_rnd_slr"))
        
    print(f"Saindo da função vrf_Rnd_Slr_BnfDF tempo de excução: {time.time() - start_time} ")
    return vrf_Rnd_Slr_BnfDF

## Modulos

## ANCSF246

### MontaConsultasDB

In [None]:
%%spark

import pyspark.sql.functions as F

def MontaQuery(key):

    if key == 12:
            
        query_psqC12 = """
        (
        ) as query_psqC12
        """
    
        query_psqC12 = spark.read.format("jdbc")\
                                 .option("URL",dicionarioDeConstantes['URL']) \
                                 .option("driver",dicionarioDeConstantes['JCN']) \
                                 .option("dbtable",query_psqC12) \
                                 .option("user",dicionarioDeConstantes['DB_USER']) \
                                 .option("password",dicionarioDeConstantes['DB_PASSWORD']) \
                                 .option('KeepAliveTimeout','10') \
                                 .load()
        
        return query_psqC12

### RotinaPrincipal

In [None]:
%%spark
import pyspark.sql.functions as F
import time

def rotina_Principal(df):
        '''
        Acionada pela função PesquisarEmpregador e tem por objetivo chamar as rotinas que pesquisam os critérios e as metodologias.
        
        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes
                              que terao o ANC calculado.
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                def MontaQuery
                def pesquisarCriterioPend
                def pesquisaMetodologia
            chamada pela funcao:
                def pesquisaEpgrDfrt30Pc
        '''

        #1 Verifica as condições para criação da cod das query
        #2 Realiza a montagem das query e a execução no banco de dados retornando um dataframe spark atraves da função MONTAQUERY 
        #3 IN_MONTAQUERY: Construção dos parametros para as funções de construções e execução das query no banco de dados já formatadas conforme função
        #4 MONTAQUERY: Refere-se a função MONTAQUERY onde a mesma formata determinadas query e executa as mesmas no banco de dados o resultado é transformado em dataframe spark 
        #5 EXPR_IN_PESQUISA_CRITERIO_1_9: Refere-se ao indicador para execução da query com chave 1 mas executando a query de chave 9 
        
        
        # constantes
        
        start_time = time.time()
        w=Window.orderBy(lit(1))
            
        keep_non_zero_values_udf = udf(keep_non_zero_values, ArrayType(IntegerType()))
        

        # inicialização 
        
        cod_query = 12
            
        QueryMontadaDF = MontaQuery(cod_query).checkpoint()

        rotina_PrincipalDF = df.withColumn("cd_epgr_cli_aux",keep_non_zero_values_udf(col("cd_epgr_cli")))

        df_aux = rotina_PrincipalDF.select('cd_cli','cd_epgr_cli_aux')

        df_aux = df_aux.alias("df_principal").join(QueryMontadaDF.alias("QueryMontadaDF"),F.expr("array_contains(df_principal.cd_epgr_cli_aux, QueryMontadaDF.CD_CLI_EPGR)"),"left")

        df_aux = df_aux.drop('cd_epgr_cli_aux').fillna(0, subset=["CD_CLI_EPGR", "PC_REN_BRTO_APLD", "PC_REN_LQDO_APLD"])

        df_aux = df_aux.groupby('cd_cli').agg(F.collect_list('CD_CLI_EPGR').alias('gda_mci_epgr'),\
                                              F.collect_list('PC_REN_BRTO_APLD').alias('gda_pc_rend_brt'),\
                                              F.collect_list('PC_REN_LQDO_APLD').alias('gda_pc_rend_lqd')\
                                              )

        rotina_PrincipalDF = rotina_PrincipalDF.drop('gda_mci_epgr','gda_pc_rend_brt','gda_pc_rend_lqd')

        rotina_PrincipalDF = rotina_PrincipalDF.join(df_aux, ['cd_cli'], 'left')

        print(f"Sainda da função rotina_PrincipalDF tempo de execução {time.time() - start_time} ")

        return rotina_PrincipalDF.cache()

## ANCSF159

### BuscaMCI

In [None]:
%%spark

import pyspark.sql.functions as F
import time
import os 

def buscaMCI(df):
    '''
        Classe responsavel por carregar os MCIs do INSS, BB, PREVIDENCIA e Exercito nos ambientes de modelagem, desenvolvimento e produção.

        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def processar
            funcoes acionadas:
                None
                
    '''
    # 1 - Define o codigo para cada modalidade
    # 2 - Verifica o ambiente atual 
    # 3 - Conforme o ambiente é inserido um valor especifico na coluna correspondente 
    
    # Constante
    
    ambiente = os.environ["AMBIENTE"]
    start_time = time.time()

    
    # Inicialização 
    
    
    buscaMCIDF = df.withColumn("cd_id_inss",     lit(1))\
                       .withColumn("cd_id_bb",       lit(2))\
                       .withColumn("cd_id_previ",    lit(3))\
                       .withColumn("cd_id_exercito", lit(4))\
                       .select("*",
                     (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(103824079))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(208961759))\
                       .otherwise(lit(103824079))\
                     ).alias("cd_mci_inss"),
                     (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(0))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(0))\
                       .otherwise(lit(0))\
                     ).alias("cd_cvn_inss"),
                     (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(903485186))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(130965148))\
                       .otherwise(lit(903485186))\
                     ).alias("cd_mci_bb"),
                     (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(0))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(0))\
                       .otherwise(lit(0))\
                     ).alias("cd_cvn_bb"),
                     (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(602907665))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(178998134))\
                       .otherwise(lit(100186582))\
                     ).alias("cd_mci_previ"),
                    (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(0))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(0))\
                       .otherwise(lit(0))\
                     ).alias("cd_cvn_previ"),
                     (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(690001155))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(248940924))\
                       .otherwise(lit(105907575))\
                     ).alias("cd_mci_exercito"),  
                    (F.when(lit(ambiente == "DESENVOLVIMENTO"),lit(0))\
                       .when(lit(ambiente == "HOMOLOGAÇÃO"),lit(296598))\
                       .otherwise(lit(296598))\
                      ).alias("cd_cvn_exercito")
                    )
        
        
    print(f"Sainda da função buscaMCI tempo de execução {time.time() - start_time} ")
 
    return buscaMCIDF



### PesquisaEmpregadoDiferenciado30

In [None]:
%%spark

import pyspark.sql.functions as F
import time

def pesquisaEpgrDfrt30Pc(df):
    '''
        Pesquisar o empregador com os métodos pesquisarEpgr e pesquisarEpgrDfrt30Pc. O método pesquisarEpgr procede a pesquisa do empregador da Natureza 18 e 20. O método pesquisarEpgrDfrt30Pc 
        recupera a lista de empregadores cadastrados que permitem contratar cadastro emitentes cheque sem fundo com percentual diferenciado.

        
        Parametros:
        
        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            cd_ntz = codigo natureza do calculo
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ancsf159
                def CalculoRenda
                def SelecionaRenda
            funcoes acionadas:
                def rotina_principal
                
    '''
    # 1 - Cria determinadas colunas inserindo valores padrões nas mesmas 
    # 2 - Aciona def rotina_principal
    # 3 - Com o dataframe retornado realiza tratamento e cria a coluna GDA_MCI_EPGR 
    

    # expressoes condicionais utulizadas nas contas conforme condicao
    
    start_time = time.time()

    
    EXPR_IN_EPGR_DFRT_30 = expr(" in_epgr_dfrt_30_pc_crg = 'S' ")


    # inicialização 
    
    pesquisaEpgrDfrt30PcDF = df.withColumn("cd_funcao", lit(1))\
                               .withColumn("cd_mtdl_e", lit(15))\
                               .withColumn("cd_rgr_e",  lit(225))\
                               .withColumn("in_ret_bxd",lit("N"))\
                               .withColumn("cd_crit_e", lit(0))\
                               .withColumn("gda_nr_crit_rech", lit(0))\
                               .withColumn("gda_seql_crit_rech", lit(0))\
                               .withColumn("qtd_reg", lit(""))\
                               .withColumn("in_rotina_principal", lit(True))\
        
    print("Entrando na funcao rotina_Principal")
        
    pesquisaEpgrDfrt30PcDF = rotina_Principal(pesquisaEpgrDfrt30PcDF)
    

    print(f"Sainda da função pesquisaEpgrDfrt30Pc tempo de execução {time.time() - start_time} ")

    return pesquisaEpgrDfrt30PcDF



### CalculaPrestacaoFinal

In [None]:
%%spark

import pyspark.sql.functions as F


def calculaPstcFinal(df):
    '''
        Classe responsavel por carregar os MCIs do INSS, BB, PREVIDENCIA e Exercito
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def processar
            funcoes acionadas:
                None
                
    '''
    
    # 1 - Verifica as expressoes matematica e logicas
    # 2 - Executa os tratamentos 
    
    start_time = time.time()
    
    # constantes
    
    FATOR_MULTIPLICADOR = 0.30
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_VL_PSTC_FUNC_CLCD = expr(" vl_pstc_funci_clcd > vl_lim_ref_055 ")
    EXPR_VL_PSTC_DEMAIS = expr(" vl_pstc_demais > vl_lim_ref_055 ")
    EXPR_VL_PSTC_TTL_ECF = expr(" vl_pstc_ttl_ecf_clcd > vl_lim_ref_055 ")
    EXPR_WAUX_PSTC_SLIM_171 = expr(" waux_pstc_slim_171 > vl_lim_ref_055 ")
    EXPR_VL_PSTC_BB_ORG = expr(" in_cli_anot_348 = 'S' and in_cli_adpc_clireab <> 1 ")
    
    # expressoes matematicas utulizadas nas contas
    
    CALC_VL_PSTC_FUNC_CLCD =  expr(f" cast(gda_vl_rnd_funci_seld * {FATOR_MULTIPLICADOR} * vl_cfct_055 as int) ")
    CALC_VL_PSTC_FUNC_APVD = expr(" cast(vl_pstc_funci_clcd  * vl_ftr_ajst_pstc_cgn_flh as int) ")
    CALC_VL_PSTC_DEMAIS = expr(" cast(gda_vl_pstc_ecf_seld  * vl_cfct_055 as int) ")
    CALC_VL_PSTC_TTL_ECF = expr(" cast(vl_pstc_ttl_ecf_clcd * vl_ftr_ajst_pstc_cgn_flh as int) ")
    CALC_VL_PSTC_CRT_APVD = expr(" cast(waux_pstc_slim_171 * vl_ftr_ajst_pstc_cgn_flh as int) ")
    
    # Inicialização 

    calculaPstcFinalDF = df.withColumn("vl_pstc_funci_clcd",CALC_VL_PSTC_FUNC_CLCD)\
    
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("vl_pstc_funci_clcd",F.when(EXPR_VL_PSTC_FUNC_CLCD,col("vl_lim_ref_055"))\
                                                                             .otherwise(col("vl_pstc_funci_clcd"))\
                                                      )\
    
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("vl_pstc_funci_apvd", CALC_VL_PSTC_FUNC_APVD)\
                                           .withColumn("vl_pstc_bb_org", col("vl_pstc_funci_clcd"))\
                                           .withColumn("vl_pstc_demais", CALC_VL_PSTC_DEMAIS)\
       
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("vl_pstc_demais",F.when(EXPR_VL_PSTC_DEMAIS, col("vl_lim_ref_055"))\
                                                                         .otherwise(col("vl_pstc_demais"))\
                                                      )
    
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("vl_pstc_demais_org",col("vl_pstc_demais"))\
                                           .withColumn("vl_pstc_ttl_ecf_clcd",col("vl_pstc_demais"))\

    
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("vl_pstc_ttl_ecf_clcd",F.when(EXPR_VL_PSTC_TTL_ECF,col("vl_lim_ref_055"))\
                                                                               .otherwise(col("vl_pstc_ttl_ecf_clcd"))\
                                                      )\
                                           .withColumn("vl_pstc_ttl_ecf_apvd",CALC_VL_PSTC_TTL_ECF)\
                                           .withColumn("vl_pstc_ttl_ecf_org",col("vl_pstc_ttl_ecf_apvd"))\
    
    
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("waux_pstc_slim_171",F.when(EXPR_WAUX_PSTC_SLIM_171,col("vl_lim_ref_055"))\
                                                                             .otherwise(col("waux_pstc_slim_171"))\
                                                      )\
                                            .withColumn("vl_pstc_crt_clcd",col("waux_pstc_slim_171").cast(IntegerType()))\
                                            .withColumn("vl_pstc_crt_apvd",CALC_VL_PSTC_CRT_APVD)\
                                                       
    
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("vl_pstc_funci_clcd",F.when(EXPR_VL_PSTC_BB_ORG,lit(0))\
                                                                             .otherwise(col("vl_pstc_funci_clcd"))\
                                                      )\
                                           .withColumn("vl_pstc_bb_org",F.when(EXPR_VL_PSTC_BB_ORG,lit(0))\
                                                                         .otherwise(col("vl_pstc_bb_org"))\
                                                      )\
    
    calculaPstcFinalDF = calculaPstcFinalDF.withColumn("ic_ajst_cvn_ecf",col("waux_ic_ajst_cvn_ecf_seld"))\
                                           .withColumn("ic_ajst_cvn_crt",col("waux_ic_ajst_cvn_crt_seld"))\
                                           .withColumn("ic_ajst_cvn_sal",col("waux_ic_ajst_cvn_sal_seld"))\
                                           .withColumn("ic_mtp_cvn_ecf",col("waux_ic_mtp_seld_ecf"))\
                                           .withColumn("ic_mtp_cvn_crt",col("waux_ic_mtp_seld_crt"))\
                                           .withColumn("ic_mtp_cvn_sal",col("waux_ic_mtp_seld_sal"))\
                                           .withColumn("vl_max_cvn_ecf",col("waux_vl_max_cvn_ecf_seld"))\
                                           .withColumn("vl_max_cvn_crt",col("waux_vl_max_cvn_crt_seld"))\
                                           .withColumn("vl_max_cvn_sal",col("waux_vl_max_cvn_sal_seld"))\
    
    print(f"Saindo da função calculaPstcFinal tempo de excução: {time.time() - start_time} ")
                                      
    return calculaPstcFinalDF

### SelecionaRendaConvenio

In [None]:
%%spark

import pyspark.sql.functions as F
import time

def selecionarRndCvn(df):
    '''
        Pesquisar os dados do convênio
        
        Parametros:
        
        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def calcularPstcPF
            funcoes acionadas:
                def pesquisaDadosConvenio
                
    '''
    # 1 - Realiza as verificações das condições inseridas nas expressoes logicas
    # 2 - Cria varias colunas com valor zero 
    # 3 - Aciona a função pesquisaDados 
    # 4 - Recebe o retorno do dataframa spark e realiza o tratamento dos dados 
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    start_time = time.time()
    
    selecionarRndCvnDF = df.withColumn("gda_vl_pstc_ecf_seld", lit(0))\
                           .withColumn("waux_maior_pstc_ecf", lit(0))\
                           .withColumn("vl_renda_liq_ttl", lit(0))\
                           .withColumn("vl_renda_pres", lit(0))\
                           .withColumn("vl_renda_funci", lit(0))\
                           .withColumn("waux_pstc_slim_171", lit(0))\
                           .withColumn("waux_maior_vl_slim_171_intd", lit(0))\
                           .withColumn("waux_ic_ajst_cvn_ecf_seld", lit(0))\
                           .withColumn("waux_ic_ajst_cvn_crt_seld", lit(0))\
                           .withColumn("waux_ic_ajst_cvn_sal_seld", lit(0))\
                           .withColumn("waux_ic_mtp_seld_ecf", lit(0))\
                           .withColumn("waux_ic_mtp_seld_crt", lit(0))\
                           .withColumn("waux_ic_mtp_seld_sal", lit(0))\
                           .withColumn("waux_vl_max_cvn_ecf_seld", lit(0))\
                           .withColumn("waux_vl_max_cvn_crt_seld", lit(0))\
                           .withColumn("waux_vl_max_cvn_sal_seld", lit(0))\
    
    EXPR_SEL_RND_CVN = expr(' gda_epgr <> 0 and gda_vl_pstc_ecf_ini > 0 ')
    EXPR_IN_ENGAJADO = expr(' in_engajado = "S" ')
    EXPR_IN_RECRUTA = expr(' in_recruta = "S" ')
    EXPR_WAUX_MTP_CVN = expr(" ic_mtp_cvn > 0 ")
    EXPR_WUAX_VL_SLIM_171 = expr(" gda_vl_ren_lqda_clcd > gda_vl_ren_lqda_pres ")
    EXPR_WUAX_83 = expr(" gda_vl_ren_sal > 0 ")
    EXPR_WUAX_83_FUNCI = expr(" gda_vl_ren_funci > 0 ")
    EXPR_GDA_EPGR = expr(" gda_epgr <> 0 ")
    EXPR_JOIN_137 = expr(' gda_epgr <> 0 and gda_vl_pstc_ecf_ini > 0 and gda_epgr = nr_cli_cvn and cd_slim_crd = 137')
    EXPR_JOIN_171 = expr(' gda_epgr <> 0 and gda_vl_pstc_ecf_ini > 0 and gda_epgr = nr_cli_cvn and cd_slim_crd = 171')
    EXPR_WAUX_IC_MTP = expr(' waux_ic_mtp_cvn > 0 ')    
    
    # CALCULOS MATEMATICOS
    
    CALC_GDA_VL_PSTC_ECF = expr(" gda_vl_pstc_ecf_ini * ic_red_pstc ")
    CALC_WUAX_VL_SLIM_171 = expr(" gda_vl_ren_lqda_clcd * ic_ajst_cvn * ic_mtp_cvn ")
    CALC_WAUX_VL_SLIM_INTD = expr(" cast(gda_vl_ren_lqda_pres * ic_ajst_cvn * ic_mtp_cvn as int) ")
    CALC_WAUX_PSTC_SLIM_171 = expr(" cast(waux_rnd_utzd_clc_slim_171 * ic_ajst_cvn as int) ")
    
    # INICIALIZAÇÃO
    
    # Pesquisa Convenio - 137
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn('posicao', f.expr('ROW_NUMBER () OVER (PARTITION BY cd_cli, gda_epgr ORDER BY gda_epgr desc)'))
            
    lista_cvn = selecionarRndCvnDF.drop_duplicates(['cd_cli']).selectExpr('cd_cli as cd_cli2', 'inline(arrays_zip(nr_cli_cvn, ic_red_pstc, cd_slim_crd, ic_ajst_cvn, ic_mtp_cvn, qt_max_mm_fnto, vl_max_cvn, vl_mnr_taxa_jur))')\
                                              .where('nr_cli_cvn <> 0')\
                  .withColumn('posicao', expr('ROW_NUMBER () OVER (PARTITION BY cd_cli2, nr_cli_cvn, cd_slim_crd  ORDER BY nr_cli_cvn desc)'))\
                  .withColumn('max_posicao', max(col('posicao')).over(Window.partitionBy('cd_cli2', 'nr_cli_cvn', 'cd_slim_crd')) )
    
    selecionarRndCvnDF = selecionarRndCvnDF.drop('nr_cli_cvn', 'ic_red_pstc', 'cd_slim_crd', 'ic_ajst_cvn', 'ic_mtp_cvn', 'qt_max_mm_fnto', 'vl_max_cvn', 'vl_mnr_taxa_jur')\
                                           .join(lista_cvn.where('cd_slim_crd = 137 and posicao = max_posicao'), [(selecionarRndCvnDF.cd_cli == lista_cvn.cd_cli2) & (selecionarRndCvnDF.gda_epgr == lista_cvn.nr_cli_cvn)], 'left').drop('cd_cli2')
    
    selecionarRndCvnDF = selecionarRndCvnDF.na.fill(0)
        
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("waux_ic_mtp_cvn", F.when(EXPR_SEL_RND_CVN & EXPR_IN_ENGAJADO,col("ic_ajst_cvn"))\
                                                                           .otherwise(col("ic_mtp_cvn"))\
                                                      )\
                                           .withColumn("waux_ic_mtp_seld_ecf", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN,col("waux_ic_mtp_cvn"))\
                                                                                .otherwise(col("waux_ic_mtp_seld_ecf"))\
                                                      )\
                                           .withColumn("waux_vl_max_cvn_ecf_seld", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN,col("vl_max_cvn"))\
                                                                                    .otherwise(col("waux_vl_max_cvn_ecf_seld"))\
                                                      )\
                                           .withColumn("waux_ic_ajst_cvn_ecf_seld", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN,col("ic_ajst_cvn"))\
                                                                                     .otherwise(col("waux_ic_ajst_cvn_ecf_seld"))\
                                                      )\
    
    
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn('maior_ic_mtp_cvn', F.expr('ROW_NUMBER () OVER (PARTITION BY cd_cli ORDER BY waux_ic_mtp_seld_ecf desc)'))

    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("gda_vl_pstc_ecf_seld", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN,CALC_GDA_VL_PSTC_ECF)\
                                                                                .otherwise(col("gda_vl_pstc_ecf_seld"))\
                                                      )\
                                           .withColumn("vl_renda_liq_ttl", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN, col("gda_vl_ren_lqda_clcd"))\
                                                                            .otherwise(lit(0))\
                                                      )\
                                           .withColumn("vl_renda_pres", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN, col("gda_vl_ren_lqda_pres"))\
                                                                         .otherwise(lit(0))\
                                                      )\
        
    df_waux_ic_mtp_cvn = selecionarRndCvnDF.where('waux_ic_mtp_cvn <> 0').groupby('cd_cli').agg(F.sum('gda_vl_pstc_ecf_seld').alias('gda_vl_pstc_ecf_seld'),\
                                                                                            F.sum('vl_renda_liq_ttl').alias('vl_renda_liq_ttl'),\
                                                                                            F.sum('vl_renda_pres').alias('vl_renda_pres'))
    
        
    # Pesquisa Convenio - 171
            
    selecionarRndCvnDF = selecionarRndCvnDF.drop('nr_cli_cvn', 'ic_red_pstc', 'cd_slim_crd', 'ic_ajst_cvn', 'ic_mtp_cvn', 'qt_max_mm_fnto', 'vl_max_cvn', 'vl_mnr_taxa_jur')\
                                           .join(lista_cvn.where('cd_slim_crd = 171 and posicao = max_posicao'), [(selecionarRndCvnDF.cd_cli == lista_cvn.cd_cli2) & (selecionarRndCvnDF.gda_epgr == lista_cvn.nr_cli_cvn)], 'left').drop('cd_cli2')
    
    selecionarRndCvnDF = selecionarRndCvnDF.na.fill(0)
    
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("waux_vl_slim_171_intd", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN & EXPR_WUAX_VL_SLIM_171, CALC_WUAX_VL_SLIM_171)\
                                                                                 .otherwise(CALC_WAUX_VL_SLIM_INTD)\
                                                      )\
                                           .withColumn("waux_rnd_utzd_clc_slim_171", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN & EXPR_WUAX_VL_SLIM_171,col("gda_vl_ren_lqda_clcd"))\
                                                                                      .otherwise(col("gda_vl_ren_lqda_pres"))\
                                                      )\

    selecionarRndCvnDF = selecionarRndCvnDF.withColumn('maior_vl_slim_171_intd', F.expr('ROW_NUMBER () OVER (PARTITION BY cd_cli ORDER BY waux_vl_slim_171_intd desc)'))
                                                
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("waux_maior_vl_slim_171_intd", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN & expr(' maior_vl_slim_171_intd = 1 '), col("waux_vl_slim_171_intd"))\
                                                                               .otherwise(col("waux_maior_vl_slim_171_intd"))\
                                                      )\
                                           .withColumn("waux_pstc_slim_171", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN & expr(' maior_vl_slim_171_intd = 1 '), CALC_WAUX_PSTC_SLIM_171)\
                                                                              .otherwise(col("waux_pstc_slim_171"))\
                                                      )\
                                           .withColumn("waux_ic_ajst_cvn_crt_seld", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN & expr(' maior_vl_slim_171_intd = 1 '), col("ic_ajst_cvn"))\
                                                                                     .otherwise(col("waux_ic_ajst_cvn_crt_seld"))\
                                                      )\
                                           .withColumn("waux_ic_mtp_seld_crt", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN & expr(' maior_vl_slim_171_intd = 1 '), col("ic_mtp_cvn"))\
                                                                                .otherwise(col("waux_ic_mtp_seld_crt"))\
                                                      )\
                                           .withColumn("waux_vl_max_cvn_crt_seld", F.when(EXPR_SEL_RND_CVN & EXPR_WAUX_MTP_CVN & expr(' maior_vl_slim_171_intd = 1 '), col("waux_rnd_utzd_clc_slim_171"))\
                                                                                    .otherwise(col("waux_vl_max_cvn_crt_seld"))\
                                                      )\
    
    # Pesquisa Convenio - 83
    selecionarRndCvnDF = selecionarRndCvnDF.drop('nr_cli_cvn', 'ic_red_pstc', 'cd_slim_crd', 'ic_ajst_cvn', 'ic_mtp_cvn', 'qt_max_mm_fnto', 'vl_max_cvn', 'vl_mnr_taxa_jur')\
                                           .join(lista_cvn.where('cd_slim_crd = 83 and posicao = max_posicao'), [(selecionarRndCvnDF.cd_cli == lista_cvn.cd_cli2) & (selecionarRndCvnDF.gda_epgr == lista_cvn.nr_cli_cvn)], 'left').drop('cd_cli2')
    
    selecionarRndCvnDF = selecionarRndCvnDF.na.fill(0)
    
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("waux_ic_ajst_cvn", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_IN_RECRUTA,col("ic_ajst_rrt"))\
                                                                            .when(EXPR_GDA_EPGR & EXPR_WUAX_83 & ~EXPR_IN_RECRUTA,col("ic_ajst_cvn"))\
                                                                            .otherwise(lit(0))\
                                                      )\
                                           .withColumn("waux_ic_mtp_cvn", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_IN_RECRUTA, col("ic_mtp_rrt"))\
                                                                           .when(EXPR_GDA_EPGR & EXPR_WUAX_83 & ~EXPR_IN_RECRUTA, col("ic_mtp_cvn"))\
                                                                           .otherwise(lit(0))\
                                                      )\
                                           .withColumn("waux_qt_max_mm_fnto", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_IN_RECRUTA,col("qt_max_mm_fnto_rrt"))\
                                                                               .when(EXPR_GDA_EPGR & EXPR_WUAX_83 & ~EXPR_IN_RECRUTA,col("qt_max_mm_fnto"))\
                                                                               .otherwise(lit(0))\
                                                      )\
                                           .withColumn("waux_vl_max_cvn", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_IN_RECRUTA,col("vl_max_cvn_rrt"))\
                                                                           .when(EXPR_GDA_EPGR & EXPR_WUAX_83 & ~EXPR_IN_RECRUTA,col("vl_max_cvn"))\
                                                                           .otherwise(lit(0))\
                                                      )\
                                           .withColumn("waux_vl_mnr_taxa_jur", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_IN_RECRUTA,col("vl_mnr_taxa_jur_rrt"))\
                                                                                .when(EXPR_GDA_EPGR & EXPR_WUAX_83 & ~EXPR_IN_RECRUTA,col("vl_mnr_taxa_jur"))\
                                                                                .otherwise(lit(0))\
                                                      )\
                                           .withColumn("waux_ic_red_pstc", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_IN_RECRUTA,col("ic_red_rrt"))\
                                                                            .when(EXPR_GDA_EPGR & EXPR_WUAX_83 & ~EXPR_IN_RECRUTA,col("ic_red_pstc"))\
                                                                            .otherwise(lit(0))\
                                                      )\
    
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("waux_ic_mtp_cvn", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_IN_ENGAJADO,col("ic_ajst_cvn")\
                                                                                )\
                                                                           .otherwise(col("waux_ic_mtp_cvn"))\
                                                      )\
    
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("waux_ic_mtp_seld_sal", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 & EXPR_WAUX_IC_MTP, max(col("waux_ic_mtp_cvn")).over(Window.partitionBy('cd_cli')))\
                                                                                .when(EXPR_GDA_EPGR & EXPR_WUAX_83 &  ~EXPR_WAUX_IC_MTP, col('vl_mtp_clc_slim_83'))
                                                                                .otherwise(col("waux_ic_mtp_seld_sal"))\
                                                      )\
                                           .withColumn("waux_vl_max_cvn_sal_seld", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 &  EXPR_WAUX_IC_MTP, col("waux_vl_max_cvn")\
                                                      )\
                                                                                    .otherwise(col("waux_vl_max_cvn_sal_seld"))\
                                                      )\
                                           .withColumn("waux_ic_ajst_cvn_sal_seld", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83 &  EXPR_WAUX_IC_MTP, col("waux_ic_ajst_cvn")\
                                                      )\
                                                                                     .otherwise(col("waux_ic_ajst_cvn_sal_seld"))\
                                                      )\
                                                      
    selecionarRndCvnDF = selecionarRndCvnDF.withColumn("waux_ic_mtp_cvn", F.when(EXPR_GDA_EPGR & EXPR_WUAX_83_FUNCI,col("gda_vl_rnd_funci_seld") + col("gda_vl_ren_funci")\
                                                                                )\
                                                                           .otherwise(col("waux_ic_mtp_cvn"))\
                                                      )
    
    # Agrupando Retorno
    selecionarRndCvnDF.cache()
    df_clientes = selecionarRndCvnDF.select('cd_cli').distinct()
    
    df_maior_mtp_cvn = selecionarRndCvnDF.where('maior_ic_mtp_cvn = 1').groupby('cd_cli').agg(F.last('waux_ic_mtp_seld_ecf').alias('waux_ic_mtp_seld_ecf'),\
                                                                                              F.last('waux_vl_max_cvn_ecf_seld').alias('waux_vl_max_cvn_ecf_seld'),\
                                                                                              F.last('waux_ic_ajst_cvn_ecf_seld').alias('waux_ic_ajst_cvn_ecf_seld'))

    df_maior_slim_171 = selecionarRndCvnDF.where('maior_vl_slim_171_intd = 1').groupby('cd_cli').agg(F.last('waux_pstc_slim_171').alias('waux_pstc_slim_171'),\
                                                                                                     F.last('waux_ic_ajst_cvn_crt_seld').alias('waux_ic_ajst_cvn_crt_seld'),\
                                                                                                     F.last('waux_ic_mtp_seld_crt').alias('waux_ic_mtp_seld_crt'),\
                                                                                                     F.last('waux_vl_max_cvn_crt_seld').alias('waux_vl_max_cvn_crt_seld'))

    df_maior_slim_83 = selecionarRndCvnDF.groupby('cd_cli').agg(F.max('waux_vl_max_cvn_sal_seld').alias('waux_vl_max_cvn_sal_seld'),\
                                                                F.max('waux_ic_ajst_cvn_sal_seld').alias('waux_ic_ajst_cvn_sal_seld'))

    df_last_mtp_seld_sal = selecionarRndCvnDF.where('gda_epgr <> 0 and gda_vl_ren_sal > 0').groupby('cd_cli').agg(F.max('waux_ic_mtp_seld_sal').alias('waux_ic_mtp_seld_sal'))


    df_final = df_clientes.join(df_waux_ic_mtp_cvn, ['cd_cli'], 'left')
    df_final = df_final.join(df_maior_mtp_cvn, ['cd_cli'], 'left')
    df_final = df_final.join(df_maior_slim_171, ['cd_cli'], 'left')
    df_final = df_final.join(df_maior_slim_83, ['cd_cli'], 'left')
    df_final = df_final.join(df_last_mtp_seld_sal, ['cd_cli'], 'left')
    
    df_final.cache()
    selecionarRndCvnDF.unpersist()
    print(f"Saindo da função selecionarRndCvn tempo de excução: {time.time() - start_time} ")
                                                         
    return df_final

### CalculaPrestacaoSegmento13

In [None]:
%%spark

import pyspark.sql.functions as F

def calculaPstcSgm13(df):
    '''
        Captura os dados do convênio de INSS, calcular a prestação do cadastro emitentes cheque sem fundo e capturar dados de convênio
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def processar
            funcoes acionadas:
                def PesquisarDados
                
    '''
    
    # 1 - Realiza as verificações das condições inseridas nas expressoes logicas 
    # 2 - Executa os tratamento necessarios 
    
    
    
    # constantes
    
    FATOR_MULTIPLICADOR = 0.30
    PARAMETRO = calculaPstcSgm13DF.select("cd_mci_inss")
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_VL_PSTC_TTL_ECF = expr(" vl_pstc_ttl_ecf_clcd > lim_ref_055 ")
    
    # expressoes matematicas utulizadas nas contas
    
    CALC_VL_PSTC_TTL_ECF  =  expr(f" vl_bnf * {FATOR_MULTIPLICADOR} * cfct_033 * waux_ic_red_pstc ")
    
    
    
    # Inicialização 

    calculaPstcSgm13DF = df.withColumn("in_pesquisa_dados_convenio", lit(True))
    
    calculaPstcSgm13DF = pesquisaDadosConvenio(calculaPstcSgm13DF,0,137)
    
    calculaPstcSgm13DF = calculaPstcSgm13DF.withColumn("vl_pstc_ttl_ecf_clcd",CALC_VL_PSTC_TTL_ECF)\
                                             .withColumn("vl_pstc_ttl_ecf_clcd", F.when(EXPR_VL_PSTC_TTL_ECF, col("lim_ref_055")))\
                                             .withColumn("vl_pstc_ttl_ecf_org", col("vl_pstc_ttl_ecf_clcd"))\
                                             .withColumn("vl_pstc_ttl_ecf_apvd", col("vl_pstc_ttl_ecf_clcd"))\
                                             .withColumn("ic_ajst_cvn_ecf", col("ic_ajst_cvn"))\
                                             .withColumn("ic_mtp_cvn_ecf", col("waux_ic_mtp_cvn"))\
                                             .withColumn("vl_max_cvn_ecf", col("waux_vl_max_cvn"))\
                                             .withColumn("in_pesquisa_dados_convenio",lit(True))
                     
    
    calculaPstcSgm13DF = pesquisaDadosConvenio(calculaPstcSgm13DF,0,171)
    
    
    calculaPstcSgm13DF = calculaPstcSgm13DF.withColumn("ic_ajst_cvn_crt", col("waux_ic_ajst_cvn"))\
                                             .withColumn("ic_mtp_cvn_crt", col("waux_ic_mtp_cvn"))\
                                             .withColumn("vl_max_cvn_crt", col("waux_vl_max_cvn"))

    
    return calculaPstcSgm13DF



### CalculaRendaLiquidaPresumida

In [1]:
%%spark

def calculaRndLiqPres(df):

    EXPR_MENOR_2_ANOS_RGR_REND = expr(' dt_rend_epgr_cli > waux_dta_a2anos or cd_rgr_rend_lqdo in ("C", "T") ')
    EXPR_NTZ_ECF = expr(' cd_ntz not in (2, 3, 7, 10, 11, 12, 14, 16, 17, 19) ')
    EXPR_RENDA_1 = expr(' cd_rgr_rend_lqdo = "C" and cd_ntz not in (2, 3, 7, 10, 11, 12, 14, 16, 17) and in_fun == 0 ')
    EXPR_RENDA_2 = expr(' in_fun == 1 ')

    colunas = [c for c in df.columns if c not in {'cd_epgr_cli', 'cd_ocp_epgr_cli', 'cd_ntz', 'dt_inc_scdr', 'dt_rend_epgr_cli', 'cd_rgr_rend_lqdo', 'vl_rend_epgr_cli', 'vl_rend_lqdo', 'gda_epgr', 'gda_index','gda_vl_ren_sal', 'in_recruta', 'in_engajado'}]
    
    df_aux = df.selectExpr(*colunas, 'inline(arrays_zip(cd_epgr_cli, cd_ocp_epgr_cli, cd_ntz, dt_inc_scdr, dt_rend_epgr_cli, cd_rgr_rend_lqdo, vl_rend_epgr_cli, vl_rend_lqdo, gda_epgr, gda_index, gda_vl_ren_sal, in_recruta, in_engajado))').where('cd_ocp_epgr_cli <> 0')

    df_aux = df_aux.withColumn('dt_rend_epgr_cli', F.to_date('dt_rend_epgr_cli', 'dd.MM.yyyy'))
    
    df_aux = df_aux.na.fill(0)
    
    df_aux = df_aux.select('*',
                             (f.when(EXPR_MENOR_2_ANOS_RGR_REND & EXPR_NTZ_ECF, lit(True))\
                               .otherwise(lit(False))).alias('in_calcula_rnd_ecf'),\
                             (f.when(EXPR_MENOR_2_ANOS_RGR_REND & EXPR_RENDA_1, lit(True))\
                               .otherwise(lit(False))).alias('in_seleciona_renda_1'),\
                             (f.when(EXPR_MENOR_2_ANOS_RGR_REND & EXPR_RENDA_2, lit(True))\
                               .otherwise(lit(False))).alias('in_seleciona_renda_2'),\
                             )
    
    df_aux = calculaRndEcf(df_aux).cache()
    
    df_aux = selecionaRenda(df_aux)
    
    df_aux = df_aux.checkpoint()
    
    df_final = selecionarRndCvn(df_aux)
    
    dfcalculaRndLiqPres = df.drop("waux_ic_mtp_cvn","waux_ic_mtp_seld_ecf","waux_vl_max_cvn_ecf_seld","waux_ic_ajst_cvn_ecf_seld",\
                                  "gda_vl_pstc_ecf_seld","vl_renda_liq_ttl","vl_renda_pres","waux_pstc_slim_171","waux_ic_ajst_cvn_crt_seld",\
                                  "waux_ic_mtp_seld_crt","waux_vl_max_cvn_crt_seld","waux_ic_ajst_cvn","waux_qt_max_mm_fnto","waux_vl_max_cvn",\
                                  "waux_vl_mnr_taxa_jur","waux_ic_mtp_seld_sal","waux_vl_max_cvn_sal_seld","waux_ic_ajst_cvn_sal_seld")\
                            .join(df_final, ['cd_cli'], 'left')
        
    # alteração devido erro de type com gda_mci_epgr #######################################################################################
    
    dfcalculaRndLiqPres = dfcalculaRndLiqPres.withColumn("cd_epgr_cli", col("cd_epgr_cli").cast("array<int>"))
    
    dfcalculaRndLiqPres = dfcalculaRndLiqPres.na.fill(0)
        
    return dfcalculaRndLiqPres

UsageError: Cell magic `%%spark` not found.


### CalculaRendaECF

In [None]:
%%spark

def calculaRndEcf(df):
    
    EXPR_IN_CALCULA_RND_ECF = expr(" in_calcula_rnd_ecf ")
    
    # A lógica do prequisa empregador está nessa expressão
    EXPR_CARREGA_RENDA_VALIDA = expr(" (cd_ntz = 18 and array_contains(cd_epgr_rgm_espl, cd_epgr_cli) ) OR (cd_ntz = 20 and array_contains(cd_epgr_cgn_flh, cd_epgr_cli) ) OR (cd_ntz NOT IN (18,20)) ")
    
    df = df.select('*',
                      (f.when(EXPR_IN_CALCULA_RND_ECF & EXPR_CARREGA_RENDA_VALIDA, lit(True))\
                       .otherwise(lit(False))).alias('in_carrega_renda')\
                  )
    
    df = calculaRndPres(df)
    df = carregaRendaValida(df)
    
    return df

### CalculaRendaPresumida

In [None]:
%%spark

def calculaRndPres(df):
    
    EXPR_IN_CARREGA_RENDA = expr(" in_carrega_renda ")
    EXPR_RND_PRES_FX_1 = expr(' vl_rend_epgr_cli >= 0 and vl_rend_epgr_cli <= 1900.00 ')
    EXPR_RND_PRES_FX_2 = expr(' vl_rend_epgr_cli > 1900.01 and vl_rend_epgr_cli <= 2800.01 ')
    EXPR_RND_PRES_FX_3 = expr(' vl_rend_epgr_cli > 2800.01 and vl_rend_epgr_cli <= 4600.01 ')
    EXPR_RND_PRES_FX_4 = expr(' vl_rend_epgr_cli > 4600.01 ')
    
    CALC_VL_RND_PRES = expr(' vl_rend_epgr_cli * gda_pc_calc_rnd_pres ')


    df = df.withColumn('gda_pc_calc_rnd_pres', F.when(EXPR_IN_CARREGA_RENDA & EXPR_RND_PRES_FX_1, lit(0.92))\
                                                    .when(EXPR_IN_CARREGA_RENDA & EXPR_RND_PRES_FX_2, lit(0.88))\
                                                    .when(EXPR_IN_CARREGA_RENDA & EXPR_RND_PRES_FX_3, lit(0.82))\
                                                    .when(EXPR_IN_CARREGA_RENDA & EXPR_RND_PRES_FX_4, lit(0.77))\
                                                    .otherwise(lit(0)))\
           .withColumn('waux_vl_rnd_pres', f.when(EXPR_IN_CARREGA_RENDA, CALC_VL_RND_PRES)\
                                            .otherwise(lit(0))\
                      )
    
    return df

### Carrega Renda Valida

In [None]:
%%spark

def carregaRendaValida(df):
    
    EXPR_IN_CARREGA_RENDA = expr(" in_carrega_renda ")
    EXPR_IN_CVN_DIF = expr(' in_cvn_dif_30_pc = "S" ')
    EXPR_RND_CLC = expr(' cd_rgr_rend_lqdo = "S" ')
    EXPR_GDA_EPGR = expr(' (gda_epgr = 0) OR (gda_epgr = cd_epgr_cli) ')
    EXPR_SGM_14_16 = expr(' cd_sgm_anl in (14,16) ')
    EXPR_GDA_VL_PSTC_BRT = expr(' waux_rnd_brta_x_030 >= waux_rnd_pres_x_030 and waux_rnd_brta_x_030 >= waux_rnd_lqd_x_045')
    EXPR_GDA_VL_PSTC_PRES = expr(' waux_rnd_pres_x_030 >= waux_rnd_brta_x_030 and waux_rnd_pres_x_030 >= waux_rnd_lqd_x_045')
    EXPR_GDA_VL_PSTC_LQD = expr(' waux_rnd_lqd_x_045 >= waux_rnd_pres_x_030 and waux_rnd_lqd_x_045 >= waux_rnd_brta_x_030')
    EXPR_NTZ_19 = expr(' cd_ntz = 19 ')
    EXPR_NTZ_20 = expr(' cd_ntz = 20 ')
    
    CALC_RND_PRES = expr(' gda_vl_ren_lqda_pres *  gda_pc_rend_brt')
    CALC_RND_PRES_30 = expr(' gda_vl_ren_lqda_pres * 0.3 ')
    CALC_RND_BRTA = expr(' gda_vl_ren_lqda_clcd *  gda_pc_rend_lqd')
    CALC_RND_BRTA_45 = expr(' gda_vl_ren_lqda_clcd *  0.45')
    CALC_RND_BRTA_30 = expr(' vl_rend_epgr_cli * 0.3 ')
    
    df_pc_dif = df.selectExpr('inline(arrays_zip(gda_mci_epgr, gda_pc_rend_brt, gda_pc_rend_lqd))')
    
    df_pc_dif = df_pc_dif.drop_duplicates(['gda_mci_epgr'])
    
    df = df.drop('gda_mci_epgr', 'gda_pc_rend_brt', 'gda_pc_rend_lqd').join(df_pc_dif, [df.cd_epgr_cli == df_pc_dif.gda_mci_epgr], 'left')
    
    df = df.withColumn('in_cvn_dif_30_pc', f.when(EXPR_IN_CARREGA_RENDA & expr('gda_mci_epgr is not null'), lit('S'))\
                                        .otherwise(lit('N'))\
                  )\
        
    df = df.withColumn('gda_epgr', F.when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_EPGR, col('cd_epgr_cli'))\
                                    .otherwise(lit(0))\
                      )\
           .withColumn('gda_vl_ren_lqda_pres', F.when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_EPGR, col('waux_vl_rnd_pres'))\
                                                .otherwise(lit(0))\
                      )\
           .withColumn('gda_vl_ren_brta', F.when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_EPGR, col('vl_rend_epgr_cli'))\
                                           .otherwise(lit(0))\
                      )\
           .withColumn('gda_vl_ren_lqda_clcd', F.when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_EPGR & EXPR_RND_CLC, lit(0))\
                                                .when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_EPGR & ~EXPR_RND_CLC, col('vl_rend_lqdo'))\
                                                .otherwise(lit(0)))
    
    df = df.withColumn('waux_rnd_pres_x_030', F.when(EXPR_IN_CARREGA_RENDA & EXPR_IN_CVN_DIF, CALC_RND_PRES)\
                                               .when(EXPR_IN_CARREGA_RENDA & ~EXPR_IN_CVN_DIF, CALC_RND_PRES_30)\
                                               .otherwise(lit(0)))\
           .withColumn('waux_rnd_lqd_x_045', F.when( (EXPR_IN_CARREGA_RENDA & EXPR_IN_CVN_DIF) & ~EXPR_SGM_14_16, CALC_RND_BRTA)\
                                              .when( (EXPR_IN_CARREGA_RENDA & ~EXPR_IN_CVN_DIF) & ~EXPR_SGM_14_16, CALC_RND_BRTA_45)\
                                              .otherwise(lit(0)))
    
    # Para Aposentados do INSS avaliar também 30% da Renda Bruta caso este seja ENTRANTE ("tem renda líquida presumida")
    
    df = df.withColumn('waux_rnd_brta_x_030', CALC_RND_BRTA_30)
                               
    df = df.withColumn('gda_vl_pstc_ecf_ini', F.when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_VL_PSTC_BRT, col('waux_rnd_brta_x_030'))\
                                               .when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_VL_PSTC_PRES, col('waux_rnd_pres_x_030'))\
                                               .when(EXPR_IN_CARREGA_RENDA & EXPR_GDA_VL_PSTC_LQD, col('waux_rnd_lqd_x_045'))\
                                               .otherwise(lit(0))\
                      )\
    
    df = df.withColumn('in_recruta', F.when(EXPR_IN_CARREGA_RENDA & EXPR_NTZ_19, lit('S')\
                                           )\
                                      .otherwise(lit(col('in_recruta')))\
                      )\
           .withColumn('in_engajado', F.when(EXPR_IN_CARREGA_RENDA & EXPR_NTZ_20, lit('S'))\
                                       .otherwise(lit(col('in_engajado')))\
                      )
    
    return df

### Seleciona Renda

In [None]:
%%spark

def selecionaRenda(df):
    
    EXPR_SELECIONA_RENDA_1 = expr(' in_seleciona_renda_1 ')
    EXPR_SELECIONA_RENDA_2 = expr(' in_seleciona_renda_2 ')
    EXPR_INSERIR_RENDA = expr(" (cd_ntz in (19, 20) and array_contains(cd_epgr_engaj, cd_epgr_cli) ) OR (cd_ntz not in (19, 20)) ")
    
       
    df = df.withColumn('in_inserir_renda', f.when(EXPR_SELECIONA_RENDA_1 & EXPR_INSERIR_RENDA, lit(True))\
                                            .otherwise(lit(False))\
                      )

    df = inserir_renda(df)
        
    return df

### Inserir Renda

In [None]:
%%spark

def inserir_renda(df):
    
    EXPR_INSERIR_RENDA = expr(' in_inserir_renda ')
    EXPR_SELECIONA_RENDA_1 = expr('in_seleciona_renda_1')
    EXPR_VL_REN_SAL = expr(' gda_epgr = 0 or gda_epgr = cd_epgr_cli ')
    EXPR_VL_REN_FUN = expr(' gda_epgr in (cd_mci_previ, cd_mci_bb)  ')
    EXPR_POSICAO_0 = expr(' not in_inserir_renda and posicao = 0 ')
    EXPR_NTZ_19 = max(when(col('cd_ntz') == 19, lit(True))).over(Window.partitionBy('cd_cli'))
    EXPR_NTZ_20 = max(when(col('cd_ntz') == 20, lit(True))).over(Window.partitionBy('cd_cli'))
    
    w = Window.partitionBy('cd_cli')
    
    EXPR_INSERIR_RENDA_LAG = f.lag('in_inserir_renda').over(w.orderBy('gda_index'))
    
    df = df.withColumn('in_inserir_renda_2', max(col("in_inserir_renda").cast("int")).over(w) > 0)
    
    df = df.withColumn('gda_vl_ren_sal', f.when((EXPR_INSERIR_RENDA | EXPR_INSERIR_RENDA_LAG) & EXPR_VL_REN_SAL, col('vl_rend_lqdo'))\
                                          .otherwise(col('gda_vl_ren_sal')))\
           .withColumn('gda_epgr', f.when(EXPR_INSERIR_RENDA & EXPR_VL_REN_SAL, col('cd_epgr_cli'))\
                                    .otherwise(col('gda_epgr')))\
           .withColumn('gda_vl_ren_funci', f.when(EXPR_INSERIR_RENDA & EXPR_VL_REN_FUN, col('vl_rend_lqdo'))\
                                    .otherwise(lit(0)))\
           .withColumn('in_recruta', f.when(expr('in_inserir_renda_2') & EXPR_NTZ_19, lit('S'))\
                                      .otherwise(col('in_recruta')))\
           .withColumn('in_engajado', f.when(expr('in_inserir_renda_2') & EXPR_NTZ_20, lit('S'))\
                                       .otherwise(col('in_engajado')))\
    
    df = df.withColumn('gda_vl_ren_sal', f.when(expr('gda_epgr = 0') & expr('in_inserir_renda_2'), col('vl_rend_lqdo'))\
                                    .otherwise(col('gda_vl_ren_sal')))\
           .withColumn('gda_epgr', f.when(expr('gda_epgr = 0') & expr('in_inserir_renda_2'), col('cd_epgr_cli'))\
                                    .otherwise(col('gda_epgr')))\

    
    return df

### VerificaZeraPrestacao

In [None]:
%%spark

import pyspark.sql.functions as F


def verifZeraPrestacao(df):
    '''
        Por objetivo zerar a prestação para clientes com idade de 86 anos completos ou mais.

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
    
                                             
        Keyword Arguments:
            Tipo = maneira como será realizado o zeramento dos clientes 
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def calcularPstcPF
            funcoes acionadas:
                none
                
    '''
    
    # 1 - Verifica as condicoes
    # 2 - Zera todas as colunas
  
    # expressao logica
    
    EXPR_IDADE = expr("qt_idd > 85")
    EXPR_RND_LQDA = expr("vl_rend_lqdo_ttl < 20000")
    
    start_time = time.time()
    
    # Inicialização 
     
    verifZeraPrestacaoDF = df.withColumn("vl_pstc_funci_clcd",   F.when(EXPR_IDADE,lit(0))\
                                                                  .otherwise(col("vl_pstc_funci_clcd"))\
                                        )\
                             .withColumn("vl_pstc_funci_apvd",   F.when(EXPR_IDADE,lit(0))\
                                                                  .otherwise(col("vl_pstc_funci_clcd"))\
                                        )\
                             .withColumn("vl_pstc_ttl_ecf_clcd", F.when(EXPR_RND_LQDA & EXPR_IDADE,lit(0))\
                                                                  .otherwise(col("vl_pstc_ttl_ecf_clcd"))\
                                        )\
                             .withColumn("vl_pstc_ttl_ecf_apvd", F.when(EXPR_RND_LQDA & EXPR_IDADE,lit(0))\
                                                                  .otherwise(col("vl_pstc_ttl_ecf_apvd"))\
                                        )\
                             .withColumn("vl_pstc_demais",       F.when(EXPR_RND_LQDA & EXPR_IDADE,lit(0))\
                                                                  .otherwise(col("vl_pstc_demais"))\
                                        )\
                             .withColumn("vl_pstc_crt_apvd",     F.when(EXPR_RND_LQDA & EXPR_IDADE,lit(0))\
                                                                  .otherwise(col("vl_pstc_crt_apvd"))\
                                        )\
                             .withColumn("vl_pstc_crt_clcd",     F.when(EXPR_RND_LQDA & EXPR_IDADE,lit(0))\
                                                                  .otherwise(col("vl_pstc_crt_clcd"))\
                                        )\
    
    
    print(f"Saindo da função verifZeraPrestacao tempo de excução: {time.time() - start_time} ")

    return verifZeraPrestacaoDF

### CalculaPrestacaoPessoaFisica

In [None]:
%%spark

import pyspark.sql.functions as F
import time 

def calcularPstcPF(df):
    '''
        E tem por objetivo calcular as prestações de crédito consignado como os métodos calculaPstcFinal, calculaPstcSgm13 e calcularPstcPF. Os métodos calculaPstcFinal e calcularPstcPF 
        calculam as prestações de crédito consignado, enquanto o método calculaPstcSgm13 captura os dados do convênio de INSS, calcular a prestação do cadastro emitentes cheque sem fundo e capturar 
        dados de convênio
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def processar
            funcoes acionadas:
                def calculaRndLiqPres
                def selecionarRndCvn
                def calculaPstcFinal
                def verifZeraPrestacao
                
    '''
    
    # 1 - Cria coluna de data retirando dois anos da data atual
    # 2 - Aciona as funções necessarias para realização dos calculos do modulo
    
    
    # Inicialização 
    
    start_time = time.time()

    df = df.withColumn("waux_dta_a2anos",F.current_timestamp())
    df = df.withColumn("waux_dta_a2anos",date_format(df["waux_dta_a2anos"], "yyyy-MM-dd"))
    df = df.withColumn("waux_dta_a2anos",F.add_months(df["waux_dta_a2anos"],-24))
    
    
    print("Entrando na funcao calculaRndLiqPres")
    
    calcularPstcPFDF = calculaRndLiqPres(df).cache()
          
    print("Entrando na funcao calculaPstcFinal")
    
    calcularPstcPFDF = calculaPstcFinal(calcularPstcPFDF)
    
    print("Entrando na funcao verifZeraPrestacao")
    
    calcularPstcPFDF = verifZeraPrestacao(calcularPstcPFDF)
    
    print(f"Saindo da função calcularPstcPFDF tempo de excução: {time.time() - start_time} ")

    return calcularPstcPFDF

### inicializa_Vrv_Trab

In [None]:
%%spark

import pyspark.sql.functions as F
import time

def inicializa_Vrv_Trab(df):
    '''
        E tem por objetivo calcular as prestações de crédito consignado como os métodos calculaPstcFinal, calculaPstcSgm13 e calcularPstcPF. Os métodos calculaPstcFinal e calcularPstcPF 
        calculam as prestações de crédito consignado, enquanto o método calculaPstcSgm13 captura os dados do convênio de INSS, calcular a prestação do cadastro emitentes cheque sem fundo e capturar 
        dados de convênio
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def processar
            funcoes acionadas:
                None
    '''
    
    # 1 - Aciona as funções necessarias para realização dos calculos do modulo
    
    start_time = time.time()

    
    
    # Inicialização 
    
    inicializa_Vrv_TrabDF = df.withColumn("gda_epgr", F.array([lit(0).cast(IntegerType()) for i in range(0,50)]))\
                              .withColumn("gda_index", F.array([lit(i).cast(IntegerType()) for i in range(0,50)]))\
                              .withColumn("gda_vl_ren_brta", F.array([F.lit(0).cast('decimal(17,2)') for i in range(0,50)]))\
                              .withColumn("gda_vl_ren_lqda_clcd", F.array([F.lit(0).cast('decimal(17,2)') for i in range(0,50)]))\
                              .withColumn("gda_vl_ren_lqda_pres", F.array([lit(0) for i in range(0,50)]))\
                              .withColumn("gda_vl_ren_ecf", F.array([lit(0) for i in range(0,50)]))\
                              .withColumn("gda_vl_ren_sal", F.array([lit(0) for i in range(0,50)]))\
                              .withColumn("gda_vl_ren_funci", F.array([lit(0) for i in range(0,50)]))\
                              .withColumn("gda_vl_pstc_ecf_ini", F.array([lit(0) for i in range(0,50)]))\
                              .withColumn("gda_vl_pstc_funci", F.array([lit(0) for i in range(0,50)]))\
                              .withColumn("in_engajado", F.array([lit(' ') for i in range(0,50)]))\
                              .withColumn("in_recruta", F.array([lit(' ') for i in range(0,50)]))\
                              .withColumn("gda_mci_epgr", F.array())\
                              .withColumn("gda_pc_rend_brt", F.array())\
                              .withColumn("gda_pc_rend_lqd", F.array())\
                              .withColumn("gda_nr_seql_crit", F.array())\
                              .withColumn("cd_zero",lit(0))\
                              .withColumn("IniColString",lit("N"))\
                              .withColumn("in_epgr_dfrt_30_pc_crg",lit(""))\
                              .select("*",
                                     col("IniColString").alias("ind_zera_pstc_ecf"),
                                     col("cd_zero").alias("waux_mci_epgr"),
                                     col("cd_zero").alias("waux_cli_pj_cvnd"),
                                     col("cd_zero").alias("waux_ic_ajst_cvn"),
                                     col("cd_zero").alias("waux_ic_mtp_cvn"),
                                     col("cd_zero").alias("waux_qt_max_mm_fnto"),
                                     col("cd_zero").alias("waux_vl_max_cvn"),
                                     col("cd_zero").alias("waux_vl_mnr_taxa_jur"),
                                     col("cd_zero").alias("waux_cd_slim_cvn"),
                                     col("cd_zero").alias("waux_nr_cli_cvn"),
                                     col("cd_zero").alias("waux_vl_rnd_pres"),
                                     col("cd_zero").alias("waux_maior_pstc_ecf"),
                                     col("cd_zero").alias("waux_rnd_pres_x_030"),
                                     col("cd_zero").alias("waux_rnd_lqd_x_045"),
                                     col("cd_zero").alias("waux_dta_ini_ocp"),
                                     col("cd_zero").alias("gda_vl_rnd_funci_seld"),
                                     col("cd_zero").alias("gda_vl_pstc_ecf_seld"),
                                     col("cd_zero").alias("waux_ic_mtp_seld_sal"),
                                     col("cd_zero").alias("waux_ic_mtp_seld_crt"),
                                     col("cd_zero").alias("waux_ic_mtp_seld_ecf"),
                                     col("cd_zero").alias("waux_vl_max_cvn_ecf_seld"),
                                     col("cd_zero").alias("waux_vl_max_cvn_sal_seld"),
                                     col("cd_zero").alias("waux_vl_max_cvn_crt_seld"),
                                     col("cd_zero").alias("waux_pc_rend_brt"),
                                     col("cd_zero").alias("waux_pc_rend_lqd")
                                     )
    
                                      
    print(f"Sainda da função inicializa_Vrv_Trab tempo de execução {time.time() - start_time} ")

    return inicializa_Vrv_TrabDF

### Processa

In [None]:
%%spark

import pyspark.sql.functions as F
import time

def processar(df):
    '''
        Processa Modulo do ANCSFC159
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def procParcelado
            funcoes acionadas:
                def buscarMCI
                def pesquisarEpgrDfrt30Pc
                def calcularPstcPF
                
    '''
    # 1 - Funcao orquestradora do modulo ANCSF159
    start_time = time.time()
    
    # Inicialização 
    
    print("Entrando na funcao inicializa_Vrv_Trab")
        
    processarANCSF159DF = inicializa_Vrv_Trab(df) 
    
    print("Entrando na funcao buscaMCI")
    
    processarANCSF159DF = buscaMCI(processarANCSF159DF)
    
    print("Entrando na funcao pesquisaEpgrDfrt30Pc")
    
    processarANCSF159DF = pesquisaEpgrDfrt30Pc(processarANCSF159DF).checkpoint()
    
    print("Entrando na funcao calcularPstcPF")
    
    processarANCSF159DF = calcularPstcPF(processarANCSF159DF)
    
    print(f"Saindo da função processar tempo de excução: {time.time() - start_time} ")
    
    return processarANCSF159DF

## Funções Orquestradoras

### ANCS0160

In [None]:
%%spark

import pyspark.sql.functions as F
import time


def ancs0160(df):
    '''
        O valor de prestação de crédito aprovado e calculado é procedido de acordo com o código segmento análise que denota o cálculo da prestação proventista, cálculo da
        prestação correntista, cálculo da prestação proventista e cálculo da prestação INSS        
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado

                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ProcessaPrestacao
            funcoes acionadas:
                def aplicaRegraPstcProven
                def aplicaRegraPstcCorren
                def aplicaRegraPstcParc
                def aplicaRegraPstcInss
                
    '''
    
    # constante 
    
    start_time = time.time()

    
    TAXA_MULTIPLICACAO_40 = 0.40
    TAXA_MULTIPLICACAO_80 = 0.80

    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_IN_ANOT = expr(" cd_status_cli <> 2 and cd_status_cli = 1 and cd_tip_bnf < 4 and in_anot = 4 ")
    EXPR_VL_PSTC_CDC = expr(" cd_status_cli <> 2 and (cd_status_cli <> 1 or cd_tip_bnf >= 4) ")
    EXPR_IN_PSTC_CORREN = expr(" cd_sgm_anl in (9, 19, 21, 25, 30, 31, 39, 55) ")
    EXPR_IN_PSTC_PARC = expr(" cd_sgm_anl in (14, 16) ")
    EXPR_IN_PSTC_INSS = expr(" cd_sgm_anl = 13 ")
    EXPR_IN_ANCS0160 = expr(" in_ancs0160 ")

    
    # expressoes matematicas utulizadas nas contas conforme condicao 
    
    CALC_WGDA_VL_PC40 = expr(f" vl_ren_prtz_teto * {TAXA_MULTIPLICACAO_40} ")
    CALC_WGDA_VL_PC80 = expr(f" vl_ren_prtz_teto * {TAXA_MULTIPLICACAO_80} ")

    # Inicialização 
    ancs0160DF = df.withColumn("wgda_teto_pstc_cdc", F.when(EXPR_IN_ANCS0160, lit(0))\
                                                      .otherwise(col("wgda_teto_pstc_cdc"))\
                              )\
                   .withColumn("in_anot", F.when(EXPR_IN_ANCS0160 & EXPR_IN_ANOT, lit(0))\
                                           .otherwise(col("in_anot"))\
                              )\
                   .withColumn("vl_pstc_cdc_clcd", F.when(EXPR_IN_ANCS0160 & EXPR_VL_PSTC_CDC, lit(0))\
                                                    .otherwise(col("vl_pstc_cdc_clcd"))\
                              )\
                   .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_ANCS0160 & EXPR_VL_PSTC_CDC, lit(0))\
                                                    .otherwise(col("vl_pstc_cdc_apvd"))\
                              )\
    
    ancs0160DF = ancs0160DF.withColumn("wgda_vl_pc40_rnd_parm_teto", F.when(EXPR_IN_ANCS0160 & ~EXPR_VL_PSTC_CDC,CALC_WGDA_VL_PC40 )\
                                                                      .otherwise(col("wgda_vl_pc80_rnd_parm_teto"))\
                                      )\
                           .withColumn("wgda_vl_pc80_rnd_parm_teto", F.when(EXPR_IN_ANCS0160 & ~EXPR_VL_PSTC_CDC,CALC_WGDA_VL_PC80 )\
                                                                      .otherwise(col("wgda_vl_pc80_rnd_parm_teto"))\
                                      )
    
    
    ancs0160DF = ancs0160DF.withColumn("in_regra_pstc_corren", F.when(EXPR_IN_ANCS0160 & ~EXPR_VL_PSTC_CDC,lit(True)).otherwise(lit(False)))
    
    
    print("Entrando na funcao aplicaRegraPstcProven")
    
    ancs0160DF = aplicaRegraPstcProven(ancs0160DF)
    
    # No escopo da Metodologia 53, os clientes podem pertencer somente ao cd_sgm_anl = 55
    # As demais regras se aplicam somente para as demais metodologias

    ancs0160DF = ancs0160DF.withColumn("in_regra_pstc_corren", F.when(EXPR_IN_ANCS0160 & ~EXPR_VL_PSTC_CDC & EXPR_IN_PSTC_CORREN,lit(True)).otherwise(lit(False)))
    
    print("Entrando na funcao aplicaRegraPstcCorren")

    ancs0160DF = aplicaRegraPstcCorren(ancs0160DF)
    

    print(f"Saindo da função ancs0160 tempo de excução: {time.time() - start_time} ")


    return ancs0160DF

### ProcessaPrestacao

In [None]:
%%spark

import pyspark.sql.functions as F
import time


def ProcessaPrestacao(df):
    '''
        Calcular o valor da prestação crédito aprovado 
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def ProcessaParcelado
            funcoes acionadas:
                def ancs0160
                
    '''
    
    # 1 - Chamada a partir da ProcessaParcelado
    # 3 - Aciona a funcao ancs0160
    # 2 - Executada a construcao da coluna vl_pstc_cdc_apvd
    
    start_time = time.time()

    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_IN_ANL_PMT = expr(" in_anl_pmt = 'S' ")
    EXPR_IN_BNF_ASTL = expr(" in_bnf_astl = 'S' and in_pgt = 0 ")
    EXPR_IN_PROCPRESTACAO = expr(" in_procPrestacao ")

    # Inicialização 
    
    ProcessaPrestacaoDF = df.withColumn("in_ancs0160", F.when(EXPR_IN_PROCPRESTACAO, lit(True)).otherwise(lit(False)))
    
    print("Entrando na função ancs0160")
    
    ProcessaPrestacaoDF = ancs0160(ProcessaPrestacaoDF)
    
    ProcessaPrestacaoDF = ProcessaPrestacaoDF.withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_PROCPRESTACAO & EXPR_IN_ANL_PMT, col("vl_pstc_cdc_clcd"))\
                                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                                        )\
                                             .withColumn("vl_pstc_cdc_apvd", F.when(EXPR_IN_PROCPRESTACAO & EXPR_IN_BNF_ASTL, lit(0))\
                                                                              .otherwise(col("vl_pstc_cdc_apvd"))\
                                                        )
    
     # INCLUIR REGRA DO ANCKCCF1-IN-AUX-EMER AQUI. FALTA INCLUIR A VARIAVEL DE RECONSIDERAÇÃO FUTURAMENTE.
     # OBS: NO COBOL ELE ATRIBUI ZERO DUAS VEZES PARA O CDC-APVD
    
    
    print(f"Saindo da função ProcessaPrestacao tempo de excução: {time.time() - start_time} ")
    
    return ProcessaPrestacaoDF

### ProcessaParcelado

In [None]:
%%spark

import pyspark.sql.functions as F


def ProcessaParcelado(df):
    '''
        Executa as rotinas procPrestacao.py e as que realizam o cálculo (calculoAgrup.py) e fazem  as regras (regraPstcEcfFunci.py
        verifGTFraude.py, regrasPstc.py, trataSLim.py, regraCPM.py) dos produtos do parcelado, na qual, retorna o dicionário de dados        
        
        Parametros:

        Arguments:
            df {DataFrame} -- Dataframe contendo os dados dos clientes que terao o ANC calculado
                              See: 
                                   def obtemDadosClientes
                                             
        Keyword Arguments:
            None
            
        Returns:
            dataframe
            
        See:
            chamada pela funcao:
                def preditor
            funcoes acionadas:
                def calculoAgrup
                def regraPstcEcfFunci
                def verifGTFraude
                def regrasPstc
                def trataSLim
                def regraCPM
                def procprestacao
    '''
    
    # 1 - Funcao orquestradora de diversas outras funcoes
    # 2 - Recebe um dataframe 
    # 3 - Realiza alguns tratamentos iniciais
    # 4 - Aciona as funções 
    
    

    
   # constante
    
    FATOR_MULTIPLICADOR_2SLR = 2
    FATOR_MULTIPLICADOR_10SLR = 10
    
    
    # expressoes condicionais utulizadas nas contas conforme condicao 
    
    EXPR_VL_SLIM_15_APVD = expr(" vl_slim_15_apvd > vl_slim_20_apvd ")
    EXPR_VL_SLIM_15_CLCD = expr(" vl_slim_15_clcd > vl_slim_20_clcd ")
    EXPR_IN_CD_FUNC = expr(" cd_fuc = 1 ")

    
    # expressoes matematicas utulizadas nas contas conforme condicao 
    
    CALC_WAUX_VL_2SLR = expr(f" vl_slr_min * {FATOR_MULTIPLICADOR_2SLR}  ")
    CALC_WAUX_VL_10SLR = expr(f" vl_slr_min * {FATOR_MULTIPLICADOR_10SLR}  ")

    
    # Inicialização 
    
    
    ProcessaParceladoDF = df.withColumn("indicadorFunc",F.when(col("cd_fuc") == 1, lit(True))\
                                                         .otherwise(lit(False))\
                                        )\
                            .withColumn("vl_slim_15_20_apvd",F.when(EXPR_VL_SLIM_15_APVD, col("vl_slim_15_apvd"))\
                                                              .otherwise(col("vl_slim_20_apvd"))\
                                       )\
                            .withColumn("vl_slim_15_20_clcd",F.when(EXPR_VL_SLIM_15_CLCD, col("vl_slim_15_clcd"))\
                                                              .otherwise(col("vl_slim_20_clcd"))\
                                       )\
                            .select("*",
                                      CALC_WAUX_VL_10SLR.alias("waux_vl_10slr_min"),
                                      CALC_WAUX_VL_2SLR.alias("waux_vl_2slr_min"),
                                      col("indicadorFunc").alias("in_procPrestacao"),
                                      col("indicadorFunc").alias("in_RegraPstcEcfFunci"),
                                      col("indicadorFunc").alias("in_RegraPstcVclo"),
                                      col("indicadorFunc").alias("in_SlimClcdCdc"),
                                      col("indicadorFunc").alias("in_SlimApvdCdc"),
                                      col("indicadorFunc").alias("in_calc_gp_vclo"),
                                      col("indicadorFunc").alias("in_calc_gp_rao"),
                                      col("indicadorFunc").alias("in_calc_gp_ecf"),
                                      col("indicadorFunc").alias("in_calc_gp_func"),
                                      col("indicadorFunc").alias("in_calc_gp_rdex"),
                                      col("indicadorFunc").alias("in_calc_gp_cdc"),
                                      col("indicadorFunc").alias("in_calc_bloco"),
                                    )

    
    print("Entrando na função ProcessaPrestacao - Bloco 1")  
    ProcessaParceladoDF = ProcessaPrestacao(ProcessaParceladoDF)

    
    return ProcessaParceladoDF

## Ingestão

In [None]:
%%spark

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *
from pyspark.sql import DataFrame

def procedimentosIniciaisTramentoDados(df: DataFrame):
    '''
        Funcao responsavel por fazer arranjos previos de variaveis que serao utilizadas nos calculas
        
        Arguments:
            df {DataFrame} -- dataframe criado a partir da carga do arquivo contendo os registros dos clientes
                              que terao o ANC calculado.
        Keyword Arguments:
            None
        Returns:
            {DataFrame}
        See:
            funcao acionada: 
                None
            chamada pela funcao:
                def obtemDadosClientes
    '''
    
    #constantes utilizadas no arranjo das variaveis
    PERC_40 = 0.40
    PERC_80 = 0.80
    
    # 1o withColumn - converte o tipo da variavel vl_slim_15_ant de String para Integer
    # 2o withColumn - converte o tipo da variavel vl_slim_20_ant de String para Integer
    # 3o withColumn - cria nova variavel wgda_vl_pc40_rnd_parm_teto
    # 4o withColumn - cria nova variavel wgda_vl_pc80_rnd_parm_teto
    # 5o withColumn - cria nova variavel vl_slim_15_clcd copiando o valor da variavel vl_slim_15_ant
    # 6o withColumn - cria nova variavel vl_slim_15_apvd copiando o valor da variavel vl_slim_15_ant
    # 7o withColumn - cria nova variavel vl_slim_20_clcd copiando o valor da variavel vl_slim_20_ant
    # 8o withColumn - cria nova variavel vl_slim_20_apvd copiando o valor da variavel vl_slim_20_ant
    # 9o withColumn - cria nova variavel vl_slim_53_clcd copiando o valor da variavel vl_slim_53_ant
    # 10o withColumn - cria nova variavel vl_slim_53_apvd copiando o valor da variavel vl_slim_53_ant
    # 11o withColumn - cria nova variavel vl_slim_178_clcd copiando o valor da variavel vl_slim_178_utzd
    # 12o withColumn - cria nova variavel vl_slim_178_apvd copiando o valor da variavel vl_slim_178_utzd
    # 13o withColumn - cria nova variavel ind_pbco_min com o valor "N" para todos os registros do dataframe
    
    
    
    processaRotativoDF = df.withColumn("vl_slim_15_ant", col('vl_slim_15_ant').cast(IntegerType()))\
                           .withColumn("vl_slim_20_ant", col('vl_slim_20_ant').cast(IntegerType()))\
                           .withColumn("wgda_vl_pc40_rnd_parm_teto", col('vl_ren_prtz_teto') * lit(PERC_40))\
                           .withColumn("wgda_vl_pc80_rnd_parm_teto", col('vl_ren_prtz_teto') * lit(PERC_80))\
                           .withColumn("vl_slim_53_clcd", col('vl_slim_53_ant'))\
                           .withColumn("vl_slim_53_apvd", col('vl_slim_53_ant'))\
                           .withColumn("vl_slim_178_clcd", col('vl_slim_53_ant'))\
                           .withColumn("vl_slim_178_apvd", col('vl_slim_53_ant'))\
                 
    processaRotativoDF = processaRotativoDF.withColumn("vl_slim_20_clcd", col('vl_slim_20_clcd').cast(DecimalType(15,2)))\
                                           .withColumn("vl_slim_20_apvd", col('vl_slim_20_apvd').cast(DecimalType(15,2)))\
                                           .withColumn("vl_slim_15_clcd", col('vl_slim_15_clcd').cast(IntegerType()))\
                                           .withColumn("vl_slim_15_apvd", col('vl_slim_15_apvd').cast(IntegerType()))
                      
    return processaRotativoDF