# MLPdf: An Effective Machine Learning Based Approach for PDF Malware Detection

## Features
- User Authentication (Supabase Auth)
- PDF Malware Detection
- PDF Statistics (percentages explaining why a file is classified as Benign or Malicious)
- Sign saved PDFs with a Digital Signature (if PrivyID can provide an API?)
- Save signed PDF on Database (Supabase for now)
- Optional: Face Signature??

## TODO
- Optimal Data Preprocessing
- Supabase Auth
- TensorFlow Model vs Transfer Learning
- Statistics parameter
- Signature DrawBox
- Temporary Database with Deadlines
- Face Feature as Signed PDF UUID

## Tech Stack
- Streamlit vs Taipy
- FastAPI
- Supabase
- TensorFlow
- Other statistics libraries

## Download dataset from official repository

In [2]:
# Save the dataset to local storage to save costs ;)
# All directory paths are therefore relative to the LOCAL WORKSPACE.
# This setup is only for MODEL and FEATURE DEVELOPMENT.
!wget --no-check-certificate \
  http://205.174.165.80/CICDataset/CIC-EvasivePDF2022/Dataset/Benign.zip \
  -O ../../datasets/PDFBenign.zip

!wget --no-check-certificate \
  http://205.174.165.80/CICDataset/CIC-EvasivePDF2022/Dataset/Malicious.zip \
  -O ../../datasets/PDFMalicious.zip

--2024-04-19 02:13:01--  http://205.174.165.80/CICDataset/CIC-EvasivePDF2022/Dataset/Benign.zip
Connecting to 205.174.165.80:80... connected.
HTTP request sent, awaiting response... 

  pid, fd = os.forkpty()


200 OK
Length: 753571486 (719M) [application/zip]
Saving to: ‘../../datasets/PDFBenign.zip’


2024-04-19 03:14:30 (200 KB/s) - ‘../../datasets/PDFBenign.zip’ saved [753571486/753571486]

--2024-04-19 03:14:30--  http://205.174.165.80/CICDataset/CIC-EvasivePDF2022/Dataset/Malicious.zip
Connecting to 205.174.165.80:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 503579900 (480M) [application/zip]
Saving to: ‘../../datasets/PDFMalicious.zip’


2024-04-19 04:03:52 (166 KB/s) - ‘../../datasets/PDFMalicious.zip’ saved [503579900/503579900]



In [6]:
from __future__ import annotations
import os, sys, shutil, urllib.request, zipfile, tarfile
from pathlib import Path
from typing import List, Optional, Dict

import numpy as np
import pandas as pd
from pandas import DataFrame
import tensorflow as tf
import keras

## Dataset Extraction: Layer 1

In [2]:
def extract_zip_L1(pdfClass: str) -> str:
    """Unpack the first-layer archive of one PDF class and number its members.

    The archive ``<pdfClass>.zip`` is looked up next to the ``CICDatasets``
    directory, extracted into ``CICDatasets/<pdfClass>`` and every extracted
    entry is renamed to ``<pdfClass>_<n>.zip`` (n starting at 1).

    Args:
        pdfClass: Archive stem, e.g. ``'PDFBenign'`` or ``'PDFMalicious'``.

    Returns:
        A status message: success, "File not found", or a description of the
        error that interrupted the extraction. Errors are reported through
        the return value rather than raised.
    """
    # Use the same dataset root as extract_zip_L2 (two levels above the
    # working directory). It has to be a Path: the original plain string
    # had no .is_dir()/.parent and did not support the "/" operator.
    dpath = Path(os.getcwd()).resolve().parent.parent / 'datasets/CICDatasets'
    dpath.mkdir(parents=True, exist_ok=True)

    zip_path = dpath.parent / f'{pdfClass}.zip'
    if not zip_path.exists():
        return f"File not found: {zip_path}"

    extracted_path = dpath / pdfClass
    try:
        # The context manager closes the archive; no explicit close() needed.
        with zipfile.ZipFile(zip_path, 'r') as archive:
            archive.extractall(path=extracted_path)

        # Sort the listing so the numbering is reproducible across runs.
        # NOTE(review): assumes the target directory only holds freshly
        # extracted entries; re-running on an already numbered directory
        # could rename one entry over another - confirm before re-running.
        entries = sorted(os.listdir(extracted_path))
        for index, filename in enumerate(entries, start=1):
            os.rename(
                extracted_path / filename,
                extracted_path / f"{pdfClass}_{index}.zip",
            )
        return f"Extraction L1: {pdfClass}.zip completed successfully"

    except Exception as e:
        # Best-effort notebook helper: report the failing line instead of
        # propagating, so the caller can simply print the status.
        exc_tb = e.__traceback__
        return f"An error occurred: In line [{exc_tb.tb_lineno}] {str(e)}"

## Dataset Extraction: Layer 2

In [3]:
def extract_zip_L2(pdfClass: str) -> str:
    """Unpack every nested ``.zip`` inside the first-layer directory of a class.

    Each ``<name>.zip`` found in ``CICDatasets/<pdfClass>`` is extracted into
    a sibling folder ``<name>`` and then deleted. Archives whose target folder
    already exists are left untouched. Corrupt archives are reported on
    stdout and deleted.

    Args:
        pdfClass: Class directory name, e.g. ``'PDFBenign'``.

    Returns:
        A status message containing the number of entries left in the class
        directory, or a description of the error that interrupted the run.
        Errors are reported through the return value rather than raised.
    """
    L1_DIR = Path(os.getcwd()).resolve().parent.parent / 'datasets/CICDatasets'
    fpath = L1_DIR / f'{pdfClass}'
    try:
        for filename in os.listdir(fpath):
            if not filename.endswith(".zip"):
                continue

            zip_file_path = fpath / filename
            sub_folder = fpath / os.path.splitext(os.path.basename(filename))[0]
            if sub_folder.is_dir():
                # Already extracted on a previous run; keep the archive as-is.
                continue

            try:
                with zipfile.ZipFile(zip_file_path, 'r') as archive:
                    archive.extractall(path=sub_folder)

                # Delete the zip file after extraction
                os.remove(zip_file_path)

            # Handle bad zip files
            except zipfile.BadZipFile:
                print("BAD ZIP: " + str(zip_file_path))
                try:
                    os.remove(zip_file_path)
                except FileNotFoundError:
                    # Equivalent to the former errno.ENOENT check (errno was
                    # never imported): an already-missing file is fine, any
                    # other OSError still propagates to the outer handler.
                    pass

        length_dir = len(os.listdir(fpath))
        message = f"The {pdfClass}.zip files successfully extracted. The length of directories: {length_dir}"
        return message

    except Exception as e:
        # Best-effort notebook helper: report the failing line instead of
        # propagating, so the caller can simply print the status.
        exc_tb = e.__traceback__
        return f"An error occurred: In line [{exc_tb.tb_lineno}]: {str(e)}"

### Execute the extractions from L1 to L2

In [6]:
# Layer 1 first (outer archives), for the benign class and then the malicious one.
benign_L1, malicious_L1 = [
    extract_zip_L1(pdfClass=label) for label in ('PDFBenign', 'PDFMalicious')
]

# Layer 2 afterwards (nested archives), in the same class order.
benign_L2, malicious_L2 = [
    extract_zip_L2(pdfClass=label) for label in ('PDFBenign', 'PDFMalicious')
]

# Report the four status messages, one per line.
print(benign_L1, malicious_L1, benign_L2, malicious_L2, sep="\n")


KeyboardInterrupt



## Exploratory Data Analysis

Implementations that will be applied:
- Shapes of Data
- PDF extraction sample
- Traditional Detection
- Malicious statistics and how they differ from Benign statistics
- Correlations

## Preprocessing Data

In [None]:
def spliting_data(self) -> str:
    """Move the files of every source sub-folder into ``train`` and ``test``.

    Works on ``<self.curr_path>/data/<self.pdf_type>``. For each sub-folder
    other than ``train``/``test``, the first ``split_ratio`` share of its
    files (by sorted name) is moved to ``train`` and the rest to ``test``,
    then the emptied sub-folder is removed. Entries that are not directories
    are left where they are.

    Reads ``self.curr_path``, ``self.pdf_type`` and ``self.split_ratio``.

    Returns:
        A status message: success, or a description of the error that
        interrupted the split. Errors are reported through the return value
        rather than raised.
    """
    fpath = Path(f"{self.curr_path}/data/{self.pdf_type}")

    train_dir = fpath / 'train'
    test_dir = fpath / 'test'

    # Create the destination directories if they don't exist yet
    os.makedirs(train_dir, exist_ok=True)
    os.makedirs(test_dir, exist_ok=True)

    # The destinations themselves must never be treated as sources
    exclude_dir = {train_dir, test_dir}

    try:
        for folder in os.listdir(fpath):
            folder_path = fpath / folder

            # Skip destinations and stray regular files. Previously rmtree()
            # was called on non-directories too, which aborted the whole
            # split with an error.
            if folder_path in exclude_dir or not folder_path.is_dir():
                continue

            # Sorted so the same input always yields the same split
            files = sorted(os.listdir(folder_path))
            cut = int(len(files) * self.split_ratio)

            # NOTE(review): files are moved by bare name, so equal names in
            # different source folders would collide - confirm names are
            # unique across folders.
            for destination, names in ((train_dir, files[:cut]),
                                       (test_dir, files[cut:])):
                for name in names:
                    shutil.move(os.path.join(folder_path, name),
                                os.path.join(destination, name))

            # Remove the source folder after splitting
            shutil.rmtree(folder_path)
        return f"Splitting {self.pdf_type} is completed"

    except Exception as e:
        # Best-effort notebook helper: report the failing line instead of
        # propagating, so the caller can simply print the status.
        exc_tb = e.__traceback__
        return f"An error occurred: In line [{exc_tb.tb_lineno}]: {str(e)}"

def get_file_byte_string(self, file) -> bytes:
    """Return the textual ``bytes`` repr of a file, chunked into 4-char groups.

    The file is read in binary mode and its content is rendered as the
    Python literal text (``b'...'``, escapes included). That text is cut
    into groups of four characters joined by single spaces, and the result
    is encoded as UTF-8.

    Args:
        file: Path of the file to read.

    Returns:
        The space-delimited representation as ``bytes``.
    """
    # Context manager so the handle is released even if read() raises.
    with open(file, "rb") as curr_file:
        data = curr_file.read()

    # repr() of bytes is the same text str() produced before, without
    # tripping BytesWarning when the interpreter runs with -b.
    data_str = repr(data)
    data_delim = ' '.join(data_str[i:i + 4]
                          for i in range(0, len(data_str), 4))
    return data_delim.encode('utf-8')

def create_row(self, filetype, file, writer) -> None:
    """Write one record for ``file`` through ``writer`` and bump the id counter.

    The record is ``[id, filetype, base file name, byte string]``, where the
    id comes from ``self.id_counter`` and the byte string from
    ``self.get_file_byte_string(file)``.

    Args:
        filetype: Label stored in the second column.
        file: Path of the file being recorded.
        writer: Object exposing ``writerow``.
    """
    row_id = self.id_counter
    base_name = os.path.basename(os.path.normpath(file))
    payload = self.get_file_byte_string(file)

    record = [row_id, filetype, base_name, payload]
    writer.writerow(record)
    # Empty the list after writing, as before.
    del record[:]

    self.id_counter = self.id_counter + 1

def csv_generator(self) -> str:
    """Append one CSV row per file of the ``test`` and ``train`` folders.

    Files under ``<self.curr_path>/data/<self.pdf_type>/test`` are appended
    to ``testing.csv`` and those under ``.../train`` to ``training.csv``
    (both relative to the working directory). Each row is produced by
    ``self.create_row``. ``self.header`` is written only when the target CSV
    is new or empty, so repeated calls keep a single header line.

    Reads ``self.curr_path``, ``self.pdf_type``, ``self.header`` and
    ``self.ftype``.

    Returns:
        The fixed status string once both files have been written.
    """
    # Imported locally: the notebook's import cell does not bring in csv.
    import csv

    fpath = Path(f"{self.curr_path}/data/{self.pdf_type}")

    for csv_name, subset in (('testing.csv', 'test'),
                             ('training.csv', 'train')):
        # In append mode the header must not be repeated on later calls.
        needs_header = (not os.path.exists(csv_name)
                        or os.path.getsize(csv_name) == 0)

        # newline='' lets the csv module control line endings itself.
        with open(csv_name, 'a', newline='') as csv_file:
            writer = csv.writer(csv_file)
            if needs_header:
                writer.writerow(self.header)

            subset_dir = os.path.join(fpath, subset)
            for files in os.listdir(subset_dir):
                self.create_row(self.ftype,
                                os.path.join(subset_dir, files), writer)

    return "Succesfully Completed"

## PDF Detection Model

![MLP Architecture](assets/mlp_architecture.png)