In [1]:
!pip install tenseal
!pip install transformers accelerate torch
!pip install -q --upgrade openai

Collecting tenseal
  Downloading tenseal-0.3.16-cp312-cp312-win_amd64.whl.metadata (8.6 kB)
Downloading tenseal-0.3.16-cp312-cp312-win_amd64.whl (2.2 MB)
   ---------------------------------------- 0.0/2.2 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.2 MB ? eta -:--:--
    --------------------------------------- 0.0/2.2 MB 435.7 kB/s eta 0:00:05
   - -------------------------------------- 0.1/2.2 MB 751.6 kB/s eta 0:00:03
   ---- ----------------------------------- 0.2/2.2 MB 1.3 MB/s eta 0:00:02
   ----- ---------------------------------- 0.3/2.2 MB 1.3 MB/s eta 0:00:02
   ------- -------------------------------- 0.4/2.2 MB 1.6 MB/s eta 0:00:02
   --------- ------------------------------ 0.5/2.2 MB 1.7 MB/s eta 0:00:01
   ------------ --------------------------- 0.7/2.2 MB 1.9 MB/s eta 0:00:01
   --------------- ------------------------ 0.8/2.2 MB 1.9 MB/s eta 0:00:01
   ---------------- ----------------------- 0.9/2.2 MB 2.0 MB/s eta 0:00:01
   -----------------


[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Collecting transformers
  Downloading transformers-4.52.4-py3-none-any.whl.metadata (38 kB)
Collecting accelerate
  Downloading accelerate-1.7.0-py3-none-any.whl.metadata (19 kB)
Collecting huggingface-hub<1.0,>=0.30.0 (from transformers)
  Downloading huggingface_hub-0.32.4-py3-none-any.whl.metadata (14 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2024.11.6-cp312-cp312-win_amd64.whl.metadata (41 kB)
     ---------------------------------------- 0.0/41.5 kB ? eta -:--:--
     ---------------------------------------- 41.5/41.5 kB 2.0 MB/s eta 0:00:00
Collecting tokenizers<0.22,>=0.21 (from transformers)
  Downloading tokenizers-0.21.1-cp39-abi3-win_amd64.whl.metadata (6.9 kB)
Collecting safetensors>=0.4.3 (from transformers)
  Downloading safetensors-0.5.3-cp38-abi3-win_amd64.whl.metadata (3.9 kB)
Downloading transformers-4.52.4-py3-none-any.whl (10.5 MB)
   ---------------------------------------- 0.0/10.5 MB ? eta -:--:--
   --------------------------------


[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip

[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
from typing import List, Dict, Tuple
from sklearn.feature_extraction.text import TfidfVectorizer
import os
import json
import openai
from openai import OpenAI
import re
import hashlib
import os
import numpy as np
from pathlib import Path

In [None]:
def hash_ngram(ngram, buckets=1024):
    return int(hashlib.sha256(ngram.encode()).hexdigest(), 16) % buckets

def vectorize(text, n=4, buckets=1024):
    vec = [0] * buckets
    text = text.lower()
    for i in range(len(text) - n + 1):
        idx = hash_ngram(text[i:i+n], buckets)
        vec[idx] += 1
    return vec

# Connect to OpenRouter using their API key and base URL
client = OpenAI(
    api_key="...",
    base_url="https://openrouter.ai/api/v1"
)

# 1. Helper: LLM phrase generation from regex
def generate_phrases_from_regex(regex: str, max_phrases=10) -> List[str]:
    prompt = f"""You are a cybersecurity expert. Given the following regex from a phishing YARA rule:

{regex}

List {max_phrases} natural phrases or sentences that could appear in a phishing email and match the intent of this regex.
Just list each phrase on a new line without explanations."""

    response = client.chat.completions.create(
        model="mistralai/mistral-7b-instruct",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.7,
        max_tokens=300
    )

    raw_text = response.choices[0].message.content

    phrases = [
        re.sub(r"^\s*[\d]+[\.\)\-]*\s*", "", line).strip(" -•\n").lstrip("\"")
        for line in raw_text.split("\n") if line.strip()
    ]
    return phrases

# 2. Helper: extract n-grams (3-5 words) from phrases
def extract_ngrams(text: str, min_n=3, max_n=5) -> List[str]:
    words = text.lower().split()
    return [
        " ".join(words[i:i+n])
        for n in range(min_n, max_n + 1)
        for i in range(len(words) - n + 1)
    ]

# 3. Convert YARA regexes to vectorizer and feature space
def vectorize_yara_phrases(rules: List[Dict[str, object]]) -> Tuple[TfidfVectorizer, List[str]]:
    all_phrases = []
    for rule in rules:
        pattern = rule["pattern"]
        phrases = generate_phrases_from_regex(pattern)
        all_phrases.extend(phrases)

    # Convert to 3–5 word n-grams
    ngrams = set()
    for phrase in all_phrases:
        ngrams.update(extract_ngrams(phrase, 3, 5))

    sorted_vocab = sorted(ngrams)
    vectorized_vocab = [vectorize(phrase) for phrase in sorted_vocab]

    return vectorized_vocab, sorted_vocab



def generate_regexs_from_rule_text(rule_text: str, max_regex=3) -> List[Dict[str, str]]:
 
    prompt = f"""You are a cybersecurity expert. Here is a YARA rule:
    {rule_text}

    Your task is to extract up to {max_regex} regular expressions that are explicitly written or clearly implied by the rule.

    For each regex, estimate the "weight" field (as a float) to reflect how strongly the pattern indicates phishing (the higher the weight, the more suspicious/phishy the pattern is). Assign higher weights to patterns that are more typical for phishing, and lower weights to more generic or less suspicious patterns.

    Return the result as a valid Python list of dictionaries, using the following exact format:
    [
        {{ "pattern": r"/regex1/i", "weight": 1.0 }},
        {{ "pattern": r"/regex2/i", "weight": 0.7 }}
    ]

    Do not include any explanations, markdown, or comments!

    - Each pattern must be a Python raw string (starting with r") and contain a YARA-style regex: between slashes (e.g., /something/i).
    - Do NOT use extra quotes inside the pattern (e.g., no "r\"...\"" or escapes).
    - Do NOT include any markdown, explanations, comments, or extra text — just the list.
    - If the rule contains no regex, infer up to 3 regex-style patterns that match the intent of the rule (e.g., suspicious links, phishing phrases, etc.).
    - The pattern must be a valid Python raw string.
    - Avoid unescaped quotes. Prefer single quotes `'` around strings.
    - Do not include YARA flags like `nocase` as raw identifiers — use `/.../i` inside the string instead.

    """

    response = client.chat.completions.create(
        model="mistralai/mistral-7b-instruct",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.5,
        max_tokens=300
    )

    raw_output = response.choices[0].message.content

    try:
        return eval(raw_output.strip())  # UWAGA: tylko jeśli masz pełną kontrolę nad odpowiedzią
    except Exception as e:
        print("Błąd parsowania odpowiedzi z LLM:", e)
        print("Odpowiedź:\n", raw_output)
        return []


def generate_regexs_from_folder(folder_path: str, max_regex=3):
    """
    Zwraca listę krotek (nazwa_pliku, lista_reguł)
    """
    all_rule_sets = []

    for filename in os.listdir(folder_path):
        if not filename.endswith(".yar"):
            continue

        file_path = os.path.join(folder_path, filename)
        with open(file_path, "r", encoding="utf-8") as f:
            rule_text = f.read()

        regex_list = generate_regexs_from_rule_text(rule_text, max_regex=max_regex)
        if regex_list:
            all_rule_sets.append((filename, regex_list))
        else:
            print(f"Brak regexów w pliku {filename}")

    return all_rule_sets


In [10]:
import tenseal as ts
import hashlib

# --- Configuration ---
BUCKETS = 1024
NGRAM_SIZE = 4
SCALE = 2 ** 40

# --- Text Hashing Vectorizer ---
def hash_ngram(ngram, buckets=BUCKETS):
    return int(hashlib.sha256(ngram.encode()).hexdigest(), 16) % buckets

def vectorize(text, n=NGRAM_SIZE, buckets=BUCKETS):
    vec = [0] * buckets
    text = text.lower()
    for i in range(len(text) - n + 1):
        idx = hash_ngram(text[i:i+n], buckets)
        vec[idx] += 1
    return vec

# --- CKKS Context ---
def create_ckks_context():
    context = ts.context(
        ts.SCHEME_TYPE.CKKS,
        poly_modulus_degree=8192,
        coeff_mod_bit_sizes=[60, 40, 40, 60]  # Must match scale size
    )
    context.global_scale = SCALE
    context.generate_galois_keys()
    return context

# --- Encryption Helper ---
def encrypt_vector(vector, context):
    return ts.ckks_vector(context, vector)

In [24]:
# yara_rules = [
#     r"/pass(word)?\s*reset/i"]
# vectorized_vocab, sorted_vocab = vectorize_yara_phrases(yara_rules)
# print(sorted_vocab)

In [50]:
# --- YARA rules with weights ---
rules = [
    {"pattern": r"/pass(word)?\s*reset/i", "weight": 1.5},
    {"pattern": r"/account\s+(suspended|locked|restricted)/i", "weight": 1.0},
    {"pattern": r"/verify\s+your\s+identity/i", "weight": 1.2},
    {"pattern": r"/click\s+the\s+link/i", "weight": 0.8},
    {"pattern": r"/secure\s+login/i", "weight": 1.3},
]
vectorized_vocab, sorted_vocab = vectorize_yara_phrases(rules)
# print(sorted_vocab)



def rule_matches(pattern: str, text: str) -> bool:
    return re.search(pattern.strip("/i"), text, re.IGNORECASE) is not None

In [35]:
# -- YARA rules with weights for every file in the "rules" folder ---
rules_sets = generate_regexs_from_folder("rules/", max_regex=3)

all_vectorized_vocabs = []  # lista: [vectorized_vocab dla każdego pliku]
all_sorted_vocabs = []      # lista: [sorted_vocab dla każdego pliku]
all_filenames = []          # lista nazw plików 
all_rules = []              # lista reguł 

for filename, rules in rules_sets:
    print(f"\n--- Zestaw reguł z pliku: {filename} ---")
    print("Regexy w zestawie:")
    for rule in rules:
        print(f"  {rule['pattern']}")
    vectorized_vocab, sorted_vocab = vectorize_yara_phrases(rules)
    all_vectorized_vocabs.append(vectorized_vocab)
    all_sorted_vocabs.append(sorted_vocab)
    all_filenames.append(filename)
    all_rules.extend(rules)
    # print("Lista fraz:", sorted_vocab)


# -- YARA rules with weights for all files in the "rules" folder ---
global_vectorized_vocab, global_sorted_vocab = vectorize_yara_phrases(all_rules)
# print("\n=== GLOBALNA LISTA SŁOWNIKÓW FRAZ (ze wszystkich plików) ===")
# print(global_sorted_vocab)


--- Zestaw reguł z pliku: complex_html_rule.yar ---
Regexy w zestawie:
  <!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01
  <a[^>]+>
  (Hello,|Best regards,)

--- Zestaw reguł z pliku: domains_rule.yar ---
Regexy w zestawie:
  chainsmokers\-feeling\.org
  [a-z0-9]+(?:\.|_){2,}[a-z]{2,}\b
  (?:amazonses|sendgrid|sparkpostmail)\.com

--- Zestaw reguł z pliku: encoded_reply_rule.yar ---
Regexy w zestawie:
  /Reply-To:\s*=\?UTF-8\?B\?.{20,}\?=/i
  /Reply-To:\s*=\?.*\?.{20,}\?=/i
  /Reply-To:\s*=.*\?B\?.{20,}\?=/i

--- Zestaw reguł z pliku: phrases_rule.yar ---
Regexy w zestawie:
  "Renew your subscription"
  "Update your payment details"
  "Password Expiration Notification"
  "Click below"

--- Zestaw reguł z pliku: sekurak_yara_example.yar ---
Regexy w zestawie:
  "sekret"
  "password"
  "sekret.*password"

--- Zestaw reguł z pliku: suspicious_links_rule.yar ---
Regexy w zestawie:
  bit\.ly/
  tinyurl\.com/
  https?://[^ ]*@(.*)

--- Zestaw reguł z pliku: suspicious_tld_rule.yar ---
Regexy 

In [None]:
#--- Example usage ---
print(F"Plik: {rules_sets[1][0]} \nLista słowników regexów z wagami: {rules_sets[1][1]}") 

Plik: domains_rule.yar 
Lista słowników regexów z wagami: [{'pattern': 'chainsmokers\\-feeling\\.org', 'weight': 1.0}, {'pattern': '[a-z0-9]+(?:\\.|_){2,}[a-z]{2,}\\b', 'weight': 0.8}, {'pattern': '(?:amazonses|sendgrid|sparkpostmail)\\.com', 'weight': 0.7}]


In [8]:
# --- Main Logic ---
context = create_ckks_context()

email_text = """Subject: Immediate Action Required – Password Reset

Dear user,

We have detected suspicious activity on your account and, as a security precaution, your access has been temporarily limited.

To restore access, please follow the link below to initiate a password reset:

👉 https://secure-login-authenticator.com/reset

If you do not reset your password within 24 hours, your account will be permanently locked.

Thank you for your cooperation,
Security Team
"""
yara_rule_text = "password reset"

# Vectorize
email_vector = vectorize(email_text)
yara_vector = vectorize(yara_rule_text)

# Pad to same length if needed (shouldn't be necessary here)
if len(email_vector) != len(yara_vector):
    max_len = max(len(email_vector), len(yara_vector))
    email_vector += [0] * (max_len - len(email_vector))
    yara_vector += [0] * (max_len - len(yara_vector))

# Encrypt both vectors
enc_email = encrypt_vector(email_vector, context)
enc_yara = encrypt_vector(yara_vector, context)

# Compute encrypted dot product
# This works because both vectors have same scale & context
enc_score = enc_email.dot(enc_yara)

# --- Threshold Configuration ---
THRESHOLD = 10.0

# Decrypt the result
score = enc_score.decrypt()[0]
print("Similarity Score (Dot Product, decrypted):", score)

# Check against threshold
is_phishing = score >= THRESHOLD

# Additional verification with YARA rules
yara_matches = [
    rule for rule in rules
    if rule_matches(rule["pattern"], email_text)
]
yara_score = sum(rule["weight"] for rule in yara_matches)

# Combined approach: similarity + YARA rules
final_score = score + yara_score
final_threshold = THRESHOLD + 2.0  # Higher threshold for combined approach

is_phishing_final = final_score >= final_threshold

print("\n--- Results ---")
print(f"Similarity score: {score:.2f}")
print(f"YARA rules matched: {len(yara_matches)} (total weight: {yara_score:.2f})")
print(f"Combined score: {final_score:.2f}")
print(f"Threshold: {THRESHOLD:.2f} (similarity), {final_threshold:.2f} (combined)")
if is_phishing_final:
    print("\nPHISHING!")
    print("Matched YARA rules:")
    for rule in yara_matches:
        print(f"- {rule['pattern']} (weight: {rule['weight']})")
else:
    print("\nEmail appears legitimate")

Similarity Score (Dot Product, decrypted): 36.00000536888277

--- Results ---
Similarity score: 36.00
YARA rules matched: 0 (total weight: 0.00)
Combined score: 36.00
Threshold: 10.00 (similarity), 12.00 (combined)

PHISHING!
Matched YARA rules:


In [9]:
# Padding
max_len = max(len(email_vector), len(vectorized_vocab[0]))
email_vector += [0] * (max_len - len(email_vector))

enc_email = encrypt_vector(email_vector, context)

# Calculate similarity with each phrase vector from the vocab:
scores = []
for phrase_vec in vectorized_vocab:
    vec = phrase_vec
    vec += [0] * (max_len - len(vec))  # padding
    enc_phrase = encrypt_vector(vec, context)
    enc_score = enc_email.dot(enc_phrase)
    score = enc_score.decrypt()[0]
    scores.append(score)

max_score = max(scores)

# Check which YARA rules match the email text
yara_matches = [rule for rule in rules if rule_matches(rule["pattern"], email_text)]
yara_score = sum(rule["weight"] for rule in yara_matches)

# Calculate the combined score
final_score = max_score + yara_score

THRESHOLD = 10.0
final_threshold = THRESHOLD + 2.0

is_phishing_final = final_score >= final_threshold

print("\n--- Results ---")
print(f"Max similarity score fraz: {max_score:.2f}")
print(f"YARA rules matched: {len(yara_matches)} (total weight: {yara_score:.2f})")
print(f"Combined score: {final_score:.2f}")
print(f"Threshold: {THRESHOLD:.2f} (similarity), {final_threshold:.2f} (combined)")
if is_phishing_final:
    print("\nPHISHING!")
    print("Matched YARA rules:")
    for rule in yara_matches:
        print(f"- {rule['pattern']} (weight: {rule['weight']})")
else:
    print("\nEmail appears legitimate")


--- Results ---
Max similarity score fraz: 96.00
YARA rules matched: 0 (total weight: 0.00)
Combined score: 96.00
Threshold: 10.00 (similarity), 12.00 (combined)

PHISHING!
Matched YARA rules:


In [10]:
def vectorize_all_emails(input_dir: str, output_path: str) -> None:
    input_dir = Path(input_dir)
    vectors: list[np.ndarray] = []
    for dirpath, dirnames, filenames in os.walk(input_dir):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            with open(file_path, encoding="utf-8", errors="replace") as file:
                text = file.read()
                vec = vectorize(text)
                vectors.append(np.array(vec))

    np.savez_compressed(output_path, np.array(vectors))
    
    return np.array(vectors)



In [43]:
def iter_emails(path: str):
    array = np.load(path)['arr_0']
    for row in array:
        yield row

In [12]:
phishing = vectorize_all_emails(
    input_dir="samples/phishing_mails",
    output_path="./samples/phishing_mails_vectorized"
)

regular = vectorize_all_emails(
    input_dir="./samples/regular_mails",
    output_path="./samples/regular_mails_vectorized"
)

In [13]:
mail_vectors = iter_emails('./samples/phishing_mails_vectorized.npz')

In [39]:
def calibrate_threshold(regular_path, vectorized_vocab, vocab_boost=2.0, percentile=75):
    context = create_ckks_context()
    max_len = max(max(len(vec) for vec in vectorized_vocab), BUCKETS)

    padded_vocab = [vec + [0] * (max_len - len(vec)) for vec in vectorized_vocab]
    encrypted_vocab = [encrypt_vector(vec, context) for vec in padded_vocab]

    def evaluate(vec):
        vec = list(vec) + [0] * (max_len - len(vec))
        enc_email = encrypt_vector(vec, context)
        similarity_scores = [enc_email.dot(enc_phrase).decrypt()[0] for enc_phrase in encrypted_vocab]
        return max(similarity_scores)

    similarities = []
    for vec in iter_emails(regular_path):
        similarities.append(evaluate(vec))

    import numpy as np
    base_threshold = np.percentile(similarities, percentile)

    return base_threshold + vocab_boost


In [38]:
def scan_vectorized_emails(
    phishing_path: str,
    regular_path: str,
    output_path: str,
    rules: List[Dict[str, object]],
    vectorized_vocab: List[List[int]],
    threshold: float = 10.0,
    vocab_boost: float = 2.0,
    sample_fraction: float = 1.0,
    max_files: int = None
):
    import random

    context = create_ckks_context()
    max_len = max(max(len(vec) for vec in vectorized_vocab), BUCKETS)

    padded_vocab = [vec + [0] * (max_len - len(vec)) for vec in vectorized_vocab]
    encrypted_vocab = [encrypt_vector(vec, context) for vec in padded_vocab]

    def evaluate_email(vec):
        vec = list(vec) + [0] * (max_len - len(vec))
        enc_email = encrypt_vector(vec, context)
        similarity_scores = [enc_email.dot(enc_phrase).decrypt()[0] for enc_phrase in encrypted_vocab]
        return max(similarity_scores)

    if threshold == "auto":
        threshold = calibrate_threshold(
            regular_path, vectorized_vocab,
            vocab_boost=vocab_boost
        )
        print(f"Dynamicznie ustawiony próg: {threshold:.2f}")

    with open(output_path, 'w', encoding='utf-8') as f:
        for label, path in [("phishing_mails", phishing_path), ("regular_mails", regular_path)]:
            all_vecs = list(iter_emails(path))
            random.shuffle(all_vecs)
            sample_size = min(int(len(all_vecs) * sample_fraction), max_files or len(all_vecs))

            for idx, vec in enumerate(all_vecs[:sample_size]):
                similarity = evaluate_email(vec)
                yara_score = 0.0
                text = ""  # optionally left empty
                for rule in rules:
                    if rule_matches(rule["pattern"], text):
                        yara_score += rule["weight"]

                final_score = similarity + yara_score
                final_threshold = threshold
                result = "MATCH" if final_score >= final_threshold else "NO MATCH"

                f.write(f"{result}: {label}/{idx} -> similarity={similarity:.2f}, yara_score={yara_score:.2f}, total={final_score:.2f}\n")


In [41]:
def analyze_vectorized_results(result_file_path: str):
    phishing_matches = phishing_total = 0
    regular_matches = regular_total = 0
    other_matches = other_total = 0

    with open(result_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            is_match = line.startswith("MATCH")
            if "phishing_mails" in line:
                phishing_total += 1
                if is_match:
                    phishing_matches += 1
            elif "regular_mails" in line:
                regular_total += 1
                if is_match:
                    regular_matches += 1
            else:
                other_total += 1
                if is_match:
                    other_matches += 1

    total_matches = phishing_matches + regular_matches + other_matches
    total_scanned = phishing_total + regular_total + other_total

    print("Analiza wyników (vectorized approach):")
    print(f"Phishing mails: {phishing_matches} / {phishing_total} dopasowań")
    print(f"Regular mails:  {regular_matches} / {regular_total} dopasowań")
    print(f"Inne pliki:      {other_matches} / {other_total} dopasowań")
    print(f"SUMA:            {total_matches} / {total_scanned} plików dopasowanych\n")


In [None]:
# ---All YARA files processing in loop--
for (filename, rules), vectorized_vocab in zip(rules_sets, all_vectorized_vocabs):
    print(f"\n--- Przetwarzanie reguł z pliku: {filename} ---")
    output_path = f"results/{filename}_results.txt"
    
    scan_vectorized_emails(
        phishing_path="./samples/phishing_mails_vectorized.npz",
        regular_path="./samples/regular_mails_vectorized.npz",
        output_path=output_path,
        rules=rules,
        vectorized_vocab=vectorized_vocab,
        threshold="auto",
        vocab_boost=2.0
    )
    
    analyze_vectorized_results(output_path)


In [None]:
# --- Global YARA rules processing ---
scan_vectorized_emails(
    phishing_path="./samples/phishing_mails_vectorized.npz",
    regular_path="./samples/regular_mails_vectorized.npz",
    output_path="results/global_results.txt",
    rules=rules,
    vectorized_vocab=global_vectorized_vocab,
    threshold="auto",
    vocab_boost=2.0
)

analyze_vectorized_results("results/global_results.txt")