In [1]:
import random
from typing import List, Dict, Any

import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from sklearn.decomposition import PCA

In [2]:
class GraphVisualizer:
    """Draw a NetworkX graph and animate walks over it.

    The instance owns one matplotlib figure/axes pair (``self.fig`` / ``self.ax``)
    and one spring layout (``self.pos``) that every drawing method reuses, so the
    nodes keep the same position across plots and animation frames.
    """

    def __init__(self, G: "nx.Graph", cmap='Pastel1', seed=42):
        """Store the graph and pre-compute its layout.

        Args:
            G: Graph to draw.
            cmap: Name of the matplotlib colormap used by ``_generate_node_colors``.
            seed: Seed for the spring layout and for the PCA projection.
        """
        self.G = G
        self.cmap = cmap
        self.seed = seed
        self.fig, self.ax = plt.subplots(figsize=(6, 6))
        # Honour the caller's seed; it used to be hard-coded to 42, which made
        # the ``seed`` argument a no-op.
        self.pos = nx.spring_layout(G, seed=seed)

    def _generate_node_colors(self):
        """Return one colour per labelled node and remember the label -> colour map.

        Labels are read from a ``labels`` mapping attached to the graph object
        when present; otherwise from the ``"label"`` node attribute.
        NOTE(review): the returned list follows the iteration order of that
        mapping -- confirm it matches ``G.nodes`` before passing it as ``node_color``.
        """
        labels = getattr(self.G, "labels", None)
        if labels is None:
            # Plain NetworkX graphs have no ``labels`` attribute.
            labels = nx.get_node_attributes(self.G, "label")
        node_labels = list(labels.values())

        color_map = plt.colormaps[self.cmap]
        # Keep first-seen order so the label -> colour assignment is stable
        # between runs (iterating a set of strings is not).
        unique_labels = list(dict.fromkeys(node_labels))
        num_labels = len(unique_labels)
        self.label_to_color = {
            label: color_map(i / num_labels) for i, label in enumerate(unique_labels)
        }

        return [self.label_to_color[label] for label in node_labels]

    def _random_walk_strategy(self, neighbors: List[Any], walk: List[Any] = None, **kwargs) -> "tuple[Any, None]":
        """Pick the next node uniformly at random.

        Args:
            neighbors: Candidate next nodes (must be non-empty).
            walk: Nodes visited so far; unused, accepted for a uniform signature.
            **kwargs: Ignored, so node2vec parameters can be passed harmlessly.

        Returns:
            ``(selected, None)``; ``None`` tells the caller the choice was uniform.
        """
        return random.choice(neighbors), None

    def _node2vec_walk_strategy(self, neighbors: List[Any], walk: List[Any] = None, p=1, q=1, **kwargs) -> "tuple[Any, list[float] | None]":
        """Pick the next node with node2vec's second-order bias.

        Each candidate gets an unnormalised weight relative to the previous node:
        ``1/p`` if it *is* the previous node, ``1`` if it is adjacent to the
        previous node, and ``1/q`` otherwise.

        Args:
            neighbors: Candidate next nodes (must be non-empty).
            walk: Nodes visited so far, current node last.
            p: Return parameter; values > 1 make going back to the previous node
                less likely.
            q: In-out parameter; values > 1 make moving to nodes that are not
                adjacent to the previous node less likely (the walk stays local),
                values < 1 favour moving outward.

        Returns:
            ``(selected, probabilities)`` where ``probabilities`` is aligned with
            ``neighbors`` and sums to 1, or ``(selected, None)`` when there is no
            previous node yet and the choice is uniform.

        Raises:
            ValueError: If ``p`` or ``q`` is not strictly positive.
        """
        if p <= 0 or q <= 0:
            raise ValueError(f"p and q must be positive, got p={p!r}, q={q!r}")

        # Without a previous node the bias is undefined; fall back to uniform.
        # This also covers the default ``walk=None`` and an empty walk.
        if not walk or len(walk) < 2:
            return random.choice(neighbors), None

        prev_node = walk[-2]
        weights = []
        for neighbor in neighbors:
            if neighbor == prev_node:
                weights.append(1 / p)
            elif self.G.has_edge(prev_node, neighbor):
                weights.append(1)
            else:
                weights.append(1 / q)

        total = sum(weights)
        probabilities = [weight / total for weight in weights]

        selected = random.choices(neighbors, weights=probabilities, k=1)[0]
        return selected, probabilities

    def _simulate_walk(self):
        """Run the configured strategy once and cache everything the frames need.

        Fills ``self.walk`` with the visited nodes and ``self._expansions`` with
        one ``(node, neighbors, weights)`` entry per step taken. The walk stops
        early if it reaches a node without neighbours.
        """
        self.walk = [self.start_node]
        self._expansions = []
        node = self.start_node
        for _ in range(self.walk_length):
            neighbors = list(nx.all_neighbors(self.G, node))
            if not neighbors:
                break  # dead end: nothing to choose from
            # The strategy sees the walk *before* the new node is appended.
            selected, weights = self.R(neighbors, walk=self.walk, **self.R_params)
            self._expansions.append((node, neighbors, weights))
            node = selected
            self.walk.append(node)
        self.node = node

    def _draw_base_graph(self):
        """Clear the axes and draw the whole graph in its neutral style."""
        self.ax.clear()
        nx.draw(self.G, self.pos, ax=self.ax, with_labels=True, font_size=8, edge_color="gray")

    def _init_animation(self):
        """Draw the blank graph before the first frame."""
        self._draw_base_graph()
        return self.ax,

    def _update_animation(self, frame):
        """Render one frame from the pre-computed walk.

        Every step takes two frames: an even frame shows the walk so far, the
        following odd frame additionally shows the neighbours being considered
        and their selection probabilities. The method only reads cached state,
        so rendering the animation several times (saving a GIF and then
        exporting HTML) always yields the same frames.
        """
        self._draw_base_graph()

        # Clamp so a walk that stopped early at a dead end keeps showing its last node.
        step = min(frame // 2, len(self.walk) - 1)
        current_node = self.walk[step]
        visited_nodes = self.walk[:step + 1]
        visited_edges = list(zip(visited_nodes, visited_nodes[1:]))

        # Visited nodes and edges
        nx.draw_networkx_nodes(self.G, self.pos, ax=self.ax, nodelist=visited_nodes, node_color="yellow")
        nx.draw_networkx_edges(self.G, self.pos, ax=self.ax, edgelist=visited_edges, edge_color="red", width=2, alpha=0.3)

        # Highlight the current node
        nx.draw_networkx_nodes(self.G, self.pos, ax=self.ax, nodelist=[current_node], node_color="green")

        neighbors_info = ""
        if frame % 2 == 1 and step < len(self._expansions):  # expansion frame
            node, neighbors, weights = self._expansions[step]
            neighbors_edges = [(node, neighbor) for neighbor in neighbors]
            nx.draw_networkx_edges(self.G, self.pos, ax=self.ax, edgelist=neighbors_edges, edge_color="blue", width=2, alpha=0.3)

            if weights:
                neighbors_weights = dict(zip(neighbors, weights))
            else:
                # Uniform strategies report no weights; show the implied 1/n.
                neighbors_weights = {neighbor: 1. / len(neighbors) for neighbor in neighbors}
            neighbors_info = "\n".join([f"Node {neighbor}: {weight:.2f}" for neighbor, weight in neighbors_weights.items()])

        self.ax.text(0.01, 0.99, f'Start Node: {self.start_node}\nStep: {step + 1}\nCurrent Walk: {", ".join(map(str, visited_nodes))}\n{neighbors_info}',
                     transform=self.ax.transAxes, fontsize=9, verticalalignment='top',
                     bbox=dict(boxstyle='round', facecolor='white', alpha=0.5))

        return self.ax,

    def visualize(self, edge_labels=None):
        """Draw the graph (nodes, edges, node ids and optional edge labels) and show it.

        Args:
            edge_labels: Optional mapping ``(u, v) -> text`` drawn on the edges.
        """
        nx.draw_networkx_nodes(self.G, ax=self.ax, pos=self.pos, node_size=500)
        nx.draw_networkx_edges(self.G, ax=self.ax, pos=self.pos, alpha=0.3)

        # Labels go on the same axes as the nodes; without ``ax`` they would land
        # on whatever axes pyplot currently considers active.
        nx.draw_networkx_labels(self.G, self.pos, labels={node: node for node in self.G.nodes()}, font_size=8, ax=self.ax)

        if edge_labels:
            nx.draw_networkx_edge_labels(self.G, self.pos, edge_labels=edge_labels, font_size=8, ax=self.ax)

        plt.show()

    def visualize_walk(self,
                        R, # Strategy name ('random_walk' / 'node2vec') or a callable strategy
                        start_node: Any, # Node where the walk starts
                        walk_length: int, # Number of steps to take
                        save_path: str = None, # Where to save the animation as a GIF
                        **kwargs: Any
                    ) -> "FuncAnimation":
        """Create and return the animation of a walk under the given strategy.

        The walk is simulated once, up front; the animation frames only replay it.

        Args:
            R: ``'random_walk'``, ``'node2vec'``, or a callable with the signature
                ``R(neighbors, walk=..., **kwargs) -> (selected, weights_or_None)``.
            start_node: Node where the walk starts; must belong to the graph.
            walk_length: Number of steps; the animation has two frames per step.
            save_path: If given, the animation is also written there as a GIF
                (missing parent directories are created).
            **kwargs: Forwarded to the strategy on every step (e.g. ``p``, ``q``).

        Returns:
            The ``FuncAnimation``; the underlying figure is closed so the
            notebook does not also display a static copy.

        Raises:
            ValueError: If ``R`` is not a known strategy or ``start_node`` is not
                in the graph.
        """
        strategies = {
            'random_walk': self._random_walk_strategy,
            'node2vec': self._node2vec_walk_strategy,
        }
        if callable(R):
            self.R = R
        elif isinstance(R, str) and R in strategies:
            self.R = strategies[R]
        else:
            raise ValueError(f"Unknown walk strategy {R!r}; expected one of {sorted(strategies)} or a callable")

        if start_node not in self.G:
            raise ValueError(f"start_node {start_node!r} is not a node of the graph")

        self.R_params = kwargs
        self.start_node = start_node
        self.walk_length = walk_length
        self._simulate_walk()

        ani = FuncAnimation(self.fig, self._update_animation, frames=2 * self.walk_length, init_func=self._init_animation, blit=False, interval=1000)

        if save_path:
            import os  # local import: only needed when exporting

            parent_dir = os.path.dirname(save_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            # Save the animation as a GIF
            ani.save(save_path, writer='pillow')

        plt.close(self.fig)
        return ani

    def visualize_graph_with_pca(self, features):
        """Draw the graph with nodes placed at the 2-D PCA projection of their features.

        Args:
            features: Table indexed by node with a ``'label'`` column (dropped
                before projecting); it must support ``drop(columns=...)`` and
                ``.index``. NOTE(review): presumably a pandas DataFrame of
                numeric embeddings -- confirm with the caller.
        """
        embeddings = features.drop(columns=['label'])
        pca = PCA(n_components=2, random_state=self.seed)
        coords = pca.fit_transform(embeddings)

        pos = {}
        for i, node in enumerate(features.index):
            # Use the index value as-is when it already is a graph node (e.g.
            # string ids); otherwise fall back to the integer conversion.
            key = node if node in self.G else int(node)
            pos[key] = (coords[i, 0], coords[i, 1])

        # Draw the graph at the PCA coordinates.
        # NOTE(review): no ``node_color`` is passed, so ``cmap`` presumably has
        # no visible effect here -- confirm whether label colours were intended.
        nx.draw_networkx_nodes(self.G, pos, cmap="Set2", node_size=500)
        nx.draw_networkx_edges(self.G, pos, alpha=0.3)

        # Node id labels
        nx.draw_networkx_labels(self.G, pos, labels={node: node for node in self.G.nodes()}, font_size=8)

        plt.title("Visualización del grafo con PCA")
        plt.show()

In [None]:
import networkx as nx

# Build the example graph: a triangle (1-2-4) with a tail 4-6-5-3
G = nx.Graph()
G.add_edges_from([
    ('1', '2'),
    ('1', '4'),
    ('2', '4'),
    ('4', '6'),
    ('6', '5'),
    ('5', '3'),
])

# Node-level metrics
closeness = nx.closeness_centrality(G)
betweenness = nx.betweenness_centrality(G)
clustering = nx.clustering(G)
pagerank = nx.pagerank(G)

# Report each metric on its own line, in the same order as computed
for title, metric in (
    ("Cercanía:", closeness),
    ("Intermediación:", betweenness),
    ("Clustering:", clustering),
    ("PageRank:", pagerank),
):
    print(title, metric)


Cercanía: {'1': 0.45454545454545453, '2': 0.45454545454545453, '4': 0.625, '6': 0.625, '5': 0.5, '3': 0.35714285714285715}
Intermediación: {'1': 0.0, '2': 0.0, '4': 0.6000000000000001, '6': 0.6000000000000001, '5': 0.4, '3': 0.0}
Clustering: {'1': 1.0, '2': 1.0, '4': 0.3333333333333333, '6': 0, '5': 0, '3': 0}
PageRank: {'1': 0.15675737546868884, '2': 0.15675737546868884, '4': 0.22988931794348189, '6': 0.16857960700496266, '5': 0.1845723087243261, '3': 0.10344401538985135}


In [10]:
# Animate a 10-step walk from node '1', save it as a GIF and embed it in the notebook.
# p and q are only consumed by the 'node2vec' strategy; 'random_walk' ignores them.
walk_options = dict(
    start_node='1',
    walk_length=10,
    p=5,
    q=2,
    save_path='gifs/random_walk.gif',
)
visualizer = GraphVisualizer(G)
ani = visualizer.visualize_walk('random_walk', **walk_options)
HTML(ani.to_jshtml())