# Notebook de Integração Data Analysis

#### Configuração dos acessos a chave segura, conexões, leitura e escrita.

In [3]:
import pandas as pd
import ipywidgets as widgets
from IPython.display import display
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from sqlalchemy import create_engine

# Configuração do Azure Key Vault
key_vault_name = "SEU_KEY_VAULT_NAME"
key_vault_uri = f"https://{key_vault_name}.vault.azure.net/"
credential = DefaultAzureCredential()
client = SecretClient(vault_url=key_vault_uri, credential=credential)

# Função para recuperar secret do Azure Key Vault
def get_secret(secret_name):
    retrieved_secret = client.get_secret(secret_name)
    return retrieved_secret.value

# Função de leitura de dados do PostgreSQL
def ler_dados_postgresql(senha):
    usuario = get_secret("postgres-usuario")
    host = get_secret("postgres-host")
    db = get_secret("postgres-db")
    tabela = "nome_tabela" 
    url = f"postgresql://{usuario}:{senha}@{host}/{db}"
    engine = create_engine(url)
    df = pd.read_sql_table(tabela, con=engine)
    return df

def ler_dados_lake():
    caminho_arquivo = get_secret("lake-caminho-arquivo")
    df = pd.read_parquet(caminho_arquivo)
    return df

# Widgets para interação e leitura de dados
selecao_origem = widgets.Dropdown(
    options=['PostgreSQL', 'Data Lake'],
    value='PostgreSQL',
    description='Origem:',
    disabled=False,
)

botao_carregar = widgets.Button(description="Carregar Dados")

# Função chamada quando o botão é clicado
def on_carregar_clicked(b):
    origem = selecao_origem.value
    if origem == 'PostgreSQL':
        # Recupera a senha do banco de dados do Azure Key Vault
        senha = get_secret("postgres-senha")
        df = ler_dados_postgresql(senha)
    else:
        df = ler_dados_lake()
    display(df.head())

botao_carregar.on_click(on_carregar_clicked)

# Exibir widgets
display(selecao_origem, botao_carregar)


ModuleNotFoundError: No module named 'ipywidgets'

#### Set do ambiente para recebimento do arquivo de atualização da tabela com Azure Functions e Logic Apps

In [None]:
import azure.functions as func
import json
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    try:
        # Parâmetros 'fidedignos' de conexão ao Azure Blob Storage
        connect_str = "DefaultEndpointsProtocol=https;AccountName=SEU_ACCOUNT_NAME;AccountKey=SEU_ACCOUNT_KEY;EndpointSuffix=core.windows.net"
        container_name = "SEU_CONTAINER_NAME"
        
        # Inicializa o cliente do Blob Service
        blob_service_client = BlobServiceClient.from_connection_string(connect_str)
        container_client = blob_service_client.get_container_client(container_name)
        
        # Recupera o corpo da requisição
        dados = req.get_json()
        dados_str = json.dumps(dados)
        
        # Nome do blob pode ser uma combinação de ID único ou timestamp
        blob_name = f"dados_coletados_{dados['id']}.json"
        
        # Cria um blob e carrega os dados
        blob_client = container_client.get_blob_client(blob_name)
        blob_client.upload_blob(dados_str, overwrite=True)
        
        return func.HttpResponse(f"Dados armazenados com sucesso no blob: {blob_name}", status_code=200)
    except Exception as e:
        return func.HttpResponse(f"Erro ao processar a requisição: {str(e)}", status_code=400)


#### Tratamento dos dados de Entrada

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp

# Inicialização da sessão Spark
spark = SparkSession.builder.appName("Tratamento de Dados Loans e Clients").getOrCreate()

# Configuração do acesso ao Azure Blob Storage (ajuste conforme necessário)
spark.conf.set("fs.azure.account.key.SEU_ACCOUNT_NAME.blob.core.windows.net", "SEU_ACCOUNT_KEY")

# Carregamento dos dados coletados do Azure Blob Storage
dados_df = spark.read.json("dbfs:/mnt/SEU_CAMINHO_PARA_O_BLOB/dados_coletados_*.json")

# Tratamento dos dados para a tabela 'clients'
clients_df = dados_df.select(
    col("user_id").cast("integer"),
    to_timestamp(col("created_at"), "yyyy-MM-dd'T'HH:mm:ss").alias("created_at"),
    col("status"),
    col("batch").cast("integer"),
    col("credit_limit").cast("decimal(10,2)"),
    col("interest_rate").cast("decimal(10,2)")
).distinct()  # Removendo duplicatas, se houver

# Tratamento dos dados para a tabela 'loans'
loans_df = dados_df.select(
    col("user_id").cast("integer"),
    col("loan_id").cast("integer"),
    to_date(col("loan_created_at"), "yyyy-MM-dd").alias("created_at"),
    to_date(col("due_at"), "yyyy-MM-dd").alias("due_at"),
    to_date(col("paid_at"), "yyyy-MM-dd").alias("paid_at"),
    col("status"),
    col("loan_amount").cast("decimal(10,2)"),
    col("tax").cast("decimal(10,2)"),
    col("due_amount").cast("decimal(10,2)"),
    col("amount_paid").cast("decimal(10,2)")
).distinct()  # Removendo duplicatas, se houver

# Salvar os DataFrames tratados em um local temporário ou prosseguir com o upsert diretamente
clients_df.write.format("delta").mode("overwrite").save("/mnt/delta/clients_temp")
loans_df.write.format("delta").mode("overwrite").save("/mnt/delta/loans_temp")

#### Merge dos Dados e Atualização no Lake

In [None]:
from delta.tables import DeltaTable
# Caminhos para as tabelas Delta existentes
path_clients_delta = "/mnt/delta/clients"
path_loans_delta = "/mnt/delta/loans"

# Carregar os DataFrames tratados do diretório temporário
df_clients_tratado = spark.read.format("delta").load("/mnt/delta/clients_temp")
df_loans_tratado = spark.read.format("delta").load("/mnt/delta/loans_temp")

# Realizar upsert na tabela 'clients'
deltaTable_clients = DeltaTable.forPath(spark, path_clients_delta)

(deltaTable_clients.alias("existing")
 .merge(
     df_clients_tratado.alias("updates"),
     "existing.user_id = updates.user_id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute()
)

# Realizar upsert na tabela 'loans'
deltaTable_loans = DeltaTable.forPath(spark, path_loans_delta)

(deltaTable_loans.alias("existing")
 .merge(
     df_loans_tratado.alias("updates"),
     "existing.loan_id = updates.loan_id")
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute()
)

ModuleNotFoundError: No module named 'delta'

#### Análise dos Dados consolidados

In [None]:
# Importações necessárias
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Carregamento dos dados
path_clients = '/mnt/data/clients.csv'
path_loans = '/mnt/data/loans.csv'
clients_df = pd.read_csv(path_clients)
loans_df = pd.read_csv(path_loans)

# Análise 1: Distribuição do Limite de Crédito dos Clientes Aprovados
clientes_aprovados = clients_df[clients_df['status'] == 'approved']
plt.figure(figsize=(10, 6))
sns.histplot(clientes_aprovados['credit_limit'], bins=50, kde=True, color='blue')
plt.title('Distribuição do Limite de Crédito dos Clientes Aprovados')
plt.xlabel('Limite de Crédito')
plt.ylabel('Frequência')
plt.grid(True)
plt.show()

# Análise 2: Taxa de Inadimplência Geral
total_emprestimos = len(loans_df)
emprestimos_inadimplentes = len(loans_df[loans_df['status'] == 'default'])
taxa_inadimplencia_geral = emprestimos_inadimplentes / total_emprestimos
print(f"Taxa de Inadimplência Geral: {taxa_inadimplencia_geral:.2%}")

# Análise 3: Total de Empréstimos por Status
total_emprestimos_por_status = loans_df['status'].value_counts().reset_index()
total_emprestimos_por_status.columns = ['status', 'total']
plt.figure(figsize=(10, 6))
sns.barplot(data=total_emprestimos_por_status, x='status', y='total', palette='coolwarm')
plt.title('Total de Empréstimos por Status')
plt.xlabel('Status do Empréstimo')
plt.ylabel('Total de Empréstimos')
plt.grid(axis='y')
plt.show()

# Análise 4: Taxa de Inadimplência por Batch
emprestimos_por_status_batch = loans_df.merge(clients_df[['user_id', 'batch']], on='user_id', how='left')
emprestimos_por_status_batch = emprestimos_por_status_batch.groupby(['batch', 'status']).size().reset_index(name='total')
plt.figure(figsize=(12, 7))
sns.barplot(data=emprestimos_por_status_batch, x='batch', y='total', hue='status', palette='Set2')
plt.title('Total de Empréstimos por Status em Cada Batch')
plt.xlabel('Batch')
plt.ylabel('Total de Empréstimos')
plt.grid(axis='y')
plt.legend(title='Status do Empréstimo')
plt.show()

# Análise 5: Risco Absoluto por Batch
loans_with_batch_info = loans_df.merge(clients_df[['user_id', 'batch']], on='user_id', how='left')
risco_absoluto_por_batch = loans_with_batch_info[loans_with_batch_info['status'] == 'default'].groupby('batch')['due_amount'].sum().reset_index()
plt.figure(figsize=(10, 6))
sns.barplot(data=risco_absoluto_por_batch, x='batch', y='due_amount', palette='rocket')
plt.title('Risco Absoluto por Batch')
plt.xlabel('Batch')
plt.ylabel('Soma do Valor Devido por Empréstimos Inadimplentes')
plt.grid(axis='y')
plt.show()


#### Previsão de Inadimplência

In [None]:
# Exemplo de preparação dos dados para a previsão de inadimplência
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Supondo que 'loans_df' já tenha uma coluna 'default_flag' indicando inadimplência (1 para default, 0 para não default)
features = ['credit_limit', 'interest_rate', 'loan_amount', 'tax']  # Exemplo de features
X = loans_df[features]
y = loans_df['default_flag']

# Divisão dos dados em conjuntos de treino e teste
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Normalização dos dados
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

# Construção do modelo
model = LogisticRegression()
model.fit(X_train_scaled, y_train)

# Avaliação do modelo
predictions = model.predict(X_test_scaled)
print(classification_report(y_test, predictions))


In [None]:
from sklearn.cluster import KMeans

# Seleção de variáveis para clusterização e normalização
X_cluster = StandardScaler().fit_transform(clients_df[['credit_limit', 'interest_rate']])

# Construção do modelo de clusterização
kmeans = KMeans(n_clusters=3, random_state=42)
clusters = kmeans.fit_predict(X_cluster)

# Adicionando a informação de cluster ao DataFrame original
clients_df['cluster'] = clusters


In [None]:
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

# Selecionando features para a clusterização
features_segmentacao = clients_df[['credit_limit', 'interest_rate']].values

# Normalização das features
scaler = StandardScaler()
features_segmentacao_scaled = scaler.fit_transform(features_segmentacao)

# Definindo o número de clusters
k = 3

# Realizando a clusterização
kmeans = KMeans(n_clusters=k, random_state=42)
clients_df['cluster'] = kmeans.fit_predict(features_segmentacao_scaled)

# Visualização da Segmentação de Clientes
plt.figure(figsize=(10, 6))
sns.scatterplot(data=clients_df, x='credit_limit', y='interest_rate', hue='cluster', palette='Set1')
plt.title('Segmentação de Clientes por Limite de Crédito e Taxa de Juros')
plt.xlabel('Limite de Crédito')
plt.ylabel('Taxa de Juros')
plt.legend(title='Cluster')
plt.grid(True)
plt.show()

#### Adição e Alteração de usuário

In [None]:
import pandas as pd
import sqlalchemy

# Conexão com o banco de dados
engine = sqlalchemy.create_engine('postgresql://user:password@host:port/database')

def adicionar_usuario(engine, novo_usuario):
    novo_usuario_df = pd.DataFrame([novo_usuario])
    novo_usuario_df.to_sql('clients', con=engine, if_exists='append', index=False)

def alterar_usuario(engine, user_id, atualizacoes):
    with engine.connect() as con:
        for coluna, valor in atualizacoes.items():
            query = sqlalchemy.text(f"UPDATE clients SET {coluna} = :valor WHERE user_id = :user_id")
            con.execute(query, valor=valor, user_id=user_id)

#### Logging

In [None]:
import logging

# Configurando o logging
logging.basicConfig(filename='log_diario.log', level=logging.INFO, format='%(asctime)s %(message)s')

# Exemplo de função para simular uma tarefa diária
def tarefa_diaria():
    # Simulação de tarefa
    sucesso = True  # Simulação de sucesso da tarefa
    if sucesso:
        logging.info("Tarefa diária executada com sucesso.")
    else:
        logging.error("Falha na execução da tarefa diária.")

tarefa_diaria()
