# Implementation in Python

In [1]:
import numpy as np
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.factors.discrete import TabularCPD
from pgmpy.inference import VariableElimination

## Network Structure

In [2]:
edges = [
    # Net 1: Collision risk with pedestrian
    ("t_veh", "collision_prob"),
    ("t_ped", "collision_prob"),
    ("crossing_in_sight", "crossing_probability"),
    ("POI", "crossing_probability"),
    ("pedestrian_orientation", "crossing_probability"),
    ("collision_prob", "collision_risk"),
    ("crossing_probability", "collision_risk"),

    # Net 2: Possibility of manual breaking
    ("t_veh", "time_left"),
    ("pavement_status", "grip"),
    ("vehicle_velocity", "breaking_possibility"),
    ("grip", "breaking_possibility"),
    ("time_left", "can_break_manually"),
    ("breaking_possibility", "can_break_manually"),

    # Net 3: Collision risk with following vehicle
    ("following_vehicle_exists", "rear_collision_risk"),
    ("following_vehicle_distance", "rear_collision_risk"),
    ("following_vehicle_velocity", "rear_collision_risk"),

    # Finalized decision
    ("collision_risk", "emergency_break"),
    ("can_break_manually", "emergency_break"),
    ("rear_collision_risk", "emergency_break"),
]

model = DiscreteBayesianNetwork(edges)

## Gewichtungen

### Net 1: Collision Risk with Pedestrians

Basic probabilities:

Probability of a vehicle time difference to a point:
$1$ | $2$ | $3$ | $4$ | $>=5$
----|-----|-----|-----|---------
$0.1$| $0.2$ | $0.4$ | $0.2$ | $0.1$

Probability of a vehicle time difference to a point:
$1$ | $2$ | $3$ | $4$ | $>=5$
----|-----|-----|-----|---------
$0.1$| $0.2$ | $0.4$ | $0.2$ | $0.1$

Probability of a crossing being in sight.
In cities, many points for crossings exist, such as traffic lights:
Yes | No
----|----
0.6 | 0.4

Probability of a Point of Interest (POI) being in sight.
Usually, not many POIs are expected, only an occasional Döner or train station:
Yes | No
----|----
0.3 | 0.7


Probability of the pedestrian facing the direction of the intersection point (equally distributed):
Yes | No
----|----
0.5 | 0.5

In [3]:
# Discrete probability how far away an intersection point is by time from the vehicle
cpd_t_vehicle = TabularCPD(
    variable="t_veh", variable_card=5,
    values=[[0.1], [0.2], [0.4], [0.2], [0.1]]  # 1-4s, 5 or more
)

# Discrete probability how far away an intersection point is by time from the pedestrian
cpd_t_pedestrian = TabularCPD(
    variable="t_ped", variable_card=5,
    values=[[0.1], [0.2], [0.4], [0.2], [0.1]]  # 1-4s, 5 or more
)

# Is crossing in sight? (binary)
# Assumption: Fairly many crossings may exist within a city road
#             E.g., Traffic Lights, Crossings, etc.
cpd_crossing_in_sight = TabularCPD(
    variable="crossing_in_sight", variable_card=2,
    values=[[0.6], [0.4]]   # True, False
)

# Is Point of Interest (POI) nearby? (binary)
# Assumption: the road side is not clustered with POIs
cpd_poi = TabularCPD(
    variable="POI", variable_card=2,
    values=[[0.3], [0.7]]   # True, False
)

# Is pedestrian looking the direction of the intersection point? (binary)
# Assumption: it is equally distributed
cpd_pedestrian_orientation = TabularCPD(
    variable="pedestrian_orientation", variable_card=2,
    values=[[0.5], [0.5]]
)

Time difference $\Delta t$:
If the absolute time difference is 1s or lower, TODO

In [4]:
def collision_prob_mapping(t_vehicle, t_pedestrian):
    """
    Maps onto the quantitative discrete variables.
    """
    d = abs(t_vehicle - t_pedestrian)
    if d <= 1:
        return 2      # high
    elif d <= 3:
        return 1      # med
    else:
        return 0      # low

# 25 states: 1-5 sec vehicle 1-5, sec pedestrian combinations
collision_prob_array = np.full((3, 25), 0.05)  # Initialized with base probability
col = 0
for t_vehicle in range(5):
    for t_pedestrian in range(5):
        s = collision_prob_mapping(t_vehicle, t_pedestrian)
        collision_prob_array[s, col] = 0.9  # Set dominant probability
        col += 1

cpd_collision_prob = TabularCPD(
    variable="collision_prob", variable_card=3,
    values=collision_prob_array,
    evidence=["t_veh", "t_ped"],
    evidence_card=[5, 5]
)

Crossing probability:

In [5]:
# crossing_prob_array aus crossing_in_sight, POI, pedestrian_orientation
def compute_crossing_prob(zeb, poi, ori):
    """Calculate crossing probability based on evidence."""
    base = 0.2 + 0.2 * zeb + 0.2 * poi + 0.2 * ori
    high = min(0.7, base)
    low = max(0.1, 1 - high - 0.2)
    mid = 1 - low - high
    return [low, mid, high]

crossing_prob_array = np.array([
    compute_crossing_prob(crossing, poi, orientation)
    for crossing in [0, 1]
    for poi in [0, 1]
    for orientation in [0, 1]
]).T  # 3 x 8

cpd_crossing = TabularCPD(
    variable="crossing_probability", variable_card=3,
    values=crossing_prob_array,
    evidence=["crossing_in_sight", "POI", "pedestrian_orientation"],
    evidence_card=[2, 2, 2]
)

Final Collision Risk:

In [6]:
# cpd_collision_risk based on time_difference und crossing_probability
# Lookup table for collision risk probabilities
collision_probs = {
    (0, 2): [0.05, 0.25, 0.70],  # low time diff, high crossing
    (0, 1): [0.2, 0.5, 0.3],     # low time diff, medium crossing
    (1, 2): [0.2, 0.5, 0.3],     # medium time diff, high crossing
}

coll_risk_array = np.zeros((3, 9))
for col, (time_diff, crossing_prob) in enumerate((time_diff, crossing_prob) for time_diff in range(3) for crossing_prob in range(3)):
    if (time_diff, crossing_prob) in collision_probs:
        probs = collision_probs[(time_diff, crossing_prob)]
    elif time_diff == 2 or crossing_prob == 0:
        probs = [0.7, 0.2, 0.1]
    else:
        probs = [0.3, 0.5, 0.2]
    coll_risk_array[:, col] = probs

cpd_collision_risk = TabularCPD(
    variable="collision_risk", variable_card=3,
    values=coll_risk_array,
    evidence=["collision_prob", "crossing_probability"],
    evidence_card=[3, 3]
)

### Net 2: Possibility of Manual Breaking

Basic probabilities:

In [7]:
cpd_pavement_status = TabularCPD(
    variable="pavement_status", variable_card=2,
    values=[[0.7], [0.3]]   # dry, wet
)

cpd_speed_vehicle = TabularCPD(
    variable="vehicle_velocity", variable_card=3,
    values=[[0.3], [0.5], [0.2]]   # slow, medium, high
)

# Reusing cpd_t_vehicle ("t_veh") from Net 1

Time Left until Impact:

In [8]:
# Remaining time from t_veh
cpd_t_impact = TabularCPD(
    variable="time_left", variable_card=3,
    values=np.array([
        [0.7, 0.2, 0.1, 0.05, 0.05],  # very_short
        [0.2, 0.5, 0.4, 0.3, 0.2],    # short
        [0.1, 0.3, 0.5, 0.65, 0.75],  # long
    ]),
    evidence=["t_veh"],
    evidence_card=[5]
)

Grip:

In [9]:
cpd_grip = TabularCPD(
    variable="grip", variable_card=2,
    values=[
        [0.2, 0.7],   # low
        [0.8, 0.3],   # high
    ],
    evidence=["pavement_status"],
    evidence_card=[2]
)

Breaking Capability:

In [10]:
# Bremsfähigkeit based on vehicle_velocity and Grip using lookup table
brake_capability_probs = {
    (0, 1): [0.05, 0.25, 0.70],  # low speed, high grip
    (1, 1): [0.1, 0.4, 0.5],     # medium speed, high grip
    (2, 1): [0.2, 0.5, 0.3],     # high speed, high grip
    (0, 0): [0.2, 0.5, 0.3],     # low speed, low grip
    (1, 0): [0.4, 0.4, 0.2],     # medium speed, low grip
    (2, 0): [0.6, 0.3, 0.1],     # high speed, low grip
}

brake_capability_array = np.column_stack([
    brake_capability_probs[(v, g)]
    for v in range(3)
    for g in range(2)
])

cpd_breaking_capability = TabularCPD(
    variable="breaking_possibility", variable_card=3,
    values=brake_capability_array,
    evidence=["vehicle_velocity", "grip"],
    evidence_card=[3, 2]
)

Final evaluation of manual breaking possibility:

In [11]:
# can_break_manually based on time_left and breaking_possibility
def compute_manual_breaking_prob(time_diff, breaking_prob) -> tuple[float, float, float]:
    """
    Calculate probability of normal braking capability.
    Returns list of probabilities [P(no), P(marginal), P(yes)]
    """
    if time_diff == 2 and breaking_prob == 2:   # lots of time, optimal braking capabilities
        return [0.05, 0.15, 0.80]
    elif time_diff == 2:
        return [0.1, 0.3, 0.6]
    elif time_diff == 1 and breaking_prob >= 1:
        return [0.2, 0.5, 0.3]
    elif time_diff == 0 and breaking_prob == 0:
        return [0.8, 0.15, 0.05]
    else:
        return [0.5, 0.3, 0.2]

manual_breaking_array = np.column_stack([
    compute_manual_breaking_prob(z, b)
    for z in range(3)  # time_left: very_short, short, long
    for b in range(3)  # breaking_possibility: bad, acceptable, optimal
])

cpd_can_manually_break = TabularCPD(
    variable="can_break_manually", variable_card=3,
    values=manual_breaking_array,
    evidence=["time_left", "breaking_possibility"],
    evidence_card=[3, 3]
)

### Net 3: Risk of Rear Collision

Basic probabilities:

In [12]:
cpd_following_vehicle_exists = TabularCPD(
    variable="following_vehicle_exists", variable_card=2,
    values=[[0.5], [0.5]]
)

cpd_distance_following_vehicle = TabularCPD(
    variable="following_vehicle_distance", variable_card=3,
    values=[[0.2], [0.3], [0.5]]
)

cpd_speed_following_vehicle = TabularCPD(
    variable="following_vehicle_velocity", variable_card=3,
    values=[[0.3], [0.5], [0.2]]
)

Final rear collision risk on emergency break:

In [13]:
# rear_collision_risk based on following_vehicle_exists, distance_following_vehicle, speed_following_vehicle
def compute_rear_collision_risk(exists, distance, velocity):
    """Calculate rear collision risk based on following vehicle parameters."""
    if exists == 0:
        return [0.9, 0.08, 0.02]
    
    # Calculate risk score
    score = (2 if distance == 0 else 1 if distance == 1 else 0) + (2 if velocity == 2 else 1 if velocity == 1 else 0)
    
    if score >= 3:
        return [0.1, 0.3, 0.6]
    elif score == 2:
        return [0.3, 0.5, 0.2]
    else:
        return [0.7, 0.2, 0.1]

rear_collision_risks_array = np.column_stack([
    compute_rear_collision_risk(nv, ab, gv)
    for nv in range(2)
    for ab in range(3)
    for gv in range(3)
])

cpd_rear_collision = TabularCPD(
    variable="rear_collision_risk", variable_card=3,
    values=rear_collision_risks_array,
    evidence=["following_vehicle_exists", "following_vehicle_distance", "following_vehicle_velocity"],
    evidence_card=[2, 3, 3]
)

## Final Decision & Model Initialization

Should I break or should I go?

In [14]:
# emergency_break decision logic
def compute_emergency_brake_prob(pedestrian_collision, can_manually_break, rear_collision_risk):
    """Calculate emergency braking probability based on risk factors."""
    if pedestrian_collision == 2 and can_manually_break == 0:
        if rear_collision_risk == 0:
            return [0.1, 0.9]
        elif rear_collision_risk == 1:
            return [0.3, 0.7]
        else:  # rear == 2
            return [0.6, 0.4]
    elif pedestrian_collision == 2 and can_manually_break == 1:
        return [0.4, 0.6]
    elif pedestrian_collision == 1 and can_manually_break == 0 and rear_collision_risk <= 1:
        return [0.4, 0.6]
    elif pedestrian_collision == 1 and can_manually_break == 2:
        return [0.8, 0.2]
    elif pedestrian_collision == 0:
        return [0.95, 0.05]
    else:
        return [0.7, 0.3]

vals_array = np.column_stack([
    compute_emergency_brake_prob(pedestrian_collision, can_manually_break, rear_collision_risk)
    for pedestrian_collision in range(3)
    for can_manually_break in range(3)
    for rear_collision_risk in range(3)
])

cpd_emergency_break = TabularCPD(
    variable="emergency_break", variable_card=2,
    values=vals_array,
    evidence=["collision_risk", "can_break_manually", "rear_collision_risk"],
    evidence_card=[3, 3, 3]
)

## Create & validate model

In [15]:
model.add_cpds(
    cpd_t_vehicle, cpd_t_pedestrian, cpd_crossing_in_sight, cpd_poi, cpd_pedestrian_orientation,
    cpd_collision_prob, cpd_crossing, cpd_collision_risk,
    cpd_pavement_status, cpd_speed_vehicle,
    cpd_t_impact, cpd_grip, cpd_breaking_capability, cpd_can_manually_break,
    cpd_following_vehicle_exists, cpd_distance_following_vehicle,
    cpd_speed_following_vehicle, cpd_rear_collision,
    cpd_emergency_break
)

print("Model is valid" if model.check_model() else "Model is invalid!")


Model is valid


In [16]:
# Initialize Interference
infer = VariableElimination(model)

# Example Test

"Evidence" from assignment:
- Crossing and POI present
- Car and pedestrian meet at point in 2s
- Road pavement is wet
- Pedestrian is facing towards the point

Also, we have another vehicle following with a medium distance and velocity.

In [17]:
evidence = {
    "t_veh": 2, "t_ped": 2,
    "crossing_in_sight": 1, "POI": 1, "pedestrian_orientation": 1,
    "pavement_status": 1, "vehicle_velocity": 2,
    "following_vehicle_exists": 1, "following_vehicle_distance": 1, "following_vehicle_velocity": 1,
}

# Query all variables in one call for efficiency
variables_to_query = ["collision_risk", "can_break_manually", "rear_collision_risk", "emergency_break"]
results = {var: infer.query(variables=[var], evidence=evidence) for var in variables_to_query}

# Display results
for var, result in results.items():
    print(f"P({var} | Evidenz) =\n{result}\n")


P(collision_risk | Evidenz) =
+-------------------+-----------------------+
| collision_risk    |   phi(collision_risk) |
| collision_risk(0) |                0.6507 |
+-------------------+-----------------------+
| collision_risk(1) |                0.2182 |
+-------------------+-----------------------+
| collision_risk(2) |                0.1310 |
+-------------------+-----------------------+

P(can_break_manually | Evidenz) =
+-----------------------+---------------------------+
| can_break_manually    |   phi(can_break_manually) |
| can_break_manually(0) |                    0.2480 |
+-----------------------+---------------------------+
| can_break_manually(1) |                    0.3224 |
+-----------------------+---------------------------+
| can_break_manually(2) |                    0.4296 |
+-----------------------+---------------------------+

P(rear_collision_risk | Evidenz) =
+------------------------+----------------------------+
| rear_collision_risk    |   phi(rear_colli

In [18]:
p_nb_ja = float(results["emergency_break"].values[1])
THRESHOLD = 0.5

print(f"The probability an emergency break is necessary has been evaluated as {p_nb_ja:.3f}.")
print(f"Decision: {'DO emergency break' if p_nb_ja >= THRESHOLD else 'Do NOT do an emergency break'}")


The probability an emergency break is necessary has been evaluated as 0.167.
Decision: Do NOT do an emergency break


In [19]:
# subdependency is broken
# model_graphviz = model.to_graphviz()
# model_graphviz.draw("sachs.pdf", prog="dot")
# model_graphviz.draw("sachs.svg", prog="dot")
# model_graphviz