In [2]:
# Libraries required 
import os
import re
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt 
from typing import List
from pathlib import Path
import logging

In [3]:
# Processing the data
class DatasetProcessor:
    """Convert HAI dataset files into per-file Euclidean step distances.

    Pipeline applied to each file (see ``process_file``): drop rows flagged in
    any attack column, coerce the remaining "physical" columns to numbers and
    drop NaN/outlier rows, min-max scale those columns, then take the
    Euclidean distance between each pair of consecutive remaining rows.
    """

    # Suffix appended to every file this class writes; also used to recognise
    # previously written results while scanning for inputs.
    _OUTPUT_SUFFIX = "_distances.csv"

    def __init__(self, base_dir: str, output_dir: str):
        """Store the input/output roots and set up logging.

        Args:
            base_dir: Directory searched recursively for dataset files.
            output_dir: Directory under which distance files are written.
        """
        self.base_dir = Path(base_dir)
        self.output_dir = Path(output_dir)
        self._setup_logging()

    def _setup_logging(self) -> None:
        """Configure INFO-level, timestamped logging and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _get_physical_columns(df: pd.DataFrame) -> List[str]:
        """Return the columns whose names contain neither "time" nor "attack".

        The match is case-insensitive and applies anywhere in the column name.
        """
        return [
            col for col in df.columns
            if not re.search(r'(time|timestamp|attack|Attack)', col, re.IGNORECASE)
        ]

    @staticmethod
    def _get_attack_columns(df: pd.DataFrame) -> List[str]:
        """Return the columns whose names contain "attack" (case-insensitive)."""
        return [
            col for col in df.columns
            if re.search(r'attack', col, re.IGNORECASE)
        ]

    def filter_attack_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of the rows in which every attack column equals 0.

        When the frame has no attack column, a copy of the whole frame is
        returned.
        """
        attack_columns = self._get_attack_columns(df)
        if attack_columns:
            return df[(df[attack_columns] == 0).all(axis=1)].copy()
        return df.copy()

    def filter_non_numeric_and_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Coerce physical columns to numbers and drop NaN and outlier rows.

        Values that cannot be parsed as numbers become NaN and their rows are
        dropped. A row is then dropped when any physical column lies outside
        ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]`` of that column. For a column whose
        IQR is 0 this removes every row that differs from the quartile value.

        The input frame is left untouched; a new, independent frame is
        returned.
        """
        physical_columns = self._get_physical_columns(df)

        # Work on a private copy: the column assignment below would otherwise
        # rewrite the caller's DataFrame in place.
        df = df.copy()
        df[physical_columns] = df[physical_columns].apply(pd.to_numeric, errors='coerce')
        df = df.dropna(subset=physical_columns)

        Q1 = df[physical_columns].quantile(0.25)
        Q3 = df[physical_columns].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        is_outlier = ((df[physical_columns] < lower_bound) |
                      (df[physical_columns] > upper_bound)).any(axis=1)
        # Return a real copy, not a slice, so later column assignments on the
        # result do not trigger chained-assignment warnings.
        return df[~is_outlier].copy()

    def normalize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with the physical columns min-max scaled.

        The scaler is fitted on the frame passed in. A frame without rows is
        returned as an (empty) copy because there is nothing to fit on.
        """
        physical_columns = self._get_physical_columns(df)
        df = df.copy()  # never write scaled values back into the caller's frame
        if len(df) == 0:
            return df
        scaler = MinMaxScaler()
        df[physical_columns] = scaler.fit_transform(df[physical_columns])
        return df

    def calculate_distances(self, df: pd.DataFrame) -> np.ndarray:
        """Return the Euclidean distance between each pair of consecutive rows.

        Only the physical columns are used. Rows are compared in frame order,
        so if rows were filtered out beforehand a distance may span the removed
        stretch. The result has ``len(df) - 1`` entries (none for fewer than
        two rows).
        """
        physical_columns = self._get_physical_columns(df)
        data = df[physical_columns].values
        differences = np.diff(data, axis=0)
        return np.sqrt(np.sum(differences ** 2, axis=1))

    def process_file(self, file_path: Path) -> None:
        """Run the full pipeline on one file and write its distances as CSV.

        The result is saved to
        ``<output_dir>/<name of the file's parent folder>/<file name>_distances.csv``
        with a single ``Euclidean_Distance`` column. Any failure is logged and
        swallowed so that one bad file does not stop a batch run.
        """
        try:
            self.logger.info(f"Processing file: {file_path}")
            df = pd.read_csv(file_path)
            df = self.filter_attack_data(df)
            df = self.filter_non_numeric_and_outliers(df)
            if len(df) == 0:
                # Nothing survived filtering; the scaler cannot be fitted on
                # zero rows, so report it plainly instead of failing later.
                self.logger.warning(f"No usable rows left in {file_path}; skipping")
                return
            df = self.normalize_data(df)
            distances = self.calculate_distances(df)

            dataset_version = file_path.parent.name
            output_dir = self.output_dir / dataset_version
            output_dir.mkdir(parents=True, exist_ok=True)

            output_file = output_dir / f"{file_path.name}{self._OUTPUT_SUFFIX}"
            pd.DataFrame(distances, columns=["Euclidean_Distance"]).to_csv(
                output_file, index=False
            )

            self.logger.info(f"Saved distances to: {output_file}")

        except Exception as e:
            self.logger.error(f"Error processing {file_path}: {str(e)}")

    def process_datasets(self) -> None:
        """Process every file under ``base_dir`` whose name contains "test" or "train".

        Files that sit under ``output_dir`` and carry the ``_distances.csv``
        suffix are results of an earlier (or the current) run and are skipped,
        so pointing ``output_dir`` inside ``base_dir`` does not feed results
        back in as inputs.
        """
        output_root = self.output_dir.resolve()
        # Take a sorted snapshot first: files written during this run must not
        # show up in the listing, and the processing order becomes stable.
        candidates = sorted(self.base_dir.rglob("*"))
        for file_path in candidates:
            if not file_path.is_file():
                continue
            if not re.search(r'(test|train)', file_path.name, re.IGNORECASE):
                continue
            is_own_output = (
                file_path.name.endswith(self._OUTPUT_SUFFIX)
                and output_root in file_path.resolve().parents
            )
            if is_own_output:
                continue
            self.process_file(file_path)

In [4]:
# Generating histograms

# Directory paths
# Default folder that load_distances() scans: one sub-folder per dataset version,
# each holding CSV files with an "Euclidean_Distance" column.
# NOTE(review): this is a fixed location under the user's home directory and is
# independent of the OUTPUT_DIR handed to DatasetProcessor — confirm both refer
# to the same folder before generating histograms.
DISTANCES_DIR = os.path.expanduser("~/dev/stuProj/data/distances")

def load_distances(distances_dir=None):
    """Collect the saved Euclidean distances, split into train and test.

    Every sub-folder of the distances directory is scanned; a file whose name
    contains "train" (case-insensitive) contributes to the train list, otherwise
    a file whose name contains "test" contributes to the test list. Other files
    and non-directory entries at the top level are ignored. A file that cannot
    be read (or lacks the ``Euclidean_Distance`` column) is reported and
    skipped.

    Args:
        distances_dir: Folder to scan. Defaults to the module-level
            ``DISTANCES_DIR`` when omitted, which keeps ``load_distances()``
            working exactly as before.

    Returns:
        A ``(train_distances, test_distances)`` tuple of plain lists.
    """
    root = DISTANCES_DIR if distances_dir is None else distances_dir
    loaded = {"train": [], "test": []}

    # Sorted listings make the concatenation order independent of the
    # file system's directory ordering.
    for dataset_version in sorted(os.listdir(root)):
        version_path = os.path.join(root, dataset_version)
        if not os.path.isdir(version_path):
            continue

        for file in sorted(os.listdir(version_path)):
            lowered = file.lower()
            # "train" takes precedence when a name contains both words.
            if "train" in lowered:
                split = "train"
            elif "test" in lowered:
                split = "test"
            else:
                continue

            file_path = os.path.join(version_path, file)
            try:
                distances = pd.read_csv(file_path)["Euclidean_Distance"].tolist()
            except Exception as e:
                print(f"Error reading file {file_path}: {str(e)}")
                continue

            loaded[split].extend(distances)
            print(f"Loaded {split} data from: {file_path}")

    return loaded["train"], loaded["test"]

def plot_histograms_matplotlib(train_distances, test_distances, num_bins=50,
                               output_dir=None):
    """Draw, save and show train/test histograms of Euclidean distances.

    Two stacked panels are drawn (train on top, test below). A panel whose
    input is empty shows a short "No ... distances found" note instead of a
    histogram. The figure is saved as ``distance_histograms.png`` and then
    displayed.

    Args:
        train_distances: Sequence of distances for the train set.
        test_distances: Sequence of distances for the test set.
        num_bins: Number of histogram bins used for both panels.
        output_dir: Folder the PNG is written to (created if missing).
            Defaults to ``~/dev/stuProj/results`` when omitted, which is the
            location used before this parameter existed.
    """
    # Built-in style; other options include 'classic', 'default', 'bmh',
    # 'fivethirtyeight'. Note this changes the global matplotlib style.
    plt.style.use('ggplot')

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

    panels = (
        (ax1, train_distances, 'Train', 'train', 'blue'),
        (ax2, test_distances, 'Test', 'test', 'green'),
    )
    for ax, distances, title_prefix, label, colour in panels:
        # len() works for lists and NumPy arrays alike; plain truthiness
        # raises for arrays with more than one element.
        if len(distances) > 0:
            ax.hist(distances, bins=num_bins, color=colour, alpha=0.7)
            ax.set_title(f'{title_prefix} Set Euclidean Distance Histogram')
            ax.set_xlabel('Distance')
            ax.set_ylabel('Frequency')
            ax.grid(True, alpha=0.3)
        else:
            # Axes coordinates keep the note centred whatever the data limits.
            ax.text(0.5, 0.5, f'No {label} distances found',
                    horizontalalignment='center', verticalalignment='center',
                    transform=ax.transAxes)

    fig.tight_layout()

    # Save the plot
    if output_dir is None:
        output_dir = os.path.expanduser("~/dev/stuProj/results")
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, 'distance_histograms.png')
    fig.savefig(output_path, dpi=300, bbox_inches='tight')
    print(f"Histograms saved to: {output_path}")

    # Display the plot
    plt.show()

# Usage

1. Update the `BASE_DIR` path to point to the original dataset on your computer.
2. Set `OUTPUT_DIR` to the folder where you wish to save the calculated Euclidean distances.

In [None]:
# Running the code
BASE_DIR = "path/to/original/data"
OUTPUT_DIR = "path/to/save/distances"

# Running the processor: writes one distances CSV per train/test file
# into OUTPUT_DIR/<dataset folder>/.
processor = DatasetProcessor(BASE_DIR, OUTPUT_DIR)
processor.process_datasets()

# Generating histograms
# load_distances() reads the module-level DISTANCES_DIR. Point it at the folder
# the processor just wrote to, otherwise the histograms would be built from an
# unrelated hard-coded directory instead of the results of this run.
DISTANCES_DIR = OUTPUT_DIR
train_distances, test_distances = load_distances()
plot_histograms_matplotlib(train_distances, test_distances)
