In [60]:
# Import necessary libraries
import pandas as pd

In [61]:
class CrossingLTSCalculator:
    # CrossingLTSCalculator class providing the xLTS calculation for crossing stress.

    def xLTS_calc(self, control, bike_lane_pocket, crossing_distance_sig, two_stage, link_type,
                  speed_limit, x_island_depth, adt, num_lanes, visibility_limit,
                  traffic_flow_direction, ramp_conflict, bike_signal, bike_box, rt_volume, leading_thru):

        """
        Calculate the Level of Traffic Stress for a crossing (xLTS)

        :param control: Type of traffic control (e.g., 'Stop4', 'Signal')
        :param bike_lane_pocket: Indicator for presence of bike lane pocket
        :param crossing_distance_sig: crossing distance at a signalized intersection
        :param two_stage: Indicator for two-stage crossing
        :param link_type: Type of approach link (e.g., 'Road', 'Trail')
        :param speed_limit: Speed limit of the road being crossed
        :param x_island_depth: Depth of the crossing island of an unsignalized, non-priority crossing
        :param adt: Average daily traffic
        :param num_lanes: Number of lanes being crossed
        :param visibility_limit: Indicator for low visibility commercial/busy driveway (blind entry):param traffic_flow_direction: Direction of circulation for the leg being crossed
        :param ramp_conflict: Indicator for ramp conflict
        :param bike_signal: Indicator for presence of bike signal
        :param bike_box: Indicator for presence of bike box (advanced stopline)
        :param rt_volume: Right-turn volume
        :param leading_thru: Indicator for leading through interval
        """

        xlts = 1
        num_lanes_crossed = num_lanes * (2 if traffic_flow_direction == 2 else 1)

        # Evaluate xLTS based on control type
        if control == 'Stop4':
            xlts = 1
        elif control == 'Signal':
            xlts = self._calculate_signal_xlts(bike_signal, bike_box, rt_volume, crossing_distance_sig, two_stage, bike_lane_pocket, leading_thru)
        else:
            xlts = self._calculate_unsignalized_xlts(link_type, num_lanes_crossed, adt, speed_limit, x_island_depth)

        xlts = max(xlts, self._adjust_for_visibility_and_ramp(xlts, visibility_limit, ramp_conflict))
        return xlts

    def _calculate_signal_xlts(self, bike_signal, bike_box, rt_volume, crossing_distance_sig, two_stage, bike_lane_pocket, leading_thru):
        """
        Calculate xLTS for signalized crossings
        """

        xlts = 1

        # Right-turn volume stress adjustments without bike signal
        if not bike_signal:
            if (bike_box or leading_thru) and rt_volume > 240:
                xlts = max(xlts, 2)
            elif rt_volume > 120:
                xlts = max(xlts, 2)

        # Crossing distance and two-stage crossing impacts
        if crossing_distance_sig > 30 or two_stage or bike_lane_pocket == 2:
            xlts = max(xlts, 2)
        elif bike_lane_pocket == 3:
            xlts = max(xlts, 3)

        return xlts

    def _calculate_unsignalized_xlts(self, link_type, num_lanes_crossed, adt, speed_limit, x_island_depth):
        """
        Calculate xLTS for unsignalized crossings
        """

        if link_type == 'Trail':
            xlts = self._calculate_trail_crossing_xlts(num_lanes_crossed, adt, speed_limit, x_island_depth)
        else:
            xlts = self.calc_xLTS_static_method(speed_limit, x_island_depth, adt, num_lanes_crossed)
        return xlts

    def _calculate_trail_crossing_xlts(self, num_lanes_crossed, adt, speed_limit, x_island_depth):
        """
        Calculate xLTS for trail crossings
        """

        if num_lanes_crossed in (1, 2):
            if adt <= 8000:
                return 1 if speed_limit <= 62 else self.calc_xLTS_static_method(speed_limit, x_island_depth, adt, num_lanes_crossed)
            else:
                return 1 if speed_limit <= 46 and x_island_depth > 0 else self.calc_xLTS_static_method(speed_limit, x_island_depth, adt, num_lanes_crossed)
        else:
            return self.calc_xLTS_static_method(speed_limit, x_island_depth, adt, num_lanes_crossed)

    def _adjust_for_visibility_and_ramp(self, xlts, visibility_limit, ramp_conflict):
        if visibility_limit == 1 or ramp_conflict == 1:
            xlts = 2
        return xlts

    @staticmethod
    def calc_xLTS_static_method(speed_limit, x_island_depth, adt, num_lanes_crossed):
        """
        Calculate the baseline xLTS for a crossing
        """

        # 85th percentile speed less than 62 km/h
        if speed_limit <= 62:
            if 0 < x_island_depth < 1.8:
                if adt <= 6000:
                    return 1
                elif adt <= 24000:
                    return 2
                return 3
            elif x_island_depth >= 1.8:
                if adt <= 12000:
                    return 1
                elif adt <= 24000:
                    return 2
                return 3
            elif num_lanes_crossed <= 3:
                if adt <= 9000:
                    return 1
                elif adt <= 17000:
                    return 2
                return 3
            else:
                return 3 if adt <= 19000 else 4
        else:
            # 85th percentile speed greater than 62 km/h
            if 0 < x_island_depth < 1.8:
                return 2 if adt <= 17000 else 3
            elif x_island_depth >= 1.8:
                if adt <= 8000:
                    return 1
                elif adt <= 17000:
                    return 2
                return 3
            elif num_lanes_crossed <= 3:
                if adt <= 6000:
                    return 1
                elif adt <= 12000:
                    return 2
                elif adt <= 18000:
                    return 3
                return 4
            else:
                return 4 if adt > 14000 else 3

In [62]:
# Load data
file_path = '/content/drive/MyDrive/Colab/LTS/sample_data.xlsx'
df = pd.read_excel(file_path, sheet_name='xlts')
df.head()

Unnamed: 0,link_type,control,crossing_distance_sig,num_lanes,traffic_flow_direction,speed_limit,x_island_depth,rt_volume,adt,bike_signal,bike_box,bike_lane_pocket,two_stage,leading_thru,visibility_limit,ramp_conflict
0,Road,Signal,50,5,2,50,1.2,400,30000,0,1,1,0,0,0,1
1,Trail,Stop4,5,1,1,30,0.0,0,5000,0,0,0,1,0,0,0
2,Road,Signal,16,2,2,60,2.0,250,18000,0,1,3,0,1,1,0
3,Road,Signal,8,2,1,45,0.0,130,10000,1,1,2,1,0,0,1
4,Trail,Stop4,16,2,2,40,1.5,0,3000,0,0,0,0,0,1,0


In [63]:
# Calculate xLTS
calc = CrossingLTSCalculator()
df['xLTS'] = df.apply(lambda row: calc.xLTS_calc(
    row['control'], row['bike_lane_pocket'], row['crossing_distance_sig'], row['two_stage'],
    row['link_type'], row['speed_limit'], row['x_island_depth'], row['adt'], row['num_lanes'],
    row['visibility_limit'], row['traffic_flow_direction'], row['ramp_conflict'], row['bike_signal'],
    row['bike_box'], row['rt_volume'], row['leading_thru']
), axis=1)

df

Unnamed: 0,link_type,control,crossing_distance_sig,num_lanes,traffic_flow_direction,speed_limit,x_island_depth,rt_volume,adt,bike_signal,bike_box,bike_lane_pocket,two_stage,leading_thru,visibility_limit,ramp_conflict,xLTS
0,Road,Signal,50,5,2,50,1.2,400,30000,0,1,1,0,0,0,1,2
1,Trail,Stop4,5,1,1,30,0.0,0,5000,0,0,0,1,0,0,0,1
2,Road,Signal,16,2,2,60,2.0,250,18000,0,1,3,0,1,1,0,3
3,Road,Signal,8,2,1,45,0.0,130,10000,1,1,2,1,0,0,1,2
4,Trail,Stop4,16,2,2,40,1.5,0,3000,0,0,0,0,0,1,0,2
5,Road,Signal,40,4,2,55,2.5,180,15000,0,1,1,1,1,0,0,2
6,Trail,,20,2,2,35,1.8,0,6000,0,0,0,0,0,1,0,2
7,Road,Signal,16,4,1,65,0.0,270,20000,0,0,2,0,0,0,1,2
8,Road,Signal,16,2,2,70,1.6,300,22000,0,1,3,0,1,1,0,3
9,Road,Stop4,24,3,2,40,1.2,0,30000,0,0,0,0,0,0,0,1


In [64]:
def _adjust_for_visibility_and_ramp(self, visibility_limit, ramp_type, departure_speed, ramp_entry_speed, receiving_speed, ramp_exit_speed, bike_path_angle, has_stop_sign, bike_lane_pocket):
    """
    Adjust the xLTS based on visibility limitations and ramp conflict criteria.

    :param visibility_limit: Indicator of visibility limitation (1 if limited, 0 otherwise)
    :param ramp_type: Type of ramp conflict ("on-ramp" or "off-ramp")
    :param departure_speed: 85th percentile speed of the departure road (for on-ramp)
    :param ramp_entry_speed: Ramp entry speed (for on-ramp)
    :param receiving_speed: 85th percentile speed of the receiving road (for off-ramp)
    :param ramp_exit_speed: Ramp exit speed (for off-ramp)
    :param bike_path_angle: Whether the bike path crosses the ramp at a right angle (True if at right angle, False otherwise)
    :param has_stop_sign: Indicator if the conflicting traffic has a stop sign (1 if yes, 0 otherwise)
    :param bike_lane_pocket: Indicator if a bike lane pocket is present (1 if yes, 0 otherwise)
    :return: Adjusted xLTS level
    """

    # Initialize xLTS adjustment based on visibility
    adjusted_xlts = 2 if visibility_limit == 1 else 1

    # Ramp conflict adjustments
    if ramp_type == "on-ramp":
        if has_stop_sign == 0:  # Only apply if no stop sign is present
            # Check xLTS 3 conditions for on-ramp
            if (departure_speed > 54 and ramp_entry_speed >= 24) or bike_lane_pocket:
                adjusted_xlts = max(adjusted_xlts, 3)
            # Check xLTS 2 conditions for on-ramp
            else:
                adjusted_xlts = max(adjusted_xlts, 2)

    elif ramp_type == "off-ramp":
        if has_stop_sign == 0:  # Only apply if no stop sign is present
            # Check xLTS 3 conditions for off-ramp
            if (receiving_speed > 62 and ramp_exit_speed >= 50 and not bike_path_angle):
                adjusted_xlts = max(adjusted_xlts, 3)
            # Check xLTS 2 conditions for off-ramp
            else:
                adjusted_xlts = max(adjusted_xlts, 2)

    return adjusted_xlts

In [None]:
# Save the updated data to a new CSV file
# df.to_csv("crossing_data_with_xlts.csv", index=False)