In [2]:
!pip install pyDOE2

Collecting pyDOE2
  Downloading pyDOE2-1.3.0.tar.gz (19 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyDOE2
  Building wheel for pyDOE2 (setup.py) ... [?25l[?25hdone
  Created wheel for pyDOE2: filename=pyDOE2-1.3.0-py3-none-any.whl size=25524 sha256=2e30a2bfa647a92054f13368fb7455ec470a491021c3effa8ebf6cbcd9b86045
  Stored in directory: /root/.cache/pip/wheels/56/11/60/a0b234151910cf9cff9fdc072f5d42e3c35157b649dbdff3ef
Successfully built pyDOE2
Installing collected packages: pyDOE2
Successfully installed pyDOE2-1.3.0


In [3]:
import numpy as np
from pyDOE2 import fullfact, ccdesign
from itertools import product
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
import warnings
warnings.filterwarnings('ignore')

First, define our independent variables and get every possible experiment combination.

In [14]:
# Define independent variables & their levels
direction_levels = ['left_turn', 'straight_left', 'straight_right', 'right_turn']
alert_levels = ['none', 'crosswalk', 'sign_3', 'sign_2', 'sign_3_crosswalk', 'sign_2_crosswalk']
ped_leg_crossing = ['right', 'straight', 'left']

# Create all possible combinations (full factorial design trials)
all_combinations = list(product(
    direction_levels,
    alert_levels,
    ped_leg_crossing
))
df_full = pd.DataFrame(all_combinations,
                      columns=['direction', 'alert', 'ped_crossing'])

# One-hot encode variables (turns them into colums with binary values for matrix reasons)
# (see factor scaling section: https://www.ncss.com/wp-content/themes/ncss/pdf/Procedures/NCSS/D-Optimal_Designs.pdf)
enc = OneHotEncoder(sparse_output=False)
encoded_matrix = enc.fit_transform(df_full)
feature_names = enc.get_feature_names_out(['direction', 'alert', 'ped_crossing'])

# Define weights that prioritize alert type variable
weights = np.ones(len(feature_names))
alert_indices = [i for i, name in enumerate(feature_names) if 'alert' in name]
weights[alert_indices] = 1 # adjust the importance of the alert variable

# Function to calculate D-optimality criterion
def d_optimality(subset_indices, X, weights):
    weighted_X = X * weights
    subset = weighted_X[subset_indices]
    try:
        return np.linalg.det(subset.T @ subset)
    except:
        return -np.inf

# Function for coordinate exchange algorithm
def coordinate_exchange(X, n_trials, weights, max_iter=1000):
    n_rows, n_cols = X.shape
    # Start with random design
    current_design = np.random.choice(n_rows, n_trials, replace=False)
    current_criterion = d_optimality(current_design, X, weights)

    for _ in range(max_iter):
        improved = False
        for i in range(n_trials):
            best_criterion = current_criterion
            best_point = current_design[i]

            # Try replacing each point
            for candidate in range(n_rows):
                if candidate not in current_design:
                    temp_design = current_design.copy()
                    temp_design[i] = candidate
                    new_criterion = d_optimality(temp_design, X, weights)

                    if new_criterion > best_criterion:
                        best_criterion = new_criterion
                        best_point = candidate
                        improved = True

            current_design[i] = best_point
            current_criterion = best_criterion

        if not improved:
            break

    return current_design, current_criterion

# Generate D-optimal design (can play with number of trials)
n_desired_trials = 15
best_indices, criterion = coordinate_exchange(encoded_matrix, n_desired_trials, weights)

# Get the selected combinations
optimal_design = df_full.iloc[best_indices].reset_index(drop=True)

# correlation matrix for selected design
correlation_matrix = optimal_design.apply(lambda x: pd.factorize(x)[0]).corr()

# results
print(f"\nD-Optimal Design ({n_desired_trials} trials):")
print(optimal_design)
print("\nCorrelation Matrix:")
print(correlation_matrix.round(2))

# balance of alert system types
print("\nAlert System Type Distribution:")
print(optimal_design['alert'].value_counts())

# efficiency measures
print("\nEfficiency Measures:")
alert_balance = optimal_design['alert'].value_counts().std() / optimal_design['alert'].value_counts().mean()
print(f"Alert System Balance (lower is better): {alert_balance:.3f}")


D-Optimal Design (15 trials):
         direction             alert ped_crossing
0        left_turn  sign_2_crosswalk     straight
1    straight_left         crosswalk         left
2   straight_right              none         left
3   straight_right  sign_2_crosswalk        right
4        left_turn            sign_3         left
5    straight_left              none        right
6        left_turn  sign_3_crosswalk         left
7        left_turn  sign_3_crosswalk     straight
8   straight_right            sign_3     straight
9       right_turn         crosswalk     straight
10       left_turn            sign_2        right
11      right_turn            sign_3        right
12       left_turn         crosswalk         left
13      right_turn            sign_2         left
14   straight_left            sign_2     straight

Correlation Matrix:
              direction  alert  ped_crossing
direction          1.00  -0.06          0.09
alert             -0.06   1.00         -0.02
ped_crossing 