In [1]:
from typing import Dict
import os
from glob import glob
from tqdm.auto import tqdm
import pandas as pd
import shutil

In [2]:
# Rust source template that builds the `EdgeFileReader` used by a generated
# regression test. Placeholders are filled through `str.format` with the
# values read from the edges metadata file.
#
# Every setter must stay on its own line: `format_constructor` removes, line
# by line, the setters whose metadata value is "None", and appends the final
# `;` itself.
#
# Fix: the `set_default_edge_type` line used to end with a stray extra `)`,
# which unbalanced the generated Rust whenever that setter was kept.
edge_reader_model = """let edges_reader = EdgeFileReader::new("{target_path}")?
        .set_rows_to_skip({rows_to_skip})
        .set_header({header})
        .set_separator({separator})?
        .set_verbose({verbose})
        .set_max_rows_number({max_rows_number})
        .set_sources_column("{sources_column}")?
        .set_sources_column_number({sources_column_number})?
        .set_destinations_column("{destinations_column}")?
        .set_destinations_column_number({destinations_column_number})?
        .set_weights_column("{weights_column}")?
        .set_weights_column_number({weights_column_number})?
        .set_default_weight({default_weight})
        .set_ignore_duplicates({ignore_duplicates})
        .set_skip_self_loops({skip_self_loops})
        .set_numeric_edge_type_ids({numeric_edge_type_ids})
        .set_numeric_node_ids({numeric_node_ids})
        .set_default_edge_type("{default_edge_type}")
        .set_skip_weights_if_unavailable({skip_weights_if_unavailable})
        .set_skip_edge_types_if_unavailable({skip_edge_types_if_unavailable})
        .set_edge_types_column("{edge_types_column}")?
        .set_edge_types_column_number({edge_types_column_number})?"""

In [3]:
node_reader_model = """let nodes_reader = Some(NodeFileReader::new("{target_path}")?
        .set_rows_to_skip({rows_to_skip})
        .set_separator({separator})?
        .set_header({header})
        .set_verbose({verbose})
        .set_ignore_duplicates({ignore_duplicates})
        .set_default_node_type("{default_node_type}")
        .set_nodes_column("{nodes_column}")?
        .set_nodes_column_number({nodes_column_number})
        .set_node_types_column("{node_types_column}")?
        .set_node_types_column_number({node_types_column_number})"""

In [4]:
# Rust statement emitted in place of the node reader when the fuzzer report
# directory contains no node file.
default_node_reader_model = "let nodes_reader = None;"

In [5]:
# Template of the Rust regression test file written for each fuzzer report.
# It is rendered with `str.format`, hence literal Rust braces are escaped by
# doubling them (`{{` / `}}`).
#
# Placeholders: `useful_imports`, `filename`, `line_number`, `column_number`,
# `current_test_id`, `edges_reader`, `nodes_reader`, plus `directed`,
# `directed_edge_list` and `name`, which the generation loop supplies by
# unpacking the graph metadata dictionary.
#
# NOTE(review): the generation loop also passes `report=...`, but this
# template has no `{report}` placeholder, so the report text is not emitted.
regression_test_model = """extern crate graph;

use graph::{{{useful_imports}}};

#[test]
/// This is a regression test that has been automatically generated
/// by the fuzzer harness.
/// The test originally caused a panic in the file {filename},
/// specifically (at the time) line {line_number} and column {column_number}.
///
fn test_regression_{current_test_id}() -> Result<(), String> {{
    {edges_reader}

    {nodes_reader}

    let mut graph = Graph::from_unsorted_csv(
        edges_reader,
        nodes_reader,
        {directed}, // Directed
        {directed_edge_list}, // Directed edge list
        "{name}" // Name of the graph
    )?;
    let _ = graph::test_utilities::default_test_suite(&mut graph, false);
    Ok(())
}}
"""

In [6]:
def load_metadata(path: str) -> Dict[str, str]:
    """Load a ``key,value`` metadata file into a dictionary.

    Each non-blank line is split on its FIRST comma only, so values may
    themselves contain commas (e.g. ``separator,Some(",")``). Surrounding
    whitespace of the whole line is stripped before splitting.

    Parameters
    ----------
    path: str
        Path of the metadata file to read.

    Returns
    -------
    Dictionary mapping every key to its raw string value. A key with nothing
    after the comma, or with no comma at all, maps to the empty string. When
    a key appears more than once, the last occurrence wins.
    """
    metadata = {}
    with open(path, "r") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # Blank lines (such as a trailing newline) hold no entry;
                # indexing the split result on them used to raise IndexError.
                continue
            # `partition` yields an empty value when the comma is missing,
            # so malformed lines no longer crash the loader.
            key, _, value = line.partition(",")
            metadata[key] = value
    return metadata

def format_constructor(model, metadata_path, current_test_id, list_type) -> tuple:
    """Render a Rust reader-constructor snippet from a template and metadata.

    Parameters
    ----------
    model: str
        Template with one builder call per line and `str.format` placeholders
        named after the metadata keys (plus ``{target_path}``).
    metadata_path: str
        Path of the ``key,value`` metadata file describing the reader.
    current_test_id
        Identifier of the regression test, used as the data file name.
    list_type: str
        Either ``"edges"`` or ``"nodes"``; used as the data file extension.

    Raises
    ------
    ValueError
        If ``list_type`` is neither ``"edges"`` nor ``"nodes"``.

    Returns
    -------
    Tuple ``(rendered_snippet, target_path, file_metadata)`` where
    ``target_path`` is the location the data file is referenced at from the
    generated test and ``file_metadata`` is the loaded metadata dictionary.
    """
    if list_type not in ("edges", "nodes"):
        raise ValueError("Given list type is not supported!")

    # Destination of the data file as written inside the generated Rust
    # source; callers prepend "graph" to obtain the on-disk location. It is
    # built with forward slashes on purpose, since it ends up in Rust code.
    target_path = "tests/data/regression/{}.{}".format(current_test_id, list_type)

    file_metadata = load_metadata(metadata_path)

    # Setter calls whose parameter is "None" in the metadata and must be
    # dropped from the template. The trailing "(" is essential: matching on
    # the bare "set_<param>" prefix also removed unrelated setters sharing
    # that prefix (e.g. a "None" `nodes_column` deleted the
    # `set_nodes_column_number` line as well).
    none_setters = [
        ".set_{}(".format(param)
        for param, value in file_metadata.items()
        if value == "None"
    ]

    simplified_model = "\n".join(
        line
        for line in model.split("\n")
        if not any(setter in line for setter in none_setters)
    )

    if list_type == "nodes":
        # The nodes template opens a `Some(` that has to be closed here.
        simplified_model += ")"

    simplified_model += ";"

    rendered = simplified_model.format(
        target_path=target_path,
        **file_metadata
    )
    return rendered, target_path, file_metadata

In [7]:
# When True, the fuzzer data files are moved next to the generated tests and
# each processed report directory is deleted afterwards.
move = True

# Turn every fuzzer report directory into a Rust regression test file.
for test_directory in tqdm(
    glob("fuzzing/unit_tests/*"),
    desc="Building regression tests"
):
    edges_path = os.path.join(test_directory, "edges.edges")
    edges_metadata_path = os.path.join(test_directory, "edges_metadata.csv")
    report_path = os.path.join(test_directory, "report.txt")

    if os.path.exists(report_path):
        # Use a context manager so the handle is closed deterministically.
        with open(report_path, "r") as report_file:
            report = report_file.read()
    else:
        report = "Report not provided"

    graph_metadata = load_metadata(os.path.join(
        test_directory,
        "graph_metadata.csv"
    ))
    panic_metadata = load_metadata(os.path.join(
        test_directory,
        "panic.csv"
    ))

    usefull_imports = ["Graph", "EdgeFileReader"]

    has_node_file = any(
        "node" in file
        for file in os.listdir(test_directory)
    )

    # Next free test id: one more than the largest numeric file stem already
    # present. Entries whose stem is not a number are ignored and an empty
    # directory starts the numbering at 1, instead of raising ValueError.
    current_test_id = max(
        (
            int(test_name.split(".")[0])
            for test_name in os.listdir("graph/tests/data/regression")
            if test_name.split(".")[0].isdecimal()
        ),
        default=0
    ) + 1

    # All templates are rendered BEFORE touching the filesystem, so that a
    # formatting failure cannot leave a report directory half-moved.
    pending_moves = []

    if has_node_file:
        usefull_imports.append("NodeFileReader")
        nodes_path = os.path.join(test_directory, "nodes.nodes")
        nodes_metadata_path = os.path.join(test_directory, "nodes_metadata.csv")
        nodes_reader, target_node_path, _ = format_constructor(
            node_reader_model,
            nodes_metadata_path,
            current_test_id,
            "nodes"
        )
        pending_moves.append(
            (nodes_path, os.path.join("graph", target_node_path))
        )
    else:
        nodes_reader = default_node_reader_model

    edges_reader, target_edge_path, metadata = format_constructor(
        edge_reader_model,
        edges_metadata_path,
        current_test_id,
        "edges"
    )
    pending_moves.append(
        (edges_path, os.path.join("graph", target_edge_path))
    )

    # `report` is passed for completeness; `str.format` ignores keyword
    # arguments that the template does not reference.
    regression_test = regression_test_model.format(
        edges_reader=edges_reader,
        nodes_reader=nodes_reader,
        current_test_id=current_test_id,
        filename=panic_metadata["file"].split(os.sep)[-1].strip('"'),
        line_number=panic_metadata["line"],
        column_number=panic_metadata["col"],
        useful_imports=", ".join(usefull_imports),
        report=report,
        **graph_metadata
    )

    if move:
        for source_path, destination_path in pending_moves:
            os.rename(source_path, destination_path)

    with open("graph/tests/test_regression_{}.rs".format(current_test_id), "w") as f:
        f.write(regression_test)

    if move:
        shutil.rmtree(test_directory)

Building regression tests:   0%|          | 0/1 [00:00<?, ?it/s]

{'verbose': 'Some(false)', 'separator': 'Some(",")', 'header': 'Some(false)', 'rows_to_skip': 'Some(0)', 'ignore_duplicates': 'Some(false)', 'max_rows_number': 'None', 'default_node_type': 'None', 'nodes_column_number': 'Some(0)', 'node_types_separator': 'Some("|")', 'node_types_column': 'None', 'node_types_column_number': 'None', 'numeric_node_ids': 'Some(false)', 'numeric_node_type_ids': 'Some(true)', 'skip_node_types_if_unavailable': 'Some(false)', 'nodes_column': 'None'}
{'verbose': 'Some(false)', 'separator': 'Some(",")', 'header': 'Some(false)', 'rows_to_skip': 'Some(0)', 'ignore_duplicates': 'Some(false)', 'max_rows_number': 'None', 'sources_column_number': 'Some(0)', 'sources_column': 'None', 'destinations_column_number': 'Some(1)', 'destinations_column': 'None', 'edge_types_column_number': 'None', 'edge_types_column': 'None', 'weights_column_number': 'None', 'weights_column': 'None', 'default_weight': 'None', 'default_edge_type': 'None', 'skip_self_loops': 'Some(false)', 'nume