### **Dask-Based Random Forest Classifier on US Accident Data**
#### **Introduction:**
This project demonstrates the use of a distributed Random Forest model to classify accident severity using Dask's distributed computing capabilities. We'll:
1. Use a cleaned dataset of US accidents.
2. Preprocess the data.
3. Train multiple Random Forest configurations.
4. Evaluate and visualize the results.


In [1]:
from dask.distributed import Client, LocalCluster
import time
import pandas as pd
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import MinMaxScaler
from dask_ml.wrappers import ParallelPostFit
import joblib


#### 1. Initialize Dask Cluster
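This function creates a Dask `LocalCluster` with no workers, attaches a `Client`, and then scales the cluster to the requested number of worker processes. The dashboard URL for monitoring is available from the returned client as `client.dashboard_link`.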

##### Args:
1. n_workers (int): Number of worker processes to start.
2. threads_per_worker (int): Number of threads per worker process.
3. memory_limit (str): Memory limit for each worker process.

##### Returns:
tuple: A tuple containing the Dask client and cluster objects.

In [2]:
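def initialize_cluster(n_workers, threads_per_worker, memory_limit):
    # Start with no workers so the cluster can be scaled explicitly below.
    cluster = LocalCluster(
        n_workers=0,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit,
        processes=True,
    )
    client = Client(cluster)
    cluster.scale(n_workers)  # Dynamically scale to the desired number of workers
    # Block until the workers are up so startup time is not counted as training time.
    client.wait_for_workers(n_workers)
    return client, cluster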
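
Because `memory_limit` applies to each worker process, the total memory a configuration may claim is the per-worker limit multiplied by `n_workers`. The sketch below tallies threads and memory for one configuration before any cluster is started. `parse_gb` and `cluster_footprint` are hypothetical helpers, not part of Dask, and the parser assumes limits are written as a number followed by `GB`.

```python
def parse_gb(limit: str) -> float:
    """Parse strings such as '8GB' into gigabytes (assumed input format)."""
    text = limit.strip().upper()
    if not text.endswith("GB"):
        raise ValueError(f"expected a value like '8GB', got {limit!r}")
    return float(text[:-2])


def cluster_footprint(n_workers: int, threads_per_worker: int, memory_limit: str) -> dict:
    """Return the total threads and memory requested by one configuration."""
    return {
        "total_threads": n_workers * threads_per_worker,
        "total_memory_gb": n_workers * parse_gb(memory_limit),
    }


footprint = cluster_footprint(n_workers=4, threads_per_worker=2, memory_limit="8GB")
print(footprint)
```

Comparing these totals with the cores and RAM of the host is a quick way to spot a configuration that oversubscribes the machine.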


#### 2. Data Loading and Preprocessing
This function loads the dataset with Dask, min-max scales the selected features, adds two interaction features (`Temp_Wind` and `Visibility_Wind`), and splits the data into training and test sets.

##### Args:
1. client (dask.distributed.Client): Dask client connected to the cluster.
2. dataset_path (str): Path to the CSV dataset.
3. features (list): List of feature column names.
4. target (str): Name of the target column.

##### Returns:
tuple: Tuple of (X_train, X_test, y_train, y_test)

In [3]:
def data_loading_and_preprocessing(client, dataset_path, features, target):
    """
    Loads and preprocesses the dataset using the Dask cluster.
    """
    df = dd.read_csv(dataset_path)
    X = df[features]
    Y = df[target]
    scaler = MinMaxScaler()
    X_normalized = scaler.fit_transform(X)
    X_normalized['Temp_Wind'] = X_normalized['Temperature(F)'] * X_normalized['Wind_Speed(mph)']
    X_normalized['Visibility_Wind'] = X_normalized['Visibility(mi)'] * X_normalized['Wind_Speed(mph)']
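    # dask-ml's train_test_split does not support shuffle=False for
    # Dask DataFrames, so the split is shuffled with a fixed seed.
    X_train, X_test, y_train, y_test = train_test_split(
        X_normalized,
        Y,
        test_size=0.2,
        random_state=42,
        shuffle=True
    )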
    return X_train, X_test, y_train, y_test
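
Min-max scaling maps each column to the range [0, 1] via `(x - min) / (max - min)`, and the two engineered columns are element-wise products of scaled columns. The pure-Python sketch below works through that arithmetic on made-up readings (not rows from the accident dataset). `min_max_scale` is a hypothetical helper written for illustration, and its handling of a constant column is an assumption of this sketch.

```python
def min_max_scale(values):
    """Scale a list of numbers to [0, 1] using (x - min) / (max - min)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no spread; this sketch maps it to zeros.
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


# Made-up readings for illustration only.
temperature_f = [30.0, 50.0, 70.0]
wind_speed_mph = [0.0, 5.0, 20.0]

temp_scaled = min_max_scale(temperature_f)
wind_scaled = min_max_scale(wind_speed_mph)

# Interaction feature: element-wise product of the two scaled columns.
temp_wind = [t * w for t, w in zip(temp_scaled, wind_scaled)]
print(temp_scaled, wind_scaled, temp_wind)
```

Because both inputs lie in [0, 1], the product also lies in [0, 1] and is large only when both scaled readings are large.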

#### 3. Model Training and Evaluation
This function trains multiple Random Forest models with different configurations and evaluates their performance in terms of accuracy and time.

##### Args:
1. client (dask.distributed.Client): Dask client object for parallel execution.
2. X_train (array): Training features.
3. X_test (array): Testing features.
4. y_train (array): Training target labels.
5. y_test (array): Testing target labels.
6. configurations (dict): Dictionary of hyperparameter configurations.

##### Returns:
dict: Dictionary containing evaluation results for each configuration.

- Key: Configuration name (string)
- Value: Dictionary with performance metrics
    - 'accuracy' (float): Accuracy score achieved on the testing data
    - 'time' (float): Time taken for training and evaluation (seconds)

In [4]:
def model_training_and_evaluation(client, X_train, X_test, y_train, y_test, configurations):
    """
    Trains and evaluates multiple Random Forest models with different configurations using the Dask cluster.
    """
    results = {}
    for config_name, params in configurations.items():
        start_time = time.time()
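        with joblib.parallel_backend('dask'):
            model = ParallelPostFit(RandomForestClassifier(**params))
            model.fit(X_train, y_train)
            # predict() on a Dask collection is lazy, so compute inside the
            # timed block to include prediction work in the measurement.
            y_pred = model.predict(X_test).compute()
        end_time = time.time()
        accuracy = accuracy_score(y_test.compute(), y_pred)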
        results[config_name] = {
            'accuracy': accuracy,
            'time': end_time - start_time,
        }
    return results
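
Each entry in the results dictionary pairs an accuracy with a wall-clock time. The sketch below shows both measurements using only the standard library and made-up labels. `timed` and `accuracy` are hypothetical helpers. `accuracy` uses the fraction-of-matches definition that applies to single-label classification and is not a replacement for `accuracy_score`.

```python
import time


def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def accuracy(y_true, y_pred):
    """Fraction of positions where the predicted label equals the true label."""
    if len(y_true) != len(y_pred):
        raise ValueError("label sequences must have the same length")
    matches = sum(t == p for t, p in zip(y_true, y_pred))
    return matches / len(y_true)


# Made-up severity labels for illustration only.
y_true = [2, 2, 3, 4, 2]
y_pred = [2, 3, 3, 4, 2]

score, elapsed = timed(accuracy, y_true, y_pred)
record = {"accuracy": score, "time": elapsed}
print(record)
```

Only work that actually executes between the two clock readings is counted, which matters when the timed call returns a lazy object.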

#### 4. Sequential Execution
Performs sequential data loading, preprocessing, training, and evaluation for Random Forest models with different configurations.


##### Args:
1. dataset_path (str): Path to the CSV dataset.
2. features (list): List of feature column names.
3. target (str): Name of the target column.
4. model_configs (dict): Dictionary containing hyperparameter configurations for the models.

##### Returns:
dict: Dictionary containing evaluation results for each configuration.
- Key: Configuration name (string)
- Value: Dictionary with performance metrics
    - 'accuracy' (float): Accuracy score achieved on the testing data
    - 'time' (float): Time taken for training and evaluation (seconds)

In [9]:
def sequential_execution(dataset_path, features, target, model_configs):
    df = pd.read_csv(dataset_path)
    X = df[features]
    Y = df[target]
    scaler = MinMaxScaler()
    X_normalized = scaler.fit_transform(X)
    X_normalized['Temp_Wind'] = X_normalized['Temperature(F)'] * X_normalized['Wind_Speed(mph)']
    X_normalized['Visibility_Wind'] = X_normalized['Visibility(mi)'] * X_normalized['Wind_Speed(mph)']
    X_train, X_test, y_train, y_test = train_test_split(
        X_normalized, Y, test_size=0.2, random_state=42, shuffle=False
    )
    
    results = {}
    for config_name, params in model_configs.items():
        start_time = time.time()
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        end_time = time.time()
        accuracy = accuracy_score(y_test, y_pred)
        results[config_name] = {
            'accuracy': accuracy,
            'time': end_time - start_time,
        }
    return results

#### 5. Visualization
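Plots performance metrics for the cluster and model configurations as line graphs on stacked subplots.

##### Args:
1. cluster_results (dict): Dictionary containing cluster performance data.
    - Keys: Configuration names (strings)
    - Values: Dictionaries with performance metrics
        - 'time' (float): Total execution time (seconds)
        - 'speedup' (float): Sequential time divided by cluster time
        - 'efficiency' (float): Speedup divided by the number of workers
2. model_results (dict): Dictionary containing model performance data.
    - Keys: Configuration names (strings)
    - Values: Dictionaries with performance metrics
        - 'time' (float): Time taken for training and evaluation (seconds)
        - 'accuracy' (float): Accuracy score achieved on the testing data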




In [5]:
def visualize_performance(cluster_results, model_results):
    """
    Visualizes performance metrics for different cluster and model configurations using line graphs.
    """
    # Cluster Performance
    cluster_configs = list(cluster_results.keys())
    cluster_times = [cluster_results[config]['time'] for config in cluster_configs]
    cluster_speedups = [cluster_results[config]['speedup'] for config in cluster_configs]
    cluster_efficiencies = [cluster_results[config]['efficiency'] for config in cluster_configs]

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 18))

    # Execution Time Graph
    ax1.plot(cluster_configs, cluster_times, marker='o', color='orange', linewidth=2)
    ax1.set_ylabel('Time (seconds)', color='orange')
    ax1.set_title('Cluster Configuration Execution Time')
    ax1.grid(True)
    ax1.tick_params(axis='x', rotation=45)

    # Speedup Graph
    ax2.plot(cluster_configs, cluster_speedups, marker='s', color='blue', linewidth=2)
    ax2.set_ylabel('Speedup', color='blue')
    ax2.set_title('Cluster Configuration Speedup')
    ax2.grid(True)
    ax2.tick_params(axis='x', rotation=45)

    # Efficiency Graph
    ax3.plot(cluster_configs, cluster_efficiencies, marker='^', color='green', linewidth=2)
    ax3.set_ylabel('Efficiency', color='green')
    ax3.set_title('Cluster Configuration Efficiency')
    ax3.grid(True)
    ax3.tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()

    # Model Performance
    model_configs = list(model_results.keys())
    model_times = [model_results[config]['time'] for config in model_configs]
    model_accuracies = [model_results[config]['accuracy'] for config in model_configs]

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 12))

    # Speed (Time) Graph
    ax1.plot(model_configs, model_times, marker='o', color='purple', linewidth=2)
    ax1.set_ylabel('Time (seconds)', color='purple')
    ax1.set_title('Model Configuration Speed')
    ax1.grid(True)
    ax1.tick_params(axis='x', rotation=45)

    # Accuracy Graph
    ax2.plot(model_configs, model_accuracies, marker='s', color='red', linewidth=2)
    ax2.set_ylabel('Accuracy', color='red')
    ax2.set_title('Model Configuration Accuracy')
    ax2.grid(True)
    ax2.tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()
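
`visualize_performance` indexes every cluster entry by `'time'`, `'speedup'`, and `'efficiency'`, and every model entry by `'time'` and `'accuracy'`, so a missing metric surfaces as a `KeyError` partway through plotting. The sketch below checks the dictionaries first. `missing_metrics` is a hypothetical helper, and the numbers are placeholders rather than measured results.

```python
CLUSTER_KEYS = {"time", "speedup", "efficiency"}
MODEL_KEYS = {"time", "accuracy"}


def missing_metrics(results, required):
    """Map each configuration name to the sorted list of metrics it lacks."""
    problems = {}
    for name, metrics in results.items():
        missing = sorted(required - set(metrics))
        if missing:
            problems[name] = missing
    return problems


# Placeholder numbers, not measured results.
cluster_results = {
    "config_1": {"time": 10.0, "speedup": 2.0, "efficiency": 0.5},
    "config_2": {"time": 8.0, "speedup": 2.5},
}
model_results = {"config_1": {"time": 3.0, "accuracy": 0.8}}

cluster_problems = missing_metrics(cluster_results, CLUSTER_KEYS)
model_problems = missing_metrics(model_results, MODEL_KEYS)
print(cluster_problems, model_problems)
```

An empty dictionary means every configuration carries the metrics the plots need.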


#### 6. Main Function
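The main() function runs the sequential baseline first. Then, for each cluster configuration, it starts a cluster, trains and evaluates every model configuration, and records speedup and efficiency relative to the baseline before plotting the results.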

In [7]:
def main():
    """
    Main function to test different cluster configurations and evaluate performance.
    """
    dataset_path = "../final_project/Cleaned_US_Accidents_March23.csv"
    features = ['Temperature(F)', 'Visibility(mi)', 'Wind_Speed(mph)']
    target = 'Severity'

    # Cluster configurations
    cluster_configs = {
        "config_1": {"n_workers": 4, "threads_per_worker": 2, "memory_limit": "8GB"},
        "config_2": {"n_workers": 6, "threads_per_worker": 2, "memory_limit": "12GB"},
        "config_3": {"n_workers": 8, "threads_per_worker": 4, "memory_limit": "16GB"},
    }

    # Model configurations
    model_configs = {
        'config_1': {'n_estimators': 50, 'max_depth': 3, 'random_state': 42},
        'config_2': {'n_estimators': 100, 'max_depth': 3, 'random_state': 42},
        'config_3': {'n_estimators': 150, 'max_depth': 5, 'random_state': 42},
    }

    print("Running sequential execution...")
    sequential_results = sequential_execution(dataset_path, features, target, model_configs)
    sequential_time = sum([res['time'] for res in sequential_results.values()])

    cluster_results = {}
    model_results = {}

    for config_name, config_params in cluster_configs.items():
        print(f"Testing {config_name}...")
        # Initialize cluster
        client, cluster = initialize_cluster(**config_params)

        try:
            # Data loading and preprocessing
            X_train, X_test, y_train, y_test = data_loading_and_preprocessing(
                client, dataset_path, features, target
            )

            # Model training and evaluation
            results = model_training_and_evaluation(client, X_train, X_test, y_train, y_test, model_configs)

            # Collect performance metrics for the cluster
            avg_accuracy = sum([res['accuracy'] for res in results.values()]) / len(results)
            total_time = sum([res['time'] for res in results.values()])
            speedup = sequential_time / total_time
            efficiency = speedup / config_params['n_workers']
            
            cluster_results[config_name] = {
                "accuracy": avg_accuracy,
                "time": total_time,
                "speedup": speedup,
                "efficiency": efficiency
            }

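            # Collect performance metrics for the models. Model config names repeat
            # across cluster configs, so prefix them to avoid overwriting earlier runs.
            model_results.update(
                {f"{config_name}/{name}": res for name, res in results.items()}
            )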

        finally:
            client.close()
            cluster.close()

    # Visualize the performance of different cluster and model configurations
    visualize_performance(cluster_results, model_results)
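
The two derived metrics in `main()` follow the usual definitions: speedup is the sequential time divided by the parallel time, and efficiency is the speedup divided by the number of workers. The worked example below uses placeholder timings, not measurements from this project. `speedup_and_efficiency` is a hypothetical helper that mirrors the arithmetic in `main()`.

```python
def speedup_and_efficiency(sequential_time, parallel_time, n_workers):
    """Return (speedup, efficiency) using the same definitions as main()."""
    if parallel_time <= 0 or n_workers <= 0:
        raise ValueError("parallel_time and n_workers must be positive")
    speedup = sequential_time / parallel_time
    efficiency = speedup / n_workers
    return speedup, efficiency


# Placeholder timings in seconds, not measurements from this project.
speedup, efficiency = speedup_and_efficiency(
    sequential_time=120.0, parallel_time=40.0, n_workers=4
)
print(speedup, efficiency)
```

An efficiency of 1.0 would mean the speedup scales linearly with the worker count. Because the divisor is the number of workers rather than the total number of threads, configurations with more threads per worker can appear more efficient than they would on a per-thread basis.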

In [10]:
if __name__ == "__main__":
    main()

Running sequential execution...
