In [1]:
!pip install py7zr
!pip install sentence_transformers

Collecting py7zr
  Downloading py7zr-0.22.0-py3-none-any.whl.metadata (16 kB)
Collecting texttable (from py7zr)
  Downloading texttable-1.7.0-py2.py3-none-any.whl.metadata (9.8 kB)
Collecting pycryptodomex>=3.16.0 (from py7zr)
  Downloading pycryptodomex-3.21.0-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting pyzstd>=0.15.9 (from py7zr)
  Downloading pyzstd-0.16.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.4 kB)
Collecting pyppmd<1.2.0,>=1.1.0 (from py7zr)
  Downloading pyppmd-1.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.7 kB)
Collecting pybcj<1.1.0,>=1.0.0 (from py7zr)
  Downloading pybcj-1.0.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.0 kB)
Collecting multivolumefile>=0.2.3 (from py7zr)
  Downloading multivolumefile-0.2.3-py3-none-any.whl.metadata (6.3 kB)
Collecting inflate64<1.1.0,>=1.0.0 (from py7zr)
  Downloading inflate64-1.0.0-cp310-cp310-manylinux_2_17_

In [2]:
import py7zr
import zipfile
import pandas as pd
import numpy as np
import time
import os
import codecs
from concurrent.futures import ThreadPoolExecutor
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import LabelEncoder
from keras.models import Sequential
from keras.layers import LSTM, Dense, Input, Dropout, SimpleRNN, Bidirectional
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.regularizers import l2
import tensorflow as tf

  from tqdm.autonotebook import tqdm, trange


# Reading & Processing Data

In [None]:
# Read in the train and test labels
train_labels = pd.read_csv('/content/train_labels.csv')[["Id", "Class"]]
test_labels = pd.read_csv('/content/test_labels.csv')[["Id", "Class"]]

# Encode the class labels as integers.
# The encoder is fitted on the training classes only and the SAME mapping is
# reused for the test set. Refitting on the test labels would renumber the
# classes whenever the test set does not contain exactly the same classes,
# silently misaligning test targets with the training targets and num_class.
# transform() raises ValueError if the test set holds a class unseen in training.
label_encoder = LabelEncoder()
train_labels['Family'] = label_encoder.fit_transform(train_labels['Class'])
test_labels['Family'] = label_encoder.transform(test_labels['Class'])
num_class = train_labels['Family'].nunique()

# Create dictionary where key-value is id-label for optimisation
train_dict = dict(zip(train_labels['Id'], train_labels['Family']))
test_dict = dict(zip(test_labels['Id'], test_labels['Family']))

print(len(train_dict), len(test_dict))
train_labels.head()

In [None]:
# Both archives are unpacked under extract_root, so the data folders are
# derived from it. The previous root-level '/bytes_50_files/' and
# '/asm_50_files/' paths are never created by this cell, which makes
# os.listdir fail on a fresh runtime.
# NOTE(review): assumes each archive contains a top-level folder named after
# the archive itself (bytes_50_files/, asm_50_files/) - confirm.
extract_root = '/content/'
bytes_folder = extract_root + 'bytes_50_files/'
asm_folder = extract_root + 'asm_50_files/'
bytes_zip = '/content/bytes_50_files.zip'
asm_zip = '/content/asm_50_files.zip'

# Extract bytes
with zipfile.ZipFile(bytes_zip, 'r') as zip_bytes:
  zip_bytes.extractall(extract_root)

# Extract asm
with zipfile.ZipFile(asm_zip, 'r') as zip_asm:
  zip_asm.extractall(extract_root)

# File names (not full paths) found in each extracted folder
bytes_files_list = os.listdir(bytes_folder)
asm_files_list = os.listdir(asm_folder)

In [None]:
# Opcode mnemonics of interest; order is preserved in opcodes_dict.
opcodes = [
    'jmp', 'mov', 'retf', 'push', 'pop', 'xor', 'retn', 'nop', 'sub',
    'inc', 'dec', 'add', 'imul', 'xchg', 'or', 'shr', 'cmp', 'call',
    'shl', 'ror', 'rol', 'jnb', 'jz', 'rtn', 'lea', 'movzx',
]
# Lookup table mapping every opcode to the flag value 1.
opcodes_dict = dict.fromkeys(opcodes, 1)

In [None]:
def process_text(path):
  """Turn one .bytes or .asm file into a single text string plus its label.

  Args:
    path: File name (not a full path). Names ending in '.bytes' are read from
      bytes_folder; names ending in 'asm' are read from asm_folder.

  Returns:
    A two-element list [text, label]. `text` is the space-joined tokens of the
    file: for .bytes files, every space-separated token of each line except
    the first (the address); for asm files, only the tokens (after dropping
    the first one) whose entry in opcodes_dict equals 1. `label` is looked up
    by the part of the file name before the first '.', in train_dict first and
    otherwise in test_dict.

  Raises:
    KeyError: If the file id is present in neither train_dict nor test_dict.
  """
  tokens = []

  if path.endswith('.bytes'):
    with open(bytes_folder + path, 'r') as bytes_file:
      for raw_line in bytes_file:
        # Drop the leading address token, keep everything else
        tokens += raw_line.rstrip().split(" ")[1:]
  elif path.endswith('asm'):
    with codecs.open(asm_folder + path, encoding='cp1252', errors='replace') as asm_file:
      for raw_line in asm_file:
        fields = raw_line.rstrip().split(" ")[1:]  # drop the leading address token
        tokens += [field for field in fields if opcodes_dict.get(field) == 1]

  # The id is the file name up to the first '.'
  path_id = path.split('.')[0]
  if path_id in train_dict:
    label = train_dict[path_id]
  else:
    label = test_dict[path_id]

  return [' '.join(tokens), label]

In [None]:
# Read and process the train and test files in parallel
%%time
with ThreadPoolExecutor() as executor:
  bytes_texts_train = np.array(list((executor.map(process_text, bytes_50_train))))
  bytes_texts_test = np.array(list((executor.map(process_text, bytes_50_test))))
  asm_texts_train = np.array(list((executor.map(process_text, asm_50_train))))
  asm_texts_test = np.array(list((executor.map(process_text, asm_50_test))))

In [None]:
# Embed all the texts
%%time
embedder = SentenceTransformer('all-MiniLM-L6-v2')

bytes_embeddings_train = embedder.encode(bytes_texts_train[:, 0])
bytes_label_train = to_categorical(bytes_texts_train[:, 1], num_classes=num_class)
bytes_embeddings_test = embedder.encode(bytes_texts_test[:, 0])
bytes_label_test = to_categorical(bytes_texts_test[:, 1], num_classes=num_class)

asm_embeddings_train = embedder.encode(asm_texts_train[:, 0])
asm_label_train = to_categorical(asm_texts_train[:, 1], num_classes=num_class)
asm_embeddings_test = embedder.encode(asm_texts_test[:, 0])
asm_label_test = to_categorical(asm_texts_test[:, 1], num_classes=num_class)

embedding_size = asm_embeddings_train.shape[1]
asm_embeddings_train.shape, asm_label_train.shape

In [None]:
# L2-normalise every embedding matrix along axis 1 (one unit vector per row).
# The right-hand side reads all four inputs before any name is rebound.
(
    asm_embeddings_train,
    asm_embeddings_test,
    bytes_embeddings_train,
    bytes_embeddings_test,
) = (
    tf.math.l2_normalize(embeddings, axis=1)
    for embeddings in (
        asm_embeddings_train,
        asm_embeddings_test,
        bytes_embeddings_train,
        bytes_embeddings_test,
    )
)

print("Normalized ASM embeddings shape:", asm_embeddings_train.shape)
print("Normalized bytes embeddings shape:", bytes_embeddings_train.shape)

# Save embeddings to CSV

In [None]:
# Bytes train embeddings: one row per file, with its label as the last column
bytes_embedding_train_df = pd.DataFrame(bytes_embeddings_train).assign(
    Label=bytes_texts_train[:, 1]
)
bytes_embedding_train_df.to_csv('bytes_embedding_train.csv', index=False)

In [None]:
# Bytes test embeddings: one row per file, with its label as the last column
bytes_embedding_test_df = pd.DataFrame(bytes_embeddings_test).assign(
    Label=bytes_texts_test[:, 1]
)
bytes_embedding_test_df.to_csv('bytes_embedding_test.csv', index=False)

In [None]:
# ASM train embeddings: one row per file, with its label as the last column
asm_embedding_train_df = pd.DataFrame(asm_embeddings_train).assign(
    Label=asm_texts_train[:, 1]
)
asm_embedding_train_df.to_csv('asm_embedding_train.csv', index=False)

In [None]:
# ASM test embeddings: one row per file, with its label as the last column
asm_embedding_test_df = pd.DataFrame(asm_embeddings_test).assign(
    Label=asm_texts_test[:, 1]
)
asm_embedding_test_df.to_csv('asm_embedding_test.csv', index=False)