# Overview
1. Read CVSS CSV file, clean and sort it, remove all except CVE, Description columns


In [38]:
import pandas as pd
from rapidfuzz import fuzz, process
from tqdm import tqdm
import re
import csv
import unicodedata
from datetime import datetime
import json
from collections import Counter
import matplotlib.pyplot as plt
import seaborn as sns
import gzip
import scipy

In [17]:
# Input and output file paths
input_file = './data_in/CVSSData.csv.gz'
output_file = './cleaned_optimized_fuzzy_deduplicated_file.csv.gz'
removed_file = './removed_duplicates.csv.gz'
exact_file = './exact_duplicates.csv.gz'
output_json_file = './duplicate_info.json.gz'


In [18]:
# Read the CSV file
df = pd.read_csv(input_file,quoting=csv.QUOTE_ALL, escapechar='\\', compression='gzip')
df = df[['CVE', 'Description']]
#df=df[:50000] #test sample
df

Unnamed: 0,CVE,Description
0,CVE-1999-0095,"The debug command in Sendmail is enabled, allo..."
1,CVE-1999-0082,CWD ~root command in ftpd allows root access.
2,CVE-1999-1471,Buffer overflow in passwd in BSD based operati...
3,CVE-1999-1122,Vulnerability in restore in SunOS 4.0.3 and ea...
4,CVE-1999-1467,Vulnerability in rcp on SunOS 4.0.x allows rem...
...,...,...
275716,CVE-2025-0214,A vulnerability was found in TMD Custom Header...
275717,CVE-2024-13130,A vulnerability was found in Dahua IPC-HFW1200...
275718,CVE-2024-13131,A vulnerability classified as problematic has ...
275719,CVE-2024-13132,A vulnerability classified as problematic was ...


In [None]:
def clean_description(text):
    if not isinstance(text, str):
        return ''

    # Normalize unicode characters
    text = unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')

    # Convert to lowercase
    text = text.lower()

    # Remove newlines and carriage returns
    text = text.replace('\n', ' ').replace('\r', '')

    # Remove extra spaces
    text = re.sub(r'\s+', ' ', text)

    return text


def mark_and_count_dupes(txt_col, threshold=80, window=1000):
    txt_list = txt_col.to_list()
    marked = [True] * len(txt_list)
    duplicate_count = 0
    duplicate_groups = []
    
    for i in tqdm(range(len(txt_list)), desc="Checking for duplicates"):
        if not marked[i]:  # don't check duplicates of text rows marked for removal
            continue
        
        group = [i]
        # Define the window
        start = max(0, i + 1)
        end = min(len(txt_list), i + window + 1)
        
        for j in range(start, end):
            if marked[j]:  # only look through vals not already marked for removal
                if fuzz.ratio(txt_list[i], txt_list[j], score_cutoff=threshold):
                    marked[j] = False  # mark for removal
                    duplicate_count += 1
                    group.append(j)
        
        if len(group) > 1:
            duplicate_groups.append(group)
    
    return marked, duplicate_count, duplicate_groups



In [None]:
def save_duplicate_info(df, duplicate_groups, output_file, include_examples=True):
    """
    Save comprehensive duplicate information to a JSON file with enhanced analytics.
    
    Args:
        df: DataFrame containing CVE data
        duplicate_groups: List of duplicate groups
        output_file: Path to save the JSON output
        include_examples: Whether to include example matches
    """
    from collections import Counter
    import json
    import gzip
    import numpy as np
    from datetime import datetime
    from rapidfuzz import fuzz
    
    # Sort duplicate_groups by size (largest to smallest)
    duplicate_groups.sort(key=len, reverse=True)
    
    # Prepare detailed analytics
    group_sizes = [len(group) for group in duplicate_groups]
    size_counter = Counter(group_sizes)
    sorted_sizes = sorted(size_counter.items(), key=lambda x: x[0], reverse=True)
    
    # Calculate statistics
    total_duplicates = sum(len(group) - 1 for group in duplicate_groups)
    
    # Basic statistics
    stats = {
        "summary": {
            "total_cves": len(df),
            "total_duplicate_groups": len(duplicate_groups),
            "total_duplicate_entries": total_duplicates,
            "unique_group_sizes": len(size_counter),
            "average_group_size": np.mean(group_sizes),
            "median_group_size": np.median(group_sizes),
            "largest_group_size": max(group_sizes),
            "deduplication_ratio": total_duplicates / len(duplicate_groups),
            "timestamp": datetime.now().isoformat()
        },
        "size_distribution": [
            {
                "group_size": size,
                "count": count,
                "percentage": (count * size / len(df)) * 100
            }
            for size, count in sorted_sizes
        ],
        "percentiles": {
            f"p{p}": np.percentile(group_sizes, p)
            for p in [25, 50, 75, 90, 95, 99]
        }
    }
    
    # Prepare detailed group information
    duplicate_info = []
    for group in duplicate_groups:
        # Get base description for comparison
        base_desc = df.iloc[group[0]]['Clean_Description']
        
        # Calculate similarity scores for all members
        similarities = []
        for idx in group[1:]:
            comp_desc = df.iloc[idx]['Clean_Description']
            score = fuzz.ratio(base_desc, comp_desc)
            similarities.append(score)
        
        group_info = {
            "group_size": len(group),
            "base_cve": df.iloc[group[0]]['CVE'],
            "base_description": df.iloc[group[0]]['Description'],
            "similarity_stats": {
                "min_score": min(similarities) if similarities else 100,
                "max_score": max(similarities) if similarities else 100,
                "avg_score": np.mean(similarities) if similarities else 100
            }
        }
        
        if include_examples:
            # Include a few example matches with their similarity scores
            group_info["examples"] = [
                {
                    "cve": df.iloc[idx]['CVE'],
                    "description": df.iloc[idx]['Description'],
                    "similarity_score": score
                }
                for idx, score in zip(group[1:3], similarities[:2])  # First 2 examples
            ]
        
        duplicate_info.append(group_info)
    
    # Combine all information
    result = {
        "metadata": {
            "analysis_date": datetime.now().isoformat(),
            "threshold": 80,  # Add your actual threshold here
            "window_size": 1000  # Add your actual window size here
        },
        "statistics": stats,
        "groups": duplicate_info[:100],  # Top 100 largest groups
        "sample_analysis": {
            "largest_groups": duplicate_info[:5],  # Top 5 largest groups
            "medium_groups": duplicate_info[len(duplicate_info)//2:len(duplicate_info)//2+3],  # 3 medium-sized groups
            "threshold_analysis": {
                "high_similarity": [g for g in duplicate_info[:10] if g["similarity_stats"]["min_score"] > 90],
                "medium_similarity": [g for g in duplicate_info[:10] if 80 <= g["similarity_stats"]["min_score"] <= 90]
            }
        }
    }
    
    # Save to compressed JSON file
    with gzip.open(output_file, 'wt', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)
    
    print(f"Enhanced duplicate information saved to {output_file}")
    
    # Save a separate file for exact duplicates
    exact_dupes = [group for group in duplicate_info 
                   if group["similarity_stats"]["min_score"] == 100]
    
    exact_output = output_file.replace('.json.gz', '_exact.json.gz')
    with gzip.open(exact_output, 'wt', encoding='utf-8') as f:
        json.dump({
            "metadata": result["metadata"],
            "statistics": {
                "total_exact_duplicates": len(exact_dupes),
                "exact_duplicate_ratio": len(exact_dupes) / len(duplicate_groups)
            },
            "exact_duplicate_groups": exact_dupes
        }, f, indent=2, ensure_ascii=False)
    
    print(f"Exact duplicate information saved to {exact_output}")
    
    return result

In [None]:
def plot_duplicate_counts(duplicate_groups, output_file):
    """
    Create and save a horizontal bar plot of duplicate group sizes.
    
    Args:
        duplicate_groups: List of duplicate groups
        output_file: Base output file path for determining plot name
    """
    from pathlib import Path
    import matplotlib.pyplot as plt
    from collections import Counter
    
    # Create images directory if it doesn't exist
    images_dir = Path('images')
    images_dir.mkdir(parents=True, exist_ok=True)
    
    # Count the sizes of duplicate groups
    group_sizes = [len(group) for group in duplicate_groups]
    size_counter = Counter(group_sizes)
    
    # Sort the sizes in descending order
    sorted_sizes = sorted(size_counter.items(), key=lambda x: x[0], reverse=True)
    
    # Take top 100 or all if less than 100
    top_100 = sorted_sizes[:100]
    sizes, counts = zip(*top_100)
    
    # Create the plot
    plt.figure(figsize=(14, 20))  # Increased figure size
    bars = plt.barh(range(len(sizes)), counts, align='center')
    plt.ylabel('Group Size')
    plt.xlabel('Count of Groups')
    plt.title('Top 100 Largest Duplicate Group Sizes')
    
    # Set y-ticks to show group sizes
    plt.yticks(range(len(sizes)), sizes)
    
    # Add value labels on the bars
    for i, (size, count) in enumerate(zip(sizes, counts)):
        plt.text(count, i, f' {count}', va='center')
    
    # Adjust layout to prevent clipping of labels
    plt.tight_layout()
    
    # Save the plot in images directory
    plot_filename = 'duplicate_groups_distribution.png'
    plot_path = images_dir / plot_filename
    plt.savefig(plot_path, dpi=300, bbox_inches='tight')
    plt.close()
    
    print(f"Top 100 largest duplicate group sizes plot saved to {plot_path}")
    return str(plot_path)  # Return the path for reference

In [None]:
def create_duplicate_analysis_plots(duplicate_groups, output_dir='images'):
    """
    Create comprehensive visualizations for duplicate analysis with improved readability.
    
    Args:
        duplicate_groups: List of duplicate groups
        output_dir: Directory to save plots
    """
    from pathlib import Path
    import matplotlib.pyplot as plt
    import numpy as np
    from collections import Counter
    from scipy import stats
    
    # Create output directory if it doesn't exist
    images_dir = Path(output_dir)
    images_dir.mkdir(parents=True, exist_ok=True)
    
    # Set basic style parameters
    plt.style.use('default')
    
    # Prepare data
    group_sizes = [len(group) for group in duplicate_groups]
    size_counter = Counter(group_sizes)
    sorted_sizes = sorted(size_counter.items(), key=lambda x: x[0], reverse=True)
    
    # Create a figure with multiple subplots
    fig = plt.figure(figsize=(20, 15))
    
    # 1. Enhanced duplicate group sizes plot
    ax1 = plt.subplot2grid((2, 2), (0, 0), colspan=2)
    
    # Use scatter plot instead of bar chart to show all points
    sizes, counts = zip(*sorted_sizes)
    
    # Create a colormap based on group size
    colors = plt.cm.viridis(np.linspace(0, 1, len(sizes)))
    
    # Plot scatter with connecting lines for better visualization
    ax1.plot(counts, sizes, color='lightgray', alpha=0.5, zorder=1)
    scatter = ax1.scatter(counts, sizes, c=sizes, cmap='viridis', 
                         s=100, alpha=0.7, zorder=2)
    
    # Add colorbar
    cbar = plt.colorbar(scatter, ax=ax1)
    cbar.set_label('Group Size', fontsize=12)
    
    # Improve axes
    ax1.set_xlabel('Number of Groups', fontsize=12)
    ax1.set_ylabel('Group Size', fontsize=12)
    ax1.set_title('Duplicate Group Size Distribution', pad=20, fontsize=14, fontweight='bold')
    ax1.grid(True, alpha=0.3, linestyle='--')
    
    # Set log scale for better visualization of size distribution
    ax1.set_yscale('log')
    
    # Add annotations for interesting points
    max_count = max(counts)
    max_size = max(sizes)
    ax1.annotate(f'Largest group: {max_size}',
                xy=(size_counter[max_size], max_size),
                xytext=(10, 10), textcoords='offset points',
                bbox=dict(boxstyle='round,pad=0.5', fc='white', alpha=0.8))
    
    # 2. Distribution plot with log scale
    ax2 = plt.subplot2grid((2, 2), (1, 0))
    
    # Create histogram with log scale
    hist_data = np.array(group_sizes)
    n, bins, patches = ax2.hist(hist_data, bins=50, color='purple', alpha=0.7)
    ax2.set_yscale('log')
    ax2.set_title('Distribution of Duplicate Group Sizes (Log Scale)', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Group Size', fontsize=12)
    ax2.set_ylabel('Frequency (Log Scale)', fontsize=12)
    ax2.grid(True, alpha=0.3, linestyle='--', which='both')
    
    # Add statistical annotations
    stats_text = f'Mean: {np.mean(hist_data):.2f}\n'
    stats_text += f'Median: {np.median(hist_data):.2f}\n'
    stats_text += f'Mode: {stats.mode(hist_data)[0]:.2f}\n'
    stats_text += f'StdDev: {np.std(hist_data):.2f}\n'
    stats_text += f'Total Groups: {len(hist_data):,}'
    ax2.text(0.95, 0.95, stats_text, transform=ax2.transAxes,
             verticalalignment='top', horizontalalignment='right',
             bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='gray'),
             fontsize=10)
    
    # 3. Cumulative distribution plot
    ax3 = plt.subplot2grid((2, 2), (1, 1))
    
    total_duplicates = sum(count * (size - 1) for size, count in size_counter.items())
    cumsum = 0
    cum_percentages = []
    sizes_cum = []
    
    for size, count in sorted_sizes:
        cumsum += count * (size - 1)
        cum_percentages.append((cumsum / total_duplicates) * 100)
        sizes_cum.append(size)
    
    # Plot with enhanced styling
    ax3.plot(sizes_cum, cum_percentages, color='gold', linewidth=2.5)
    ax3.set_title('Cumulative % of Duplicates by Group Size', fontsize=14, fontweight='bold')
    ax3.set_xlabel('Group Size', fontsize=12)
    ax3.set_ylabel('Cumulative % of Total Duplicates', fontsize=12)
    ax3.grid(True, alpha=0.3, linestyle='--')
    
    # Add percentage markers
    percentiles = [25, 50, 75, 90]
    for p in percentiles:
        idx = next(i for i, x in enumerate(cum_percentages) if x >= p)
        ax3.axhline(y=p, color='red', linestyle='--', alpha=0.3)
        ax3.text(sizes_cum[idx], p + 1, f'{p}%', va='bottom', ha='right',
                bbox=dict(facecolor='white', alpha=0.9, edgecolor='gray', pad=2),
                fontsize=10, fontweight='bold')
    
    # Add summary statistics
    summary_stats = f'Total Duplicate Groups: {len(duplicate_groups):,}\n'
    summary_stats += f'Total Duplicate Entries: {total_duplicates:,}\n'
    summary_stats += f'Unique Group Sizes: {len(size_counter):,}\n'
    summary_stats += f'Deduplication Ratio: {total_duplicates/len(duplicate_groups):.2f} duplicates/group'
    
    fig.text(0.02, 0.02, summary_stats, fontsize=11,
             bbox=dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='gray'),
             fontweight='bold')
    
    # Add a title for the entire figure
    fig.suptitle('CVE Description Duplicate Analysis', fontsize=16, fontweight='bold', y=0.95)
    
    # Adjust layout and save
    plt.tight_layout()
    plot_path = images_dir / 'duplicate_analysis.png'
    plt.savefig(plot_path, dpi=300, bbox_inches='tight', facecolor='white')
    plt.close()
    
    print(f"Enhanced duplicate analysis plots saved to {plot_path}")
    return str(plot_path)

In [None]:
def create_separate_duplicate_analysis(df, duplicate_groups, output_dir='images'):
    """
    Create separate visualizations for exact and fuzzy duplicates.
    
    Args:
        df: DataFrame with CVE data
        duplicate_groups: List of duplicate groups
        output_dir: Directory to save plots
    """
    from pathlib import Path
    import matplotlib.pyplot as plt
    import numpy as np
    from collections import Counter
    from scipy import stats
    from rapidfuzz import fuzz
    
    # Create output directory
    images_dir = Path(output_dir)
    images_dir.mkdir(parents=True, exist_ok=True)
    
    # Separate exact and fuzzy duplicates
    exact_groups = []
    fuzzy_groups = []
    
    for group in duplicate_groups:
        base_desc = df.iloc[group[0]]['Clean_Description']
        is_exact = True
        
        # Check if all descriptions in group are exact matches
        for idx in group[1:]:
            comp_desc = df.iloc[idx]['Clean_Description']
            if base_desc != comp_desc:
                is_exact = False
                break
        
        if is_exact:
            exact_groups.append(group)
        else:
            fuzzy_groups.append(group)
    
    # Create figure
    fig = plt.figure(figsize=(20, 15))
    plt.style.use('default')
    
    # 1. Side-by-side group size distributions (top row)
    ax1 = plt.subplot2grid((2, 2), (0, 0))
    ax2 = plt.subplot2grid((2, 2), (0, 1))
    
    def plot_size_distribution(ax, groups, title, color):
        group_sizes = [len(group) for group in groups]
        size_counter = Counter(group_sizes)
        sorted_sizes = sorted(size_counter.items())
        
        if sorted_sizes:
            sizes, counts = zip(*sorted_sizes)
            ax.scatter(sizes, counts, alpha=0.6, c=color, s=100)
            ax.plot(sizes, counts, alpha=0.3, c=color)
            
        ax.set_yscale('log')
        ax.set_xscale('log')
        ax.grid(True, alpha=0.3, linestyle='--')
        ax.set_title(title, fontsize=14, fontweight='bold')
        ax.set_xlabel('Group Size', fontsize=12)
        ax.set_ylabel('Frequency (Log Scale)', fontsize=12)
        
        # Add statistics
        if groups:
            all_sizes = [len(g) for g in groups]
            stats_text = f'Total Groups: {len(groups):,}\n'
            stats_text += f'Mean Size: {np.mean(all_sizes):.1f}\n'
            stats_text += f'Median Size: {np.median(all_sizes):.1f}\n'
            stats_text += f'Max Size: {max(all_sizes)}'
            
            ax.text(0.95, 0.95, stats_text,
                   transform=ax.transAxes,
                   verticalalignment='top',
                   horizontalalignment='right',
                   bbox=dict(boxstyle='round', facecolor='white', alpha=0.9),
                   fontsize=10)
    
    plot_size_distribution(ax1, exact_groups, 'Exact Duplicate Groups', 'blue')
    plot_size_distribution(ax2, fuzzy_groups, 'Fuzzy Duplicate Groups', 'orange')
    
    # 2. Combined cumulative distribution (bottom left)
    ax3 = plt.subplot2grid((2, 2), (1, 0))
    
    def plot_cumulative(groups, label, color):
        if not groups:
            return
        
        group_sizes = [len(group) for group in groups]
        size_counter = Counter(group_sizes)
        sorted_sizes = sorted(size_counter.items())
        
        total = sum(count for _, count in sorted_sizes)
        cumsum = 0
        cum_percentages = []
        sizes = []
        
        for size, count in sorted_sizes:
            cumsum += count
            cum_percentages.append((cumsum / total) * 100)
            sizes.append(size)
        
        ax3.plot(sizes, cum_percentages, label=label, color=color, linewidth=2)
    
    plot_cumulative(exact_groups, 'Exact Duplicates', 'blue')
    plot_cumulative(fuzzy_groups, 'Fuzzy Duplicates', 'orange')
    
    ax3.set_xscale('log')
    ax3.grid(True, alpha=0.3, linestyle='--')
    ax3.set_title('Cumulative Distribution of Group Sizes', fontsize=14, fontweight='bold')
    ax3.set_xlabel('Group Size', fontsize=12)
    ax3.set_ylabel('Cumulative %', fontsize=12)
    ax3.legend()
    
    # 3. Summary statistics (bottom right)
    ax4 = plt.subplot2grid((2, 2), (1, 1))
    ax4.axis('off')
    
    total_exact = sum(len(group) - 1 for group in exact_groups)
    total_fuzzy = sum(len(group) - 1 for group in fuzzy_groups)
    
    summary_text = [
        "Duplicate Analysis Summary",
        "------------------------",
        f"Total Duplicate Groups: {len(duplicate_groups):,}",
        f"Exact Duplicate Groups: {len(exact_groups):,}",
        f"Fuzzy Duplicate Groups: {len(fuzzy_groups):,}",
        "",
        f"Total Duplicate Entries: {total_exact + total_fuzzy:,}",
        f"Exact Duplicates: {total_exact:,}",
        f"Fuzzy Duplicates: {total_fuzzy:,}",
        "",
        "Ratios:",
        f"Exact/Total Groups: {len(exact_groups)/len(duplicate_groups):.1%}",
        f"Fuzzy/Total Groups: {len(fuzzy_groups)/len(duplicate_groups):.1%}",
        "",
        "Average Group Sizes:",
        f"Exact Groups: {np.mean([len(g) for g in exact_groups]):.1f}",
        f"Fuzzy Groups: {np.mean([len(g) for g in fuzzy_groups]):.1f}"
    ]
    
    ax4.text(0.05, 0.95, '\n'.join(summary_text),
             transform=ax4.transAxes,
             verticalalignment='top',
             fontfamily='monospace',
             bbox=dict(boxstyle='round', facecolor='white', alpha=0.9),
             fontsize=10)
    
    # Overall title
    plt.suptitle('Exact vs Fuzzy Duplicates Analysis', 
                fontsize=16, fontweight='bold', y=0.95)
    
    # Adjust layout and save
    plt.tight_layout()
    plot_path = images_dir / 'duplicate_comparison.png'
    plt.savefig(plot_path, dpi=300, bbox_inches='tight', facecolor='white')
    plt.close()
    
    print(f"Duplicate comparison plot saved to {plot_path}")
    return str(plot_path)

In [65]:
# Clean the Description column
df['Clean_Description'] = df['Description'].apply(clean_description)

# Remove exact duplicates first, using the cleaned description
#df = df.drop_duplicates(subset='Clean_Description', keep='first')
#print(f"Shape after removing exact duplicates: {df.shape}")

# Sort the DataFrame by the cleaned description
df = df.sort_values('Clean_Description')

# Reset index for proper functioning of the fuzzy_dedupe function
df = df.reset_index(drop=True)



Shape after removing exact duplicates: (251400, 3)


In [66]:
print("Starting deduplication process...")
chk, dup_count, dup_groups = mark_and_count_dupes(df['Clean_Description'], threshold=80, window=1000)

dfx = df[chk]
print(f"Deduplication complete.")
print(f"Original row count: {len(df)}")
print(f"Rows remaining after deduplication: {len(dfx)}")
print(f"Number of duplicates found: {dup_count}")
print(f"Number of duplicate groups: {len(dup_groups)}")

Starting deduplication process...


Checking for duplicates:   1%|          | 1399/251400 [00:01<04:26, 937.50it/s]


KeyboardInterrupt: 

In [None]:
plot_duplicate_counts(dup_groups, output_plot_file)

Top 100 largest duplicate group sizes plot saved to images/duplicate_groups_distribution.png


'images/duplicate_groups_distribution.png'

In [None]:
plot_path = create_duplicate_analysis_plots(dup_groups)


Enhanced duplicate analysis plots saved to images/duplicate_analysis.png


In [None]:
plot_path = create_separate_duplicate_analysis(df, dup_groups)


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Duplicate comparison plot saved to images/duplicate_comparison.png


In [None]:
# see the removed duplicates:
duplicates = df[~pd.Series(chk)]
print(f"Number of duplicates removed: {len(duplicates)}")
# Save the duplicates
duplicates.to_csv(removed_file, quoting=csv.QUOTE_ALL, escapechar='\\', compression='gzip')


# Optionally, save the deduplicated DataFrame
dfx.to_csv(output_file, quoting=csv.QUOTE_ALL, escapechar='\\', compression='gzip')



# Save duplicate information to file
save_duplicate_info(df, dup_groups, output_json_file)
print(f"Duplicate information saved to {output_json_file}")


Number of duplicates removed: 87642
Enhanced duplicate information saved to ./duplicate_info.json.gz
Exact duplicate information saved to ./duplicate_info_exact.json.gz
Duplicate information saved to ./duplicate_info.json.gz
