In [0]:
#Importing relevant packages - polars for the dataframe manipulation, numpy for series manipulation and time for timing
import polars as pl #dataframe manipulation
from polars import testing # testing purposes (dataframe comparison)
import numpy as np # calculations
import time #benchmarking
from sklearn.model_selection import train_test_split #Used in training the model that fits the function 
from sklearn.preprocessing import PolynomialFeatures
from sklearn import linear_model

In [0]:
# Timer for benchmarking
start = time.perf_counter()

#Input Data
Importing all of the input data that the calculations and functions are going to need. This includes the output from the Input Script saved in the form of a "LazyFrame", objects only partially evaluated on a "as needed" basis

##Secondary Collision 
The probability of a secondary collision was originally given via a lookup table. This has been replaced with a Linear Regression Model trained on the values given by the look up table

In [0]:
# Loading in the original lookup table
secondaryCollisionProbability = pl.read_csv("path"
                                              ).select(
                                                  pl.exclude("Index")
                                              ).rename(
                                                  {
                                                      "Train speed (Mph)": "train_speed_mph", 
                                                      "Braking Rate (%g)": "braking_rate", 
                                                      "Trains per hour": "frequency", 
                                                      "Probabilty of secondary collision": "probability"
                                                    }
                                                )

# Determining our x and Y variables 
x = secondaryCollisionProbability.select(["train_speed_mph", "braking_rate", "frequency"]).to_numpy()
Y = secondaryCollisionProbability.select("probability").to_numpy()

# Splitting the look-up table into training and testing sets
x_train, x_test, Y_train, Y_test = train_test_split(x, Y, random_state=0, test_size = 0.1)

# Determining the degree of the polynomial we are going to use
degrees = 6
poly = PolynomialFeatures(degree = degrees)

# Fitting our training data
X_train = poly.fit_transform(x_train)
X_test = poly.fit_transform(x_test)

clf = linear_model.LinearRegression(fit_intercept= False)
model = clf.fit(X_train, Y_train)

Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

##Output DataFrames


###Set path (Check that this is pointing to the right location!!!!)

In [0]:
# Setting the input file path 
path = #insertPath

In [0]:
#Here we lazily scan the csv files that we are going to need for the calculations so that they are available in the form of LazyFrames
#This is done individually as 1) not all outputs are necessary for the calculation steps and 2) not all fields are necessary
#This way we have more freedom in what we load in, as well as the ability to change data types in the future



# Period ID and service section ID are being converted into strings so that they can be converted into categorical data during string caching 
sectionSegments = pl.read_csv(
    path + "/section", schema_overrides={"periodID":pl.String, "serviceSectionID": pl.String}
).filter(
    (pl.col("serviceSectionID") != "4")
).with_columns(
    (pl.when(pl.col("serviceSectionID") == "3"
            ).then(pl.lit("2")
                   ).otherwise(pl.col("serviceSectionID"))
            ).alias("serviceSectionID")
).select(
    pl.exclude("Ref_ID", "numOfTracks", "right_correction", "left_correction", "SRS", "nextSectionUp", "nextSectionDown")
)

trainGroup = pl.read_csv(path + "/train_group", schema_overrides={"serviceID": pl.Categorical, "operator": pl.Categorical, "vehicleClass": pl.Categorical,"carshworthiness":pl.Categorical, "power":pl.Categorical, "serviceSectionID": pl.String})


derailmentType = pl.read_csv(path + "/derailment_type")


trainType = pl.read_csv(path + "/train_type")


assetInSection = pl.read_csv(path + "/asset_in_section", schema_overrides= {"assetInSection": pl.Categorical, "assetID": pl.Categorical}
).select(pl.exclude("Ref_ID"))


assetType = pl.read_csv(path + "/asset_type")


embankmentHeights = pl.read_csv(path + "/embankment_heights.csv").rename(
    {"GIS_ref": "sectionID",
        "Embankment drop off hazard": "embankmentDropOffHazard",
        "Embankment height" : "embankmentHeight", 
        "Bridge or viaduct height": "bridgeViaductHeight"}
).select("sectionID", "embankmentDropOffHazard", "embankmentHeight", "bridgeViaductHeight"
).cast(
    {"embankmentDropOffHazard" : pl.Categorical,
        "embankmentHeight" : pl.Categorical,
        "bridgeViaductHeight": pl.Int32}
).fill_null(0
).with_columns(
    pl.when(pl.col("embankmentHeight") == "3m to <10m High"
            ).then(5.0
                   ).otherwise(
                       pl.when(pl.col("embankmentHeight") == ">10m High"
                               ).then(15.0
                                      ).otherwise(0.0)
                   ).alias("embankmentHeight(m)")
)

trainSectionDirectionPeriod = pl.read_csv(path + "/train_in_section_in_direction_per_period", schema_overrides= {"trainInSectionInDirectionID": pl.Categorical, "periodID":pl.String})
levelCrossings = pl.read_csv(path + "/level_crossings", schema_overrides={"levelCrossingID" : pl.Categorical})
precursorDerailmentMap = pl.read_csv(path + "/precursor_to_derailment_mapping", schema_overrides = {"mappingID":pl.Categorical})
precursor = pl.read_csv(path + "/precursor")
period = pl.read_csv(path + "/period", schema_overrides={"periodID": pl.String})
lazyMitigationScenarios = pl.scan_csv(path + "/mitigation_scenarios", schema_overrides= {"scenarioID": pl.Categorical})

lazyCutsetMatrixC = pl.scan_csv(path + "/cutset_matrix_c").cast({"1Y": pl.Boolean, "2Y": pl.Boolean, "3Y": pl.Boolean, "4Y": pl.Boolean, "5Y": pl.Boolean, "6Y": pl.Boolean, "7Y": pl.Boolean, "8Y": pl.Boolean, "9Y": pl.Boolean, "10Y": pl.Boolean, "11Y": pl.Boolean, "12Y": pl.Boolean, "13Y": pl.Boolean, "14Y": pl.Boolean, "1N": pl.Boolean, "2N": pl.Boolean, "3N": pl.Boolean, "4N": pl.Boolean, "5N": pl.Boolean, "6N": pl.Boolean, "7N": pl.Boolean, "8N": pl.Boolean, "9N": pl.Boolean, "10N": pl.Boolean, "11N": pl.Boolean, "12N": pl.Boolean, "13N": pl.Boolean, "14N": pl.Boolean, "cutsetID": pl.Categorical})

lazyCutsetMatrixC_1_7 = lazyCutsetMatrixC.select(["cutsetID", "1Y", "1N", "2Y", "2N", "3Y", "3N", "4Y", "4N", "5Y", "5N", "6Y", "6N", "7Y", "7N"])
lazyCutsetMatrixC_8_14 = lazyCutsetMatrixC.select(["cutsetID", "8Y", "8N", "9Y", "9N", "10Y", "10N", "11Y", "11N", "12Y", "12N", "13Y", "13N", "14Y", "14N"])

lazySecondaryCollisionIA = pl.scan_csv(path + "/IAsecondaryCollision.csv"
).select(
    pl.exclude(["SPAD train", "SET train", "Passenger FWI", "Staff FWI", "Driver major injuries (train 1)", "Guard major injuries (train 1)", "Driver major injuries (train 2)", "Guard major injuries (train 2)"])
).filter((pl.col("Type of conflict") != "Rear-end") & (pl.col("Train 1 type ") != "-") & (pl.col("Train 2 type ") != "-")
).rename(
{   "Closing velocity (mph)" : "collisionSpeedmph",
    "Type of conflict": "collisionType",
    "Train 1 type ": "trainType1",
    "Train 2 type ": "trainType2",
    "Passenger Fatality (train 1)": "passengerFatal1", 
    "Passenger Severe Hospital (train 1)": "passengerSevHosp1", 
    "Passenger Non-severe (train 1)": "passengerNonSevere1", 
    "Driver Fatality (train 1)": "driverFatalTrain1", 
    "Driver Specified (train 1)": "driverSpecifiedTrain1",
    "Driver Severe 7 (train 1)": "driverSev71",
    "Driver Non-severe (train 1)": "driverNonSevere1", 
    "Guard Fatality (train 1)": "guardFatal1",
    "Guard Specified (train 1)": "guardSpecified1",
    "Guard Severe 7 (train 1)": "guardSev71",
    "Guard Non-severe (train 1)": "guardNonSevere1",
    "Passenger Fatality (train 2)": "passengerFatal2", 
    "Passenger Severe Hospital (train 2)": "passengerSevHosp2", 
    "Passenger Non-severe (train 2)": "passengerNonSevere2", 
    "Driver Fatality (train 2)": "driverFatalTrain2", 
    "Driver Specified (train 2)": "driverSpecifiedTrain2",
    "Driver Severe 7 (train 2)": "driverSev72",
    "Driver Non-severe (train 2)": "driverNonSevere2", 
    "Guard Fatality (train 2)": "guardFatal2",
    "Guard Specified (train 2)": "guardSpecified2",
    "Guard Severe 7 (train 2)": "guardSev72",
    "Guard Non-severe (train 2)": "guardNonSevere2"
}
).with_columns(
    pl.sum_horizontal("driverFatalTrain1", "guardFatal1").alias("workforceFatal1"), 
    pl.sum_horizontal("driverSpecifiedTrain1", "guardSpecified1").alias("workforceSpecified1"),
    pl.sum_horizontal("driverSev71", "guardSev71").alias("workforceSev71"),
    pl.sum_horizontal("driverNonSevere1", "guardNonSevere1").alias("workforceNonSevere1"),
    pl.sum_horizontal("driverFatalTrain2", "guardFatal2").alias("workforceFatal2"), 
    pl.sum_horizontal("driverSpecifiedTrain2", "guardSpecified2").alias("workforceSpecified2"),
    pl.sum_horizontal("driverSev72", "guardSev72").alias("workforceSev72"),
    pl.sum_horizontal("driverNonSevere2", "guardNonSevere2").alias("workforceNonSevere2")
).with_columns(
    passengerShock1 = 0,
    passengerShock2 = 0,
    workforceShock71 = 0,
    workforceShock1 = 0,
    workforceShock72 = 0,
    workforceShock2 = 0
).select(
    pl.exclude(["driverFatalTrain1", "guardFatal1", "driverSpecifiedTrain1", "guardSpecified1", "driverSev71", "guardSev71", "driverNonSevere1", "guardNonSevere1", "driverFatalTrain2", "guardFatal2", "driverSpecifiedTrain2", "guardSpecified2", "driverSev72", "guardSev72", "driverNonSevere2", "guardNonSevere2"]))

lazyCutsetMatrixD = pl.scan_csv(path + "/cutset_matrix_d").cast(
    {"1" : pl.Boolean, "2" : pl.Boolean, "3" : pl.Boolean, "4" : pl.Boolean, "5" : pl.Boolean, "6" : pl.Boolean, "7" : pl.Boolean, "8" : pl.Boolean, "9" : pl.Boolean, "10" : pl.Boolean, "11" : pl.Boolean, "12" : pl.Boolean, "13" : pl.Boolean, "14" : pl.Boolean}
).rename(
    {"cutsetID": "cutset_ID_string", "1" : "significantStructureCollision", "2" : "significantStructureCollapse", "3" : "smallStructureCollision", "4" : "vehicleFallFromHeight", "5" : "vehicleFallFromEmbankment", "6" : "vehicleFallIntoWater", "7" : "vehicleFallOnSide", "8" : "secondaryCollision", "9" : "toxicGoodRelease", "10" : "toxicGoodRelease(noCollision)", "11" : "flamGoodRelease", "12" : "flamGoodRelease(noCollision)", "13" : "fire(flamGoods)", "14" : "fire(noFlamGoods)"}
).join(
    lazyCutsetMatrixC.select("cutsetID").with_columns(pl.col("cutsetID").cast(pl.String).alias("cutset_ID_string")), on = "cutset_ID_string", how = "inner"
).select(
    pl.exclude("cutset_ID_string")
)

injuryDegree = pl.read_csv(path + "/injury_degree")
personInjury = pl.read_csv(path + "/person_injury", schema_overrides={"personTypeID": pl.Categorical, "personInjuryID": pl.Categorical})
direction = pl.read_csv(path + "/direction")

##Level crossing normalizers 

In [0]:
# Annual rate of derailment due to collision with a vehicle at a given level crossing type per year
levelCrossingNormalizers = pl.DataFrame(
    data = {
        "levelCrossingType" : pl.Series(["ABCL","AFBCL","AHB","AOCL","AOCLB","FP","FPMWL","MCB","MCBCCTV","MCBOD","MCG","OC","UWC","UWCMWL","UWCT"]), 
        "totalLevelCrossingDerailmentRisk": pl.Series([ 0.000382657, 0.000382657, 0.049631002, 0.000077968, 0.000084824, 0.000001033, 0.000000410, 0.003282743, 0.015157379, 0.001134564, 0.000122723, 0.000000554,0.019632537,0.037638675,0.115193576])
        }
    )

## Total train kilometers where assets are present

In [0]:
# Total train kms over sections where a given asset presents a risk 
assetDependentNormalizers = pl.read_csv(path + "/asset_dependent_normalizers.csv"
).rename(
    {
        "asset_type": "assetTypeID",
        "train_type": "trainType",
        "train_km": "trainKm"
    }
)

# The asset type necessary for a precursor to materialize
precursorAssetMap = pl.DataFrame(
    {
        "precursorGroupID": pl.Series(["CATD","DCRW","LNSC","LNSE", "RGBD", "RBSF", "RBSH", "ROHL", "RSIG", "RWAL", "SCWS", "SHNE", "WFLD", "OVSP"]), 
        "assetTypeID": pl.Series(["switches_crossing","switches_crossing","cutting","embankment","overbridge","underbridge","underbridge","elec_masts","signalling_gantry","lineside","switches_crossing","switches_crossing","water", "curve"])
    }
)



##Matching Categoricals
Having categorical values from the same sources makes joins a lot more efficient computationally. However, because we are loading our data in from .csv files they don't register as having the same encoding, defeating the purpose.

To deal with this we use StringCache( ) to tell Polars that these Categorical fields should have the same encoding. We are avoiding a global string cache because that comes with a significant overhead cost. 

In [0]:
# Numerical columns are first cast into strings as polars cannot convert numerical values to categoricals
# Service section ID:
with pl.StringCache():
    sectionSegments = sectionSegments.sort("serviceSectionID").with_columns(pl.col("serviceSectionID").cast(pl.Categorical))
    trainGroup = trainGroup.sort("serviceSectionID").with_columns(pl.col("serviceSectionID").cast(pl.Categorical))

# Train type
with pl.StringCache():
    trainGroup = trainGroup.with_columns(pl.col("trainType").cast(pl.Categorical))
    trainType = trainType.with_columns(pl.col("trainType").cast(pl.Categorical))
    precursor = precursor.with_columns(pl.col("trainType").cast(pl.Categorical))
    assetDependentNormalizers = assetDependentNormalizers.with_columns(pl.col("trainType").cast(pl.Categorical))

# Section ID
with pl.StringCache():
    sectionSegments = sectionSegments.with_columns(pl.col("sectionID").cast(pl.Categorical))
    embankmentHeights = embankmentHeights.with_columns(pl.col("sectionID").cast(pl.Categorical))
    assetInSection = assetInSection.with_columns(pl.col("sectionID").cast(pl.Categorical))
    trainSectionDirectionPeriod = trainSectionDirectionPeriod.with_columns(pl.col("sectionID").cast(pl.Categorical))
    levelCrossings = levelCrossings.with_columns(pl.col("sectionID").cast(pl.Categorical))

# Asset Type ID
with pl.StringCache():
    assetInSection = assetInSection.with_columns(pl.col("assetTypeID").cast(pl.Categorical))
    assetType = assetType.with_columns(pl.col("assetTypeID").cast(pl.Categorical))
    assetDependentNormalizers = assetDependentNormalizers.with_columns(pl.col("assetTypeID").cast(pl.Categorical))
    precursorAssetMap = precursorAssetMap.with_columns(pl.col("assetTypeID").cast(pl.Categorical))

# Direction 
with pl.StringCache():
    assetInSection = assetInSection.with_columns(pl.col("direction").cast(pl.Categorical))
    trainSectionDirectionPeriod = trainSectionDirectionPeriod.with_columns(pl.col("direction").cast(pl.Categorical))
    direction = direction.with_columns(pl.col("direction").cast(pl.Categorical))

# Period ID
with pl.StringCache():
    sectionSegments = sectionSegments.with_columns(pl.col("periodID").cast(pl.Categorical))
    trainSectionDirectionPeriod = trainSectionDirectionPeriod.with_columns(pl.col("periodID").cast(pl.Categorical))
    period = period.with_columns(pl.col("periodID").cast(pl.Categorical))

# Train Group ID
with pl.StringCache():
    trainSectionDirectionPeriod = trainSectionDirectionPeriod.with_columns(pl.col("trainGroupID").cast(pl.Categorical))
    trainGroup = trainGroup.with_columns(pl.col("trainGroupID").cast(pl.Categorical))

# Derailment Type ID
with pl.StringCache():
    derailmentType = derailmentType.with_columns(pl.col("derailmentTypeID").cast(pl.Categorical))
    derailmentType = derailmentType.with_columns(pl.col("derailmentTypeID").cast(pl.Categorical))
    precursorDerailmentMap = precursorDerailmentMap.with_columns(pl.col("derailmentTypeID").cast(pl.Categorical))

# Precursor ID
with pl.StringCache():
    precursorDerailmentMap = precursorDerailmentMap.with_columns(pl.col("precursorID").cast(pl.Categorical))
    precursor = precursor.with_columns(pl.col("precursorID").cast(pl.Categorical))

# Level Crossing Type
with pl.StringCache():
    levelCrossings = levelCrossings.with_columns(pl.col("levelCrossingType").cast(pl.Categorical))
    precursor = precursor.with_columns(pl.col("levelCrossingType").cast(pl.Categorical))
    levelCrossingNormalizers = levelCrossingNormalizers.with_columns(pl.col("levelCrossingType").cast(pl.Categorical))

# Injury Degree ID
with pl.StringCache():
    injuryDegree = injuryDegree.with_columns(pl.col("injuryDegreeID").cast(pl.Categorical))
    personInjury = personInjury.with_columns(pl.col("injuryDegreeID").cast(pl.Categorical))

# Precursor Group ID
with pl.StringCache():
    precursorAssetMap = precursorAssetMap.with_columns(pl.col("precursorGroupID").cast(pl.Categorical))
    precursor = precursor.with_columns(pl.col("precursorGroupID").cast(pl.Categorical))

The conversion to dataframes is performed after the string caching to ensure the Categorical fields have the same encoding 

In [0]:
lazySectionSegments = pl.LazyFrame(sectionSegments)
lazyTrainGroup = pl.LazyFrame(trainGroup)
lazyDerailmentType = pl.LazyFrame(derailmentType)
lazyTrainType = pl.LazyFrame(trainType)
lazyAssetInSection = pl.LazyFrame(assetInSection)
lazyAssetType = pl.LazyFrame(assetType)
lazyPrecursor = pl.LazyFrame(precursor)
lazyEmbankmentHeights = pl.LazyFrame(embankmentHeights)
lazyTrainSectionDirectionPeriod = pl.LazyFrame(trainSectionDirectionPeriod)
lazyLevelCrossings = pl.LazyFrame(levelCrossings)
lazyDirection = pl.LazyFrame(direction)
lazyPeriod = pl.LazyFrame(period)
lazyPrecursorDerailmentMap = pl.LazyFrame(precursorDerailmentMap)
lazyInjuryDegree = pl.LazyFrame(injuryDegree)
lazyPersonInjury = pl.LazyFrame(personInjury) 
lazyLevelCrossingNormalizers = pl.LazyFrame(levelCrossingNormalizers)
lazyPrecursorAssetMap = pl.LazyFrame(precursorAssetMap)
lazyAssetDependentNormalizers = pl.LazyFrame(assetDependentNormalizers)

##Asset Dependent Normalizers


In [0]:
# Categorical encoding is decided on materialization - there we create the dataframe first and then convert it to a lazyframe 
lazyAssetDependentNormalizers = pl.LazyFrame(assetDependentNormalizers.join(assetType.select("assetTypeName", "assetTypeID"), 
       on = "assetTypeID", how = "left", coalesce = True
).join(
    precursorAssetMap, on = "assetTypeID", how = "inner"
).with_columns(
    pl.concat_str(
        [
            pl.col("precursorGroupID").cast(pl.String), pl.col("trainType").cast(pl.String)
        ], separator = "_"
    ).alias("assetDependentNormalizerID").cast(pl.Categorical(ordering="lexical"))
))
# streaming=True

##Global Parameters

In [0]:
maxLateralDeviation = 15.0 # the maximum lateral deviation in m for any derailment 
lateralDisplacementConstant = 0.013178616 # In m laterally per m along the track
subsequentPointsInitialLateralDisplacement = 3.0 # the initial lateral displacement of trains with a derailment trajectory going through points
speedConversionConstant =  0.44704 # For converting mph to m/s
vehicleKmPerYearPassenger = 2864092079.0 # total vehicle km travelled per year for passenger trains
vehicleKmPerYearFreight = 700935779.8 # total vehicle km travelled per year for freight trains
trainKmPerYearPassenger = 493533456.3 # total train km travelled per year for passenger trains
trainKmPerYearFreight = 111835525.7 # total train km travelled per year for freight trains

##Escalation Probability Calculation Constants
Constants used during the escalation probability calculation step 

In [0]:
#Significant structure collision probability 
probSignStructCollisionP = 0.19 #Given that a significant structure is present
probSignStructCollisionN = 0.0 #Given no significant structure is present 

#Significant structure collapse 
collisionForce = 4000000
timeElapsed = 1 #Reaction time 
vehicleMass = 40000
maxNumberOfVehicles = 4

#Small structure collision probability 
probSmallStructCollisionP = 0.38 #Given that a small structure is present 
probSmallStructCollisionN = 0.0 #Given that a small structure is not present 

#Fall from height probability 
probFallFromHeightP = 1.0 #Given that a fall from height is possible 
probFallFromHeightN = 0.0 #Given that a fall from height is not possible 

#Fall from embankment probability 
probFallFromEmbankmentP = 0.169 #Given that an embankment is present 
probFallFromEmbankmentN = 0.0 #Given that an embankment is not present

#Fall into water probability  
probFallInWaterP = 0.085 #Given that a body of water is present
probFallInWaterN = 0.0 #Given that a body of water is not present

#Fall on side probability 
probFallOnSideP = 1.0 #Given that the derailment type is T7 or T8
constantA = 125
P0 = 0.009 #Probability the train will fall on its side at 0 speed 
P125 = 0.26 #Probability the train will fall on its side at 125mph 
V50 = 80 #The speed in mph where the probability is between P0 and P125
K = 1.5 #The steepness parameter 

#Probability of flammable goods release
probFlamGoodsReleaseP = 0.667 #Given that flam goods are present 
#In the absence of flam goods diesel release is modelled as P(x) = 1/(1+e^(-(constantA + constantB * speed )))
constantB = -4.704 
constantC = 0.0609
probFlamGoodsReleaseN = 0.0 #Given that no flam goods or diesel is present 

#Probability of hazardous goods release 
probHazGoodsReleaseP = 0.667 #Given that haz goods are present 
probHazGoodsReleaseN = 0.0 #Given that haz goods are not present

#Probability of fire
probFireP = 0.375 #Given that flam goods are present 
probFireN = 0.028 #Given that flam goods are not present 

#Calculations


##Derailment Speed
Derailment speed is calculated as a function of:
- derailmentSpeed(m/s) = lineSpeed(mph) * _derailmentSpeedConstant_ * _trainTypeSpeedConstant_ * _conversionConstant_

We need the derailment speed in m/s so that we can calculate the distance the train covers post derailment (_derailmentDistance_) using its develeration which is in m/s^2.

In [0]:
lazyDerailmentSpeed = lazySectionSegments.select(
    pl.exclude("Ref_ID", "nextSectionUp", "NextSectionDown", "inTunnel")
).join(lazyTrainGroup.select(
    "trainGroupID", "serviceSectionID", "trainType"
), on="serviceSectionID", how="inner"
).join(lazyTrainType, 
       on = "trainType", how ="inner"
).join(lazyDerailmentType.select(
    pl.exclude("description", "initialLateralDisplacement")
), how="cross"
).join(
    lazyDirection.select("direction"), 
    how="cross"
).with_columns(
    # These conditionals check the appropriate derailment speed constant to use 
    pl.when(pl.col("useDefaultSpeed")
            ).then(pl.col("lineSpeedMPH") * pl.col("defaultSpeed") * speedConversionConstant
                   ).otherwise(pl.col("lineSpeedMPH") * pl.col("overspeedingSpeed") * speedConversionConstant
                               ).alias("derailment_speed_m/s") 
).with_columns(
    # Creating a uniqueID for each combination
    pl.concat_str(
        ["trainGroupID", 
        "sectionID", 
        "derailmentTypeID", 
        "periodID",
        "direction"], separator = "_"
        ).alias("trainSectionDerailmentPeriodDirectionID").cast(pl.Categorical(ordering="lexical"))
)

# streaming=True 

We materialize lazyDerailmentSpeed so that the ID categorical encoding can be set, allowing for accurate joining in the future

In [0]:
lazyDerailmentSpeed = pl.LazyFrame(lazyDerailmentSpeed.collect(streaming=True))

##Derailment Distance
The derailment distance is calculated as the function of: 
- _derailmentDistance = derailmentSpeed(m/s)^2 / decelerationRate(m/s^2) * 2_


In [0]:
lazyDerailmentDistance = lazyDerailmentSpeed.with_columns(
    # Calculating the derailment Distance
    (
        (pl.col("derailment_speed_m/s")**2) /
        (pl.col("decelerationRate(m/s^2)")*2)
        ).alias("derailmentDistance")
)

# streaming=True

##Segments Covered
The segments covered are calculated by identifying the distance of each segment from the beginning of its respective section. This is used to calculate the distance between two segments like so:
- distanceBetweenSegments = D2 - D1, 

Where: 
- D2: distance of traversed segments from service section start and 
- D1: distance of derailment segment from service section start

We then compare this to the scenario's derailmentDistance, and if the value is greater than it, then the train cannot have reached this segment, and is excluded. 

We also exclude segments that are not 100% traversed (i.e. the train covered less than 100% of their length, typically the final segment). This is done to account for the fact that we do not know where the assets are positioned along the length.

In [0]:
serviceSectionDistances = pl.LazyFrame(
    sectionSegments.join(
        # The direction reverses the order in which the segments appear. This essentially swaps the first segment in the service section
        # and by extention changes the distance of each segment from the service start.
        lazyDirection.collect() # We call .collect() because we need to join two DataFrames
        ,how = "cross"
    ).with_columns(
        # Here we calculate the cumulative distance of each segment from the first segment in the serviceSection
        pl.cum_sum("length(m)"
                   ).over(["serviceSectionID", "direction"], 
        # The key in the order_by parameter enforces a conditional ordering, where if the direction is up the distances start at 0 and increase
        # and if the direction is down they start at the maximum and decrease.
        # The same result could be achieved by creating the DataFrames seperately and stacking them but stacks are computationally more 
        # expensive. 
                                     order_by = pl.when(pl.col("direction") == "up"
                                                        ).then(pl.col("sectionID")
                                                               ).otherwise(pl.col("sectionID").reverse())).alias("derailmentSegmentDistanceFromServiceSectionStart")
        ).select(
            "sectionID", 
            "serviceSectionID",
            "hasS&C", 
            "curvature", 
            "direction",
            "derailmentSegmentDistanceFromServiceSectionStart"
            )
        )

# streaming=False 
# We calculate this table as a DataFrame and then convert it into a LazyFrame because cumulative functions are incompatible with streaming
# This is unlikely to cause issues though as the DataFrames are small. 

# For this LazyFrame we gather all of the fields for the derailment cone calculation
# We join the serviceSectionDistances twice - the first time we join so that each dearilmentSegment has a distanceFromSectionStart value
# and the second time it is an inner join so that all segments within the same service section are joined with one another. 
segmentsCovered = lazyDerailmentDistance.join(
    serviceSectionDistances, 
    how = "inner", on = ["sectionID", "direction"] 
).join(serviceSectionDistances.select(
    # It is important to note that here the sections in the "down" direction are returned the other way around, meaning last first. 
    # This does NOT affect the calculations but makes it a bit less human-readable. 
    pl.col("sectionID").alias("segmentTraversed"), 
    pl.col("direction"),
    pl.col("serviceSectionID"),
    pl.col("hasS&C").alias("segmentTraversedHasS&C"), 
    pl.col("curvature").alias("segmentTraversedCurvature"), 
    pl.col("derailmentSegmentDistanceFromServiceSectionStart").alias("segmentTraversedDistanceFromServiceSectionStart")
    ), how="inner", on= ["serviceSectionID", "direction"]  
).with_columns(
    # By subtracting the location of the derailment segment from the location of the traversed segment we can identify which segments are 
    # actually traversed (a negative value indicates that the traversed segment comes before the derailment segment, and hence cannot be 
    # traversed AFTER the derailment). This excludes all segments that came BEFORE the derailment occured. 
    (pl.col("segmentTraversedDistanceFromServiceSectionStart") - 
     pl.col("derailmentSegmentDistanceFromServiceSectionStart")
     ).alias("distanceBetweenTwoSegments")
).filter(pl.col("distanceBetweenTwoSegments") >= 0
).with_columns(
    # Then by subtracting the distance between the two segments from the derailmentDistance we can identify which segments are "too far" for 
    # the derailed train to reach. This excludes all of the segments that come after the derailed train would have come to a stop. 
    (pl.col("derailmentDistance") - 
     pl.col("distanceBetweenTwoSegments") 
     ).alias("distanceFromEndPoint")
).filter(pl.col("distanceFromEndPoint") >= 0
).select(
    pl.col("trainSectionDerailmentPeriodDirectionID"),
    pl.col("sectionID").alias("derailmentSectionID"),
    pl.col("length(m)"),
    pl.col("direction"),
    pl.col("periodID"), 
    pl.col("trainGroupID"),
    pl.col("derailmentTypeID"),
    pl.col("hasS&C").alias("derailmentSegmentHasS&C"),
    pl.col("curvature").alias("derailmentSegmentCurvature"),
    pl.col("derailment_speed_m/s").alias("derailmentSpeed(m/s)"),
    pl.col("decelerationRate(m/s^2)").alias("decelerationRate(m/s^2)"),
    pl.col("derailmentDistance"),
    pl.col("derailmentSegmentDistanceFromServiceSectionStart"),
    pl.col("segmentTraversed"),
    pl.col("segmentTraversedHasS&C").cast(pl.Int8), #This is important to the next step
    pl.col("segmentTraversedCurvature"),
    pl.col("segmentTraversedDistanceFromServiceSectionStart"),
    pl.col("distanceBetweenTwoSegments"),
    pl.col("distanceFromEndPoint")
)



# streaming=True

##Change in derailment type

In the original algorithm, encountering S&C during the derailment process changed the derailment type into "T1".

This also comes with a number of changes including deceleration rate (which by extention changes the derailmentDistance) and the initial lateral displacement. 

Polars' parallelization is row based - similar to simultaneous computation of batches. This means that we cannot know if a derailed train will meet an S&C down the line unless we compute it before hand. We do this by getting the cumulative max value for our "segmentHasTraversedS&C" field partitioned by the unique scenario ID. This is why rather than a boolean value we used an integer in the previous step. 

In [0]:
# Every time we call a cumulative function we need to pass the key as in the "down" direction the sections appear the wrong way around (last
# first)
traversedPoints = segmentsCovered.with_columns(
    # The flag indicates that from that point forward the train has been through an S&C and hence the derailment type, deceleration rate, and
    # lateral displacement have all changed. The intiger is used as a gate in calculations. 
    pl.col("segmentTraversedHasS&C").cum_max().over(["trainSectionDerailmentPeriodDirectionID", "direction"], 
                                                    order_by = pl.when(pl.col("direction") == "up"
                                                                       ).then(pl.col("segmentTraversed")
                                                                              ).otherwise(pl.col("segmentTraversed").reverse())
                                                                       ).alias("beenThroughS&C")
).with_columns(
        # The distance to the first "point" is a measure that is used in a number of cone calcultions. Here we calculate the distanceSinceS&C
        # as a surrogate measure because performing calculations across rows is strictly speaking impossible. 
        # We use it to infer the distance to the first S&C as we know their relative positions at all times. 
        pl.when(pl.col("beenThroughS&C") == 1
                    ).then(pl.col("length(m)").cum_sum().over(["trainSectionDerailmentPeriodDirectionID", "direction", "beenThroughS&C"], 
                                                              order_by = pl.when(pl.col("direction") == "up"
                                                                       ).then(pl.col("segmentTraversed")
                                                                              ).otherwise(pl.col("segmentTraversed").reverse()))
                           ).otherwise(0.0
                                ).alias("distanceSinceS&C")
).collect()

distanceToFirstPoints = traversedPoints.filter((pl.col("beenThroughS&C") == 1)
    ).group_by("trainSectionDerailmentPeriodDirectionID"
    ).agg(pl.col("distanceBetweenTwoSegments").min().alias("distanceToFirstPoints"))

# streaming = False - the cumulative operations force us out of streaming mode. This table is likely to become very large; we might need to 
# batch this operation in subsequent versions of the algorithm (partition by service section).


# Using the "segmentTraversedHasS&C" flag we can reset the lateral displacement value and the deceleration rate to reflect the change in 
# derailment type. The subsequentPointsInitLatDisplac represents an additional value added during the displacement calculation in the 
# next step. 
lazyUpdatedDerailments = pl.LazyFrame(traversedPoints.with_columns(
    pl.when(
        pl.col("beenThroughS&C") == 1 #Polars doesn't do truthy statements (i.e. in Polars 1 != True)
        ).then(
            subsequentPointsInitialLateralDisplacement
        ).otherwise(0.0
            ).alias("subsequentPointsInitLatDisplac"),
    # The deceleration rate only changes when the train goes through an S&C if its current deceleration rate is lower than T1s decel rate
    pl.when(
        pl.col("beenThroughS&C") == 1
        ).then(
            pl.max_horizontal(derailmentType.filter(pl.col("derailmentTypeID") == "T1").select("decelerationRate(m/s^2)").item(), pl.col("decelerationRate(m/s^2)"))
        ).otherwise(pl.col("decelerationRate(m/s^2)")
            ).alias("secondDecelerationRate(m/s^2)")
).join(distanceToFirstPoints,
    on="trainSectionDerailmentPeriodDirectionID", how="left", coalesce=True
).fill_null(0
).with_columns(
    (np.sqrt((pl.col("derailmentSpeed(m/s)")**2) - (2* pl.col("distanceToFirstPoints") * pl.col("decelerationRate(m/s^2)")))
     ).alias("speedAtFirstPoints")
).with_columns(
    (pl.col("distanceToFirstPoints") + ((pl.col("speedAtFirstPoints")**2)/(2*pl.col("secondDecelerationRate(m/s^2)")))
     ).alias("secondDerailmentDistance")
).filter(pl.col("secondDerailmentDistance") >= pl.col("distanceBetweenTwoSegments"))
)

##Derailment Cone

The derailment cone algorithm is complex, but it basically produces output in the form of **left and right displacement** at each segment traversed during a specific derailment scenario. 

The **vertical direction** (_left or right_) is determined using a combination of the horizontal direction (_up or down_) and the sign of the curvature (_positive or negative_).

The **lateral displacements** are calculated for the _closer_ and _further away_ sides. These are calculated as follows:
- **closerSide** = initialLateralDisplacement + lateralDisplacementDueToSubsequentPoints 
- **furtherAwaySide** = initialLateralDisplacement + lateralDisplacementDueToSubsequentPoints + lateralDisplacementDueToSubsequentCurves

In turn each of these values are subject to their own formulas and conditions. These are explained in more detailed in the body of the algorithm. 

In [0]:
lazyDerailmentCones = lazyUpdatedDerailments.join(
    lazyDerailmentType.select("derailmentTypeID", "initialLateralDisplacement"), 
    on="derailmentTypeID", how ="inner"
).join(
    lazyMitigationScenarios, 
    how="cross"
).with_columns(
    # Subsequent curves only influence the outcome under specific circumstances. Derailments T2 and T7 need curves to happen. Curves also 
    # contribute in other derailments when they aren't 0. This could be computed directly during the calculation but this added step here
    # improves clarity.
    pl.when((pl.col("derailmentTypeID") == "T2") | 
            (pl.col("derailmentTypeID") == "T7") | 
            (pl.col("segmentTraversedCurvature") != 0)
            ).then(True
            ).otherwise(False
                        ).alias("subsequentCurvesApply")
).with_columns( 
    # The effect of points is subject to more complicated conditions. The basic idea is that there are two major conditions that
    # must be fulfilled at the same time: condition A, consisting of three subconditions any of which would satisfy condition A, 
    # and condition B. Condition B is simply the train having traversed S&C points before as given by the flag "segmentTraversedHasS&C"
    # Condition A consists of the following: A1 (derailment type == T4 or T6, A2 (derailment type == T3 and subsequent curves do NOT apply)
    # and A3 (derailment type == T5  and subsequent curves do NOT apply).
    pl.when(
            (
            ((pl.col("derailmentTypeID") == "T4") | 
             (pl.col("derailmentTypeID") == "T6")) |
             ((pl.col("derailmentTypeID") == "T3") & (~pl.col("subsequentCurvesApply"))) |
             ((pl.col("derailmentTypeID") == "T5") & (~pl.col("subsequentCurvesApply")))
             ) & (pl.col("segmentTraversedHasS&C") == 1)
            ).then(True
            ).otherwise(False
                        ).alias("subsequentPointsApply")
).with_columns(
    # Derailment types T1 and T2 need to occur on points and a curve respectively. This condition creates a flag that ensures this is the case
    (pl.when(pl.col("derailmentTypeID") == "T1"
            ).then(pl.when(pl.col("derailmentSegmentHasS&C")
                           ).then(True
                           ).otherwise(False)
            ).otherwise(pl.when(pl.col("derailmentTypeID") == "T2"
                                ).then(pl.when(pl.col("derailmentSegmentCurvature") > 0
                                               ).then(True
                                               ).otherwise(False)
                                ).otherwise(True)
            )
        ).alias("scenarioApplies"), 
    (pl.when(
    # The direction of the curve is used in the lateralisation of the displacement. The curvature is positive if the track is curving to the 
    # left hand side of the travelling direction. The original algorithm used the measure of "worst side" to conceptualize this; the worst side
    # is the side of the direction of the curve, as the train has the greatest potential displacement on that side. This measure is NOT 
    # brought over to the Python algorithm as it is not needed.  
        ((np.sign(pl.col("segmentTraversedCurvature")) == 1) & (pl.col("direction") == "down")) |
        ((np.sign(pl.col("segmentTraversedCurvature")) == -1) & (pl.col("direction") == "up")) |
        ((np.sign(pl.col("segmentTraversedCurvature")) == 0) & (pl.col("direction") == "up"))
            ).then(pl.lit("Right")
            ).otherwise(pl.lit("Left"))
        ).alias("curveDirection")
).with_columns(
    # Checking that encountering subsequent curves affects lateral displacement
    (pl.when(pl.col("subsequentCurvesApply")
    # The absolute value of the calculation is important because of the way that the total lateral displacement is calculated
    # The total lateral displacement is the minimum between the max lateral displacement possible (15m) and the sum of all contributing
    # factors. The curvature values we are working with are very high (1000s). As such a very high negative value would indicate that 
    # displacement would be thousands of meters to the opposite direction of the curve. This doesn't conceptually make sense. 
            ).then(np.abs(pl.col("segmentTraversedCurvature") - 
                   (np.sqrt(np.power(pl.col("segmentTraversedCurvature"), 2) - (np.power(pl.col("segmentTraversedCurvature"), 2) / 4))))
                   ).otherwise(0.0
                               )
                ).alias("lateralDisplacementCurve"), 
    (pl.when(pl.col("subsequentPointsApply")
             ).then(pl.col("beenThroughS&C") * (pl.col("distanceSinceS&C") * lateralDisplacementConstant + pl.col("subsequentPointsInitLatDisplac"))
                    ).otherwise(0.0)
             ).alias("lateralDisplacementPoints")
).with_columns(
    # scenarioApplies is our most important flag - if it is set to False then the scenario is impossible (e.g. T1 derailment not happening
    # on an S&C) and hence the lateral displacement for this scenario should be 0. Subsequent filtering removes all segments belonging
    # to these derailments (except the derailmentSegment) as they do not contribute anything of value. 
    (pl.when(pl.col("scenarioApplies")
            ).then(pl.min_horizontal(
                pl.sum_horizontal("initialLateralDisplacement", "lateralDisplacementCurve", "lateralDisplacementPoints"),
                 maxLateralDeviation)
                   ).otherwise(0.0)
            ).alias("totalLateralDisplacementFurtherAwaySide"),
    (pl.when(pl.col("scenarioApplies")
             ).then(pl.min_horizontal(
                pl.sum_horizontal("initialLateralDisplacement", "lateralDisplacementPoints"),
                 maxLateralDeviation)
                   ).otherwise(0.0)
            ).alias("totalLateralDisplacementCloserSide")
).with_columns(
    (pl.when(pl.col("curveDirection") == "Left"
            ).then(pl.col("totalLateralDisplacementCloserSide")
                   ).otherwise("totalLateralDisplacementFurtherAwaySide")
            ).alias("Total Left Displacement"),
    (pl.when(pl.col("curveDirection") == "Right"
            ).then(pl.col("totalLateralDisplacementCloserSide")
                   ).otherwise("totalLateralDisplacementFurtherAwaySide")
            ).alias("Total Right Displacement")    
).filter(
    (pl.col("scenarioApplies") == True) | 
    ((pl.col("scenarioApplies") == False) & (pl.col("derailmentSectionID") == pl.col("segmentTraversed")))
    # We are filtering out all instances where the scenario does NOT apply, except for instances where the derailment Section ID equals the
    # segment traversed ID. This effectively only keeps the first segment of derailment scenarios that cannot occur, showing that they 
    # have been calculated, and that they do not result in displacement. 
).with_columns(
    pl.concat_str(pl.col("trainSectionDerailmentPeriodDirectionID").cast(pl.String), pl.col("scenarioID").cast(pl.String), separator = "_").alias("coneID").cast(pl.Categorical)
)

#streaming=True

Consolidating the categorical values we will need further on

In [0]:
lazyDerailmentCones = pl.LazyFrame(lazyDerailmentCones.collect(streaming=True))

##Derailment Cone Lengths


In [0]:
lazyDerailmentConeSegments = pl.LazyFrame(lazyDerailmentCones.group_by("coneID").agg(pl.col("segmentTraversed").count().alias("coneLength(Sections)")).collect())

# Here because we have manifested the coneID field once, it has been successfully encoded 
# We can confirm this by assigning two calls of lazyDerailmentConeLengths to two variables and testing to see if they are equal
lazyDerailmentConeLengths = lazyDerailmentCones.join(
  lazyDerailmentConeSegments, 
  on = "coneID", how = "inner"
).select(
  pl.col("coneID"),
  pl.col("trainSectionDerailmentPeriodDirectionID"), 
  pl.col("scenarioID"), 
  pl.col("secondDerailmentDistance").alias("coneLength(m)"),
  pl.col("coneLength(Sections)")
)

# collect(streaming=True)
# streaming = True

##Assets in Cone


In [0]:
lazyAssetInCone = lazyDerailmentCones.join(lazyAssetInSection,
    left_on="segmentTraversed", right_on="sectionID", how = "inner", suffix="Asset"
).with_columns(
    # In lazyAssetInSection the assets are always on the left hand side of the track and hence are described as being either in the "up" or 
    # "down" direction. Here we reconcile this difference. 
    pl.when(pl.col("directionAsset") == pl.col("direction")
            ).then(pl.lit("Left")
            ).otherwise(pl.lit("Right")
                         ).alias("leftOrRight")
).with_columns(
    # Assets further away from the track than the displacement of the train at this specific segment are defined as outside the cone
    (pl.when((pl.col("leftOrRight") == "Left") & 
             ((pl.col("distance_from_section_route")) <= pl.col("Total Left Displacement"))
             ).then(pl.lit(True)
                    ).otherwise(pl.lit(False)) | 
             (pl.when((pl.col("leftOrRight") == "Right") & 
                      (pl.col("distance_from_section_route") <= pl.col("Total Right Displacement"))
                      ).then(pl.lit(True)
                             ).otherwise(pl.lit(False)))
             ).alias("inCone")
  )

# collect(streaming=True)
# streaming = True

##Section In Cone


In [0]:
lazySectionInCone = lazyDerailmentCones.with_columns(
    pl.concat_str(pl.col("coneID").cast(pl.String), pl.col("segmentTraversed").cast(pl.String)).alias("sectionInConeID").cast(pl.Categorical)
  ).select(
      pl.col("sectionInConeID"),
      pl.col("coneID"), 
      pl.col("segmentTraversed"),
      pl.col("Total Left Displacement"), 
      pl.col("Total Right Displacement")       
  )

# collect(streaming=True)
# streaming = True

#Calculated Tables

##Train derailment on a section in a direction by precursor per Period


In [0]:
# This is essentially a conversion of km to section segments. As this is spread out over the network we take the mean section length 
section_length_km = sectionSegments.select("length(m)").mean().item()/1000 

lazyTrainDerailments = lazyDerailmentSpeed.select(
         "trainSectionDerailmentPeriodDirectionID", 
         "sectionID", 
         "direction", 
         "trainGroupID", 
         "periodID", 
         "derailmentTypeID", 
)
# By joining the lazyTrainSectionDirectionPeriod lazyframe to the lazyLevelCrossing lazyframe with a left join we can identify the  
# derailments starting on a LX. This is a prerequisite for certain precursors.
lazyTrainGroupLXSectionDirectionPeriod = lazyTrainDerailments.join(
    lazyTrainGroup.select(
        "trainGroupID", "trainType", "vehicleLength", "frequencyUp", "frequencyDown"), 
    on = "trainGroupID", how = "inner"
).join(lazyLevelCrossings.select(
        "sectionID", pl.col("levelCrossingType").alias("levelCrossingTypePresent")), 
       on = "sectionID", how = "left", coalesce = True
).with_columns(
    (pl.when(pl.col("direction") == "up"
             ).then(pl.col("frequencyUp")
                    ).otherwise(pl.col("frequencyDown"))
             ).alias("frequency")
)

# streaming = True

# The derailment precursors are split into two groups - one for passenger trains and one for freight trains as they are liable to different 
# derailment precursors. Then each derailment scenario is associated with all the possible derailment precursors, and their national frequency
# This means that each derailment scenario has a one-to-many relationship with its possible derailment types, which in turn have a one-to-many
# relationship with their precursors.
lazyPassengerDerailmentPrecursors = lazyTrainGroupLXSectionDirectionPeriod.filter(
    pl.col("trainType") == "Passenger"
).join(lazyPrecursorDerailmentMap.filter(
    # We need to cast to String as the .str methods are not available for Categorical data
    (pl.col("precursorID").cast(pl.String).str.ends_with("_P")) | 
    (pl.col("precursorID").cast(pl.String).str.starts_with("LX_")) 
    ), how = "cross"
).join(lazyPrecursor.select(
        "precursorID", pl.col("nationalFrequency").alias("precursorNationalFrequency")), 
       on = "precursorID", how = "inner"
)

# streaming = True

lazyFreightDerailmentPrecursors = lazyTrainGroupLXSectionDirectionPeriod.filter(
    pl.col("trainType") == "Freight"
).join(lazyPrecursorDerailmentMap.filter(
    (pl.col("precursorID").cast(pl.String).str.ends_with("F")) |
    (pl.col("precursorID").cast(pl.String).str.starts_with("LX_"))), 
       how = "cross"
).join(lazyPrecursor.select(
        "precursorID", pl.col("nationalFrequency").alias("precursorNationalFrequency")), 
       on = "precursorID", how = "inner"
)

# streaming = True

# Depending on the lazyPrecursor the excel is using a different normalization formula. These formulas can be grouped
# and we use the flags in "lazyPrecursor" which indicate what sort of normalization it should undergo.
# Here we calculate all of the normalizations for all the precursors but in the absence of a "TRUE" flag they are set 
# to zero. This is (surprisingly) faster than implementing extensive conditional logic. 
# Some of the expressions evaluate into boolean logic which also serve as gates to ensure that a non-zero value only
# under specific conditions. These expressions are multiplied by 1 prior to multiplying them with the remainder of the 
# expression. This is done to force their conversion into an integer as polars doesn't support boolean multiplication.  
lazyTrainSectionDirectionPeriodPrecursor = pl.concat(
    [lazyPassengerDerailmentPrecursors, lazyFreightDerailmentPrecursors], 
           how="vertical"
    ).join(lazyLevelCrossingNormalizers, left_on= "levelCrossingTypePresent", right_on = "levelCrossingType", how = "left", coalesce = True
    ).join(lazyLevelCrossings, on = "sectionID", how = "left", coalesce = True
    ).join(lazyPeriod, on = "periodID", how = "inner"
    ).join(lazyPrecursor, on = "precursorID", how = "inner"
    ).join(lazySectionSegments.select(["sectionID", "inTunnel", "curvature"]), on = ["sectionID"], how = "inner"
    ).join(lazyAssetInSection.select(["sectionID", "direction", "assetTypeID"]), left_on = ["sectionID", "direction"], right_on = ["sectionID", "direction"], how = "left", coalesce = True
    ).join(lazyAssetDependentNormalizers.select(
        [pl.col("trainType"), 
         pl.col("precursorGroupID"), 
         pl.col("trainKm").alias("nationalAssetKm"), 
         pl.col("assetTypeID").alias("necessaryAssetPresent")]), 
        on = ["precursorGroupID", "trainType"], how = "left", coalesce= True
    ).with_columns(
        ((pl.when(
            pl.col("trainType") == "Passenger"
        )).then(
            pl.max_horizontal(
                (pl.max_horizontal(pl.col("tunnelIndependent") * 1, 
                                  (pl.col("tunnelIndependent") != pl.col("inTunnel")) * 1)) * 
                pl.col("vehicleKmNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                pl.col("precursorNationalFrequency")/
                vehicleKmPerYearPassenger * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear") * 
                pl.col("vehicleLength"), 
                            0) +

            pl.max_horizontal(
                (pl.max_horizontal(pl.col("tunnelIndependent") * 1, 
                                   (pl.col("tunnelIndependent") != pl.col("inTunnel")) * 1)) * 
                pl.col("trainKmNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                pl.col("precursorNationalFrequency")/
                trainKmPerYearPassenger * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear"), 
                            0) +

            pl.max_horizontal(
                pl.col("assetDependentNormalization") * 
                ((pl.col("assetTypeID") == pl.col("necessaryAssetPresent")) * 1) * 
                pl.col("proportionOfPrecursorToDerailment") * 
                pl.col("precursorNationalFrequency")/
                pl.col("nationalAssetKm") * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear"), 
                            0) +

            pl.max_horizontal(
                (pl.col("assetDependentSpecialNormalization") * 
                ((pl.col("assetTypeID") == pl.col("necessaryAssetPresent")) * 1) * 
                pl.col("proportionOfPrecursorToDerailment") * 
                pl.col("precursorNationalFrequency")/ 
                pl.col("nationalAssetKm") * 
                pl.col("vehicleLength") * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear")), 
                            0) +

            pl.max_horizontal(
                (pl.col("levelCrossingDependentNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                ((pl.col("levelCrossingTypePresent") == pl.col("levelCrossingType")) *  1) * 
                pl.col("precursorNationalFrequency")/
                pl.col("totalLevelCrossingDerailmentRisk") * 
                pl.col("levelCrossingDerailmentRisk")), 
                            0) +

            pl.max_horizontal(
                (pl.col("overspeedingNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                (((np.abs(pl.col("curvature")) < 1000) & (pl.col("curvature") != 0)) * 1) * 
                pl.col("precursorNationalFrequency") / 
                pl.col("nationalAssetKm") * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear")), 
                            0)
        ).otherwise(
            pl.max_horizontal(
                pl.max_horizontal(
                    pl.col("tunnelIndependent") * 1, 
                    (pl.col("tunnelIndependent") != pl.col("inTunnel")) * 1) * 
                pl.col("vehicleKmNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                (pl.col("precursorNationalFrequency")/
                vehicleKmPerYearFreight * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear")) * 
                pl.col("vehicleLength"), 
                            0) +

            pl.max_horizontal(
                pl.max_horizontal(
                    pl.col("tunnelIndependent") * 1, 
                    (pl.col("tunnelIndependent") != pl.col("inTunnel")) * 1) * 
                pl.col("trainKmNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                pl.col("precursorNationalFrequency")/
                trainKmPerYearFreight * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear"), 
                            0) +

            pl.max_horizontal(
                pl.col("assetDependentNormalization") * 
                ((pl.col("assetTypeID") == pl.col("necessaryAssetPresent")) * 1) * 
                pl.col("proportionOfPrecursorToDerailment") * 
                pl.col("precursorNationalFrequency")/
                pl.col("nationalAssetKm") * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear"), 
                            0) +

            pl.max_horizontal(
                (pl.col("assetDependentSpecialNormalization") * 
                ((pl.col("assetTypeID") == pl.col("necessaryAssetPresent")) * 1) * 
                 pl.col("proportionOfPrecursorToDerailment") * 
                 pl.col("precursorNationalFrequency")/ 
                 pl.col("nationalAssetKm") * 
                 pl.col("vehicleLength") * 
                 section_length_km * 
                 pl.col("frequency") * 
                 pl.col("lengthHours") * 
                 pl.col("numPerYear")), 
                            0) +

            pl.max_horizontal(
                (pl.col("levelCrossingDependentNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                ((pl.col("levelCrossingTypePresent") == pl.col("levelCrossingType")) * 1) * 
                pl.col("precursorNationalFrequency")/
                pl.col("totalLevelCrossingDerailmentRisk") * 
                pl.col("levelCrossingDerailmentRisk")), 
                            0) +

            pl.max_horizontal(
                (pl.col("overspeedingNormalization") * 
                pl.col("proportionOfPrecursorToDerailment") * 
                (((np.abs(pl.col("curvature")) < 1000) & (pl.col("curvature") != 0)) * 1) * 
                pl.col("precursorNationalFrequency") / 
                pl.col("nationalAssetKm") * 
                section_length_km * 
                pl.col("frequency") * 
                pl.col("lengthHours") * 
                pl.col("numPerYear")), 
                            0)
        ).alias("derailmentFrequency")
    )
)

#collect(streaming=True)
# streaming = True

# Here our ID column "derailmentScenarioWithPrecursor" doensn't need to be manifested because it is not used for any joins
lazyDerailmentFrequencyTrainSectionDirectionPeriodPrecursor = lazyTrainSectionDirectionPeriodPrecursor.with_columns(
    pl.concat_str(
        ["trainGroupID", "sectionID", "direction", pl.col("periodID").cast(pl.String), "mappingID"], separator = "_"
        ).alias("derailmentScenarioWithPrecursor").cast(pl.Categorical)
    ).select(
        "derailmentScenarioWithPrecursor", 
        "trainInSectionInDirectionID", 
        "mappingID", 
        "derailmentFrequency")

# collect(streaming=True)
# streaming = True

##Train derailment on a section in a direction per period 

In [0]:
# lazyDerailmentFreuqencies is essentially an end DataFrame where we aggregate a result
lazyDerailmentFrequencies = pl.LazyFrame(lazyTrainSectionDirectionPeriodPrecursor.group_by(
    "trainGroupID", "sectionID", "direction","periodID", "derailmentTypeID"
).agg(pl.col("derailmentFrequency").sum()
).collect())

# This is essentially "lazyDerailmentFrequencyTrainSectionDirectionPeriodPrecursor" but with an extra level of aggregation. 
# This table gives the normalized frequency of each derailment scenario
lazyTrainDerailmentSectionDirectionPeriod = lazyTrainDerailments.join(
  lazyDerailmentFrequencies, 
  on = ["sectionID", "trainGroupID", "direction", "periodID", "derailmentTypeID"], how = "inner"
)

# collect(streaming=True)
# streaming = True

##Potential Train Collision 
For each train in a section in a direction finding the possible trains that they could collide with

In [0]:
lazyDerailedTrains = lazyTrainSectionDirectionPeriod.join(
   lazySectionSegments.select(
        [
            "sectionID", "serviceSectionID"
        ]
    ), on = "sectionID", how = "inner"
).join(lazyTrainGroup.select(
    [
        "trainGroupID", "serviceID"
    ]
), on = "trainGroupID", how = "inner"
).select(
    [
        pl.col("trainInSectionInDirectionID").alias("derailedTrainSectionDirectionPeriodID"),
        pl.col("sectionID").alias("derailmentSection"), 
        pl.col("serviceID").alias("derailedServiceID"), 
        pl.col("direction").alias("derailedTrainDirection"),
        pl.col("serviceSectionID").alias("derailedTrainServiceSectionID"), 
        pl.col("frequency").alias("derailedTrainFrequency"), 
        pl.col("trainGroupID").alias("derailedTrainGroupID")
    ]
)

lazyCollidingTrains = lazyTrainSectionDirectionPeriod.join(
   lazySectionSegments.select(
        [
            "sectionID", "serviceSectionID"
        ]
    ), on = "sectionID", how = "inner"
).join(lazyTrainGroup.select(
    [
        "trainGroupID", "serviceID"
    ]
), on = "trainGroupID", how = "inner"
).select(
    [
        pl.col("trainInSectionInDirectionID").alias("collidingTrainSectionDirectionPeriodID"),
        pl.col("sectionID").alias("collisionSection"), 
        pl.col("serviceID").alias("collidingServiceID"), 
        pl.col("direction").alias("collidingTrainDirection"),
        pl.col("serviceSectionID").alias("collidingTrainServiceSectionID"), 
        pl.col("frequency").alias("collidingTrainFrequency"), 
        pl.col("expectedSpeedMPH").alias("collidingTrainSpeedMPH"),  
        pl.col("trainGroupID").alias("collidingTrainGroupID")
    ]
)

lazyPotentialCollisions = lazyDerailedTrains.join(
    lazyCollidingTrains, how = "inner", left_on="derailmentSection", right_on="collisionSection"
    # the consecutive .filter() calls are used because "&" filtering ends streaming 
).filter( 
    (pl.col("derailedTrainDirection") != pl.col("collidingTrainDirection")))

lazyTotalCollisionFrequency = pl.LazyFrame(lazyPotentialCollisions.group_by("derailedTrainSectionDirectionPeriodID").agg(pl.col("collidingTrainFrequency").sum().alias("totalCollidingTrainFrequency")).collect())

lazyPotentialCollisionFrequencies = pl.LazyFrame(lazyPotentialCollisions.join(
    lazyTotalCollisionFrequency, on = "derailedTrainSectionDirectionPeriodID", how = "inner"
    ).with_columns(
        [pl.concat_str(["derailedTrainSectionDirectionPeriodID", 
                        "collidingTrainSectionDirectionPeriodID"], separator = "_").cast(pl.Categorical).alias("potentialCollisionID"), 
         (pl.col("collidingTrainFrequency")/pl.col("totalCollidingTrainFrequency")).alias("probCollisionWithSpecificTrain")]
    ).select(
            ["potentialCollisionID", 
             "derailedTrainSectionDirectionPeriodID", 
             "collidingTrainSectionDirectionPeriodID",
             "derailedTrainGroupID", 
             "collidingTrainGroupID",  
             "collidingTrainFrequency", 
             "totalCollidingTrainFrequency", 
             "probCollisionWithSpecificTrain", 
             "collidingTrainSpeedMPH"]
    ).collect(streaming=True)
)
# collect(streaming=True)
# streaming = True

##Secondary Collision Probability 


In [0]:
#Creating a numpy array using the columns necessary to calculate the secondary collision probability 
array = lazyPotentialCollisionFrequencies.select(
  pl.col("potentialCollisionID"),
  pl.col("collidingTrainSpeedMPH"), 
  pl.lit(9).alias("dummy_braking_rate"), 
  pl.col("collidingTrainFrequency")
  ).collect().to_numpy()

#We transform the data so that they can be used to make predictions using the model  
transformation = (array[:,0], poly.fit_transform(array[:,1:]))
coefficients = model.coef_

lazyTransformedArray = pl.LazyFrame(
  data = {
    "potential_collision_ID_str" : transformation[0],
    "transformed_values" : transformation[1]
  }
)

lazyCoefficients = pl.LazyFrame(
  data = {
    "coefficients" : coefficients 
  }
)

lazyTransformedArrayWithCoeff = lazyTransformedArray.join(lazyCoefficients, how = "cross")

lazySecondaryCollisions = lazyPotentialCollisionFrequencies.join(
  lazyTransformedArrayWithCoeff, left_on = pl.col("potentialCollisionID").cast(pl.String), right_on = "potential_collision_ID_str", how = "inner"
  ).with_columns(
    (pl.col("transformed_values") * pl.col("coefficients")).alias("values_times_coefficients")
  ).with_columns(
    (pl.col("values_times_coefficients").arr.sum()).alias("probability")
  ).select(
    "potentialCollisionID",
    "derailedTrainSectionDirectionPeriodID",
    "collidingTrainSectionDirectionPeriodID",
    "derailedTrainGroupID", 
    "collidingTrainGroupID",
    "collidingTrainSpeedMPH",
    "probability"
  )

# streaming = True

##Asset type in Cone 


In [0]:
assetTypeInCone = lazyAssetInCone.filter(
    pl.col("inCone") == True
).group_by(["coneID", "assetTypeID"]
).agg(pl.col("segmentTraversed").count().alias("segmentsWithAssetInCone")).collect()

# collect(streaming=True)
# streaming = False

##Derailment Cone with potential Collision 


In [0]:
lazyDerailmentConePotentialCollision = lazyDerailmentCones.filter(pl.col("derailmentSectionID") == pl.col("segmentTraversed")
).select(
    "coneID","derailmentTypeID", "derailmentSectionID", "derailmentSpeed(m/s)", "direction", "periodID", "trainGroupID", "trainSectionDerailmentPeriodDirectionID"
).join(lazyTrainSectionDirectionPeriod, 
left_on = ["derailmentSectionID", "direction", "periodID", "trainGroupID"], right_on = ["sectionID", "direction", "periodID", "trainGroupID"], how = "inner"
).join(
    lazyPotentialCollisionFrequencies, 
left_on = "trainInSectionInDirectionID", right_on = "derailedTrainSectionDirectionPeriodID", how = "inner"
).with_columns(
    pl.concat_str(pl.col("coneID"), pl.col("potentialCollisionID"), separator = "_").alias("derailmentConePotentialCollisionID").cast(pl.Categorical)
).select(
    pl.col("derailmentConePotentialCollisionID"),
    pl.col("trainSectionDerailmentPeriodDirectionID"),
    pl.col("derailmentTypeID"),
    (pl.col("derailmentSpeed(m/s)")/speedConversionConstant).alias("derailmentSpeedMPH"),
    pl.col("derailmentSectionID").alias("sectionID"),
    pl.col("coneID"), 
    pl.col("potentialCollisionID")
)
# collect(streaming=True) (26/09/2024)
# streaming = True (26/09/2024)

In [0]:
lazyDerailmentConePotentialCollision = pl.LazyFrame(lazyDerailmentConePotentialCollision.collect(streaming=True))

##Derailment Cone Cutset with Potential Collision



###Escalation Cutsets


###Asset types encountered 
This LazyFrame contains two boolean flags, one for significant structures and one for small structures, as well as a list field that contains all the stucture IDs that are found **_WITHIN_** the derailment cone for each derailment 

In [0]:
#Constructing the lazy_asset_types_encountered LazyFrame which is used to run the calculations natively on the API
#We at first create columns "significant_structure_present" and "small_structure_present" that have a "1" and "0" in them depending on 
#the presence of a significant structure/ small structure in the section of the cone 
#Then we aggregate by derailment cone. 
lazyAssetTypesEncountered = pl.LazyFrame(assetTypeInCone.with_columns(
    pl.when(pl.col("assetTypeID").is_in(assetType.filter(pl.col("significant_structure") == True).select("assetTypeID").to_series())
            ).then(True
                   ).otherwise(False
    ).alias("significant_structure_present"), 
    pl.when(pl.col("assetTypeID").is_in(assetType.filter(pl.col("small_structure") == True).select("assetTypeID").to_series())
            ).then(True
                   ).otherwise(False
    ).alias("small_structure_present")
    ).group_by("coneID").agg(
        pl.col("assetTypeID").alias("asset_type_IDs_encountered"), 
        pl.col("significant_structure_present").max(), 
        pl.col("small_structure_present").max())
)

# collect(streaming=True) (26/09/2024)
# streaming = True (26/09/2024)

In [0]:
#Converting previous DataFrames into LazyFrames so that they can be used to run calculations natively
#lazy_all_derailment_cones is used to calculate the derailment speed for each derailment and lazy_potential_train_collision...
#is used to extract data regarding the possible train collisions. 
lazyConeID = lazyDerailmentCones.filter(pl.col("derailmentSectionID") == pl.col("segmentTraversed")
).select(["coneID", "trainSectionDerailmentPeriodDirectionID"])

# collect(streaming=True)
# streaming=True

lazyDerailmentswithConeID = lazyDerailmentConeLengths.join(lazyConeID, on = "coneID", how = "inner")

# collect(streaming=True) (26/09/2024)
# streaming=True (26/09/2024)

In [0]:
#Finally we divide the value derived by the function by the conversion rate of mph_to_m/s as calculating the probability of a secondary 
#collision requires the value in mph. 
lazyDerailmentConesAndSpeeds = lazyDerailmentCones.filter(pl.col("derailmentSectionID") == pl.col("segmentTraversed")
).select(
    "coneID", "trainSectionDerailmentPeriodDirectionID"
).join(
    lazyDerailmentSpeed,
    on = "trainSectionDerailmentPeriodDirectionID", how = "inner"
).with_columns(
    (pl.col("derailment_speed_m/s")/ speedConversionConstant).alias("derailmentSpeedMPH")
).select(
    "coneID", "trainSectionDerailmentPeriodDirectionID", "derailmentSpeedMPH", "derailmentTypeID", "sectionID"
)

# collect(streaming=True) (26/09/2024)
# streaming=True (26/09/2024)

###Calculation of escalation probabilities
The probabilities of individual escalations depend entirely on the context of the derailment and subsequent collision (i.e. train type, derailment type etc). The gates only determine which escalations are applicable in each scenario. 
By calculating the probability for each escalation before involving the cutsets we reduce the memory burden of the algorithm substantially

In [0]:
#Joining all the LazyFrames created prior to the calculation step
#Note that two seperate fill_null statements were necessary. This is because the "asset_type_IDs_encountered" field contains a Struct
#object which is necessary for calculations. 
#Therefore we first fill null values (i.e. cones that do not have assets within them) with [0] as 0 is not a valid asset type ID,
#and then we fill the significant_structure/small_structure_present fields with False to indicate they do not encounter any such assets. 
#This works because Polars only fills out fields of the same data type - it infers [0] as a Struct, and False as a Boolean.

escalationProbabilities = lazyDerailmentConePotentialCollision.join(lazyAssetTypesEncountered, on = "coneID", how = "left", coalesce = True 
).join(lazySecondaryCollisions, on = "potentialCollisionID", how = "inner"
).join(lazySectionSegments.select("sectionID", "lineSpeedMPH"), on="sectionID", how="inner"
).join(lazyEmbankmentHeights, on = "sectionID", how = "inner"
).join(lazyTrainGroup.select(
        [pl.col("trainGroupID").alias("derailedTrainGroupID"), 
         pl.col("trainType").alias("derailedTrainType"),
         pl.col("power").cast(pl.Categorical).alias("derailed_train_power"), 
         pl.col("carryingHazGoods").alias("derailedTrainHazGoods"), 
         pl.col("carryingFlamGoods").alias("derailedTrainFlamGoods"), 
         pl.col("crashworthiness").alias("derailedTrainCrashworthiness"),
         pl.col("loading").alias("derailedTrainLoading"),
         pl.col("vehicleLength").alias("derailedTrainLength")]), on = "derailedTrainGroupID", how = "inner"
).join(lazyTrainGroup.select(
        [pl.col("trainGroupID").alias("collidingTrainGroupID"), 
         pl.col("trainType").alias("collidingTrainType"),
         pl.col("power").cast(pl.Categorical).alias("colliding_train_power"), 
         pl.col("crashworthiness").alias("collidingTrainCrashworthiness"),
         pl.col("carryingHazGoods").alias("collidingTrainHasGoods"), 
         pl.col("loading").alias("collidingTrainLoading"),
         pl.col("carryingFlamGoods").alias("collidingTrainFlamGoods")]), on = "collidingTrainGroupID", how = "inner"
).fill_null([0]
).fill_null(False
).with_columns(
  (pl.when(pl.col("significant_structure_present") 
           ).then(probSignStructCollisionP
                  ).otherwise(probSignStructCollisionN)
    ).cast(pl.Float32
          ).alias("significant_structure_collision_Y"),
    
  ((pl.when(pl.col("significant_structure_present")
            ).then(probSignStructCollisionN
                   ).otherwise(
      1 - ((collisionForce * timeElapsed)**2)/(pl.min_horizontal(maxNumberOfVehicles, pl.col("derailedTrainLength")) * vehicleMass * pl.col("derailmentSpeedMPH") * (speedConversionConstant))**2))
   ).cast(pl.Float32
          ).alias("significant_structure_collapse_Y"),

  (pl.when(pl.col("small_structure_present")
           ).then(probSmallStructCollisionP
                  ).otherwise(probSmallStructCollisionN)
    ).cast(pl.Float32
           ).alias("small_structure_collision_Y"),

  (pl.when(
    (pl.col("asset_type_IDs_encountered").list.contains("viaduct")) | 
    (pl.col("asset_type_IDs_encountered").list.contains("underbridge")) | 
    (pl.col("asset_type_IDs_encountered").list.contains("retaining_walls_land"))
      ).then(probFallFromHeightP
             ).otherwise(probFallFromHeightN)
    ).cast(pl.Float32
           ).alias("fall_from_height_Y"),

  (pl.when(pl.col("asset_type_IDs_encountered").list.contains("embankment")
           ).then(probFallFromEmbankmentP 
                  ).otherwise(probFallFromEmbankmentN)
    ).cast(pl.Float32
           ).alias("fall_from_embankment_Y"),

  (pl.when(pl.col("asset_type_IDs_encountered").list.contains("embankment")
           ).then(probFallInWaterP
                  ).otherwise(probFallInWaterN)
    ).cast(pl.Float32
           ).alias("fall_into_water_Y"),

  (pl.when((pl.col("derailmentTypeID") == "T7") | 
           (pl.col("derailmentTypeID") == "T8")
           ).then(probFallOnSideP
                  ).otherwise(P0 + ((P125 - P0)/( 1 + np.power((np.abs(np.power((pl.col("lineSpeedMPH")/constantA),((np.log(2))/(np.log(V50/constantA)))) - 1)), K))))
    ).cast(pl.Float32
           ).alias("fall_on_side_Y"),
  
  (pl.when(pl.col("probability") < 0
           ).then(0.001
                  ).otherwise(pl.col("probability"))
             ).cast(pl.Float32
                     ).alias("secondary_collision_Y"),

  (((pl.when(pl.col("derailedTrainFlamGoods")
             ).then(probFlamGoodsReleaseP
                    ).otherwise(
                      pl.when(pl.col("derailed_train_power") == "Diesel"
                              ).then(1/(1+(np.exp(-1*(constantB + constantC * pl.col("lineSpeedMPH")))))
                                     ).otherwise(probFlamGoodsReleaseN)) + 
   pl.when(pl.col("collidingTrainFlamGoods")
           ).then(probFlamGoodsReleaseP 
                  ).otherwise(
                    pl.when(pl.col("colliding_train_power") == "Diesel"
                            ).then(1/(1+(np.exp(-1*(constantB + constantC * pl.col("collidingTrainSpeedMPH")))))
                                   ).otherwise(probFlamGoodsReleaseN))) - 
   (pl.when(pl.col("derailedTrainFlamGoods")
            ).then(probFlamGoodsReleaseP 
                   ).otherwise(pl.when(pl.col("derailed_train_power") == "Diesel"
                                       ).then(1/(1+(np.exp(-1*(constantB + constantC * pl.col("lineSpeedMPH")))))
                                              ).otherwise(probFlamGoodsReleaseN)) * 
   pl.when(pl.col("collidingTrainFlamGoods")
           ).then(probFlamGoodsReleaseP 
              ).otherwise(pl.when(pl.col("colliding_train_power") == "Diesel"
                     ).then(1/(1+(np.exp(-1*(constantB + constantC * pl.col("collidingTrainSpeedMPH")))))
                            ).otherwise(probFlamGoodsReleaseN))))
   ).cast(pl.Float32
           ).alias("flam_goods_release(collision)_Y"),
  
  (pl.when(pl.col("derailedTrainFlamGoods")
           ).then(probFlamGoodsReleaseP
                  ).otherwise(
                    pl.when(pl.col("derailed_train_power") == "Diesel"
                            ).then(1/(1+(np.exp(-1*(constantB + constantC * pl.col("lineSpeedMPH")))))
                                   ).otherwise(probFlamGoodsReleaseN))
    ).cast(pl.Float32
           ).alias("flam_goods_release(no_collision)_Y"),

  (((pl.when(pl.col("derailedTrainHazGoods")
             ).then(probHazGoodsReleaseP
                    ).otherwise(probHazGoodsReleaseN)) + 
    (pl.when(pl.col("collidingTrainHasGoods")
             ).then(probHazGoodsReleaseP
                    ).otherwise(probHazGoodsReleaseN))) - 
   ((pl.when(pl.col("derailedTrainHazGoods")
             ).then(probHazGoodsReleaseP
                    ).otherwise(probHazGoodsReleaseN)) * 
    (pl.when(pl.col("collidingTrainHasGoods")
             ).then(probHazGoodsReleaseP
                    ).otherwise(probHazGoodsReleaseN)))
   ).cast(pl.Float32
           ).alias("haz_goods_release(collision)_Y"),

  ((pl.when(pl.col("derailedTrainHazGoods")
            ).then(probHazGoodsReleaseP
                   ).otherwise(probHazGoodsReleaseN))
   ).cast(pl.Float32
           ).alias("haz_goods_release(no_collision)_Y"),

  (pl.lit(probFireP)).cast(pl.Float32
           ).alias("fire(flam_goods)_Y"),
  
  (pl.lit(probFireN)).cast(pl.Float32
           ).alias("fire(no_flam_goods)_Y")
  )
  
# collecting(streaming=True) (30/09/2024)
# streaming=True (26/09/2024)

###Applying the escalation gates by scenario
The escalation gates can be calculated using only the probability of the escalation occuring and calculating the probability of the escalation not occuring on the spot. The saves us having to call each function twice and saves us memory space by needing only one probability column. 

In [0]:
#We start by excluding all fields that were used in the calculation as they are no longer needed. 
# There is somewhere a square root with a negative value
cutsetProbabilities = escalationProbabilities.join(
    lazyCutsetMatrixC, how = "cross"
).with_columns(
   ( pl.when(
        (pl.col("1Y") * pl.col("significant_structure_collision_Y")) == 0
        ).then(1 - pl.col("1Y").cast(pl.Int8)
               ).otherwise(pl.col("1Y") * pl.col("significant_structure_collision_Y")
                           )).alias("significant_structure_collision_Y"),
               
    (pl.when(
        (pl.col("1N") * (1 - pl.col("significant_structure_collision_Y"))) == 0
        ).then(1 - pl.col("1N").cast(pl.Int8)
                ).otherwise(pl.col("1N") * (1 - pl.col("significant_structure_collision_Y"))
                            )).alias("significant_structure_collision_N"),
            
    (pl.when(
        (pl.col("2Y") * pl.col("significant_structure_collapse_Y")) == 0
            ).then(1 - pl.col("2Y").cast(pl.Int8)
                   ).otherwise(pl.col("2Y") * pl.col("significant_structure_collapse_Y")
                               )).alias("significant_structure_collapse_Y"),
                   
   (pl.when(
        (pl.col("2N") * (1 - pl.col("significant_structure_collapse_Y"))) == 0
            ).then(1 - pl.col("2N").cast(pl.Int8)
                   ).otherwise(pl.col("2N") * (1 - pl.col("significant_structure_collapse_Y"))
                               )).alias("significant_structure_collapse_N"),
   
    (pl.when(
        (pl.col("3Y") * pl.col("small_structure_collision_Y")) == 0
        ).then(1 - pl.col("3Y").cast(pl.Int8)
               ).otherwise(pl.col("3Y") * pl.col("small_structure_collision_Y")
                           )).alias("small_structure_collision_Y"),
    
    (pl.when(
        (pl.col("3N") * (1 - pl.col("small_structure_collision_Y"))) == 0
        ).then(1 - pl.col("3N").cast(pl.Int8)
               ).otherwise(pl.col("3N") * (1 - pl.col("small_structure_collision_Y"))
                           )).alias("small_structure_collision_N"),
    
    (pl.when(
        (pl.col("4Y") * pl.col("fall_from_height_Y")) == 0
        ).then(1 - pl.col("4Y").cast(pl.Int8)
               ).otherwise(pl.col("4Y") * pl.col("fall_from_height_Y")
                           )).alias("fall_from_height_Y"),
    
    (pl.when(
        (pl.col("4N") * (1 - pl.col("fall_from_height_Y"))) == 0
        ).then(1 - pl.col("4N").cast(pl.Int8)
               ).otherwise(pl.col("4N") * (1 - pl.col("fall_from_height_Y"))
                           )).alias("fall_from_height_N"),
    
    (pl.when(
        (pl.col("5Y") * pl.col("fall_from_embankment_Y")) == 0
        ).then(1 - pl.col("5Y").cast(pl.Int8)
               ).otherwise((pl.col("5Y") * pl.col("fall_from_embankment_Y"))
                           )).alias("fall_from_embankment_Y"),
    
    (pl.when(
        (pl.col("5N") * (1 - pl.col("fall_from_embankment_Y"))) == 0 
        ).then(1 - pl.col("5N").cast(pl.Int8)
               ).otherwise(pl.col("5N") * (1 - pl.col("fall_from_embankment_Y"))
                           )).alias("fall_from_embankment_N"),
    
    (pl.when(
        (pl.col("6Y") * pl.col("fall_into_water_Y")) == 0
        ).then(1 - pl.col("6Y").cast(pl.Int8)
               ).otherwise(pl.col("6Y") * pl.col("fall_into_water_Y")
                           )).alias("fall_into_water_Y"),
    
    (pl.when(
        (pl.col("6N") * (1 - pl.col("fall_into_water_Y"))) == 0
        ).then(1 - pl.col("6N").cast(pl.Int8)
               ).otherwise(pl.col("6N") * (1 - pl.col("fall_into_water_Y"))
                           )).alias("fall_into_water_N"),
    
    (pl.when(
        (pl.col("7Y") * pl.col("fall_on_side_Y")) == 0
        ).then(1 - pl.col("7Y").cast(pl.Int8)
               ).otherwise(pl.col("7Y") * pl.col("fall_on_side_Y")
                           )).alias("fall_on_side_Y"),
    
    (pl.when(
        (pl.col("7N") * (1 - pl.col("fall_on_side_Y"))) == 0
        ).then(1 - pl.col("7N").cast(pl.Int8)
               ).otherwise(pl.col("7N") * (1 - pl.col("fall_on_side_Y"))
                           )).alias("fall_on_side_N"),
  
    (pl.when(
        (pl.col("8Y") * pl.col("secondary_collision_Y")) == 0
        ).then(1 - pl.col("8Y").cast(pl.Int8)
               ).otherwise(pl.col("8Y") * pl.col("secondary_collision_Y")
                           )).alias("secondary_collision_Y"),
    
    (pl.when(
        (pl.col("8N") * (1 - pl.col("secondary_collision_Y"))) == 0
        ).then(1 - pl.col("8N").cast(pl.Int8)
               ).otherwise(pl.col("8N") * (1 - pl.col("secondary_collision_Y"))
                           )).alias("secondary_collision_N"),
    
    (pl.when(
        (pl.col("9Y") * pl.col("flam_goods_release(collision)_Y")) == 0
        ).then(1 - pl.col("9Y").cast(pl.Int8)
               ).otherwise(pl.col("9Y") * pl.col("flam_goods_release(collision)_Y")
                           )).alias("flam_goods_release(collision)_Y"),
    
    (pl.when(
        (pl.col("9N") * (1 - pl.col("flam_goods_release(collision)_Y"))) == 0
        ).then(1 - pl.col("9N").cast(pl.Int8)
               ).otherwise(pl.col("9N") * (1 - pl.col("flam_goods_release(collision)_Y"))
                           )).alias("flam_goods_release(collision)_N"),
    
    (pl.when(
        (pl.col("10Y") * pl.col("flam_goods_release(no_collision)_Y")) == 0
        ).then(1 - pl.col("10Y").cast(pl.Int8)
               ).otherwise(pl.col("10Y") * pl.col("flam_goods_release(no_collision)_Y")
                           )).alias("flam_goods_release(no_collision)_Y"),
    
    (pl.when(
        (pl.col("10N") * (1 - pl.col("flam_goods_release(no_collision)_Y"))) == 0
        ).then(1 - pl.col("10N").cast(pl.Int8)
               ).otherwise(pl.col("10N") * (1 - pl.col("flam_goods_release(no_collision)_Y"))
                           )).alias("flam_goods_release(no_collision)_N"),
    
    (pl.when(
        (pl.col("11Y") * pl.col("haz_goods_release(collision)_Y")) == 0
        ).then(1 - pl.col("11Y").cast(pl.Int8)
               ).otherwise(pl.col("11Y") * pl.col("haz_goods_release(collision)_Y")
                           )).alias("haz_goods_release(collision)_Y"),
    
    (pl.when(
        (pl.col("11N") * (1 - pl.col("haz_goods_release(collision)_Y"))) == 0
        ).then(1 - pl.col("11N").cast(pl.Int8)
               ).otherwise(pl.col("11N") * (1 - pl.col("haz_goods_release(collision)_Y"))
                           )).alias("haz_goods_release(collision)_N"),
    
    (pl.when(
        (pl.col("12Y") * pl.col("haz_goods_release(no_collision)_Y")) == 0
        ).then(1 - pl.col("12Y").cast(pl.Int8)
               ).otherwise(pl.col("12Y") * pl.col("haz_goods_release(no_collision)_Y")
                           )).alias("haz_goods_release(no_collision)_Y"),
    
    (pl.when(
        (pl.col("12N") * (1 - pl.col("haz_goods_release(no_collision)_Y"))) == 0
        ).then(1 - pl.col("12N").cast(pl.Int8)
               ).otherwise(pl.col("12N") * (1 - pl.col("haz_goods_release(no_collision)_Y"))
                           )).alias("haz_goods_release(no_collision)_N"),
    
    (pl.when(
        (pl.col("13Y") * pl.col("fire(flam_goods)_Y")) == 0
        ).then(1 - pl.col("13Y").cast(pl.Int8)
               ).otherwise(pl.col("13Y") * pl.col("fire(flam_goods)_Y")
                           )).alias("fire(flam_goods)_Y"),
    
    (pl.when(
        (pl.col("13N") * (1 - pl.col("fire(flam_goods)_Y"))) == 0
        ).then(1 - pl.col("13N").cast(pl.Int8)
               ).otherwise(pl.col("13N") * (1 - pl.col("fire(flam_goods)_Y"))
                           )).alias("fire(flam_goods)_N"),
    
    (pl.when(
        (pl.col("14Y") * pl.col("fire(no_flam_goods)_Y")) == 0
        ).then(1 - pl.col("14Y").cast(pl.Int8)
               ).otherwise(pl.col("14Y") * pl.col("fire(no_flam_goods)_Y")
                           )).alias("fire(no_flam_goods)_Y"),
    
    (pl.when(
        (pl.col("14N") * (1 - pl.col("fire(no_flam_goods)_Y"))) == 0
        ).then(1 - pl.col("14N").cast(pl.Int8)
               ).otherwise(pl.col("14N") * (1 - pl.col("fire(no_flam_goods)_Y"))
                           )).alias("fire(no_flam_goods)_N")
).with_columns((pl.col("significant_structure_collision_Y") * pl.col("significant_structure_collision_N") * pl.col("significant_structure_collapse_Y") * pl.col("significant_structure_collapse_N") * pl.col("small_structure_collision_Y") * pl.col("small_structure_collision_N") * pl.col("fall_from_height_Y") * pl.col("fall_from_height_N") * pl.col("fall_from_embankment_Y") * pl.col("fall_from_embankment_N") * pl.col("fall_into_water_Y") * pl.col("fall_into_water_N") * pl.col("fall_on_side_Y") * pl.col("fall_on_side_N") * pl.col("secondary_collision_Y") * pl.col("secondary_collision_N") * pl.col("flam_goods_release(collision)_Y") * pl.col("flam_goods_release(collision)_N") * pl.col("flam_goods_release(no_collision)_Y") * pl.col("flam_goods_release(no_collision)_N") * pl.col("haz_goods_release(collision)_Y") * pl.col("haz_goods_release(collision)_N") * pl.col("haz_goods_release(no_collision)_Y") * pl.col("haz_goods_release(no_collision)_N") * pl.col("fire(flam_goods)_Y") * pl.col("fire(flam_goods)_N") * pl.col("fire(no_flam_goods)_Y") * pl.col("fire(no_flam_goods)_N")).alias("cutset_probability")
).filter(pl.col("cutset_probability") > 0.0
).select("derailmentConePotentialCollisionID", 
         "trainSectionDerailmentPeriodDirectionID",
         "coneID",
         "sectionID", 
         "potentialCollisionID", 
         "cutsetID", 
         "derailmentSpeedMPH", 
         "cutset_probability",
         "collidingTrainSpeedMPH",
         "derailedTrainType",
         "derailed_train_power",
         "derailedTrainLoading",
         "collidingTrainType",
         "colliding_train_power",
         "collidingTrainLoading", 
         "derailedTrainCrashworthiness",
         "collidingTrainCrashworthiness", 
         "derailedTrainGroupID",
         "collidingTrainGroupID", 
         "embankmentHeight(m)",
         "bridgeViaductHeight",
         pl.col("1Y").alias("significantStructureCollision"),
         pl.col("2Y").alias("significantStructureCollapse"),
         pl.col("3Y").alias("smallStructureCollision"),
         pl.col("4Y").alias("vehicleFallFromHeight"),
         pl.col("5Y").alias("vehicleFallFromEmbankment"),
         pl.col("6Y").alias("vehicleFallIntoWater"),
         pl.col("7Y").alias("vehicleFallOnSide"),
         pl.col("8Y").alias("secondaryCollision"),
         pl.col("9Y").alias("toxicGoodRelease"),
         pl.col("10Y").alias("toxicGoodRelease(noCollision)"),
         pl.col("11Y").alias("flamGoodRelease"),
         pl.col("12Y").alias("flamGoodRelease(noCollision)"),
         pl.col("13Y").alias("fire(flamGoods)"),
         pl.col("14Y").alias("fire(noFlamGoods)")
         )

##Cutset Materialization
This step has two aims: 
1) Reduces run time for everything downstream 
2) Stabilizes the lazyframe - because secondary collision is calculated using a regression model, we get variation in our zero values every time we call the dataframe 

In [0]:
cutsetProbabilities = pl.LazyFrame(cutsetProbabilities.collect(streaming=True))

#Injury Atoms
Injury atoms are calculated in different ways depending on the group that they involve and the IA itself. They can however be grouped by some similarities:
- **_Group 1_**: **derailmentIA** (passenger and workforce), **structureCollisionIA** (passenger and workforce), **structureCollapseIA** (passenger and workforce), and **fallOnSideIA** (passenger and workforce) are all calculated using the exact same methodology but with different constants
- **smallStructureCollisionIA** calculated as structureCollisionIA * smallStructureConstant
- **_Group 2_**:**secondaryCollision** (passenger and workforce for each train) and **Fire** (passenger and workforce for each train) are calculated using a lookup table 
- **_Group 3_**: **Haz** and **Flam** goods are set values 
- **_Group 4_**: **bridgeFall** and **embankmentFall** are calculated in the same way using the slightly modified function of Group 1 and simply using different heights

##Constants
A number of constants are used in the calculations of the injury atoms. These are initialized here and follow the naming convention of 
"escalationGroup". The fields are "Value", followed by the group specific injury ranks.
"Value" represents the type of probability, with the corresponding value indicating how likely this injury is to occur. 

###Constants for injury Atom calculations
To calculate the injury atom of some escalations unique constants are used. These are compiled here and retrieved during calculation, so that if any changes to the constants are necessary they can be easily accessed and changed. 

In [0]:
derPassenger = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.0, 0.0000142857139664037, 0.0000142857139664037, 20.0, 4.0, 0.9, 1.02, 1.0]),
        "SevHosp": pl.Series([0.0, 0.00141428572790963, 0.00141428572790963, 20.0, 4.0, 0.9, 1.02, 1.0]),
        "NonSevere": pl.Series([0.00142857142857143, 0.00285714285714286, 0.00285714285714286, 20.0, 4.0, 0.9, 1.02, 1.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 20.0, 4.0, 0.9, 1.02, 1.0])
    }
)
derPassenger.name = "derPassenger"

derWorkforce = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.0, 0.0, 0.0, 20.0, 1.0, 1.0, 1.0, 1.0]),
        "Specified": pl.Series([0.0, 0.0, 0.0, 20.0, 1.0, 1.0, 1.0, 1.0]),
        "Sev7": pl.Series([0.01, 0.1, 0.45, 60.0, 2.5, 0.9, 1.05, 1.0]),
        "NonSevere": pl.Series([0.09, 0.2, 0.05, 45.0, 2.0, 0.9, 1.05, 1.0]),
        "Shock7": pl.Series([0.0, 0.0, 0.05, 45.0, 2.0, 0.9, 1.05, 1.0]),
        "Shock": pl.Series([0.0, 0.134297520661157, 0.1, 40.0, 2.0, 0.9, 1.035, 1.0])
    }
)
derWorkforce.name = "derWorkforce"

structureCollisionPassenger = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.000205780167210667, 0.000718928675219401, 0.00161451408055514, 55.0, 1.75, 0.8, 1.1, 1.0]),
        "SevHosp": pl.Series([0.00185202142823692, 0.00518464350161897, 0.0132449118160091, 60.0, 1.75, 0.8, 1.1, 1.0]),
        "NonSevere": pl.Series([0.00142857142857143, 0.00571428571428572, 0.00571428571428572, 45.0, 1.75, 0.8, 1.05, 1.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 45.0, 1.75, 0.8, 1.05, 1.0])
    }
)
structureCollisionPassenger.name = "structureCollisionPassenger"

structureCollisionWorkforce = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.0, 0.150000005960465, 0.150000005960465, 25.0, 4.0, 0.8, 1.02, 1.0]),
        "Specified": pl.Series([0.100000001490116, 0.100000001490116, 0.100000001490116, 25.0, 4.0, 0.95, 1.02, 1.0]),
        "Sev7": pl.Series([0.100000001490116, 0.100000001490116, 0.100000001490116, 25.0, 4.0, 0.95, 1.02, 1.0]),
        "NonSevere": pl.Series([0.0, 0.0, 0.0, 25.0, 4.0, 0.95, 1.02, 1.0]),
        "Shock7": pl.Series([0.0, 0.2, 0.3, 30.0, 2.25, 0.9, 1.02, 1.0]),
        "Shock": pl.Series([0.0, 0.2, 0.1, 20.0, 4.0, 0.9, 1.02, 1.0])
    }
)
structureCollisionWorkforce.name = "structureCollisionWorkforce"

smallStructureCollisionPassenger = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.000205780167210667, 0.000718928675219401, 0.00161451408055514, 55.0, 1.75, 0.8, 1.1, 0.5]),
        "SevHosp": pl.Series([0.00185202142823692, 0.00518464350161897, 0.0132449118160091, 60.0, 1.75, 0.8, 1.1, 0.5]),
        "NonSevere": pl.Series([0.00142857142857143, 0.00571428571428572, 0.00571428571428572, 45.0, 1.75, 0.8, 1.05, 0.5]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 45.0, 1.75, 0.8, 1.05, 0.5])
    }
)
smallStructureCollisionPassenger.name = "smallStructureCollisionPassenger"

smallStructureCollisionWorkforce = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.0, 0.150000005960465, 0.150000005960465, 25.0, 4.0, 0.8, 1.02, 0.5]),
        "Specified": pl.Series([0.100000001490116, 0.100000001490116, 0.100000001490116, 25.0, 4.0, 0.95, 1.02, 0.5]),
        "Sev7": pl.Series([0.100000001490116, 0.100000001490116, 0.100000001490116, 25.0, 4.0, 0.95, 1.02, 0.5]),
        "NonSevere": pl.Series([0.0, 0.0, 0.0, 25.0, 4.0, 0.95, 1.02, 0.5]),
        "Shock7": pl.Series([0.0, 0.2, 0.3, 30.0, 2.25, 0.9, 1.02, 0.5]),
        "Shock": pl.Series([0.0, 0.2, 0.1, 20.0, 4.0, 0.9, 1.02, 0.5])
    }
)
smallStructureCollisionWorkforce.name = "smallStructureCollisionWorkforce"

structureCollapsePassenger = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.0508231206535108, 0.127932857502114, 0.131515199056731, 20.0, 4.0, 0.9, 1.01, 1.0]),
        "SevHosp": pl.Series([0.0491768793924846, 0.122067142661772, 0.118484801307334, 20.0, 4.0, 0.9, 1.01, 1.0]),
        "NonSevere": pl.Series([0.0114285714285714, 0.0342857142857143, 0.0342857142857143, 20.0, 4.0, 0.9, 1.005, 1.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 20.0, 4.0, 0.9, 1.005, 1.0])
    }
)
structureCollapsePassenger.name = "structureCollapsePassenger"

structureCollapseWorkforce = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.1, 0.1, 0.1, 20.0, 1.0, 1.0, 1.0, 1.0]),
        "Specified": pl.Series([0.0749999955296518, 0.0749999955296518, 0.0749999955296518, 20.0, 1.0, 1.0, 1.0, 1.0]),
        "Sev7": pl.Series([0.0749999955296518, 0.12499999254942, 0.164999990165234, 40.0, 2.5, 0.98, 1.02, 1.0]),
        "NonSevere": pl.Series([0.0, 0.0499999970197677, 0.00999999940395354, 20.0, 3.0, 0.95, 1.005, 1.0]),
        "Shock7": pl.Series([0.0, 0.4, 0.5, 30.0, 3.0, 0.9, 1.03, 1.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 30.0, 3.0, 0.9, 1.03, 1.0])
    }
)
structureCollapseWorkforce.name = "structureCollapseWorkforce"

fallPassenger = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "midpointHeight", "heightPowerFactor", "maxLossAtHeight", "maxLoss","P0Constant", "P125Constant"]),
        "Fatal": pl.Series([0.025127715224731, 0.0314096440309137, 0.0471144660463706, 60.0, 1.5, 5.0, 1.35, 30.0, 0.6, 0.96, 1.08]),
        "SevHosp": pl.Series([0.0832922841660118, 0.104115355207515, 0.156173032811272, 60.0, 1.5, 5.0, 1.35, 30.0, 0.6, 0.96, 1.08]),
        "NonSevere": pl.Series([0.019265306122449, 0.0240816326530612, 0.0361224489795918, 60.0, 1.5, 5.0, 1.35, 30.0, 0.6, 0.96, 1.06]),
        "Shock": pl.Series([0.272314694486808, 0.34039336810851, 0.360590052162765, 50.0, 1.5, 5.0, 1.35, 30.0, 0.60, 0.97, 1.06])
    }
)
fallPassenger.name = "fallPassenger"

fallPublic = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "midpointHeight", "heightPowerFactor", "maxLossAtHeight", "maxLoss", "P0Constant", "P125Constant"]),
        "Fatal": pl.Series([0.2, 0.2, 0.2, 60.0, 1.0, 5.0, 0.0, 30.0, 0.6, 1.0, 1.0]),
        "SevHosp": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]),
        "NonSevere": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0])
    }
)
fallPublic.name = "fallPublic"

fallWorkForce = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "midpointHeight", "heightPowerFactor", "maxLossAtHeight", "maxLoss", "P0Constant", "P125Constant"]),
        "Fatal": pl.Series([0.5, 0.503140964403091, 0.51099337541082, 60.0, 2.0, 5.0, 1.35, 30.0, 0.8, 1.0, 1.0015]),
        "Specified": pl.Series([0.0, 0.0104115355207515, 0.0364403743226302, 60.0, 2.0, 5.0, 1.35, 30.0, 0.8, 1.0, 1.0015]),
        "Sev7": pl.Series([0.0, 0.00240816326530612, 0.00842857142857143, 60.0, 2.0, 5.0, 1.35, 30.0, 0.8, 1.0, 1.005]),
        "NonSevere": pl.Series([0.0, 0.034039336810851, 0.0441376788379786, 40.0, 2.0, 5.0, 1.35, 30.0, 0.8, 1.0, 1.01]),
        "Shock7": pl.Series([0.0, 0.2, 0.3, 60.0, 2.0, 5.0, 1.35, 30.0, 0.8, 1.0, 1.0015]),
        "Shock": pl.Series([0.0, 0.2, 0.1, 60.0, 2.0, 5.0, 1.35, 30.0, 0.8, 1.0, 1.0015])
    }
)   
fallWorkForce.name = "fallWorkForce"

sidePassenger = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.00154335129240954, 0.0106247533789642, 0.0228071437643043, 50.0, 1.75, 0.65, 1.05, 1.0]),
        "SevHosp": pl.Series([0.00531598751240057, 0.0352312914910405, 0.0771928562223911, 50.0, 1.75, 0.65, 1.05, 1.0]),
        "NonSevere": pl.Series([0.0128571428571429, 0.0257142857142857, 0.0257142857142857, 45.0, 1.75, 0.75, 1.05, 1.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 45.0, 1.75, 0.75, 1.05, 1.0])
    }
)
sidePassenger.name = "sidePassenger"

sideWorkforce = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.050000000745058, 0.050000000745058, 0.050000000745058, 45.0, 1.0, 1.0, 1.0, 1.0]),
        "Specified": pl.Series([0.100000001490116, 0.100000001490116, 0.100000001490116, 45.0, 1.0, 1.0, 1.0, 1.0]),
        "Sev7": pl.Series([0.100000001490116, 0.100000001490116, 0.100000001490116, 45.0, 1.0, 1.0, 1.0, 1.0]),
        "NonSevere": pl.Series([0.0, 0.0, 0.0, 45.0, 1.0, 1.0, 1.0, 1.0]),
        "Shock7": pl.Series([0.0, 0.2, 0.3, 35.0, 2.0, 0.92, 1.05, 1.0]),
        "Shock": pl.Series([0.0, 0.2, 0.1, 35.0, 4.0, 1.0, 1.0, 1.0])
    }
)
sideWorkforce.name = "sideWorkforce"

fallWaterPassenger = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant", "smallCollisionModifier"]),
        "Fatal": pl.Series([0.2, 0.4, 0.6, 45.0, 1.75, 1.0, 1.05, 1.0]),
        "SevHosp": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0]),
        "NonSevere": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0])
    }
)
fallWaterPassenger.name = "fallWaterPassenger"

fallWaterWorkforce = pl.DataFrame(
    data = {
        "Value": pl.Series(["probabilitySlow", "probabilityMedium", "probabilityFast", "midpointSpeed", "K", "P0Constant", "P125Constant"]),
        "Fatal": pl.Series([0.6, 0.7, 0.8, 45.0, 2.0, 1.0, 1.01]),
        "Specified": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0]),
        "Sev7": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0]),
        "NonSevere": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0]),
        "Shock7": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0])
    }
)
fallWaterWorkforce.name = "fallWaterWorkforce"
#Technically for the ECS and Freight trains it does not make a difference whether the fire is inside or whether the power type is electric/diesel, and for freights whether they are loaded. 
#However the way the selection process works (by first checking whether the train is a freight and then whether it is loaded) these cells will never be accessed, and Boolean dtypes are efficient to work with. 
fireWorkForce = pl.LazyFrame(
    data = {
        "trainType" : pl.Series(["Passenger", "Passenger", "Passenger", "Passenger", "Passenger", "Freight", "Freight"]),
        "loaded" : pl.Series([True, True, True, False, False, False, False]),
        "fireInside" : pl.Series([True, False, False, False, False, False, False]),
        "power" : pl.Series([None, "Diesel", "Electric", "Diesel", "Electric", "Diesel", "Electric"]),
        "Fatal" : pl.Series([0.00000595, 0.0000584, 0.0000326, 0.0, 0.0, 0.00000135, 0.00000135]),
        "Specified" : pl.Series([0.0000125, 0.00292, 0.000033, 0.0, 0.0, 0.00001950, 0.00001950]),
        "Sev7" : pl.Series([0.00235, 0.00158, 0.00195, 0.00205, 0.00205, 0.00125, 0.00125]),
        "NonSevere" : pl.Series([0.0235, 0.0158, 0.0195, 0.0205, 0.0205, 0.0125, 0.0125]),
        "Shock7" : pl.Series([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
        "Shock" : pl.Series([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
    }
)
fireWorkForce.name = "fireWorkForce"

fireNonWorkForce = pl.LazyFrame(
    data = {
        "trainType" : pl.Series(["Passenger", "Passenger", "Passenger", "Passenger", "Passenger", "Freight", "Freight"]),
        "loaded" : pl.Series([True, True, True, False, False, False, False]),
        "fireInside" : pl.Series([True, False, False, False, False, False, False]),
        "power" : pl.Series([None, "Diesel", "Electric", "Diesel", "Electric", "Diesel", "Electric"]),
        "Fatal": pl.Series([0.00000155, 0.0000019, 0.00000088, 0.0, 0.0, 0.00000019, 0.00000019]),
        "SevHosp": pl.Series([0.0000488, 0.000048, 0.0000421, 0.0, 0.0, 0.0000017, 0.0000017]),
        "NonSevere": pl.Series([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
        "Shock": pl.Series([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
    }
)
fireNonWorkForce.name = "fireNonWorkForce"

#Constructing the dataframe directly caused issues with the orientation of the matrix 
passengerStructure = np.array([["Fatal", "a","c","c","c","c"], 
                                ["SevHosp", "b","a","c","c","c"], 
                                ["NonSevere", "b","b","a","c","c"],
                                ["Shock", "b","b","b","a","c"],
                                ["NotInjured", "b","b","b","b","b"]])
passengerInjuryAtomStructure = pl.from_numpy(passengerStructure).rename(
    {
        "column_0": "Injury",
        "column_1": "Fatal",
        "column_2": "SevHosp",
        "column_3": "NonSevere",
        "column_4": "Shock",
        "column_5": "NotInjured"
    }
)
lazyPassengerInjuryAtomStructure = pl.LazyFrame(passengerInjuryAtomStructure)

workforceStructure =  np.array([["Fatal", "a", "c", "c", "c", "c", "c", "c"], 
                                ["Specified", "b", "a", "c", "c", "c", "c", "c"],
                                ["Sev7", "b", "b", "a", "c", "c", "c", "c"], 
                                ["NonSevere", "b", "b", "b", "a", "c", "c", "c"],
                                ["Shock7", "b", "b", "b", "b", "a", "c", "c"],
                                ["Shock", "b", "b", "b", "b", "b", "a", "c"],
                                ["NotInjured", "b","b","b","b","b", "b", "b"]])
workforceInjuryAtomStructure = pl.from_numpy(workforceStructure).rename(
    {
        "column_0": "Injury",
        "column_1": "Fatal",
        "column_2": "Specified",
        "column_3": "Sev7",
        "column_4": "NonSevere",
        "column_5": "Shock7",
        "column_6": "Shock",
        "column_7": "NotInjured"
    }
)
lazyWorkforceInjuryAtomStructure = pl.LazyFrame(workforceInjuryAtomStructure)

headOnCollisionProbability = 0.05
sideOnCollisionProbability = 0.95

headOnCollisionDerailmentSpeedCoefficient = 0.5
sideOnCollisionDerailmentSpeedCoefficient = 0.0

###Gate Effect functions 
If a "gate" for a given escalation is negative, then the injury atom (regardless of value) doesn't apply to the given scenario. 
These functions are called following injury atom calculation and turn the values to 0 if the "gate" field is False

In [0]:
# Create a function to apply the effect of the gate to the probabilities of injury 
def gate_effect_passenger(lazyframe): 
    appliedEffect = lazyframe.with_columns(
        pl.when(pl.col("gate")
                ).then("Fatal"
                       ).otherwise(0.0
                              ).alias("Fatal"),
        pl.when(pl.col("gate")
                ).then("SevHosp"
                       ).otherwise(0.0
                              ).alias("SevHosp"),
        pl.when(pl.col("gate")
                ).then("NonSevere"
                       ).otherwise(0.0
                              ).alias("NonSevere"),
        pl.when(pl.col("gate")
                ).then("Shock"
                       ).otherwise(0.0
                              ).alias("Shock"),
        pl.when(pl.col("gate")
                ).then("NotInjured"
                       ).otherwise(1.0
                              ).alias("NotInjured")
    )
    
    return appliedEffect

In [0]:
# Create a function to apply the effect of the gate to the probabilities of injury 
def gate_effect_workforce(lazyframe): 
    appliedEffect = lazyframe.with_columns(
        pl.when(pl.col("gate")
                ).then("Fatal"
                       ).otherwise(0.0
                              ).alias("Fatal"),
        pl.when(pl.col("gate")
                ).then("Specified"
                       ).otherwise(0.0
                              ).alias("Specified"),
        pl.when(pl.col("gate")
                ).then("Sev7"
                       ).otherwise(0.0
                              ).alias("Sev7"),
        pl.when(pl.col("gate")
                ).then("NonSevere"
                       ).otherwise(0.0
                              ).alias("NonSevere"),
        pl.when(pl.col("gate")
                ).then("Shock7"
                       ).otherwise(0.0
                              ).alias("Shock7"),
        pl.when(pl.col("gate")
                ).then("Shock"
                       ).otherwise(0.0
                              ).alias("Shock"),
        pl.when(pl.col("gate")
                ).then("NotInjured"
                       ).otherwise(1.0
                              ).alias("NotInjured")
    )
    
    return appliedEffect

###Secondary Collision pre-processing
Secondary collision is calculated differently to the other injury atoms using a lookup table and an older format (workforce subdivided into driver and guard).
To maintain a coherent structure we are preprocessing the table 

In [0]:
# The total number is approximately lazyDerailmentConePotentialCollision * 2 * 1.7 as each derailment-collision has crashworthiness
#combinations appearing more than once. This is because while the derailments are different (for example due to different derailment types)
#this isn't evaluated here, meaning that 
baseInjuryAtomsSecondaryCollision = cutsetProbabilities.select(
    [
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "cutset_probability",
        "derailmentSpeedMPH",
        "collidingTrainSpeedMPH",
        "derailedTrainCrashworthiness", 
        "collidingTrainCrashworthiness",
        "secondaryCollision"
    ]
).with_columns( #Casting into Int64 is necessary for the join to work
    pl.min_horizontal((pl.col("derailmentSpeedMPH") * headOnCollisionDerailmentSpeedCoefficient + pl.col("collidingTrainSpeedMPH")), 120).round(0).cast(pl.Int64).alias("headOnCollisionSpeed"),
    pl.min_horizontal((pl.col("derailmentSpeedMPH") * sideOnCollisionDerailmentSpeedCoefficient + pl.col("collidingTrainSpeedMPH")), 120).round(0).cast(pl.Int64).alias("sideOnCollisionSpeed")
).join(lazySecondaryCollisionIA.filter(pl.col("collisionType") == "Head-on"), left_on = ["derailedTrainCrashworthiness", "collidingTrainCrashworthiness", "headOnCollisionSpeed"], right_on = ["trainType1", "trainType2", "collisionSpeedmph"], how = "inner"
).join(lazySecondaryCollisionIA.filter(pl.col("collisionType") == "Side-on"), left_on = ["derailedTrainCrashworthiness", "collidingTrainCrashworthiness", "sideOnCollisionSpeed"], right_on = ["trainType1", "trainType2", "collisionSpeedmph"], how = "inner", suffix = "side"
).with_columns(
    (pl.col("passengerFatal1") * headOnCollisionProbability + pl.col("passengerFatal1side") * sideOnCollisionProbability).alias("passengerFatal1"), 
    (pl.col("passengerSevHosp1") * headOnCollisionProbability + pl.col("passengerSevHosp1side") * sideOnCollisionProbability).alias("passengerSevHosp1"),
    (pl.col("passengerNonSevere1") * headOnCollisionProbability + pl.col("passengerNonSevere1side") * sideOnCollisionProbability).alias("passengerNonSevere1"),
    (pl.col("passengerShock1") * headOnCollisionProbability + pl.col("passengerShock1side") * sideOnCollisionProbability).alias("passengerShock1"),
    (pl.col("passengerFatal2") * headOnCollisionProbability + pl.col("passengerFatal2side") * sideOnCollisionProbability).alias("passengerFatal2"), 
    (pl.col("passengerSevHosp2") * headOnCollisionProbability + pl.col("passengerSevHosp2side") * sideOnCollisionProbability).alias("passengerSevHosp2"),
    (pl.col("passengerNonSevere2") * headOnCollisionProbability + pl.col("passengerNonSevere2side") * sideOnCollisionProbability).alias("passengerNonSevere2"),
    (pl.col("passengerShock2") * headOnCollisionProbability + pl.col("passengerShock2side") * sideOnCollisionProbability).alias("passengerShock2"), 
    (pl.col("workforceFatal1") * headOnCollisionProbability + pl.col("workforceFatal1side") * sideOnCollisionProbability).alias("workforceFatal1"),
    (pl.col("workforceSpecified1") * headOnCollisionProbability + pl.col("workforceSpecified1side") * sideOnCollisionProbability).alias("workforceSpecified1"),
    (pl.col("workforceSev71") * headOnCollisionProbability + pl.col("workforceSev71side") * sideOnCollisionProbability).alias("workforceSev71"),
    (pl.col("workforceNonSevere1") * headOnCollisionProbability + pl.col("workforceNonSevere1side") * sideOnCollisionProbability).alias("workforceNonSevere1"),
    (pl.col("workforceFatal2") * headOnCollisionProbability + pl.col("workforceFatal2side") * sideOnCollisionProbability).alias("workforceFatal2"),
    (pl.col("workforceSpecified2") * headOnCollisionProbability + pl.col("workforceSpecified2side") * sideOnCollisionProbability).alias("workforceSpecified2"),
    (pl.col("workforceSev72") * headOnCollisionProbability + pl.col("workforceSev72side") * sideOnCollisionProbability).alias("workforceSev72"),
    (pl.col("workforceNonSevere2") * headOnCollisionProbability + pl.col("workforceNonSevere2side") * sideOnCollisionProbability).alias("workforceNonSevere2"),
    (pl.col('workforceShock71') * headOnCollisionProbability + pl.col("workforceShock71side") * sideOnCollisionProbability).alias("workforceShock71"),
    (pl.col('workforceShock1') * headOnCollisionProbability + pl.col("workforceShock1side") * sideOnCollisionProbability).alias("workforceShock1"),
    (pl.col('workforceShock72') * headOnCollisionProbability + pl.col("workforceShock72side") * sideOnCollisionProbability).alias("workforceShock72"),
    (pl.col('workforceShock2')* headOnCollisionProbability + pl.col("workforceShock2side") * sideOnCollisionProbability).alias("workforceShock2")
).group_by("derailmentConePotentialCollisionID", "cutsetID"
).agg(pl.col("cutset_probability").mean(),
      pl.col("passengerFatal1").mean(),
      pl.col("passengerSevHosp1").mean(),
      pl.col('passengerNonSevere1').mean(),
      pl.col('passengerFatal2').mean(),
      pl.col("passengerSevHosp2").mean(),
      pl.col('passengerNonSevere2').mean(),
      pl.col("workforceFatal1").mean(),
      pl.col("workforceSpecified1").mean(),
      pl.col("workforceSev71").mean(),
      pl.col("workforceNonSevere1").mean(),
      pl.col("workforceFatal2").mean(),
      pl.col("workforceSpecified2").mean(),
      pl.col("workforceSev72").mean(),
      pl.col("workforceNonSevere2").mean(),
      pl.col("passengerShock1").mean(), 
      pl.col("passengerShock2").mean(),
      pl.col('workforceShock71').mean(),
      pl.col('workforceShock1').mean(),
      pl.col('workforceShock72').mean(),
      pl.col('workforceShock2').mean(),
      pl.col("secondaryCollision").min()
).collect()

secondaryCollisionPassengerTrain1 = baseInjuryAtomsSecondaryCollision.select(
    [
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "cutset_probability",
        pl.col("passengerFatal1").alias("passengerFatal"),
        pl.col("passengerSevHosp1").alias("passengerSevHosp"),
        pl.col("passengerNonSevere1").alias("passengerNonSevere"), 
        pl.col("passengerShock1").alias("passengerShock"),
        "secondaryCollision"
    ]
)
lazySecondaryCollisionPassengerTrain1 = pl.LazyFrame(secondaryCollisionPassengerTrain1)
lazySecondaryCollisionPassengerTrain1.name = "secondaryCollisionPassengerTrain1"

secondaryCollisionPassengerTrain2 = baseInjuryAtomsSecondaryCollision.select(
    [
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "cutset_probability",
        pl.col("passengerFatal2").alias("passengerFatal"),
        pl.col("passengerSevHosp2").alias("passengerSevHosp"),
        pl.col("passengerNonSevere2").alias("passengerNonSevere"), 
        pl.col("passengerShock2").alias("passengerShock"),
        "secondaryCollision"
    ]
)
lazySecondaryCollisionPassengerTrain2 = pl.LazyFrame(secondaryCollisionPassengerTrain2)
lazySecondaryCollisionPassengerTrain2.name = "secondaryCollisionPassengerTrain2"

secondaryCollisionWorkforceTrain1 = baseInjuryAtomsSecondaryCollision.select(
    [
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "cutset_probability",
        pl.col("workforceFatal1").alias("workforceFatal"),
        pl.col("workforceSpecified1").alias("workforceSpecified"),
        pl.col("workforceSev71").alias("workforceSev7"),
        pl.col("workforceNonSevere1").alias("workforceNonSevere"),
        pl.col("workforceShock71").alias("workforceShock7"), 
        pl.col("workforceShock1").alias("workforceShock"),
        "secondaryCollision"
    ]
)
lazySecondaryCollisionWorkforceTrain1 = pl.LazyFrame(secondaryCollisionWorkforceTrain1)
lazySecondaryCollisionWorkforceTrain1.name = "secondaryCollisionWorkforceTrain1"

secondaryCollisionWorkforceTrain2 = baseInjuryAtomsSecondaryCollision.select(
    [
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "cutset_probability",
        pl.col("workforceFatal2").alias("workforceFatal"),
        pl.col("workforceSpecified2").alias("workforceSpecified"),
        pl.col("workforceSev72").alias("workforceSev7"),
        pl.col("workforceNonSevere2").alias("workforceNonSevere"),
        pl.col("workforceShock72").alias("workforceShock7"), 
        pl.col("workforceShock2").alias("workforceShock"),
        "secondaryCollision"
    ]
)
lazySecondaryCollisionWorkforceTrain2 = pl.LazyFrame(secondaryCollisionWorkforceTrain2)
lazySecondaryCollisionWorkforceTrain2.name = "secondaryCollisionWorkforceTrain2"

##Group 1
Rather than repeat significantly lengthy code here we pass the API native calculations into a function which we parallelize 
This allows us to calculate all of the IA in the first group at the same time  

###Functions
Simplified functions return only the probability of a person receiving a specified injury from being uninjured

In [0]:
def groupOnePassengerIASimple(dataframe, name):
# Calling the function returns a LazyFrame 
# The left join is because of a bug - as of 27/09/2024 using an inner join return an empty dataframe when streaming = True. It is unclear what is causing this but it might have to do 
# with how the parallelization works in conjunction with the forming of categorical data.
  maxSpeedMPH = 125
  injuryAtoms = cutsetProbabilities.with_columns(
      ((dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * 
        dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item()) + 
      ((dataframe.filter(pl.col("Value") == "probabilityFast").select("Fatal").item() * 
        dataframe.filter(pl.col("Value") == "P125Constant").select("Fatal").item()) - (dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Fatal").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("Fatal").item()
            )
          )
        )
      ).alias("{}CumFatal".format(name)),
      (((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp")).item()) * 
        dataframe.filter(pl.col("Value") == "P0Constant").select("SevHosp").item()) + 
      (((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "SevHosp")).item()) * 
        dataframe.filter(pl.col("Value") == "P125Constant").select("SevHosp").item()) - 
        ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp")).item()) * 
         dataframe.filter(pl.col("Value") == "P0Constant").select("SevHosp").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("SevHosp").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("SevHosp").item()
            )
          )
        )
      ).alias("{}CumSevHosp".format(name)),
      (((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")).item()) * 
        dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item()) + 
      (((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")).item()) * dataframe.filter(pl.col("Value") == "P125Constant").select("NonSevere").item()) - 
        ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")).item()) * dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("NonSevere").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("NonSevere").item()
            )
          )
        )
      ).alias("{}CumNonSevere".format(name)),
      (((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).item()) * dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item()) + 
      (((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).item()) * dataframe.filter(pl.col("Value") == "P125Constant").select("Shock").item()) - 
        ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).item()) * dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Shock").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("Shock").item()
            )
          )
        )
      ).alias("{}CumShock".format(name))
  ).with_columns(
    pl.col("{}CumFatal".format(name)),
    (pl.col("{}CumSevHosp".format(name)) - pl.col("{}CumFatal".format(name))).alias("{}SevHospUC".format(name)),
    (pl.col("{}CumNonSevere".format(name)) - pl.col("{}CumSevHosp".format(name))).alias("{}NonSevereUC".format(name)),
    (pl.col("{}CumShock".format(name)) - pl.col("{}CumNonSevere".format(name))).alias("{}ShockUC".format(name))
  ).with_columns( 
    # The consecutive with_columns() statements are necessary as the result of the previous one is used in calculating the next one 
    # This doesn't significantly affect the efficiency of the algorithm 

    # The smallCollisionModifier applies only to the Passenger IAs and is 0 for IAs other than smallStructureCollision
    (pl.col("{}CumFatal".format(name)) * 
     dataframe.filter(pl.col("Value") == "smallCollisionModifier").select("Fatal").item()
     ).alias("Fatal")
  ).with_columns(
    (pl.max_horizontal(
      pl.min_horizontal(pl.col("{}SevHospUC".format(name)), 
                                        1 - pl.col("Fatal")), 0) * 
     dataframe.filter(pl.col("Value") == "smallCollisionModifier").select("SevHosp").item()
     ).alias("SevHosp")
  ).with_columns(
    (pl.max_horizontal(
      pl.min_horizontal(pl.col("{}NonSevereUC".format(name)), 
                        1 - pl.sum_horizontal("Fatal", "SevHosp")), 0) * 
     dataframe.filter(pl.col("Value") == "smallCollisionModifier").select("NonSevere").item()
     ).alias("NonSevere")
  ).with_columns(
    (pl.max_horizontal(
      pl.min_horizontal(
        pl.col("{}ShockUC".format(name)), 
        1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")), 0) * 
     dataframe.filter(pl.col("Value") == "smallCollisionModifier").select("Shock").item()
     ).alias("Shock")
  ).with_columns(
    (1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).alias("NotInjured")
  ).select("derailmentConePotentialCollisionID", 
            "cutsetID",
            "cutset_probability",
            "Fatal", 
            "SevHosp", 
            "NonSevere", 
            "Shock", 
            "NotInjured",
            "significantStructureCollision",
            "significantStructureCollapse",
            "vehicleFallOnSide",
            "smallStructureCollision",
            "vehicleFallIntoWater"
  )

  
  return injuryAtoms

In [0]:
# derailment IA for the Workforce 
def groupOneWorkforceIASimple(dataframe, name):
  maxSpeedMPH = 125
  injuryAtoms = cutsetProbabilities.with_columns(
  ((dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select("Fatal").item() * 
      dataframe.filter(pl.col("Value") == "P125Constant").select("Fatal").item()) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item())) * 
    (1 / (1 + 
      np.power(
          (np.power(
              (pl.col("derailmentSpeedMPH")/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Fatal").item() / maxSpeedMPH))
                  ) - 1), dataframe.filter(pl.col("Value") == "K").select("Fatal").item()
          )
      )
    )
  ).alias("{}CumFatal".format(name)),


    ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Specified").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified")).item() * 
      (dataframe.filter(pl.col("Value") == "P125Constant").select("Specified").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Specified").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Specified").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Specified").item()
          )
      )
    )
  ).alias("{}CumSpecified".format(name)),


  ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Sev7").item()) + 
  ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7")).item() * 
    (dataframe.filter(pl.col("Value") == "P125Constant").select("Sev7").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Sev7").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Sev7").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Sev7").item()
          )
      )
    )
  ).alias("{}CumSev7".format(name)),


  ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")).item() *  dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")).item() * (dataframe.filter(pl.col("Value") == "P125Constant").select("NonSevere").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")).item() * dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("NonSevere").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("NonSevere").item()
          )
      )
    )
  ).alias("{}CumNonSevere".format(name)),


  ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Shock7").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")).item() * 
    (dataframe.filter(pl.col("Value") == "P125Constant").select("Shock7").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Shock7").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Shock7").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Shock7").item()
          )
      )
    )
  ).alias("{}CumShock7".format(name)),


  ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7", "Shock")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7", "Shock")).item() * 
    (dataframe.filter(pl.col("Value") == "P125Constant").select("Shock").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7", "Shock")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item())) * 
    (1 / (1 + 
      np.power(
         np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Shock").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Shock").item()
          )
      )
    )
  ).alias("{}CumShock".format(name))
  ).with_columns(
  pl.col("{}CumFatal".format(name)),
  (pl.col("{}CumSpecified".format(name)) - pl.col("{}CumFatal".format(name))).alias("{}SpecifiedUC".format(name)),
  (pl.col("{}CumSev7".format(name)) - pl.col("{}CumSpecified".format(name))).alias("{}Sev7UC".format(name)),
  (pl.col("{}CumNonSevere".format(name)) - pl.col("{}CumSev7".format(name))).alias("{}NonSevereUC".format(name)),
  (pl.col("{}CumShock7".format(name)) - pl.col("{}CumNonSevere".format(name))).alias("{}Shock7UC".format(name)),
  (pl.col("{}CumShock".format(name)) - pl.col("{}CumShock7".format(name))).alias("{}ShockUC".format(name))
  ).with_columns(
    (pl.col("{}CumFatal".format(name))
     ).alias("Fatal")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}SpecifiedUC".format(name)), 1 - pl.col("Fatal")), 0) 
     ).alias("Specified")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}Sev7UC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified")), 0)
     ).alias("Sev7")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}NonSevereUC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7")), 0)
     ).alias("NonSevere")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}Shock7UC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")), 0)
     ).alias("Shock7")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}ShockUC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")), 0)
     ).alias("Shock")
  ).with_columns(
    (1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere","Shock7", "Shock")).alias("NotInjured")
  ).select("derailmentConePotentialCollisionID", 
            "cutsetID",
            "cutset_probability",
            "Fatal", 
            "Specified", 
            "Sev7", 
            "NonSevere",
            "Shock7",
            "Shock", 
            "NotInjured",
            "significantStructureCollision",
            "significantStructureCollapse",
            "vehicleFallOnSide",
            "smallStructureCollision",
            "vehicleFallIntoWater"
  )
  
  return injuryAtoms

In [0]:
lazyDerPassengerIA = derPassenger.pipe(groupOnePassengerIASimple, derPassenger.name)
# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)
lazyDerWorkforceIA = derWorkforce.pipe(groupOneWorkforceIASimple, derWorkforce.name)
# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)
lazySTRColPassengerIA = groupOnePassengerIASimple(structureCollisionPassenger, structureCollisionPassenger.name)
lazySTRColWorkforceIA = groupOneWorkforceIASimple(structureCollisionWorkforce, structureCollisionWorkforce.name)
lazySTRCollapsePassengerIA = groupOnePassengerIASimple(structureCollapsePassenger, structureCollapsePassenger.name)
lazySTRCollapseWorkforceIA = groupOneWorkforceIASimple(structureCollapseWorkforce, structureCollapseWorkforce.name)
lazySidePassengerIA = groupOnePassengerIASimple(sidePassenger, sidePassenger.name)
lazySideWorkforceIA = groupOneWorkforceIASimple(sideWorkforce, sideWorkforce.name)
lazySmallSTRPassengerIA = groupOnePassengerIASimple(smallStructureCollisionPassenger, smallStructureCollisionPassenger.name)
lazySmallSTRWorkforceIA = groupOneWorkforceIASimple(smallStructureCollisionWorkforce, smallStructureCollisionWorkforce.name)
lazyFallWaterPassengerIA = groupOnePassengerIASimple(fallWaterPassenger, fallWaterPassenger.name)
lazyFallWaterWorkforceIA = groupOneWorkforceIASimple(fallWaterWorkforce, fallWaterWorkforce.name)

# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)

##Group 2


###Functions
Simplified functions return only the probability of a person receiving a specified injury from being uninjured

In [0]:
def groupTwoPassengerIASimple(lazyframe):
    lazyframe = lazyframe.with_columns(
    pl.col("passengerFatal").alias("Fatal")
    ).with_columns(
    pl.max_horizontal(
        pl.min_horizontal(pl.col("passengerSevHosp"), 1 - pl.col("Fatal")), 
        0
    ).alias("SevHosp")
    ).with_columns(
        pl.max_horizontal(
            pl.min_horizontal(pl.col("passengerNonSevere"), 1 - pl.sum_horizontal("Fatal", "SevHosp")), 
            0
        ).alias("NonSevere")
    ).with_columns(
        pl.max_horizontal(
            pl.min_horizontal(pl.col("passengerShock"), 1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")), 
            0
        ).alias("Shock")
    ).with_columns(
        (1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).alias("NotInjured")
    ).select(
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "cutset_probability",
        "Fatal", 
        "SevHosp", 
        "NonSevere", 
        "Shock", 
        "NotInjured", 
        "secondaryCollision"
    )

    return lazyframe

In [0]:
def groupTwoWorkForceIASimple(lazyframe):
    lazyframe = lazyframe.with_columns(
    pl.col("workforceFatal").alias("Fatal")
    ).with_columns(
    pl.max_horizontal(
        pl.min_horizontal(pl.col("workforceSpecified"), 1 - pl.col("Fatal")), 
        0
    ).alias("Specified")
    ).with_columns(
        pl.max_horizontal(
            pl.min_horizontal(pl.col("workforceSev7"), 1 - pl.sum_horizontal("Fatal", "Specified")), 
            0
        ).alias("Sev7")
    ).with_columns(
        pl.max_horizontal(
            pl.min_horizontal(pl.col("workforceNonSevere"), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7")), 
            0
        ).alias("NonSevere")
    ).with_columns(
        pl.max_horizontal(
            pl.min_horizontal(pl.col("workforceShock7"), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")), 
            0
        ).alias("Shock7")
    ).with_columns(
        pl.max_horizontal(
            pl.min_horizontal(pl.col("workforceShock"), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere","Shock7")), 
            0
        ).alias("Shock")
    ).with_columns(
        (1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere","Shock7","Shock")).alias("NotInjured")
    ).select(
        "derailmentConePotentialCollisionID", 
        "cutsetID",
        "cutset_probability",
        "Fatal", 
        "Specified", 
        "Sev7", 
        "NonSevere", 
        "Shock7", 
        "Shock", 
        "NotInjured",
        "secondaryCollision"
    )

    return lazyframe

In [0]:
secondaryCollisionPassengerTrain1IA = groupTwoPassengerIASimple(lazySecondaryCollisionPassengerTrain1)
secondaryCollisionPassengerTrain2IA = groupTwoPassengerIASimple(lazySecondaryCollisionPassengerTrain2)
secondaryCollisionWorkforceTrain1IA = groupTwoWorkForceIASimple(lazySecondaryCollisionWorkforceTrain1)
secondaryCollisionWorkforceTrain2IA = groupTwoWorkForceIASimple(lazySecondaryCollisionWorkforceTrain2)

# collect(streaming=True)
# streaming = True

##Group 3
The Injury atoms for toxic and flammable goods are hardcoded as they have set values derived from historic events 

For each LazyFrame only the final row ("Not Injured") is accurate as it is the only row we will be using in the calculations. 

In [0]:
hazGoodsPassengerIA = pl.LazyFrame(
    data = {"Injury": pl.Series(["Fatal", "SevHosp", "NonSevere", "Shock", "NotInjured"], dtype = pl.Categorical),
             "Fatal": pl.Series([1, 0, 0, 0, 0.1285714], dtype=pl.Float32),
             "SevHosp":  pl.Series([0.1285714, 0.871428571, 0, 0,0.085714286], dtype=pl.Float32),
             "NonSevere":  pl.Series([0.1285714, 0.085714286, 0.785714286, 0, 0], dtype=pl.Float32),
             "Shock":  pl.Series([0.1285714, 0.085714286, 0, 0.785714286, 0], dtype=pl.Float32),
             "NotInjured":  pl.Series([0.1285714, 0.085714286, 0, 0, 0.785714286], dtype=pl.Float32)})

hazGoodsPublicIA = pl.LazyFrame(
    data = {"Injury": pl.Series(["Fatal", "SevHosp", "NonSevere", "Shock", "NotInjured"], dtype = pl.Categorical),
             "Fatal": pl.Series([1, 0, 0, 0, 0.142857143], dtype=pl.Float32),
             "SevHosp":  pl.Series([ 0.142857143, 0.857142857, 0, 0, 0.285714286], dtype=pl.Float32),
             "NonSevere":  pl.Series([0.142857143, 0.285714286, 0.571428571, 0,  0.571428571], dtype=pl.Float32),
             "Shock":  pl.Series([ 0.142857143, 0.285714286, 0.571428571, 0.0, 0.0], dtype=pl.Float32),
             "NotInjured":  pl.Series([0.142857143, 0.285714286, 0.571428571, 0, 0.0], dtype=pl.Float32)})


hazGoodsWorkforceIA = pl.LazyFrame(
    data = {"Injury": pl.Series(["Fatal","Specified", "Sev7", "NonSevere","Shock7", "Shock", "NotInjured"], dtype = pl.Categorical),
             "Fatal": pl.Series([1,0,0,0,0,0,0.128571429], dtype=pl.Float32),
             "Specified":  pl.Series([ 0.128571429, 0.871428571,0,0,0,0,0.042857143], dtype=pl.Float32),
             "Sev7" : pl.Series([0.128571429, 0.042857143, 0.828571429,0,0,0,0.042857143], dtype=pl.Float32),
             "NonSevere":  pl.Series([0.128571429, 0.042857143, 0.042857143, 0.785714286,0,0,0], dtype=pl.Float32),
             "Shock7":  pl.Series([  0.128571429, 0.042857143, 0.042857143, 0, 0.785714286,0,0], dtype=pl.Float32),
             "Shock":  pl.Series([  0.128571429, 0.042857143, 0.042857143, 0, 0, 0.785714286,0], dtype=pl.Float32),
             "NotInjured":  pl.Series([0.128571429,0.042857143,0.042857143,0,0, 0, 0.785714286], dtype=pl.Float32)})



In [0]:
flamGoodsPassengerIA = pl.LazyFrame(
    data = {"Injury": pl.Series(["Fatal", "SevHosp", "NonSevere", "Shock", "NotInjured"], dtype = pl.Categorical),
             "Fatal": pl.Series([1, 0, 0, 0, 0.071428571], dtype=pl.Float32),
             "SevHosp":  pl.Series([0.071428571, 0.928571429, 0, 0,  0.071428571], dtype=pl.Float32),
             "NonSevere":  pl.Series([0.071428571, 0.071428571, 0.857142857, 0, 0], dtype=pl.Float32),
             "Shock":  pl.Series([0.071428571, 0.071428571, 0,0.857142857, 0], dtype=pl.Float32),
             "NotInjured":  pl.Series([0.071428571, 0.071428571, 0, 0, 0.857142857], dtype=pl.Float32)})



flamGoodsPublicIA = pl.LazyFrame(
    data = {"Injury": pl.Series(["Fatal", "SevHosp", "NonSevere", "Shock", "NotInjured"], dtype = pl.Categorical),
             "Fatal": pl.Series([1, 0, 0, 0, 0], dtype=pl.Float32),
             "SevHosp":  pl.Series([ 0.0, 1, 0, 0, 0.5], dtype=pl.Float32),
             "NonSevere":  pl.Series([0.0, 0.5, 0.5, 0, 0.5], dtype=pl.Float32),
             "Shock":  pl.Series([0.0, 0.5, 0.5, 0.0, 0.0], dtype=pl.Float32),
             "NotInjured":  pl.Series([0.0, 0.5, 0.5, 0, 0.0], dtype=pl.Float32)}) 



flamGoodsWorkforceIA = pl.LazyFrame(
    data = {"Injury": pl.Series(["Fatal","Specified", "Sev7", "NonSevere","Shock7", "Shock", "NotInjured"], dtype = pl.Categorical),
             "Fatal": pl.Series([1,0,0,0,0,0,0.071428571], dtype=pl.Float32),
             "Specified":  pl.Series([ 0.071428571, 0.928571429,0,0,0,0,0.035714286], dtype=pl.Float32),
             "Sev7" : pl.Series([0.071428571, 0.035714286, 0.892857143,0,0,0,0.035714286], dtype=pl.Float32),
             "NonSevere":  pl.Series([0.071428571, 0.035714286, 0.035714286, 0.857142857,0,0,0], dtype=pl.Float32),
             "Shock7":  pl.Series([  0.071428571, 0.035714286, 0.035714286, 0, 0.857142857,0,0], dtype=pl.Float32),
             "Shock":  pl.Series([   0.071428571, 0.035714286, 0.035714286, 0, 0, 0.857142857,0], dtype=pl.Float32),
             "NotInjured":  pl.Series([0.071428571,0.035714286,0.035714286,0,0, 0, 0.857142857], dtype=pl.Float32)})


The fire injury atoms make a distinction depending on the train power type so the power type needs to be included in the calculation

In [0]:
derailedTrainGroups = cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", 
    "cutsetID", 
    "cutset_probability", 
    "potentialCollisionID", 
    pl.col("derailedTrainType").alias("trainType"),
    pl.col("derailed_train_power").alias("power"),
    "derailedTrainLoading", 
    "toxicGoodRelease(noCollision)",
    "toxicGoodRelease",
    "flamGoodRelease(noCollision)",
    "flamGoodRelease",
    "fire(noFlamGoods)",
    "fire(flamGoods)"
    ).with_columns(
        pl.when(pl.col("derailedTrainLoading") > 0 
                ).then(True
                       ).otherwise(False
                                   ).alias("loaded")
    )


# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)

collidingTrainGroups = cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", 
    "cutsetID", 
    "cutset_probability",
    "potentialCollisionID", 
    pl.col("collidingTrainType").alias("trainType"),
    pl.col("colliding_train_power").alias("power"),
    "collidingTrainLoading",
    "toxicGoodRelease(noCollision)",
    "toxicGoodRelease",
    "flamGoodRelease(noCollision)",
    "flamGoodRelease",
    "fire(noFlamGoods)",
    "fire(flamGoods)"
    ).with_columns(
        pl.when(pl.col("collidingTrainLoading") > 0 
                ).then(True
                       ).otherwise(False
                                   ).alias("loaded")
    )


# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)

###Functions


In [0]:

def non_workforce_fire_flam_goods(dataframe, table):
    output = dataframe.cast(
        {"trainType":pl.String, "loaded": pl.Boolean, "power":pl.String}
    ).join(table, on = ["trainType", "loaded", "power"], how = "inner"
    ).with_columns(
        (1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).alias("NotInjured")
    ).select("derailmentConePotentialCollisionID",
             "cutsetID",
             "Fatal",
             "SevHosp", 
             "NonSevere", 
             "Shock", 
             "NotInjured", 
             pl.col("fire(flamGoods)").alias("gate")
    )
    return output 

In [0]:

def non_workforce_fire_no_flam_goods(dataframe, table):
    output = dataframe.cast({"trainType":pl.String, "loaded": pl.Boolean, "power":pl.String}).join(table, on = ["trainType", "loaded", "power"], how = "inner"
    ).with_columns(
        (1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).alias("NotInjured")
    ).select("derailmentConePotentialCollisionID",
             "cutsetID",
             "Fatal",
             "SevHosp", 
             "NonSevere", 
             "Shock", 
             "NotInjured",
             pl.col("fire(noFlamGoods)").alias("gate")
    )

    return output 

In [0]:

def workforce_fire_flam_goods(dataframe, table):
    output = dataframe.cast({"trainType":pl.String, "loaded": pl.Boolean, "power":pl.String}).join(table, on = ["trainType", "loaded", "power"], how = "inner"
    ).with_columns(
        (1 - pl.sum_horizontal("Fatal","Specified", "Sev7", "NonSevere", "Shock7", "Shock")).alias("NotInjured")
    ).select("derailmentConePotentialCollisionID",
             "cutsetID",
             "Fatal",
             "Specified", 
             "Sev7", 
             "NonSevere", 
             "Shock7", 
             "Shock",
             "NotInjured", 
             pl.col("fire(flamGoods)").alias("gate")
    )

    return output 

In [0]:
def workforce_fire_no_flam_goods(dataframe, table):
    output = dataframe.cast({"trainType":pl.String, "loaded": pl.Boolean, "power":pl.String}).join(table, on = ["trainType", "loaded", "power"], how = "inner"
    ).with_columns(
        (1 - pl.sum_horizontal("Fatal","Specified", "Sev7", "NonSevere", "Shock7", "Shock")).alias("NotInjured")
    ).select("derailmentConePotentialCollisionID",
             "cutsetID",
             "Fatal",
             "Specified", 
             "Sev7", 
             "NonSevere", 
             "Shock7", 
             "Shock",
             "NotInjured",
             pl.col("fire(noFlamGoods)").alias("gate")
    )

    return output 

##Group 4
We prepare the LazyFrames for the calculation outside of the function environment, as the height parameter will change depending on what we are trying to calculate


In [0]:

lazyDerailmentsWithEmbankmentHeights = cutsetProbabilities.select(
    "derailmentConePotentialCollisionID",
    "cutsetID",
    "cutset_probability",
    "potentialCollisionID",
    "derailmentSpeedMPH",
    "embankmentHeight(m)",
    pl.col("vehicleFallFromEmbankment").alias("gate")
).rename(
    {
        "embankmentHeight(m)": "height(m)"
    }
)

# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)

lazyDerailmentsWithBridgeHeights =  cutsetProbabilities.select(
    "derailmentConePotentialCollisionID",
    "cutsetID",
    "cutset_probability",
    "potentialCollisionID",
    "derailmentSpeedMPH",
    "bridgeViaductHeight",
    pl.col("vehicleFallFromHeight").alias("gate")
).rename(
    {
        "bridgeViaductHeight": "height(m)"
    }
).cast(
    {
        "height(m)" : pl.Float32
    }
)

# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)

###Functions


In [0]:

def groupFourPassengerIASimple(lazyframe, dataframe, name):
#Calling the function returns a LazyFrame 
  maxSpeedMPH = 125
  injuryAtoms = lazyframe.with_columns(
      #Cumulative probability for Fatal injuries
      pl.min_horizontal(((dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * 
        dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item()) + 
      ((dataframe.filter(pl.col("Value") == "probabilityFast").select("Fatal").item() * 
        dataframe.filter(pl.col("Value") == "P125Constant").select("Fatal").item()) - (dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Fatal").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("Fatal").item()
            )
          )
        ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("Fatal").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("Fatal").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("Fatal").item())
      ), 
             dataframe.filter(pl.col("Value") == "maxLoss").select("Fatal").item()
             ).alias("{}CumFatal".format(name)),
      #Cumulative probability for severe hospitalisations
      pl.min_horizontal((((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp")).item()) * 
        dataframe.filter(pl.col("Value") == "P0Constant").select("SevHosp").item()) + 
      (((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "SevHosp")).item()) * 
        dataframe.filter(pl.col("Value") == "P125Constant").select("SevHosp").item()) - 
        ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp")).item()) * 
         dataframe.filter(pl.col("Value") == "P0Constant").select("SevHosp").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("SevHosp").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("SevHosp").item()
            )
          )
        ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("SevHosp").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("SevHosp").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("SevHosp").item()),
        dataframe.filter(pl.col("Value") == "maxLoss").select("SevHosp").item())
      ).alias("{}CumSevHosp".format(name)),
      #Cumulative probability for non severe injuries 
      pl.min_horizontal((((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")).item()) * 
        dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item()) + 
      (((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")).item()) * dataframe.filter(pl.col("Value") == "P125Constant").select("NonSevere").item()) - 
        ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")).item()) * dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("NonSevere").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("NonSevere").item()
            )
          )
        )  * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("NonSevere").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("NonSevere").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("NonSevere").item()),
        dataframe.filter(pl.col("Value") == "maxLoss").select("NonSevere").item())
      ).alias("{}CumNonSevere".format(name)),
      #Cumulative probability for shock and trauma
      pl.min_horizontal((((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).item()) * dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item()) + 
      (((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).item()) * dataframe.filter(pl.col("Value") == "P125Constant").select("Shock").item()) - 
        ((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).item()) * dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item())) * 
      (1 / (1 + 
        np.power(
            np.power(
                (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
                (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Shock").item() / maxSpeedMPH))
                      ) - 1, dataframe.filter(pl.col("Value") == "K").select("Shock").item()
            )
          )
        ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("Shock").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("Shock").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("Shock").item()),
        dataframe.filter(pl.col("Value") == "maxLoss").select("Shock").item())
      ).alias("{}CumShock".format(name))
  ).with_columns(
    pl.col("{}CumFatal".format(name)),
    (pl.col("{}CumSevHosp".format(name)) - pl.col("{}CumFatal".format(name))).alias("{}SevHospUC".format(name)),
    (pl.col("{}CumNonSevere".format(name)) - pl.col("{}CumSevHosp".format(name))).alias("{}NonSevereUC".format(name)),
    (pl.col("{}CumShock".format(name)) - pl.col("{}CumNonSevere".format(name))).alias("{}ShockUC".format(name))
  ).with_columns( 
    #The consecutive with_columns() statements are necessary as the result of the previous one is used in calculating the next one 
    #This doesn't significantly affect the efficiency of the algorithm 
    (pl.col("{}CumFatal".format(name))
     ).alias("Fatal")
  ).with_columns(
    #The smallCollisionModifier applies only to the Passenger IAs
    (pl.max_horizontal(
      pl.min_horizontal(pl.col("{}SevHospUC".format(name)), 
                                        1 - pl.col("Fatal")), 0)
     ).alias("SevHosp")
  ).with_columns(
    (pl.max_horizontal(
      pl.min_horizontal(pl.col("{}NonSevereUC".format(name)), 
                        1 - pl.sum_horizontal("Fatal", "SevHosp")), 0)
     ).alias("NonSevere")
  ).with_columns(
    (pl.max_horizontal(
      pl.min_horizontal(
        pl.col("{}ShockUC".format(name)), 
        1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere")), 0)
     ).alias("Shock")
  ).with_columns(
    (1 - pl.sum_horizontal("Fatal", "SevHosp", "NonSevere", "Shock")).alias("NotInjured")
  ).select(["derailmentConePotentialCollisionID", 
            "cutsetID",
            "cutset_probability",
            pl.col("Fatal").cast(pl.Float64), 
            pl.col("SevHosp").cast(pl.Float64), 
            pl.col("NonSevere").cast(pl.Float64), 
            pl.col("Shock").cast(pl.Float64), 
            pl.col("NotInjured").cast(pl.Float64),
            "gate"]
  )
  
  return injuryAtoms

In [0]:
def groupFourWorkforceIASimple(lazyframe, dataframe, name):
  maxSpeedMPH = 125
  injuryAtoms = lazyframe.with_columns(
  #Cumulative probability of fatal injuries 
  pl.min_horizontal(((dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select("Fatal").item() * 
      dataframe.filter(pl.col("Value") == "P125Constant").select("Fatal").item()) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select("Fatal").item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Fatal").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Fatal").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Fatal").item()
          )
      )
    ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("Fatal").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("Fatal").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("Fatal").item()),
    dataframe.filter(pl.col("Value") == "maxLoss").select("Fatal").item())
  ).alias("{}CumFatal".format(name)),

#Cumulative probability of specified injuries 
    pl.min_horizontal(((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Specified").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified")).item() * 
      (dataframe.filter(pl.col("Value") == "P125Constant").select("Specified").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Specified").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Specified").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Specified").item()
          )
      )
    ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("Specified").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("Specified").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("Specified").item()),
    dataframe.filter(pl.col("Value") == "maxLoss").select("Specified").item())
  ).alias("{}CumSpecified".format(name)),

#Cumulative probability of Severe Injuries
  pl.min_horizontal(((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Sev7").item()) + 
  ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7")).item() * 
    (dataframe.filter(pl.col("Value") == "P125Constant").select("Sev7").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Sev7").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Sev7").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Sev7").item()
          )
      )
    ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("Sev7").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("Sev7").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("Sev7").item()),
    dataframe.filter(pl.col("Value") == "maxLoss").select("Sev7").item())
  ).alias("{}CumSev7".format(name)),

#Cumulative Probability of Non Severe injuries 
  pl.min_horizontal(((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")).item() *  dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")).item() * (dataframe.filter(pl.col("Value") == "P125Constant").select("NonSevere").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")).item() * dataframe.filter(pl.col("Value") == "P0Constant").select("NonSevere").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("NonSevere").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("NonSevere").item()
          )
      )
    ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("NonSevere").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("NonSevere").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("NonSevere").item()),
    dataframe.filter(pl.col("Value") == "maxLoss").select("NonSevere").item())
  ).alias("{}CumNonSevere".format(name)),

#Cumulative probability of Shock7-type injuries
  pl.min_horizontal(((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Shock7").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")).item() * 
    (dataframe.filter(pl.col("Value") == "P125Constant").select("Shock7").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Shock7").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Shock7").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Shock7").item()
          )
      )
    ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("Shock7").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("Shock7").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("Shock7").item()),
    dataframe.filter(pl.col("Value") == "maxLoss").select("Shock7").item())
  ).alias("{}CumShock7".format(name)),

#Cumulative probability of shock injuries 
  pl.min_horizontal(((dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7", "Shock")).item() * 
    dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item()) + 
    ((dataframe.filter(pl.col("Value") == "probabilityFast").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7", "Shock")).item() * 
    (dataframe.filter(pl.col("Value") == "P125Constant").select("Shock").item())) - 
    (dataframe.filter(pl.col("Value") == "probabilitySlow").select(pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7", "Shock")).item() * 
      dataframe.filter(pl.col("Value") == "P0Constant").select("Shock").item())) * 
    (1 / (1 + 
      np.power(
          np.power(
              (pl.min_horizontal(pl.col("derailmentSpeedMPH"), maxSpeedMPH)/maxSpeedMPH), 
              (np.log(2)/np.log(dataframe.filter(pl.col("Value") == "midpointSpeed").select("Shock").item() / maxSpeedMPH))
                  ) - 1, dataframe.filter(pl.col("Value") == "K").select("Shock").item()
          )
      )
    ) * np.power((pl.min_horizontal(pl.col("height(m)"), dataframe.filter(pl.col("Value") == "maxLossAtHeight").select("Shock").item())/
             dataframe.filter(pl.col("Value") == "midpointHeight").select("Shock").item()), 
                     dataframe.filter(pl.col("Value") == "heightPowerFactor").select("Shock").item()),
    dataframe.filter(pl.col("Value") == "maxLoss").select("Shock").item())
  ).alias("{}CumShock".format(name))
  ).with_columns(
  pl.col("{}CumFatal".format(name)),
  (pl.col("{}CumSpecified".format(name)) - pl.col("{}CumFatal".format(name))).alias("{}SpecifiedUC".format(name)),
  (pl.col("{}CumSev7".format(name)) - pl.col("{}CumSpecified".format(name))).alias("{}Sev7UC".format(name)),
  (pl.col("{}CumNonSevere".format(name)) - pl.col("{}CumSev7".format(name))).alias("{}NonSevereUC".format(name)),
  (pl.col("{}CumShock7".format(name)) - pl.col("{}CumNonSevere".format(name))).alias("{}Shock7UC".format(name)),
  (pl.col("{}CumShock".format(name)) - pl.col("{}CumShock7".format(name))).alias("{}ShockUC".format(name))
  ).with_columns(
    (pl.col("{}CumFatal".format(name))
     ).alias("Fatal")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}SpecifiedUC".format(name)), 1 - pl.col("Fatal")), 0) 
     ).alias("Specified")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}Sev7UC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified")), 0)
     ).alias("Sev7")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}NonSevereUC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7")), 0)
     ).alias("NonSevere")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}Shock7UC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere")), 0)
     ).alias("Shock7")
  ).with_columns(
    (pl.max_horizontal(pl.min_horizontal(pl.col("{}ShockUC".format(name)), 1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere", "Shock7")), 0)
     ).alias("Shock")
  ).with_columns(
    (1 - pl.sum_horizontal("Fatal", "Specified", "Sev7", "NonSevere","Shock7", "Shock")).alias("NotInjured")
  ).select(["derailmentConePotentialCollisionID", 
            "cutsetID",
            "cutset_probability",
            pl.col("Fatal").cast(pl.Float64), 
            pl.col("Specified").cast(pl.Float64), 
            pl.col("Sev7").cast(pl.Float64), 
            pl.col("NonSevere").cast(pl.Float64),
            pl.col("Shock7").cast(pl.Float64), 
            pl.col("Shock").cast(pl.Float64), 
            pl.col("NotInjured").cast(pl.Float64),
            "gate"]
  )

  return injuryAtoms

In [0]:
lazyFallEmbankmentPassengerIA = groupFourPassengerIASimple(lazyDerailmentsWithEmbankmentHeights, fallPassenger, fallPassenger.name)
# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)
lazyFallEmbankmentWorkforceIA = groupFourWorkforceIASimple(lazyDerailmentsWithEmbankmentHeights, fallWorkForce, fallWorkForce.name)
# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)
lazyFallEmbankmentPublicIA = groupFourPassengerIASimple(lazyDerailmentsWithEmbankmentHeights, fallPublic, fallPublic.name)
lazyFallBridgePassengerIA = groupFourPassengerIASimple(lazyDerailmentsWithBridgeHeights, fallPassenger, fallPassenger.name)
lazyFallBridgeWorkforceIA = groupFourWorkforceIASimple(lazyDerailmentsWithBridgeHeights,fallWorkForce, fallWorkForce.name)
lazyFallBridgePublicIA = groupFourPassengerIASimple(lazyDerailmentsWithBridgeHeights, fallPublic, fallPublic.name)

# collect(streaming=True) (27/09/2024)
# streaming = True (27/09/2024)

###Group 1 Injury Atoms
The injury Atoms are calculated after the string caching to allow for the necessary joins to be done efficiently 

In [0]:
# Here we manually join each injuryAtom with its cutset gate to see whether it would apply to a given scenario. This is because we cannot loop over the changes made to the lazyframes
# First we materialize the dataframes before assigning them to a LazyFrame to stream the calculations - a bug with polars causes  
# Derailment - notably we don't need to apply the cutset gates
lazyDerailmentPassengerIA = lazyDerPassengerIA

# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

lazyDerailmentWorkforceIA = lazyDerWorkforceIA
# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

# Significant structure collision
lazySTRColPassengerGateIA = lazySTRColPassengerIA.select(
    pl.exclude(
    "significantStructureCollapse", 
    "vehicleFallOnSide", 
    "smallStructureCollision", 
    "vehicleFallIntoWater")
    ).rename({"significantStructureCollision": "gate"}
    ).pipe(gate_effect_passenger)
# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

lazySTRColWorkforceGateIA = lazySTRColWorkforceIA.select(
    pl.exclude(
    "significantStructureCollapse", 
    "vehicleFallOnSide", 
    "smallStructureCollision", 
    "vehicleFallIntoWater")
    ).rename({"significantStructureCollision": "gate"}
    ).pipe(gate_effect_workforce)


# Significant structure collapse
lazySTRCollapsePassengerGateIA = lazySTRCollapsePassengerIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "vehicleFallOnSide", 
    "smallStructureCollision", 
    "vehicleFallIntoWater")
    ).rename({"significantStructureCollapse": "gate"}
    ).pipe(gate_effect_passenger)


lazySTRCollapseWorkforceGateIA = lazySTRCollapseWorkforceIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "vehicleFallOnSide", 
    "smallStructureCollision", 
    "vehicleFallIntoWater")
    ).rename({"significantStructureCollapse": "gate"}
    ).pipe(gate_effect_workforce)


# Fall on side
lazySidePassengerGateIA = lazySidePassengerIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "significantStructureCollapse", 
    "smallStructureCollision", 
    "vehicleFallIntoWater")
    ).rename({"vehicleFallOnSide": "gate"}
    ).pipe(gate_effect_passenger)


lazySideWorkforceGateIA = lazySideWorkforceIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "significantStructureCollapse", 
    "smallStructureCollision", 
    "vehicleFallIntoWater")
    ).rename({"vehicleFallOnSide": "gate"}
    ).pipe(gate_effect_workforce)


# Small structure collision
lazySmallSTRPassengerGateIA = lazySmallSTRPassengerIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "significantStructureCollapse", 
    "vehicleFallOnSide", 
    "vehicleFallIntoWater")
    ).rename({"smallStructureCollision": "gate"}
    ).pipe(gate_effect_passenger)


lazySmallSTRWorkforceGateIA = lazySmallSTRWorkforceIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "significantStructureCollapse", 
    "vehicleFallOnSide", 
    "vehicleFallIntoWater")
    ).rename({"smallStructureCollision": "gate"}
    ).pipe(gate_effect_workforce)


# Fall into water
lazyFallWaterPassengerGateIA = lazyFallWaterPassengerIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "significantStructureCollapse", 
    "vehicleFallOnSide", 
    "smallStructureCollision") 
    ).rename({"vehicleFallIntoWater": "gate"}
    ).pipe(gate_effect_passenger)


lazyFallWaterWorkforceGateIA = lazyFallWaterWorkforceIA.select(
    pl.exclude(
    "significantStructureCollision", 
    "significantStructureCollapse", 
    "vehicleFallOnSide", 
    "smallStructureCollision") 
    ).rename({"vehicleFallIntoWater": "gate"}
    ).pipe(gate_effect_workforce)


###Group 2 Injury Atoms


In [0]:
#Secondary Collision Passengers
lazySecondaryCollisionPassengerTrain1GateIA = secondaryCollisionPassengerTrain1IA.rename(
    {"secondaryCollision": "gate"}
    ).pipe(gate_effect_passenger)

# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

lazySecondaryCollisionPassengerTrain2GateIA = secondaryCollisionPassengerTrain2IA.rename(
    {"secondaryCollision": "gate"}
    ).pipe(gate_effect_passenger)

# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

#Secondary Collision Workforce
lazySecondaryCollisionWorkforceTrain1GateIA = secondaryCollisionWorkforceTrain1IA.rename(
    {"secondaryCollision": "gate"}
    ).pipe(gate_effect_workforce)

# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

lazySecondaryCollisionWorkforceTrain2GateIA = secondaryCollisionWorkforceTrain2IA.rename(
    {"secondaryCollision": "gate"}
    ).pipe(gate_effect_workforce)

# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

###Group 3 Injury Atoms


In [0]:
# The hazardous goods tables are hardcoded so we simply need to make sure that all derailment-collision-cutset combinations have the 
# bottom row which will be used in the matrix multiplication

# Hazardous Goods Passenger
# Train 1 is at risk regardless of whether there is a secondary collision, while Train2 is at risk only if there is a secondary collision
lazyHazGoodsPassengerTrain1NoCollisionGateIA = hazGoodsPassengerIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease(noCollision)"), 
    how="cross"
    ).rename({"toxicGoodRelease(noCollision)": "gate"}
    ).pipe(gate_effect_passenger)
# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

lazyHazGoodsPassengerTrain1CollisionGateIA = hazGoodsPassengerIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease"), 
    how="cross"
    ).rename({"toxicGoodRelease": "gate"}
    ).pipe(gate_effect_passenger)
# collect(streaming=True) (27/09/2024)
# streaming=True (27/09/2024)

lazyHazGoodsPassengerTrain2CollisionGateIA = hazGoodsPassengerIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease"), 
    how="cross"
    ).rename({"toxicGoodRelease": "gate"}
    ).pipe(gate_effect_passenger)

# Hazardous Goods Public
# The public is at risk from both trains
lazyHazGoodsPublicNoCollisionGateIA = hazGoodsPublicIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease(noCollision)"), 
    how="cross"
    ).rename({"toxicGoodRelease(noCollision)": "gate"}
    ).pipe(gate_effect_passenger)

lazyHazGoodsPublicCollisionGateIA = hazGoodsPublicIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease"), 
    how="cross"
    ).rename({"toxicGoodRelease": "gate"}
    ).pipe(gate_effect_passenger)

# Hazardous Goods Workforce
lazyHazGoodsWorkforceTrain1NoCollisionGateIA = hazGoodsWorkforceIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease(noCollision)"), 
    how="cross"
    ).rename({"toxicGoodRelease(noCollision)": "gate"}
    ).pipe(gate_effect_workforce)

lazyHazGoodsWorkforceTrain1CollisionGateIA = hazGoodsWorkforceIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease"), 
    how="cross"
    ).rename({"toxicGoodRelease": "gate"}
    ).pipe(gate_effect_workforce)

lazyHazGoodsWorkforceTrain2CollisionGateIA = hazGoodsWorkforceIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "toxicGoodRelease"), 
    how="cross"
    ).rename({"toxicGoodRelease": "gate"}
    ).pipe(gate_effect_workforce)


# Flammable Goods Passenger
lazyFlamGoodsPassengerTrain1NoCollisionGateIA = flamGoodsPassengerIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease(noCollision)"), 
    how="cross"
    ).rename({"flamGoodRelease(noCollision)": "gate"}
    ).pipe(gate_effect_passenger)

lazyFlamGoodsPassengerTrain1CollisionGateIA = flamGoodsPassengerIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease"), 
    how="cross"
    ).rename({"flamGoodRelease": "gate"}
    ).pipe(gate_effect_passenger)

lazyFlamGoodsPassengerTrain2CollisionGateIA = flamGoodsPassengerIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease"), 
    how="cross"
    ).rename({"flamGoodRelease": "gate"}
    ).pipe(gate_effect_passenger)

# Flammable Goods Public
# The public is at risk from both trains
lazyFlamGoodsPublicNoCollisionGateIA = flamGoodsPublicIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease(noCollision)"), 
    how="cross"
    ).rename({"flamGoodRelease(noCollision)": "gate"}
    ).pipe(gate_effect_passenger)

lazyFlamGoodsPublicCollisionGateIA = flamGoodsPublicIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease"), 
    how="cross"
    ).rename({"flamGoodRelease": "gate"}
    ).pipe(gate_effect_passenger)

# Flammable Goods Workforce
lazyFlamGoodsWorkforceTrain1NoCollisionGateIA = flamGoodsWorkforceIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease(noCollision)"), 
    how="cross"
    ).rename({"flamGoodRelease(noCollision)": "gate"}
    ).pipe(gate_effect_workforce)

lazyFlamGoodsWorkforceTrain1CollisionGateIA = flamGoodsWorkforceIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease"), 
    how="cross"
    ).rename({"flamGoodRelease": "gate"}
    ).pipe(gate_effect_workforce)

lazyFlamGoodsWorkforceTrain2CollisionGateIA = flamGoodsWorkforceIA.filter(pl.col("Injury") == "NotInjured"
    ).join(cutsetProbabilities.select(
    "derailmentConePotentialCollisionID", "cutsetID", "cutset_probability", "flamGoodRelease"), 
    how="cross"
    ).rename({"flamGoodRelease": "gate"}
    ).pipe(gate_effect_workforce)


In [0]:

fireFlamGoodsPassengerTrain1IA = non_workforce_fire_flam_goods(derailedTrainGroups, fireNonWorkForce)
fireNoFlamGoodsPassengerTrain1IA = non_workforce_fire_no_flam_goods(derailedTrainGroups, fireNonWorkForce)
fireFlamGoodsWorkforceTrain1IA = workforce_fire_flam_goods(derailedTrainGroups, fireWorkForce)
fireNoFlamGoodsWorkforceTrain1IA = workforce_fire_no_flam_goods(derailedTrainGroups, fireWorkForce) 
fireFlamGoodsPassengersTrain2IA = non_workforce_fire_flam_goods(collidingTrainGroups, fireNonWorkForce)
fireNoFlamGoodsPassengersTrain2IA = non_workforce_fire_no_flam_goods(collidingTrainGroups, fireNonWorkForce)
fireFlamGoodsWorkforceTrain2IA = workforce_fire_flam_goods(collidingTrainGroups, fireWorkForce)
fireNoFlamGoodsWorkforceTrain2IA = workforce_fire_no_flam_goods(collidingTrainGroups, fireWorkForce)

lazyFireFlamGoodsPassengerTrain1GateIA = gate_effect_passenger(fireFlamGoodsPassengerTrain1IA)


lazyFireNoFlamGoodsPassengerTrain1GateIA = gate_effect_passenger(fireNoFlamGoodsPassengerTrain1IA)

lazyFireFlamGoodsPassengerTrain2GateIA = gate_effect_passenger(fireFlamGoodsPassengersTrain2IA)

lazyFireNoFlamGoodsPassengerTrain2GateIA = gate_effect_passenger(fireNoFlamGoodsPassengersTrain2IA)

lazyFireFlamGoodsWorkforceTrain1GateIA = gate_effect_workforce(fireFlamGoodsWorkforceTrain1IA)

lazyFireNoFlamGoodsWorkforceTrain1GateIA = gate_effect_workforce(fireNoFlamGoodsWorkforceTrain1IA)

lazyFireFlamGoodsWorkforceTrain2GateIA = gate_effect_workforce(fireFlamGoodsWorkforceTrain2IA)

lazyFireNoFlamGoodsWorkforceTrain2GateIA = gate_effect_workforce(fireNoFlamGoodsWorkforceTrain2IA)


###Group 4 Injury Atoms


In [0]:
#Fall from embankment
lazyFallEmbankmentPassengerGateIA = lazyFallEmbankmentPassengerIA.pipe(gate_effect_passenger)

lazyFallEmbankmentWorkforceGateIA = lazyFallEmbankmentWorkforceIA.pipe(gate_effect_workforce)

lazyFallEmbankmentPublicGateIA = lazyFallEmbankmentPublicIA.pipe(gate_effect_passenger)
# streaming=True (03/10/2024)
# collect(streaming=True)

#Fall from Height (termed "FallBridge" in the IA)
lazyFallBridgePassengerGateIA = lazyFallBridgePassengerIA.pipe(gate_effect_passenger)
# streaming=True (03/10/2024)
# collect(streaming=True)

lazyFallBridgeWorkforceGateIA = lazyFallBridgeWorkforceIA.pipe(gate_effect_workforce)

lazyFallBridgePublicGateIA = lazyFallBridgePublicIA.pipe(gate_effect_passenger)


#Bottom Row Multiplication Function 
Originally the IA were in matrix form (5x5 for passenger and public and 7x7 for workforce) and subsequent escalations were multiplied using matrix multiplication. This operation is _**not**_ supported in polars. 
However it is possible to reconstruct the complete injury atom matrix using only the probability of an uninjured individual sustaining a certain degree of injury, the so-called matrix "bottom row". Thus we construct the bottom row of each matrix using only terms found in the bottom row of each matrix.

##Functions


In [0]:
def passenger_injury_atom_multiplication(lazyframe1, lazyframe2):
  output = lazyframe1.join(lazyframe2, on = ["derailmentConePotentialCollisionID", "cutsetID"], how = "inner", suffix = "_right"
  ).with_columns(
      (pl.col("Fatal") + 
      pl.col("SevHosp") * pl.col("Fatal_right") + 
      pl.col("NonSevere") * pl.col("Fatal_right") + 
      pl.col("Shock") * pl.col("Fatal_right") + 
      pl.col("NotInjured") * pl.col("Fatal_right")
      ).alias("Fatal_new"), 
      
      (pl.col("SevHosp") * (1.0 - pl.col("Fatal_right")) + 
      pl.col("NonSevere") * pl.col("SevHosp_right") + 
      pl.col("Shock") * pl.col("SevHosp_right") + 
      pl.col("NotInjured") * pl.col("SevHosp_right")
      ).alias("SevHosp_new"),
      
      (pl.col("NonSevere") * (1 - pl.sum_horizontal("Fatal_right", "SevHosp_right")) + 
      pl.col("Shock") * pl.col("NonSevere_right") + 
      pl.col("NotInjured") * pl.col("NonSevere_right")
      ).alias("NonSevere_new"),
      
      (pl.col("Shock") * (1 - pl.sum_horizontal("Fatal_right", "SevHosp_right", "NonSevere_right")) + 
      pl.col("NotInjured") * pl.col("Shock_right")
      ).alias("Shock_new"), 

      (pl.col("NotInjured") * pl.col("NotInjured_right")
      ).alias("NotInjured_new")
  ).select(
      "derailmentConePotentialCollisionID", 
      "cutsetID", 
      "cutset_probability",
      pl.col("Fatal_new").alias("Fatal"), 
      pl.col("SevHosp_new").alias("SevHosp"),
      pl.col("NonSevere_new").alias("NonSevere"),
      pl.col("Shock_new").alias("Shock"),
      pl.col("NotInjured_new").alias("NotInjured"),
  )

  return output

In [0]:
# NOTE: The matrix multiplications can be rewritten in the form of pl.sum_horizontal(x,y,z) * pl.col(injury)
# I have kept this as it is for clarity 
def workforce_injury_atom_multiplication(lazyframe1, lazyframe2):
    output = lazyframe1.join(lazyframe2, on = ["derailmentConePotentialCollisionID", "cutsetID"], how = "inner", suffix = "_right"
    ).with_columns(
        (pl.col("Fatal") + 
        pl.col("Specified") * pl.col("Fatal_right") + 
        pl.col("Sev7") * pl.col("Fatal_right") + 
        pl.col("NonSevere") * pl.col("Fatal_right") + 
        pl.col("Shock7") * pl.col("Fatal_right") + 
        pl.col("Shock") * pl.col("Fatal_right") + 
        pl.col("NotInjured") * pl.col("Fatal_right")
        ).alias("Fatal_new"), 
        
        (pl.col("Specified") * (1 - pl.col("Fatal_right")) + 
        pl.col("Sev7") * pl.col("Specified_right") + 
        pl.col("NonSevere") * pl.col("Specified_right") + 
        pl.col("Shock7") * pl.col("Specified_right") + 
        pl.col("Shock") * pl.col("Specified_right") + 
        pl.col("NotInjured") * pl.col("Specified_right")
        ).alias("Specified_new"),
        
        (pl.col("Sev7") * (1 - pl.sum_horizontal("Fatal_right", "Specified_right")) + 
        pl.col("NonSevere") * pl.col("Sev7_right") + 
        pl.col("Shock7") * pl.col("Sev7_right") + 
        pl.col("Shock") * pl.col("Sev7_right") + 
        pl.col("NotInjured") * pl.col("Sev7_right")
        ).alias("Sev7_new"),
        
        (pl.col("NonSevere") * (1 - pl.sum_horizontal("Fatal_right", "Specified_right", "Sev7_right")) +
        pl.col("Shock7") * pl.col("NonSevere_right") + 
        pl.col("Shock") * pl.col("NonSevere_right") + 
        pl.col("NotInjured") * pl.col("NonSevere_right")
        ).alias("NonSevere_new"), 

        (pl.col("Shock7") * (1 - pl.sum_horizontal("Fatal_right", "Specified_right", "Sev7_right", "NonSevere_right")) +
        pl.col("Shock") * pl.col("Shock7_right") + 
        pl.col("NotInjured") * pl.col("Shock7_right")
        ).alias("Shock7_new"),
        
        (pl.col("Shock") * (1 - pl.sum_horizontal("Fatal_right", "Specified_right", "Sev7_right", "NonSevere_right", "Shock7_right")) +
        pl.col("NotInjured") * pl.col("Shock_right")
        ).alias("Shock_new"),
        
        (pl.col("NotInjured") * pl.col("NotInjured_right")).alias("NotInjured_new")
    ).select(
        "derailmentConePotentialCollisionID", 
        "cutsetID", 
        pl.col("Fatal_new").alias("Fatal"), 
        pl.col("Specified_new").alias("Specified"),
        pl.col("Sev7_new").alias("Sev7"),
        pl.col("NonSevere_new").alias("NonSevere"),
        pl.col("Shock7_new").alias("Shock7"),
        pl.col("Shock_new").alias("Shock"),
        pl.col("NotInjured_new").alias("NotInjured"),
    )

    return output

##Train Loading by Cutset Scenario


In [0]:
lazyTrain1CutsetProbabilityPassengerLoading = derailedTrainGroups.with_columns(
    (pl.col("cutset_probability") * pl.col("derailedTrainLoading")
     ).alias("weightedLoading")
    ).select(
        "derailmentConePotentialCollisionID",
        "cutsetID",
        pl.col("derailedTrainLoading").alias("loading"),
        "weightedLoading"
    )
# streaming=True (03/10/2024)
# collect(streaming=True)
 
lazyTrain2CutsetProbabilityPassengerLoading =  collidingTrainGroups.with_columns(
    (pl.col("cutset_probability") * pl.col("collidingTrainLoading")
     ).alias("weightedLoading")
    ).select(
        "derailmentConePotentialCollisionID",
        "cutsetID",
        pl.col("collidingTrainLoading").alias("loading"),
        "weightedLoading"
    )

lazyTrain1CutsetProbabilityWorkforceLoading = derailedTrainGroups.with_columns(
    (pl.col("cutset_probability") * 2
     ).alias("weightedLoading")
    ).select(
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "weightedLoading"
    )

lazyTrain2CutsetProbabilityWorkforceLoading = collidingTrainGroups.with_columns(
    (pl.col("cutset_probability") * 2
     ).alias("weightedLoading")
    ).select(
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "weightedLoading"
    )

#Injury Atom Calculation
The injury atoms are calculated by successive operations, where the output of the function is fed again into the function with the next injury atom. 
This eventually returns the proportion of each injury. 

##Passenger Injury Atoms


###Train 1


In [0]:
# This step uses a number of large LazyFrames to make calculations. These represent a considerable strain to memory resources. By clearing them
# after they have been used in their respective calculations we save up on memory. 

injuryAtomsPassengerTrain1 = lazyDerailmentPassengerIA.pipe(
    passenger_injury_atom_multiplication, lazySTRColPassengerGateIA
).pipe(passenger_injury_atom_multiplication, lazySTRCollapsePassengerGateIA
).pipe(passenger_injury_atom_multiplication, lazySmallSTRPassengerGateIA
).pipe(passenger_injury_atom_multiplication, lazyFallBridgePassengerGateIA #Unknown introduced
).pipe(passenger_injury_atom_multiplication, lazyFallEmbankmentPassengerGateIA #Unknown
).pipe(passenger_injury_atom_multiplication, lazyFallWaterPassengerGateIA
).pipe(passenger_injury_atom_multiplication, lazySidePassengerGateIA
).pipe(passenger_injury_atom_multiplication, lazySecondaryCollisionPassengerTrain1GateIA
).pipe(passenger_injury_atom_multiplication, lazyHazGoodsPassengerTrain1NoCollisionGateIA
).pipe(passenger_injury_atom_multiplication, lazyHazGoodsPassengerTrain1CollisionGateIA
).pipe(passenger_injury_atom_multiplication, lazyFireFlamGoodsPassengerTrain1GateIA
).pipe(passenger_injury_atom_multiplication, lazyFireNoFlamGoodsPassengerTrain1GateIA
).pipe(passenger_injury_atom_multiplication, lazyFlamGoodsPassengerTrain1NoCollisionGateIA
).pipe(passenger_injury_atom_multiplication, lazyFlamGoodsPassengerTrain1CollisionGateIA
)



###Train 2


In [0]:
injuryAtomsPassengerTrain2 = lazySecondaryCollisionPassengerTrain2GateIA.pipe(
    passenger_injury_atom_multiplication, lazyHazGoodsPassengerTrain2CollisionGateIA
).pipe(passenger_injury_atom_multiplication, lazyFireFlamGoodsPassengerTrain2GateIA
).pipe(passenger_injury_atom_multiplication, lazyFireNoFlamGoodsPassengerTrain2GateIA
).pipe(passenger_injury_atom_multiplication, lazyFlamGoodsPassengerTrain2CollisionGateIA)


##Workforce Injury Atoms


###Train 1


In [0]:
injuryAtomsWorkforceTrain1 = lazyDerailmentWorkforceIA.pipe(
    workforce_injury_atom_multiplication, lazySTRColWorkforceGateIA
).pipe(workforce_injury_atom_multiplication, lazySTRCollapseWorkforceGateIA
).pipe(workforce_injury_atom_multiplication, lazySmallSTRWorkforceGateIA
).pipe(workforce_injury_atom_multiplication, lazyFallBridgeWorkforceGateIA 
).pipe(workforce_injury_atom_multiplication, lazyFallEmbankmentWorkforceGateIA 
).pipe(workforce_injury_atom_multiplication, lazyFallWaterWorkforceGateIA
).pipe(workforce_injury_atom_multiplication, lazySideWorkforceGateIA
).pipe(workforce_injury_atom_multiplication, lazySecondaryCollisionWorkforceTrain1GateIA
).pipe(workforce_injury_atom_multiplication, lazyHazGoodsWorkforceTrain1NoCollisionGateIA
).pipe(workforce_injury_atom_multiplication, lazyHazGoodsWorkforceTrain1CollisionGateIA
).pipe(workforce_injury_atom_multiplication, lazyFireFlamGoodsWorkforceTrain1GateIA
).pipe(workforce_injury_atom_multiplication, lazyFireNoFlamGoodsWorkforceTrain1GateIA
).pipe(workforce_injury_atom_multiplication, lazyFlamGoodsWorkforceTrain1NoCollisionGateIA
).pipe(workforce_injury_atom_multiplication, lazyFlamGoodsWorkforceTrain1CollisionGateIA
)

###Train 2


In [0]:
injuryAtomsWorkforceTrain2 = lazySecondaryCollisionWorkforceTrain2GateIA.pipe(
    workforce_injury_atom_multiplication, lazyHazGoodsWorkforceTrain2CollisionGateIA
).pipe(workforce_injury_atom_multiplication, lazyFireFlamGoodsWorkforceTrain2GateIA
).pipe(workforce_injury_atom_multiplication, lazyFireNoFlamGoodsWorkforceTrain2GateIA
).pipe(workforce_injury_atom_multiplication, lazyFlamGoodsWorkforceTrain2CollisionGateIA)

##Public Injury Atoms


In [0]:
injuryAtomsPublic = lazyFallEmbankmentPublicGateIA.pipe(
    passenger_injury_atom_multiplication, lazyFallBridgePublicGateIA
).pipe(passenger_injury_atom_multiplication, lazyHazGoodsPublicNoCollisionGateIA
).pipe(passenger_injury_atom_multiplication, lazyHazGoodsPublicCollisionGateIA
).pipe(passenger_injury_atom_multiplication, lazyFlamGoodsPublicNoCollisionGateIA
).pipe(passenger_injury_atom_multiplication, lazyFlamGoodsPublicCollisionGateIA)

#Injuries


##Passengers


###Train 1


In [0]:
passengerInjuriesTrain1 = injuryAtomsPassengerTrain1.join(
    lazyTrain1CutsetProbabilityPassengerLoading.select(
        pl.exclude("cutset_probability")
        ), on = ["derailmentConePotentialCollisionID", "cutsetID"], how = "inner"
    ).with_columns(
        (pl.col("Fatal") * pl.col("loading")).alias("RAWFatal"),
        (pl.col("SevHosp") * pl.col("loading")).alias("RAWSevHosp"),
        (pl.col("NonSevere") * pl.col("loading")).alias("RAWNonSevere"),
        (pl.col("Shock") * pl.col("loading")).alias("RAWShock"),
        (pl.col("NotInjured") * pl.col("loading")).alias("RAWNotInjured"),
        (pl.col("Fatal") * pl.col("weightedLoading")).alias("weightedFatal"),
        (pl.col("SevHosp") * pl.col("weightedLoading")).alias("weightedSevHosp"),
        (pl.col("NonSevere") * pl.col("weightedLoading")).alias("weightedNonSevere"),
        (pl.col("Shock") * pl.col("weightedLoading")).alias("weightedShock"),
        (pl.col("NotInjured") * pl.col("weightedLoading")).alias("weightedNotInjured")
    )
    

###Train 2


In [0]:
passengerInjuriesTrain2 = injuryAtomsPassengerTrain2.join(
    lazyTrain1CutsetProbabilityPassengerLoading.select(
        pl.exclude("cutset_probability")
        ), on = ["derailmentConePotentialCollisionID", "cutsetID"], how = "inner"
    ).with_columns(
        (pl.col("Fatal") * pl.col("loading")).alias("RAWFatal"),
        (pl.col("SevHosp") * pl.col("loading")).alias("RAWSevHosp"),
        (pl.col("NonSevere") * pl.col("loading")).alias("RAWNonSevere"),
        (pl.col("Shock") * pl.col("loading")).alias("RAWShock"),
        (pl.col("NotInjured") * pl.col("loading")).alias("RAWNotInjured"),
        (pl.col("Fatal") * pl.col("weightedLoading")).alias("weightedFatal"),
        (pl.col("SevHosp") * pl.col("weightedLoading")).alias("weightedSevHosp"),
        (pl.col("NonSevere") * pl.col("weightedLoading")).alias("weightedNonSevere"),
        (pl.col("Shock") * pl.col("weightedLoading")).alias("weightedShock"),
        (pl.col("NotInjured") * pl.col("weightedLoading")).alias("weightedNotInjured")
    )


###Total Expected Passenger Injuries


In [0]:
lazyInjuryWeights = lazyPersonInjury.join(lazyInjuryDegree, on=["injuryDegreeID"], how="inner")


In [0]:
totalExpectedPassengerInjuries = passengerInjuriesTrain1.join(
    passengerInjuriesTrain2, on=["derailmentConePotentialCollisionID", "cutsetID"], how= "inner", suffix="_right"
    ).with_columns(
        pl.sum_horizontal("RAWFatal","RAWFatal_right").alias("RAWFatal"),
        pl.sum_horizontal("RAWSevHosp","RAWSevHosp_right").alias("RAWSevHosp"),
        pl.sum_horizontal("RAWNonSevere","RAWNonSevere_right").alias("RAWNonSevere"),
        pl.sum_horizontal("RAWShock","RAWShock_right").alias("RAWShock"),
        pl.sum_horizontal("RAWNotInjured","RAWNotInjured_right").alias("RAWNotInjured"),
        pl.sum_horizontal("weightedFatal","weightedFatal_right").alias("weightedFatal"),
        pl.sum_horizontal("weightedSevHosp","weightedSevHosp_right").alias("weightedSevHosp"),
        pl.sum_horizontal("weightedNonSevere","weightedNonSevere_right").alias("weightedNonSevere"),
        pl.sum_horizontal("weightedShock","weightedShock_right").alias("weightedShock"),
        pl.sum_horizontal("weightedNotInjured","weightedNotInjured_right").alias("weightedNotInjured")
    ).select(
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "RAWFatal", 
        "RAWSevHosp",
        "RAWNonSevere",
        "RAWShock",
        "RAWNotInjured",
        "weightedFatal", 
        "weightedSevHosp",
        "weightedNonSevere",
        "weightedShock",
        "weightedNotInjured"
    ).join(lazyInjuryWeights.filter(pl.col("personTypeID") == "P").select(["injuryDegreeID", "personInjuryID", "weight"]), how="cross"
    ).with_columns( # This complicated pl.when() statement checks the injuryDegreeID and assigns the correct value to the new field
                    # It is designed this way to avoid utilizing a .pivot() or .transpose() method. This is because they are VERY 
                    # expensive operations and require for us to manifest the entire dataframe. This will not be possible for the entire 
                    # dataset.
        (pl.when(pl.col("injuryDegreeID") == "Fatal"
                ).then(pl.col("RAWFatal")
                ).otherwise(
                    pl.when(pl.col("injuryDegreeID") == "SevHosp"
                            ).then(pl.col("RAWSevHosp")
                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "NonSevere"
                                        ).then(pl.col("RAWNonSevere")
                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock"
                                                            ).then(pl.col("RAWShock")
                                                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "NotInjured"
                                                                        ).then(pl.col("RAWNotInjured"))
                                                                )
                                                    )
                                        )
                            )
                ).alias("numberOfInjuriesRAW"),
        
        (pl.when(pl.col("injuryDegreeID") == "Fatal"
                ).then(pl.col("weightedFatal")
                ).otherwise(
                    pl.when(pl.col("injuryDegreeID") == "SevHosp"
                            ).then(pl.col("weightedSevHosp")
                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "NonSevere"
                                        ).then(pl.col("weightedNonSevere")
                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock"
                                                            ).then(pl.col("weightedShock")
                                                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "NotInjured"
                                                                        ).then(pl.col("weightedNotInjured"))
                                                                )
                                                    )
                                        )
                            )
                ).alias("expectedNumberOfInjuries"),
        
        (pl.when(pl.col("injuryDegreeID") == "Fatal"
                ).then(pl.col("weightedFatal") * pl.col("weight")
                ).otherwise(
                    pl.when(pl.col("injuryDegreeID") == "SevHosp"
                            ).then(pl.col("weightedSevHosp") * pl.col("weight")
                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "NonSevere"
                                        ).then(pl.col("weightedNonSevere") * pl.col("weight")
                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock"
                                                            ).then(pl.col("weightedShock") * pl.col("weight")
                                                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "NotInjured"
                                                                        ).then(pl.col("weightedNotInjured") * pl.col("weight"))
                                                                )
                                                    )
                                        )
                            )
                ).alias("personInjuryConsequence")
            ).select(
                "derailmentConePotentialCollisionID",
                "cutsetID",
                "personInjuryID",
                "numberOfInjuriesRAW",
                "expectedNumberOfInjuries",
                "personInjuryConsequence"
    # streaming= True until this aggregation step
            )


In [0]:
totalExpectedPassengerInjuriesAgg = totalExpectedPassengerInjuries.group_by("derailmentConePotentialCollisionID", "personInjuryID"
            ).agg(pl.col("numberOfInjuriesRAW").sum(),
                  pl.col("expectedNumberOfInjuries").sum(), 
                  pl.col("personInjuryConsequence").sum()
            ).sort(by = [pl.col("derailmentConePotentialCollisionID").cast(pl.String), pl.col("personInjuryID").cast(pl.String)])

##Workforce


###Train 1


In [0]:
workforceInjuriesTrain1 = injuryAtomsWorkforceTrain1.join(
    lazyTrain1CutsetProbabilityWorkforceLoading.select(
        pl.exclude("cutset_probability")
        ), on = ["derailmentConePotentialCollisionID", "cutsetID"], how = "inner"
    ).with_columns(
        (pl.col("Fatal") * 2).alias("RAWFatal"),
        (pl.col("Specified") * 2).alias("RAWSpecified"),
        (pl.col("Sev7") * 2).alias("RAWSev7"),
        (pl.col("NonSevere") * 2).alias("RAWNonSevere"),
        (pl.col("Shock7") * 2).alias("RAWShock7"),
        (pl.col("Shock") * 2).alias("RAWShock"),
        (pl.col("NotInjured") * 2).alias("RAWNotInjured"),
        (pl.col("Fatal") * pl.col("weightedLoading")).alias("weightedFatal"),
        (pl.col("Specified") * pl.col("weightedLoading")).alias("weightedSpecified"),
        (pl.col("Sev7") * pl.col("weightedLoading")).alias("weightedSev7"),
        (pl.col("NonSevere") * pl.col("weightedLoading")).alias("weightedNonSevere"),
        (pl.col("Shock7") * pl.col("weightedLoading")).alias("weightedShock7"),
        (pl.col("Shock") * pl.col("weightedLoading")).alias("weightedShock"),
        (pl.col("NotInjured") * pl.col("weightedLoading")).alias("weightedNotInjured")
    )

###Train 2


In [0]:
workforceInjuriesTrain2 = injuryAtomsWorkforceTrain2.join(
    lazyTrain1CutsetProbabilityWorkforceLoading.select(
        pl.exclude("cutset_probability")
        ), on = ["derailmentConePotentialCollisionID", "cutsetID"], how = "inner"
    ).with_columns( #As we do not have values for workforce loading it is assumed that there are only two workers on each train
        (pl.col("Fatal") * 2).alias("RAWFatal"),
        (pl.col("Specified") * 2).alias("RAWSpecified"),
        (pl.col("Sev7") * 2).alias("RAWSev7"),
        (pl.col("NonSevere") * 2).alias("RAWNonSevere"),
        (pl.col("Shock7") * 2).alias("RAWShock7"),
        (pl.col("Shock") * 2).alias("RAWShock"),
        (pl.col("NotInjured") * 2).alias("RAWNotInjured"),
        (pl.col("Fatal") * pl.col("weightedLoading")).alias("weightedFatal"),
        (pl.col("Specified") * pl.col("weightedLoading")).alias("weightedSpecified"),
        (pl.col("Sev7") * pl.col("weightedLoading")).alias("weightedSev7"),
        (pl.col("NonSevere") * pl.col("weightedLoading")).alias("weightedNonSevere"),
        (pl.col("Shock7") * pl.col("weightedLoading")).alias("weightedShock7"),
        (pl.col("Shock") * pl.col("weightedLoading")).alias("weightedShock"),
        (pl.col("NotInjured") * pl.col("weightedLoading")).alias("weightedNotInjured")
    )

###Total Expected Workforce Injuries


In [0]:
totalExpectedWorkforceInjuries = workforceInjuriesTrain1.join(
    workforceInjuriesTrain2, on=["derailmentConePotentialCollisionID", "cutsetID"], how= "inner", suffix="_right"
    ).with_columns(
        pl.sum_horizontal("RAWFatal","RAWFatal_right").alias("RAWFatal"),
        pl.sum_horizontal("RAWSpecified","RAWSpecified_right").alias("RAWSpecified"),
        pl.sum_horizontal("RAWSev7","RAWSev7_right").alias("RAWSev7"),
        pl.sum_horizontal("RAWNonSevere","RAWNonSevere_right").alias("RAWNonSevere"),
        pl.sum_horizontal("RAWShock7","RAWShock7_right").alias("RAWShock7"),
        pl.sum_horizontal("RAWShock","RAWShock_right").alias("RAWShock"),
        pl.sum_horizontal("RAWNotInjured","RAWNotInjured_right").alias("RAWNotInjured"),
        pl.sum_horizontal("weightedFatal","weightedFatal_right").alias("weightedFatal"),
        pl.sum_horizontal("weightedSpecified","weightedSpecified_right").alias("weightedSpecified"),
        pl.sum_horizontal("weightedSev7","weightedSev7_right").alias("weightedSev7"),
        pl.sum_horizontal("weightedNonSevere","weightedNonSevere_right").alias("weightedNonSevere"),
        pl.sum_horizontal("weightedShock7","weightedShock7_right").alias("weightedShock7"),
        pl.sum_horizontal("weightedShock","weightedShock_right").alias("weightedShock"),
        pl.sum_horizontal("weightedNotInjured","weightedNotInjured_right").alias("weightedNotInjured")
    ).select(
        "derailmentConePotentialCollisionID",
        "cutsetID",
        "RAWFatal", 
        "RAWSpecified",
        "RAWSev7",
        "RAWNonSevere",
        "RAWShock7",
        "RAWShock",
        "RAWNotInjured",
        "weightedFatal",
        "weightedSpecified",  
        "weightedSev7",
        "weightedNonSevere",
        "weightedShock7",
        "weightedShock",
        "weightedNotInjured"
    ).join(lazyInjuryWeights.filter(pl.col("personTypeID") == "W").select(["injuryDegreeID", "personInjuryID", "weight"]), how="cross"
    ).with_columns( # This complicated pl.when() statement checks the injuryDegreeID and assigns the correct value to the new field
                    # It is designed this way to avoid utilizing a .pivot() or .transpose() method. This is because they are VERY 
                    # expensive operations and require for us to manifest the entire dataframe. This will not be possible for the entire 
                    # dataset.
         (pl.when(pl.col("injuryDegreeID") == "Fatal"
                ).then(pl.col("RAWFatal")
                ).otherwise(
                    pl.when(pl.col("injuryDegreeID") == "Specified"
                            ).then(pl.col("RAWSpecified")
                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "Sev7"
                                        ).then(pl.col("RAWSev7")
                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "NonSevere"
                                                            ).then(pl.col("RAWNonSevere")
                                                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock7"
                                                                        ).then(pl.col("RAWShock7")
                                                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock"
                                                                                            ).then(pl.col("RAWShock")
                                                                                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "NotInjured").then(pl.col("RAWNotInjured"))
                                                                                                        )
                                                                                            )
                                                                        )
                                                            )
                                        )
                            )
                ).alias("numberOfInjuriesRAW"),
        
        (pl.when(pl.col("injuryDegreeID") == "Fatal"
                ).then(pl.col("weightedFatal")
                ).otherwise(
                    pl.when(pl.col("injuryDegreeID") == "Specified"
                            ).then(pl.col("weightedSpecified")
                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "Sev7"
                                        ).then(pl.col("weightedSev7")
                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "NonSevere"
                                                            ).then(pl.col("weightedNonSevere")
                                                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock7"
                                                                        ).then(pl.col("weightedShock7")
                                                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock"
                                                                                    ).then(pl.col("weightedShock")
                                                                                    ).otherwise(pl.when(pl.col("injuryDegreeID") == "NotInjured"
                                                                                                        ).then(pl.col("weightedNotInjured"))
                                                                                                )
                                                                                    )
                                                                        )
                                                            )
                                        )
                            )
                ).alias("expectedNumberOfInjuries"),
        
        (pl.when(pl.col("injuryDegreeID") == "Fatal"
                ).then(pl.col("weightedFatal") * pl.col("weight")
                ).otherwise(
                    pl.when(pl.col("injuryDegreeID") == "Specified"
                            ).then(pl.col("weightedSpecified") * pl.col("weight")
                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "Sev7"
                                        ).then(pl.col("weightedSev7") * pl.col("weight")
                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "NonSevere"
                                                            ).then(pl.col("weightedNonSevere") * pl.col("weight")
                                                            ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock7"
                                                                        ).then(pl.col("weightedShock7") * pl.col("weight")
                                                                        ).otherwise(pl.when(pl.col("injuryDegreeID") == "Shock"
                                                                                    ).then(pl.col("weightedShock") * pl.col("weight")
                                                                                    ).otherwise(pl.when(pl.col("injuryDegreeID") == "NotInjured"
                                                                                                        ).then(pl.col("weightedNotInjured") * pl.col("weight"))
                                                                                                )
                                                                                    )
                                                                        )
                                                            )
                                        )
                            )
                ).alias("personInjuryConsequence")
            ).select(
                "derailmentConePotentialCollisionID",
                "cutsetID",
                "personInjuryID",
                "numberOfInjuriesRAW",
                "expectedNumberOfInjuries",
                "personInjuryConsequence"
            )

In [0]:
totalExpectedWorkforceInjuriesAgg = totalExpectedWorkforceInjuries.group_by("derailmentConePotentialCollisionID", "personInjuryID"
            ).agg(pl.col("numberOfInjuriesRAW").sum(),
                  pl.col("expectedNumberOfInjuries").sum(), 
                  pl.col("personInjuryConsequence").sum()
            ).sort(by = [pl.col("derailmentConePotentialCollisionID").cast(pl.String), pl.col("personInjuryID").cast(pl.String)])

In [0]:
# LazyFrames unfortunately do not have access to the .vstack() method that would have performed this operation seemlessly. 
# There we need to resort to .merge_sorted() to merge the two LazyFrames. 
# Notably merge_sorted is NOT a method that can function with streaming which is why the aggregation had to occur beforehand
# Also merge_sorted converts the variables into dtype=pl.Categorical prior to the merging, which is why we had to convert them into strings

totalExpectedInjuries = pl.concat([totalExpectedPassengerInjuries, totalExpectedWorkforceInjuries], how = "vertical")

#Person Injuries of a Train Derailment on a Section in a direction per Period by Scenario


In [0]:
derailmentFrequencies = cutsetProbabilities.join(
    lazyTrainDerailmentSectionDirectionPeriod.select(
    "trainSectionDerailmentPeriodDirectionID", "derailmentFrequency"),
    on="trainSectionDerailmentPeriodDirectionID", how="inner"
    )

In [0]:
lazyTDSDPS = totalExpectedInjuries.join(derailmentFrequencies.select("derailmentConePotentialCollisionID", "cutsetID", "derailmentFrequency"),
                                        on = ["derailmentConePotentialCollisionID", "cutsetID"])



In [0]:
output = lazyTDSDPS.group_by(
    "derailmentConePotentialCollisionID","cutsetID", "personInjuryID"
    ).agg(
        pl.col("derailmentFrequency").sum(), 
        pl.col("personInjuryConsequence").sum()
    ).with_columns(
        (pl.col("derailmentFrequency") * pl.col("personInjuryConsequence")).alias("Risk")
    )

In [0]:
output.collect(streaming=True)

In [0]:
output2 = lazyTDSDPS.select("derailmentConePotentialCollisionID", "personInjuryID", "personInjuryConsequence")

In [0]:
output2.collect(streaming=True)

In [0]:
end = time.perf_counter()
print(end - start)