# **Implementación de Redes Neuronales**

In [1]:
# Preparar espacio de trabajo

# Importaciones
import sys, os
from google.colab import drive
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Montar Google Drive
mount_path = '/content/drive'
if not os.path.ismount(mount_path):
    drive.mount(mount_path)
    print("Google Drive montado.")
else:
    print("Google Drive ya está montado.")

# Crear la estructura de carpetas si no existe, siguiendo las buenas prácticas.
os.makedirs("src", exist_ok=True)       # Código fuente
os.makedirs("notebooks", exist_ok=True) # Cuadernos de Jupyter
os.makedirs("data", exist_ok=True)      # Datos crudos y procesados
os.makedirs("results", exist_ok=True)   # Gráficos y resultados de experimentos
os.makedirs("docs", exist_ok=True)      # Documentación y reportes

Mounted at /content/drive
Google Drive montado.


# **Implementación de Red Neuronal Básica**

In [2]:
## Crear archivo
%%writefile src/neural_network.py

from __future__ import annotations
from typing import List, Sequence, Dict, Tuple, Callable, Optional
import numpy as np
from sklearn.metrics import f1_score, recall_score, precision_score, confusion_matrix, accuracy_score

# Funciones de Activación y sus Derivadas
def sigmoid(z: np.ndarray) -> np.ndarray:
    """Implementación de la función de activación Sigmoid, estable numéricamente."""
    z_clip = np.clip(z, -500, 500)
    return 1.0 / (1.0 + np.exp(-z_clip))

def dsigmoid(a: np.ndarray) -> np.ndarray:
    """Derivada de Sigmoid en términos de la salida 'a'."""
    return a * (1.0 - a)

def relu(z: np.ndarray) -> np.ndarray:
    """Implementación de la función de activación Rectified Linear Unit (ReLU)."""
    return np.maximum(0.0, z)

def drelu(z: np.ndarray, a: Optional[np.ndarray] = None) -> np.ndarray:
    """Derivada de ReLU en función de 'z': 1 si z>0, 0 en caso contrario."""
    g = np.zeros_like(z)
    g[z > 0] = 1.0
    return g

def tanh(z: np.ndarray) -> np.ndarray:
    """Implementación de la función de activación Tanh."""
    return np.tanh(z)

def dtanh(a: np.ndarray) -> np.ndarray:
    """Derivada de Tanh en términos de la salida 'a'."""
    return 1.0 - np.power(a, 2)

# Mapa de Funciones y la Clase NeuralNetwork

# Diccionario para mapear nombres de activación a sus funciones y derivadas.
_ACT_FUNCS: Dict[str, Tuple[Callable[[np.ndarray], np.ndarray], Callable[..., np.ndarray]]] = {
    "sigmoid": (sigmoid, dsigmoid),
    "relu": (relu, drelu),
    "tanh": (tanh, dtanh),
}

# Funciones de inicialización de pesos para evitar el vanishing/exploding gradient.
def _he_std(fan_in: int) -> float:
    """Inicialización He para ReLU: N(0, sqrt(2/fan_in))."""
    return np.sqrt(2.0 / fan_in)

def _xavier_std(fan_in: int) -> float:
    """Inicialización Xavier/Glorot para sigmoid/tanh: N(0, sqrt(1/fan_in))."""
    return 1.0 / np.sqrt(fan_in)

class NeuralNetwork:
    """
    Red neuronal totalmente conectada (MLP) implementada desde cero con NumPy.
    Soporta:
    - Múltiples capas ocultas.
    - Activaciones configurables por capa (Sigmoid, ReLU, Tanh).
    - Entrenamiento por gradiente descendente (full-batch).
    - Pérdida MSE para la regla delta.
    - Métricas de evaluación para clasificación binaria.
    """
    def __init__(self, layers: Sequence[int], activations: Optional[Sequence[str]] = None, seed: Optional[int] = 42):
        assert len(layers) >= 3, "La arquitectura debe tener al menos una capa de entrada, una oculta y una de salida."
        self.layers = list(layers)
        self.L = len(layers) - 1

        if activations is None:
            self.activations = ["relu"] * (self.L - 1) + ["sigmoid"]
        else:
            assert len(activations) == self.L, "El número de activaciones debe ser igual al número de capas con pesos (len(layers) - 1)."
            self.activations = [a.lower() for a in activations]

        for a in self.activations:
            assert a in _ACT_FUNCS, f"Activación desconocida: {a}"

        self.rng = np.random.default_rng(seed)
        self.W: List[np.ndarray] = []
        self.b: List[np.ndarray] = []
        self._init_params()

    def _init_params(self) -> None:
        """
        Inicializa los pesos (W) y sesgos (b) de la red.
        Usa la inicialización de He para ReLU y Xavier para otras activaciones.
        """
        self.W.clear()
        self.b.clear()
        for l in range(self.L):
            fan_in = self.layers[l]
            fan_out = self.layers[l+1]
            act = self.activations[l]

            if act == "relu":
                std = _he_std(fan_in)
            else:
                std = _xavier_std(fan_in)

            W_l = self.rng.normal(loc=0.0, scale=std, size=(fan_in, fan_out)).astype(np.float64)
            b_l = np.zeros((1, fan_out), dtype=np.float64)
            self.W.append(W_l)
            self.b.append(b_l)

    def forward(self, X: np.ndarray) -> Tuple[np.ndarray, Dict[str, List[np.ndarray]]]:
        """
        Propagación hacia adelante: calcula las salidas de cada capa.
        Retorna: la predicción final y un caché de los valores 'Z' y 'A' para el backprop.
        """
        A = X
        A_list = [A]
        Z_list = []

        for l in range(self.L):
            Z = A @ self.W[l] + self.b[l]
            act, _ = _ACT_FUNCS[self.activations[l]]
            A = act(Z)
            Z_list.append(Z)
            A_list.append(A)

        cache = {"A": A_list, "Z": Z_list}
        return A, cache

    def _loss_mse(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Calcula la pérdida de error cuadrático medio (MSE)."""
        return float(np.mean(np.square(y_pred - y_true)))

    def _dA_mse(self, y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray:
        """Calcula el gradiente inicial de la pérdida con respecto a la salida."""
        m = y_true.shape[0]
        return (2.0 / m) * (y_pred - y_true)

    def backward(self, cache: Dict[str, List[np.ndarray]], y_true: np.ndarray) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """
        Backpropagation: calcula los gradientes (dW y db) para todas las capas.
        Utiliza la regla delta para propagar el error hacia atrás.
        """
        A_list = cache["A"]
        Z_list = cache["Z"]

        dW_list = [None] * self.L
        db_list = [None] * self.L

        # Gradiente inicial en la capa de salida
        dA = self._dA_mse(A_list[-1], y_true)

        # Bucle para propagar hacia atrás (de la última capa a la primera)
        for l in reversed(range(self.L)):
            A_prev = A_list[l]
            Z = Z_list[l]
            act_name = self.activations[l]

            _, dact = _ACT_FUNCS[act_name]

            # Cálculo del gradiente local de la activación
            if act_name == "relu":
                dZ = dA * dact(z=Z)
            else:
                A_curr = A_list[l + 1]
                dZ = dA * dact(a=A_curr)

            # Cálculo de los gradientes de pesos y sesgos
            dW = A_prev.T @ dZ
            db = np.sum(dZ, axis=0, keepdims=True)

            # Propagación del gradiente a la capa anterior
            dA = dZ @ self.W[l].T

            dW_list[l] = dW
            db_list[l] = db

        return dW_list, db_list

    def train(self, X: np.ndarray, y: np.ndarray, epochs: int = 1000, learning_rate: float = 0.01, verbose: bool = True) -> List[float]:
        """
        Entrena la red neuronal usando el algoritmo de descenso de gradiente (full-batch).
        Retorna la lista de pérdidas por cada época.
        """
        history = []
        for ep in range(1, epochs + 1):
            y_pred, cache = self.forward(X)
            loss = self._loss_mse(y_pred, y)
            dW_list, db_list = self.backward(cache, y)

            # Actualizar los pesos y sesgos
            for l in range(self.L):
                self.W[l] -= learning_rate * dW_list[l]
                self.b[l] -= learning_rate * db_list[l]

            history.append(loss)
            if verbose and (ep % max(1, epochs // 10) == 0 or ep == 1):
                print(f"[{ep:4d}/{epochs}] loss={loss:.6f}")

        return history

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Realiza una predicción hacia adelante en la red."""
        y_pred, _ = self.forward(X)
        return y_pred

    def accuracy(self, X: np.ndarray, y: np.ndarray, threshold: float = 0.5) -> float:
        """Calcula la precisión (accuracy) binaria del modelo."""
        y_hat = self.predict(X)
        y_bin = (y_hat >= threshold).astype(np.float64)
        return float(np.mean(y_bin == y))

    def get_performance_metrics(self, X: np.ndarray, y_true: np.ndarray, threshold: float = 0.5) -> dict:
        """
        Calcula las métricas de rendimiento clave (precisión, recall, F1-Score, matriz de confusión)
        para una clasificación binaria.
        """
        y_pred_proba = self.predict(X)
        y_pred_binary = (y_pred_proba >= threshold).astype(np.int64)
        y_true = y_true.ravel()

        cm = confusion_matrix(y_true, y_pred_binary)
        # Asegurarse de que la matriz de confusión tiene 4 elementos (2x2)
        tn, fp, fn, tp = cm.ravel() if cm.size == 4 else (0, 0, 0, 0)

        sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

        f1 = f1_score(y_true, y_pred_binary, zero_division=0)
        accuracy = accuracy_score(y_true, y_pred_binary)
        precision = precision_score(y_true, y_pred_binary, zero_division=0)

        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': sensitivity,
            'specificity': specificity,
            'f1_score': f1,
            'confusion_matrix': cm
        }

    def threshold_optimization(self, X: np.ndarray, y_true: np.ndarray) -> tuple:
        """
        Encuentra el umbral óptimo que maximiza el F1-Score en el conjunto de datos.
        Retorna el umbral óptimo y el diccionario de métricas correspondiente.
        """
        y_pred_proba = self.predict(X)
        thresholds = np.linspace(0, 1, 101) # Probar 101 umbrales de 0 a 1
        best_f1 = -1
        best_threshold = 0.5
        best_metrics = {}

        for threshold in thresholds:
            metrics = self.get_performance_metrics(X, y_true, threshold)
            if metrics['f1_score'] > best_f1:
                best_f1 = metrics['f1_score']
                best_threshold = threshold
                best_metrics = metrics

        return best_threshold, best_metrics


Writing src/neural_network.py


# **Pipeline de feature engineering**

In [3]:
# Crear archivo de preprocesamiento
%%writefile src/data_preprocessing.py

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from typing import List, Tuple

def preprocess_data(df: pd.DataFrame, num_cols: List[str], cat_cols: List[str]) -> Tuple[np.ndarray, np.ndarray, StandardScaler, OneHotEncoder]:
    """
    Preprocesa un DataFrame para ser usado en la red neuronal.
    Aplica:
    - Estandarización a columnas numéricas.
    - One-Hot Encoding a columnas categóricas.
    - Combina las características en una matriz de NumPy.
    - Separa las etiquetas (fraude).
    """
    if df.empty:
        raise ValueError("El DataFrame de entrada no puede estar vacío.")

    scaler = StandardScaler()
    encoder = OneHotEncoder(sparse_output=False, handle_unknown='ignore')

    # Preprocesar columnas numéricas
    X_num = scaler.fit_transform(df[num_cols])

    # Preprocesar columnas categóricas
    X_cat = encoder.fit_transform(df[cat_cols])

    # Combinar todas las características
    X = np.hstack([X_num, X_cat])
    y = df['fraude'].values.reshape(-1, 1)

    return X, y, scaler, encoder

Writing src/data_preprocessing.py


# **Aplicación del modelo al problema de fraude**

In [5]:
import sys
import os

# Añadir el directorio 'src' al path para poder importar los módulos
if os.getcwd() not in sys.path:
    sys.path.insert(0, os.getcwd())
if os.path.join(os.getcwd(), 'src') not in sys.path:
    sys.path.insert(0, os.path.join(os.getcwd(), 'src'))

# Detección de Fraudes

# Importamos los módulos que acabamos de crear.
from data_preprocessing import preprocess_data
from neural_network import NeuralNetwork # Importar NeuralNetwork también

# Cargar los datos sintéticos de la carpeta 'data'.
df = pd.read_csv('/content/data/transacciones_fraude_sinteticas.csv')

# Renombrar la columna 'is_fraud' a 'fraude' para que coincida con la expectativa de preprocess_data.
# Corregido el error lógico en la condición:
if 'is_fraud' in df.columns and 'fraude' not in df.columns:
    df = df.rename(columns={'is_fraud': 'fraude'})

# Definir las columnas numéricas y categóricas.
# Corregido: 'aomunt' a 'amount' y 'conutry' a 'country'
num_cols = ['amount', 'hour', 'high_amount', 'avg_amount', 'is_foreign']
cat_cols = ['country', 'channel', 'device','merchant_category']

# Preprocesar los datos para la red neuronal.
X, y, _, _ = preprocess_data(df, num_cols, cat_cols)

# Dividir el dataset en conjuntos de entrenamiento y prueba, manteniendo la proporción de la clase 'fraude'.
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.2, random_state=42)

# Entrenar la red neuronal para el problema de fraude
# La arquitectura se ajusta al número de características preprocesadas (X.shape[1]).
nn = NeuralNetwork(layers=[X.shape[1], 32, 16, 1], activations=['tanh', 'tanh', 'sigmoid'])
history = nn.train(X_train, y_train, epochs=300, learning_rate=0.05, verbose=True)

# Evaluación del modelo con un umbral estándar de 0.5.
print("\n--- Evaluación con Umbral = 0.5 ---")
metrics_standard = nn.get_performance_metrics(X_test, y_test, threshold=0.5)
print(f"Accuracy: {metrics_standard['accuracy']:.4f}")
print(f"Precision: {metrics_standard['precision']:.4f}")
print(f"Recall (Sensibilidad): {metrics_standard['recall']:.4f}")
print(f"F1-Score: {metrics_standard['f1_score']:.4f}")
print("Matriz de Confusión:\n", metrics_standard['confusion_matrix'])

[   1/300] loss=0.271756
[  30/300] loss=0.058873
[  60/300] loss=0.023754
[  90/300] loss=0.013797
[ 120/300] loss=0.009540
[ 150/300] loss=0.007271
[ 180/300] loss=0.005890
[ 210/300] loss=0.004973
[ 240/300] loss=0.004325
[ 270/300] loss=0.003846
[ 300/300] loss=0.003478

--- Evaluación con Umbral = 0.5 ---
Accuracy: 0.9990
Precision: 0.0000
Recall (Sensibilidad): 0.0000
F1-Score: 0.0000
Matriz de Confusión:
 [[39960     0]
 [   40     0]]


In [6]:
#Implementar threshold optimization
# Optimización del umbral y evaluación con métricas óptimas

# En problemas de fraude, el F1-Score es más relevante que la precisión.
print("\n--- Optimización del Umbral ---")
best_threshold, best_metrics = nn.threshold_optimization(X_test, y_test)

# Imprimir las mejores métricas encontradas.
print(f"Umbral óptimo: {best_threshold:.4f}")
print("Mejores métricas:")
print(f"Accuracy: {best_metrics['accuracy']:.4f}")
print(f"Precision: {best_metrics['precision']:.4f}")
print(f"Recall (Sensibilidad): {best_metrics['recall']:.4f}")
print(f"F1-Score: {best_metrics['f1_score']:.4f}")
print("Matriz de Confusión Óptima:\n", best_metrics['confusion_matrix'])


--- Optimización del Umbral ---
Umbral óptimo: 0.1200
Mejores métricas:
Accuracy: 0.9992
Precision: 0.7857
Recall (Sensibilidad): 0.2750
F1-Score: 0.4074
Matriz de Confusión Óptima:
 [[39957     3]
 [   29    11]]


# **Experimentación Comparativa**

In [7]:
#=====================================
#Comparación de Arquitecturas: Pruebe al menos 3 configuraciones diferentes de capas ocultas
#=====================================
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam

#Escalado de variables
scaler = StandardScaler()
X_train_s = scaler.fit_transform(X_train)
X_test_s = scaler.transform(X_test)


def build_model(hidden_layers, activation="relu", lr=0.001):
    model = Sequential()
    model.add(Dense(hidden_layers[0], activation=activation, input_shape=(X_train_s.shape[1],)))

    for units in hidden_layers[1:]:
        model.add(Dense(units, activation=activation))
        model.add(Dropout(0.2))

    model.add(Dense(1, activation="sigmoid"))

    model.compile(
        optimizer=Adam(learning_rate=lr),
        loss="binary_crossentropy",
        metrics=["AUC"]
    )
    return model

#Definir arquitecturas

architectures = {
    "Simple": [16],
    "Medium": [32, 16],
    "Deep": [64, 32, 16]
}

from sklearn.metrics import roc_auc_score

results_arch = {}

for name, layers in architectures.items():
    model = build_model(layers)

    model.fit(
        X_train, y_train,
        epochs=10,
        batch_size=256,
        validation_split=0.2,
        class_weight={0:1, 1:10},
        verbose=0
    )

    y_pred = model.predict(X_test).ravel()
    auc = roc_auc_score(y_test, y_pred)
    results_arch[name] = auc

results_arch

# Convertir el diccionario de resultados en un DataFrame
results_df = pd.DataFrame(results_arch.items(), columns=['Arquitectura', 'AUC_Score'])

# Establecer la 'Arquitectura' como índice para una mejor presentación
results_df = results_df.set_index('Arquitectura')
print("="*30)
print("Comparación de arquitecturas")
print("="*30)
# Mostrar la tabla
print(results_df)

  print("\Comparación de arquitecturas")
  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 1ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step
\Comparación de arquitecturas
              AUC_Score
Arquitectura           
Simple         0.998509
Medium         0.998345
Deep           0.998005


In [9]:
#==================================================================================================
#Análisis de Funciones de Activación: Compare rendimiento con diferentes activaciones
#==================================================================================================
activations = ["relu", "tanh", "elu"]
results_act = {}

for act in activations:
    model = build_model([32,16], activation=act)

    model.fit(
        X_train_s, y_train,
        epochs=10,
        batch_size=256,
        validation_split=0.2,
        class_weight={0:1, 1:10},
        verbose=0
    )

    y_pred = model.predict(X_test_s).ravel()
    results_act[act] = roc_auc_score(y_test, y_pred)

results_act

# Convertir el diccionario de resultados en un DataFrame
results_ac = pd.DataFrame(results_act.items(), columns=['Funciones', 'Rendimientos'])

# Establecer la 'Arquitectura' como índice para una mejor presentación
results_ac = results_ac.set_index('Funciones')
print("="*30)
print("Comparación de arquitecturas")
print("="*30)

# Mostrar la tabla
print(results_ac)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step
Comparación de arquitecturas
           Rendimientos
Funciones              
relu           0.997839
tanh           0.996935
elu            0.997920


In [10]:
#=================================
#Optimización de Hiperparámetros
#==================================
learning_rates = [0.001, 0.0005]
epochs_list = [10, 20]

results_hyper = {}

for lr in learning_rates:
    for ep in epochs_list:
        model = build_model([32,16], lr=lr)

        model.fit(
            X_train_s, y_train,
            epochs=ep,
            batch_size=256,
            validation_split=0.2,
            class_weight={0:1, 1:10},
            verbose=0
        )

        y_pred = model.predict(X_test_s).ravel()
        results_hyper[f"lr={lr}_ep={ep}"] = roc_auc_score(y_test, y_pred)

results_hyper

# Convertir el diccionario de resultados en un DataFrame
results_hyp = pd.DataFrame(results_hyper.items(), columns=['Hiperparámetros', 'Valor'])

# Establecer la 'Arquitectura' como índice para una mejor presentación
results_hyp = results_hyp.set_index('Hiperparámetros')
print("="*30)
print("Optimización de Hiperparámetros")
print("="*30)
# Mostrar la tabla
print(results_hyp)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 1ms/step


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m1250/1250[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2ms/step
Optimización de Hiperparámetros
                    Valor
Hiperparámetros          
lr=0.001_ep=10   0.998549
lr=0.001_ep=20   0.997197
lr=0.0005_ep=10  0.998131
lr=0.0005_ep=20  0.998172


In [11]:
#=========================
#Baseline Comparison
#=========================
from sklearn.linear_model import LogisticRegression

log_model = LogisticRegression(
    max_iter=1000,
    class_weight={0:1, 1:10}
)

log_model.fit(X_train_s, y_train)

y_pred_log = log_model.predict_proba(X_test_s)[:,1]
auc_log = roc_auc_score(y_test, y_pred_log)

auc_log
print("="*30)
print("****Baseline Comparison****")
print("="*30)
print("AUC=",f"{auc_log * 100:.2f}%")

  y = column_or_1d(y, warn=True)


****Baseline Comparison****
AUC= 99.87%


In [None]:
# ================================================
#  CELDA FINAL: GUARDAR NOTEBOOK EN GITHUB DESDE COLAB
# ================================================
import os
import json
import subprocess
from google.colab import drive

# CONFIGURACIÓN - Ajusta estos valores para cada notebook
NOTEBOOK_NAME = "02_Feature_Engineering_y_Scoring.ipynb"  # Cambia por el nombre actual
COMMIT_MESSAGE = "Actualizar notebook de proyecto"  # Cambia el mensaje
GITHUB_USERNAME = "akordone22"
REPO_NAME = "UEES-IA-Deteccion_Fraude-Grupo7"

# Token de GitHub - CONFIGURADO DIRECTAMENTE
GITHUB_TOKEN = "[TOKEN_REMOVIDO_POR_SEGURIDAD]"

def get_github_token():
    """Retorna el token de GitHub configurado"""
    return GITHUB_TOKEN

def mount_drive():
    """Monta Google Drive si no está montado"""
    try:
        if not os.path.exists('/content/drive'):
            print("Montando Google Drive...")
            drive.mount('/content/drive')
        print("EXITO: Google Drive montado correctamente")
        return True
    except Exception as e:
        print(f"ERROR: Error montando Google Drive: {e}")
        return False

def configure_git():
    """Configura Git con credenciales"""
    try:
        # Primero intentar configurar de forma local en el directorio
        os.makedirs('/tmp/git_config', exist_ok=True)
        os.chdir('/tmp/git_config')

        # Inicializar git temporalmente
        subprocess.run(['git', 'init'], check=True, capture_output=True)

        # Configurar usuario
        result1 = subprocess.run(['git', 'config', 'user.email', 'andrea.ordonezr@uees.edu.ec'],
                                capture_output=True, text=True)
        result2 = subprocess.run(['git', 'config', 'user.name', 'Andrea Ordoñez'],
                                capture_output=True, text=True)

        # Verificar configuración
        email_check = subprocess.run(['git', 'config', 'user.email'],
                                   capture_output=True, text=True)
        name_check = subprocess.run(['git', 'config', 'user.name'],
                                  capture_output=True, text=True)

        if 'andrea.ordonezr@uees.edu.ec' in email_check.stdout and 'Andrea Ordoñez' in name_check.stdout:
            print("EXITO: Git configurado correctamente")
            return True
        else:
            # Intentar configuración global alternativa
            subprocess.run(['git', 'config', '--global', 'user.email', 'andrea.ordonezr@uees.edu.ec'],
                          check=False)
            subprocess.run(['git', 'config', '--global', 'user.name', 'Andrea Ordoñez'],
                          check=False)
            print("EXITO: Git configurado (modo alternativo)")
            return True

    except subprocess.CalledProcessError as e:
        print(f"ADVERTENCIA: Error configurando Git globalmente, continuando: {e}")
        # Continuamos de todas formas, Git puede funcionar sin configuración global
        return True
    except Exception as e:
        print(f"ERROR: Error configurando Git: {e}")
        return False

def clean_directory():
    """Limpia y prepara el directorio de trabajo"""
    try:
        if os.path.exists('/content/repo_final'):
            subprocess.run(['rm', '-rf', '/content/repo_final'], check=True)
        print("EXITO: Directorio limpiado")
        return True
    except Exception as e:
        print(f"ERROR: Error limpiando directorio: {e}")
        return False

def clone_repository():
    """Clona el repositorio de GitHub"""
    try:
        token = get_github_token()
        repo_url = f"https://{GITHUB_USERNAME}:{token}@github.com/{GITHUB_USERNAME}/{REPO_NAME}.git"

        # Configurar Git en el directorio del repositorio después de clonar
        result = subprocess.run(['git', 'clone', repo_url, '/content/repo_final'],
                               capture_output=True, text=True, check=True)

        # Cambiar al directorio del repositorio y configurar usuario
        os.chdir('/content/repo_final')
        subprocess.run(['git', 'config', 'user.email', 'andrea.ordonezr@uees.edu.ec'], check=False)
        subprocess.run(['git', 'config', 'user.name', 'Andrea Ordoñez'], check=False)

        print("EXITO: Repositorio clonado y configurado correctamente")
        return True
    except subprocess.CalledProcessError as e:
        print(f"ERROR: Error clonando repositorio: {e}")
        print(f"STDOUT: {e.stdout}")
        print(f"STDERR: {e.stderr}")
        return False

def clean_notebook_content(content):
    """Limpia el contenido del notebook removiendo tokens sensibles"""
    # Lista de patrones a remover (incluyendo el token actual)
    token = get_github_token()
    sensitive_patterns = [
        token,
        "[TOKEN_REMOVIDO_POR_SEGURIDAD]",
        "[TOKEN_REMOVIDO_POR_SEGURIDAD]",
        "[TOKEN_REMOVIDO_POR_SEGURIDAD]",
        "[TOKEN_REMOVIDO_POR_SEGURIDAD]",
        "[TOKEN_REMOVIDO_POR_SEGURIDAD]",
        "[TOKEN_REMOVIDO_POR_SEGURIDAD]"
    ]

    cleaned_content = content
    for pattern in sensitive_patterns:
        if pattern and pattern in cleaned_content:
            cleaned_content = cleaned_content.replace(pattern, "[TOKEN_REMOVIDO_POR_SEGURIDAD]")

    # Limpieza adicional: remover líneas que contengan tokens
    lines = cleaned_content.split('\n')
    clean_lines = []
    for line in lines:
        # Si la línea contiene algún patrón de token, la reemplazamos
        line_has_token = False
        for pattern in ["[TOKEN_REMOVIDO_POR_SEGURIDAD]", "[TOKEN_REMOVIDO_POR_SEGURIDAD]", "[TOKEN_REMOVIDO_POR_SEGURIDAD]", "[TOKEN_REMOVIDO_POR_SEGURIDAD]", "[TOKEN_REMOVIDO_POR_SEGURIDAD]", "[TOKEN_REMOVIDO_POR_SEGURIDAD]"]:
            if pattern in line:
                line_has_token = True
                break

        if line_has_token:
            # Reemplazar toda la línea si contiene un token
            if "GITHUB_TOKEN" in line:
                clean_lines.append('GITHUB_TOKEN = "[TOKEN_REMOVIDO_POR_SEGURIDAD]"')
            else:
                clean_lines.append("[LINEA_CON_TOKEN_REMOVIDA_POR_SEGURIDAD]")
        else:
            clean_lines.append(line)

    return '\n'.join(clean_lines)

def copy_and_clean_notebook():
    """Copia el notebook desde Drive y lo limpia"""
    try:
        # Rutas
        drive_path = f"/content/drive/MyDrive/proyectof/{NOTEBOOK_NAME}"
        target_dir = "/content/repo_final/Proyecto_Final/notebooks"
        target_path = f"{target_dir}/{NOTEBOOK_NAME}"

        # Verificar que el notebook existe en Drive
        if not os.path.exists(drive_path):
            print(f"ERROR: No se encontró el notebook en: {drive_path}")
            return False

        # Crear directorio destino
        os.makedirs(target_dir, exist_ok=True)

        # Leer el notebook
        with open(drive_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Limpiar contenido sensible
        cleaned_content = clean_notebook_content(content)

        # Verificar que el token fue removido completamente
        token = get_github_token()
        if token in cleaned_content:
            print("ADVERTENCIA: Token detectado en contenido después de limpieza")
            # Limpieza adicional más agresiva
            cleaned_content = cleaned_content.replace(token, "[TOKEN_REMOVIDO_POR_SEGURIDAD]")

        # Guardar el notebook limpio
        with open(target_path, 'w', encoding='utf-8') as f:
            f.write(cleaned_content)

        print(f"EXITO: Notebook '{NOTEBOOK_NAME}' copiado y limpiado")
        print("SEGURIDAD: Contenido verificado - tokens removidos")
        return True

    except Exception as e:
        print(f"ERROR: Error copiando notebook: {e}")
        return False

def commit_and_push():
    """Hace commit y push de los cambios"""
    try:
        # Cambiar al directorio del repositorio
        os.chdir('/content/repo_final')

        # Verificar estado
        result = subprocess.run(['git', 'status', '--porcelain'],
                               capture_output=True, text=True, check=True)

        if not result.stdout.strip():
            print("EXITO: No hay cambios para subir")
            return True

        # Verificación final de seguridad antes de subir
        token = get_github_token()
        for root, dirs, files in os.walk('.'):
            for file in files:
                if file.endswith(('.ipynb', '.py', '.md', '.txt')):
                    filepath = os.path.join(root, file)
                    try:
                        with open(filepath, 'r', encoding='utf-8') as f:
                            content = f.read()
                            if token in content:
                                print(f"PELIGRO: Token detectado en {filepath}")
                                print("DETENIENDO PROCESO POR SEGURIDAD")
                                return False
                    except:
                        pass  # Ignorar archivos que no se pueden leer

        print("SEGURIDAD: Verificación final completada - sin tokens detectados")

        # Agregar archivos
        subprocess.run(['git', 'add', '.'], check=True)

        # Commit
        subprocess.run(['git', 'commit', '-m', COMMIT_MESSAGE], check=True)

        # Push
        subprocess.run(['git', 'push', 'origin', 'main'], check=True)

        print("EXITO: Cambios subidos exitosamente a GitHub")
        return True

    except subprocess.CalledProcessError as e:
        print(f"ERROR: Error en commit/push: {e}")
        if e.stderr:
            print(f"STDERR: {e.stderr}")
        return False

def main():
    """Función principal que ejecuta todo el proceso"""
    print("=== INICIANDO PROCESO DE SUBIDA A GITHUB ===")
    print("NOTA: Token configurado directamente en el código")
    print("")

    steps = [
        ("Montando Google Drive", mount_drive),
        ("Configurando Git", configure_git),
        ("Limpiando directorio", clean_directory),
        ("Clonando repositorio", clone_repository),
        ("Copiando y limpiando notebook", copy_and_clean_notebook),
        ("Subiendo cambios", commit_and_push)
    ]

    for step_name, step_function in steps:
        print(f"\n{step_name}...")
        if not step_function():
            print(f"\nERROR: PROCESO FALLIDO en: {step_name}")
            return False

    print(f"\nEXITO: PROCESO COMPLETADO EXITOSAMENTE")
    print(f"Notebook disponible en:")
    print(f"https://github.com/{GITHUB_USERNAME}/{REPO_NAME}/tree/main/Proyecto_Final/notebooks")

    return True

# Ejecutar el proceso
if __name__ == "__main__":
    main()