In [None]:
# Updating and installing strace to the Linux Terminal
!sudo apt update && sudo apt install strace

In [None]:
import subprocess # Library linking work in python with linux terminal
import re
import csv
import pickle
import pandas as pd
import numpy as np
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
import os
from collections import Counter

In [None]:
# Module-level state shared across cells; train_lof_detector() assigns all three.
lof_model = None  # LocalOutlierFactor instance once training has run, None before
feature_scaler = None  # StandardScaler fitted on the training features, None before training
is_trained = False  # set to True at the end of train_lof_detector()

In [None]:
# Getting the dataset of benign binaries
!git clone https://github.com/packing-box/dataset-packed-elf

In [None]:
# Note: the 'test_binaries' folder already holds a set of malicious binaries.
# Separate the benign binaries into a training set and a test set: ten of them are
# moved out of 'not-packed' (training set) into 'test_binaries' (test set).
# Done in Python rather than `!mv .../{a, b} ...`: shell brace expansion breaks on
# spaces after the commas and is not available in plain `sh`.
import shutil

for held_out_name in ('cat', 'cp', 'cut', 'date', 'dir', 'echo', 'head', 'less', 'ls', 'mv'):
    source_path = os.path.join('dataset-packed-elf', 'not-packed', held_out_name)
    target_path = os.path.join('test_binaries', held_out_name)
    if os.path.exists(source_path):
        os.makedirs('test_binaries', exist_ok=True)
        shutil.move(source_path, target_path)
    elif not os.path.exists(target_path):
        # Neither in the training folder nor already moved by an earlier run.
        print(f"Warning: {source_path} not found; nothing moved")

In [None]:
def trace_binary(binary_path, timeout=5):
    """Run a binary under strace and return the names of the system calls it made.

    Args:
        binary_path: Path of the executable to trace.
        timeout: Seconds the traced run may last before strace is killed.

    Returns:
        list[str]: System call names as produced by parse_strace_output(). Empty
        when the binary is missing or not executable, when strace cannot be
        started, or when tracing fails. On timeout, the output captured so far
        is parsed instead of being discarded.
    """
    if not os.path.exists(binary_path):
        print(f"Error: Binary not found: {binary_path}")
        return []

    if not os.access(binary_path, os.X_OK):
        print(f"Error: Binary not executable: {binary_path}")
        return []

    try:
        # -yy: annotate file descriptors with path/socket details
        # -f:  follow child processes
        cmd = ['strace', '-yy', '-f', '-e', 'trace=all', binary_path]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # The traced program writes to the same captured pipes as strace. Without a
            # lenient error handler a single non-UTF-8 byte raises UnicodeDecodeError,
            # which the blanket handler below would turn into an empty trace.
            errors='replace',
            timeout=timeout,
            stdin=subprocess.DEVNULL  # Prevent waiting for input (Timeout)
        )
        return parse_strace_output(result.stderr)
    except subprocess.TimeoutExpired as e:
        print(f"Timeout tracing {binary_path}")
        # NOTE(review): subprocess.run only kills the strace process itself on timeout;
        # confirm that traced children cannot keep running after it.
        partial_trace = e.stderr
        if not partial_trace:
            return []
        # Partial output may arrive as raw bytes even though text=True was requested.
        if isinstance(partial_trace, bytes):
            partial_trace = partial_trace.decode('utf-8', errors='replace')
        return parse_strace_output(partial_trace)
    except FileNotFoundError:
        print(f"Error: strace not installed or binary not found")
        return []
    except Exception as e:
        print(f"Error tracing {binary_path}: {e}")
        return []

In [None]:
def parse_strace_output(strace_output):
    """Extract system call names, in order of appearance, from raw strace text.

    A line counts as a system call when it starts with ``name(``, optionally
    preceded by a process prefix: ``[pid 1234] `` (what ``strace -f`` prints on
    stderr) or a bare ``1234 ``. Everything else is ignored, which covers exit
    notices (``+++ ... +++``), signal notices (``--- ... ---``) and
    ``<... name resumed>`` continuation lines.

    A call that strace reports in two parts is counted once, from its
    ``name(... <unfinished ...>`` line.

    Args:
        strace_output: Text produced by strace.

    Returns:
        list[str]: The system call names found, one entry per call.
    """
    # Optional "[pid N] " or "N " prefix, then the call name directly followed by "(".
    call_pattern = re.compile(r'^(?:\[pid\s+\d+\]\s+|\d+\s+)?(\w+)\(')

    system_calls = []
    for line in strace_output.split('\n'):
        match = call_pattern.match(line.strip())
        if match is None:
            continue

        system_call = match.group(1)
        # \w+ also matches pure digit runs, which are never call names.
        if not system_call.isdigit():
            system_calls.append(system_call)

    return system_calls

In [None]:
def extract_10_features(system_calls):
    """Summarise a sequence of system call names as a vector of 10 numbers.

    The vector holds, in this order: total call count, unique-call ratio,
    file / process / network / memory / security operation ratios, the share of
    the most frequent call, the base-2 entropy of the call distribution, and
    the rate at which consecutive calls differ.

    Args:
        system_calls: Ordered list of system call names.

    Returns:
        numpy.ndarray: The 10 features; all zeros when `system_calls` is empty.
    """
    if not system_calls:
        return np.zeros(10)

    counts = Counter(system_calls)
    n_calls = len(system_calls)

    def share_of(names):
        # Fraction of all calls whose name appears in `names`.
        return sum(counts.get(name, 0) for name in names) / n_calls

    file_op_ratio = share_of(
        ['open', 'openat', 'read', 'write', 'close', 'stat', 'fstat', 'lseek'])
    process_op_ratio = share_of(['fork', 'clone', 'execve', 'wait4', 'waitpid'])
    network_op_ratio = share_of(
        ['socket', 'connect', 'bind', 'accept', 'sendto', 'recvfrom'])
    memory_op_ratio = share_of(['mmap', 'mprotect', 'brk', 'munmap'])
    security_op_ratio = share_of(
        ['ptrace', 'chmod', 'chown', 'setuid', 'setgid', 'capset'])

    # Entropy of the call-name distribution; every count is >= 1, so p > 0.
    entropy = 0
    for occurrences in counts.values():
        p = occurrences / n_calls
        entropy -= p * np.log2(p)

    # Number of positions where a call differs from the one right before it.
    transitions = sum(
        1 for previous, current in zip(system_calls, system_calls[1:])
        if current != previous
    )

    return np.array([
        n_calls,                             # Feature 1: total_call_count
        len(counts) / n_calls,               # Feature 2: unique_call_ratio
        file_op_ratio,                       # Feature 3
        process_op_ratio,                    # Feature 4
        network_op_ratio,                    # Feature 5
        memory_op_ratio,                     # Feature 6
        security_op_ratio,                   # Feature 7
        max(counts.values()) / n_calls,      # Feature 8: most_frequent_ratio
        entropy,                             # Feature 9
        transitions / n_calls,               # Feature 10: transition_rate
    ])

In [None]:
def create_binary_dataset(binaries_dir, output_csv="binary_dataset.csv", timeout=5):
    """Trace every executable file in a directory and write its features to a CSV.

    Each row holds the binary's path, the 10 values from extract_10_features()
    and a `trace_successful` flag that is True when at least one system call was
    captured. A binary whose processing raises gets a row of zeros flagged False.

    Args:
        binaries_dir: Directory whose executable regular files are traced.
        output_csv: Path of the CSV file to (over)write.
        timeout: Per-binary tracing timeout in seconds, passed to trace_binary().

    Returns:
        None. Prints an error and writes nothing when `binaries_dir` is missing.
    """
    if not os.path.exists(binaries_dir):
        print(f"Error: Directory {binaries_dir} does not exist")
        return

    entry_paths = (os.path.join(binaries_dir, entry) for entry in os.listdir(binaries_dir))
    binaries = [
        path for path in entry_paths
        if os.path.isfile(path) and os.access(path, os.X_OK)
    ]

    print(f"Found {len(binaries)} executables in {binaries_dir}")

    feature_names = [
        'total_call_count', 'unique_call_ratio', 'file_op_ratio',
        'process_op_ratio', 'network_op_ratio', 'memory_op_ratio',
        'security_op_ratio', 'most_frequent_ratio', 'entropy', 'transition_rate',
    ]
    headers = ['binary_path', *feature_names, 'trace_successful']

    with open(output_csv, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(headers)

        for binary_path in binaries:
            print(f"Processing {os.path.basename(binary_path)}...")
            try:
                system_calls = trace_binary(binary_path, timeout)
                features = extract_10_features(system_calls)
                traced_ok = len(system_calls) > 0
                writer.writerow([binary_path, *features.tolist(), traced_ok])
            except Exception as e:
                # Keep one row per binary even when tracing or feature extraction fails.
                print(f"Failed {binary_path}: {e}")
                writer.writerow([binary_path] + [0]*10 + [False])

    print(f"Dataset saved to {output_csv}")

# Build binary_dataset.csv (the default output path) from the binaries left in
# 'not-packed', i.e. the training set.
create_binary_dataset("dataset-packed-elf/not-packed/")

In [None]:
def train_lof_detector(csv_file, n_neighbors=15, contamination=0.1, model_file="lof_model.pkl"):
    """Fit a Local Outlier Factor novelty detector on a feature CSV and pickle it.

    Only rows whose `trace_successful` column is True are used. Features are
    standardised with a StandardScaler before fitting. The fitted model, the
    scaler and the feature column names are pickled together into `model_file`,
    and the module-level `lof_model`, `feature_scaler` and `is_trained` are updated.

    Args:
        csv_file: CSV with `binary_path`, feature columns and `trace_successful`.
        n_neighbors: Requested neighbour count; capped at (training samples - 1).
        contamination: Passed to LocalOutlierFactor as `contamination`.
        model_file: Destination path of the pickled model bundle.

    Returns:
        bool: True once training and saving have completed.

    Raises:
        ValueError: If fewer than 5 successfully traced samples are available.
    """
    global lof_model, feature_scaler, is_trained

    print(f"Training LOF detector from {csv_file}...")

    dataset = pd.read_csv(csv_file)

    # Rows from failed traces carry no usable signal and are left out.
    usable_rows = dataset[dataset['trace_successful'] == True]
    sample_count = len(usable_rows)
    if sample_count < 5:
        raise ValueError(f"Need at least 5 training samples, got {sample_count}")

    # Everything except the identifier and the status flag is a feature.
    non_feature_columns = ('binary_path', 'trace_successful')
    feature_columns = [
        name for name in usable_rows.columns if name not in non_feature_columns
    ]
    X_train = usable_rows[feature_columns].values

    feature_scaler = StandardScaler()
    X_train_scaled = feature_scaler.fit_transform(X_train)

    # The neighbour count cannot exceed the number of other samples.
    neighbor_count = min(n_neighbors, len(X_train_scaled) - 1)
    lof_model = LocalOutlierFactor(
        n_neighbors=neighbor_count,
        contamination=contamination,
        novelty=True
    )
    lof_model.fit(X_train_scaled)

    # Persist model, scaler and column order together in one bundle.
    with open(model_file, 'wb') as f:
        pickle.dump(
            {
                'lof_model': lof_model,
                'feature_scaler': feature_scaler,
                'feature_columns': feature_columns
            },
            f
        )

    is_trained = True
    print(f"Training completed with {sample_count} samples")
    print(f"Feature matrix shape: {X_train.shape}")
    print(f"Model saved to {model_file}")

    return True

# Fit the detector on binary_dataset.csv; the model bundle is written to the
# default model_file, lof_model.pkl.
train_lof_detector("binary_dataset.csv")

In [None]:
def detect_anomaly(model_path, test_binaries_folder):
    """Score every executable file in a folder with a saved LOF model bundle.

    Args:
        model_path: Pickle file holding a dict with 'lof_model' and 'feature_scaler'.
        test_binaries_folder: Directory whose executable regular files are traced.

    Returns:
        list[tuple]: One `(binary_path, is_anomalous, anomaly_score)` per executable.
        Binaries with fewer than 3 captured system calls, and binaries whose
        processing raises, are reported as anomalous with a score of -1.0.
    """
    # pickle.load can execute code embedded in the file: only load trusted model files.
    with open(model_path, 'rb') as model_file:
        bundle = pickle.load(model_file)

    detector = bundle['lof_model']
    scaler = bundle['feature_scaler']

    results = []

    for entry in os.listdir(test_binaries_folder):
        binary_path = os.path.join(test_binaries_folder, entry)
        if not (os.path.isfile(binary_path) and os.access(binary_path, os.X_OK)):
            continue

        try:
            system_calls = trace_binary(binary_path)
            if not system_calls or len(system_calls) < 3:
                # Too few calls to characterise the binary: treat as anomalous.
                results.append((binary_path, True, -1.0))
                continue

            scaled_vector = scaler.transform([extract_10_features(system_calls)])

            prediction = detector.predict(scaled_vector)[0]
            anomaly_score = detector.decision_function(scaled_vector)[0]

            # predict() labels outliers as -1.
            results.append((binary_path, prediction == -1, anomaly_score))

        except Exception as e:
            print(f"Error processing {binary_path}: {e}")
            results.append((binary_path, True, -1.0))

    return results

In [None]:
def print_detection_report(test_binaries_folder):
    """Print per-binary verdicts followed by aggregate detection metrics.

    Runs detect_anomaly() with "./lof_model.pkl" on `test_binaries_folder` and
    prints a table (name, verdict, score, whether the verdict matches the ground
    truth), then accuracy, F1, precision, recall and the confusion counts.

    Ground truth comes from the file name: a name containing '.elf' is treated as
    malware, anything else as legitimate. The label is taken from the full name;
    the shortened name is used for display only.

    Args:
        test_binaries_folder: Directory of binaries to evaluate.

    Returns:
        None. When no binaries were evaluated, only the empty table and a notice
        are printed.
    """
    from sklearn.metrics import accuracy_score, f1_score

    results = detect_anomaly("./lof_model.pkl", test_binaries_folder)

    # Ground truth and predictions, 1 = malware / anomalous, 0 = legit.
    y_true = []
    y_pred = []

    print("DETECTION RESULTS")
    print("=" * 50)
    print(f"{'Binary':<20} {'Verdict':<8} {'Score':<12} {'Status':<6}")
    print("-" * 50)

    for binary_path, is_anomalous, score in results:
        full_name = os.path.basename(binary_path)

        # Decide the label before shortening: truncation can cut a '.elf' marker
        # out of a long name and would then flip the ground truth.
        is_malware_truth = '.elf' in full_name

        # Shorten long names so the table columns stay aligned.
        binary_name = full_name
        if len(full_name) > 18:
            if is_malware_truth:
                # For .elf files, show first 8 chars + ... + last 6 chars
                binary_name = full_name[:8] + '...' + full_name[-6:]
            else:
                binary_name = full_name[:15] + '...'

        y_true.append(1 if is_malware_truth else 0)
        y_pred.append(1 if is_anomalous else 0)

        verdict = "MALWARE" if is_anomalous else "LEGIT"
        status = "TRUE" if is_anomalous == is_malware_truth else "FALSE"

        print(f"{binary_name:<20} {verdict:<8} {score:<12.3f} {status:<6}")

    print("-" * 50)

    if not results:
        # Nothing was evaluated, so there is nothing to compute metrics from.
        print(f"No executable binaries found in {test_binaries_folder}")
        return

    # Calculate metrics
    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)

    # Confusion counts, with malware (1) as the positive class.
    tp = sum(1 for true, pred in zip(y_true, y_pred) if true == 1 and pred == 1)
    fp = sum(1 for true, pred in zip(y_true, y_pred) if true == 0 and pred == 1)
    tn = sum(1 for true, pred in zip(y_true, y_pred) if true == 0 and pred == 0)
    fn = sum(1 for true, pred in zip(y_true, y_pred) if true == 1 and pred == 0)

    print(f"\nPERFORMANCE METRICS")
    print("=" * 30)
    print(f"Accuracy:  {accuracy:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    # Precision and recall fall back to 0 when their denominator is empty.
    print(f"Precision: {tp/(tp+fp) if (tp+fp) > 0 else 0:.4f}")
    print(f"Recall:    {tp/(tp+fn) if (tp+fn) > 0 else 0:.4f}")
    print(f"\nConfusion Matrix:")
    print(f"TP: {tp}  FP: {fp}")
    print(f"FN: {fn}  TN: {tn}")
# TP: True Positive; FP: False Positive; FN: False Negative; TN: True Negative