# A Fuzzy Case Based Reasoning Approach To Value Engineering

The paper presents three main algorithms to improve **case retrieval**, **fuzzy clustering**, and **case adaptation**. Below, I provide a structured explanation of each algorithm with the necessary mathematical formulations.

## Traffic Accidents Dataset
### 🌍 **Dataset Description**

This dataset is designed for predicting the likelihood and severity of traffic accidents from factors such as weather, road type, traffic density, and driver behavior. Its features cover weather, road conditions, time of day, vehicle type, and driver age and experience, giving a broad view of the variables that contribute to traffic accidents.

### 📊 **Columns:**
- <font color='brown'>**Weather:**</font> The impact of weather conditions on the likelihood of accidents.

    - **Clear:** No adverse weather conditions.
    - **Rainy:** Rainy conditions increase the chance of accidents.
    - **Foggy:** Foggy conditions reduce visibility, increasing accident chances.
    - **Snowy:** Snow can cause slippery roads and higher accident probability.
    - **Stormy:** Stormy weather can create hazardous driving conditions.

- <font color='brown'>**Road_Type:**</font> The type of road, influencing the probability of accidents.

    - **Highway:** High-speed roads with higher chances of severe accidents.
    - **City Road:** Roads within city limits, typically with more traffic and lower speeds.
    - **Rural Road:** Roads outside urban areas, often with fewer vehicles and lower speeds.
    - **Mountain Road:** Roads with curves and elevation changes, increasing accident risk.

- <font color='brown'>**Time_of_Day:**</font> The time of day when the accident occurs.

    - **Morning:** The period between sunrise and noon.
    - **Afternoon:** The period between noon and evening.
    - **Evening:** The period just before sunset.
    - **Night:** The nighttime, often associated with reduced visibility and higher risk.

- <font color='brown'>**Traffic_Density:**</font> The level of traffic on the road.

    - **0:** Low density (few vehicles).
    - **1:** Moderate density.
    - **2:** High density (many vehicles).

- <font color='brown'>**Speed_Limit:**</font> The maximum allowed speed on the road.

- <font color='brown'>**Number_of_Vehicles:**</font> The number of vehicles involved in the accident, ranging from 1 to 5.

- <font color='brown'>**Driver_Alcohol:**</font> Whether the driver consumed alcohol.

    - **0:** No alcohol consumption.
    - **1:** Alcohol consumption (which increases the likelihood of an accident).

- <font color='brown'>**Accident_Severity:**</font> The severity of the accident.

    - **Low:** Minor accident.
    - **Moderate:** Moderate accident with some damage or injuries.
    - **High:** Severe accident with significant damage or injuries.

- <font color='brown'>**Road_Condition:**</font> The condition of the road surface.

    - **Dry:** Dry roads with minimal risk.
    - **Wet:** Wet roads due to rain, increasing the risk of accidents.
    - **Icy:** Ice on the road, significantly increasing the risk of accidents.
    - **Under Construction:** Roads under construction, which may have obstacles or poor road quality.

- <font color='brown'>**Vehicle_Type:**</font> The type of vehicle involved in the accident.

    - **Car:** A regular passenger car.
    - **Truck:** A large vehicle used for transporting goods.
    - **Motorcycle:** A two-wheeled motor vehicle.
    - **Bus:** A large vehicle used for public transportation.

- <font color='brown'>**Driver_Age:**</font> The age of the driver. Values range from 18 to 70 years old.

- <font color='brown'>**Driver_Experience:**</font> The years of experience the driver has. Values range from 0 to 50 years of experience.

- <font color='brown'>**Road_Light_Condition:**</font> The lighting conditions on the road.

    - **Daylight:** Daytime, when visibility is typically good.
    - **Artificial Light:** Road is illuminated with streetlights.
    - **No Light:** Road is not illuminated, typically during the night in poorly lit areas.


## Step 1: Understanding the Problem

### 1.1 Define the Objective
- The goal is to predict Accident Severity based on multiple influencing factors such as Weather, Road Type, Traffic Density, Driver Alcohol Consumption, and more.
- The dataset contains a mix of:
    - Categorical Variables (e.g., Weather, Road Type).
    - Numerical Variables (e.g., Speed Limit, Driver Age).
    - Binary Variables (e.g., Driver Alcohol Consumption).

To handle uncertainties and linguistic data, we will use Fuzzy CBR, which:

- Represents uncertain information using fuzzy logic.
- Retrieves past cases efficiently using fuzzy clustering.
- Adapts retrieved cases for improved predictions.
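
The clustering step listed above can be illustrated with the standard fuzzy c-means membership update, which turns one case's distances to the cluster centers into membership degrees that sum to 1. This is a minimal pure-Python sketch: the function name `fcm_memberships` and the sample distances are illustrative assumptions, not taken from the paper or the dataset.

```python
from typing import List


def fcm_memberships(distances: List[float], m: float = 2.0) -> List[float]:
    """Fuzzy c-means membership update for a single sample.

    u_j = 1 / sum_k (d_j / d_k) ** (2 / (m - 1)), with fuzziness m > 1.
    """
    if m <= 1:
        raise ValueError("fuzziness m must be greater than 1")
    eps = 1e-12  # guards against a zero distance (sample sitting on a center)
    d = [max(x, eps) for x in distances]
    power = 2.0 / (m - 1.0)
    return [1.0 / sum((dj / dk) ** power for dk in d) for dj in d]


# Hypothetical distances from one case to two cluster centers.
memberships = fcm_memberships([1.0, 3.0])
print([round(u, 3) for u in memberships])  # [0.9, 0.1]
```

The nearer center receives most of the membership, but the farther one keeps a non-zero share. That soft assignment is what lets a case belong partly to several clusters.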

### 1.2 Dataset Overview

| Feature              | Type              | Description |
|----------------------|------------------|------------------------------------------------------------|
| Weather             | Categorical       | Impact of weather on accidents (Clear, Rainy, Foggy, Snowy, Stormy) |
| Road_Type          | Categorical       | Type of road (Highway, City Road, Rural Road, Mountain Road) |
| Time_of_Day        | Categorical       | Time of day (Morning, Afternoon, Evening, Night) |
| Traffic_Density    | Numeric (0-2)     | 0 = Low, 1 = Moderate, 2 = High |
| Speed_Limit       | Numeric (km/h)    | Maximum speed allowed |
| Number_of_Vehicles | Numeric (1-5)     | Number of vehicles involved |
| Driver_Alcohol    | Binary (0,1)      | 0 = No Alcohol, 1 = Alcohol |
| Accident_Severity | Categorical       | Severity (Low, Moderate, High) |
| Road_Condition    | Categorical       | Road condition (Dry, Wet, Icy, Under Construction) |
| Vehicle_Type      | Categorical       | Type of vehicle (Car, Truck, Motorcycle, Bus) |
| Driver_Age       | Numeric (18-70)   | Age of the driver |
| Driver_Experience | Numeric (0-50)    | Years of driving experience |
| Road_Light_Condition | Categorical | Lighting condition (Daylight, Artificial Light, No Light) |

### 1.3 What Makes This a Fuzzy Problem?

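Some features take linguistic values (e.g., "Rainy", "Night") that have no sharp numeric boundaries, so they are best modeled as fuzzy sets.

Other features are inherently uncertain (e.g., "Accident Severity", where neighboring levels overlap) and should also be represented as fuzzy values rather than crisp labels.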
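
To make the idea of a fuzzy set concrete, the sketch below evaluates a triangular membership function, which returns the degree (between 0 and 1) to which a value belongs to a linguistic term. The function name and the choice of a 0 to 1 "weather severity" axis are assumptions made for illustration; the triangle (0.2, 0.4, 0.6) is the one this notebook assigns to "Rainy".

```python
def triangular_membership(x: float, left: float, center: float, right: float) -> float:
    """Degree (0..1) to which x belongs to the triangular fuzzy set (left, center, right)."""
    if x == center:
        return 1.0  # the peak; also covers shoulder shapes where left == center
    if x <= left or x >= right:
        return 0.0  # outside the support of the triangle
    if x < center:
        return (x - left) / (center - left)  # rising edge
    return (right - x) / (right - center)  # falling edge


RAINY = (0.2, 0.4, 0.6)

for value in (0.4, 0.3, 0.7):
    print(value, round(triangular_membership(value, *RAINY), 2))
# 0.4 1.0
# 0.3 0.5
# 0.7 0.0
```

A value of 0.3 is "Rainy" only to degree 0.5, which is how a fuzzy set expresses partial membership instead of a hard yes or no.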

### 1.4 Why Use Fuzzy Case-Based Reasoning (CBR)?

✅ Handles uncertainty in weather, road type, and driver behavior.

✅ Finds similar past cases using fuzzy similarity.

✅ Learns from experience without requiring complex model training.
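
The retrieve-and-reuse idea behind these points can be sketched in a few lines: score every stored case against the query, keep the k most similar, and let them vote with their similarity as the weight. This is a simplified illustration using crisp feature values; the feature names, weights, and the four sample cases are hypothetical, and the per-feature similarity `1 / (1 + distance)` is one common choice rather than the only one.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Features = Dict[str, float]   # feature values scaled to 0..1
Case = Tuple[Features, str]   # (features, severity label)


def similarity(a: Features, b: Features, weights: Dict[str, float]) -> float:
    """Weighted mean of per-feature similarities 1 / (1 + |a - b|)."""
    total = sum(weights.values())
    score = sum(w * (1.0 / (1.0 + abs(a[f] - b[f]))) for f, w in weights.items())
    return score / total


def retrieve(query: Features, library: List[Case],
             weights: Dict[str, float], k: int = 3) -> List[Tuple[float, str]]:
    """Return the k most similar cases as (similarity, label) pairs."""
    scored = [(similarity(query, feats, weights), label) for feats, label in library]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


def vote(neighbours: List[Tuple[float, str]]) -> str:
    """Similarity-weighted vote over the retrieved labels."""
    totals: Dict[str, float] = defaultdict(float)
    for sim, label in neighbours:
        totals[label] += sim
    return max(totals, key=totals.get)


# Hypothetical case library and query.
weights = {"weather": 0.6, "speed": 0.4}
library = [
    ({"weather": 0.1, "speed": 0.3}, "Low"),
    ({"weather": 0.8, "speed": 0.9}, "High"),
    ({"weather": 0.9, "speed": 0.8}, "High"),
    ({"weather": 0.4, "speed": 0.5}, "Moderate"),
]
query = {"weather": 0.85, "speed": 0.85}

prediction = vote(retrieve(query, library, weights, k=3))
print(prediction)  # High
```

No model is fitted here: adding a new case to `library` immediately changes future predictions, which is the sense in which CBR learns from experience.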


## Step 2: Data Preprocessing
### 2.1 Convert Categorical Features to Fuzzy Sets

Qualitative features (such as Weather, Road Type, and Road Condition) are converted into fuzzy sets: each category is mapped to a triangular fuzzy number (TFN) on the [0, 1] scale. Example:
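
Once categories are triangles, two categories can be compared numerically. The sketch below measures the distance between two triangular fuzzy numbers by averaging the squared gaps between their alpha-cut endpoints. The helper names are hypothetical, and placing nominal categories on one ordered axis is itself a modeling assumption; the three weather triangles are the ones used in this notebook.

```python
import math
from typing import Tuple

Triangle = Tuple[float, float, float]  # (left, center, right)


def alpha_cut(t: Triangle, alpha: float) -> Tuple[float, float]:
    """Interval of values whose membership in t is at least alpha."""
    left, center, right = t
    return left + alpha * (center - left), right - alpha * (right - center)


def alpha_cut_distance(a: Triangle, b: Triangle, steps: int = 50) -> float:
    """Root mean squared gap between alpha-cut endpoints over `steps` alpha levels."""
    total = 0.0
    for i in range(steps):
        alpha = i / (steps - 1)
        a_lo, a_hi = alpha_cut(a, alpha)
        b_lo, b_hi = alpha_cut(b, alpha)
        total += ((a_lo - b_lo) ** 2 + (a_hi - b_hi) ** 2) / 2
    return math.sqrt(total / steps)


weather = {
    "Rainy": (0.2, 0.4, 0.6),
    "Foggy": (0.4, 0.6, 0.8),
    "Snowy": (0.6, 0.8, 1.0),
}

print(round(alpha_cut_distance(weather["Rainy"], weather["Foggy"]), 3))  # 0.2
print(round(alpha_cut_distance(weather["Rainy"], weather["Snowy"]), 3))  # 0.4
```

Because these three triangles have the same shape, the distance equals the shift between them, so "Rainy" is closer to "Foggy" than to "Snowy".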

In [None]:
import numpy as np
import pandas as pd
from scipy.spatial.distance import cdist
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import SimpleImputer
from typing import List, Tuple, Dict, Union
from sklearn.metrics import mean_squared_error
import random
from datetime import datetime

class TFN:
    """Triangular Fuzzy Number class with linguistic term support"""
    def __init__(self, left: float, center: float, right: float, linguistic_term: str = None):
        self.left = left
        self.center = center
        self.right = right
        self.linguistic_term = linguistic_term
    
    def alpha_cut(self, alpha: float) -> Tuple[float, float]:
        """Get alpha-cut interval of the fuzzy number"""
        left_alpha = self.left + alpha * (self.center - self.left)
        right_alpha = self.right - alpha * (self.right - self.center)
        return (left_alpha, right_alpha)

class AccidentCase:
    """Represents a traffic accident case with fuzzy features"""
    def __init__(self, features: Dict[str, Union[TFN, str]], severity: str, case_id: str):
        self.features = features
        self.severity = severity
        self.case_id = case_id
        self.cluster_membership = None

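class SafetyRule:
    """A safety recommendation that applies when condition(case) is true"""
    def __init__(self, condition: callable, recommendation: str, priority: int):
        self.condition = condition  # Predicate that takes an AccidentCase
        self.recommendation = recommendation
        self.priority = priority  # Lower number = listed first

class Solution:
    """A predicted severity together with its recommendations and confidence"""
    def __init__(self, severity: Union[TFN, str], recommendations: List[str], confidence: float):
        self.severity = severity  # TFN from the fuzzy prediction, or a plain label
        self.recommendations = recommendations
        self.confidence = confidence
        self.timestamp = datetime.now()
        self.feedback = None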

class FuzzyKMeans:
    """Implementation of Fuzzy K-Means (Fuzzy C-Means) clustering algorithm"""
    def __init__(self, n_clusters=5, m=2, max_iter=100, tol=1e-5):
        """
        Initialize Fuzzy K-Means
        
        Args:
            n_clusters: Number of clusters
            m: Fuzziness parameter (m > 1)
            max_iter: Maximum number of iterations
            tol: Convergence tolerance
        """
        self.n_clusters = n_clusters
        self.m = m  # Fuzziness coefficient
        self.max_iter = max_iter
        self.tol = tol # Tolerance for convergence
        self.cluster_centers_ = None
        self.membership_matrix_ = None
        
    def fit_predict(self, X):
        """
        Fit the Fuzzy K-Means model to data and return cluster memberships
        
        Args:
            X: Input data (n_samples, n_features)
            
        Returns:
            Hard cluster assignments based on max membership
        """
        X = np.array(X)
        n_samples = X.shape[0]
        
        # Initialize the membership matrix randomly; each row sums to 1
        self.membership_matrix_ = np.random.dirichlet(np.ones(self.n_clusters), n_samples)
        
        for iteration in range(self.max_iter):
            # Calculate cluster centers
            membership_powered = self.membership_matrix_ ** self.m
            self.cluster_centers_ = np.dot(membership_powered.T, X) / np.sum(membership_powered, axis=0)[:, None]
            
            # Calculate distances
            distances = cdist(X, self.cluster_centers_)
            
            # Handle zero distances (point exactly at center)
            distances = np.maximum(distances, np.finfo(float).eps)
            
            # Update membership matrix: u_ij = 1 / sum_k (d_ij / d_ik)^(2 / (m - 1))
            distance_power = 2 / (self.m - 1)
            # ratios[i, j, k] = d_ij / d_ik, built by broadcasting instead of nested loops
            ratios = distances[:, :, None] / distances[:, None, :]
            new_membership = 1 / np.sum(ratios ** distance_power, axis=2)
            
            # Check convergence; keep the latest memberships either way
            converged = np.linalg.norm(new_membership - self.membership_matrix_) < self.tol
            self.membership_matrix_ = new_membership
            if converged:
                break
            
        # Return hard cluster assignments (for compatibility with original KMeans)
        return np.argmax(self.membership_matrix_, axis=1)

class TrafficAccidentFuzzyCBR:
    
    def __init__(self, num_clusters: int = 3):
        self.case_library = []
        self.num_clusters = num_clusters
        self.clusters = None
        self.feature_weights = self._initialize_feature_weights()
        self.linguistic_mappings = self._initialize_linguistic_mappings()
        self.numerical_imputer = SimpleImputer(strategy='mean')
        self.categorical_imputer = SimpleImputer(strategy='most_frequent') # Most frequent
        self.scaler = MinMaxScaler()
        self.min_similarity_threshold = 0.01  # Add minimum similarity threshold
        self.safety_rules = self._initialize_safety_rules()
        self.solution_history = []
        self.validation_metrics = {
            'mse': [],
            'accuracy': [],
            'recommendation_effectiveness': []
        }
        # Initialize fuzzy k-means 
        self.fuzzy_kmeans = FuzzyKMeans(n_clusters=num_clusters, m=2)
        
    def _initialize_feature_weights(self) -> Dict[str, float]:
        """Initialize feature weights based on domain knowledge"""
        return {
            # Adjusted weights based on importance
            'Weather': 0.15, 
            'Road_Type': 0.1,
            'Time_of_Day': 0.05,
            'Traffic_Density': 0.1,
            'Speed_Limit': 0.1,
            'Driver_Alcohol': 0.15,
            'Road_Condition': 0.1,
            'Vehicle_Type': 0.05,
            'Driver_Age': 0.1,
            'Driver_Experience': 0.05,
            'Road_Light_Condition': 0.05
        }
    
    def _initialize_linguistic_mappings(self) -> Dict[str, Dict[str, TFN]]:
        """Initialize fuzzy linguistic terms for categorical features"""
        mappings = {
            'Weather': {
                'Clear': TFN(0, 0, 0.2, 'Clear'),
                'Rainy': TFN(0.2, 0.4, 0.6, 'Rainy'),
                'Foggy': TFN(0.4, 0.6, 0.8, 'Foggy'),
                'Snowy': TFN(0.6, 0.8, 1.0, 'Snowy'),
                'Stormy': TFN(0.8, 1.0, 1.0, 'Stormy')
            },
            'Traffic_Density': {
                'Low': TFN(0, 0, 0.4, 'Low'),
                'Moderate': TFN(0.3, 0.5, 0.7, 'Moderate'),
                'High': TFN(0.6, 1.0, 1.0, 'High')
            },
            'Road_Condition': {
                'Dry': TFN(0, 0, 0.3, 'Dry'),
                'Wet': TFN(0.2, 0.5, 0.8, 'Wet'),
                'Icy': TFN(0.7, 0.9, 1.0, 'Icy'),
                'Under Construction': TFN(0.5, 0.7, 0.9, 'Under Construction')
            },
            'Road_Type': {
                'Highway': TFN(0.7, 0.9, 1.0, 'Highway'),
                'City Road': TFN(0.3, 0.5, 0.7, 'City Road'),
                'Rural Road': TFN(0.2, 0.4, 0.6, 'Rural Road'),
                'Mountain Road': TFN(0.5, 0.7, 0.9, 'Mountain Road')
            },
            'Time_of_Day': {
                'Morning': TFN(0.2, 0.4, 0.6, 'Morning'),
                'Afternoon': TFN(0.3, 0.5, 0.7, 'Afternoon'),
                'Evening': TFN(0.5, 0.7, 0.9, 'Evening'),
                'Night': TFN(0.7, 0.9, 1.0, 'Night')
            },
            'Vehicle_Type': {
                'Car': TFN(0.2, 0.4, 0.6, 'Car'),
                'Truck': TFN(0.5, 0.7, 0.9, 'Truck'),
                'Motorcycle': TFN(0.6, 0.8, 1.0, 'Motorcycle'),
                'Bus': TFN(0.4, 0.6, 0.8, 'Bus')
            },
            'Road_Light_Condition': {
                'Daylight': TFN(0, 0.2, 0.4, 'Daylight'),
                'Artificial Light': TFN(0.3, 0.5, 0.7, 'Artificial Light'),
                'No Light': TFN(0.7, 0.9, 1.0, 'No Light')
            }
        }
        return mappings
    
    def preprocess_data(self, data: pd.DataFrame) -> List[AccidentCase]:
        """Preprocess raw data into fuzzy cases with handling for missing values"""
        cases = []
        # Create copies to avoid modifying original data
        data_numerical = data.copy()
        data_categorical = data.copy()
        
        # Define feature groups
        numerical_features = ['Speed_Limit', 'Driver_Age', 'Driver_Experience', 'Driver_Alcohol','Number_of_Vehicles']
        categorical_features = ['Weather', 'Road_Type', 'Time_of_Day', 'Traffic_Density',
                            'Road_Condition', 'Vehicle_Type', 'Road_Light_Condition', 'Accident_Severity','Accident']
        
        # Handle missing values
        data_numerical[numerical_features] = self.numerical_imputer.fit_transform(data_numerical[numerical_features])
        data_categorical[categorical_features] = self.categorical_imputer.fit_transform(data_categorical[categorical_features])
        
        # Scale numerical features
        data_numerical[numerical_features] = self.scaler.fit_transform(data_numerical[numerical_features])
        
        for idx in range(len(data)):
            features = {}
            
            # Process numerical features into TFNs
            for feature in numerical_features:
                value = data_numerical.iloc[idx][feature]
                features[feature] = TFN(
                    max(0, value - 0.1),
                    value,
                    min(1, value + 0.1)
                )
            
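            # Process categorical features using linguistic mappings
            density_terms = {0: 'Low', 1: 'Moderate', 2: 'High'}
            for feature in categorical_features:
                value = data_categorical.iloc[idx][feature]
                # Traffic_Density is stored as 0/1/2, but its fuzzy terms are keyed by name
                if feature == 'Traffic_Density':
                    value = density_terms.get(value, value)
                if feature in self.linguistic_mappings and value in self.linguistic_mappings[feature]:
                    features[feature] = self.linguistic_mappings[feature][value]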
            
            # Handle Driver_Alcohol as a crisp (zero-width) TFN, using the imputed
            # value so that missing entries do not produce NaN
            alcohol_value = float(data_numerical.iloc[idx]['Driver_Alcohol'])
            features['Driver_Alcohol'] = TFN(
                alcohol_value,
                alcohol_value,
                alcohol_value
            )
            
            case = AccidentCase(
                features=features,
                severity=data_categorical.iloc[idx]['Accident_Severity'],  # imputed label, never NaN
                case_id=f"case_{idx}"
            )
            cases.append(case)
            
        return cases
    
    def cluster_cases(self):
        """Perform fuzzy clustering on cases using Fuzzy K-Means"""
        if len(self.case_library) < self.num_clusters:
            return
            
        # Extract feature centers for clustering
        feature_names = list(self.feature_weights.keys())
        centers = []
        
        for case in self.case_library:
            case_features = []
            for feature in feature_names:
                if feature in case.features:
                    case_features.append(case.features[feature].center)
                else:
                    case_features.append(0.0)  # Default value for missing features
            centers.append(case_features)
            
        centers = np.array(centers)
        
        # Perform fuzzy clustering
        cluster_labels = self.fuzzy_kmeans.fit_predict(centers)
        
        # Get membership matrix directly from fuzzy k-means
        memberships = self.fuzzy_kmeans.membership_matrix_
        
        # Assign memberships to cases
        for case, membership in zip(self.case_library, memberships):
            case.cluster_membership = membership
    
    def calculate_case_similarity(self, case1: AccidentCase, case2: AccidentCase) -> float:
        """Calculate weighted similarity between two cases with minimum threshold"""
        total_similarity = 0
        total_weight = 0
        
        for feature_name, weight in self.feature_weights.items():
            if feature_name in case1.features and feature_name in case2.features:
                try:
                    distance = self.wasserstein_distance(
                        case1.features[feature_name],
                        case2.features[feature_name]
                    )
                    # Add small epsilon to prevent division by zero
                    similarity = 1 / (1 + distance + 1e-10)
                    total_similarity += weight * similarity
                    total_weight += weight
                except Exception as e:
                    print(f"Error calculating similarity for feature {feature_name}: {str(e)}")
                    continue
        
        if total_weight > 0:
            final_similarity = total_similarity / total_weight
            # Apply minimum threshold
            return max(final_similarity, self.min_similarity_threshold)
        return self.min_similarity_threshold
    
    def add_case(self, case: AccidentCase):
        """Add new case to library and update clusters"""
        self.case_library.append(case)
        if len(self.case_library) >= self.num_clusters:
            self.cluster_cases()

    def retrieve_similar_cases(self, query_case: AccidentCase, k: int = 5) -> List[Tuple[AccidentCase, float]]:
        """Retrieve k most similar cases with error handling"""
        similarities = []
        
        for case in self.case_library:
            try:
                similarity = self.calculate_case_similarity(query_case, case)
                if similarity > self.min_similarity_threshold:
                    similarities.append((case, similarity))
            except Exception as e:
                print(f"Error calculating similarity for case {case.case_id}: {str(e)}")
                continue
        
        # Sort by similarity and ensure we have at least one case
        similarities.sort(key=lambda x: x[1], reverse=True)
        
        # If no similar cases found, return empty list
        if not similarities:
            print("Warning: No similar cases found above similarity threshold")
            return []
            
        return similarities[:k]

    def wasserstein_distance(self, tfn1: TFN, tfn2: TFN) -> float:
        """Calculate Wasserstein-based distance between two TFNs"""
        alphas = np.linspace(0, 1, 50)
        total_distance = 0
        
        for alpha in alphas:
            a1, a2 = tfn1.alpha_cut(alpha)
            b1, b2 = tfn2.alpha_cut(alpha)
            d_alpha = ((a1 - b1)**2 + (a2 - b2)**2) / 2
            total_distance += d_alpha
            
        return np.sqrt(total_distance / len(alphas))
    
    def _initialize_safety_rules(self) -> List[SafetyRule]:
        """Initialize comprehensive safety rules for accident prevention"""
        rules = [
            # Weather-related rules
            SafetyRule(
                lambda case: (
                    'Weather' in case.features and
                    hasattr(case.features['Weather'], 'linguistic_term') and
                    case.features['Weather'].linguistic_term in ['Rainy', 'Snowy'] and
                    'Speed_Limit' in case.features and
                    case.features['Speed_Limit'].center > 0.6
                ),
                "Reduce speed by 30% in adverse weather conditions",
                1
            ),
            SafetyRule(
                lambda case: (
                    'Weather' in case.features and
                    hasattr(case.features['Weather'], 'linguistic_term') and
                    case.features['Weather'].linguistic_term == 'Foggy'
                ),
                "Activate fog lights and maintain increased following distance",
                1
            ),
            
            # Driver condition rules
            SafetyRule(
                lambda case: (
                    'Driver_Alcohol' in case.features and
                    isinstance(case.features['Driver_Alcohol'], TFN) and
                    case.features['Driver_Alcohol'].center > 0
                ),
                "Immediate suspension of driving privileges and mandatory safety course",
                1
            ),
            SafetyRule(
                lambda case: (
                    'Driver_Experience' in case.features and
                    isinstance(case.features['Driver_Experience'], TFN) and
                    case.features['Driver_Experience'].center < 0.3
                ),
                "Additional driver training and supervised driving hours required",
                2
            ),
            
            # Visibility and lighting rules
            SafetyRule(
                lambda case: (
                    'Time_of_Day' in case.features and
                    'Road_Light_Condition' in case.features and
                    hasattr(case.features['Time_of_Day'], 'linguistic_term') and
                    hasattr(case.features['Road_Light_Condition'], 'linguistic_term') and
                    case.features['Time_of_Day'].linguistic_term == 'Night' and
                    case.features['Road_Light_Condition'].linguistic_term == 'No Light'
                ),
                "Install additional road lighting and reflective markers",
                2
            ),
            SafetyRule(
                lambda case: (
                    'Time_of_Day' in case.features and
                    hasattr(case.features['Time_of_Day'], 'linguistic_term') and
                    case.features['Time_of_Day'].linguistic_term in ['Evening', 'Night']
                ),
                "Ensure all vehicle lights are functional and use high-visibility clothing",
                2
            ),
            
            # Road condition rules
            SafetyRule(
                lambda case: (
                    'Road_Condition' in case.features and
                    hasattr(case.features['Road_Condition'], 'linguistic_term') and
                    case.features['Road_Condition'].linguistic_term in ['Wet', 'Icy']
                ),
                "Implement dynamic speed limits based on road conditions",
                2
            ),
            SafetyRule(
                lambda case: (
                    'Road_Condition' in case.features and
                    hasattr(case.features['Road_Condition'], 'linguistic_term') and
                    case.features['Road_Condition'].linguistic_term == 'Under Construction'
                ),
                "Install temporary traffic control devices and warning signs",
                2
            ),
            
            # Traffic management rules
            SafetyRule(
                lambda case: (
                    'Traffic_Density' in case.features and
                    'Road_Type' in case.features and
                    hasattr(case.features['Traffic_Density'], 'linguistic_term') and
                    hasattr(case.features['Road_Type'], 'linguistic_term') and
                    case.features['Traffic_Density'].linguistic_term == 'High' and
                    case.features['Road_Type'].linguistic_term == 'Highway'
                ),
                "Consider implementing smart traffic management systems",
                3
            ),
            SafetyRule(
                lambda case: (
                    'Traffic_Density' in case.features and
                    hasattr(case.features['Traffic_Density'], 'linguistic_term') and
                    case.features['Traffic_Density'].linguistic_term == 'High'
                ),
                "Implement real-time traffic monitoring and alert systems",
                3
            ),
            
            # Vehicle type specific rules
            SafetyRule(
                lambda case: (
                    'Vehicle_Type' in case.features and
                    hasattr(case.features['Vehicle_Type'], 'linguistic_term') and
                    case.features['Vehicle_Type'].linguistic_term == 'Motorcycle'
                ),
                "Enforce mandatory protective gear and lane splitting regulations",
                2
            ),
            SafetyRule(
                lambda case: (
                    'Vehicle_Type' in case.features and
                    hasattr(case.features['Vehicle_Type'], 'linguistic_term') and
                    case.features['Vehicle_Type'].linguistic_term in ['Truck', 'Bus']
                ),
                "Implement regular vehicle maintenance checks and driver rest periods",
                2
            ),
            
            # Road type specific rules
            SafetyRule(
                lambda case: (
                    'Road_Type' in case.features and
                    hasattr(case.features['Road_Type'], 'linguistic_term') and
                    case.features['Road_Type'].linguistic_term == 'Mountain Road'
                ),
                "Install guard rails and warning signs for steep grades and curves",
                2
            ),
            SafetyRule(
                lambda case: (
                    'Road_Type' in case.features and
                    'Weather' in case.features and
                    hasattr(case.features['Road_Type'], 'linguistic_term') and
                    hasattr(case.features['Weather'], 'linguistic_term') and
                    case.features['Road_Type'].linguistic_term == 'Rural Road' and
                    case.features['Weather'].linguistic_term in ['Rainy', 'Snowy', 'Foggy']
                ),
                "Implement weather monitoring systems and emergency response protocols",
                2
            ),
            
            # Age-related rules
            SafetyRule(
                lambda case: (
                    'Driver_Age' in case.features and
                    isinstance(case.features['Driver_Age'], TFN) and
                    case.features['Driver_Age'].center > 0.8
                ),
                "Regular vision and reaction time assessments for elderly drivers",
                2
            ),
            SafetyRule(
                lambda case: (
                    'Driver_Age' in case.features and
                    isinstance(case.features['Driver_Age'], TFN) and
                    case.features['Driver_Age'].center < 0.3
                ),
                "Implement graduated licensing system for young drivers",
                2
            ),
            
            # Combination rules
            SafetyRule(
                lambda case: (
                    'Speed_Limit' in case.features and
                    'Traffic_Density' in case.features and
                    isinstance(case.features['Speed_Limit'], TFN) and
                    hasattr(case.features['Traffic_Density'], 'linguistic_term') and
                    case.features['Speed_Limit'].center > 0.7 and
                    case.features['Traffic_Density'].linguistic_term in ['Moderate', 'High']
                ),
                "Install speed cameras and variable speed limit signs",
                2
            ),
            SafetyRule(
                lambda case: (
                    'Road_Type' in case.features and
                    'Time_of_Day' in case.features and
                    hasattr(case.features['Road_Type'], 'linguistic_term') and
                    hasattr(case.features['Time_of_Day'], 'linguistic_term') and
                    case.features['Road_Type'].linguistic_term == 'City Road' and
                    case.features['Time_of_Day'].linguistic_term in ['Evening', 'Night']
                ),
                "Enhance street lighting and pedestrian crossing visibility",
                2
            )
        ]
        return rules
    
    def generate_recommendations(self, case: AccidentCase) -> List[str]:
        """Generate safety recommendations based on case features with detailed debugging"""
        applicable_rules = []
        
        print("\nDebug: Checking case features:")
        for feature_name, feature_value in case.features.items():
            print(f"Feature: {feature_name}")
            print(f"Value: {feature_value.__dict__ if hasattr(feature_value, '__dict__') else feature_value}")
            
        # Log the Weather and Driver_Alcohol inputs once, before the rules run
        has_weather = 'Weather' in case.features
        has_linguistic = hasattr(case.features.get('Weather'), 'linguistic_term')
        weather_value = case.features['Weather'].linguistic_term if has_linguistic else None
        print(f"Weather check - Has feature: {has_weather}, Has linguistic: {has_linguistic}, Value: {weather_value}")
        
        has_alcohol = 'Driver_Alcohol' in case.features
        is_tfn = isinstance(case.features.get('Driver_Alcohol'), TFN)
        alcohol_value = case.features['Driver_Alcohol'].center if is_tfn else None
        print(f"Alcohol check - Has feature: {has_alcohol}, Is TFN: {is_tfn}, Value: {alcohol_value}")
        
        print("\nDebug: Evaluating safety rules:")
        for i, rule in enumerate(self.safety_rules):
            print(f"\nChecking rule {i + 1}:")
            try:
                
                # Evaluate the full rule
                result = rule.condition(case)
                print(f"Rule evaluation result: {result}")
                
                if result:
                    applicable_rules.append((rule.recommendation, rule.priority))
                    print(f"Rule {i + 1} is applicable. Recommendation: {rule.recommendation}")
                
            except Exception as e:
                print(f"Error evaluating rule {i + 1}: {str(e)}")
                continue
        
        # Sort by priority and return recommendations
        applicable_rules.sort(key=lambda x: x[1])
        recommendations = [rule[0] for rule in applicable_rules]
        
        print("\nDebug: Final recommendations:")
        if recommendations:
            for i, rec in enumerate(recommendations, 1):
                print(f"{i}. {rec}")
        else:
            print("No recommendations generated")
        
        return recommendations

    def predict_severity_fuzzy(self, similar_cases: List[Tuple[AccidentCase, float]]) -> TFN:
        """Predict accident severity as a fuzzy number using weighted voting"""
        if not similar_cases:
            return TFN(0, 0.5, 1, "Unknown")  # Default fuzzy number for unknown cases
        
        # Initialize fuzzy values for each severity level
        severity_mappings = {
            'Low': TFN(0, 0.2, 0.4, 'Low'),
            'Moderate': TFN(0.3, 0.5, 0.7, 'Moderate'),
            'High': TFN(0.6, 0.8, 1.0, 'High')
        }
        
        # Keep only the cases whose severity label has a fuzzy mapping, so the
        # weights computed below still sum to 1
        voters = [(severity_mappings[case.severity], sim)
                  for case, sim in similar_cases
                  if case.severity in severity_mappings]
        total_similarity = sum(sim for _, sim in voters)
        if total_similarity <= 0 or total_similarity < self.min_similarity_threshold:
            return TFN(0, 0.5, 1, "Unknown")
        
        # Initialize aggregation values
        left_sum = 0
        center_sum = 0
        right_sum = 0
        
        # Aggregate fuzzy values, weighting each case by its share of the total similarity
        for fuzzy_val, similarity in voters:
            weight = similarity / total_similarity
            left_sum += weight * fuzzy_val.left
            center_sum += weight * fuzzy_val.center
            right_sum += weight * fuzzy_val.right
        
        # Create final fuzzy prediction
        fuzzy_prediction = TFN(left_sum, center_sum, right_sum)
        
        # Determine linguistic term based on center value
        if center_sum <= 0.3:
            linguistic_term = 'Low'
        elif center_sum <= 0.6:
            linguistic_term = 'Moderate'
        else:
            linguistic_term = 'High'
        
        fuzzy_prediction.linguistic_term = linguistic_term
        return fuzzy_prediction

    def predict_accident(self, query_features: Dict[str, Union[float, str]]) -> Solution:
        """Enhanced prediction method with fuzzy severity prediction"""
        try:
            # Process features and create query case
            processed_features = {}
            
            for feature, value in query_features.items():
                try:
                    feature_key = feature.replace(' ', '_')
                    if feature_key in self.linguistic_mappings and str(value) in self.linguistic_mappings[feature_key]:
                        processed_features[feature_key] = self.linguistic_mappings[feature_key][str(value)]
                    elif isinstance(value, (int, float)):
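                        if feature_key in ['Speed_Limit', 'Driver_Age', 'Driver_Experience']:
                            # NOTE: fit_transform() refits the scaler on this single value, so
                            # scaled_value comes out as 0.0 for every query. Scaling with the
                            # statistics learned from the training data would avoid this.
                            scaled_value = self.scaler.fit_transform([[value]])[0][0]
                            processed_features[feature_key] = TFN(
                                max(0, scaled_value - 0.1),
                                scaled_value,
                                min(1, scaled_value + 0.1)
                            )
                        else:
                            # Other numeric features are assumed to lie in [0, 1] already;
                            # larger values would give a TFN whose right bound is below its center
                            processed_features[feature_key] = TFN(
                                max(0, float(value) - 0.1),
                                float(value),
                                min(1, float(value) + 0.1)
                            )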
                except Exception as e:
                    print(f"Warning: Error processing feature {feature}: {str(e)}")
                    continue
            
            if not processed_features:
                return Solution(TFN(0, 0.5, 1, "Unknown"), [], 0.0)
            
            query_case = AccidentCase(
                features=processed_features,
                severity=None,
                case_id='query'
            )
            
            # Get similar cases and predict fuzzy severity
            similar_cases = self.retrieve_similar_cases(query_case)
            print(f"\nFound {len(similar_cases)} similar cases")
            
            fuzzy_severity = self.predict_severity_fuzzy(similar_cases)
            recommendations = self.generate_recommendations(query_case)
            
            # Calculate confidence based on similar cases
            confidence = max([sim for _, sim in similar_cases]) if similar_cases else 0.5
            
            print("\nFuzzy Prediction Details:")
            print(f"Severity TFN: left={fuzzy_severity.left:.2f}, center={fuzzy_severity.center:.2f}, right={fuzzy_severity.right:.2f}")
            print(f"Predicted Severity: {fuzzy_severity.linguistic_term}")
            print(f"Confidence: {confidence:.2f}")
            
            return Solution(fuzzy_severity, recommendations, confidence)
                
        except Exception as e:
            print(f"Error in prediction process: {str(e)}")
            return Solution(TFN(0, 0.5, 1, "Unknown"), [], 0.0)

    def calculate_mse(self, true_severities: List[str], predicted_severities: List[str]) -> float:
        """Calculate Mean Squared Error for severity predictions with proper error handling"""
        severity_map = {'Low': 0, 'Moderate': 1, 'High': 2}
        try:
            true_values = [severity_map.get(s, 1) for s in true_severities]  # Default to 1 if unknown
            pred_values = [severity_map.get(s, 1) for s in predicted_severities]  # Default to 1 if unknown
            return mean_squared_error(true_values, pred_values)
        except Exception as e:
            print(f"Error calculating MSE: {str(e)}")
            # Return NaN rather than 0.0 so a failure is not mistaken for a perfect score
            return float('nan')

    def update_validation_metrics(self, feedback: Dict[str, Union[str, float]]):
        """Update validation metrics with proper initialization and calculation"""
        # Make sure both metric histories exist, without adding placeholder entries
        self.validation_metrics.setdefault('mse', [])
        self.validation_metrics.setdefault('accuracy', [])
        
        # Collect feedback from earlier solutions. The current feedback is skipped here
        # because retain_solution() appends the solution to the history before calling
        # this method, which would otherwise count it twice.
        history = [s.feedback for s in self.solution_history
                   if s.feedback and s.feedback is not feedback]
        true_severities = [f['actual_severity'] for f in history]
        pred_severities = [f['predicted_severity'] for f in history]
        
        # Add current feedback
        true_severities.append(feedback['actual_severity'])
        pred_severities.append(feedback['predicted_severity'])
        
        # Calculate new MSE
        current_mse = self.calculate_mse(true_severities, pred_severities)
        self.validation_metrics['mse'].append(current_mse)
        
        # Calculate new accuracy
        correct_predictions = sum(1 for t, p in zip(true_severities, pred_severities) if t == p)
        current_accuracy = correct_predictions / len(true_severities) if true_severities else 0.0
        self.validation_metrics['accuracy'].append(current_accuracy)

    def retain_solution(self, solution: Solution):
        """Retain solution and update metrics"""
        if solution.feedback:
            # Add to solution history
            self.solution_history.append(solution)
            
            # Update metrics
            self.update_validation_metrics(solution.feedback)
            
            # Print current metrics
            print("\nUpdated Validation Metrics:")
            print(f"Current MSE: {self.validation_metrics['mse'][-1]:.4f}")
            print(f"Current Accuracy: {self.validation_metrics['accuracy'][-1]:.4f}")
            
            return True
        return False

    def revise_solution(self, solution: Solution, feedback: Dict[str, Union[str, float]]) -> Solution:
        """Revise solution with feedback and update feature weights"""
        solution.feedback = feedback
        
        # Update feature weights based on prediction accuracy
        if 'accuracy_score' in feedback:
            weight_adjustment = 0.1 * (feedback['accuracy_score'] - 0.5)
            total_weight = 0
            
            # Adjust weights
            for feature in self.feature_weights:
                self.feature_weights[feature] = max(0.01, min(0.5, 
                    self.feature_weights[feature] + weight_adjustment))
                total_weight += self.feature_weights[feature]
            
            # Normalize weights
            self.feature_weights = {k: v/total_weight for k, v in self.feature_weights.items()}
        
        return solution
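
The weighted voting in `predict_severity_fuzzy` can be tried in isolation. The sketch below is a minimal standalone version under stated assumptions: the `TFN` dataclass is a hypothetical stand-in for the notebook's own class, `aggregate_severity` is an illustrative helper name, and the similarity scores are made up for the example.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class TFN:
    """Triangular fuzzy number (hypothetical stand-in for the notebook's TFN)."""
    left: float
    center: float
    right: float
    linguistic_term: Optional[str] = None


# Same severity mappings as in predict_severity_fuzzy
SEVERITY_TFNS = {
    'Low': TFN(0.0, 0.2, 0.4, 'Low'),
    'Moderate': TFN(0.3, 0.5, 0.7, 'Moderate'),
    'High': TFN(0.6, 0.8, 1.0, 'High'),
}


def aggregate_severity(votes: List[Tuple[str, float]]) -> TFN:
    """Similarity-weighted average of the severity TFNs of the retrieved cases."""
    total = sum(sim for _, sim in votes)
    left = sum(sim / total * SEVERITY_TFNS[label].left for label, sim in votes)
    center = sum(sim / total * SEVERITY_TFNS[label].center for label, sim in votes)
    right = sum(sim / total * SEVERITY_TFNS[label].right for label, sim in votes)

    # Map the center back to a linguistic term with the notebook's thresholds
    if center <= 0.3:
        term = 'Low'
    elif center <= 0.6:
        term = 'Moderate'
    else:
        term = 'High'
    return TFN(left, center, right, term)


# Illustrative (severity label, similarity) pairs for three retrieved cases
prediction = aggregate_severity([('High', 0.8), ('Moderate', 0.6), ('Low', 0.6)])
print(prediction)
```

With these illustrative similarities the weights are 0.4, 0.3 and 0.3, so the center is roughly 0.53 and the prediction falls in the Moderate band even though the most similar case is High.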



In [5]:
print("Fuzzy CBR System for Traffic Accident Severity Prediction\n")
print("Uploading the dataset...")
# Read the dataset
data = pd.read_csv('./MiniProjet/dataset_traffic_accident.csv')

# Define feature groups
numerical_features = ['Speed_Limit', 'Driver_Age', 'Driver_Experience', 'Driver_Alcohol','Number_of_Vehicles']
categorical_features = ['Weather', 'Road_Type', 'Time_of_Day', 'Traffic_Density',
                        'Road_Condition', 'Vehicle_Type', 'Road_Light_Condition', 'Accident_Severity','Accident']

print("handling missing values...")
# Fill missing values in numerical columns with the column mean
data.fillna({column: data[column].mean() for column in numerical_features}, inplace=True)

# Fill missing values in categorical columns (including Traffic_Density) with the most frequent value
data.fillna({column: data[column].value_counts().index[0] for column in categorical_features}, inplace=True)

print("Preprocessing the data...")
# Preprocess the data and create fuzzy cases
cbr_system = TrafficAccidentFuzzyCBR()
cases = cbr_system.preprocess_data(data)

##############################################
##############################################
if not cases:
    raise ValueError("No valid cases could be processed from the dataset")
    
print("Adding cases to the CBR system...")
for case in cases:
    cbr_system.add_case(case)


new_case = {
    'Weather': 'Foggy',
    'Road_Type': 'Mountain Road',
    'Time_of_Day': 'Evening',
    'Traffic_Density': 'Moderate',
    'Speed_Limit': 60,
    'Driver_Alcohol': 0.5,
    'Road_Condition': 'Under Construction',
    'Vehicle_Type': 'Motorcycle',
    'Driver_Age': 25,
    'Driver_Experience': 2,
    'Road_Light_Condition': 'Artificial Light'
}  # expected prediction: Moderate severity

new_case1 = {
    'Weather': 'Clear',
    'Road_Type': 'Highway',
    'Time_of_Day': 'Morning',
    'Traffic_Density': 'Low',
    'Speed_Limit': 100,
    'Driver_Alcohol': 0.0,
    'Road_Condition': 'Dry',
    'Vehicle_Type': 'Car',
    'Driver_Age': 35,
    'Driver_Experience': 5,
    'Road_Light_Condition': 'Daylight'
}

print("Predict Accident...")

# Make prediction and get recommendations
solution = cbr_system.predict_accident(new_case)


Fuzzy CBR System for Traffic Accident Severity Prediction

Uploading the dataset...
handling missing values...
Preprocessing the data...
Adding cases to the CBR system...
Predict Accident...

Found 5 similar cases

Debug: Checking case features:
Feature: Weather
Value: {'left': 0.4, 'center': 0.6, 'right': 0.8, 'linguistic_term': 'Foggy'}
Feature: Road_Type
Value: {'left': 0.5, 'center': 0.7, 'right': 0.9, 'linguistic_term': 'Mountain Road'}
Feature: Time_of_Day
Value: {'left': 0.5, 'center': 0.7, 'right': 0.9, 'linguistic_term': 'Evening'}
Feature: Traffic_Density
Value: {'left': 0.3, 'center': 0.5, 'right': 0.7, 'linguistic_term': 'Moderate'}
Feature: Speed_Limit
Value: {'left': 0, 'center': 0.0, 'right': 0.1, 'linguistic_term': None}
Feature: Driver_Alcohol
Value: {'left': 0.4, 'center': 0.5, 'right': 0.6, 'linguistic_term': None}
Feature: Road_Condition
Value: {'left': 0.5, 'center': 0.7, 'right': 0.9, 'linguistic_term': 'Under Construction'}
Feature: Vehicle_Type
Value: {'left': 0
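
The `Speed_Limit` TFN in the output above has a center of 0.0 even though the query used a speed limit of 60. A likely cause is that `predict_accident` refits the scaler on the single query value. The sketch below reproduces the effect with a hand-written min-max scaler. The `MinMax` class is hypothetical and only mimics the fit/transform split, and the training values are illustrative, not taken from the dataset.

```python
from typing import List


class MinMax:
    """Hypothetical minimal min-max scaler with separate fit and transform steps."""

    def fit(self, values: List[float]) -> "MinMax":
        self.lo, self.hi = min(values), max(values)
        return self

    def transform(self, value: float) -> float:
        span = self.hi - self.lo
        # A zero span (a single training value) maps everything to 0.0
        return 0.0 if span == 0 else (value - self.lo) / span


# Refitting on the query alone: the value is its own minimum, so it maps to 0.0
refit_on_query = MinMax().fit([60]).transform(60)

# Fitting once on training data (illustrative values) and reusing the bounds
scaler = MinMax().fit([30, 60, 90, 120])
scaled_query = scaler.transform(60)

print(refit_on_query, scaled_query)
```

Fitting once during preprocessing and only transforming at query time keeps the query on the same scale as the stored cases, which the similarity computation presumably relies on.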

In [6]:

# Make prediction and get recommendations
solution = cbr_system.predict_accident(new_case)
# solution.severity is a TFN, so compare and report its linguistic term
predicted_label = solution.severity.linguistic_term
if predicted_label != "Unknown":
    print(f"Predicted accident severity: {predicted_label}")
    print(f"Confidence: {solution.confidence:.2f}")
    print("\nRecommended safety measures:")
    for i, rec in enumerate(solution.recommendations, 1):
        print(f"{i}. {rec}")
    
    # Example of revision and retention
    feedback = {
        'accuracy_score': 0.8,
        'predicted_severity': predicted_label,
        'actual_severity': 'High',
        'recommendation_effectiveness': 0.9
    }
    
    revised_solution = cbr_system.revise_solution(solution, feedback)
    cbr_system.retain_solution(revised_solution)
    
    # Print validation metrics
    print("\nValidation Metrics:")
    if cbr_system.validation_metrics['mse']:
        print(f"Current MSE: {cbr_system.validation_metrics['mse'][-1]:.4f}")
    if cbr_system.validation_metrics['accuracy']:
        print(f"Current Accuracy: {cbr_system.validation_metrics['accuracy'][-1]:.4f}")
    





Found 5 similar cases

Debug: Checking case features:
Feature: Weather
Value: {'left': 0.4, 'center': 0.6, 'right': 0.8, 'linguistic_term': 'Foggy'}
Feature: Road_Type
Value: {'left': 0.5, 'center': 0.7, 'right': 0.9, 'linguistic_term': 'Mountain Road'}
Feature: Time_of_Day
Value: {'left': 0.5, 'center': 0.7, 'right': 0.9, 'linguistic_term': 'Evening'}
Feature: Traffic_Density
Value: {'left': 0.3, 'center': 0.5, 'right': 0.7, 'linguistic_term': 'Moderate'}
Feature: Speed_Limit
Value: {'left': 0, 'center': 0.0, 'right': 0.1, 'linguistic_term': None}
Feature: Driver_Alcohol
Value: {'left': 0.4, 'center': 0.5, 'right': 0.6, 'linguistic_term': None}
Feature: Road_Condition
Value: {'left': 0.5, 'center': 0.7, 'right': 0.9, 'linguistic_term': 'Under Construction'}
Feature: Vehicle_Type
Value: {'left': 0.6, 'center': 0.8, 'right': 1.0, 'linguistic_term': 'Motorcycle'}
Feature: Driver_Age
Value: {'left': 0, 'center': 0.0, 'right': 0.1, 'linguistic_term': None}
Feature: Driver_Experience
Value
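
The feedback step in the cell above calls `revise_solution`, which adds the same adjustment to every feature weight, clips each weight to [0.01, 0.5], and renormalizes. The sketch below replays that update outside the class. The function name `update_weights` and the starting weights are hypothetical, chosen only to make the effect visible.

```python
from typing import Dict


def update_weights(weights: Dict[str, float], accuracy_score: float) -> Dict[str, float]:
    """Replay of the revise_solution weight update on a plain dictionary."""
    adjustment = 0.1 * (accuracy_score - 0.5)
    # Shift every weight by the same amount, clipped to [0.01, 0.5]
    shifted = {k: max(0.01, min(0.5, v + adjustment)) for k, v in weights.items()}
    total = sum(shifted.values())
    # Renormalize so the weights sum to 1 again
    return {k: v / total for k, v in shifted.items()}


# Hypothetical starting weights for three features
weights = {'Weather': 0.5, 'Speed_Limit': 0.3, 'Driver_Age': 0.2}
updated = update_weights(weights, accuracy_score=0.8)

for name, value in updated.items():
    print(f"{name}: {value:.4f}")
```

Because every weight receives the same shift, a positive adjustment followed by normalization narrows the gaps between weights rather than rewarding particular features. A per-feature adjustment would be needed if the goal is to learn which features matter most.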

In [7]:

# Make prediction and get recommendations
solution = cbr_system.predict_accident(new_case1)
# solution.severity is a TFN, so compare and report its linguistic term
predicted_label = solution.severity.linguistic_term
if predicted_label != "Unknown":
    print(f"Predicted accident severity: {predicted_label}")
    print(f"Confidence: {solution.confidence:.2f}")
    print("\nRecommended safety measures:")
    for i, rec in enumerate(solution.recommendations, 1):
        print(f"{i}. {rec}")
    
    # Example of revision and retention
    feedback = {
        'accuracy_score': 0.8,
        'predicted_severity': predicted_label,
        'actual_severity': 'High',
        'recommendation_effectiveness': 0.9
    }
    
    revised_solution = cbr_system.revise_solution(solution, feedback)
    cbr_system.retain_solution(revised_solution)
    
    # Print validation metrics
    print("\nValidation Metrics:")
    if cbr_system.validation_metrics['mse']:
        print(f"Current MSE: {cbr_system.validation_metrics['mse'][-1]:.4f}")
    if cbr_system.validation_metrics['accuracy']:
        print(f"Current Accuracy: {cbr_system.validation_metrics['accuracy'][-1]:.4f}")
    





Found 5 similar cases

Debug: Checking case features:
Feature: Weather
Value: {'left': 0, 'center': 0, 'right': 0.2, 'linguistic_term': 'Clear'}
Feature: Road_Type
Value: {'left': 0.7, 'center': 0.9, 'right': 1.0, 'linguistic_term': 'Highway'}
Feature: Time_of_Day
Value: {'left': 0.2, 'center': 0.4, 'right': 0.6, 'linguistic_term': 'Morning'}
Feature: Traffic_Density
Value: {'left': 0, 'center': 0, 'right': 0.4, 'linguistic_term': 'Low'}
Feature: Speed_Limit
Value: {'left': 0, 'center': 0.0, 'right': 0.1, 'linguistic_term': None}
Feature: Driver_Alcohol
Value: {'left': 0, 'center': 0.0, 'right': 0.1, 'linguistic_term': None}
Feature: Road_Condition
Value: {'left': 0, 'center': 0, 'right': 0.3, 'linguistic_term': 'Dry'}
Feature: Vehicle_Type
Value: {'left': 0.2, 'center': 0.4, 'right': 0.6, 'linguistic_term': 'Car'}
Feature: Driver_Age
Value: {'left': 0, 'center': 0.0, 'right': 0.1, 'linguistic_term': None}
Feature: Driver_Experience
Value: {'left': 0, 'center': 0.0, 'right': 0.1, 'lin