In [4]:
from pyvis.network import Network
import pandas as pd

## 1. CPG (Code Property Graph) - single file

In [14]:
import ast
import networkx as nx

class CPGBuilder(ast.NodeVisitor):
    def __init__(self):
        self.graph = nx.MultiDiGraph()
        self.current_scope = "global"
        self.node_counter = 0
    
    def generic_visit(self, node):
        # Alapértelmezett viselkedés minden csomóponttípusra
        node_id = self._add_node(node)
        for field, value in ast.iter_fields(node):
            if isinstance(value, ast.AST):
                child_id = self.visit(value)
                self.graph.add_edge(node_id, child_id, label=field)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, ast.AST):
                        child_id = self.visit(item)
                        self.graph.add_edge(node_id, child_id, label=field)
        return node_id
    
    def _add_node(self, node):
        node_id = f"{type(node).__name__}_{self.node_counter}"
        self.node_counter += 1
        
        # Alap attribútumok
        attrs = {
            'type': type(node).__name__,
            'lineno': getattr(node, 'lineno', None),
            'col_offset': getattr(node, 'col_offset', None)
        }
        
        # Típus-specifikus attribútumok
        if isinstance(node, ast.Name):
            attrs['name'] = node.id
        elif isinstance(node, ast.Constant):
            attrs['value'] = node.value
        elif isinstance(node, ast.FunctionDef):
            attrs['name'] = node.name
            self.current_scope = node.name
        
        self.graph.add_node(node_id, **attrs)
        return node_id
    
    # Példa egy specifikus csomópultípus kezelésére
    def visit_FunctionDef(self, node):
        func_id = self._add_node(node)
        
        # Paraméterek hozzáadása
        for arg in node.args.args:
            arg_id = self.visit(arg)
            self.graph.add_edge(func_id, arg_id, label='param')
        
        # Törzs hozzáadása
        for stmt in node.body:
            stmt_id = self.visit(stmt)
            self.graph.add_edge(func_id, stmt_id, label='body')
        
        return func_id

def build_cpg(source_code):
    tree = ast.parse(source_code)
    builder = CPGBuilder()
    builder.visit(tree)
    return builder.graph

# Példa használat
source = """
import time
import torch
import sys
sys.path.insert(0,'..')
from utils.learning_helpers import *
from utils.lie_algebra import se3_log_exp
import numpy as np
from liegroups import SE3
from pyslam.metrics import TrajectoryMetrics

def Validate(device, pose_model, spatial_trans, dset, loss):
    start = time.time()
    pose_model.train(False)  # Set model to evaluate mode
    pose_model.eval()        #used for batch normalization  # Set model to training mode
    spatial_trans.train(False)  
    spatial_trans.eval()        
    dset_size = dset.dataset.__len__()
    running_loss = 0.0           
        # Iterate over data.
    for data in dset:
            # get the inputs
        imgs, gt_lie_alg, intrinsics, vo_lie_alg, gt_correction = data
        gt_lie_alg = gt_lie_alg.type(torch.FloatTensor).to(device) 
        vo_lie_alg = vo_lie_alg.type(torch.FloatTensor).to(device)
        img_list = []
        for im in imgs: 
            img_list.append(im.to(device))

        intrinsics = intrinsics.type(torch.FloatTensor).to(device)[:,0,:,:] #only need one matrix since it's constant across the sequence
  
        corr, exp_mask, disparities = pose_model(img_list[0:3], vo_lie_alg)
        pose = se3_log_exp(corr, vo_lie_alg)
        minibatch_loss = loss.forward(img_list[-2], img_list[-1], pose, exp_mask, disparities, intrinsics, pose_vec_weight = vo_lie_alg, validate=True)

        running_loss += minibatch_loss.item()
     
    epoch_loss = running_loss / float(dset_size) 
    print('Validation Loss: {:.6f}'.format(epoch_loss))
    print("Validation epoch completed in {} seconds.".format(timeSince(start)))
    return epoch_loss

def test_depth_and_reconstruction(device, pose_model, spatial_trans,  dset, img_idx=[0,100,200,300]):
#    idx = np.random.randint(0,high=dset.dataset.__len__())
    exp_mask_array = torch.zeros(0)
    img_array = torch.zeros(0)
    disp_array = torch.zeros(0)
    for i in img_idx: #[1943,1944,1945,1946, 1947]:
        imgs, gt_lie_alg, intrinsics, vo_lie_alg, gt_correction = dset.dataset.__getitem__(i)
        gt_lie_alg = torch.FloatTensor(gt_lie_alg).to(device) 
        vo_lie_alg = torch.FloatTensor(vo_lie_alg).to(device)
        img_list = []
        for im in imgs:              
            img_list.append(im.to(device).unsqueeze(0))
        intrinsics = torch.FloatTensor(intrinsics).to(device)[0,:,:].unsqueeze(0)
    
        pose_model.train(False)  # Set model to evaluate mode
        pose_model.eval()        #used for batch normalization  # Set model to training mode
        spatial_trans.train(False)  
        spatial_trans.eval()        
        
        corr, exp_mask, disp = pose_model(img_list[0:3], vo_lie_alg.unsqueeze(0))
        ###comment for stereo
        exp_mask, disp = exp_mask[0], disp[0]
        disp = disp.unsqueeze(1)
        disp_array = torch.cat((disp_array, disp[0].cpu().detach()))
        depth = 1.0/disp[:,0].clone()
        pose = se3_log_exp(corr, vo_lie_alg)

        img_reconstructed = spatial_trans(img_list[-2], depth, -pose.clone(), intrinsics, intrinsics.inverse())    
        imgs = torch.stack((img_list[-2],img_reconstructed,img_list[-1]),dim=1)[0].cpu().detach()
        img_array = torch.cat((img_array, imgs))
        if exp_mask is not None:
            exp_mask = exp_mask.cpu().detach()
            exp_mask_array = torch.cat((exp_mask_array, exp_mask))

    return img_array, disp_array.numpy().squeeze(), exp_mask_array

def test_trajectory(device, pose_model, spatial_trans, dset, epoch):
    pose_model.train(False)  # Set model to evaluate mode
    pose_model.eval()        #used for batch normalization  # Set model to training mode
    spatial_trans.train(False)  
    spatial_trans.eval()     
    
    #initialize the relevant outputs
    full_corr_lie_alg_stacked, rot_corr_lie_alg_stacked, gt_lie_alg_stacked, vo_lie_alg_stacked, corrections_stacked, gt_corrections_stacked= \
            np.empty((0,6)), np.empty((0,6)), np.empty((0,6)), np.empty((0,6)), np.empty((0,6)), np.empty((0,6))

    for data in dset:
        imgs, gt_lie_alg, intrinsics, vo_lie_alg, gt_correction = data
        gt_lie_alg = gt_lie_alg.type(torch.FloatTensor).to(device)   
        vo_lie_alg = vo_lie_alg.type(torch.FloatTensor).to(device)
        img_list = []
        for im in imgs:              
            img_list.append(im.to(device))

        corr, exp_mask, disp = pose_model(img_list[0:3], vo_lie_alg)
        exp_mask, disp = exp_mask[0], disp[0][:,0]
        corr_rot = torch.clone(corr)
        corr_rot[:,0:3]=0

        corrected_pose = se3_log_exp(corr, vo_lie_alg)
        corrected_pose_rot_only = se3_log_exp(corr_rot, vo_lie_alg)
        
        
        corrections_stacked = np.vstack((corrections_stacked, corr.cpu().detach().numpy()))
        gt_corrections_stacked = np.vstack((gt_corrections_stacked, gt_correction.cpu().detach().numpy()))
        full_corr_lie_alg_stacked = np.vstack((full_corr_lie_alg_stacked, corrected_pose.cpu().detach().numpy()))
        rot_corr_lie_alg_stacked = np.vstack((rot_corr_lie_alg_stacked, corrected_pose_rot_only.cpu().detach().numpy()))
        gt_lie_alg_stacked = np.vstack((gt_lie_alg_stacked, gt_lie_alg.cpu().detach().numpy()))
        vo_lie_alg_stacked = np.vstack((vo_lie_alg_stacked, vo_lie_alg.cpu().detach().numpy()))

    est_traj, corr_traj, corr_traj_rot, gt_traj = [],[],[],[]
    gt_traj = dset.dataset.raw_gt_trials[0]
    est_traj.append(gt_traj[0])
    corr_traj.append(gt_traj[0])
    corr_traj_rot.append(gt_traj[0])

    cum_dist = [0]
    for i in range(0,full_corr_lie_alg_stacked.shape[0]):
        #classically estimated traj
        dT = SE3.exp(vo_lie_alg_stacked[i])
        new_est = SE3.as_matrix((dT.dot(SE3.from_matrix(est_traj[i],normalize=True).inv())).inv())
        est_traj.append(new_est)
        cum_dist.append(cum_dist[i]+np.linalg.norm(dT.trans))

        #corrected traj (rotation only)
        dT = SE3.exp(rot_corr_lie_alg_stacked[i])
        new_est = SE3.as_matrix((dT.dot(SE3.from_matrix(corr_traj_rot[i],normalize=True).inv())).inv())
        corr_traj_rot.append(new_est)
#        
#        
#        #corrected traj (full pose)
        dT = SE3.exp(full_corr_lie_alg_stacked[i])
        new_est = SE3.as_matrix((dT.dot(SE3.from_matrix(corr_traj[i],normalize=True).inv())).inv())
        corr_traj.append(new_est)

    gt_traj_se3 = [SE3.from_matrix(T,normalize=True) for T in gt_traj]
    est_traj_se3 = [SE3.from_matrix(T,normalize=True) for T in est_traj]
    corr_traj_se3 = [SE3.from_matrix(T,normalize=True) for T in corr_traj]
    corr_traj_rot_se3 = [SE3.from_matrix(T,normalize=True) for T in corr_traj_rot]
    
    tm_est = TrajectoryMetrics(gt_traj_se3, est_traj_se3, convention = 'Twv')
    tm_corr = TrajectoryMetrics(gt_traj_se3, corr_traj_se3, convention = 'Twv')
    tm_corr_rot = TrajectoryMetrics(gt_traj_se3, corr_traj_rot_se3, convention = 'Twv')
    
    if epoch >= 0:
        est_mean_trans, est_mean_rot = tm_est.mean_err()
        corr_mean_trans, corr_mean_rot = tm_corr.mean_err()
        corr_rot_mean_trans, corr_rot_mean_rot = tm_corr_rot.mean_err()
        print("Odom. mean trans. error: {} | mean rot. error: {}".format(est_mean_trans, est_mean_rot*180/np.pi))
        print("Corr. mean trans. error: {} | mean rot. error: {}".format(corr_mean_trans, corr_mean_rot*180/np.pi))
        print("Corr. (rot. only) mean trans. error: {} | mean rot. error: {}".format(corr_rot_mean_trans, corr_rot_mean_rot*180/np.pi))
        
        seg_lengths = list(range(100,801,100))
        _, seg_errs_est = tm_est.segment_errors(seg_lengths, rot_unit='rad')
        _, seg_errs_corr = tm_corr.segment_errors(seg_lengths, rot_unit='rad')
        _, seg_errs_corr_rot = tm_corr_rot.segment_errors(seg_lengths, rot_unit='rad')
        print("Odom. mean Segment Errors: {} (trans, %) | {} (rot, deg/100m)".format(np.mean(seg_errs_est[:,1])*100, 100*np.mean(seg_errs_est[:,2])*180/np.pi))
        print("Corr. mean Segment Errors: {} (trans, %) | {} (rot, deg/100m)".format(np.mean(seg_errs_corr[:,1])*100, 100*np.mean(seg_errs_corr[:,2])*180/np.pi))
        print("Corr. (rot. only) mean Segment Errors: {} (trans, %) | {} (rot, deg/100m)".format(np.mean(seg_errs_corr_rot[:,1])*100, 100*np.mean(seg_errs_corr_rot[:,2])*180/np.pi)) 
        
    rot_seg_err = 100*np.mean(seg_errs_corr_rot[:,2])*180/np.pi

    return corrections_stacked, gt_corrections_stacked, full_corr_lie_alg_stacked, vo_lie_alg_stacked, gt_lie_alg_stacked, \
        np.array(corr_traj), np.array(corr_traj_rot), np.array(est_traj), np.array(gt_traj), rot_seg_err, corr_rot_mean_trans, np.array(cum_dist)
        

"""

cpg = build_cpg(source)

In [16]:
import matplotlib.pyplot as plt

def visualize_cpg_pyvis(cpg, filename="cpg.html"):
    # Create PyVis network
    net = Network(
        directed=True,
        height="1000px",
        width="100%",
        notebook=False,
        bgcolor="#222222",
        font_color="white"
    )
    
    # Add nodes with all attributes
    for node, data in cpg.nodes(data=True):
        net.add_node(
            node,
            label=data.get('label', data['type']),
            title=data.get('title', ""),
            group=data['type'],  # Group by node type
            shape="box",
            color={
                'FunctionDef': '#FF6B6B',
                'Name': '#4ECDC4',
                'Constant': '#FFE66D',
                'If': '#A5D8FF',
                'Call': '#C8A2C8'
            }.get(data['type'], '#7FB3D5')
        )
    
    # Add edges with labels
    for u, v, data in cpg.edges(data=True):
        net.add_edge(u, v, label=data.get('label', ''), color='#888888')
    
    # Configure physics for better layout
    net.show_buttons(filter_=['physics'])
    
    # Save to HTML file
    net.save_graph(filename)

def build_and_visualize(source_code, output_file="cpg.html"):
    tree = ast.parse(source_code)
    builder = CPGBuilder()
    builder.visit(tree)
    return visualize_cpg_pyvis(builder.graph, output_file)

output_html = build_and_visualize(source, "factorial_cpg.html")
print(f"CPG visualization saved to {output_html}")

CPG visualization saved to None


## 2. CPG (Code Property Graph) - multiple files

In [17]:
import ast
import networkx as nx
import os
from pathlib import Path

class MultiFileCPGBuilder:
    def __init__(self):
        self.graph = nx.MultiDiGraph()
        self.node_counter = 0
        self.current_scope = []
        self.file_nodes = {}  # Fájlokhoz tartozó root node-ok
        
    def add_source_file(self, file_path, source_code):
        # Fájl root node hozzáadása
        file_id = f"File_{os.path.basename(file_path)}_{self.node_counter}"
        self.node_counter += 1
        self.graph.add_node(file_id, type='File', name=file_path, path=str(file_path))
        self.file_nodes[file_path] = file_id
        
        # AST elemzés
        tree = ast.parse(source_code)
        visitor = FileVisitor(self.graph, self.node_counter, self.current_scope.copy())
        visitor.visit(tree)
        
        # Kapcsolat a fájl node és a globális elemek között
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef)):
                def_id = f"{type(node).__name__}_{node.name}_{visitor.node_counter}"
                if def_id in visitor.local_ids:
                    self.graph.add_edge(file_id, def_id, label='contains')
        
        # Frissítjük a node számlálót
        self.node_counter = visitor.node_counter
        
        return file_id

class FileVisitor(ast.NodeVisitor):
    def __init__(self, graph, start_counter, current_scope):
        self.graph = graph
        self.node_counter = start_counter
        self.current_scope = current_scope
        self.local_ids = set()
    
    def _add_node(self, node, extra_attrs=None):
        node_id = f"{type(node).__name__}_{self.node_counter}"
        self.node_counter += 1
        
        attrs = {
            'type': type(node).__name__,
            'lineno': getattr(node, 'lineno', None),
            'col_offset': getattr(node, 'col_offset', None),
            'scope': '::'.join(self.current_scope) if self.current_scope else 'global'
        }
        
        if extra_attrs:
            attrs.update(extra_attrs)
        
        if isinstance(node, ast.Name):
            attrs['name'] = node.id
        elif isinstance(node, ast.Constant):
            attrs['value'] = node.value
        elif isinstance(node, ast.FunctionDef):
            attrs['name'] = node.name
            attrs['label'] = f"Function: {node.name}"
        
        self.graph.add_node(node_id, **attrs)
        self.local_ids.add(node_id)
        return node_id
    
    def visit_FunctionDef(self, node):
        self.current_scope.append(node.name)
        func_id = self._add_node(node, {'name': node.name})
        
        # Paraméterek
        for arg in node.args.args:
            arg_id = self._add_node(arg, {'name': arg.arg})
            self.graph.add_edge(func_id, arg_id, label='param')
        
        # Törzs
        for stmt in node.body:
            stmt_id = self.visit(stmt)
            self.graph.add_edge(func_id, stmt_id, label='body')
        
        self.current_scope.pop()
        return func_id
    
    def visit_ClassDef(self, node):
        self.current_scope.append(node.name)
        class_id = self._add_node(node, {'name': node.name})
        
        # Osztály törzse
        for stmt in node.body:
            stmt_id = self.visit(stmt)
            self.graph.add_edge(class_id, stmt_id, label='body')
        
        self.current_scope.pop()
        return class_id
    
    def generic_visit(self, node):
        node_id = self._add_node(node)
        for field, value in ast.iter_fields(node):
            if isinstance(value, ast.AST):
                child_id = self.visit(value)
                self.graph.add_edge(node_id, child_id, label=field)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, ast.AST):
                        child_id = self.visit(item)
                        self.graph.add_edge(node_id, child_id, label=field)
        return node_id

def build_multi_file_cpg(source_files):
    """
    source_files: dictionary {file_path: source_code}
    """
    builder = MultiFileCPGBuilder()
    
    for file_path, source_code in source_files.items():
        builder.add_source_file(file_path, source_code)
    
    return builder.graph

# Példa használat:
def load_source_files(directory):
    source_files = {}
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.py'):
                path = Path(root) / file
                with open(path, 'r', encoding='utf-8') as f:
                    source_files[str(path)] = f.read()
    return source_files

# Kódbázis beolvasása
source_files = load_source_files('repos/ss-dpc-net_python')

# CPG létrehozása
cpg = build_multi_file_cpg(source_files)

# Gráf vizualizáció (használd a korábbi PyVis megoldást)
visualize_cpg_pyvis(cpg, "full_codebase_cpg.html")



## 3. CFG (Control Flow Graph)

In [7]:
from py2cfg import CFGBuilder

# CFG építése egy szkripthez
cfg = CFGBuilder().build_from_file('example', 'repos/ss-dpc-net_python/train_mono.py')
cfg.build_visual('output_cfg', 'pdf')




'output_cfg.pdf'

In [3]:
# Adatok gyűjtésére szolgáló listák
nodes_data = []
edges_data = []

# Node-ok bejárása és adatok gyűjtése
for node_id, node in cfg.nodes.items():
    nodes_data.append({
        'node_id': node_id,
        'label': str(node),
        'line_number': getattr(node, 'lineno', None),
        'type': type(node).__name__,
        'out_edges': len(node.outgoing),
        'in_edges': len(node.incoming)
    })
    
    # Élek gyűjtése (a node outgoining élein keresztül)
    for edge in node.outgoing:
        edges_data.append({
            'source': node_id,
            'target': edge.target.id,
            'label': edge.label if hasattr(edge, 'label') else None,
            'edge_type': type(edge).__name__
        })

# DataFrame-ek létrehozása
nodes_df = pd.DataFrame(nodes_data)
edges_df = pd.DataFrame(edges_data)

# Megjelenítés
print("Node-ok:")
print(nodes_df.head())
print("\nÉlek:")
print(edges_df.head())

AttributeError: 'CFG' object has no attribute 'nodes'

## GZ Call graph

In [47]:
node_ids = pd.read_csv('bsc-code/gz/onlab/output_dir/processed_combined_method_nodes.csv', header=None)
nodes = pd.read_csv('bsc-code/gz/onlab/output_dir/nodes_token_ready.csv', header=None)
edges = pd.read_csv('bsc-code/gz/onlab/output_dir/method_only_edges.csv', header=None)


nodes['OGID'] = node_ids[0]
nodes['ID'] = nodes.index

edges = edges.merge(nodes[['OGID', 'ID']].rename(columns={'ID': 'src'}), left_on=0, right_on='OGID', how='left').drop(columns=['OGID'])
edges = edges.merge(nodes[['OGID', 'ID']].rename(columns={'ID': 'target'}), left_on=1, right_on='OGID', how='left').drop(columns=['OGID', 0, 1])
edges

Unnamed: 0,src,target
0,2,1
1,2,3
2,2,4
3,2,8
4,2,17
...,...,...
1943,1232,1355
1944,1232,1356
1945,1232,1357
1946,1232,1358


In [51]:
# create pyg graph
import torch
from torch_geometric.data import Data
from torch_geometric.utils import from_networkx

pyg = Data(x=torch.tensor(nodes.values), edge_index=torch.tensor(edges.values))
pyg

Data(x=[1360, 354], edge_index=[1948, 2])

In [54]:
# Visualize a pyg graph using PyVis
def visualize_pyg_graph(pyg_data, filename="pyg_graph.html"):
    net = Network(directed=True)
    
    # Add nodes
    for i in range(pyg_data.num_nodes):
        net.add_node(i, label=str(pyg_data.x[i].tolist()))
    
    # Add edges
    for edge in pyg_data.edge_index.tolist():
        net.add_edge(edge[0], edge[1])
    
    # Save to HTML file
    net.save_graph(filename)


visualize_pyg_graph(pyg, "pyg_graph.html")