In [None]:
from sentence_transformers import SentenceTransformer, CrossEncoder, util
from sklearn.preprocessing import normalize
import faiss
import numpy as np
import pandas as pd
import json

# ---------------------------------------------------------------------------
# Corpus and index construction.
# Two CSV exports (one per attack class) are merged into a single labelled
# table, every log line is embedded with the bi-encoder, and the embeddings
# are stored in a flat inner-product FAISS index for nearest-neighbour lookup.
# ---------------------------------------------------------------------------
open_path = r"C:\Users\Lenovo\Desktop\open_redirect_clean.csv"
nosql_path = r"C:\Users\Lenovo\Desktop\logs_nosql.csv"

# Tag every row with the attack class of the file it was read from.
df_open = pd.read_csv(open_path).assign(label='open_redirect')
df_nosql = pd.read_csv(nosql_path).assign(label='nosql_injection')

# ignore_index renumbers the rows 0..N-1, so a row's position lines up with
# its position in `corpus` / `corpus_labels` and with the ids FAISS returns.
df_all = pd.concat([df_open, df_nosql], ignore_index=True)
print(f"[+] Total training samples: {len(df_all)}")

# Bi-encoder produces the embeddings used for candidate retrieval; the
# cross-encoder scores (query, candidate) pairs when re-ranking.
bi_encoder = SentenceTransformer('all-MiniLM-L6-v2')
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

# Parallel lists: corpus[i] is the log text, corpus_labels[i] its class.
# Both CSVs are expected to provide a 'log' column.
corpus = df_all['log'].astype(str).tolist()
corpus_labels = df_all['label'].tolist()

# L2-normalised vectors make the inner product equal to cosine similarity.
embeddings = normalize(
    bi_encoder.encode(corpus, convert_to_numpy=True, show_progress_bar=True),
    norm='l2',
)

dimension = embeddings.shape[1]
faiss_index = faiss.IndexFlatIP(dimension)
faiss_index.add(embeddings)

def detect_attack(log_line, top_k_faiss=50, top_k_final=3):
    """Classify a log line with keyword rules plus retrieve-and-rerank similarity.

    Two signals are computed and both are reported:

    1. Rule-based: case-insensitive substring match against a short list of
       NoSQL-operator and redirect keywords.
    2. Similarity-based: the line is embedded with ``bi_encoder``, the nearest
       corpus entries are fetched from ``faiss_index``, the candidates are
       re-scored with ``cross_encoder``, and the label of the best-scoring
       candidate is taken.

    The rule-based label wins whenever a rule fires; otherwise the
    similarity label is used.

    Args:
        log_line: The log text to classify (a ``str``).
        top_k_faiss: Maximum number of nearest neighbours to retrieve before
            re-ranking. Clamped to the number of vectors in the index so that
            FAISS never pads the result with ``-1`` ids.
        top_k_final: Accepted for backward compatibility; currently unused.

    Returns:
        dict with keys ``log``, ``rule_based``, ``similarity_based``,
        ``similarity_score`` and ``final_decision``. When no candidate could
        be retrieved (empty index or ``top_k_faiss < 1``),
        ``similarity_based`` is ``None`` and ``similarity_score`` is NaN.
    """
    # Lower-case once instead of once per keyword.
    lowered = log_line.lower()

    # NOTE(review): "http" also matches the protocol token of ordinary
    # access-log lines (e.g. "HTTP/1.1"), and it already covers "https".
    nosql_markers = ("$regex", "$ne", "$gt", "$where", "$in")
    redirect_markers = ("redirect", "next=", "url=", "http", "https")

    rule_label = None
    if any(marker in lowered for marker in nosql_markers):
        rule_label = "nosql_injection"
    elif any(marker in lowered for marker in redirect_markers):
        rule_label = "open_redirect"

    sim_label = None
    best_score = float("nan")

    # Asking FAISS for more neighbours than it holds yields -1 placeholder
    # ids; used as list indices those would silently select the LAST corpus
    # entry. Clamp k and drop any negative id defensively.
    k = min(int(top_k_faiss), int(faiss_index.ntotal))
    candidate_ids = []
    if k > 0:
        query_emb = bi_encoder.encode([log_line], convert_to_numpy=True)
        query_emb = normalize(query_emb, norm='l2')
        _, neighbour_ids = faiss_index.search(query_emb, k)
        candidate_ids = [int(i) for i in neighbour_ids[0] if i >= 0]

    if candidate_ids:
        cross_inp = [[log_line, corpus[i]] for i in candidate_ids]
        cross_scores = cross_encoder.predict(cross_inp)

        # Only the top candidate is needed; max() keeps the first of equal
        # scores, the same tie-break a stable descending sort would give.
        best_idx, best_score = max(
            zip(candidate_ids, cross_scores), key=lambda pair: pair[1]
        )
        sim_label = corpus_labels[best_idx]

    final_label = rule_label or sim_label
    return {
        "log": log_line,
        "rule_based": rule_label,
        "similarity_based": sim_label,
        "similarity_score": float(best_score),
        "final_decision": final_label
    }

# Smoke test: one plain request, two NoSQL-operator payloads and one
# redirect-parameter request, each run through the detector.
test_logs = [
    "GET /index.html",
    '{"username":{"$ne":""}, "password":{"$ne":""}}',
    "GET /login?next=https://evil.com",
    "/api/search?query=$where: 'return true'",
]

# One result dict per input line, tabulated for display.
results = list(map(detect_attack, test_logs))
df_res = pd.DataFrame(results)
print(df_res)
