In [1]:
import logging
import os
import sys

import pandas as pd

# Directory that holds the log files written by this notebook. It is created
# up front (no error if it already exists) so log files can be opened inside it.
LOG_DIR = "./Logs"
os.makedirs(LOG_DIR, exist_ok=True)

def remove_logger_handlers(logger_name):
    """
    Detaches and closes every handler attached to the named logger.

    Closing matters when the defining cell is re-run: a handler that is only
    detached keeps whatever resource it holds (for a ``FileHandler``, the open
    log file) alive until garbage collection.

    :param logger_name: Name passed to ``logging.getLogger``
    """
    logger = logging.getLogger(logger_name)
    # Iterate over a copy because removeHandler mutates logger.handlers.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

class CoolLogger:
    """
    Thin wrapper around a named ``logging.Logger`` that writes colorized
    records to stdout and timestamped records to a log file.
    """

    def __init__(self, name="CoolLogger", log_file="app.log", level=logging.DEBUG):
        """
        Initializes the logger with console and file handlers.

        Any handlers already attached to the logger called ``name`` are
        removed first, so constructing the same logger twice does not
        duplicate its output.

        :param name: Logger name
        :param log_file: Log file path
        :param level: Logging level (default: DEBUG)
        """
        remove_logger_handlers(name)
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        log_format = logging.Formatter(
            "%(asctime)s | %(levelname)s | %(message)s", "%Y-%m-%d %H:%M:%S"
        )

        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(self._get_colored_formatter())

        # Explicit UTF-8: with the platform default encoding, a message that
        # contains characters outside that encoding fails to be written.
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setFormatter(log_format)

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

    def _get_colored_formatter(self):
        """ Returns a colorized log formatter """
        class ColoredFormatter(logging.Formatter):
            COLORS = {
                "DEBUG": "\033[94m",  # Blue
                "INFO": "\033[92m",  # Green
                "WARNING": "\033[93m",  # Yellow
                "ERROR": "\033[91m",  # Red
                "CRITICAL": "\033[41m",  # Background Red
                "RESET": "\033[0m",
            }

            def format(self, record):
                log_color = self.COLORS.get(record.levelname, self.COLORS["RESET"])
                reset = self.COLORS["RESET"]
                # Delegate to the base class so the configured format string is
                # honoured and exception / stack text is appended when present.
                return f"{log_color}{super().format(record)}{reset}"

        return ColoredFormatter("%(levelname)s | %(message)s")

    # The level helpers forward extra positional and keyword arguments to the
    # underlying logger, so callers can use lazy %-style arguments and options
    # such as ``exc_info=True``. Calls that pass only a message are unchanged.

    def debug(self, message, *args, **kwargs):
        """ Logs ``message`` at DEBUG level. """
        self.logger.debug(message, *args, **kwargs)

    def info(self, message, *args, **kwargs):
        """ Logs ``message`` at INFO level. """
        self.logger.info(message, *args, **kwargs)

    def warning(self, message, *args, **kwargs):
        """ Logs ``message`` at WARNING level. """
        self.logger.warning(message, *args, **kwargs)

    def error(self, message, *args, **kwargs):
        """ Logs ``message`` at ERROR level. """
        self.logger.error(message, *args, **kwargs)

    def critical(self, message, *args, **kwargs):
        """ Logs ``message`` at CRITICAL level. """
        self.logger.critical(message, *args, **kwargs)


# Start from a clean slate: drop whatever is attached to the root logger and
# to the two named loggers this notebook uses.
logging.root.handlers.clear()
for _stale_logger_name in ("RepoHandlerLogger", "ModelLogger"):
    remove_logger_handlers(_stale_logger_name)

# Initialize loggers
log = CoolLogger(
    name="RepoHandlerLogger",
    log_file=f"{LOG_DIR}/repo_handler.log",
)
model_log = CoolLogger(
    name="ModelLogger",
    log_file=f"{LOG_DIR}/xg_trainer.log",
)

In [2]:


class repo_handler:
    """
    Loads the URI-query dataset from an Excel workbook and exposes it as
    labelled pandas objects.

    After a successful construction the instance provides:

    * ``normal_data`` / ``malicious_data`` - Series of de-duplicated queries
    * ``full_data`` - both Series concatenated
    * ``<CATEGORY>_df`` - one frame per attack sheet with a ``category`` column
    * ``malicious_data_df`` / ``normal_data_df`` - frames with ``category``
      and ``isVulnerable`` (1 for malicious, 0 for normal)
    * ``full_data_df`` - all rows combined and shuffled

    Failures are logged rather than raised. If loading fails, ``full_data``
    and ``full_data_df`` are never set.
    """

    # Sheet names the workbook must provide (each with a "UriQuery" column).
    BENIGN_SHEET = "Benign"
    MALICIOUS_SHEETS = ("RCE", "LFI", "SQLi", "XSS", "RFI")

    def __init__(self):
        log.info("Initializing Repository Handler")
        self.malicious_data = []
        self.malicious_data_df = []
        self.LFI_df = []
        self.SQLi_df = []
        self.XSS_df = []
        self.RFI_df = []
        self.RCE_df = []
        self.normal_data = []
        self.normal_data_df = []
        self.excel_file = "./Repository/Dataset.xlsx"

        try:
            if not self.load_data():
                # load_data already logged the root cause. Stop here instead of
                # running pd.concat on the placeholder lists, which would only
                # produce a second, misleading error message.
                log.error("Repository initialization aborted: dataset could not be loaded.")
                return

            self.full_data = pd.concat([self.normal_data, self.malicious_data], ignore_index=True)
            self.malicious_data_df = pd.concat([self.RCE_df, self.LFI_df, self.RFI_df, self.SQLi_df, self.XSS_df])
            self.normal_data_df = self._labelled_frame(self.normal_data, "Benign")
            self.malicious_data_df["isVulnerable"] = 1  # 1 for malicious
            self.normal_data_df["isVulnerable"] = 0  # 0 for normal
            self.full_data_df = pd.concat([self.malicious_data_df, self.normal_data_df], ignore_index=True)
            self.randomize_n_time(5)
            log.info("Repository successfully initialized!")
        except Exception as e:
            log.error(f"Error during initialization: {e}")

    @staticmethod
    def _labelled_frame(queries, category):
        """ Builds a ``UriQuery`` frame from a Series and tags every row with ``category``. """
        frame = pd.DataFrame(queries, columns=["UriQuery"])
        frame["category"] = category
        return frame

    def load_data(self):
        """
        Loads and processes data from the Excel file.

        Every sheet that has a ``UriQuery`` column is reduced to its unique,
        non-null queries. The benign sheet and all attack sheets are required.

        :return: True when all required sheets were loaded, False otherwise
                 (the reason is logged)
        """
        log.info("Loading Data from Excel...")

        try:
            # sheet_name=None returns a dict of {sheet name: DataFrame}.
            excel_data = pd.read_excel(self.excel_file, sheet_name=None)
            data = {
                sheet_name: d["UriQuery"].drop_duplicates().dropna()
                for sheet_name, d in excel_data.items() if "UriQuery" in d.columns
            }

            if not data:
                log.warning("No data found in the Excel file!")

            # Name the missing sheets explicitly instead of letting a bare
            # KeyError surface as an unhelpful "Error loading data: 'Benign'".
            required = (self.BENIGN_SHEET, *self.MALICIOUS_SHEETS)
            missing = [sheet for sheet in required if sheet not in data]
            if missing:
                raise ValueError(
                    f"Missing required sheet(s) with a 'UriQuery' column: {', '.join(missing)}"
                )

            self.normal_data = data[self.BENIGN_SHEET]
            self.RCE_df = self._labelled_frame(data["RCE"], "RCE")
            self.LFI_df = self._labelled_frame(data["LFI"], "LFI")
            self.SQLi_df = self._labelled_frame(data["SQLi"], "SQLi")
            self.XSS_df = self._labelled_frame(data["XSS"], "XSS")
            self.RFI_df = self._labelled_frame(data["RFI"], "RFI")

            self.malicious_data = pd.concat(
                [data[sheet] for sheet in self.MALICIOUS_SHEETS], ignore_index=True
            )
            log.info(f"Loaded {len(data['SQLi'])} SQLi, {len(data['RFI'])} RFI, {len(data['LFI'])} LFI, {len(data['XSS'])} XSS and {len(data['RCE'])} RCE entries.")
            log.info(f"Loaded {len(self.normal_data)} normal and {len(self.malicious_data)} malicious entries.")
            return True
        except Exception as e:
            log.error(f"Error loading data: {e}")
            return False

    def randomize_n_time(self, counter):
        """
        Randomizes the dataset multiple times.

        Each pass reshuffles every row of ``full_data_df`` and resets the
        index. No seed is set, so the resulting order differs between runs.

        :param counter: Number of shuffle passes
        """
        log.info(f"Randomizing dataset {counter} times...")

        try:
            for _ in range(counter):
                self.full_data_df = self.full_data_df.sample(frac=1).reset_index(drop=True)
            log.info("Dataset successfully randomized!")
        except Exception as e:
            log.error(f"Error during randomization: {e}")


In [3]:
from torch import cuda

# Use the GPU when PyTorch reports CUDA as available; otherwise use the CPU.
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

cpu


In [7]:
import torch

class WebAttackDataset(torch.utils.data.Dataset):
    """
    Map-style dataset that serves tokenizer encodings, optionally with labels.

    :param encodings: Mapping of feature name to a per-sample sequence; must
                      contain an ``"input_ids"`` entry (used for the length)
    :param labels: Optional sequence of labels aligned with the encodings
    """

    def __init__(self, encodings, labels=None):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """ Returns a dict of tensors for sample ``idx`` (plus ``labels`` when available). """
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        # Test for presence explicitly: plain truthiness raises on
        # multi-element numpy arrays / tensors and is False for a single
        # label whose value is 0. Empty label containers are still treated
        # as "no labels".
        if self.labels is not None and len(self.labels) > 0:
            item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        """ Number of samples, taken from the ``input_ids`` encoding. """
        return len(self.encodings["input_ids"])

In [11]:
from transformers import BertTokenizerFast, BertForSequenceClassification, Trainer, TrainingArguments, BertTokenizer
#from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score
import matplotlib.pyplot as plt
import numpy as np





class BERT:
    """
    Fine-tunes ``bert-base-uncased`` as a sequence classifier over the
    ``category`` column of a DataFrame of URI queries.

    Constructing an instance runs the whole pipeline: the label mappings are
    built, a numeric ``categories_label`` column is added to the given frame
    (in place), and the model is trained and evaluated.

    :param data_frame: DataFrame with ``UriQuery`` (text) and ``category``
                       (string label) columns
    """

    def __init__(self, data_frame=None):
        if data_frame is None:
            # Fail with a clear message instead of the opaque
            # "'NoneType' object is not subscriptable" from labelization().
            raise TypeError("BERT requires a DataFrame with 'UriQuery' and 'category' columns")
        self.data = data_frame
        self.model = None
        self.id2label, self.label2id = self.labelization()
        model_log.info(f"label2id: {self.label2id}")
        self.represent()
        self.train_model()

    def train_model(self):
        """
        Splits the data 80/20 (stratified by label), tokenizes both parts,
        fine-tunes the model and evaluates it on the validation part.

        :return: The metrics dict produced by ``Trainer.evaluate()``
        """
        X_train, X_val, y_train, y_val = train_test_split(
            list(self.data["UriQuery"]),
            list(self.data["categories_label"]),
            test_size=0.2,
            stratify=self.data["categories_label"],
        )
        tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")
        X_train_tokenized = tokenizer(X_train, truncation=True, padding=True, max_length=512)
        X_val_tokenized = tokenizer(X_val, truncation=True, padding=True, max_length=512)
        train_dataset = WebAttackDataset(X_train_tokenized, y_train)
        val_dataset = WebAttackDataset(X_val_tokenized, y_val)
        training_args = TrainingArguments(
            output_dir="./Results",
            num_train_epochs=3,
            per_device_train_batch_size=16
        )

        self.load_model()

        trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            compute_metrics=self.compute_metric
        )

        trainer.train()
        # Keep the evaluation metrics instead of discarding them.
        results = trainer.evaluate()
        model_log.info(f"Evaluation results: {results}")
        return results

    def compute_metric(self, p):
        """
        Computes accuracy, precision, recall and F1 for a set of predictions.

        :param p: Pair of (per-class scores, true label ids); the predicted
                  class is the argmax over axis 1 of the scores
        :return: Dict with ``Accuracy``, ``precision``, ``recall`` and ``f1``
        """
        pred, labels = p
        pred = np.argmax(pred, axis=1)

        accuracy = accuracy_score(y_true=labels, y_pred=pred)
        # The target has more than two classes, so the scikit-learn default
        # average='binary' raises ValueError. Use support-weighted averaging
        # and report 0 (instead of warning) for classes never predicted.
        recall = recall_score(y_true=labels, y_pred=pred, average="weighted", zero_division=0)
        precision = precision_score(y_true=labels, y_pred=pred, average="weighted", zero_division=0)
        f1 = f1_score(y_true=labels, y_pred=pred, average="weighted", zero_division=0)

        return {"Accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

    def labelization(self):
        """
        Builds the id->label and label->id mappings from the ``category`` column.

        Labels are whitespace-stripped and de-duplicated in order of first
        appearance, which is the same numbering ``represent`` assigns.

        :return: Tuple ``(id2label, label2id)``
        """
        stripped = (label.strip() for label in self.data['category'].unique().tolist())
        # dict.fromkeys de-duplicates while keeping first-seen order, so two
        # raw labels that differ only in whitespace share a single id.
        labels = list(dict.fromkeys(stripped))
        id2label = dict(enumerate(labels))
        label2id = {label: idx for idx, label in id2label.items()}
        return id2label, label2id

    def represent(self):
        """
        Adds the integer ``categories_label`` column to ``self.data`` in place.

        Codes are assigned to the stripped category in order of first
        appearance, matching the ids produced by ``labelization``.
        """
        self.data["categories_label"] = pd.factorize(self.data["category"].str.strip())[0]

    def load_model(self):
        """ Loads the pretrained classifier with one output per label and the label mappings attached. """
        self.model = BertForSequenceClassification.from_pretrained(
            "bert-base-uncased",
            num_labels=len(self.label2id),
            id2label=self.id2label,
            label2id=self.label2id,
        )

    def viewGraph(self):
        """ Shows a pie chart of how many rows fall into each category. """
        self.data['category'].value_counts().plot(
            kind='pie',
            figsize=(8, 8),
            autopct=lambda p: f'{p:.2f}%'
        )

        plt.ylabel('')
        plt.title('Category Distribution')
        plt.show()




ImportError: cannot import name '_imaging' from 'PIL' (d:\Setups\Conda\envs\AIM\lib\site-packages\PIL\__init__.py)

In [None]:
log.info("Starting the model...")

try:
    data = repo_handler()
    unique_values = data.full_data.unique()
    log.info(f"Unique UriQuery values: {len(unique_values)}")
    model = BERT(data.full_data_df)

except Exception as e:
    # Logger.exception logs at ERROR level and attaches the active traceback
    # to the record, so the failure can be located and not just named.
    log.logger.exception(f"Error in model execution: {e}")



[92mINFO | Starting the model...[0m
[92mINFO | Initializing Repository Handler[0m
[92mINFO | Loading Data from Excel...[0m
[92mINFO | Loaded 3855 SQLi, 86 RFI, 2696 LFI, 3102 XSS and 932 RCE entries.[0m
[92mINFO | Loaded 10558 normal and 10671 malicious entries.[0m
[92mINFO | Randomizing dataset 5 times...[0m
[92mINFO | Dataset successfully randomized![0m
[92mINFO | Repository successfully initialized![0m
[92mINFO | Unique UriQuery values: 21229[0m
[92mINFO | label2id: {'Benign': 0, 'LFI': 1, 'XSS': 2, 'SQLi': 3, 'RCE': 4, 'RFI': 5}[0m


config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]


KeyboardInterrupt



In [None]:
# Preview the first rows of the combined dataset. A bare last expression lets
# the notebook render the frame as a table instead of plain printed text.
data.full_data_df.head()