In [1]:
import pandas as pd

COLUMNS = ['folder', 'url', 'phish_category', 'pred_target', 'matched_domain', 'siamese_conf', 'logo_recog_time',
           'logo_match_time']

# VP - newly crawled phishing
df = pd.read_csv(
    '/Users/mjarczewski/Repositories/inz/20250312_results.txt',
    sep='\t',
    names=COLUMNS
)
df

Unnamed: 0,folder,url,phish_category,pred_target,matched_domain,siamese_conf,logo_recog_time,logo_match_time
0,paypal.com--679,https://paypal.com,0,,,0.967144,0.1851,0.0741
1,walmart.com--913,https://walmart.com,0,,,0.978121,0.1579,0.0559
2,americanexpress.com--255,https://americanexpress.com,0,,,0.877718,0.1534,0.0738
3,grupobancolombia.com--852,https://grupobancolombia.com,0,,,0.960032,0.1566,0.0527
4,bradesco.com.br--610,https://bradesco.com.br,0,,,0.938148,0.1525,0.0491
...,...,...,...,...,...,...,...,...
1014,facebook.com--126,https://facebook.com,0,,,0.919449,0.1896,0.0564
1015,wellsfargo.com--031,https://wellsfargo.com,0,,,0.915240,0.1820,0.0514
1016,onedrive.live.com--309,https://onedrive.live.com,1,Microsoft,['microsoft.com'],0.961259,0.2159,0.0745
1017,bradesco.com.br--087,https://bradesco.com.br,0,,,0.715971,0.2276,0.0762


In [2]:
from src.models.phishpedia.prepare_vp import special_domains

urlToCompany = {v: k for k, v in special_domains.items()}

In [3]:
def add_classification_columns(df):
    """
    Add 'pred_class' and 'class' columns to the dataframe.
    - pred_class: 'benign' if phish_category is 0, else value from pred_target
    - class: value from folder column

    Args:
        df: DataFrame with columns including 'phish_category', 'pred_target', and 'folder'

    Returns:
        DataFrame: Original dataframe with additional columns
    """
    # Create a copy to avoid modifying the original
    result_df = df.copy()

    # Add pred_class column
    result_df['pred_class'] = result_df.apply(
        lambda row: 'benign' if row['phish_category'] == 0 else row['pred_target'],
        axis=1
    )

    # Add class column (from folder value)
    result_df['class'] = result_df['folder'].apply(lambda x: urlToCompany.get(x.split('--')[0], '<>'))

    return result_df

In [17]:
import pandas as pd
import re


def normalize_brand(brand):
    """
    Normalize a brand name for better matching:
    - Convert to lowercase
    - Remove special characters
    - Remove common words like 'inc', 'llc', etc.
    """
    if pd.isna(brand):
        return None

    # Convert to lowercase
    normalized = brand.lower()

    # Replace underscores with spaces
    normalized = normalized.replace('_', ' ')

    # Remove special characters
    normalized = re.sub(r'[^\w\s]', '', normalized)

    # Remove common company suffixes and articles
    common_words = ['inc', 'llc', 'ltd', 'corp', 'corporation', 'the', 'company']
    for word in common_words:
        normalized = re.sub(r'\b' + word + r'\b', '', normalized)

    # Remove extra whitespace
    normalized = re.sub(r'\s+', ' ', normalized).strip()

    return normalized


# Define exceptions for manual matching
# Format: (brand1, brand2) where either one can match the other
manual_exceptions = [
    ('ms_onedrive', 'Microsoft'),
    ('ms_outlook', 'Microsoft'),
    ('office365', 'Microsoft'),
    ('lloyds_bank', 'Lloyds TSB Group'),
    ('caixa_bank', 'Caixa Economica Federal'),
    ('gov_uk', 'Government of the United Kingdom'),
    ('gov.uk', 'Government of the United Kingdom'),  # Added with period
    ('ms_office', 'Microsoft'),
    ('cibc', 'Canadian Imperial Bank of Commerce'),
    ('bnp_paribas', 'BNP Paribas'),
    ('impots_gov', 'DGI French Tax Authority'),
]

# Create a dictionary for faster lookups
exception_dict = {}
for brand1, brand2 in manual_exceptions:
    brand1_key = brand1.lower().replace('_', '').replace('.', '')
    brand2_key = brand2.lower().replace('_', '').replace('.', '')

    if brand1_key not in exception_dict:
        exception_dict[brand1_key] = set()
    if brand2_key not in exception_dict:
        exception_dict[brand2_key] = set()

    exception_dict[brand1_key].add(brand2_key)
    exception_dict[brand2_key].add(brand1_key)


def check_same(name, target):
    """
    Check if name and target represent the same brand

    Args:
        name (str): First brand name
        target (str): Second brand name

    Returns:
        bool: True if they match, False otherwise
    """
    # Handle the case where name might contain an ID suffix
    if '--' in name:
        name_base = name.split('--')[0]
    else:
        name_base = name

    # Get the company name from urlToCompany dictionary
    name_mapped = urlToCompany.get(name_base, '<>')

    if pd.isna(name_mapped) or pd.isna(target):
        return False

    # Create variations of the name for comparison
    name_variations = [
        name_mapped.lower().replace('_', '').replace('.', ''),
        name_base.lower().replace('_', '').replace('.', ''),
        name_mapped.lower(),
        name_base.lower()
    ]

    target_base = target.lower().replace('_', '').replace('.', '')

    # Check exception dictionary
    for name_var in name_variations:
        if name_var in exception_dict and target_base in exception_dict[name_var]:
            return True

    # Standard matching
    return (
        name_mapped in normalize_brand(target) or
        normalize_brand(name_mapped) in target or
        any(name_var == target_base for name_var in name_variations)
    )


def add_classification_columns(df):
    """
    Add 'pred_class' and 'class' columns to the dataframe.
    - pred_class: 'benign' if phish_category is 0, else value from pred_target
    - class: value from folder column
    """
    # Create a copy to avoid modifying the original
    result_df = df.copy()

    # Add pred_class column
    result_df['pred_class'] = result_df.apply(
        lambda row: 'benign' if row['phish_category'] == 0 else row['pred_target'],
        axis=1
    )

    # Add class column (from folder value)
    result_df['class'] = result_df['folder']

    return result_df


def evaluate_matches(df):
    """
    Evaluate matches between pred_class and class columns

    Args:
        df: DataFrame with required columns

    Returns:
        DataFrame: Original dataframe with added match results
    """
    # Add classification columns
    result_df = add_classification_columns(df)

    # Add matched column by applying check_same to each row
    result_df['matched'] = result_df.apply(
        lambda row: check_same(row['folder'], row['pred_target']) if row['phish_category'] != 0 else False,
        axis=1
    )

    # Calculate summary statistics
    correct_matches = result_df['matched'].sum()
    match_percentage = correct_matches / len(result_df) * 100

    print(f"Number of correct matches: {correct_matches}")
    print(f"Percentage of correct matches: {match_percentage:.2f}%")

    return result_df


# Usage
result = evaluate_matches(df)
result

Number of correct matches: 169
Percentage of correct matches: 16.58%


Unnamed: 0,folder,url,phish_category,pred_target,matched_domain,siamese_conf,logo_recog_time,logo_match_time,pred_class,class,matched
0,paypal.com--679,https://paypal.com,0,,,0.967144,0.1851,0.0741,benign,paypal.com--679,False
1,walmart.com--913,https://walmart.com,0,,,0.978121,0.1579,0.0559,benign,walmart.com--913,False
2,americanexpress.com--255,https://americanexpress.com,0,,,0.877718,0.1534,0.0738,benign,americanexpress.com--255,False
3,grupobancolombia.com--852,https://grupobancolombia.com,0,,,0.960032,0.1566,0.0527,benign,grupobancolombia.com--852,False
4,bradesco.com.br--610,https://bradesco.com.br,0,,,0.938148,0.1525,0.0491,benign,bradesco.com.br--610,False
...,...,...,...,...,...,...,...,...,...,...,...
1014,facebook.com--126,https://facebook.com,0,,,0.919449,0.1896,0.0564,benign,facebook.com--126,False
1015,wellsfargo.com--031,https://wellsfargo.com,0,,,0.915240,0.1820,0.0514,benign,wellsfargo.com--031,False
1016,onedrive.live.com--309,https://onedrive.live.com,1,Microsoft,['microsoft.com'],0.961259,0.2159,0.0745,Microsoft,onedrive.live.com--309,True
1017,bradesco.com.br--087,https://bradesco.com.br,0,,,0.715971,0.2276,0.0762,benign,bradesco.com.br--087,False


In [None]:
import pickle

with open('../domain_map.pkl', "rb") as handle:
    domain_map = pickle.load(handle)
domain_map.get('onedrive', 'aaa')

In [None]:
def write_dict_keys_to_file(dictionary, filename):
    """
    Write the keys of a dictionary to a file, with each key on a separate line.

    Parameters:
    dictionary (dict): The dictionary whose keys you want to write
    filename (str): Path to the output file
    """
    with open(filename, 'w') as f:
        for key in dictionary.keys():
            f.write(f"{key}\n")

    print(f"Keys written to {filename}")

In [None]:
def create_brand_mapping(lowercase_file, mixedcase_file, output_file):
    """
    Create a mapping from lowercase brand names to their proper mixed case versions.
    Only processes unique brand names from both input files.

    Parameters:
    lowercase_file (str): Path to the file with lowercase brand names
    mixedcase_file (str): Path to the file with mixed case brand names
    output_file (str): Path to save the mapping results
    """
    # Read the lowercase brands file (assume one brand per line)
    with open(lowercase_file, 'r') as f:
        lowercase_brands = [line.strip() for line in f.readlines()]

    # Read the mixed case brands file (assume one brand per line)
    with open(mixedcase_file, 'r') as f:
        # Remove any quotes and strip whitespace
        mixedcase_brands = [line.strip().strip("'").strip('"') for line in f.readlines()]

    # Identify and report duplicates in lowercase brands
    lowercase_brands_set = set()
    lowercase_duplicates = []
    unique_lowercase_brands = []

    for brand in lowercase_brands:
        if brand in lowercase_brands_set:
            lowercase_duplicates.append(brand)
        else:
            lowercase_brands_set.add(brand)
            unique_lowercase_brands.append(brand)

    # Identify and report duplicates in mixed case brands
    mixedcase_brands_set = set()
    mixedcase_duplicates = []
    unique_mixedcase_brands = []

    for brand in mixedcase_brands:
        if brand in mixedcase_brands_set:
            mixedcase_duplicates.append(brand)
        else:
            mixedcase_brands_set.add(brand)
            unique_mixedcase_brands.append(brand)

    # Report on duplicates
    if lowercase_duplicates:
        print(f"Found {len(lowercase_duplicates)} duplicate lowercase brands (removed):")
        for i, brand in enumerate(lowercase_duplicates[:5], 1):
            print(f"  {i}. {brand}")
        if len(lowercase_duplicates) > 5:
            print(f"  ... and {len(lowercase_duplicates) - 5} more")

    if mixedcase_duplicates:
        print(f"Found {len(mixedcase_duplicates)} duplicate mixed case brands (removed):")
        for i, brand in enumerate(mixedcase_duplicates[:5], 1):
            print(f"  {i}. {brand}")
        if len(mixedcase_duplicates) > 5:
            print(f"  ... and {len(mixedcase_duplicates) - 5} more")

    # Create DataFrames with unique brands
    df_lower = pd.DataFrame({'lowercase_brand': unique_lowercase_brands})
    df_mixed = pd.DataFrame({'proper_brand': unique_mixedcase_brands})

    # Add a normalized column to both DataFrames for matching
    df_lower['normalized'] = df_lower['lowercase_brand'].apply(normalize_brand)
    df_mixed['normalized'] = df_mixed['proper_brand'].apply(normalize_brand)

    # Check for duplicates in normalized brands
    dup_normalized_mixed = df_mixed['normalized'].duplicated(keep=False)
    if dup_normalized_mixed.any():
        print(
            f"Warning: Found {dup_normalized_mixed.sum()} brands in the mixed case file that normalize to the same value:")
        duplicated_groups = df_mixed[dup_normalized_mixed].groupby('normalized')['proper_brand'].apply(list)
        dup_items = list(duplicated_groups.items())
        display_count = min(5, len(dup_items))
        for i, (norm, brands) in enumerate(dup_items[:display_count], 1):
            print(f"  {i}. '{norm}': {brands}")
        if len(duplicated_groups) > 5:
            print(f"  ... and {len(duplicated_groups) - 5} more groups")

    # Create a mapping dictionary from normalized to proper case
    # If there are duplicates in normalized form, use the first one
    mapping_dict = dict(zip(df_mixed['normalized'], df_mixed['proper_brand']))

    # Apply the mapping to the lowercase brands
    df_lower['mapped_brand'] = df_lower['normalized'].map(mapping_dict)

    # For brands that didn't get mapped, keep the original lowercase brand
    df_lower['final_brand'] = df_lower['mapped_brand'].fillna(df_lower['lowercase_brand'])

    # Create a mapping report
    mapping_report = pd.DataFrame({
        'lowercase_brand': df_lower['lowercase_brand'],
        'proper_brand': df_lower['final_brand'],
        'matched': ~df_lower['mapped_brand'].isna()
    })

    # Save the mapping to a CSV file
    mapping_report.to_csv(output_file, index=False)

    # Print summary statistics
    total_brands = len(df_lower)
    matched_brands = sum(~df_lower['mapped_brand'].isna())
    print(f"\nTotal unique brands in lowercase file: {total_brands}")
    print(f"Total unique brands in mixed case file: {len(df_mixed)}")
    print(f"Successfully mapped: {matched_brands} ({matched_brands / total_brands * 100:.1f}%)")
    print(f"Unmapped brands: {total_brands - matched_brands}")
    print(f"Mapping saved to: {output_file}")

    # Create additional files for unmapped brands
    unmapped_df = mapping_report[~mapping_report['matched']]
    if len(unmapped_df) > 0:
        unmapped_file = output_file.replace('.csv', '_unmapped.csv')
        unmapped_df.to_csv(unmapped_file, index=False)
        print(f"Unmapped brands saved to: {unmapped_file}")

    return mapping_report


def normalize_brand(brand):
    """
    Normalize a brand name for better matching:
    - Convert to lowercase
    - Remove special characters
    - Remove common words like 'inc', 'llc', etc.
    """
    if pd.isna(brand):
        return None

    # Convert to lowercase
    normalized = brand.lower()

    # Replace underscores with spaces
    normalized = normalized.replace('_', ' ')

    # Remove special characters
    normalized = re.sub(r'[^\w\s]', '', normalized)

    # Remove common company suffixes and articles
    common_words = ['inc', 'llc', 'ltd', 'corp', 'corporation', 'the', 'company']
    for word in common_words:
        normalized = re.sub(r'\b' + word + r'\b', '', normalized)

    # Remove extra whitespace
    normalized = re.sub(r'\s+', ' ', normalized).strip()

    return normalized


def write_dict_keys_to_file(dictionary, filename):
    """
    Write the keys of a dictionary to a file, with each key on a separate line.

    Parameters:
    dictionary (dict): The dictionary whose keys you want to write
    filename (str): Path to the output file
    """
    with open(filename, 'w') as f:
        for key in dictionary.keys():
            f.write(f"{key}\n")

    print(f"Keys written to {filename}")

In [None]:
write_dict_keys_to_file(domain_map, 'domain_map_keys.txt')

In [None]:
df = create_brand_mapping('/Users/mjarczewski/Repositories/inz/src/models/phishpedia/test/labels.txt',
                          'domain_map_keys.txt', 'brand_mapping.csv')

In [None]:
df[df['matched'] == False]