# Proyecto equipo 4

## Guardar en tablas

In [None]:
from datetime import datetime
import uuid

def _parse_money(value):
    """Convert a currency value such as '$1,234' (or a plain number) to an int."""
    if isinstance(value, str):
        cleaned = value.replace('$', '').replace(',', '').strip()
        try:
            return int(cleaned)
        except ValueError:
            # Amounts with decimals ('1234.50') are rounded so the column
            # stays an integer, as it was before.
            return int(round(float(cleaned)))
    # Already numeric (int, float, numpy scalar): no string cleanup needed.
    return int(round(float(value)))


def _parse_percentage(value):
    """Convert a percentage such as '12.5%' (or a plain number) to a float."""
    if isinstance(value, str):
        return float(value.replace('%', ''))
    return float(value)


def _int_or_zero(value):
    """Return ``int(value)``, using 0 when the value is None or NaN."""
    if value is None or pd.isna(value):
        return 0
    return int(value)


def _append_to_delta(records, table_name):
    """Append a list of row dicts to the Delta table ``table_name``."""
    if not records:
        # An empty pandas DataFrame has no columns, so Spark cannot infer a
        # schema from it; there is nothing to append anyway.
        return
    (
        spark.createDataFrame(pd.DataFrame(records))
        .write.format("delta")
        .mode("append")
        .saveAsTable(table_name)
    )


def save_to_delta(resultados_finales, analisis_roi, umbrales_optimos, experiment_id=None):
    """
    Save the fraud pipeline results to Delta tables.

    Rows are appended to three tables, all tagged with the same
    ``experiment_id`` and ``timestamp``:

    * ``fraud_detection.model_results`` - one row per row of
      ``resultados_finales``.
    * ``fraud_detection.roi_results`` - one row per row of ``analisis_roi``.
    * ``fraud_detection.thresholds`` - one row per item of
      ``umbrales_optimos``.

    Args:
        resultados_finales: DataFrame with the columns 'Modelo', 'Umbral',
            'Precision', 'Recall', 'F1-Score' and 'Status'; 'TP', 'FP' and
            'Total_Alerts' are optional and default to 0.
        analisis_roi: DataFrame with the columns 'Modelo',
            'Beneficio_Total', 'Costo_Total', 'Beneficio_Neto' and 'ROI_%'.
            Values may be formatted strings ('$1,234', '12.5%') or numbers.
        umbrales_optimos: Mapping of model name to optimal threshold.
        experiment_id: Identifier for this run. When None, the first
            8 characters of a random UUID are used.

    Returns:
        The experiment id the rows were stored under.
    """
    if experiment_id is None:
        experiment_id = str(uuid.uuid4())[:8]

    # One timestamp for the whole run so rows of the three tables line up.
    timestamp = datetime.now()

    # 1. Main results table
    performance_data = [
        {
            'experiment_id': experiment_id,
            'timestamp': timestamp,
            'model_name': row['Modelo'],
            # 'default' marks a model evaluated at the standard 0.5 cut-off.
            'threshold': float(row['Umbral']) if row['Umbral'] != 'default' else 0.5,
            'precision': float(row['Precision']),
            'recall': float(row['Recall']),
            'f1_score': float(row['F1-Score']),
            'true_positives': _int_or_zero(row.get('TP', 0)),
            'false_positives': _int_or_zero(row.get('FP', 0)),
            'total_alerts': _int_or_zero(row.get('Total_Alerts', 0)),
            'status': row['Status'],
        }
        for _, row in resultados_finales.iterrows()
    ]

    # 2. ROI table
    roi_data = [
        {
            'experiment_id': experiment_id,
            'timestamp': timestamp,
            'model_name': row['Modelo'],
            'total_benefit': _parse_money(row['Beneficio_Total']),
            'total_cost': _parse_money(row['Costo_Total']),
            'net_benefit': _parse_money(row['Beneficio_Neto']),
            'roi_percentage': _parse_percentage(row['ROI_%']),
        }
        for _, row in analisis_roi.iterrows()
    ]

    # 3. Thresholds table
    threshold_data = [
        {
            'experiment_id': experiment_id,
            'timestamp': timestamp,
            'model_name': model_name,
            'optimal_threshold': float(threshold_value),
        }
        for model_name, threshold_value in umbrales_optimos.items()
    ]

    # Save to Delta (empty record lists are skipped)
    _append_to_delta(performance_data, "fraud_detection.model_results")
    _append_to_delta(roi_data, "fraud_detection.roi_results")
    _append_to_delta(threshold_data, "fraud_detection.thresholds")

    return experiment_id

# Vista consolidada para análisis
def create_summary_view():
    """
    Create or replace the ``fraud_detection.results_summary`` view.

    The view joins ``fraud_detection.model_results`` with
    ``fraud_detection.roi_results`` and ``fraud_detection.thresholds`` on
    experiment id and model name, and keeps only the rows whose timestamp
    equals the latest timestamp found in ``fraud_detection.model_results``.
    """
    summary_view_sql = """
        CREATE OR REPLACE VIEW fraud_detection.results_summary AS
        SELECT 
            r.model_name,
            r.precision,
            r.recall,
            r.f1_score,
            r.total_alerts,
            roi.roi_percentage,
            roi.net_benefit,
            t.optimal_threshold,
            r.status,
            r.timestamp
        FROM fraud_detection.model_results r
        JOIN fraud_detection.roi_results roi ON r.experiment_id = roi.experiment_id AND r.model_name = roi.model_name
        JOIN fraud_detection.thresholds t ON r.experiment_id = t.experiment_id AND r.model_name = t.model_name
        WHERE r.timestamp = (SELECT MAX(timestamp) FROM fraud_detection.model_results)
    """
    spark.sql(summary_view_sql)

# Run: persist this run's results, then (re)build the summary view.
# The view is rebuilt after the save because it selects the rows with the
# latest timestamp in fraud_detection.model_results.
# NOTE(review): resultados_finales, analisis_roi and umbrales_optimos are not
# defined in this section - presumably produced by earlier cells; confirm.
experiment_id = save_to_delta(resultados_finales, analisis_roi, umbrales_optimos)
create_summary_view()