In [1]:
import numpy as np
from collections import defaultdict, Counter
import math
import os
import re
from sklearn.model_selection import train_test_split

In [2]:
def split_annotated_files(folder, test_size=0.15):
    filenames = [f for f in os.listdir(folder) if f.endswith(".ann")]
    train_files, test_files = train_test_split(filenames, test_size=test_size, random_state=31)
    return train_files, test_files

In [3]:
def load_training_data(folder, filenames):
    obs_all = []
    state_all = []
    for filename in filenames:
        filepath = os.path.join(folder, filename)
        obs_seq = []
        state_seq = []
        with open(filepath, "r") as f:
            for line in f:
                tokens = line.strip().split()
                obs_line = []
                state_line = []
                for t in tokens:
                    if "::" in t:
                        obs_tok, state_tok = t.split("::", 1)
                        obs_line.append(obs_tok)
                        state_line.append(state_tok)
                obs_seq.extend(reversed(obs_line))
                state_seq.extend(reversed(state_line))
        obs_all.append(obs_seq)
        state_all.append(state_seq)
    return obs_all, state_all

In [4]:
def add_file_to_mappings(mappings: dict, filename: str, lines: list) -> dict:
    file_map = []

    for line in lines:
        mapped_tokens = []

        for token in line:
            if re.search(r'\d', token):
                mapped_tokens.append(token)
            else:
                mapped_tokens.append("_")

        file_map.append(mapped_tokens)

    file_id = str(len(mappings) + 1)
    mappings[file_id] = {
        "nome": filename,
        "map": file_map
    }

    return mappings

In [5]:
def remove_var_no(lines):
    new_lines = []
    for line in lines:
        line = re.sub(r'\bvar\d+(?:_\d+)?\b', 'var', line)
        line = re.sub(r'\bfunction_name\d+\b', 'function_name', line)
        line = re.sub(r'\bclass_name\d+\b', 'class_name', line)
        line = re.sub(r'->function_name\d+\b', '->function_name', line)
        new_lines.append(line)
    return new_lines
    

In [6]:
# Processar ficheiros de teste com anotação
def process_test_files(folder, ann_filenames):
    obs_all = []
    state_all = []
    mappings = {}

    for ann_filename in ann_filenames:
        filepath = os.path.join(folder, ann_filename)

        if not os.path.isfile(filepath):
            continue

        with open(filepath, "r") as f:
            lines = [line.strip().split() for line in f]

        # Remove true states and "::" separator, keep only observations and states
        obs_lines = []
        state_lines = []
        for line in lines:
            obs_line = []
            state_line = []
            for token in line:
                if "::" in token:
                    obs_tok, state_tok = token.split("::", 1)
                    obs_line.append(obs_tok)
                    state_line.append(state_tok)
            obs_lines.append(obs_line)
            state_lines.append(state_line)

        # Save mapping before removing var_no
        mappings = add_file_to_mappings(mappings, ann_filename, obs_lines)

        # Remove var_no from obs_lines for output
        obs_lines_cleaned = [remove_var_no(obs_line) for obs_line in obs_lines]

        # Flatten and clean for output
        flattened_obs = [token for line in obs_lines_cleaned for token in reversed(line)]
        flattened_state = [token for line in state_lines for token in reversed(line)]
        obs_all.append((ann_filename, flattened_obs))
        state_all.append((ann_filename, flattened_state))

    return obs_all, state_all, mappings


In [7]:
def train(states, observations, sequences, labels):
    start_counts = Counter()
    trans_counts = defaultdict(Counter)
    emit_counts = defaultdict(Counter)
    state_counts = Counter()

    for obs_seq, state_seq in zip(sequences, labels):
        assert len(obs_seq) == len(state_seq)
        for s in state_seq:
            states.add(s)
        for o in obs_seq:
            observations.add(o)

        start_counts[state_seq[0]] += 1

        for i in range(len(state_seq)):
            state = state_seq[i]
            obs = obs_seq[i]
            state_counts[state] += 1
            emit_counts[state][obs] += 1
            if i > 0:
                prev_state = state_seq[i - 1]
                trans_counts[prev_state][state] += 1

    # Add-1 smoothing: initialize all possible combinations
    states = sorted(states)
    observations = sorted(observations)

    # Start probabilities
    total_starts = sum(start_counts.values()) + len(states)
    start_probs = {
        s: (start_counts[s] + 1) / total_starts
        for s in states
    }

    # Transition probabilities
    trans_probs = {
        s1: {
            s2: (trans_counts[s1][s2] + 1) / (sum(trans_counts[s1].values()) + len(states))
            for s2 in states
        }
        for s1 in states
    }

    # Emission probabilities
    emit_probs = {
        s: {
            o: (emit_counts[s][o] + 1) / (state_counts[s] + len(observations))
            for o in observations
        }
        for s in states
    }

    return start_probs, trans_probs, emit_probs

In [8]:
SMOOTH = 1e-6

def viterbi(sequence, states, start_probs, trans_probs, emit_probs):
    V = [{}]
    path = {}

    for state in states:
        start_p = start_probs.get(state, SMOOTH)
        emit_p = emit_probs.get(state, {}).get(sequence[0], SMOOTH)
        V[0][state] = math.log(max(start_p, SMOOTH)) + math.log(max(emit_p, SMOOTH))
        path[state] = [state]

    for t in range(1, len(sequence)):
        V.append({})
        new_path = {}

        for curr_state in states:
            emit_p = emit_probs.get(curr_state, {}).get(sequence[t], SMOOTH)
            max_prob, prev_st = max(
                (
                    V[t - 1][prev_state]
                    + math.log(max(trans_probs.get(prev_state, {}).get(curr_state, SMOOTH), SMOOTH))
                    + math.log(max(emit_p, SMOOTH)),
                    prev_state,
                )
                for prev_state in states
            )
            V[t][curr_state] = max_prob
            new_path[curr_state] = path[prev_st] + [curr_state]

        path = new_path

    final_state = max(V[-1], key=V[-1].get)
    return path[final_state], V[-1][final_state]

In [9]:
corpus_folder = "PHP2IL/hmm/dataset_ann_ss_func_no_bias__/vuln"

train_files, test_files = split_annotated_files(corpus_folder)

# 1. Carregar dados anotados (ficheiros .ann com estado-token)
obs_train, state_train = load_training_data(corpus_folder, train_files)



test_obs, test_state, mappings = process_test_files(corpus_folder, test_files)



# 3. Extrair o vocabulário de observações e estados
observations = set(tok for seq in obs_train for tok in seq)
states = set(tag for seq in state_train for tag in seq)

# 4. Treinar modelo HMM supervisionado (a partir de dados anotados)
start_probs, trans_probs, emit_probs = train(states, observations, obs_train, state_train)

# 5. Avaliar o modelo nos dados de teste (tagging)
true_tags = []
pred_tags = []
predictions = []

for filename, obs_seq in test_obs:
    pred_seq, _ = viterbi(obs_seq, states, start_probs, trans_probs, emit_probs)
    predictions.append((filename, obs_seq, pred_seq))

    # Encontrar a sequência real para este ficheiro no conjunto anotado
    idx = test_files.index(filename)
    gold_seq = test_state[idx]
    # gold_seq is a tuple: (filename, [tags])
    if isinstance(gold_seq, tuple):
        gold_seq = gold_seq[1]
    assert len(gold_seq) == len(pred_seq), f"Mismatch in lengths for file {filename}: gold={len(gold_seq)} vs pred={len(pred_seq)}"
    true_tags.extend(gold_seq)
    pred_tags.extend(pred_seq)

from sklearn.metrics import accuracy_score, classification_report

# 6. Métricas de avaliação
print("\n--- Avaliação do modelo HMM como POS tagger ---")
print("Accuracy total:", accuracy_score(true_tags, pred_tags))
print("\nRelatório detalhado (por estado):")
print(classification_report(true_tags, pred_tags, digits=3))


--- Avaliação do modelo HMM como POS tagger ---
Accuracy total: 0.9114799446749654

Relatório detalhado (por estado):
              precision    recall  f1-score   support

         san      1.000     1.000     1.000         6
       taint      0.880     1.000     0.936       469
         und      1.000     0.742     0.852       248

    accuracy                          0.911       723
   macro avg      0.960     0.914     0.929       723
weighted avg      0.922     0.911     0.908       723



In [10]:
def restructure_predictions_by_mapping(pred_tokens, pred_tags, mapping_lines):
    """
    Divide e inverte tokens e tags por linha com base no mapping.
    """
    structured_tokens = []
    structured_tags = []
    
    idx = 0
    for line in mapping_lines:
        line_len = len(line)
        # Garantir que não estamos a exceder o comprimento
        line_tokens = pred_tokens[idx:idx + line_len]
        line_tags = pred_tags[idx:idx + line_len]

        structured_tokens.append(list(reversed(line_tokens)))
        structured_tags.append(list(reversed(line_tags)))

        idx += line_len

    return structured_tokens, structured_tags

In [11]:
def reconstruct_with_ids_by_line(pred_filename, tokens_by_line, tags_by_line, mappings):
    mapping_entry = next((v for v in mappings.values() if v["nome"] == pred_filename), None)
    if not mapping_entry:
        raise ValueError(f"Filename {pred_filename} not found in mappings.")
    
    full_map = mapping_entry["map"]
    
    reconstructed = []
    for map_line, token_line, tag_line in zip(full_map, tokens_by_line, tags_by_line):
        if not (len(map_line) == len(token_line) == len(tag_line)):
            raise ValueError("Line lengths don't match during reconstruction.")

        line_result = []
        for token_map, token_pred, tag_pred in zip(map_line, token_line, tag_line):
            if token_map == "_":
                line_result.append((token_pred, tag_pred))
            else:
                line_result.append((token_map, tag_pred))
        reconstructed.append(line_result)
    
    return reconstructed

In [12]:
reconstructed_all = {}

for filename, pred_tokens, pred_tags in predictions:

    mapping_lines = next(v["map"] for v in mappings.values() if v["nome"] == filename)

    tokens_by_line, tags_by_line = restructure_predictions_by_mapping(pred_tokens, pred_tags, mapping_lines)

    final = reconstruct_with_ids_by_line(filename, tokens_by_line, tags_by_line, mappings)
    reconstructed_all[filename] = final

In [13]:
print(start_probs)
print(emit_probs)
print(trans_probs)

{'san': 0.004629629629629629, 'taint': 0.7083333333333334, 'und': 0.28703703703703703}
{'san': {'4096': 0.01639344262295082, 'GET': 0.01639344262295082, 'POST': 0.01639344262295082, 'SESSION': 0.01639344262295082, 'SQL_QUERY': 0.01639344262295082, 'array': 0.01639344262295082, 'echo': 0.01639344262295082, 'else': 0.01639344262295082, 'exec': 0.01639344262295082, 'false': 0.01639344262295082, 'fclose': 0.01639344262295082, 'fgets': 0.01639344262295082, 'fopen': 0.01639344262295082, 'fread': 0.01639344262295082, 'if': 0.01639344262295082, 'mysql_connect': 0.01639344262295082, 'mysql_query': 0.01639344262295082, 'mysql_select_db': 0.01639344262295082, 'pclose': 0.01639344262295082, 'popen': 0.01639344262295082, 'preg_match': 0.6065573770491803, 'sprintf': 0.01639344262295082, 'system': 0.01639344262295082, 'unserialize': 0.01639344262295082, 'var': 0.01639344262295082}, 'taint': {'4096': 0.00039478878799842083, 'GET': 0.018160284247927358, 'POST': 0.0189498618239242, 'SESSION': 0.00986971

In [14]:
reconstructed_all

{'CWE_89__array-GET__no_sanitizing__multiple_AS-concatenation_simple_quote.ann': [[('var',
    'taint'),
   ('array', 'und')],
  [('var', 'taint')],
  [('var', 'taint'), ('GET', 'taint')],
  [('var', 'taint')],
  [('var', 'taint'), ('var', 'taint')],
  [('var', 'taint'), ('SQL_QUERY', 'taint'), ('var', 'taint')],
  [('var', 'taint'), ('mysql_connect', 'und')],
  [('mysql_select_db', 'und')],
  [('echo', 'und'), ('var', 'taint')],
  [('var', 'taint'), ('mysql_query', 'taint'), ('var', 'taint')]],
 'CWE_89__popen__no_sanitizing__select_from-interpretation_simple_quote.ann': [[('var',
    'taint'),
   ('popen', 'taint')],
  [('var', 'taint'), ('fread', 'taint'), ('var', 'taint'), ('4096', 'und')],
  [('pclose', 'und'), ('var', 'taint')],
  [('var', 'taint'), ('SQL_QUERY', 'taint'), ('var', 'taint')],
  [('var', 'taint'), ('mysql_connect', 'und')],
  [('mysql_select_db', 'und')],
  [('echo', 'und'), ('var', 'taint')],
  [('var', 'taint'), ('mysql_query', 'taint'), ('var', 'taint')]],
 'CWE