In [1]:
import os

In [2]:
%%capture
!pip install llama-index llama-index-core llama-index-readers-file llama-index-embeddings-huggingface
!pip install transformers faiss-cpu sentence-transformers
!pip install pymupdf pdfplumber

In [3]:
import pandas as pd
import torch
import numpy as np
import pdfplumber
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report
from torch.utils.data import TensorDataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim

In [4]:
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, ServiceContext, Settings
from llama_index.core.schema import Document
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

  from .autonotebook import tqdm as notebook_tqdm
E0000 00:00:1747045600.539849      10 common_lib.cc:612] Could not set metric server port: INVALID_ARGUMENT: Could not find SliceBuilder port 8471 in any of the 0 ports provided in `tpu_process_addresses`="local"
=== Source Location Trace: ===
learning/45eac/tfrc/runtime/common_lib.cc:230


In [5]:
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    RagTokenizer,
    RagSequenceForGeneration,
    get_linear_schedule_with_warmup
)

In [6]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [7]:
doc_folder = "/kaggle/input/ragdocuments"

In [8]:
text_documents = []

for filename in os.listdir(doc_folder):
    if filename.endswith(".pdf"):
        file_path = os.path.join(doc_folder, filename)
        try:
            with pdfplumber.open(file_path) as pdf:
                text = "\n".join([page.extract_text() for page in pdf.pages if page.extract_text()])
                text_documents.append(Document(text=text))
        except Exception as e:
            print(f"Error reading {filename}: {e}")

csv_files = [f for f in os.listdir(doc_folder) if f.endswith(".csv")]
dataset_documents = []

for csv_file in csv_files:
    file_path = os.path.join(doc_folder, csv_file)

    try:
        df = pd.read_csv(file_path, encoding="utf-8", low_memory=False)

        if "phishtank" in csv_file.lower():
            phishing_urls = df.iloc[:, 0].astype(str).tolist()
            dataset_documents.extend([Document(text=url) for url in phishing_urls])

        elif "alexa" in csv_file.lower():
            alexa_urls = df.iloc[:, 0].astype(str).tolist()
            dataset_documents.extend([Document(text=url) for url in alexa_urls])

    except Exception as e:
        print(f"Error reading {csv_file}: {e}")



In [9]:
all_documents = text_documents + dataset_documents

embed_model = HuggingFaceEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")

In [10]:
Settings.embed_model = embed_model  

index = VectorStoreIndex.from_documents(all_documents)

retriever = index.as_retriever(similarity_top_k=3)

In [11]:
rag_tokenizer = RagTokenizer.from_pretrained("facebook/rag-sequence-base")
rag_model = RagSequenceForGeneration.from_pretrained("facebook/rag-sequence-base").to(device)

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'DPRQuestionEncoderTokenizer'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'DPRQuestionEncoderTokenizerFast'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'RagTokenizer'. 
The class this function is called from is 'BartTokenizer'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called fr

In [12]:
phish_model_name = "H1tak3/phishing-url-detector"

phish_tokenizer = AutoTokenizer.from_pretrained(phish_model_name)
phish_model = AutoModelForSequenceClassification.from_pretrained(phish_model_name).to(device)

In [13]:
def classify_url(url):
    """Classify the URL as phishing or legitimate."""
    inputs = phish_tokenizer(url, return_tensors="pt", truncation=True, padding=True).to(device)
    with torch.no_grad():
        outputs = phish_model(**inputs)

    prediction = torch.argmax(outputs.logits, dim=1).item()
    return "Phishing" if prediction == 1 else "Legitimate"

In [14]:
def retrieve_top_k_docs(query):
    """Retrieve top-K most relevant documents using LlamaIndex."""
    retrieved_docs = retriever.retrieve(query)
    return [doc.text for doc in retrieved_docs]

In [15]:
def generate_explanation(query, retrieved_docs):
    """Generate an explanation using RAG model."""
    context_text = " ".join(retrieved_docs)

    rag_input = f"Query: {query}\nContext: {context_text}\nExplain the type of phishing attack."

    inputs = rag_tokenizer(
        rag_input, return_tensors="pt", max_length=1024, truncation=True
    ).to(device)

    output = rag_model.generate(**inputs, max_length=100)

    return rag_tokenizer.decode(output[0], skip_special_tokens=True)

In [16]:
def generate_explanation(query, retrieved_docs):
    """Generate an explanation using RAG model."""
    if not retrieved_docs:
        return "No relevant phishing attack explanation found."

    context_text = " ".join(retrieved_docs)

    rag_input = f"Query: {query}\nContext: {context_text}\nExplain the type of phishing attack."

    inputs = rag_tokenizer(
        rag_input, return_tensors="pt", max_length=1024, truncation=True
    ).to(device)

    print("Tokenized Input:", inputs)

    try:
        output = rag_model.generate(**inputs, max_length=100, do_sample=True, num_return_sequences=1)

        explanation = rag_tokenizer.decode(output[0], skip_special_tokens=True)
        return explanation

    except Exception as e:
        print(f"⚠ Error during generation: {e}")
        return "⚠ An error occurred while generating the explanation."

In [17]:
def detect_phishing_type(url):
    """Main pipeline: Classify URL, retrieve documents, and generate an explanation."""
    classification = classify_url(url)

    if classification == "Phishing":
        top_k_docs = retrieve_top_k_docs(url)
        explanation = generate_explanation(url, top_k_docs)
        return f"Phishing detected!\nAttack Explanation: {explanation}"

    return "The URL is legitimate."

In [18]:
test_url = "https://www.sb1.com"

result = detect_phishing_type(test_url)

print(result)

Tokenized Input: {'input_ids': tensor([[  101,  1024,  1024,  1013,  1013,  2833,  1012,  2680,  2487,  1012,
  4012,   102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])}
Phishing detected!
Attack Explanation: The URL uses a misleading domain name that mimics a legitimate financial institution. Attackers often register short, plausible-looking domains like "sb1.com" to trick users into believing they are visiting a trusted banking site. This tactic exploits brand impersonation and can lead to credential theft or financial fraud.
