In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


INSTALL ALL THE DEPENDENCIES !

In [None]:
!pip install --upgrade pip

!pip uninstall google-cloud-storage numpy

!pip install google-cloud-storage==2.10.0
!pip install numpy==1.26.4

!pip install transformers datasets torch evaluate scikit-learn flask joblib mlflow peft accelerate bitsandbytes

IMPORT ALL THE REQUIRED LIBRARIES

In [2]:
import torch
import random
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from datasets import Dataset, DatasetDict
import evaluate
import os

MAIN CODE

In [6]:
# Create a Small Custom Dataset
data = {
    "text": [
        "The system detected a possible intrusion attempt.",
        "User logged in successfully.",
        "Suspicious network activity was observed.",
        "Firewall blocked a malicious request.",
        "Normal system operation detected.",
        "Unauthorized access attempt detected.",
        "Routine maintenance activity logged.",
        "Malware detected in the downloaded file.",
        "User logged out without issues.",
        "Anomaly detected in login behavior."
    ],
    "label": [1, 0, 1, 1, 0, 1, 0, 1, 0, 1]  # 1 = Threat, 0 = Normal
}

dataset = Dataset.from_dict(data)

# Load a Small Model & Tokenizer
model_name = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Add Padding Token
tokenizer.add_special_tokens({'pad_token': '[PAD]'})
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)
model.resize_token_embeddings(len(tokenizer))  # Resize embeddings

# Tokenization Function
def tokenize_function(example):
    return tokenizer(example["text"], padding="max_length", truncation=True)

# Apply tokenization
tokenized_dataset = dataset.map(tokenize_function, batched=True)

# Split Dataset into Train & Test
split_data = tokenized_dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = split_data["train"]
test_dataset = split_data["test"]

# Define Training Arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=5,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    load_best_model_at_end=True,
)

# Define Accuracy Metric
metric = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

# Train the Model**
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
)

# Disable Weights & Biases logging
os.environ["WANDB_DISABLED"] = "true"

trainer.train()

# Save the Model
model.save_pretrained("./fine_tuned_model")
tokenizer.save_pretrained("./fine_tuned_model")

# Detect the available device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Move model to the same device
model.to(device)

def predict(text):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    inputs = {key: value.to(device) for key, value in inputs.items()}  # Move input tensors to the correct device

    with torch.no_grad():
        outputs = model(**inputs)  # Ensure inference runs on the same device
    prediction = torch.argmax(outputs.logits).item()
    return "Threat" if prediction == 1 else "Normal"

# Test Inference
sample_text = "Unusual login attempt detected from a foreign country."
print(f"Prediction: {predict(sample_text)}")


Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Map:   0%|          | 0/10 [00:00<?, ? examples/s]

Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).
2025/03/23 06:18:48 ERROR mlflow.utils.async_logging.async_logging_queue: Run Id 29699817ea714ee2a57f4f0b9aab3951: Failed to log run data: Exception: Changing param values is not allowed. Param with key='report_to' was already logged with value='['mlflow', 'tensorboard', 'wandb']' for run ID='29699817ea714ee2a57f4f0b9aab3951'. Attempted logging new value '['mlflow', 'tensorboard']'.


Epoch,Training Loss,Validation Loss,Accuracy
1,No log,0.669809,0.5
2,No log,0.662132,0.5
3,No log,0.652689,0.5
4,No log,0.642626,0.5
5,No log,0.637269,0.5


Prediction: Threat


INSTALL FLASK AND NGROK

- We cannoyt test flask api's in Colab, to do so use ngrok
- Create your account in https://ngrok.com/
- Go to your dashboard in ngrok and copy the "Your Authtoken"

In [None]:
!pip install flask flask-tunnel transformers torch
!pip install flask flask-ngrok transformers torch

In [11]:
!ngrok authtoken "Your Authtoken"

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [12]:
!pip install pyngrok



In [13]:
!ngrok authtoken "Your Authtoken"

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


FLASK API

In [14]:
from flask import Flask, request, jsonify
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import threading
from pyngrok import ngrok

app = Flask(__name__)

# Load fine-tuned model
MODEL_PATH = "./fine_tuned_model"
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    if "text" not in data:
        return jsonify({"error": "Missing 'text' field"}), 400

    text = data["text"]
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    outputs = model(**inputs)
    prediction = torch.argmax(outputs.logits).item()
    label = "Threat" if prediction == 1 else "Normal"

    return jsonify({"prediction": label})

# Start Flask in a separate thread
def run_flask():
    app.run(host="0.0.0.0", port=5000)

thread = threading.Thread(target=run_flask)
thread.start()

# Open an ngrok tunnel to the Flask server
public_url = ngrok.connect(5000).public_url
print(f"🌍 Public URL: {public_url}")

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://172.28.0.12:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m


🌍 Public URL: https://8ae7-34-87-158-193.ngrok-free.app


TEST THE FLASK API

In [15]:
import requests

BASE_URL = "http://127.0.0.1:5000"  # Replace with the printed URL

data = {"text": "Unauthorized access attempt detected"}
response = requests.post(f"{BASE_URL}/predict", json=data)

print("Status Code:", response.status_code)
print("Response JSON:", response.json())

INFO:werkzeug:127.0.0.1 - - [23/Mar/2025 06:21:39] "POST /predict HTTP/1.1" 200 -


Status Code: 200
Response JSON: {'prediction': 'Threat'}


ZIP THE FINE TUNED MODEL & DOWNLOAD IT TO YOUR LOCAL SYSTEM

In [None]:
!zip -r fine_tuned_model.zip ./fine_tuned_model/

from google.colab import files
files.download("fine_tuned_model.zip")