# In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, round, to_date, lit, pow,
    datediff, regexp_replace
)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import sqlite3
from datetime import datetime
from pyspark.sql.utils import AnalysisException
from py4j.protocol import Py4JJavaError
import pandas as pd

class NotebookMain:
    def __init__(self, input_file, output_dir):
        self.input_file = input_file
        self.output_dir = output_dir
        self.spark = self.create_spark_session()
        self.df = None
        self.separator_in = ';'
        self.separator_out = '|'
        self.debug_data = 'N'

    def create_spark_session(self):
        """Build (or reuse) the SparkSession used by the pipeline.

        Returns:
            The session returned by ``SparkSession.builder.getOrCreate()``
            after the application name and the Hadoop settings below are set.
        """
        hadoop_settings = {
            "spark.hadoop.io.file.buffer.size": "4096",
            "spark.hadoop.fs.file.impl.disable.cache": "true",
            "spark.hadoop.io.nativeio.nativepath": "false",
        }

        builder = SparkSession.builder.appName("Desafio Dev Databricks - Solução Completa")
        # Settings are applied in declaration order, one config() call each.
        for key, value in hadoop_settings.items():
            builder = builder.config(key, value)
        return builder.getOrCreate()

    def read_data(self):
        """Read the input CSV into ``self.df`` and copy it into a table.

        The file is read with a header row, schema inference and the
        ``separator_in`` delimiter. Four columns are then parsed as dates
        (``yyyy-MM-dd``), three placeholder columns are created as null
        strings, and the result is passed to ``insert_data_table``.

        Errors are printed and not re-raised; in that case ``self.df`` keeps
        whatever value it had when the failure happened.
        """
        date_columns = (
            "DATAINICIOVIGENCIA",
            "DATAFINALVIGENCIA",
            "DATAREFERENCIA",
            "DHREGISTROTD",
        )
        # Columns re-assigned to themselves. The values are unchanged;
        # presumably kept as a check that the columns exist - TODO confirm.
        passthrough_columns = (
            "IDRISCO_CARGA_CENTRAL",
            "COOPERATIVA",
            "NRCONTA",
            "NRCONTRATO",
            "TAXAJUROSMES",
            "VALORCONTRATO",
            "VALORUTILIZADO",
            "VALORNAOUTILIZADO",
            "DATAREFERENCIA",
        )
        # Columns that are (re)created as null strings.
        placeholder_columns = (
            "JUROSSUSPENSO",
            "CODIGOCIF",
            "FL_ATIVO_PROBLEMATICO",
        )

        try:
            self.df = self.spark.read.csv(
                self.input_file,
                header=True,
                inferSchema=True,
                sep=self.separator_in,
            )

            # Work on a local so self.df is only replaced once every column
            # step has succeeded.
            frame = self.df
            for name in date_columns:
                frame = frame.withColumn(name, to_date(col(name), "yyyy-MM-dd"))
            for name in passthrough_columns:
                frame = frame.withColumn(name, col(name))
            for name in placeholder_columns:
                frame = frame.withColumn(name, lit(None).cast("string"))
            frame = frame.withColumn("SALDOS_JTS_OID", col("SALDOS_JTS_OID"))

            self.df = frame
            self.insert_data_table('CREDITOGESTAO_LIMITES_CHESPECIAL', self.df)

        except FileNotFoundError as fnf_error:
            print(f"Arquivo não encontrado: {fnf_error}")
        except AnalysisException as ae:
            print(f"Erro de análise ao ler dados: {ae}")
        except Py4JJavaError as je:
            print(f"Erro na comunicação com Spark ao ler dados: {je}")
        except Exception as e:
            print(f"Erro inesperado ao ler dados: {e}")
    
    def insert_data_table(self, table, df):
        try:
            
            df_transformed_pandas = df.toPandas()
            conn = sqlite3.connect(":memory:")
            df_transformed_pandas.to_sql(table, conn, if_exists='replace', index=False)
            df_sqlite = pd.read_sql_query(f'SELECT * FROM {table}', conn)

            print(f'data inserted {table}!')

            if self.debug_data == 'S':
                print(df_sqlite.to_string)
        
        except sqlite3.DatabaseError as db:
            print(f"Erro ao conectar com o banco de dados: {db}")
        except Py4JJavaError as p4:
            print(f"Erro na comunicação com Spark ao conectar com o banco de dados: {p4}")
        except Exception as e:
            print(f"Ocorreu um erro ao inserir dados na tabela {table}: {e}")
    
        finally:
            conn.close()

    def transform_data(self):
        """Project the source columns and derive the operation-layout columns.

        A fixed subset of ``self.df`` is selected and the derived columns are
        added one ``withColumn`` call at a time, in the order listed below.
        ``self.df`` is replaced only when every step succeeds; errors are
        printed and not re-raised.
        """
        try:
            def null_string():
                # Placeholder column: a null literal typed as string.
                return lit(None).cast("string")

            source_columns = [
                "IDRISCO_CARGA_CENTRAL",
                "COOPERATIVA",
                "NRCONTA",
                "NRCONTRATO",
                "TAXAJUROSMES",
                "FL_ATIVO_PROBLEMATICO",
                "CODIGOCIF",
                "JUROSSUSPENSO",
                "VALORCONTRATO",
                "VALORUTILIZADO",
                "VALORNAOUTILIZADO",
                "DATAINICIOVIGENCIA",
                "DATAFINALVIGENCIA",
            ]
            projected = self.df.select(*[col(name) for name in source_columns])

            # Monthly percentage rate compounded over 12 months, expressed
            # again as a percentage and rounded to two decimals.
            monthly_rate = col("TAXAJUROSMES") / 100
            yearly_rate = pow(1 + monthly_rate, 12) - 1
            effective_rate = round((yearly_rate * 100), 2)

            has_used_limit = col("VALORUTILIZADO") > 0

            # NOTE(review): both branches return VALORUTILIZADO, so the
            # conditional has no effect - confirm the intended otherwise().
            gross_value = when(has_used_limit, col("VALORUTILIZADO")) \
                .otherwise(col("VALORUTILIZADO")) - col("JUROSSUSPENSO")

            # Ordered (name, expression) pairs; the order fixes the position
            # of each new column in the result.
            derived_columns = [
                ("idrisco_carga_central", col("IDRISCO_CARGA_CENTRAL")),
                ("idCooperativa", col("COOPERATIVA")),
                ("nrdconta", col("NRCONTA")),
                ("nrcontrato", col("NRCONTRATO")),
                ("dsctacos", lit("1612000")),
                ("dsorgrec", lit("0199")),
                ("nrtaxidx", lit(21)),
                ("nrperidx", lit(100)),
                ("nrvarcam", lit(790)),
                ("nrcepcon", self.get_nrcepcon_case_expr()),
                ("nrtaxeft", effective_rate),
                ("dtinictr", col("DATAINICIOVIGENCIA")),
                ("cdnatope", lit(1)),
                ("dscaresp", null_string()),
                # NOTE(review): second write to "dtinictr"; it replaces the
                # start date set above with VALORCONTRATO. Looks like a
                # different target column was intended - confirm the name.
                ("dtinictr", col("VALORCONTRATO")),
                ("flprejuz", null_string()),
                ("qtdiaatr", null_string()),
                ("tpcartao", null_string()),
                ("qtparcela", null_string()),
                ("nrcontrato_principal", col("NRCONTRATO")),
                ("dtsaida", null_string()),
                ("tpcontrato", lit(1)),
                ("vljuros_suspenso", col("JUROSSUSPENSO")),
                ("cdproduto_contabil", when(has_used_limit, '104').otherwise('103')),
                ("dtvencimento", col("DATAFINALVIGENCIA")),
                ("dtproxima_parcela", null_string()),
                ("vlproxima_parcela", null_string()),
                ("cdmodalidade", when(has_used_limit, '0213').otherwise('1902')),
                ("cdrisco_refinanciamento", null_string()),
                ("idsistema_origem", lit(2)),
                ("cdCif", col("CODIGOCIF")),
                ("vlcontabil_bruto", gross_value),
                ("qtparcela_paga", null_string()),
                ("vlparcela_paga", null_string()),
                ("nrversao_contrato", null_string()),
                ("tprenegociacao", null_string()),
                ("pedesconto_renegociacao", null_string()),
                ("flativo_problematico", col("FL_ATIVO_PROBLEMATICO")),
                ("vlperda_acumulada", null_string()),
            ]

            for column_name, expression in derived_columns:
                projected = projected.withColumn(column_name, expression)

            self.df = projected

        except AnalysisException as ae:
            print(f"Erro de análise ao transformar dados: {ae}")
        except TypeError as te:
            print(f"Erro de tipo ao transformar dados: {te}")
        except Py4JJavaError as p4:
            print(f"Erro na comunicação com Spark ao transformar dados: {p4}")
        except Exception as e:
            print(f"Erro inesperado ao transformar dados: {e}")

    def get_nrcepcon_case_expr(self):
        """Build the column expression that maps COOPERATIVA to a code string.

        Returns:
            A chained ``when`` expression with one branch per entry of the
            lookup table below; any other COOPERATIVA value yields
            ``'89041110'``.
        """
        codes_by_cooperative = {
            1: '89010971',
            2: '89201260',
            3: '89041110',
            5: '88811700',
            6: '88034050',
            7: '88020020',
            8: '88020000',
            9: '88075301',
            10: '88508190',
            11: '88307326',
            12: '89270000',
            13: '89287440',
            14: '85601630',
            16: '89140000',
        }

        case_expr = None
        for cooperative, code in codes_by_cooperative.items():
            matches = col("COOPERATIVA") == cooperative
            # The first entry opens the chain; the rest extend it in order.
            if case_expr is None:
                case_expr = when(matches, code)
            else:
                case_expr = case_expr.when(matches, code)
        return case_expr.otherwise('89041110')

    def add_sequential_column(self, column):
        """Add a ``row_number`` column to ``self.df``, ordered by NRCONTRATO.

        Args:
            column: Name of the column that receives the row number.
        """
        # The window has an ordering but no partitioning.
        by_contract = Window.orderBy("NRCONTRATO")
        sequence = row_number().over(by_contract)
        self.df = self.df.withColumn(column, sequence)

    def create_bronze_table(self):
        """Convert the two limit-value columns of ``self.df`` to float.

        Commas in VALORUTILIZADO and VALORNAOUTILIZADO are replaced by dots
        before the cast.
        """
        normalized = self.df
        for value_column in ("VALORUTILIZADO", "VALORNAOUTILIZADO"):
            with_dot = regexp_replace(col(value_column), ",", ".")
            normalized = normalized.withColumn(value_column, with_dot.cast("float"))
        self.df = normalized

    def create_silver_table(self):
        """Build ``self.df_silver`` from the used and unused limit rows.

        Rows with VALORUTILIZADO > 0 get modality "0213" and rows with
        VALORNAOUTILIZADO > 0 get modality "1902"; each slice copies its
        value column into ``vlsaldo_limite``. The two slices are unioned by
        column name (a row matching both filters appears in both) and the
        result is passed to ``insert_data_table``.
        """
        def limit_slice(value_column, modality):
            # Rows with a positive value, tagged with the given modality.
            return (
                self.df.filter(col(value_column) > 0)
                .withColumn("cdmodalidade", lit(modality))
                .withColumn("vlsaldo_limite", col(value_column))
            )

        used_rows = limit_slice("VALORUTILIZADO", "0213")
        unused_rows = limit_slice("VALORNAOUTILIZADO", "1902")

        self.df_silver = used_rows.unionByName(unused_rows)
        self.insert_data_table('CREDITOGESTAO_RISCO_CARGA_OPERACAO', self.df_silver)

    def calculate_cdvencimento(self, df_risco_carga_operacao, df_bronze, dtReference):
        """Join the two frames on NRCONTRATO and add the ``cdvencimento`` code.

        Args:
            df_risco_carga_operacao: Left side of the inner join.
            df_bronze: Right side of the inner join.
            dtReference: Reference date, converted with ``to_date``.

        Returns:
            The joined frame with ``cdvencimento`` added. For rows with
            VALORUTILIZADO > 0 the code is picked from the number of days
            between ``dtReference`` and DATAFINALVIGENCIA. Other rows get
            20 when the span from DATAINICIOVIGENCIA to DATAFINALVIGENCIA
            is at most 360 days, and 40 otherwise.
        """
        joined = df_risco_carga_operacao.join(df_bronze, on="NRCONTRATO", how="inner")

        # Days from the reference date to the end of validity.
        days_to_end = datediff(col("DATAFINALVIGENCIA"), to_date(lit(dtReference)))
        # Days between start and end of validity.
        validity_days = datediff(col("DATAFINALVIGENCIA"), col("DATAINICIOVIGENCIA"))

        # (exclusive lower bound, inclusive upper bound, code)
        day_buckets = [
            (30, 60, 120),
            (60, 90, 130),
            (90, 180, 140),
            (180, 360, 150),
            (360, 720, 160),
            (720, 1080, 165),
            (1080, 1440, 170),
            (1440, 1800, 175),
            (1800, 5400, 180),
        ]

        used_code = when(days_to_end <= 30, 110)
        for lower, upper, code in day_buckets:
            used_code = used_code.when((days_to_end > lower) & (days_to_end <= upper), code)
        used_code = used_code.otherwise(190)

        unused_code = when(validity_days <= 360, 20).otherwise(40)

        due_code = when(col("VALORUTILIZADO") > 0, used_code).otherwise(unused_code)
        return joined.withColumn("cdvencimento", due_code)

    def generate_risk_operation_due(self, reference_date="2024-08-31"):
        """Build ``self.df_vencimento`` and copy it into a table.

        ``self.df`` is joined with a renamed copy of itself through
        ``calculate_cdvencimento``; the result keeps the risk id, the
        operation sequence, the due code and the due value.

        Args:
            reference_date: Date (``yyyy-MM-dd``) forwarded to
                ``calculate_cdvencimento``. Defaults to the value that was
                previously hard-coded, so existing callers are unaffected.
        """
        df_risco_carga_operacao = self.df.select(
            col("NRCONTRATO"),
            col("idrisco_carga_central"),
            col("cdcarga_operacao").alias("risco_cdcarga_operacao")
        )

        # Rename the columns both sides share (other than the join key) so
        # the joined frame has no duplicate names.
        # NOTE(review): the join key is NRCONTRATO only; repeated contract
        # numbers would multiply rows - confirm it is unique.
        df_bronze = self.df.withColumnRenamed("idrisco_carga_central", "bronze_idrisco_carga_central") \
                           .withColumnRenamed("cdcarga_operacao", "bronze_cdcarga_operacao")

        df_vencimento = self.calculate_cdvencimento(df_risco_carga_operacao, df_bronze, reference_date)

        self.df_vencimento = df_vencimento.select(
            col("bronze_idrisco_carga_central").alias("idrisco_carga_central"),
            col("risco_cdcarga_operacao").alias("cdcarga_operacao"),
            col("cdvencimento"),
            # Due value: the used amount when positive, else the unused amount.
            when(col("VALORUTILIZADO") > 0, col("VALORUTILIZADO")).otherwise(col("VALORNAOUTILIZADO")).alias("vlvencimento")
        )
        self.insert_data_table('CREDITOGESTAO_RISCO_CARGA_OPERACAO_VENCIMENTO', self.df_vencimento)

    def save_to_csv(self, df, filename):
        try:
            
            df_pandas = df.toPandas()
            df_pandas['DHREGISTRO'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            df_pandas.to_csv(f'{self.output_dir}/{filename}', index=False, sep=self.separator_out)
            print(f'{filename} created!')
        
        except IOError as io:
            print(f"Erro de I/O ao escrever dados: {io}")
        except AnalysisException as ae:
            print(f"Erro de análise ao escrever dados: {ae}")
        except Py4JJavaError as p4:
            print(f"Erro na comunicação com Spark ao escrever dados: {p4}")
        except Exception as e:
            print(f"Erro inesperado ao escrever dados: {e}")

    def run_pipeline(self):
        self.read_data()
        self.transform_data()
        self.add_sequential_column("cdcarga_operacao")
        self.create_bronze_table()
        self.create_silver_table()
        self.generate_risk_operation_due()
        self.save_to_csv(self.df_silver, 'silver-operacao.csv')
        self.save_to_csv(self.df_vencimento, 'silver-vencimentos.csv')
        self.spark.stop()

if __name__ == "__main__":
    # Script entry point: run the whole pipeline once with fixed paths.
    # The input path is relative; the output directory is an absolute path
    # on drive E:.
    input_file, output_dir = (
        "dados-entrada.csv",
        "E:/Repositories/ailosTest/csv_python/pythonDesenv",
    )

    processor = NotebookMain(input_file=input_file, output_dir=output_dir)
    processor.run_pipeline()


# data inserted CREDITOGESTAO_LIMITES_CHESPECIAL!
# data inserted CREDITOGESTAO_RISCO_CARGA_OPERACAO!
# data inserted CREDITOGESTAO_RISCO_CARGA_OPERACAO_VENCIMENTO!
# silver-operacao.csv created!
# silver-vencimentos.csv created!
