In [12]:
#importing all the required libraries
import hashlib
import os
import email
from bs4 import BeautifulSoup
import re
import json
import requests
from urllib.parse import urlparse,quote
from urlextract import URLExtract
import pandas as pd
import joblib

#import the custom classes from the separate file to avoid retraining
from model_classes import SenderPatternFeatures, URLFeatureExtractor

#Load all API keys from .env file
from dotenv import load_dotenv
load_dotenv()

True

In [13]:
# Define constants

# Directory that is scanned for .eml files and the JSON whitelist file that is
# consulted before any URL is sent to PhishTank.
EML_DIR = '../malicious_emails/'
WHITELIST_FILE = 'whitelist.json'

# PhishTank URL-check endpoint and the User-Agent header sent with each request.
PHISHTANK_API_URL = "https://checkurl.phishtank.com/checkurl/"
USER_AGENT = "phishtank/PhishDet"

# Optional PhishTank application key. It is read from the environment (like the
# VirusTotal key below) and falls back to an empty string, in which case no
# 'app_key' field is added to the request.
PHISHTANK_API_KEY = os.getenv("PHISHTANK_API_KEY", "")

# VirusTotal API key (None when the environment variable is not set) and the
# base URL to which a file hash is appended.
VIRUSTOTAL_API_KEY = os.getenv("VIRUSTOTAL_API_KEY")
VIRUSTOTAL_URL = "https://www.virustotal.com/api/v3/files/"

In [14]:
def _decode_text_part(part):
    """Return the decoded text of a single MIME part ('' if it has no payload)."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ''
    # Honour the charset declared on the part; fall back to UTF-8 when it is
    # missing or names a codec Python does not know.
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        return payload.decode('utf-8', errors='replace')


# getting the from address, subject, body, and unique urls from an email file
def parse_emails(filepath) :
    """Parse an .eml file into the fields used by the rest of the pipeline.

    Args:
        filepath: Path of the e-mail file; it is read in binary mode.

    Returns:
        A 5-tuple ``(from_addr, subject, body, unique_urls, msg)`` where
        ``from_addr`` is the address part of the ``From`` header ('' when the
        header is missing), ``subject`` is the decoded ``Subject`` header or
        ``'(No Subject)'``, ``body`` is the concatenated text of all
        text/plain and text/html parts, ``unique_urls`` is the de-duplicated
        list of URLs in first-seen order, and ``msg`` is the message object
        returned by ``email.message_from_bytes``.
    """
    # Imported here so the submodules are bound explicitly instead of relying
    # on them being loaded as a side effect of other email imports.
    from email.header import decode_header
    from email.utils import parseaddr

    with open(filepath, 'rb') as f:
        raw_email = f.read()

    msg = email.message_from_bytes(raw_email)  # parse from the raw bytes

    # extract subject
    subject_header = msg['Subject']
    if subject_header:
        subject = ''
        for fragment, encoding in decode_header(subject_header):
            if isinstance(fragment, bytes):
                try:
                    subject += fragment.decode(encoding or 'utf-8', errors='replace')
                except LookupError:
                    # Unknown charset name in the encoded word.
                    subject += fragment.decode('utf-8', errors='replace')
            else:
                subject += fragment
    else:
        subject = '(No Subject)'

    # extract from address; a missing header becomes '' so parseaddr always
    # receives a string.
    from_addr = parseaddr(str(msg.get('From') or ''))[1]

    # extract body and urls
    body = ''
    urls = []
    extractor = URLExtract()

    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type not in ('text/plain', 'text/html'):
            continue

        try:
            text = _decode_text_part(part)
            if content_type == 'text/plain':
                body += text
                # Find URLs in the plain text part
                urls.extend(extractor.find_urls(text))
            else:
                soup = BeautifulSoup(text, 'html.parser')
                # Add the text version to the body for ML analysis
                body += soup.get_text()
                # Collect the target of every <a> tag that has an href
                for a_tag in soup.find_all('a', href=True):
                    urls.append(a_tag['href'])
        except Exception as e:
            # Best effort: a part that cannot be processed is skipped, but the
            # failure is reported instead of being silently swallowed.
            print(f"Skipping {content_type} part that could not be processed: {e}")
            continue

    # remove duplicate urls while keeping the order in which they were found
    unique_urls = list(dict.fromkeys(urls))

    return from_addr, subject, body, unique_urls, msg





In [15]:
#loading the whitelist file
def load_whitelist(filepath):
    """Load the whitelist from a JSON file.

    Args:
        filepath: Path of the JSON file.

    Returns:
        The object produced by ``json.load`` for the file's content.
    """
    # Read explicitly as UTF-8 so the result does not depend on the
    # platform's default text encoding.
    with open(filepath, 'r', encoding='utf-8') as f:
        whitelist = json.load(f)
    return whitelist

In [16]:
#checking if the url is whitelisted or not
def is_white_listed(url, whitelist) :
    """Return True when ``url`` is covered by ``whitelist``.

    Three whitelist sections are consulted, in this order:

    * ``whitelist['exactMatching']['url']`` - the URL must be listed verbatim.
    * ``whitelist['domainMatching']['domain']`` - the URL's host must equal a
      listed domain or be a subdomain of it (case-insensitive).
    * ``whitelist['domainsInURLs']`` - a listed string must occur anywhere in
      the URL.

    Missing sections are treated as empty and empty entries are ignored.

    Args:
        url: The URL string to check.
        whitelist: Dict with the sections described above.

    Returns:
        True if any section matches, otherwise False.
    """
    # exact URL match
    if url in whitelist.get('exactMatching', {}).get('url', []):
        return True

    # Work on the real host name rather than the raw netloc, so that user-info
    # ("trusted.com@evil.com") and ports cannot influence the comparison.
    try:
        parsed = urlparse(url)
        if not parsed.scheme and not parsed.netloc:
            # Scheme-less input such as "www.example.com/path": re-parse it as
            # a network location so the host can still be extracted.
            parsed = urlparse('//' + url)
        host = (parsed.hostname or '').rstrip('.')
    except ValueError:
        # urlparse rejects some malformed inputs; such a URL has no usable host.
        host = ''

    # domain match: exact host or a subdomain of it. A plain substring test
    # would also accept look-alikes such as "trusted.com.evil.example".
    if host:
        for whitelisted_domain in whitelist.get('domainMatching', {}).get('domain', []):
            candidate = whitelisted_domain.strip().lower().strip('.')
            if not candidate:
                continue
            if host == candidate or host.endswith('.' + candidate):
                return True

    # domains in urls
    # NOTE(review): this remains a substring test over the whole URL, so a
    # listed string appearing in a path or query also matches - confirm that
    # this is the intended meaning of 'domainsInURLs'.
    for domain_substring in whitelist.get('domainsInURLs', []):
        if domain_substring and domain_substring in url:
            return True

    return False


In [17]:
#phishtank API call to check if URL is malicious, suspicious or unknown
def check_phishtank(url):
    """Look up ``url`` on PhishTank and classify the answer.

    The URL is posted as form data (not quote()-encoded) together with
    ``format=json`` and, when PHISHTANK_API_KEY is non-empty, ``app_key``.

    Returns:
        'malicious'  - the entry is in the database, verified and valid.
        'suspicious' - the entry is in the database but not verified.
        'unknown'    - every other case, including a non-200 status code and
                       any exception raised while querying.
    """
    request_headers = {'User-Agent': USER_AGENT}
    form_fields = {'url': url, 'format': 'json'}
    if PHISHTANK_API_KEY:
        form_fields['app_key'] = PHISHTANK_API_KEY

    try:
        reply = requests.post(
            PHISHTANK_API_URL,
            data=form_fields,
            headers=request_headers,
            timeout=10,
        )
        print(f"Querying PhishTank for: {url}")
        print(f"Status code: {reply.status_code}")

        if reply.status_code != 200:
            print(f" API call failed: {reply.status_code}")
            return 'unknown'

        parsed_body = reply.json()
        print(f"PhishTank response: {parsed_body}")

        lookup = parsed_body.get('results', {})
        listed = lookup.get('in_database', False)
        confirmed = lookup.get('verified', False)
        is_phish = lookup.get('valid', False)

        if not listed:
            # URL not in database - treat as unknown, not safe
            return 'unknown'
        if not confirmed:
            return 'suspicious'
        if is_phish:
            return 'malicious'
        # Listed and verified, but not flagged as valid.
        return 'unknown'
    except Exception as err:
        print(f" Error querying PhishTank: {err}")
        return 'unknown'

In [18]:
#getting attachments from an email file
def extract_attachments(msg, save_dir='attachments'):
    """Write every named attachment of ``msg`` into ``save_dir``.

    A part is treated as an attachment when it is not a multipart container,
    carries a Content-Disposition header and has a filename.

    Args:
        msg: Parsed e-mail message; its parts are visited with ``msg.walk()``.
        save_dir: Target directory; it is created when missing.

    Returns:
        List of the file paths that were written, in the order the parts
        were encountered. Parts that could not be saved are reported with
        ``print`` and left out of the list.
    """
    attachments = []
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(save_dir, exist_ok=True)

    for part in msg.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get('Content-Disposition') is None:
            continue

        filename = part.get_filename()
        if not filename:
            continue

        # Clean filename to avoid path issues: path separators, characters
        # that are invalid on common filesystems and control characters.
        safe_filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)
        if safe_filename.strip('. ') == '':
            # Names such as "." or ".." would point at a directory.
            safe_filename = 'attachment'

        # Decode before opening the file so a part without a decodable
        # payload does not leave an empty file behind.
        payload = part.get_payload(decode=True)
        if payload is None:
            print(f"Error saving attachment {filename}: no decodable payload")
            continue

        # Two attachments with the same name must not overwrite each other
        # within one message, otherwise the first would never be scanned.
        stem, ext = os.path.splitext(safe_filename)
        filepath = os.path.join(save_dir, safe_filename)
        suffix = 1
        while filepath in attachments:
            filepath = os.path.join(save_dir, f"{stem}_{suffix}{ext}")
            suffix += 1

        try:
            with open(filepath, "wb") as f:
                f.write(payload)
            attachments.append(filepath)
        except Exception as e:
            print(f"Error saving attachment {filename}: {e}")
    return attachments

In [19]:
#getting the sha256 hash of a file
def get_file_sha256(filepath):
    """Return the hexadecimal SHA-256 digest of the file at ``filepath``.

    The file is read in 4096-byte chunks, so its whole content is never held
    in memory at once.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while True:
            chunk = stream.read(4096)
            if chunk == b"":
                # An empty read marks the end of the file.
                break
            digest.update(chunk)
    return digest.hexdigest()

In [20]:
#Virustotal API call to check if the file is malicious or not
def check_virustotal(file_hash):
    """Look up a file hash on VirusTotal and classify the answer.

    Args:
        file_hash: Hash string that is appended to VIRUSTOTAL_URL.

    Returns:
        'malicious' - the report's ``last_analysis_stats`` has a 'malicious'
                      count greater than zero.
        'safe'      - a report exists and that count is zero.
        'unknown'   - status 404, any other non-200 status, or any exception
                      raised while querying or reading the response.
    """
    headers = {
        "x-apikey": VIRUSTOTAL_API_KEY
    }
    url = VIRUSTOTAL_URL + file_hash
    try:
        # Without a timeout a stalled connection would block the run
        # indefinitely; 10 seconds matches the PhishTank lookup.
        response = requests.get(url, headers=headers, timeout=10)
        if response.status_code == 200:
            data = response.json()
            stats = data['data']['attributes']['last_analysis_stats']
            malicious_count = stats.get('malicious', 0)
            if malicious_count > 0:
                return 'malicious'
            else:
                return 'safe'
        elif response.status_code == 404:
            return 'unknown'  # File not found in VT database
        else:
            print(f" VirusTotal API error: {response.status_code}")
            return 'unknown'
    except Exception as e:
        print(f" Error checking VirusTotal: {e}")
        return 'unknown'

In [21]:
# Models that were already loaded, keyed by file path, so the pickle is read
# from disk once per path instead of once per e-mail.
_MODEL_CACHE = {}


def ml_detection_check(from_addr, subject, body, urls, model_path="../phishing_email_model_fixed.pkl"):
    """Use the ML model to classify an e-mail when the blacklist checks fail.

    Args:
        from_addr: Sender address, passed to the model as 'sender'.
        subject: Subject line, passed as 'subject'.
        body: Body text, passed as 'body'.
        urls: Collection of URLs; only its length is passed, as 'urls'.
        model_path: Path of the joblib file holding the trained model.
            Defaults to the path the function used previously.

    Returns:
        ``(result, confidence)`` where ``result`` is "PHISHING" when the
        model predicts 1 and "LEGITIMATE" otherwise, and ``confidence`` is
        the highest predicted class probability times 100. When anything
        fails (loading or predicting), the error is printed and
        ``("ERROR", 0)`` is returned.
    """
    try:
        model = _MODEL_CACHE.get(model_path)
        if model is None:
            # Load the trained model from the pickled file made by the ML
            # training script.
            # NOTE: joblib.load unpickles the file, which can execute
            # arbitrary code - only point model_path at trusted files.
            model = joblib.load(model_path)
            _MODEL_CACHE[model_path] = model

        # Prepare data for ML model
        email_data = {
            'subject': subject,
            'body': body,
            'sender': from_addr,
            'urls': len(urls)
        }

        # Create a single-row DataFrame for prediction
        test_df = pd.DataFrame([email_data])

        # Make prediction
        prediction = model.predict(test_df)[0]
        prediction_proba = model.predict_proba(test_df)[0]
        confidence = max(prediction_proba) * 100

        # Determine result
        result = "PHISHING" if prediction == 1 else "LEGITIMATE"

        return result, confidence

    except Exception as e:
        print(f"Error in ML detection: {e}")
        return "ERROR", 0

In [22]:
# Driver: analyse every .eml file in EML_DIR with the whitelist, PhishTank,
# VirusTotal and - as a fallback - the ML model, printing a verdict per file.
if __name__ == '__main__':
    whitelist = load_whitelist(WHITELIST_FILE)

    # isdir (rather than exists) so a regular file at EML_DIR is reported here
    # instead of failing inside os.listdir.
    if not os.path.isdir(EML_DIR):
        print(f"Directory {EML_DIR} does not exist. Please create it and add it your .eml files.")
    else:
        # Sorted so the files are processed in the same order on every run;
        # the extension check ignores case so ".EML" files are included.
        eml_files = sorted(f for f in os.listdir(EML_DIR) if f.lower().endswith('.eml'))
        if not eml_files:
            print(f"Please add your .eml files to the {EML_DIR} directory for analysis.")
        else :
            for eml_file in eml_files:
                filepath = os.path.join(EML_DIR, eml_file)
                try:
                    from_addr, subject, body, urls, msg = parse_emails(filepath)
                except Exception as e:
                    # One unreadable file must not abort the whole batch.
                    print(f"Could not parse {eml_file}: {e}")
                    continue
                print(f"File : {eml_file}")
                print(f"From : {from_addr}")
                print(f"Subject : {subject}")

                #initial conditions
                email_status = 'SAFE'
                # Set when a URL or attachment lookup returns no definite
                # answer; it enables the ML fallback below.
                blacklist_failed = False

                #1. Phishtank check and whitelist url check
                print('BLACKLIST ANALYSIS:')
                print("Extracted URLs:")
                for url in urls:
                    print(f" - {url}")
                    if is_white_listed(url, whitelist):
                        print(" WHITELISTED - skipping analysis...")
                        continue
                    status = check_phishtank(url)
                    if status == 'malicious' :
                        print("! MALICIOUS URL DETECTED !")
                        email_status = 'MALICIOUS'
                    elif status == 'suspicious' :
                        # A suspicious URL also marks the e-mail as malicious.
                        print("! SUSPICIOUS URL DETECTED !")
                        email_status = 'MALICIOUS'
                    elif status == 'unknown' :
                        print(" ? URL NOT IN DATABASE ? requires further ML analysis...")
                        blacklist_failed = True

                #2. Virustotal attachment checking
                attachments = extract_attachments(msg)
                if attachments:
                    print("Attachments found:")
                    for attachment in attachments:
                        file_hash = get_file_sha256(attachment)
                        status = check_virustotal(file_hash)
                        if status == 'malicious':
                            print(f"Malicious attachment detected: {attachment}")
                            email_status = "MALICIOUS"
                        elif status == 'safe':
                            print(f"Attachment is safe: {attachment}")
                        else:
                            print(f"File Not Found in VirusTotal Database (Unknown): {attachment}")
                            blacklist_failed = True
                else:
                    print("No attachments found.")

                #3. ML Detection as Fallback
                # Runs only when nothing was flagged yet and at least one
                # lookup was inconclusive.
                if email_status != "MALICIOUS" and blacklist_failed:
                    print("\n ML DETECTION (Fallback):")
                    ml_result, confidence = ml_detection_check(from_addr, subject, body, urls)
                    if ml_result == "PHISHING":
                        print(f"ML Model detected PHISHING (Confidence: {confidence:.2f}%)")
                        email_status = "MALICIOUS"
                    elif ml_result == "LEGITIMATE":
                        print(f"ML Model detected LEGITIMATE (Confidence: {confidence:.2f}%)")
                    else:
                        print("ML detection error")

                # Final verdict
                print(f"\n FINAL VERDICT: {email_status}")
                if email_status == "MALICIOUS":
                    print("EMAIL IS MALICIOUS - TAKE ACTION!")
                else:
                    print("Email appears to be safe")

                print("\n" + "-"*60 + "\n")

                





                




File : sample-1.eml
From : banco.bradesco@atendimento.com.br
Subject : CLIENTE PRIME - BRADESCO LIVELO: Seu cartão tem 92.990 pontos LIVELO expirando hoje!
BLACKLIST ANALYSIS:
Extracted URLs:
 - https://blog1seguimentmydomaine2bra.me/
Querying PhishTank for: https://blog1seguimentmydomaine2bra.me/
Status code: 200
PhishTank response: {'meta': {'timestamp': '2025-09-26T06:42:13+00:00', 'serverid': 'e5f3084e', 'status': 'success', 'requestid': '172.17.128.1.68d635c5303354.82648285'}, 'results': {'url': 'https://blog1seguimentmydomaine2bra.me/', 'in_database': False}}
 ? URL NOT IN DATABASE ? requires further ML analysis...
No attachments found.

 ML DETECTION (Fallback):
ML Model detected PHISHING (Confidence: 70.34%)

 FINAL VERDICT: MALICIOUS
EMAIL IS MALICIOUS - TAKE ACTION!

------------------------------------------------------------

File : sample-10.eml
From : 
Subject : Microsoft account unusual signin activity
BLACKLIST ANALYSIS:
Extracted URLs:
 - mailto:sotrecognizd@gmail.com?