In [1]:
import json
import os
import re
import zipfile
from urllib.parse import urlsplit

from pymorphy3 import MorphAnalyzer
# Single shared pymorphy3 analyzer instance; normalize_words() calls morph.parse() on it.
morph = MorphAnalyzer()

In [2]:
def load_config(path: str) -> dict:
    """Read a UTF-8 encoded JSON file and return the parsed content.

    Args:
        path: Location of the JSON file on disk.

    Returns:
        Whatever ``json.load`` produces for the file (a dict for the
        configuration files used in this notebook).
    """
    with open(path, "r", encoding="utf-8") as config_file:
        return json.load(config_file)

In [3]:
# Load the detection settings once; the module-level names below are read by
# the scoring functions further down in the notebook.
data = load_config(path="config.json")

_shorteners = data.get("shorteners", [])
_ricky_exts = data.get("risky_exts", [])
# Consumed via .items() in exits_fake_domain, so the fallback must be a dict.
_brand_domains = data.get("brand_domains", {})
# Mapping whose keys are domain keywords and whose values are lists of
# phishing keywords; .keys()/.values() are called on it, so default to a dict.
keywords_dangerous = data.get("keywords_dangerous", {})
_keywords_domains = list(keywords_dangerous.keys())
# De-duplicate while keeping first-seen order so the list is identical on
# every run (a plain set() would give a run-dependent order).
_keywords_phishing = list(
    dict.fromkeys(kw for kws in keywords_dangerous.values() for kw in kws)
)

In [4]:
# Sample message (Russian text followed by two URLs) used by the demo cells below.
text = "Ваш аккаунт заблокирован. Учётная запись заблокирована из-за подозрительной активности. Срочная подтвердите пароль! http://example.com https://example.org/path?query=1"

In [5]:
def extract_urls(text):
    """Return every http(s) URL found in ``text``, in order of appearance.

    A URL runs from the scheme up to the next whitespace, ``)``, ``>``, ``]``
    or ``"``. Sentence punctuation directly after the URL (``. , ; : ! ?``)
    is not part of the result, so ``"see http://a.ru/login."`` yields
    ``"http://a.ru/login"``.

    Args:
        text: Arbitrary text; ``None`` or an empty string yields ``[]``.

    Returns:
        List of URL strings with their original letter case.
    """
    # The final character class forces the match to end on a character that
    # is not trailing punctuation; the greedy part before it backtracks.
    return re.findall(
        r'https?://[^\s)>\]"]*[^\s)>\]".,;:!?]',
        text or "",
        flags=re.I,
    )

In [6]:
def extract_domains(text):
    """Return the lower-cased host name of every http(s) URL in ``text``.

    The host is taken from ``urlsplit(url).hostname`` so that userinfo
    (``user@``), port (``:8080``), query (``?a=1``) and fragment parts never
    end up in the result. URLs without a usable host are skipped.

    Args:
        text: Arbitrary text that may contain URLs.

    Returns:
        List of host names, one per URL that has a host, in order of appearance.
    """
    domains = []
    for url in extract_urls(text):
        try:
            host = urlsplit(url).hostname
        except ValueError:
            # urlsplit rejects malformed authorities (e.g. an unbalanced
            # IPv6 bracket); such a URL has no usable host.
            continue
        # A trailing dot left over from sentence punctuation is not part of the host.
        host = (host or "").rstrip(".")
        if host:
            domains.append(host)
    return domains

In [7]:
# Demo: URLs found in the sample message.
extract_urls(text)

['http://example.com', 'https://example.org/path?query=1']

In [8]:
# Demo: domains of the URLs found in the sample message.
extract_domains(text)

['example.com', 'example.org']

In [9]:
def normalize_words(text: str):
    """Split ``text`` into Cyrillic words and return their normal forms.

    Only runs of Russian letters (including ``ё``) are treated as words;
    digits, Latin letters and punctuation are ignored. Each word is
    lower-cased and replaced by the ``normal_form`` of the first parse that
    ``morph.parse`` returns.

    Args:
        text: Input text; ``None`` is treated as an empty string.

    Returns:
        List of normal forms in the order the words occur in ``text``.
    """
    lowered = (text or "").lower()
    lemmas = []
    for token in re.findall(r"[а-яА-ЯёЁ]+", lowered):
        first_parse = morph.parse(token)[0]
        lemmas.append(first_parse.normal_form)
    return lemmas

In [10]:
# Demo: normal forms of the Cyrillic words in the sample message.
normalize_words(text)

['ваш',
 'аккаунт',
 'заблокировать',
 'учётный',
 'запись',
 'заблокировать',
 'из',
 'за',
 'подозрительный',
 'активность',
 'срочный',
 'подтвердить',
 'пароль']

In [11]:
def keyword_hits(text: str, keywords_sample: list[str]):
    """Measure how much of ``text`` consists of the given keywords.

    The text is normalised with ``normalize_words`` and every resulting word
    is looked up in ``keywords_sample``.

    Args:
        text: Text to inspect.
        keywords_sample: Keywords in normal form.

    Returns:
        Tuple ``(percent, hits)``: ``percent`` is the rounded share (0-100)
        of words that are keywords, and ``hits`` lists the matching words in
        order of appearance (duplicates kept). A text without any Cyrillic
        word yields ``(0, [])``.
    """
    words = normalize_words(text)
    if not words:
        # Nothing to score (empty or non-Cyrillic text); avoid dividing by zero.
        return 0, []
    # Set lookup keeps the scan linear in the number of words.
    lookup = set(keywords_sample)
    hits = [word for word in words if word in lookup]
    return round(len(hits) * 100 / len(words)), hits

In [12]:
# Demo: run keyword_hits on individual sentences with the keyword list
# derived from the config.
sample_texts = [
    "Ваш аккаунт заблокирован.",
    "Учётная запись заблокирована из-за подозрительной активности.",
    "Срочная подтвердите пароль!",
]

for t in sample_texts:
    print(t, "->", keyword_hits(t, _keywords_phishing))

Ваш аккаунт заблокирован. -> (67, ['аккаунт', 'заблокировать'])
Учётная запись заблокирована из-за подозрительной активности. -> (57, ['учётный', 'запись', 'заблокировать', 'подозрительный'])
Срочная подтвердите пароль! -> (100, ['срочный', 'подтвердить', 'пароль'])


In [13]:
def exist_shorted_url(text: str, shorteners: list[str]) -> bool:
    """Tell whether ``text`` contains a URL hosted on a link-shortening service.

    A shortener entry such as ``"bit.ly"`` matches only when it is the URL's
    host (or a parent domain of it). A plain substring test over the whole
    URL would also fire on ``microsoft.com`` for the entry ``t.co`` and on
    shortener names that merely appear inside a path or query string.

    Args:
        text: Text that may contain URLs.
        shorteners: Shortener hosts, e.g. ``["bit.ly", "tinyurl.com"]``.
            A leading scheme and a trailing slash on an entry are ignored.

    Returns:
        True if at least one URL in ``text`` points at a listed shortener.
    """
    escaped = []
    for entry in shorteners or []:
        cleaned = re.sub(r"^https?://", "", (entry or "").strip(), flags=re.I).rstrip("/")
        if cleaned:
            escaped.append(re.escape(cleaned))
    if not escaped:
        return False
    # scheme, optional userinfo, optional sub-domain labels, the shortener,
    # then a host/segment boundary (port, path, query, fragment or end).
    shortener_at_host = re.compile(
        r"^https?://(?:[^/?#@]*@)?(?:[^/?#@]*\.)?(?:" + "|".join(escaped) + r")(?=[:/?#]|$)",
        flags=re.I,
    )
    return any(shortener_at_host.match(url) for url in extract_urls(text))

In [14]:
# Demo: shortener detection on two shortened links and one regular link.
urls = [
    "http://bit.ly/abc",
    "https://tinyurl.com/xyz",
    "http://example.com/full-url",
]
for url in urls:
    print(url, "->", exist_shorted_url(url, _shorteners))

http://bit.ly/abc -> True
https://tinyurl.com/xyz -> True
http://example.com/full-url -> False


In [15]:
def exits_dangerous_domain(text: str, keywords_dangerous_domains: list[str]) -> bool:
    """Report whether any URL domain in ``text`` contains a suspicious keyword.

    Args:
        text: Text that may contain URLs; domains come from ``extract_domains``.
        keywords_dangerous_domains: Substrings that mark a domain as suspicious.

    Returns:
        True as soon as one keyword occurs inside one extracted domain,
        otherwise False.
    """
    for host in extract_domains(text):
        for marker in keywords_dangerous_domains:
            if marker in host:
                return True
    return False

In [16]:
# Demo: domain-keyword detection on three sample links.
urls = [
    "http://cloud-verify.com",
    "https://safe-site.org",
    "http://apple-auth.net"
]
for url in urls:
    print(url, "->", exits_dangerous_domain(url, _keywords_domains))

http://cloud-verify.com -> True
https://safe-site.org -> False
http://apple-auth.net -> True


In [17]:
def exits_unencrypted_url(text: str) -> bool:
    """Report whether ``text`` contains at least one plain ``http://`` URL.

    The scheme is compared case-insensitively because ``extract_urls`` also
    matches schemes case-insensitively (``HTTP://...`` is still unencrypted).

    Args:
        text: Text that may contain URLs.

    Returns:
        True if any extracted URL uses the ``http`` scheme, otherwise False.
    """
    return any(url.lower().startswith("http://") for url in extract_urls(text))

In [18]:
# Demo: unencrypted-link detection on http and https samples.
urls = [
    "http://example.com",
    "https://secure-site.org",
    "http://another-site.net",
]
for url in urls:
    print(url, "->", exits_unencrypted_url(url))

http://example.com -> True
https://secure-site.org -> False
http://another-site.net -> True


In [19]:
def is_dangerous_attachment(attachment: str, risky_exts: list[str]) -> bool:
    """Check whether an attachment name ends with a risky file extension.

    The comparison is case-insensitive on both sides. Trailing spaces and
    dots are ignored, so names like ``"run.exe."`` or ``"run.exe "`` cannot
    hide their real extension. Only the last extension counts
    (``"invoice.pdf.exe"`` -> ``".exe"``).

    Args:
        attachment: File name of the attachment.
        risky_exts: Risky extensions; entries may be given with or without
            the leading dot.

    Returns:
        True if the file's extension is listed in ``risky_exts``.
    """
    if not attachment:
        return False
    name = attachment.strip().rstrip(". ").lower()
    if "." not in name:
        return False
    # Extension including the leading dot.
    ext = "." + name.rsplit(".", 1)[-1]
    normalized = {
        "." + item.strip().lower().lstrip(".")
        for item in risky_exts
        if item and item.strip()
    }
    return ext in normalized

In [20]:
# Demo: attachment check on safe, risky and extension-less file names.
attachments = [
    "document.pdf",
    "script.exe",
    "image.jpg",
    "lab",
    "malware.bat"

]
for att in attachments:
    print(att, "->", is_dangerous_attachment(att, _ricky_exts))

document.pdf -> False
script.exe -> True
image.jpg -> False
lab -> False
malware.bat -> True


In [21]:
def exits_fake_domain(text: str, brand_domains: dict) -> bool:
    """Detect URLs whose domain mentions a brand but is not owned by it.

    A domain is reported as fake when it contains a brand name while its
    host is neither one of the brand's legitimate domains nor a sub-domain
    of one. ``apple-secure.com`` and ``apple.com.evil.ru`` are fake;
    ``apple.com`` and ``support.apple.com`` are not.

    Args:
        text: Text that may contain URLs.
        brand_domains: Mapping ``brand name -> list of legitimate domains``
            (a single string is accepted in place of a list).

    Returns:
        True if at least one URL impersonates a known brand.
    """
    if not brand_domains:
        return False
    for domain in extract_domains(text):
        # Compare against the bare host: drop userinfo and anything from the
        # port/path/query on, in case the extracted value carries them.
        host = re.split(r"[:/?#]", domain.rsplit("@", 1)[-1], maxsplit=1)[0]
        # Distinct loop names: the parameter must stay intact for the next
        # domain (rebinding it made the second iteration call .items() on a list).
        for brand, legit_domains in brand_domains.items():
            if brand.lower() not in domain:
                continue
            if isinstance(legit_domains, str):
                legit_domains = [legit_domains]
            is_legit = any(
                host == legit or host.endswith("." + legit)
                for legit in (str(item).lower() for item in legit_domains)
            )
            if not is_legit:
                return True
    return False

In [22]:
# Demo: brand-impersonation check on look-alike and genuine brand links.
domains = [
    "http://apple-secure.com",
    "https://microsoft.com",
    "http://google-support.net",
    "https://amazon.com"
]
for domain in domains:
    print(domain, "->", exits_fake_domain(domain, _brand_domains))

http://apple-secure.com -> True
https://microsoft.com -> False
http://google-support.net -> True
https://amazon.com -> False


In [23]:
def validate_letter_format(data: dict) -> bool:
    """Validate the structure of a parsed letter.

    A letter is valid when it is a dict that has all required fields with
    the expected types, the sender as a whole looks like
    ``something@host.tld``, and the text is not empty or whitespace-only.

    Args:
        data: Parsed JSON value. Anything that is not a dict is rejected
            instead of raising.

    Returns:
        True if the letter can be scored, otherwise False.
    """
    if not isinstance(data, dict):
        # A JSON file may legally hold a list, number, string or null.
        return False
    required_fields = {
        "id": str,
        "datetime": str,
        "sender": str,
        "subject": str,
        "attachment": (str, type(None)),
        "text": str,
    }
    for field, field_type in required_fields.items():
        if field not in data or not isinstance(data[field], field_type):
            return False
    # Simple e-mail shape check; fullmatch so that a valid-looking prefix
    # followed by junk ("a@b.c@d") is not accepted.
    if not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", data["sender"]):
        return False
    # A body made only of whitespace carries no content to score.
    return bool(data["text"].strip())

In [24]:
# Demo: a well-formed letter (all required fields present, attachment set to None).
letter = {
    "id": "12345",
    "datetime": "2023-10-01T12:34:56",
    "sender": "cloud@verify.ru",
    "subject": "Account Verification",
    "attachment": None,
    "text": "Your account is blocked. Please verify your password at http://cloud-verify.com"
}
print("Letter valid:", validate_letter_format(letter))

Letter valid: True


In [25]:
def score_letter(data: dict) -> int:
    """Compute a phishing score for one letter.

    Points awarded (maximum 11):
        +2  sender domain contains a dangerous-domain keyword
        +1  at least 20% of the subject's words are phishing keywords
        +1  at least 20% of the body's words are phishing keywords
        +1  body contains a shortened URL
        +1  body contains an unencrypted (http://) URL
        +3  body contains a URL impersonating a known brand
        +2  attachment has a risky extension

    NOTE(review): earlier inline comments advertised +2 for body keywords
    and for shortened URLs while the code added 1. The coded weights are
    kept here; confirm which weights are intended.

    Args:
        data: Letter dict with the keys ``sender``, ``subject``,
            ``attachment`` and ``text``.

    Returns:
        The accumulated integer score.
    """
    keyword_share_threshold = 20  # percent of words that must be keywords

    score = 0
    sender = data.get("sender") or ""
    subject = data.get("subject") or ""
    attachment = data.get("attachment", None)
    text = data.get("text") or ""

    # Sender domain: +2. The sender is an e-mail address, not a URL, so the
    # URL-based extractor alone finds nothing in it; take the part after the
    # last "@" and test the keywords against that as well.
    sender_host = ""
    if "@" in sender:
        sender_host = sender.rsplit("@", 1)[-1].strip(" <>\"'").lower()
    if exits_dangerous_domain(sender, _keywords_domains) or (
        sender_host and any(keyword in sender_host for keyword in _keywords_domains)
    ):
        score += 2

    # Subject keywords: +1
    subject_score, _ = keyword_hits(subject, _keywords_phishing)
    if subject_score >= keyword_share_threshold:
        score += 1

    # Body keywords: +1
    text_score, _ = keyword_hits(text, _keywords_phishing)
    if text_score >= keyword_share_threshold:
        score += 1

    # Shortened URLs: +1
    if exist_shorted_url(text, _shorteners):
        score += 1

    # Unencrypted URLs: +1
    if exits_unencrypted_url(text):
        score += 1

    # Fake brand domains: +3
    if exits_fake_domain(text, _brand_domains):
        score += 3

    # Dangerous attachments: +2
    if attachment and is_dangerous_attachment(attachment, _ricky_exts):
        score += 2

    return score

In [26]:
# Demo: score a single sample letter.
# NOTE: this rebinds the module-level name `data`, which previously held the
# loaded config; the `_`-prefixed settings were already derived from it.
data = {
  "id": "j5g6h7",
  "datetime": "2025-09-11T14:20:50+03:00",
  "sender": "noreply@amazon-secure.ru",
  "subject": "Проблема с вашим заказом №45231",
  "attachment": None,
  "text": "Ваш заказ не может быть обработан. Для подтверждения данных перейдите по ссылке: http://amazon-confirm.ru/login. В противном случае заказ будет отменён."
}
score_letter(data)

5

In [33]:
def classify_zip(zip_path, threshold=3):
    """Classify every JSON letter inside a zip archive and print a summary.

    Each ``.json`` member (macOS ``__MACOSX/`` entries excluded) is decoded
    as UTF-8, parsed, validated with ``validate_letter_format`` and scored
    with ``score_letter``. Members that cannot be decoded or parsed, or that
    fail validation, are skipped and not counted. The names of letters
    classified as phishing are printed, followed by the two totals.

    Args:
        zip_path: Path to the zip archive.
        threshold: Minimum score at which a letter counts as phishing.

    Returns:
        None; results are printed.
    """
    ph = 0
    no_ph = 0
    with zipfile.ZipFile(zip_path, 'r') as z:
        for name in z.namelist():
            if not name.lower().endswith('.json') or name.startswith('__MACOSX/'):
                continue
            # Only the decode/parse step is guarded, so errors raised by the
            # scoring code are not silently swallowed.
            try:
                data = json.loads(z.read(name).decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Not valid UTF-8 JSON; skip this member.
                continue
            if not validate_letter_format(data):
                continue
            if score_letter(data) >= threshold:
                ph += 1
                print(name)
            else:
                no_ph += 1
    # basename understands the platform's path separator; splitting on '/'
    # left the directory prefix in place for Windows-style paths.
    zip_name = os.path.basename(zip_path)
    print(f"Total phishing letters in zip file {zip_name}: {ph}")
    print(f"Total non-phishing letters in zip file {zip_name}: {no_ph}")

In [34]:
# Demo: classify the letters of a single archive.
classify_zip('letters_1.zip')

phish/t5q6r7.json
phish/f1a2b3.json
phish/j5g6h7.json
phish/n9k0l1.json
phish/k6h7i8.json
phish/g2c3d4.json
phish/m8j9k0.json
phish/o0l1m2.json
phish/s4p5q6.json
phish/u6r7s8.json
phish/h3e4f5.json
phish/l7i8j9.json
phish/r3o4p5.json
phish/q2n3o4.json
phish/p1m2n3.json
phish/i4f5g6.json
Total phishing letters in zip file letters_1.zip: 16
Total non-phishing letters in zip file letters_1.zip: 20


In [35]:
# Find every zip file in the directory and run classify_zip on each one
import os

def classify_all_zips_in_directory(directory: str):
    """Run ``classify_zip`` on every ``.zip`` file directly inside ``directory``.

    Archives are processed in sorted name order so that the printed report
    is the same on every run (``os.listdir`` returns entries in arbitrary
    order). Entries that merely end in ``.zip`` but are not regular files,
    such as directories, are skipped.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        None; ``classify_zip`` prints the results.
    """
    for filename in sorted(os.listdir(directory)):
        if not filename.lower().endswith('.zip'):
            continue
        zip_path = os.path.join(directory, filename)
        if os.path.isfile(zip_path):
            classify_zip(zip_path)

In [36]:
# Demo: classify every archive in the current working directory.
classify_all_zips_in_directory(".")

phish/t5q6r7.json
phish/f1a2b3.json
phish/j5g6h7.json
phish/n9k0l1.json
phish/k6h7i8.json
phish/g2c3d4.json
phish/m8j9k0.json
phish/o0l1m2.json
phish/s4p5q6.json
phish/u6r7s8.json
phish/h3e4f5.json
phish/l7i8j9.json
phish/r3o4p5.json
phish/q2n3o4.json
phish/p1m2n3.json
phish/i4f5g6.json
Total phishing letters in zip file .\letters_1.zip: 16
Total non-phishing letters in zip file .\letters_1.zip: 20
test1/t5q6r7.json
test1/f1a2b3.json
test1/j5g6h7.json
test1/n9k0l1.json
test1/k6h7i8.json
test1/g2c3d4.json
test1/m8j9k0.json
test1/o0l1m2.json
test1/s4p5q6.json
test1/u6r7s8.json
test1/h3e4f5.json
test1/l7i8j9.json
test1/r3o4p5.json
test1/q2n3o4.json
test1/p1m2n3.json
test1/i4f5g6.json
Total phishing letters in zip file .\test1.zip: 16
Total non-phishing letters in zip file .\test1.zip: 20
test2/t5q6r7.json
test2/f1a2b3.json
test2/j5g6h7.json
test2/n9k0l1.json
test2/k6h7i8.json
test2/g2c3d4.json
test2/m8j9k0.json
test2/o0l1m2.json
test2/s4p5q6.json
test2/u6r7s8.json
test2/h3e4f5.json
test2/

In [31]:
import hashlib

def file_checksum(path, algo="sha256", block_size=65536):
    """Return the hex digest of a file, reading it in fixed-size chunks.

    Args:
        path: File to hash.
        algo: Algorithm name passed to ``hashlib.new``.
        block_size: Number of bytes read per chunk.

    Returns:
        Hexadecimal digest string of the file's content.
    """
    digest = hashlib.new(algo)
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for piece in iter(lambda: stream.read(block_size), b""):
            digest.update(piece)
    return digest.hexdigest()

In [32]:
# Print the SHA-256 checksum of every zip archive in the current directory.
# NOTE: uses `os`, which this cell does not import itself.
for filename in os.listdir("."):
    if filename.lower().endswith('.zip'):
        print(f"{filename}: {file_checksum(filename)}")

letters_1.zip: 0a84369c1315c05f1a43702faca3a9e5dfedb64a682dbd85ebf4bc6b00d23e4e
test1.zip: 9949b5f6358633c0ec8675b26fea116d01587f0c173cc7fdc38704de0c4b8247
test2.zip: 49f7ddb84e33036bf6cae79fca34da7a8b536923d86a5cde7668f1e108c2d026
test3.zip: 1a8fcb6f3e88900cc90891b3c8bb321a8384125981e562cbc09e02a22c3b2042
test4.zip: 7567e35ba0226cd257149fd3e0779fb95304fa409a80d431956801858edb3736
test5.zip: 9e2b5710b3aed4a4d961640719f63a67e41240f5cdd4055cfe182fa93de9bfe5
