In [3]:
# import os
import sys
import pandas as pd

pd.options.display.float_format = "{:.2f}".format # fomatacao do .describe()
pd.set_option('display.max_columns', None) # mostrar todos os campos no .head()

from pyspark.sql import SparkSession,SQLContext
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

import calendar
import datetime
import datetime as datetime
from datetime import datetime, timedelta, timezone
# from datetime import timedelta

import time
from tqdm import tqdm

try:
    # import small_files
    from small_files.small_files import *
except:
    !pip install small_files
    # import small_files
    from small_files.small_files import *
    
from functools import reduce
from pyspark.sql import DataFrame

from pyspark.sql import Row
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col
from pyspark.sql import SQLContext

In [None]:
!pip install smbprotocol
!pip install PySmbClient
import smbclient   
import base64
import sys
import os

In [None]:
%%time
print('Logando no Spark....')

# compliance: gapl_compliance
# gmc: gapl_gmc
fila = 'gapl_coe'

spark = SparkSession.builder\
    .appName("Case Engenharia de Dados")\
    .enableHiveSupport()\
    .config("spark.executor.memory", "12G")\
    .config("spark.yarn.executor.memoryOverhead", "12G")\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.initialExecutors", "8")\
    .config("spark.dynamicAllocation.maxExecutors","8")\
    .config("spark.executor.cores", "8")\
    .config("spark.cores.max", "8")\
    .config("spark.driver.memory", "64G")\
    .config("spark.yarn.queue", f"root.{fila}")\
    .config("spark.sql.hive.convertMetastoreParquet","false")\
    .config("spark.sql.catalogImplementation", "hive")\
    .config('spark.sql.parquet.compression.codec', 'snappy')\
    .config('spark.io.compression.snappy.blockSize', '128MB')\
    .config('spark.files.maxPartitionBytes', '128MB')\
    .config("spark.sql.broadcastTimeout", "36000")\
    .config("spark.hadoop.hive.metastore.client.socket.timeout", "900")\
    .config("spark.network.timeout", "8000")\
    .getOrCreate()

print('Login no Spark concluido!')
print(f"printando aplicacao: {spark.sparkContext.applicationId}")

In [None]:
fila = 'galp_gmc'
sand_area = 'sand_min'
tabela_lake_reclamacoes = 'tabela_final_reclamoes'
tabela_lake_bacen = 'tabela_final_bacen'

In [None]:
# informa sua matricula e senha
# Senha de acesso criptografada da rede

input_msg = "Usuário {},\nEntre com tua senha de rede para autenticar-se:".format(os.environ.get('USER'))
usuario = format(os.environ.get('USER'))
# senha = getpass.getpass(input_msg)
senha = base64.b64decode("THVhbmFAMjAyNAo=").decode('utf-8')
# smbclient.ClientConfig(username=f"{usuario}",password=senha)

smbclient.ClientConfig(username=f"{usuario}",password=senha)

### 1.Ingesta dos dados

In [17]:
#Dados de reclamações PROCON

diretorio_base_procon = r"\\mscluster04fs\PROJETO_RDA\CDO_Consistencia\Case Data Master\planilha-de-transparencia-janeiro-a-outubro-2024.xlsx"

smbclient.reset_connection_cache()

base_aux_procon = smbclient.open_file(f"{diretorio_base_procon}", mode="rb")

reclamacoes_df = pd.read_excel(base_aux_procon,sheet_name="RECLAMAÇÕES",engine="openpyxl",skiprows=0,usecols=lambda x:'Unnamed' not in x,index_col=False,na_filter=False)
reclamacoes_df = reclamacoes_df.astype(str)

# lendo tabela raw, sem qualquer ajuste
# df_base_reclamacoes = spark.createDataFrame(base_layout_final)\

In [None]:
#Dados BACEN

diretorio_base_bacen = f"\\mscluster04fs\PROJETO_RDA\CDO_Consistencia\Case Data Master\Bancos+e+financeiras+-+Reclamacoes+e+quantidades+de+clientes+por+instituicao+financeira.csv"

smbclient.reset_connection_cache()

base_final_bacen = smbclient.open_file(f"{diretorio_base_bacen}", mode="rb")

transparencia_df = pd.read_csv(diretorio_base_bacen, sep=";", dtype=str, header=0, keep_default_na=False, encoding="ISO8859-1")
transparencia_df = transparencia_df.astype(str)

### 2. Tratamento de Dados

In [22]:
# 3. Tratamento de Dados
def process_data(reclamacoes_df, transparencia_df):
    print("[INFO] Tratando dados...")
    reclamacoes_df['Índice'] = pd.to_numeric(
        reclamacoes_df['Índice'].str.replace(',', '.'), errors='coerce'
    )
    transparencia_df['DATA DE ENTRADA'] = pd.to_datetime(transparencia_df['DATA DE ENTRADA'], errors='coerce')
    return reclamacoes_df, transparencia_df

### 3. Observabilidade

In [23]:
def log_info(message):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] {message}")

### 4. Segurança de Dados

In [None]:
def secure_data(reclamacoes_df):
    print("[INFO] Aplicando anonimização...")
    reclamacoes_df['Anonimized_CNPJ'] = reclamacoes_df['CNPJ_IF'].apply(lambda x: hash(x))
    return reclamacoes_df

### 5. Exploração e Insights

In [None]:
def generate_insights(reclamacoes_df, transparencia_df):
    print("[INFO] Gerando insights...")
    transparencia_df['Mes'] = transparencia_df['DATA DE ENTRADA'].dt.to_period('M')
    monthly_trend = transparencia_df.groupby('Mes').size()
    top_institutions = reclamacoes_df.nlargest(5, 'Índice')

    plt.figure(figsize=(10, 6))
    plt.bar(top_institutions['Instituição_financeira'], top_institutions['Índice'], color='orange')
    plt.title('Top 5 Instituições Financeiras com Maior Índice de Reclamações', fontsize=14)
    plt.ylabel('Índice de Reclamações', fontsize=12)
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(10, 6))
    monthly_trend.plot(kind='line', marker='o', color='blue')
    plt.title('Tendência Mensal de Reclamações', fontsize=14)
    plt.ylabel('Total de Reclamações', fontsize=12)
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    return top_institutions, monthly_trend


### 6. Armazenamento de Dados

In [None]:
reclamacoes_spark_df = spark.createDataFrame(reclamacoes_df)
transparencia_spark_df = spark.createDataFrame(transparencia_df)

In [None]:
if reclamacoes_spark_df.count() > 0:
    print("Criando tabela {sand_area}.{tabela_lake_reclamacoes}")
    %time reclamacoes_spark_df\
        .whte.format("parquet").mode('append')
        .saveAsTable(f'{sand_area}.{tabela_lake_reclamacoes}')
    print("Tabela {sand_area}.{tabela_lake_reclamacoes} criada!")
else:
    print("Problema ao ingerir a tabela trada do PROCON. Avaliar!")

In [None]:
if reclamacoes_spark_df.count() > 0:
    print("Criando tabela {sand_area}.{tabela_lake_bacen}")
    %time reclamacoes_spark_df\
        .whte.format("parquet").mode('append')
        .saveAsTable(f'{sand_area}.{tabela_lake_bacen}')
    print("Tabela {sand_area}.{tabela_lake_bacen} criada!")
else:
    print("Problema ao ingerir a tabela trada do PROCON. Avaliar!")

In [None]:
# Pipeline Principal
def execute_pipeline():
    log_info("Iniciando pipeline...")
    reclamacoes_df, transparencia_df = load_data()
    reclamacoes_df, transparencia_df = ingest_data(reclamacoes_df, transparencia_df)
    reclamacoes_df, transparencia_df = process_data(reclamacoes_df, transparencia_df)
    reclamacoes_df = secure_data(reclamacoes_df)
    top_institutions, monthly_trend = generate_insights(reclamacoes_df, transparencia_df)
    store_data_spark(reclamacoes_df, transparencia_df)
    log_info("Pipeline concluído com sucesso!")
    return top_institutions, monthly_trend

# Executar pipeline
execute_pipeline()