#### 1 - Connection to local DB

In [1]:
from sqlalchemy import create_engine, text
import urllib
import pandas as pd
import logging

logging.basicConfig()
logging.getLogger('sqlalchemy.pool').setLevel(logging.INFO)
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

server = r'localhost\SQLEXPRESS'
database = 'esg_project'
driver = 'ODBC Driver 17 for SQL Server'

# build complete ODBC chain
params = urllib.parse.quote_plus(
    f"DRIVER={{{driver}}};"
    f"SERVER={server};"
    f"DATABASE={database};"
    "Trusted_Connection=yes;"
)
# Create engine (we let sqlalchemy to manage connection and pool)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}",
                       pool_size=12,           # Número máximo de conexiones abiertas
                       max_overflow=20,       # Conexiones adicionales en caso de pico de carga
                       pool_timeout=30,       # Segundos que espera antes de lanzar error si el pool está lleno
                       pool_recycle=1800,     # Recicla conexiones viejas cada 30 min
                       future=True)

# Test de conexión
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM esg.companies"))
    print(result.scalar())

INFO:sqlalchemy.engine.Engine:SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)
INFO:sqlalchemy.engine.Engine:[raw sql] ()
INFO:sqlalchemy.engine.Engine:SELECT schema_name()
INFO:sqlalchemy.engine.Engine:[generated in 0.00080s] ()
INFO:sqlalchemy.engine.Engine:SELECT CAST('test max support' AS NVARCHAR(max))
INFO:sqlalchemy.engine.Engine:[generated in 0.00076s] ()
INFO:sqlalchemy.engine.Engine:SELECT 1 FROM fn_listextendedproperty(default, default, default, default, default, default, default)
INFO:sqlalchemy.engine.Engine:[generated in 0.00119s] ()
INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT COUNT(*) FROM esg.companies
INFO:sqlalchemy.engine.Engine:[generated in 0.00156s] ()
INFO:sqlalchemy.engine.Engine:ROLLBACK


693


#### 2 - Create API with basic GETs

In [2]:
from fastapi import FastAPI, HTTPException, Query
import nest_asyncio
import uvicorn
import logging

# Needed to run uvicorn along notebook
nest_asyncio.apply()

# Customize main menu at Swagger
tags_metadata = [
    {   "name": "default", "description": "CRUD & endpoints related with companies.",},
    {   "name": "Stats", "description": "Endpoints with aggregates & metrics (fraud by country/sector/mounth/merchant).",},
    {   "name": "Rankings", "description": "Top merchants & top customers.",},
    {   "name": "Temporal_Series", "description": "Trends along time.",},
    {   "name": "Others", "description": "Get samples, specific customer_id data.",},
]

app = FastAPI(title="ESG API", description="""
API for combined analysis transactions/metrics ESG.
- Endpoints: CRUD, statistical, rankings, temporal series and others.
- Security: API Key at header `x-api-key`.
""", version="1.0", contact={
        "name": "Eros Blázquez",
        "url": "https://github.com/NirgalFromMars",
        "email": "frablade77@google.com"
    },
    license_info={
        "name": "MIT",
        "url": "https://opensource.org/licenses/MIT"
    },
    openapi_tags=tags_metadata)

### LOGGING with Basic Configuration ###

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("api.log"),   # save at file
        logging.StreamHandler()           # show at console
    ]
)
logger = logging.getLogger(__name__)


### MANAGE GLOBAL ERRORS ###
# for unexpected errors, to avoid using try/except on every endpoint

from fastapi.responses import JSONResponse
from fastapi.requests import Request

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    return JSONResponse(
        status_code=500,
        content={"detail": "An unexpected error occurred. Please try again later."},
    )

### --------- ###

@app.get("/")
def root():
    return {"message": "Bienvenido a la API de ESG Project"}

# Get everything from esg.fraud_transactions_enriched (with pagination)
@app.get("/all")
def get_transactions_paginated(
    page: int = Query(1, ge=1, description="sheet number, begins at 1"),
    page_size: int = Query(50, ge=1, le=1000, description="registers by sheet, max 1000"),
):
    logger.info("All transactions endpoint called")
    offset = (page - 1) * page_size
    
    sql_query = text(f"""
        SELECT *
        FROM esg.fraud_transactions_enriched
        ORDER BY trans_date_trans_time
        OFFSET {offset} ROWS
        FETCH NEXT {page_size} ROWS ONLY;
    """)

    with engine.connect() as conn:
        result = conn.execute(sql_query)
        rows = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(rows)}")

    return {
        "page": page,
        "page_size": page_size,
        "results": rows,
        "count": len(rows)
    }

# Example: get companies list
@app.get("/companies")
def get_companies():
    logger.info("Companies_list endpoint called")
    with engine.connect() as conn:
        result = conn.execute(text("SELECT TOP 10 company_id, name, sector FROM esg.companies"))
        rows = [dict(row) for row in result.mappings()]
        logger.info(f"Rows returned: {len(rows)}")
    return rows

#### 3 - Security - Define API Key

In [3]:
from fastapi import Depends, HTTPException, Security
from fastapi.security.api_key import APIKeyHeader
from security_config import API_KEY, API_KEY_NAME

# Define security schema -> like a receipt/rule to describe how to find key in the right header -> if user sends right header it will return associated 
# key (sent by user, maybe right maybe wrong -> checked inside "get_api_key"), otherwise returns None (if header sent by user is not the right one)
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)

def get_api_key(api_key_header: str = Security(api_key_header)):  # Security() applies mentioned described receipt/rule
    if api_key_header == API_KEY:
        return api_key_header
    else:
        raise HTTPException(
            status_code=401,
            detail="Invalid API Key or not provided",
        )

#### 4 - Adding endpoints CRUD

In [4]:
from pydantic import BaseModel, Field
from typing import List

# Pydantic model (validate I/O data)
class Company(BaseModel):
    company_id: int
    name: str
    sector: str | None = None
    country: str | None = None

# --- R (Read) ---
@app.get("/companies/{company_id}", response_model=Company)
# response_mode & Company class -> to validate input data (when Create/Update(Post/Put) -> if sent JSON is wrong, FASTAPI gives error; and standarize 
# answer when data is extracted -> FASTAPI uses response_model to serialize result as structured JSON ({"": 1, "name": "Airbus",...})
# This helps to visualize data correctly at web responses, with recognized/suitable structure
def read_company(company_id: int):
    logger.info("Read_company (CRUD) endpoint called")
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT company_id, name, sector, country FROM esg.companies WHERE company_id = :id"),
            {"id": company_id}
        )
        row = result.fetchone()  # tuple with columns items from query
        logger.info(f"Rows returned: {len(row)}")
        if not row:
            raise HTTPException(status_code=404, detail="Company not found")
        return Company(company_id=row[0], name=row[1], sector=row[2], country=row[3])

# --- C (Create) ---
@app.post("/companies/", response_model=Company)
def create_company(company: Company):
    logger.info("Create_company (CRUD) endpoint called")
    query = text("""
        INSERT INTO esg.companies (company_id, name, sector, country)
        VALUES (:company_id, :name, :sector, :country)
    """)
    try:
        with engine.begin() as conn:   # begin() → run commit automatically at the end
            conn.execute(query, {
                "company_id": company.company_id,
                "name": company.name,
                "sector": company.sector,
                "country": company.country
            })
        return company
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# --- Update ---
@app.put("/companies/{company_id}", response_model=Company)
def update_company(company_id: int, company: Company):
    logger.info("Update_company (CRUD) endpoint called")
    query = text("""
        UPDATE esg.companies
        SET name = :name,
            sector = :sector,
            country = :country
        WHERE company_id = :company_id
    """)
    try:
        with engine.begin() as conn:
            result = conn.execute(query, {
                "company_id": company_id,
                "name": company.name,
                "sector": company.sector,
                "country": company.country
            })
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Company not found")
        return company
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# --- Delete ---
@app.delete("/companies/{company_id}")
def delete_company(company_id: int):
    logger.info("Delete_company (CRUD) endpoint called")
    query = text("DELETE FROM esg.companies WHERE company_id = :company_id")
    try:
        with engine.begin() as conn:
            result = conn.execute(query, {"company_id": company_id})
            if result.rowcount == 0:
                raise HTTPException(status_code=404, detail="Company not found")
        return {"message": f"Company {company_id} deleted successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

#### 5 - Statistical Endpoints

In [5]:
from fastapi import APIRouter

# Router to group statistical endpoints
stats_router = APIRouter(prefix="/stats", tags=["Stats"], dependencies=[Depends(get_api_key)])

class ErrorResponse(BaseModel):
    detail: str = Field(..., example = "An error occurred")


######### BY COUNTRY #########

class CountryStat(BaseModel):
    country: str = Field(..., description="Country name")
    total_tx: int = Field(..., description="Total transactions")
    fraud_tx: int = Field(..., description="Fraud transactions")
    fraud_rate: float = Field(..., description="Fraud rate (0-1)")

@stats_router.get(
    "/fraud_by_country",
    response_model= List[CountryStat],
    summary = "Aggregate fraud by country",
    description = "Returns total transactions, frauds and rate by country.",
    responses={
        404: {
            "model": ErrorResponse,
            "description": "No data found"
        },
        500: {
            "model": ErrorResponse,
            "description": "Internal server error"
        }
    }
)
def stats_by_country():
    logger.info("Fraud by Country endpoint called")
    query = text("""
        SELECT 
            country,
            COUNT(*) AS total_tx,
            SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS fraud_tx,
            CAST(SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS fraud_rate
        FROM esg.fraud_transactions_enriched
        GROUP BY country
        ORDER BY fraud_rate DESC;
    """)
    with engine.connect() as conn:
        result = conn.execute(query)
        rows = result.fetchall()
        logger.info(f"Rows returned: {len(rows)}")
        #return [CountryStat(**row._mapping) for row in result]

    # convert to list of dicts
    stats = []
    for row in rows:
        stats.append({
            "country": row[0],
            "total_tx": row[1],
            "fraud_tx": row[2],
            "fraud_rate": row[3]
        })

    return stats


######### BY MONTH #########

class MonthlyStat(BaseModel):
    month: str = Field(..., description="Month")
    total_tx: int = Field(..., description="Total transactions")
    fraud_tx: int = Field(..., description="Fraud transactions")
    fraud_rate: float = Field(..., description="Fraud rate (0-1)")

@stats_router.get(
    "/fraud_by_month",
    response_model= List[MonthlyStat],
    summary = "Aggregate fraud by month",
    description = "Returns total transactions, frauds and rate by month.",
    responses={
        404: {
            "model": ErrorResponse,
            "description": "No data found"
        },
        500: {
            "model": ErrorResponse,
            "description": "Internal server error"
        }
    }
)
def get_fraud_by_month():
    logger.info("Fraud by Month endpoint called")
    query = text("""
        SELECT 
            FORMAT(trans_date_trans_time, 'yyyy-MM') AS month,
            COUNT(*) AS total_tx,
            SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS fraud_tx,
            CAST(SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS fraud_rate
        FROM esg.fraud_transactions_enriched
        GROUP BY FORMAT(trans_date_trans_time, 'yyyy-MM')
        ORDER BY month
    """)

    with engine.connect() as conn:
        result = conn.execute(query)
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")

    return data


######### BY SECTOR #########

class SectorStat(BaseModel):
    sector: str = Field(..., description="Sector")
    total_tx: int = Field(..., description="Total transactions")
    fraud_tx: int = Field(..., description="Fraud transactions")
    fraud_rate: float = Field(..., description="Fraud rate (0-1)")

@stats_router.get(
    "/fraud_by_sector",
    response_model= List[SectorStat],
    summary = "Aggregate fraud by sector",
    description = "Returns total transactions, frauds and rate by sector.",
    responses={
        404: {
            "model": ErrorResponse,
            "description": "No data found"
        },
        500: {
            "model": ErrorResponse,
            "description": "Internal server error"
        }
    }
)
def get_fraud_by_sector():
    logger.info("Fraud by Sector endpoint called")
    query = text("""
        SELECT 
            sector,
            COUNT(*) AS total_tx,
            SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS fraud_tx,
            CAST(SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS fraud_rate
        FROM esg.fraud_transactions_enriched
        GROUP BY sector
        ORDER BY fraud_rate DESC
    """)

    with engine.connect() as conn:
        result = conn.execute(query)
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")

    return data


######### BY MERCHANT #########

class MerchantStat(BaseModel):
    company_name: str = Field(..., description="Company name")
    total_tx: int = Field(..., description="Total transactions")
    fraud_tx: int = Field(..., description="Fraud transactions")
    fraud_rate: float = Field(..., description="Fraud rate (0-1)")

@stats_router.get(
    "/fraud_by_merchant",
    response_model= List[MerchantStat],
    summary = "Aggregate fraud by merchant",
    description = "Returns total transactions, frauds and rate by merchant.",
    responses={
        404: {
            "model": ErrorResponse,
            "description": "No data found"
        },
        500: {
            "model": ErrorResponse,
            "description": "Internal server error"
        }
    }
)
def get_fraud_by_merchant():
    logger.info("Fraud by Merchant endpoint called")
    query = text("""
        SELECT TOP 10
            company_name,
            COUNT(*) AS total_tx,
            SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS fraud_tx,
            CAST(SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS fraud_rate
        FROM esg.fraud_transactions_enriched
        GROUP BY company_name
        ORDER BY fraud_rate DESC
    """)

    with engine.connect() as conn:
        result = conn.execute(query)
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")

    return data


# Register router at main app
app.include_router(stats_router)

#### 6 - Ranking Endpoints

In [6]:
# Router to group ranking endpoints
rankings_router = APIRouter(prefix="/rankings", tags=["Rankings"])

@rankings_router.get("/top/merchants")
def get_top_merchants(n: int = 10):
    """
    Returns top N of merchants with higher fraud_transactions.
    - `n` = number of returned merchants at top (default = 10, max = 100)
    """
    logger.info("TopN Merchants endpoint called")
    query = text(f"""
        SELECT TOP {n}
            company_name,
            COUNT(*) AS total_tx,
            SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS fraud_tx,
            CAST(SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS fraud_rate
        FROM esg.fraud_transactions_enriched
        GROUP BY company_name
        ORDER BY fraud_tx DESC
    """)

    with engine.connect() as conn:
        result = conn.execute(query)
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")

    return {"data": data}

@rankings_router.get("/top/customers")
def get_top_customers(n: int = 10):
    """
    Returns top N of customer with higher fraud_transactions.
    - `n` = numner of returned customer at top (default = 10, max = 100)
    """
    logger.info("TopN Customers endpoint called")
    query = text(f"""
        SELECT TOP {n}
            cc_num AS customer_id,
            COUNT(*) AS total_tx,
            SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS fraud_tx,
            CAST(SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS FLOAT) / COUNT(*) AS fraud_rate
        FROM esg.fraud_transactions_enriched
        GROUP BY cc_num
        ORDER BY fraud_tx DESC
    """)

    with engine.connect() as conn:
        result = conn.execute(query)
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")

    return {"data": data}

# Register router at main app
app.include_router(rankings_router)

#### 7 - Temporal Series Endpoints

In [7]:
# Router to group temporal series endpoints
temporal_series_router = APIRouter(prefix="/temporal_series", tags=["Temporal_Series"])

@temporal_series_router.get("/trend")
def get_fraud_trend():
    """
    Returns monthly evolution of fraud_transactions vs non_fraud_transactions.
    """
    logger.info("Temporal_Series endpoint called")
    query = text("""
        SELECT 
            FORMAT(trans_date_trans_time, 'yyyy-MM') AS month,
            SUM(CASE WHEN is_fraud = 1 THEN 1 ELSE 0 END) AS fraud_tx,
            SUM(CASE WHEN is_fraud = 0 THEN 1 ELSE 0 END) AS non_fraud_tx
        FROM esg.fraud_transactions_enriched
        GROUP BY FORMAT(trans_date_trans_time, 'yyyy-MM')
        ORDER BY month
    """)

    with engine.connect() as conn:
        result = conn.execute(query)
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")

    return {"data": data}

# Register router at main app
app.include_router(temporal_series_router)

#### 8 - Other Analysis Endpoints

In [8]:
# Router to group other analysis endpoints
others_router = APIRouter(prefix="/others", tags=["Others"])

@others_router.get("/sample")
def sample_transactions(n: int = 1000):
    logger.info("Get_Samples endpoint called")
    query = text(f"""
        SELECT TOP {n} *
        FROM esg.fraud_transactions_enriched
        ORDER BY NEWID()
    """)
    with engine.connect() as conn:
        result = conn.execute(query)
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")
    return {"data": data}

@others_router.get("/transactions_by_customer/{customer_id}")
def transactions_by_customer(customer_id: str):
    logger.info("Transactions by Customer endpoint called")
    query = text("""
        SELECT *
        FROM esg.fraud_transactions_enriched
        WHERE cc_num = :customer_id
        ORDER BY trans_date_trans_time DESC
    """)
    with engine.connect() as conn:
        result = conn.execute(query, {"customer_id": customer_id})
        data = [dict(row._mapping) for row in result]
        logger.info(f"Rows returned: {len(data)}")
    return {"data": data}


# endpoint to know if API and database are actived -> Basic Metrics

@others_router.get("/health")
def health_check():
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return {"status": "ok", "database": "connected"}
    except:
        return {"status": "error", "database": "disconnected"}

# Register router at main app
app.include_router(others_router)

#### 9 - ML Endpoint

In [9]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# basic ML models
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB

# advanced ML models
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostClassifier

# function to create confusion_matrix with a suitable format to be visualized at Power BI
from sklearn.metrics import confusion_matrix

def format_confusion_matrix(y_true, y_pred):
    labels = ["No Fraude", "Fraude"]
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    
    formatted = []
    for i, real_label in enumerate(labels):
        for j, pred_label in enumerate(labels):
            formatted.append({
                "real": real_label,
                "predicted": pred_label,
                "count": int(cm[i][j])
            })
    return formatted


# Router to group ML endpoints
ml_router = APIRouter(prefix="/ml", tags=["ML"])

## BASIC MODELS (faster ones, from sklearn)
@ml_router.get("/basic_models")
def basic_models(sample_size: int = Query(0, description="nº rows to filter (0 for no filter)")):

    # 1. Load data
    df = pd.read_csv("original data & synthetic created/kaggle__fraud_transactions/fraudTrain.csv")

    if sample_size == 0:
        df_sample = df
    else:
        if sample_size > len(df): sample_size = len(df)
        sample_fraction = sample_size / len(df)  # to filter n samples -> % of samples to work with
        df_sample, _ = train_test_split(
            df,
            train_size = sample_fraction,
            stratify = df['is_fraud'],
            random_state=42
        )  # we use train_test_split to filter samples (we'll use only df_sample, that is only "train" samples) with same 'is_fraud' distribution as original
    
    # 2. Preprocessing
    df_sample['is_fraud'] = df_sample['is_fraud'].astype(int)
    X = df_sample[['amt', 'category', 'merchant', 'gender', 'state', 'zip', 'city_pop']]  # selection of features
    X = pd.get_dummies(X, columns=['category', 'merchant', 'gender', 'state'], drop_first=True)
    y = df_sample['is_fraud']

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # 3. Split train/test
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
    
    # 4. Define models
    models = {
        "LogisticRegression": LogisticRegression(max_iter=1000),
        "Decision Tree": DecisionTreeClassifier(),
        "KNN": KNeighborsClassifier(),
        "Naive Bayes": GaussianNB()
    }
    
    # 5. Train & Evaluate
    results = {}
    predictions_list = []
    for name, model in models.items():
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
    
        results[name] = {
            "accuracy": round(accuracy_score(y_test, y_pred), 4),
            "precision": round(precision_score(y_test, y_pred), 4),
            "recall": round(recall_score(y_test, y_pred), 4),
            "f1_score": round(f1_score(y_test, y_pred), 4),
            "confusion_matrix": format_confusion_matrix(y_test, y_pred)
        }

        # dataframe with predictions for this model
        df_preds = pd.DataFrame({
            "model_name": [name] * len(y_test),
            "model_type": ["basic"] * len(y_test),
            "y_real": y_test,
            "y_pred": y_pred
        })

        predictions_list.append(df_preds)

    # concatenate all in a single dataframe
    df_all_preds = pd.concat(predictions_list, ignore_index=True)

    # save at bbdd (replacing with every endpoint running)
    df_all_preds.to_sql("ml_predictions_basic", con=engine, schema="esg", if_exists="replace", index=False)
    
    return {
        "rows_total": len(df),
        "rows_used": len(df_sample),
        "results": results
    }


## ADVANCED MODELS (require more resources (slower run), return better metrics)
@ml_router.get("/advanced_models")
def advanced_models(sample_size: int = Query(0, description="nº rows to filter (0 for no filter)")):

    # 1. Load data
    df = pd.read_csv("original data & synthetic created/kaggle__fraud_transactions/fraudTrain.csv")

    if sample_size == 0:
        df_sample = df
    else:
        if sample_size > len(df): sample_size = len(df)
        sample_fraction = sample_size / len(df)  # to filter n samples -> % of samples to work with
        df_sample, _ = train_test_split(
            df,
            train_size = sample_fraction,
            stratify = df['is_fraud'],
            random_state=42
        )  # we use train_test_split to filter samples (we'll use only df_sample, that is only "train" samples) with same 'is_fraud' distribution as original
    
    # 2. Preprocessing
    df_sample['is_fraud'] = df_sample['is_fraud'].astype(int)
    X = df_sample[['amt', 'category', 'merchant', 'gender', 'state', 'zip', 'city_pop']]  # selection of features
    X = pd.get_dummies(X, columns=['category', 'merchant', 'gender', 'state'], drop_first=True)
    y = df_sample['is_fraud']

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # 3. Split train/test
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
    
    # 4. Define models
    models = {
        "Random Forest": RandomForestClassifier(n_estimators=100, random_state=42),
        "XGBoost": xgb.XGBClassifier(eval_metric="logloss", use_label_encoder=False, random_state=42),
        "LightGBM": lgb.LGBMClassifier(n_estimators=100, learning_rate=0.1, random_state=42),
        "CatBoost": CatBoostClassifier(iterations=100, learning_rate=0.1, depth=6, verbose=False, random_state=42)
    }
    
    # 5. Train & Evaluate
    results = {}
    predictions_list = []
    for name, model in models.items():
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
    
        results[name] = {
            "accuracy": round(accuracy_score(y_test, y_pred), 4),
            "precision": round(precision_score(y_test, y_pred), 4),
            "recall": round(recall_score(y_test, y_pred), 4),
            "f1_score": round(f1_score(y_test, y_pred), 4),
            "confusion_matrix": format_confusion_matrix(y_test, y_pred)
        }

        # dataframe with predictions for this model
        df_preds = pd.DataFrame({
            "model_name": [name] * len(y_test),
            "model_type": ["advanced"] * len(y_test),
            "y_real": y_test,
            "y_pred": y_pred
        })

        predictions_list.append(df_preds)

    # concatenate all in a single dataframe
    df_all_preds = pd.concat(predictions_list, ignore_index=True)

    # save at bbdd (replacing with every endpoint running)
    df_all_preds.to_sql("ml_predictions_advanced", con=engine, schema="esg", if_exists="replace", index=False)
    
    return {
        "rows_total": len(df),
        "rows_used": len(df_sample),
        "results": results
    }


# Register router at main app
app.include_router(ml_router)

#### 10 - Save Spec as JSON

In [10]:
import json
openapi_spec = app.openapi()  # dict
with open("openapi.json", "w", encoding="utf-8") as f:
    json.dump(openapi_spec, f, indent=2, ensure_ascii=False)

To install, run: pip install email-validator


#### RUN API

In [None]:
# Run API (it remains run at notebook)
uvicorn.run(app, host="0.0.0.0", port=8000)

INFO:     Started server process [15072]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:50283 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:50283 - "GET /openapi.json HTTP/1.1" 200 OK


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
INFO:sqlalchemy.engine.Engine:[generated in 0.00080s] ('BASE TABLE', 'VIEW', 'ml_predictions_basic', 'esg')
INFO:sqlalchemy.engine.Engine:
CREATE TABLE esg.ml_predictions_basic (
	model_name VARCHAR(max) NULL, 
	model_type VARCHAR(max) NULL, 
	y_real INTEGER NULL, 
	y_pred INTEGER NULL
)


INFO:sqlalchemy.engine.Engine:[no key 0.00101s] ()
INFO:sqlalchemy.engine.Engine:INSERT INTO esg.ml_predictions_basic (model_name, model_type, y_real, y_pred) VALUES (?, ?

INFO:     127.0.0.1:50286 - "GET /ml/basic_models?sample_size=50000 HTTP/1.1" 200 OK


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.Engine:SELECT [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] 
FROM [INFORMATION_SCHEMA].[TABLES] 
WHERE ([INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max)) OR [INFORMATION_SCHEMA].[TABLES].[TABLE_TYPE] = CAST(? AS NVARCHAR(max))) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_NAME] = CAST(? AS NVARCHAR(max)) AND [INFORMATION_SCHEMA].[TABLES].[TABLE_SCHEMA] = CAST(? AS NVARCHAR(max))
INFO:sqlalchemy.engine.Engine:[cached since 317.1s ago] ('BASE TABLE', 'VIEW', 'ml_predictions_advanced', 'esg')
INFO:sqlalchemy.engine.Engine:
CREATE TABLE esg.ml_predictions_advanced (
	model_name VARCHAR(max) NULL, 
	model_type VARCHAR(max) NULL, 
	y_real INTEGER NULL, 
	y_pred BIGINT NULL
)


INFO:sqlalchemy.engine.Engine:[no key 0.00074s] ()
INFO:sqlalchemy.engine.Engine:INSERT INTO esg.ml_predictions_advanced (model_name, model_type, y_real, y_pred) VALUES (?, ?, ?, ?), (?, ?, ?, ?), (?, ?, ?, ?), (?, ?, ?, ?), (?, ?, ?, ?)

INFO:     127.0.0.1:50372 - "GET /ml/advanced_models?sample_size=50000 HTTP/1.1" 200 OK
