# Dataset

## cargar dataset

In [None]:
import pandas as pd

# Read the source CSV; the path is relative to the notebook's working directory.
path = "Siniestros_urbanos_Metropolitana_2024.csv"
df = pd.read_csv(path)

# Report the table dimensions, then preview the first rows as the cell output.
n_filas, n_columnas = df.shape
print("Filas y columnas:", (n_filas, n_columnas))
df.head()


## Columnas a usar

In [None]:
# Columns kept for the rest of the analysis.
cols_base = [
    "id_accidente",
    "fecha",
    "hora",
    "latitud",
    "longitud",
    "n_heridos",
    "n_fallecidos",
    "tipo_accidente",
    "calle",  # note: this would be the intersection calle1 - calle2
    "dist_hospital",
    "tiempo_hospital",
]

# Keep only the base columns and drop records without coordinates.
# The explicit .copy() makes `df` an independent frame, so adding columns to it
# in later cells does not raise pandas' SettingWithCopyWarning.
df = df[cols_base].dropna(subset=["latitud", "longitud"]).copy()
df.head()


## gravedad

In [None]:
# Simple per-accident severity score (as in the PDF):
# 3 points per fatality plus 2 points per injured person.
df["gravedad_accidente"] = 3*df["n_fallecidos"] + 2*df["n_heridos"]

# A notebook only displays the LAST expression of a cell, so the summary has to
# be printed explicitly; otherwise it is computed and silently discarded.
print(df["gravedad_accidente"].describe())

# Aggregate the accidents per street ("calle"): counts, casualty totals,
# total severity and mean distance/time to hospital.
agg_df = df.groupby("calle").agg(
    n_accidentes=("id_accidente", "count"),
    total_heridos=("n_heridos", "sum"),
    total_fallecidos=("n_fallecidos", "sum"),
    gravedad_total=("gravedad_accidente", "sum"),
    dist_prom=("dist_hospital", "mean"),
    tiempo_prom=("tiempo_hospital", "mean")
).reset_index()

agg_df.head()

## métricas de "peligrosidad"

In [None]:
import numpy as np
# Danger classes assigned to each street:
#   0 = safe streets
#   1 = medium risk
#   2 = very dangerous

# Class boundaries: 30th and 80th percentiles of the per-street total severity.
p1, p2 = np.percentile(agg_df["gravedad_total"], [30, 80])

def clasificar_peligrosidad(g):
    """Map a total-severity value to its danger class.

    Returns 0 when ``g <= p1``, 1 when ``p1 < g <= p2`` and 2 otherwise
    (``p1`` and ``p2`` are the module-level percentile thresholds).
    """
    if g <= p1:
        return 0
    return 1 if g <= p2 else 2

agg_df["peligrosidad"] = agg_df["gravedad_total"].map(clasificar_peligrosidad)
agg_df["peligrosidad"].value_counts()


## Ver cómo se comportan las clases

In [None]:
import matplotlib.pyplot as plt

# Number of streets per danger class. Reindexing to [0, 1, 2] guarantees that
# the three bars are always present and in class order, so the colour list
# (green/orange/red) cannot shift onto the wrong class when one class is empty.
class_counts = (
    agg_df["peligrosidad"]
    .value_counts()
    .reindex([0, 1, 2], fill_value=0)
)

class_counts.plot(
    kind="bar", color=["green", "orange", "red"],
    title="Distribución de clases de peligrosidad (0=Baja, 1=Media, 2=Alta)"
)
plt.xlabel("Categoría")
plt.ylabel("Número de calles")
plt.show()

## limpiamos dataset

In [None]:
import pandas as pd
import numpy as np

# Reload the raw CSV so that cleaning always starts from the untouched source.
# This unconditionally replaces the `df` built in the earlier cells.
path = "Siniestros_urbanos_Metropolitana_2024.csv"
df = pd.read_csv(path)

# Remove duplicated accidents and records whose coordinates are missing or fall
# outside the latitude/longitude bounding box used for the study area.
df = df.drop_duplicates(subset=["id_accidente"])
df = df.dropna(subset=["latitud", "longitud"])
in_bbox = df["latitud"].between(-34.5, -33.0) & df["longitud"].between(-71.0, -69.0)
# Explicit copy: the columns assigned below must not be written into a filtered
# view of the original frame (avoids SettingWithCopyWarning).
df = df[in_bbox].copy()

# Combine date and time into a single datetime column; rows that cannot be
# parsed become NaT and are dropped. Casting to str keeps the concatenation
# valid even if pandas inferred a non-string dtype for either column.
if "fecha" in df.columns and "hora" in df.columns:
    df["fecha_hora"] = pd.to_datetime(
        df["fecha"].astype(str) + " " + df["hora"].astype(str), errors="coerce"
    )
    df = df.dropna(subset=["fecha_hora"]).copy()

# Force numeric dtypes; unparseable or missing values become 0.
# NOTE(review): for dist_hospital / tiempo_hospital a 0 means "at the hospital",
# so filling gaps with 0 biases the per-segment means downwards — confirm that
# this is the intended imputation.
cols_numericas = ["n_heridos", "n_fallecidos", "dist_hospital", "tiempo_hospital"]
for c in cols_numericas:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0)

print("Datos limpios:", df.shape)
df.head()


# Armar grafo

grafo de calles

In [None]:
!pip install osmnx

In [None]:
import osmnx as ox
import networkx as nx
import geopandas as gpd

# Download the drivable street network of the Santiago Metropolitan Region.
G = ox.graph_from_place(
    "Santiago Metropolitan Region, Chile",
    network_type="drive",
    simplify=True,
)

# Project the graph and split it into node / edge GeoDataFrames.
G_proj = ox.project_graph(G)
nodes_gdf, edges_gdf = ox.graph_to_gdfs(G_proj, nodes=True, edges=True)

# Move the edge index into regular columns, then prepend a consecutive
# 0..n-1 identifier for every street segment.
edges_gdf = edges_gdf.reset_index()
edges_gdf.insert(0, "segment_id", range(len(edges_gdf)))
edges_gdf[["segment_id", "u", "v", "key", "name"]].head()

accidentes a tramos

In [None]:
# ---
# 3.2 Convertir accidentes a GeoDataFrame y asignar tramo más cercano
# ---

from shapely.geometry import Point
import numpy as np
# Turn every accident into a point geometry (longitude/latitude, EPSG:4326).
acc_gdf = gpd.GeoDataFrame(
    df.copy(),
    geometry=gpd.points_from_xy(df["longitud"], df["latitud"]),
    crs="EPSG:4326"
)

# Reproject the points to the CRS of the street segments so that both layers
# share the same coordinate system before the nearest-edge search.
acc_gdf = acc_gdf.to_crs(edges_gdf.crs)
X = acc_gdf.geometry.x.values
Y = acc_gdf.geometry.y.values

# Nearest street edge, identified by (u, v, key), for each accident.
nearest = ox.distance.nearest_edges(G_proj, X, Y)

# Normalise the result to a list of 3-tuples before building the frame.
# NOTE(review): the container type returned by nearest_edges appears to differ
# between OSMnx releases (list of tuples vs. object array) — confirm; this form
# accepts either.
nearest_uvk = pd.DataFrame(
    [tuple(edge) for edge in nearest], columns=["u", "v", "key"]
)
edge_keys = edges_gdf[["u", "v", "key", "segment_id"]].copy()

# Attach (u, v, key) positionally, then look up the matching segment_id.
acc_gdf = pd.concat([acc_gdf.reset_index(drop=True), nearest_uvk], axis=1)
acc_gdf = acc_gdf.merge(edge_keys, on=["u", "v", "key"], how="left")

acc_gdf[["id_accidente", "calle", "segment_id"]].head()


Grafo de tramos G_segments (cada nodo es un tramo de calle)

In [None]:
import networkx as nx
from collections import defaultdict
from itertools import combinations

# Segment graph: every node is one street segment (segment_id).
# Two segments are connected when they SHARE an endpoint, regardless of whether
# that endpoint is the `u` or the `v` of each segment.

G_segments = nx.Graph()

# One graph node per segment, carrying the descriptive edge attributes
# (None when the column is absent from edges_gdf).
attr_names = ("name", "length", "highway", "oneway")
for _, row in edges_gdf.iterrows():
    G_segments.add_node(
        int(row["segment_id"]),
        **{attr: row.get(attr, None) for attr in attr_names}
    )

# Map every street-network node to the set of segments touching it, counting
# both ends of each segment. Grouping by `u` and by `v` separately would miss
# consecutive segments (A->B followed by B->C), where the shared node is the
# `v` of one segment and the `u` of the other.
segments_at_node = defaultdict(set)
for seg_id, u, v in zip(edges_gdf["segment_id"], edges_gdf["u"], edges_gdf["v"]):
    seg_id = int(seg_id)
    segments_at_node[u].add(seg_id)
    segments_at_node[v].add(seg_id)

# Connect every pair of segments that meet at the same node. Using a set per
# node means a segment whose two ends coincide never yields a self-loop.
for incident_segments in segments_at_node.values():
    G_segments.add_edges_from(combinations(sorted(incident_segments), 2))

print("Nodos:", G_segments.number_of_nodes(), " | Aristas:", G_segments.number_of_edges())


agregar features por tramo

In [None]:
# Per-segment accident statistics: counts, casualty totals and mean
# distance/time to hospital.
agg_features = (
    acc_gdf
    .groupby("segment_id")
    .agg(
        n_accidentes=("id_accidente", "count"),
        total_heridos=("n_heridos", "sum"),
        total_fallecidos=("n_fallecidos", "sum"),
        dist_prom=("dist_hospital", "mean"),
        tiempo_prom=("tiempo_hospital", "mean"),
    )
    .reset_index()
)

# Severity index per segment: 3 points per fatality, 2 per injured person.
agg_features["gravedad_total"] = (
    agg_features["total_fallecidos"] * 3 + agg_features["total_heridos"] * 2
)

# Class boundaries: 30th and 80th percentiles of the per-segment severity.
p1, p2 = np.percentile(agg_features["gravedad_total"], [30, 80])

def peligrosidad_cat(g):
    """Return the danger class of a severity value.

    The class is the position of the first threshold in ``(p1, p2)`` that is
    greater than or equal to ``g``; values above both thresholds get class 2.
    """
    for clase, limite in enumerate((p1, p2)):
        if g <= limite:
            return clase
    return 2

agg_features["peligrosidad"] = agg_features["gravedad_total"].map(peligrosidad_cat)
agg_features.head()


In [None]:
# {segment_id: {feature_name: value}} for every segment that has accidents.
node_features = agg_features.set_index("segment_id").to_dict(orient="index")

# Attach the accident features to the matching graph nodes, then store each
# node's degree in the segment graph under "grado".
nx.set_node_attributes(G_segments, node_features)
nx.set_node_attributes(G_segments, dict(G_segments.degree), "grado")

# Show the attributes of one node as a quick sanity check.
first_node = list(G_segments.nodes())[0]
print(G_segments.nodes[first_node])

visualización del grafo

In [None]:
import matplotlib.pyplot as plt

# Only segments that received a danger label are drawn.
nodes_with_label = [n for n, d in G_segments.nodes(data=True) if "peligrosidad" in d]

# One colour per labelled node, in the same order as `nodes_with_label`.
palette = {0: "green", 1: "orange", 2: "red"}
color_map = [
    palette.get(G_segments.nodes[n]["peligrosidad"], "gray")
    for n in nodes_with_label
]

plt.figure(figsize=(8, 8))
nx.draw(
    G_segments.subgraph(nodes_with_label),
    # Pass the node order explicitly: node_color is matched to nodes by
    # position, and a subgraph view is not guaranteed to iterate its nodes in
    # the order of the list it was built from.
    nodelist=nodes_with_label,
    node_color=color_map,
    node_size=10,
    with_labels=False
)
plt.title("Grafo de calles coloreado por peligrosidad")
plt.show()

# Preparar datos para GraphSAGE

pasar de networkx a pytorch geometric

In [None]:
import torch
from torch_geometric.utils import from_networkx

import networkx as nx

# Node features fed to the model, in column order.
# NOTE(review): the target `peligrosidad` is a deterministic function of
# `gravedad_total` (itself derived from total_heridos / total_fallecidos), so
# these inputs leak the label — confirm whether that is intended.
feature_cols = [
    "n_accidentes", "total_heridos", "total_fallecidos",
    "dist_prom", "tiempo_prom", "gravedad_total", "grado"
]

# Segments without accidents have no feature values: default them (and any
# None / falsy value) to 0 so that every node has the full feature set.
for n in G_segments.nodes:
    for c in feature_cols:
        G_segments.nodes[n][c] = G_segments.nodes[n].get(c, 0) or 0

# Convert only the graph STRUCTURE. from_networkx requires every node to carry
# exactly the same attribute keys and raises ValueError otherwise; here
# "peligrosidad" exists only on segments with accidents. The node order of the
# copy matches G_segments.nodes, so rows of X / y line up with edge_index.
structure_only = nx.Graph()
structure_only.add_nodes_from(G_segments.nodes)
structure_only.add_edges_from(G_segments.edges)
data = from_networkx(structure_only)

# Feature matrix: one row per node, one column per entry of feature_cols.
X = torch.tensor(
    [[G_segments.nodes[n][c] for c in feature_cols] for n in G_segments.nodes],
    dtype=torch.float
)
# Labels: segments without a computed danger class default to class 0.
# NOTE(review): this treats "no recorded accident" as "safe" — confirm.
y = torch.tensor(
    [G_segments.nodes[n].get("peligrosidad", 0) for n in G_segments.nodes],
    dtype=torch.long
)

data.x = X
data.y = y

print(data)
print("Shape X:", data.x.shape, "  Shape y:", data.y.shape)


entrenamiento, validación y test

In [None]:
from sklearn.model_selection import train_test_split

num_nodes = data.num_nodes
indices = list(range(num_nodes))

# 70 % of the nodes go to training; the remaining 30 % is split in half
# between validation and test.
train_idx, holdout_idx = train_test_split(indices, test_size=0.3, random_state=42)
val_idx, test_idx = train_test_split(holdout_idx, test_size=0.5, random_state=42)

# Build one boolean mask per split and store it on the Data object.
for mask_name, split_idx in (
    ("train_mask", train_idx),
    ("val_mask", val_idx),
    ("test_mask", test_idx),
):
    split_mask = torch.zeros(num_nodes, dtype=torch.bool)
    split_mask[split_idx] = True
    setattr(data, mask_name, split_mask)

print(f"Train: {data.train_mask.sum()} | Val: {data.val_mask.sum()} | Test: {data.test_mask.sum()}")


# GraphSAGE + MLP (esta es la parte importante del cuaderno)

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import SAGEConv

class GraphSAGEClassifier(torch.nn.Module):
    """Two SAGEConv layers followed by a two-layer MLP classification head.

    Args:
        in_channels: number of input features per node.
        hidden_channels: width of both SAGEConv layers and of the MLP hidden layer.
        out_channels: number of output classes (size of the returned logits).
        dropout: dropout probability, applied only while the module is in
            training mode.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, dropout=0.3):
        super().__init__()
        self.dropout = dropout

        # Graph encoder: two GraphSAGE convolutions.
        self.conv1 = SAGEConv(in_channels, hidden_channels)
        self.conv2 = SAGEConv(hidden_channels, hidden_channels)

        # Classification head: a deliberately simple MLP.
        self.lin1 = torch.nn.Linear(hidden_channels, hidden_channels)
        self.lin2 = torch.nn.Linear(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        """Return the raw class logits (no softmax) for every node."""
        # First GraphSAGE layer, with dropout on its activations.
        hidden = F.relu(self.conv1(x, edge_index))
        hidden = F.dropout(hidden, p=self.dropout, training=self.training)

        # Second GraphSAGE layer.
        hidden = F.relu(self.conv2(hidden, edge_index))

        # MLP head.
        hidden = F.relu(self.lin1(hidden))
        hidden = F.dropout(hidden, p=self.dropout, training=self.training)
        return self.lin2(hidden)

# Model hyper-parameters.
in_channels = data.x.shape[1]  # number of features per node
hidden_channels = 64           # 64-dimensional embeddings, as in the graphical abstract
out_channels = 3               # the 3 danger classes

# Use the GPU when one is available, then move both the model and the data to
# the selected device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GraphSAGEClassifier(in_channels, hidden_channels, out_channels).to(device)
data = data.to(device)

print(model)


entrenamiento y validación

In [None]:
# CrossEntropyLoss applies log_softmax + NLLLoss internally, so the model must
# output raw logits (no softmax in forward()).
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.01, weight_decay=5e-4)

def train():
    """Run one full-graph optimisation step and return the training loss.

    The forward pass uses every node; the loss is computed only on the nodes
    selected by ``data.train_mask``.
    """
    model.train()
    optimizer.zero_grad()

    logits = model(data.x, data.edge_index)
    train_mask = data.train_mask
    loss = criterion(logits[train_mask], data.y[train_mask])

    loss.backward()
    optimizer.step()
    return loss.item()

@torch.no_grad()
def evaluate(mask):
    """Evaluate the model on the nodes selected by ``mask``.

    Returns a tuple ``(accuracy, predictions, true_labels)`` where accuracy is
    a Python float and the other two are tensors restricted to ``mask``.
    """
    model.eval()
    logits = model(data.x, data.edge_index)

    y_true = data.y[mask]
    preds = logits[mask].argmax(dim=1)
    acc = preds.eq(y_true).float().mean().item()
    return acc, preds, y_true

num_epochs = 100
for epoch in range(1, num_epochs + 1):
    loss = train()

    # Accuracy on the training and validation nodes after this epoch's update.
    train_acc = evaluate(data.train_mask)[0]
    val_acc = evaluate(data.val_mask)[0]

    # Log the first epoch and then every tenth one.
    should_log = epoch == 1 or epoch % 10 == 0
    if should_log:
        print(f"Epoch {epoch:03d} | Loss: {loss:.4f} | Train Acc: {train_acc:.3f} | Val Acc: {val_acc:.3f}")


Métricas

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

# Final evaluation on the held-out test nodes.
test_acc, test_preds, test_true = evaluate(data.test_mask)
print(f"Test Accuracy: {test_acc:.3f}\n")

# scikit-learn works on NumPy arrays, so bring the tensors back to the CPU.
test_true_np = test_true.cpu().numpy()
test_preds_np = test_preds.cpu().numpy()

print("Reporte de clasificación (0=Baja, 1=Media, 2=Alta):")
reporte = classification_report(y_true=test_true_np, y_pred=test_preds_np, digits=3)
print(reporte)

print("Matriz de confusión:")
matriz = confusion_matrix(y_true=test_true_np, y_pred=test_preds_np)
print(matriz)