# Import Required Libraries
Import necessary libraries such as pandas, numpy, matplotlib, and seaborn for data manipulation, analysis, and visualization.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import ast
from pathlib import Path
import warnings
# Suppress every warning category for the rest of the session so cell output
# stays compact.
# NOTE(review): this also hides deprecation warnings from pandas/matplotlib;
# consider narrowing the filter when debugging unexpected results.
warnings.filterwarnings('ignore')

# Set style for plots
# NOTE(review): the 'seaborn-v0_8' style name presumably requires a recent
# matplotlib release — confirm against the pinned version.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# For text analysis
from collections import Counter
import re
from wordcloud import WordCloud
from sklearn.feature_extraction.text import TfidfVectorizer

# For advanced analysis
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from scipy import stats

# Load the CSV Data
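Use pandas to read every CSV file in the `incremental_tasks_csv` directory into its own DataFrame (falling back to latin-1 when a file is not valid UTF-8), then combine a 10% random sample of each file into a single DataFrame for analysis.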

In [None]:
# Load all CSV files from incremental_tasks_csv directory.
# Produces:
#   dfs      - dict mapping file stem (e.g. 'task1_train') to its DataFrame
#   all_data - concatenation of a random sample of every file
data_dir = Path('incremental_tasks_csv')
dfs = {}

print("Loading CSV files...")
# glob() yields paths in an unspecified, OS-dependent order; sorting keeps the
# concatenation order (and therefore the sampled dataset) reproducible.
for file_path in sorted(data_dir.glob('*.csv')):
    name = file_path.stem  # e.g., 'task1_train'
    print(f"Loading {name}.csv...")
    try:
        dfs[name] = pd.read_csv(file_path, encoding='utf-8')
        print(f"Loaded {name}.csv with shape: {dfs[name].shape}")
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this retry cannot fail
        # on decoding (the text may still be mis-decoded).
        dfs[name] = pd.read_csv(file_path, encoding='latin-1')
        print(f"Loaded {name}.csv with latin-1 encoding, shape: {dfs[name].shape}")

# pd.concat([]) fails with an opaque "No objects to concatenate"; report the
# actual problem (wrong working directory / missing data) instead.
if not dfs:
    raise FileNotFoundError(f"No CSV files found in '{data_dir.resolve()}'")

# Combine all dataframes for overall analysis (sample for large dataset)
sample_fraction = 0.1  # Use 10% sample for faster analysis
all_data = pd.concat(
    [df.sample(frac=sample_fraction, random_state=42) for df in dfs.values()],
    ignore_index=True,
)
print(f"\nCombined dataset shape (sampled): {all_data.shape}")

# For detailed analysis, use full data if needed
# all_data_full = pd.concat(dfs.values(), ignore_index=True)

# Data Cleaning and Preprocessing
Clean the data by handling missing values, converting data types (e.g., dates, scores), and parsing lists like cwe_ids.

In [None]:
# Data Cleaning
# Normalises dtypes, removes duplicate rows, parses the stringified CWE lists
# into real Python lists ('cwe_list') and adds length features.
print("Data Cleaning Steps:")


def _parse_cwe_ids(raw):
    """Parse one stringified CWE list; return None when it cannot be parsed."""
    if not isinstance(raw, str):
        return []
    try:
        return ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        # Malformed literal: flag it so the caller can count and report it
        # instead of aborting the whole cell on a single bad row.
        return None


# Handle missing values
print("Missing Values Summary:")
missing_data = all_data.isnull().sum()
missing_percent = (missing_data / len(all_data)) * 100
missing_df = pd.DataFrame({'Missing Count': missing_data, 'Missing Percentage': missing_percent})
display(missing_df[missing_df['Missing Count'] > 0])

# Convert data types
all_data['task'] = all_data['task'].astype(int)
# 'cvss_is_v3' is optional, like the other CVSS columns handled below.
if 'cvss_is_v3' in all_data.columns:
    all_data['cvss_is_v3'] = all_data['cvss_is_v3'].astype(int)
if 'Base Score' in all_data.columns:
    all_data['Base Score'] = pd.to_numeric(all_data['Base Score'], errors='coerce')

# Convert commit_time if exists
if 'commit_time' in all_data.columns:
    all_data['commit_time'] = pd.to_datetime(all_data['commit_time'], errors='coerce')

# Remove duplicates.
# This must happen BEFORE 'cwe_list' is added: drop_duplicates() hashes every
# cell and raises "TypeError: unhashable type: 'list'" on list-valued columns.
# 'cwe_list' is derived from 'cwe_ids', so the set of duplicates is unchanged.
initial_shape = all_data.shape
all_data = all_data.drop_duplicates()
print(f"Removed {initial_shape[0] - all_data.shape[0]} duplicate rows")

# Parse cwe_ids
parsed_cwe = all_data['cwe_ids'].fillna("[]").apply(_parse_cwe_ids)
unparsed_count = int(parsed_cwe.isna().sum())
if unparsed_count:
    print(f"Warning: {unparsed_count} cwe_ids values could not be parsed and were treated as empty")
all_data['cwe_list'] = parsed_cwe.apply(lambda value: [] if value is None else value)

# Add computed features for analysis
if 'abstract_func_before' in all_data.columns:
    all_data['code_length'] = all_data['abstract_func_before'].fillna('').str.len()
if 'description' in all_data.columns:
    all_data['desc_length'] = all_data['description'].fillna('').str.len()

print(f"Dataset shape after cleaning: {all_data.shape}")
print("Data cleaning completed.")

# Exploratory Data Analysis
Perform basic EDA to understand the dataset structure, including summary statistics and data distributions.

In [None]:
# Basic EDA
# Prints structural information about all_data, summary statistics for the
# numeric columns, and draws histograms/boxplots of the length features.
print("Dataset Info:")
# DataFrame.info() writes its report itself and returns None; wrapping it in
# print() would emit a stray "None" line after the report.
all_data.info()

print("\n" + "="*50)
print("Column Names:")
print(all_data.columns.tolist())

print("\n" + "="*50)
print("Data Types:")
print(all_data.dtypes)

print("\n" + "="*50)
print("Summary Statistics for Numerical Columns:")
numerical_cols = all_data.select_dtypes(include=[np.number]).columns
display(all_data[numerical_cols].describe())

print("\n" + "="*50)
print("Value Counts for Key Categorical Columns:")
key_cols = ['task']
for col in key_cols:
    if col in all_data.columns:
        print(f"\n{col} distribution:")
        display(all_data[col].value_counts())

# Univariate Analysis: Histograms and Boxplots
# Layout: one grid column per feature; histogram on the top row, boxplot below.
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

length_features = [
    ('code_length', 'Code Length'),
    ('desc_length', 'Description Length'),
]
for grid_col, (feature, label) in enumerate(length_features):
    # Both features are optional; a missing one leaves its panels empty.
    if feature not in all_data.columns:
        continue

    hist_ax = axes[0, grid_col]
    all_data[feature].hist(bins=50, ax=hist_ax, alpha=0.7)
    hist_ax.set_title(f'{label} Distribution')
    hist_ax.set_xlabel(f'{label} (characters)')
    hist_ax.set_ylabel('Frequency')

    box_ax = axes[1, grid_col]
    all_data.boxplot(column=feature, ax=box_ax)
    box_ax.set_title(f'{label} Boxplot')

plt.tight_layout()
plt.show()

# Analyze CWE IDs
Extract and analyze the CWE IDs, counting frequencies and identifying common vulnerability types.

In [None]:
# CWE IDs Analysis
# Expands the per-row CWE lists to one row per (sample, CWE) pair and reports
# overall frequencies plus the number of distinct CWEs seen in each task.
cwe_exploded = all_data.explode('cwe_list')
# Rows whose list was empty explode to NaN; they carry no CWE and are dropped.
cwe_exploded = cwe_exploded[cwe_exploded['cwe_list'].notna()]

print("CWE IDs Analysis")
print("="*50)
print(f"Total unique CWE IDs: {cwe_exploded['cwe_list'].nunique()}")
print(f"Total CWE entries: {len(cwe_exploded)}")

# Top CWE IDs
cwe_counts = cwe_exploded['cwe_list'].value_counts()
print(f"\nTop 10 CWE IDs:")
display(cwe_counts.head(10))

# CWE distribution by task
cwe_per_task = cwe_exploded.groupby('task')['cwe_list'].nunique()
print(f"\nUnique CWEs per task:")
for task, count in cwe_per_task.items():
    print(f"  Task {task}: {count}")

# Visualize
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# Bar chart of top CWEs
cwe_counts.head(15).plot(kind='barh', ax=axes[0], color='purple')
axes[0].set_title('Top 15 CWE IDs by Frequency')
axes[0].set_xlabel('Count')

# Unique CWEs per task
cwe_per_task.plot(kind='bar', ax=axes[1], color='orange')
axes[1].set_title('Unique CWE IDs per Task')
axes[1].set_xlabel('Task')
axes[1].set_ylabel('Unique CWE Count')
# Labels must go through set_xticklabels(); assigning to an `xticklabels`
# attribute has no effect. A pandas bar plot places its ticks at positions
# 0..n-1, so the labels are built from the task ids in the index rather than
# from the tick positions.
axes[1].set_xticklabels([f'Task {int(task_id)}' for task_id in cwe_per_task.index], rotation=0)

plt.tight_layout()
plt.show()

# CWE IDs Statistics by Task
Detailed statistics of CWE IDs across incremental tasks, including total counts and distinct counts with visualizations.

In [None]:
# CWE IDs Statistics by Task
# Builds per-task totals, per-(task, CWE) counts with percentages, and a
# CWE x task count matrix, then visualises them.
print("CWE IDs Statistics by Task")
print("="*50)

# Calculate total count and distinct count per task
cwe_task_stats = cwe_exploded.groupby('task').agg(
    total_cwe_count=('cwe_list', 'count'),
    distinct_cwe_count=('cwe_list', 'nunique')
).reset_index()

print("CWE Statistics per Task:")
display(cwe_task_stats)

# Count each CWE per task
cwe_per_task_counts = cwe_exploded.groupby(['task', 'cwe_list']).size().reset_index(name='count')

# Add percentage column (share of the task's CWE entries held by each CWE)
cwe_per_task_counts = cwe_per_task_counts.merge(cwe_task_stats[['task', 'total_cwe_count']], on='task')
cwe_per_task_counts['percentage'] = (cwe_per_task_counts['count'] / cwe_per_task_counts['total_cwe_count'] * 100).round(2)
cwe_per_task_counts = cwe_per_task_counts.drop(columns=['total_cwe_count'])

print("\nDetailed CWE Counts per Task (Top 20):")
# The frame is ordered by (task, cwe_list), so a plain head(20) would show the
# alphabetically first CWEs of the first task. Sort by count to show the
# actual top entries; cwe_per_task_counts itself keeps its original order.
display(cwe_per_task_counts.sort_values('count', ascending=False).head(20))

# Pivot for visualization
cwe_task_pivot = cwe_per_task_counts.pivot(index='cwe_list', columns='task', values='count').fillna(0)

# Overall most frequent CWEs, used for both the matrix preview and the heatmap.
top_cwes = cwe_counts.head(10).index
cwe_task_top = cwe_task_pivot.loc[top_cwes]

print("\nCWE vs Task Count Matrix (Top 10 CWEs):")
# Show the rows of the ten most frequent CWEs rather than the first ten rows
# of the alphabetically ordered pivot.
display(cwe_task_top)

# Visualize
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# Bar chart for total CWE count per task
axes[0].bar(cwe_task_stats['task'], cwe_task_stats['total_cwe_count'], color='skyblue', alpha=0.7)
axes[0].set_title('Total CWE Count per Task')
axes[0].set_xlabel('Task')
axes[0].set_ylabel('Total CWE Count')
# Tasks are discrete ids; pin the ticks so no fractional values appear.
axes[0].set_xticks(cwe_task_stats['task'])
axes[0].grid(True, alpha=0.3)

# Bar chart for distinct CWE count per task
axes[1].bar(cwe_task_stats['task'], cwe_task_stats['distinct_cwe_count'], color='orange', alpha=0.7)
axes[1].set_title('Distinct CWE Count per Task')
axes[1].set_xlabel('Task')
axes[1].set_ylabel('Distinct CWE Count')
axes[1].set_xticks(cwe_task_stats['task'])
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Additional visualization: Heatmap for top CWEs across tasks
# Create percentage pivot: each column (task) is divided by that task's total.
cwe_task_percentage = cwe_task_top.div(cwe_task_stats.set_index('task')['total_cwe_count'], axis=1) * 100

plt.figure(figsize=(12, 8))
sns.heatmap(cwe_task_percentage, annot=True, fmt='.1f', cmap='Blues', cbar_kws={'label': 'Percentage (%)'})
plt.title('Top 10 CWE Percentage Across Tasks')
plt.xlabel('Task')
plt.ylabel('CWE ID')
plt.show()

# Additional visualization: Line plot for progression
plt.figure(figsize=(10, 6))
plt.plot(cwe_task_stats['task'], cwe_task_stats['total_cwe_count'], marker='o', label='Total CWE Count', color='blue')
plt.plot(cwe_task_stats['task'], cwe_task_stats['distinct_cwe_count'], marker='s', label='Distinct CWE Count', color='red')
plt.title('CWE Counts Progression Across Tasks')
plt.xlabel('Task')
plt.ylabel('Count')
plt.xticks(cwe_task_stats['task'])
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

# Analyze CVSS Scores
Compute and analyze CVSS base scores, severities, and vectors to assess vulnerability impacts.

In [None]:
# CVSS Scores Analysis
# Summarises the CVSS base score and severity columns (both optional) and
# cross-tabulates severity against CWE id.
print("CVSS Scores Analysis")
print("="*50)

if 'Base Score' in all_data.columns:
    print("Base Score Statistics:")
    display(all_data['Base Score'].describe())

    # Severity distribution
    if 'Base Severity' in all_data.columns:
        severity_counts = all_data['Base Severity'].value_counts()
        print("\nSeverity Distribution:")
        display(severity_counts)

        # Visualize
        fig, axes = plt.subplots(1, 2, figsize=(15, 6))

        # Histogram of Base Scores
        all_data['Base Score'].hist(bins=20, ax=axes[0], alpha=0.7, color='green')
        axes[0].set_title('CVSS Base Score Distribution')
        axes[0].set_xlabel('Base Score')
        axes[0].set_ylabel('Frequency')

        # Bar chart of Severities
        severity_counts.plot(kind='bar', ax=axes[1], color='red')
        axes[1].set_title('CVSS Severity Distribution')
        axes[1].set_xlabel('Severity')
        axes[1].set_ylabel('Count')
        axes[1].tick_params(axis='x', rotation=45)

        plt.tight_layout()
        plt.show()

        # Severity vs CWE
        # explode() repeats the source row's index label for every CWE, so
        # looking severities up in all_data by those labels (and letting
        # crosstab align two Series on them) is fragile. The exploded frame
        # already carries 'Base Severity'; take both columns from it and give
        # them a fresh unique index so rows pair up positionally.
        if len(cwe_exploded) > 0 and 'Base Severity' in cwe_exploded.columns:
            cwe_severity_pairs = cwe_exploded[['cwe_list', 'Base Severity']].reset_index(drop=True)
            severity_cwe = pd.crosstab(cwe_severity_pairs['cwe_list'], cwe_severity_pairs['Base Severity'])
            # Order rows by total count so head(10) really shows the top CWEs
            # instead of the alphabetically first ones.
            row_order = severity_cwe.sum(axis=1).sort_values(ascending=False).index
            severity_cwe = severity_cwe.loc[row_order]
            print("\nSeverity vs Top CWEs:")
            display(severity_cwe.head(10))
    else:
        print("Base Severity column not found")
else:
    print("Base Score column not found")

# Visualize Vulnerability Trends
Create plots to visualize trends, such as CVSS scores over time or distributions of CWE types.

In [None]:
# Vulnerability Trends Visualization
# Compares feature averages across tasks and (when commit timestamps exist)
# across years, then shows the correlation between the numeric features.
# 'code_length', 'desc_length' and 'Base Score' are all optional columns, so
# every aggregation below is restricted to the ones actually present.
print("Vulnerability Trends")
print("="*50)

# Task-wise analysis
task_metric_cols = [
    col for col in ('code_length', 'desc_length', 'Base Score')
    if col in all_data.columns
]
if task_metric_cols:
    task_stats = all_data.groupby('task').agg(
        {col: 'mean' for col in task_metric_cols}
    ).round(2)
else:
    task_stats = pd.DataFrame()
print("Task-wise Statistics:")
display(task_stats)

# Visualize task progression
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# (column, axes, line colour, title, y-label) for each per-task average panel
task_panels = [
    ('code_length', axes[0, 0], 'blue', 'Average Code Length by Task', 'Average Code Length'),
    ('desc_length', axes[0, 1], 'green', 'Average Description Length by Task', 'Average Description Length'),
    ('Base Score', axes[1, 0], 'red', 'Average CVSS Base Score by Task', 'Average Base Score'),
]
for col, panel_ax, colour, title, y_label in task_panels:
    if col not in all_data.columns:
        continue
    all_data.groupby('task')[col].mean().plot(kind='line', marker='o', ax=panel_ax, color=colour)
    panel_ax.set_title(title)
    panel_ax.set_xlabel('Task')
    panel_ax.set_ylabel(y_label)

# CWE diversity by task
cwe_per_task.plot(kind='line', marker='o', ax=axes[1,1], color='purple')
axes[1,1].set_title('CWE Diversity by Task')
axes[1,1].set_xlabel('Task')
axes[1,1].set_ylabel('Unique CWE Count')

plt.tight_layout()
plt.show()

# Time-based analysis if commit_time exists
# Maps each trend column to its legend label; dict order fixes the line order.
trend_labels = {'Base Score': 'CVSS Base Score', 'code_length': 'Code Length'}
trend_cols = [col for col in trend_labels if col in all_data.columns]
has_commit_time = 'commit_time' in all_data.columns and all_data['commit_time'].notna().any()

if has_commit_time:
    all_data['year'] = all_data['commit_time'].dt.year
    if trend_cols:
        yearly_stats = all_data.groupby('year').agg({col: 'mean' for col in trend_cols})

        # DataFrame.plot() opens its own figure unless it is given an Axes;
        # calling plt.figure() first would leave an empty figure behind and
        # ignore the requested figsize.
        fig, trend_ax = plt.subplots(figsize=(12, 6))
        yearly_stats.plot(kind='line', marker='o', ax=trend_ax)
        trend_ax.set_title('Vulnerability Trends Over Time')
        trend_ax.set_xlabel('Year')
        trend_ax.set_ylabel('Average Values')
        trend_ax.legend([trend_labels[col] for col in trend_cols])
        trend_ax.grid(True, alpha=0.3)
        plt.show()
    else:
        print("No numeric feature columns available for time-based analysis")
else:
    print("No valid commit_time data for time-based analysis")

# Correlation heatmap
corr_features = [
    col for col in ('task', 'code_length', 'desc_length', 'Base Score')
    if col in all_data.columns
]
# A correlation matrix needs at least two features to be informative.
if len(corr_features) >= 2:
    corr_matrix = all_data[corr_features].corr()
    plt.figure(figsize=(8, 6))
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, fmt='.2f')
    plt.title('Correlation Matrix of Key Features')
    plt.show()
else:
    print("Not enough numeric features for a correlation matrix")

print("\nEDA Complete! This analysis provides comprehensive insights for continual learning vulnerability classification.")