In [1]:
import pandas as pd
import numpy as np

# Simple approach. Only Faults and cost.

## Assumption: No preventative, only one maintenance type.
## Assumption 2: Each fault type only takes 1 trip per day

In [29]:
fault_df = pd.read_csv('../data/raw/fault_data.csv')

# Parse DateTime and filter necessary data
fault_df['DateTime'] = pd.to_datetime(fault_df['DateTime'], format='%Y-%m-%d %H:%M', errors='coerce')

# Extract year, month, and day for grouping and high-demand season determination
fault_df['Year'] = fault_df['DateTime'].dt.year
fault_df['Month'] = fault_df['DateTime'].dt.month
fault_df['Day'] = fault_df['DateTime'].dt.date

# Calculate internal cost
internal_fault_types = fault_df['Fault'].unique()
internal_cost = 750000 * len([fault for fault in internal_fault_types if fault != 'NF'])

# Calculate external cost
# Define high-demand months
high_demand_months = [1, 2, 6, 7, 8]

external_cost = 0
same_day_faults = set()  # Track faults already addressed on the same day
current_month = None
for _, fault in fault_df.iterrows():
    if fault['Fault'] != 'NF':  # Only consider faults
        print(same_day_faults)
        # Reset same_day_faults when a new month starts
        if current_month != fault['Month']:
            same_day_faults = set()
            current_month = fault['Month']

        # Check if the fault type on the same day has already been addressed
        if (fault['Day'], fault['Fault']) not in same_day_faults:
            # Determine the cost based on the month
            if int(fault['Month']) in high_demand_months:
                external_cost += 150000
            else:
                external_cost += 50000
            # Mark this fault type as addressed for the day
            same_day_faults.add((fault['Day'], fault['Fault']))


# Output results
internal_cost, external_cost

set()
{(datetime.date(2014, 5, 14), 'GF')}
{(datetime.date(2014, 5, 14), 'GF')}
{(datetime.date(2014, 5, 14), 'GF')}
{(datetime.date(2014, 5, 14), 'GF')}
{(datetime.date(2014, 5, 14), 'GF')}
{(datetime.date(2014, 5, 14), 'GF')}
{(datetime.date(2014, 6, 4), 'MF')}
{(datetime.date(2014, 6, 4), 'MF')}
{(datetime.date(2014, 6, 4), 'MF')}
{(datetime.date(2014, 6, 5), 'MF'), (datetime.date(2014, 6, 4), 'MF')}
{(datetime.date(2014, 6, 5), 'MF'), (datetime.date(2014, 6, 4), 'MF'), (datetime.date(2014, 6, 5), 'FF')}
{(datetime.date(2014, 6, 5), 'MF'), (datetime.date(2014, 6, 4), 'MF'), (datetime.date(2014, 6, 8), 'AF'), (datetime.date(2014, 6, 5), 'FF')}
{(datetime.date(2014, 6, 5), 'MF'), (datetime.date(2014, 6, 4), 'MF'), (datetime.date(2014, 6, 9), 'AF'), (datetime.date(2014, 6, 5), 'FF'), (datetime.date(2014, 6, 8), 'AF')}
{(datetime.date(2014, 6, 5), 'MF'), (datetime.date(2014, 6, 4), 'MF'), (datetime.date(2014, 6, 9), 'AF'), (datetime.date(2014, 6, 9), 'MF'), (datetime.date(2014, 6, 5), '

(3750000, 8750000)

In [30]:
# Calculate total external costs per fault type based on the updated rules
external_costs_per_fault = {fault_type: 0 for fault_type in fault_df['Fault'].unique() if fault_type != 'NF'}
same_day_faults = set()  # Track faults already addressed on the same day
current_month = None  # Keep track of the current month

for _, fault in fault_df.iterrows():
    if fault['Fault'] != 'NF':  # Only consider faults
        # Reset same_day_faults when a new month starts
        if current_month != fault['Month']:
            same_day_faults = set()
            current_month = fault['Month']

        # Check if the fault type on the same day has already been addressed
        if (fault['Day'], fault['Fault']) not in same_day_faults:
            # Determine the cost based on the month
            if int(fault['Month']) in high_demand_months:
                external_costs_per_fault[fault['Fault']] += 150000
            else:
                external_costs_per_fault[fault['Fault']] += 50000
            # Mark this fault type as addressed for the day
            same_day_faults.add((fault['Day'], fault['Fault']))

# Output the external costs per fault type
external_costs_per_fault


{'GF': 300000, 'MF': 1250000, 'FF': 2250000, 'AF': 3050000, 'EF': 1900000}

In [1]:
import pandas as pd
from pulp import LpProblem, LpMinimize, LpVariable, lpSum

# Load and preprocess data
fault_df = pd.read_csv('../data/raw/fault_data.csv')
fault_df['DateTime'] = pd.to_datetime(fault_df['DateTime'], format='%Y-%m-%d %H:%M', errors='coerce')
fault_df['Year'] = fault_df['DateTime'].dt.year
fault_df['Month'] = fault_df['DateTime'].dt.month
fault_df['Day'] = fault_df['DateTime'].dt.date

# Parameters
internal_cost_per_fault = 750000
external_cost_normal = 50000
external_cost_high_demand = 150000
high_demand_months = [1, 2, 6, 7, 8]

# Prepare unique fault types and days
fault_types = fault_df['Fault'].unique()
days = fault_df['Day'].unique()

# Optimization variables
internal = {f: LpVariable(f"Internal_{f}", 0, 1, cat="Binary") for f in fault_types if f != 'NF'}
external = {(f, d): LpVariable(f"External_{f}_{d}", 0, 1, cat="Binary")
            for f in fault_types if f != 'NF' for d in days}

# Optimization problem
prob = LpProblem("Maintenance_Cost_Optimization", LpMinimize)

# Objective function: Minimize total cost
prob += (
    # Internal maintenance cost
    lpSum(internal[f] * internal_cost_per_fault for f in fault_types if f != 'NF') +
    # External maintenance cost
    lpSum(
        external[(f, d)] * (external_cost_high_demand if fault_df[fault_df['Day'] == d]['Month'].iloc[0] in high_demand_months else external_cost_normal)
        for f in fault_types if f != 'NF' for d in days
    )
)

# Constraints
# Each fault must be handled either internally or externally for all occurrences
for f in fault_types:
    if f != 'NF':
        for d in days:
            prob += internal[f] + external[(f, d)] >= 1, f"Maintenance_Assignment_{f}_{d}"

# External trips are limited to one per fault type per day
for d in days:
    for f in fault_types:
        if f != 'NF':
            prob += external[(f, d)] <= 1, f"One_Trip_Limit_{f}_{d}"

# Solve the optimization problem
prob.solve()

# Results
optimized_internal_cost = sum(internal[f].varValue * internal_cost_per_fault for f in fault_types if f != 'NF')
optimized_external_cost = sum(
    external[(f, d)].varValue * (external_cost_high_demand if fault_df[fault_df['Day'] == d]['Month'].iloc[0] in high_demand_months else external_cost_normal)
    for f in fault_types if f != 'NF' for d in days
)
optimized_total_cost = optimized_internal_cost + optimized_external_cost

print("Optimized Internal Cost:", optimized_internal_cost)
print("Optimized External Cost:", optimized_external_cost)
print("Optimized Total Cost:", optimized_total_cost)


Optimized Internal Cost: 3750000.0
Optimized External Cost: 0.0
Optimized Total Cost: 3750000.0


## Assume preventative

In [2]:
# Parameters
preventative_cost_per_fault = 50000  # Example cost per fault type for preventative maintenance

# Add preventative maintenance variables
preventative = {f: LpVariable(f"Preventative_{f}", 0, 1, cat="Binary") for f in fault_types if f != 'NF'}

# Update optimization problem
prob = LpProblem("Maintenance_Cost_Optimization_with_Preventative", LpMinimize)

# Objective function: Minimize total cost
prob += (
    # Internal maintenance cost
    lpSum(internal[f] * internal_cost_per_fault for f in fault_types if f != 'NF') +
    # External maintenance cost
    lpSum(
        external[(f, d)] * (external_cost_high_demand if fault_df[fault_df['Day'] == d]['Month'].iloc[0] in high_demand_months else external_cost_normal)
        for f in fault_types if f != 'NF' for d in days
    ) +
    # Preventative maintenance cost
    lpSum(preventative[f] * preventative_cost_per_fault for f in fault_types if f != 'NF')
)

# Constraints
# Each fault must be handled either internally, externally, or reduced via preventative maintenance
for f in fault_types:
    if f != 'NF':
        for d in days:
            prob += (
                internal[f] + external[(f, d)] >= (1 - 0.6 * preventative[f]),
                f"Maintenance_with_Preventative_{f}_{d}"
            )

# External trips are limited to one per fault type per day
for d in days:
    for f in fault_types:
        if f != 'NF':
            prob += external[(f, d)] <= 1, f"One_Trip_Limit_{f}_{d}"

# Solve the optimization problem
prob.solve()

# Results
optimized_internal_cost = sum(internal[f].varValue * internal_cost_per_fault for f in fault_types if f != 'NF')
optimized_external_cost = sum(
    external[(f, d)].varValue * (external_cost_high_demand if fault_df[fault_df['Day'] == d]['Month'].iloc[0] in high_demand_months else external_cost_normal)
    for f in fault_types if f != 'NF' for d in days
)
optimized_preventative_cost = sum(preventative[f].varValue * preventative_cost_per_fault for f in fault_types if f != 'NF')
optimized_total_cost = optimized_internal_cost + optimized_external_cost + optimized_preventative_cost

print("Optimized Internal Cost:", optimized_internal_cost)
print("Optimized External Cost:", optimized_external_cost)
print("Optimized Preventative Maintenance Cost:", optimized_preventative_cost)
print("Optimized Total Cost:", optimized_total_cost)


Optimized Internal Cost: 3750000.0
Optimized External Cost: 0.0
Optimized Preventative Maintenance Cost: 0.0
Optimized Total Cost: 3750000.0
