In [1]:
import logging
import os
import shutil
import subprocess
from typing import Any, Dict, List

import pandas as pd


In [2]:
def read_file(base_directory: str) -> pd.DataFrame:
    """
    Walk ``base_directory`` and collect every sanitized "after" C file found beneath it.

    A file is selected when its name ends with ".res.c.sanitized.res.c" and does not
    contain any of the substrings "gpt", "spinfer", "deepseek" or "claude".
    Sub-directories and files are visited in sorted order, so the row order is the
    same on every run over the same tree.

    Parameters:
    - base_directory (str): The directory from which to start the search.

    Returns:
    - pd.DataFrame: One row per selected file, with the columns
        "filepath"  - the file path with the ".res.c.sanitized.res.c" suffix removed,
        "directory" - the last "/"-separated component of the containing directory,
                      or "treewide" when the containing path contains "treewide",
        "is_cocci"  - whether "<filepath>.cocci" exists as a regular file.
      The three columns are present even when no file is selected, so callers can
      index them without a KeyError.
    """
    suffix = ".res.c.sanitized.res.c"
    excluded_markers = ("gpt", "spinfer", "deepseek", "claude")
    columns = ["filepath", "directory", "is_cocci"]

    records: List[Dict[str, Any]] = []
    for root, dirs, files in os.walk(base_directory):
        files.sort()
        dirs.sort()  # sorted in place so os.walk descends in sorted order
        for filename in files:
            if not filename.endswith(suffix):
                continue
            if any(marker in filename for marker in excluded_markers):
                continue
            # Remove the suffix by length: splitting on it would cut the path at the
            # first occurrence, which may be inside a directory name.
            filepath = os.path.join(root, filename)[: -len(suffix)]
            records.append(
                {
                    "filepath": filepath,
                    "directory": root.split("/")[-1] if "treewide" not in root else "treewide",
                    "is_cocci": os.path.isfile(filepath + ".cocci"),
                }
            )
    return pd.DataFrame(records, columns=columns)

def write_index_file(df: pd.DataFrame) -> List[str]:
    """
    Write one "index" file per distinct value of ``df["directory"]``.

    Each index file lists, one pair per line and separated by a single space,
    "<filepath>.c.sanitized.c" and "<filepath>.res.c.sanitized.res.c" for every
    distinct filepath of the group, in first-seen order. Lines are joined with
    "\\n" and no trailing newline is written. The file is named "index" and is
    placed in the directory of the group's last row.

    Parameters:
    - df (pd.DataFrame): Frame with at least the columns "filepath" and "directory".

    Returns:
    - List[str]: Paths of the index files written, one per group, in the order the
      groups first appear in ``df``. An empty frame yields an empty list.
    """
    if df.empty:
        # Nothing to index; also covers a frame that has no columns at all.
        return []

    outputs = []
    for d in df["directory"].unique():
        group_paths = df.loc[df["directory"] == d, "filepath"]

        # dict.fromkeys keeps first-seen order while dropping repeated filepaths, so
        # a duplicate row can no longer re-emit the previous row's pair.
        lines = [
            f"{filepath}.c.sanitized.c {filepath}.res.c.sanitized.res.c"
            for filepath in dict.fromkeys(group_paths)
        ]

        output_filepath = os.path.join(os.path.dirname(group_paths.iloc[-1]), "index")
        with open(output_filepath, "w") as f:
            f.write("\n".join(lines))
        outputs.append(output_filepath)
    return outputs

def run_spinfer(index_filepath: str, output_filepath: str):
    """
    Run ``spinfer.native -f <index_filepath> -o <output_filepath>``.

    stdout and stderr of the process are captured as text. On success an info
    message is logged. A non-zero exit status is not re-raised: it is logged as an
    error, followed by the captured stderr (when there is any) so the failure can
    be diagnosed.

    Parameters:
    - index_filepath (str): Path passed to the "-f" option.
    - output_filepath (str): Path passed to the "-o" option.
    """
    command = ["spinfer.native", "-f", index_filepath, "-o", output_filepath]
    try:
        subprocess.run(
            command,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"Error occurred while executing command: {e}")
        # stderr is piped, so it would be lost unless it is logged here.
        if e.stderr:
            logging.error(f"stderr of {command[0]}:\n{e.stderr.strip()}")
    else:
        logging.info(f"{index_filepath} has been processed!")

In [3]:
# Root of the tree that is scanned for sanitized C files.
INFERENCE_DIRECTORY = "./"
directory = INFERENCE_DIRECTORY
logging.info(f"Processing directory: {directory}")

# Collect the C files, then write one index file per directory group.
df = read_file(directory)
index_filepaths = write_index_file(df)

# Show where the inferred semantic patch of each index file would be written.
for p in index_filepaths:
    # Everything before the last "/" is the folder that holds the index file.
    directory = p.rpartition("/")[0]
    output_filepath = os.path.join(directory, "final_response_0.spinfer.cocci")
    print(output_filepath)
    # run_spinfer(p, output_filepath)

./EXP0-7/final_response_0.spinfer.cocci
./dasd_smalloc/final_response_0.spinfer.cocci
./dma_pool_alloc-52/final_response_0.spinfer.cocci
./early_memunmap/final_response_0.spinfer.cocci
./free_bootmem-77/final_response_0.spinfer.cocci
./kees_timer1/final_response_0.spinfer.cocci
./perf_evlist__mmap-69/final_response_0.spinfer.cocci
./random_ether_addr-84/final_response_0.spinfer.cocci
./snd_soc/final_response_0.spinfer.cocci
./sock_poll_wait-84/final_response_0.spinfer.cocci
./tcaction/final_response_0.spinfer.cocci
./tcf_block_get-61/final_response_0.spinfer.cocci
./ttm_bo_init-60/final_response_0.spinfer.cocci
./uartlite/final_response_0.spinfer.cocci


In [4]:
# Display the "filepath" value of the first collected row.
df.iloc[0]["filepath"]

'./EXP0-7/1533533124_2018-08-06_4fd786e6c3d6_backref_btrfs_check_shared'

In [6]:
# Destination root; each group gets its patch under TEST_DIRECTORY/<group folder>/.
TEST_DIRECTORY = "../test"
for p in index_filepaths:
    # Folder holding the index file, and the patch expected next to it.
    directory = p.rpartition("/")[0]
    output_filepath = os.path.join(directory, "final_response_0.spinfer.cocci")

    if not os.path.exists(output_filepath):
        print(f"Failed to copy {output_filepath}! The file does not exist.")
        continue

    # Only the last path component names the destination sub-folder.
    # NOTE(review): the destination folder is assumed to exist already - confirm.
    directory = directory.rpartition("/")[2]
    cocci_filepath = os.path.join(TEST_DIRECTORY, directory, "sp_out.final.spinfer.cocci")
    shutil.copy(output_filepath, cocci_filepath)

In [7]:
def apply_cocci(cocci_filepath: str, c_before_filepath: str, c_after_result_filepath: str, debug_filepath: str, timeout: int = 30) -> None:
    '''
    Applies a semantic patch specified by cocci_filepath to a C file, writes the result to c_after_result_filepath,
    and writes debug information to debug_filepath.

    Parameters:
    - cocci_filepath (str): Semantic patch passed to spatch via "--sp-file".
    - c_before_filepath (str): C file the patch is applied to.
    - c_after_result_filepath (str): Path passed to spatch via "-o".
    - debug_filepath (str): File that receives both stdout and stderr of spatch. It is
      truncated before spatch starts.
    - timeout (int): Value passed to spatch via "--timeout". Defaults to 30, the value
      that was previously hard-coded.

    A non-zero exit status of spatch is logged as an error and not re-raised.
    '''
    # Both paths pointing at one file means the debug stream and the "-o" output
    # overwrite each other; warn instead of failing so existing callers keep running.
    if os.path.abspath(debug_filepath) == os.path.abspath(c_after_result_filepath):
        logging.warning(
            f"debug_filepath and c_after_result_filepath are the same file: {debug_filepath}"
        )

    # The command is a list (no shell), so paths need no quoting.
    command = [
        "spatch",
        "--sp-file", cocci_filepath,
        c_before_filepath,
        "-o", c_after_result_filepath,
        "--debug",
        "--timeout", str(timeout)
    ]

    # stdout and stderr of spatch both go to the debug file.
    with open(debug_filepath, "w") as output_file:
        try:
            subprocess.run(command, stdout=output_file, stderr=output_file, check=True)
        except subprocess.CalledProcessError as e:
            logging.error(f"Error occurred: {e}")
        else:
            logging.info(f"{' '.join(command)} is executed!")

In [8]:
# Apply each group's inferred semantic patch to every "before" C file of that group.
directories = df["directory"].unique()
for d in directories:
    logging.info(f"Processing directory: {d}")

    # Named mask instead of `filter`, which would shadow the builtin for the rest
    # of the notebook.
    in_directory = df["directory"] == d
    temp_df = df[in_directory]

    for idx, row in temp_df.iterrows():
        logging.info(f"Processing file: {row['filepath']}")

        # The patch is looked up in the folder that contains the C file.
        current_d = os.path.dirname(row["filepath"])
        cocci_filepath = os.path.join(current_d, "final_response_0.spinfer.cocci")

        if os.path.exists(cocci_filepath):
            logging.info(f"Cocci filepath is found: {cocci_filepath}")

            c_init = row["filepath"] + ".c.sanitized.c"
            c_final = row["filepath"] + ".0.spinfer.res.c"
            # The debug log used to share c_final's path, so the log stream and the
            # "-o" output of spatch clobbered each other. It now has its own file.
            # NOTE(review): the ".0.spinfer.debug" suffix is a new name - confirm that
            # no downstream step expects the log elsewhere.
            debug = row["filepath"] + ".0.spinfer.debug"

            # Previously c_final was truncated on every run because it doubled as the
            # debug file; remove it explicitly so a failed run cannot leave a stale
            # result behind.
            if os.path.exists(c_final):
                os.remove(c_final)

            apply_cocci(cocci_filepath, c_init, c_final, debug)

        else:
            logging.info(f"Cocci filepath not found: {cocci_filepath}")

ERROR:root:Error occurred: Command '['spatch', '--sp-file', './snd_soc/final_response_0.spinfer.cocci', './snd_soc/1524629994_2018-04-25_e4b31b816c47_mt2701-afe-pcm_mt2701_dlm_fe_startup.c.sanitized.c', '-o', './snd_soc/1524629994_2018-04-25_e4b31b816c47_mt2701-afe-pcm_mt2701_dlm_fe_startup.0.spinfer.res.c', '--debug', '--timeout', '30']' returned non-zero exit status 255.
ERROR:root:Error occurred: Command '['spatch', '--sp-file', './snd_soc/final_response_0.spinfer.cocci', './snd_soc/1524629994_2018-04-25_e4b31b816c47_mtk-afe-fe-dai_mtk_afe_fe_trigger.c.sanitized.c', '-o', './snd_soc/1524629994_2018-04-25_e4b31b816c47_mtk-afe-fe-dai_mtk_afe_fe_trigger.0.spinfer.res.c', '--debug', '--timeout', '30']' returned non-zero exit status 255.
ERROR:root:Error occurred: Command '['spatch', '--sp-file', './tcf_block_get-61/final_response_0.spinfer.cocci', './tcf_block_get-61/1507896057_2017-10-13_69d78ef25c7b_sch_drr_drr_init_qdisc.c.sanitized.c', '-o', './tcf_block_get-61/1507896057_2017-10-13