# Create CSV of labels

In [None]:
import os
import csv

# Directories to scan; the directory name is used as the label.
directories = ["data/benign", "data/malicious", "data/mixed"]
output_file = "labels.csv"

# Open the output once, in write mode: append mode would add a second header
# and a second copy of every row each time this cell is re-run. UTF-8 is set
# explicitly so non-ASCII file names are written the same on every platform.
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
    writer.writerow(["filename", "label"])
    for d in directories:
        label = os.path.basename(d)
        # Samples from the "mixed" directory are labelled as malicious.
        label = "malicious" if label == "mixed" else label
        # os.walk yields nothing for a missing directory, so it is skipped.
        for root, _, files in os.walk(d):
            writer.writerows([f"{root}/{file}", label] for file in files)

# Feature extraction

In [None]:
import pandas as pd
# Load the filename/label table from labels.csv into a DataFrame.
labels = pd.read_csv('labels.csv')
# Bare expression: when run as the last line of a notebook cell this displays
# the frame; it has no other effect.
labels

## URL or IP
- Boolean flag (stored as `0`/`1`)
- Checks whether the file contains a URL or an IP address

In [None]:
import re
def has_url_or_ip(content: str) -> int:
    """Return 1 if *content* contains an HTTP(S) URL or an IPv4 address, else 0.

    A URL must start with an explicit ``http://`` or ``https://`` scheme, so
    identifiers that merely contain the letters "http" (for example
    ``xmlhttprequest``) do not count. An IPv4 address must consist of exactly
    four dot-separated octets in the range 0-255 and must not be part of a
    longer dotted number.

    Args:
        content: Text of the file to inspect.

    Returns:
        ``1`` when a URL or an IP address is found, ``0`` otherwise. An int is
        returned (not a bool) so the value can be stored as a numeric feature.
    """
    # One octet, 0-255, leading zeros allowed.
    octet = r"(?:25[0-5]|2[0-4]\d|[01]?\d?\d)"
    # The lookarounds reject matches embedded in a longer run of digits/dots,
    # e.g. a five-part version number or an out-of-range octet such as 999.
    ip_addr_regex = rf"(?<![\d.])(?:{octet}\.){{3}}{octet}(?!\.?\d)"
    # Scheme followed by at least one character that is not whitespace/quote.
    url_regex = r"https?://[^\s'\"]+"
    has_ip = re.search(ip_addr_regex, content) is not None
    has_url = re.search(url_regex, content, re.IGNORECASE) is not None
    return int(has_ip or has_url)

In [None]:
def get_character_length_info(content: str):
    """
    Summarise the quoted string literals found in ``content``.

    Returns a ``(count, longest, mean_length)`` tuple, or ``(0, 0, 0)`` when
    ``content`` contains no quoted string.
    """
    import re

    # Group 1 captures the opening quote so that the same character must close
    # the literal; group 2 is the text between the quotes.
    literal_pattern = r"(['\"])(.*?)\1"
    lengths = [len(body) for _quote, body in re.findall(literal_pattern, content)]
    if not lengths:
        return 0, 0, 0

    how_many = len(lengths)
    return (how_many, max(lengths), sum(lengths) / how_many)

In [None]:
def get_top_character_occurences(content: str, top: int=5):
    """Return up to ``top`` most frequent characters of ``content``, spaces excluded.

    Characters are ordered from most to least frequent. Fewer than ``top``
    items are returned when ``content`` has fewer distinct non-space characters.
    """
    frequencies = collections.Counter(content.replace(' ', ''))
    return [char for char, _count in frequencies.most_common(top)]

In [None]:
# TODO: Calculate entropy

In [None]:
import os
import collections

def get_manual_features(row):
    """Extend one labels row with hand-crafted features of the file it names.

    The file at ``row["filename"]`` is read as UTF-8, lower-cased and has every
    run of whitespace collapsed to a single space. The following entries are
    then added to the row:

    * ``str_count``, ``str_max``, ``str_avg`` - from ``get_character_length_info``
    * ``has_url_or_ip`` - from ``has_url_or_ip``
    * ``top0`` .. ``top4`` - from ``get_top_character_occurences``

    Args:
        row: A row of the labels frame; must contain a ``"filename"`` entry.

    Returns:
        The row with the feature entries appended.
    """
    top_n = 5
    with open(row["filename"], encoding="utf-8") as f:
        content = f.read().lower()
    content = re.sub(r"[\s]+", " ", content)

    row["str_count"], row["str_max"], row["str_avg"] = get_character_length_info(content)
    row["has_url_or_ip"] = has_url_or_ip(content)

    top_chars = list(get_top_character_occurences(content, top_n))
    # Files with fewer than top_n distinct non-space characters (e.g. empty
    # files) return a shorter list; pad with "" so the values always line up
    # with the top0..top4 index instead of raising a length-mismatch error.
    top_chars += [""] * (top_n - len(top_chars))
    return pd.concat([row, pd.Series(top_chars, index=[f"top{i}" for i in range(top_n)])])

labels = labels.apply(get_manual_features, axis=1)

# Split train and test data

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 25% of the rows as the test set.
# NOTE(review): no random_state is passed, so the split is expected to differ
# between runs; consider fixing it if results need to be reproducible.
train, test = train_test_split(labels, test_size=0.25)

# FastText Preprocessing

Training the fasttext model

In [None]:
import re
        
def build_fasttext_vocab(filename, corpus_filename):
    """Normalise one source file and add it to a FastText corpus file.

    The file is read as UTF-8 and lower-cased; digits 1-9 become ``*``; every
    character other than a letter, ``*``, space or ``$`` becomes a space; runs
    of whitespace collapse to one space; the result is stripped.

    The normalised text is written to ``processed/<filename>`` (parent
    directories are created as needed) and appended as one line to
    ``<corpus_filename>.txt``.

    Args:
        filename: Path of the source file to process.
        corpus_filename: Base name (without ``.txt``) of the corpus file to
            append to.
    """
    import os

    with open(filename, encoding="utf-8") as f:
        content = f.read().lower()
    # NOTE(review): "0" is not covered by [1-9], so it is turned into a space
    # by the next substitution rather than into "*" - confirm this is intended.
    content = re.sub("[1-9]", "*", content)
    content = re.sub("[^a-zA-Z* $]", " ", content)
    content = re.sub(r"[\s]+", " ", content)
    content = content.strip()

    processed_path = f'processed/{filename}'
    # filename usually contains directories (e.g. data/benign/x); open() does
    # not create them, so make sure the mirrored tree exists first.
    os.makedirs(os.path.dirname(processed_path), exist_ok=True)
    with open(processed_path, "w") as f:
        f.write(content)

    with open(f'{corpus_filename}.txt', "a") as f:
        f.write(content + "\n")

# build_fasttext_vocab only ever appends to the corpus files, so start each
# one empty: otherwise re-running this cell duplicates every line and can mix
# documents from an earlier train/test split into the corpora.
for corpus_name in ("train", "test"):
    with open(f"{corpus_name}.txt", "w"):
        pass

train["filename"].apply(lambda x: build_fasttext_vocab(x, "train"))
test["filename"].apply(lambda x: build_fasttext_vocab(x, "test"))

In [None]:
from gensim.models import FastText

# Each line of train.txt is one document; split it into whitespace-separated
# tokens. The with-block closes the file once the list has been built.
with open("train.txt") as corpus:
    words = [line.split() for line in corpus]

model_name = "model.bin"
if os.path.exists(model_name):
    # Reuse the model saved by an earlier run instead of retraining.
    # NOTE(review): a cached model is not refreshed when train.txt changes;
    # delete model.bin to force retraining.
    model = FastText.load(model_name)
else:
    # Build the vocabulary only when a model is actually going to be trained.
    model = FastText()
    model.build_vocab(corpus_iterable=words)
    model.train(corpus_iterable=words, total_examples=len(words), epochs=10)  # train
    model.save(model_name)

In [None]:
import numpy as np

counter = 0
def build_doc_embedding(row):
    """Append the averaged FastText word vector of a row's processed file.

    Reads ``processed/<row["filename"]>``, splits it on whitespace and averages
    ``model.wv[word]`` over all tokens. A file without any token yields an
    all-zero vector.

    As a progress indicator, the module-level ``counter`` is printed and then
    incremented on every call.

    Args:
        row: A row containing a ``"filename"`` entry.

    Returns:
        The row extended by one entry per embedding dimension, labelled
        ``"0"``, ``"1"``, ...
    """
    global counter

    filename = row["filename"]
    with open(f"processed/{filename}") as f:
        words = f.read().split()

    # Take the dimensionality from the model rather than assuming 100, so a
    # model with a different vector size still works.
    num_features = model.wv.vector_size
    feature_vec = np.zeros((num_features,), dtype="float32")
    for word in words:
        feature_vec = np.add(feature_vec, model.wv[word])
    if words:
        # Skip the division for empty documents: dividing by zero would fill
        # the vector with NaN and emit a RuntimeWarning.
        feature_vec = np.divide(feature_vec, len(words))

    print(counter, end='\r')
    counter += 1
    return pd.concat([row, pd.Series(feature_vec, index=list(map(str, range(num_features))))])

train = train.apply(build_doc_embedding, axis=1)
test = test.apply(build_doc_embedding, axis=1)

In [None]:
# Separate the model inputs (every column except the file name and the label)
# from the target column, for the training and the test frame alike.
X_train, y_train = train.drop(columns=["filename", "label"]), train["label"]
X_test, y_test = test.drop(columns=["filename", "label"]), test["label"]

In [None]:
from sklearn.preprocessing import OneHotEncoder, LabelEncoder

# Learn the label -> integer mapping from the training labels only, then apply
# that same mapping to both the training and the test labels.
le = LabelEncoder()
le.fit(y_train)
y_train, y_test = le.transform(y_train), le.transform(y_test)
# Bare expression: shows the learned classes when run as a notebook cell.
le.classes_

In [None]:
# One-hot encode the five "top character" columns. The encoder is fitted on
# the training data only and then applied to the test data.
top_columns = [f"top{i}" for i in range(5)]
enc = OneHotEncoder(handle_unknown='ignore')
transformed = enc.fit_transform(X_train[top_columns])
transformed_test = enc.transform(X_test[top_columns])
encoded_columns = enc.get_feature_names_out()

# DataFrame.join aligns rows on the index. The encoded frames must therefore
# carry the index of the frame they are joined to; with a default 0..n-1 index
# the one-hot rows would be attached to the wrong samples (or dropped and
# replaced by the zeros from fillna) because X_train/X_test keep the row
# labels of the frame they were split from.
X_train = (
    X_train
    .join(pd.DataFrame(transformed.toarray(), columns=encoded_columns, index=X_train.index))
    .drop(columns=top_columns)
    .fillna(0)
)
X_test = (
    X_test
    .join(pd.DataFrame(transformed_test.toarray(), columns=encoded_columns, index=X_test.index))
    .drop(columns=top_columns)
    .fillna(0)
)

# Persist the fitted encoder to enc.bin.
import pickle
with open("enc.bin", "wb") as f:
    pickle.dump(enc, f)

In [None]:
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb

# Alternative classifier, currently disabled:
# clf = xgb.XGBClassifier()
# Fit a random forest, constructed with no explicit hyperparameters, on the
# training features and the encoded training labels.
clf = RandomForestClassifier()
clf.fit(X_train, y_train)

In [None]:
from sklearn.metrics import classification_report
# Predict the held-out samples and print the per-class metrics, using the
# label encoder's class names as row labels.
y_pred = clf.predict(X_test)
report = classification_report(y_test, y_pred, target_names=le.classes_)
print(report)

In [None]:
import pickle
# Serialise the fitted classifier to rf.bin.
with open("rf.bin", "wb") as model_file:
    pickle.dump(clf, model_file)