In [2]:
import pandas as pd
import numpy as np
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, accuracy_score
import os
import glob

Read and process opcode files

In [3]:
def read_opcode_files(base_path):
    data = []
    labels = []
    
    # Walk through all directories
    for apt_folder in os.listdir(base_path):
        apt_path = os.path.join(base_path, apt_folder)
        if os.path.isdir(apt_path):
            # Process each file in the APT folder
            for file_path in glob.glob(os.path.join(apt_path, '*')):
                with open(file_path, 'r') as f:
                    opcodes = f.read().strip()
                    data.append(opcodes)
                    labels.append(apt_folder)
    
    # Create the DataFrame
    df = pd.DataFrame({'Opcodes': data, 'APT': labels})
    
    # Print the first few rows of the DataFrame
    print("Preview of the DataFrame:")
    print(df.head())
    
    # Print some basic information about the DataFrame
    print("\nDataFrame Info:")
    print(df.info())
    
    # Save to CSV
    output_path = 'opcodes_dataset.csv'
    df.to_csv(output_path, index=False)
    print(f"\nDataset saved to: {output_path}")
    
    return df


def extract_ngram_features(df, n=1):
    if n == 1:
        unique_ngrams = set()
        for opcodes in df['Opcodes']:
            unique_ngrams.update(op.strip() for op in opcodes.split(', '))
        
        unique_ngrams = sorted(unique_ngrams)
        ngram_counts_df = pd.DataFrame(0, index=range(len(df)), columns=unique_ngrams)
        
        for i, row in df.iterrows():
            opcode_list = [op.strip() for op in row['Opcodes'].split(', ')]
            counts = Counter(opcode_list)
            
            for opcode, count in counts.items():
                ngram_counts_df.at[i, opcode] = count
    
    elif n == 2: 
        def generate_ngrams(sequence, n):
            return ['{'+', '.join(sequence[i:i + n])+'}' for i in range(len(sequence) - n + 1)]
        
        unique_ngrams = set()
        for opcodes in df['Opcodes']:
            opcode_list = [op.strip() for op in opcodes.split(', ')]
            ngrams = generate_ngrams(opcode_list, n)
            unique_ngrams.update(ngrams)
        
        unique_ngrams = sorted(unique_ngrams)
        ngram_counts_df = pd.DataFrame(0, index=range(len(df)), columns=unique_ngrams)
        
        for i, row in df.iterrows():
            opcode_list = [op.strip() for op in row['Opcodes'].split(', ')]
            ngrams = generate_ngrams(opcode_list, n)
            counts = Counter(ngrams)
            
            for ngram, count in counts.items():
                ngram_counts_df.at[i, ngram] = count
    
    else: 
        print("Invalid n value. Please use 1 or 2.")
        return None
    
    return ngram_counts_df.values

Read the data
Extract features
Combine features
Encode labels

In [11]:
base_path = '/Users/jeffreyjeyachandren/Desktop/opscode_ml/opcodes'  # Adjust this to your opscode folder path
df = read_opcode_files(base_path)

# Extract features
X_1gram = extract_ngram_features(df, n=1)
X_2gram = extract_ngram_features(df, n=2)

# Combine features
X = np.concatenate([X_1gram, X_2gram], axis=1)

# Encode labels
le = LabelEncoder()
y = le.fit_transform(df['APT'])

Preview of the DataFrame:
                                             Opcodes            APT
0  JMP\nJMP\nJMP\nJMP\nJMP\nJMP\nJMP\nJMP\nPOP\nP...        Evilnum
1  PUSH\nMOV\nMOV\nCALL\nPUSH\nPUSH\nPUSH\nPUSH\n...  APT19_opcodes
2  MOV\nPUSH\nMOV\nCALL\nTEST\nPUSH\nCALL\nADD\nM...  APT19_opcodes
3  PUSH\nPUSH\nPUSH\nMOV\nCMP\nJE\nCMP\nJNE\nPUSH...  APT19_opcodes
4  PUSH\nPUSH\nPUSH\nMOV\nCMP\nJE\nCMP\nJNE\nPUSH...  APT19_opcodes

DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 215 entries, 0 to 214
Data columns (total 2 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   Opcodes  215 non-null    object
 1   APT      215 non-null    object
dtypes: object(2)
memory usage: 3.5+ KB
None

Dataset saved to: opcodes_dataset.csv


Train, test, split

In [8]:
# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize classifiers
classifiers = {
    'SVM': SVC(kernel='rbf'),
    'KNN-3': KNeighborsClassifier(n_neighbors=3),
    'KNN-5': KNeighborsClassifier(n_neighbors=5),
    'Decision Tree': DecisionTreeClassifier()
}

# Train and evaluate classifiers
results = {}
for name, clf in classifiers.items():
    # Train
    clf.fit(X_train, y_train)
    
    # Predict
    y_pred = clf.predict(X_test)
    
    # Calculate accuracy
    accuracy = accuracy_score(y_test, y_pred)
    
        # Get unique classes present in the test set
    unique_classes = np.unique(y_test)
    target_names = le.inverse_transform(unique_classes)
    
    # Generate classification report
    report = classification_report(y_test, y_pred, 
                                 labels=unique_classes,
                                 target_names=target_names)
    
    results[name] = {
        'accuracy': accuracy,
        'report': report
    }

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Results

In [9]:
# Print results
for name, result in results.items():
    print(f"\n{name} Results:")
    print(f"Accuracy: {result['accuracy']:.4f}")
    print("\nClassification Report:")
    print(result['report'])


SVM Results:
Accuracy: 0.3488

Classification Report:
                 precision    recall  f1-score   support

  APT12_opcodes       0.00      0.00      0.00         1
  APT17_opcodes       0.00      0.00      0.00         1
   APT1_opcodes       0.00      0.00      0.00         4
          APT28       0.00      0.00      0.00         3
          APT29       0.20      1.00      0.33         6
  APT30_opcodes       1.00      0.33      0.50         3
BlueMockingbird       0.00      0.00      0.00         1
      Elderwood       0.00      0.00      0.00         1
           FIN7       0.00      0.00      0.00         1
        Gallium       0.00      0.00      0.00         1
      Gamaredon       0.00      0.00      0.00         1
       Ke3chang       0.00      0.00      0.00         2
       MenuPass       0.00      0.00      0.00         1
         Moafee       0.73      0.73      0.73        11
       Sandworm       0.00      0.00      0.00         1
          Turla       0.00      