# Matching the location of each TCR to contracts

We start by reading the cleaned TCRs for 2024, which have already been matched with contract numbers, as well as Karin's matching between bandel and the number of hours of servicefönster.

## TCR:s

In [1]:
import pandas as pd

# Load the CSV file containing matched TCRs for 2024
csv_file_path = "TCR_T24_matched.csv"

# The file is tried as ';'-separated first, then as ','-separated.
# NOTE: on_bad_lines='skip' silently drops rows with too many fields, so
# reading with the wrong delimiter normally does NOT raise ParserError; it
# just produces a frame with a single wide column. The fallback is therefore
# also triggered when only one column was parsed.
_csv_read_options = dict(on_bad_lines='skip', encoding='utf-8')

try:
    tcr_df = pd.read_csv(csv_file_path, delimiter=';', **_csv_read_options)
except pd.errors.ParserError:
    tcr_df = None

if tcr_df is None or tcr_df.shape[1] <= 1:
    tcr_df = pd.read_csv(csv_file_path, delimiter=',', **_csv_read_options)


## Förbindelser och BIS (bandelar - kontrakt)

Load the Excel file containing the dictionary for bandel matching, and keep only relevant columns and bandel.

In [2]:
# Workbook holding the dictionary used for bandel matching
dictionary_file_path = "Förbindelselinje_2024.xlsx"

# Load the workbook's default sheet in full (no sheet name is given)
dictionary_df = pd.read_excel(io=dictionary_file_path)

In [3]:
# Workbook and sheet that hold the BIS extract
excel_file_path = "BIS_24_kontrakt_bandel_plats.xlsx"
sheet_name = "BIS 2024-01-09"

bis_df = pd.read_excel(excel_file_path, sheet_name=sheet_name)

# Unique (bandel, contract area) pairs, without rows that have no contract:
# either a missing value or the placeholder text 'Ingår inte i något kontrakt'
bandel_to_contract_map = (
    bis_df[['Bandel_nummer', 'UH_kontraktsområde']]
    .drop_duplicates()
    .dropna(subset=['UH_kontraktsområde'])
    .loc[lambda pairs: pairs['UH_kontraktsområde'] != 'Ingår inte i något kontrakt']
)

# Plain dict for fast lookups: Bandel_nummer -> UH_kontraktsområde.
# If a bandel still appears with several contract areas, the last row wins.
bandel_contract_dict = bandel_to_contract_map.set_index('Bandel_nummer')['UH_kontraktsområde'].to_dict()

## Karin's bandelar

In [4]:
# Step 1: Workbook with the service contracts per bandel
excel_file_path = "servicekontrakt_per_bandel_Abdou.xlsx"
sheet_name = "uppdaterad"

# Only the sheet named in `sheet_name` is read into the DataFrame
servicekontrakt_df = pd.read_excel(io=excel_file_path, sheet_name=sheet_name)

In [5]:
# Keep only rows whose 'T24' flag equals 1, then drop both flag columns.
# Note: this cell cannot be re-run on its own output, since 'T24' is gone afterwards.
servicekontrakt_df = (
    servicekontrakt_df
    .loc[servicekontrakt_df['T24'] == 1]
    .drop(columns=['T24', 'T23'])
)

In [6]:
# Parse the 'Bandel' column into two new columns 'Bandelnr' and 'Bandelnamn'
def parse_bandel(bandel):
    """Split one 'Bandel' cell into its number part and its name part.

    The number part is the leading run of digits, optionally several runs
    joined by '/', e.g. ``"451/452 Foo-Bar"``. The slashes are rewritten to
    ``", "`` so the result reads ``"451, 452"``. Everything after the number
    part (stripped) is the name.

    Parameters
    ----------
    bandel : str or scalar
        Raw cell value. Missing values (None/NaN) and numeric cells are
        accepted as well as strings.

    Returns
    -------
    pd.Series
        Two elements: ``[bandelnr, bandelnamn]``. ``bandelnr`` is ``''`` when
        the value does not start with digits; both are ``''`` for missing values.
    """
    # Local import: at notebook level `re` is only imported in a later cell.
    import re

    # Missing cells would otherwise make re.match raise a TypeError.
    if bandel is None or (not isinstance(bandel, str) and pd.isna(bandel)):
        return pd.Series(['', ''])

    # A purely numeric cell may arrive as 451.0; render it as '451'.
    if isinstance(bandel, float) and bandel.is_integer():
        bandel = int(bandel)

    # Strip first so leading whitespace cannot hide the number part.
    text = str(bandel).strip()

    bandelnr_match = re.match(r'^(\d+(?:/\d+)*)', text)
    if bandelnr_match is None:
        return pd.Series(['', text])

    number_part = bandelnr_match.group(0)
    bandelnr = number_part.replace('/', ', ')
    bandelnamn = text[len(number_part):].strip()
    return pd.Series([bandelnr, bandelnamn])

# Parse every 'Bandel' value; each call returns a two-element Series, so the
# result is a two-column frame that is assigned to the two new columns in order
parsed_bandel = servicekontrakt_df['Bandel'].apply(parse_bandel)
servicekontrakt_df[['Bandelnr', 'Bandelnamn']] = parsed_bandel

Let us add the corresponding distances (lengths) for the identified bandels using the dictionary.

In [7]:
import networkx as nx

# Module-level memo for computed lengths (filled by calculate_sum_langd)
langd_cache = dict()

# Step 1: Lookup from each 'Plats_sign' value to the 'Banlangd' of the same row
banlangd_by_plats_sign = dictionary_df.set_index('Plats_sign')['Banlangd']
station_length_lookup = banlangd_by_plats_sign.to_dict()

### Utility Functions ###

# Function to build a bidirectional graph from the DataFrame
def build_bidirectional_graph(dictionary_df, bdl_range):
    """Build an undirected graph of the connections within a bandel range.

    Parameters
    ----------
    dictionary_df : pd.DataFrame
        Must provide the columns 'BdlNr', 'Forbind' and 'Banlangd'.
    bdl_range : sequence
        ``(low, high)``; rows with ``low <= BdlNr <= high`` are used.

    Returns
    -------
    nx.Graph
        One edge per row with a non-missing 'Forbind'. The value is split on
        '-' into the two node names, and the row's 'Banlangd' is stored as
        the edge attribute ``length``.

    Raises
    ------
    ValueError
        If a 'Forbind' value does not split into exactly two parts.
    """
    lowest, highest = bdl_range[0], bdl_range[1]
    in_range = dictionary_df['BdlNr'].between(lowest, highest)  # inclusive on both ends

    graph = nx.Graph()  # undirected, so every connection can be travelled both ways
    for _, link in dictionary_df[in_range].iterrows():
        connection = link['Forbind']
        if pd.isna(connection):
            continue  # this row describes no connection
        origin, destination = connection.split('-')
        graph.add_edge(origin.strip(), destination.strip(), length=link['Banlangd'])
    return graph


# Search window around the identified bandel for the fallback graph. These are
# the values hard-coded in the original code; presumably 1..990 is the valid
# bandel number range -- TODO confirm.
_BDLNR_SEARCH_MARGIN = 30
_BDLNR_MIN = 1
_BDLNR_MAX = 990


def _path_length_with_stations(graph, source, target, include_source, include_target):
    """Return the length from `source` to `target` in `graph`, or None.

    The result is the length-weighted shortest path distance plus the station
    lengths (from `station_length_lookup`, 0 when unknown) of every
    intermediate station, and of the two endpoints when their `include_*`
    flag is set. None is returned when an endpoint is not in the graph or no
    path connects them.
    """
    if source not in graph or target not in graph:
        return None

    try:
        # One Dijkstra run returns both the distance and the station sequence,
        # so the intermediate stations belong to the very path whose length is
        # reported. Previously the stations came from an unweighted shortest
        # path, which can be a different route.
        path_length, path_stations = nx.single_source_dijkstra(
            graph, source=source, target=target, weight='length'
        )
    except nx.NetworkXNoPath:
        return None

    station_length_sum = sum(
        station_length_lookup.get(station, 0) for station in path_stations[1:-1]
    )
    if include_source:
        station_length_sum += station_length_lookup.get(source, 0)
    if include_target:
        station_length_sum += station_length_lookup.get(target, 0)

    return path_length + station_length_sum


def calculate_sum_langd(forbind_list, identified_bdlnr, dictionary_df):
    """Compute the total length covered by a comma-separated list of connections.

    Only the first station of the first connection and the last station of the
    last connection are used. The route between them is the length-weighted
    shortest path in a graph built from `dictionary_df`.

    The graph is first limited to the identified bandel. If that gives no
    result, it is rebuilt for the bandel numbers within
    ``_BDLNR_SEARCH_MARGIN`` of it (clamped to ``_BDLNR_MIN.._BDLNR_MAX``).

    Parameters
    ----------
    forbind_list : str
        E.g. ``"A-B, B-C"``. An endpoint written in parentheses, e.g. ``"(A)"``,
        is used for routing but its own station length is not added.
    identified_bdlnr : number
        Bandel number the connections were matched to.
    dictionary_df : pd.DataFrame
        Passed on to `build_bidirectional_graph`.

    Returns
    -------
    number or None
        Path length plus station lengths, or None when `forbind_list` is
        empty or not a string, an endpoint is unknown, or no path exists.

    Notes
    -----
    Results (including None) are memoised in the module-level `langd_cache`
    under ``(forbind_list, identified_bdlnr)``. `dictionary_df` is not part of
    the key, so the cache is only valid while the same dictionary is used.
    """
    # Non-string values (e.g. NaN from a missing cell) have nothing to parse.
    if not isinstance(forbind_list, str) or not forbind_list:
        return None

    cache_key = (forbind_list, identified_bdlnr)
    if cache_key in langd_cache:
        return langd_cache[cache_key]

    # Flatten "A-B, B-C" into ['A', 'B', 'B', 'C'] and keep the two ends.
    forbinds = [f.strip() for f in forbind_list.split(',')]
    stations = [station for forbind in forbinds for station in forbind.split('-')]
    first_station = stations[0]
    last_station = stations[-1]

    # Parenthesised endpoints do not contribute their own station length.
    include_first_station = not (first_station.startswith('(') and first_station.endswith(')'))
    include_last_station = not (last_station.startswith('(') and last_station.endswith(')'))

    # The graph nodes carry no parentheses.
    first_station_cleaned = first_station.strip('()')
    last_station_cleaned = last_station.strip('()')

    # Narrow graph first (own bandel only), then the wider neighbourhood.
    search_ranges = (
        (identified_bdlnr, identified_bdlnr),
        (
            max(_BDLNR_MIN, identified_bdlnr - _BDLNR_SEARCH_MARGIN),
            min(_BDLNR_MAX, identified_bdlnr + _BDLNR_SEARCH_MARGIN),
        ),
    )

    total_length = None
    for bdl_range in search_ranges:
        graph = build_bidirectional_graph(dictionary_df, bdl_range)
        total_length = _path_length_with_stations(
            graph,
            first_station_cleaned,
            last_station_cleaned,
            include_first_station,
            include_last_station,
        )
        if total_length is not None:
            break

    langd_cache[cache_key] = total_length
    return total_length

In [8]:
# Dict keyed by 'Plats_sign' with the matching 'Plats' as value.
# NOTE: despite its name, this maps code -> name; convert_bandelnamn_to_codes
# inverts it before looking names up.
plats_by_plats_sign = dictionary_df.set_index('Plats_sign')['Plats']
name_to_code_mapping = plats_by_plats_sign.to_dict()

def convert_bandelnamn_to_codes(bandelnamn, code_to_name=None):
    """Translate a dash-separated list of station names into station codes.

    Each name is looked up case-insensitively. If the name itself is unknown,
    ``"<name> central"`` and then ``"<name>s central"`` are tried. A name
    wrapped in parentheses is looked up without them and its code is wrapped
    in parentheses again.

    Parameters
    ----------
    bandelnamn : str
        E.g. ``"Malmö-(Lund)"``. Non-string values (None/NaN) give an empty result.
    code_to_name : dict, optional
        Mapping from station code to full station name. Defaults to the
        module-level `name_to_code_mapping`, which is keyed by code.

    Returns
    -------
    dict
        ``station_details``: one dict per input name with ``original_name``,
        ``tried_name`` (the alternative spelling that matched, else None) and
        ``station_code`` (None when unresolved).
        ``station_codes``: codes of the resolved names, in order.
        ``short_path``: the codes joined by '-', or None if none resolved.

    Notes
    -----
    NOTE(review): unresolved names are silently left out of ``station_codes``,
    so ``short_path`` can join stations that were not adjacent in the input.
    """
    if code_to_name is None:
        code_to_name = name_to_code_mapping

    # Missing cells have no names to resolve; avoid crashing on .split().
    if not isinstance(bandelnamn, str):
        return {'station_details': [], 'station_codes': [], 'short_path': None}

    # Invert to lower-cased name -> code for case-insensitive lookups.
    name_lookup = {
        str(full_name).lower(): str(code) for code, full_name in code_to_name.items()
    }

    station_details = []
    station_codes = []

    for raw_name in bandelnamn.split('-'):
        stripped_name = str(raw_name).strip()
        has_parentheses = stripped_name.startswith('(') and stripped_name.endswith(')')

        # Parentheses are not part of the name in the dictionary.
        bare_name = stripped_name[1:-1] if has_parentheses else stripped_name

        # Try the name as written, then the two "central" spellings.
        code = None
        matched_name = None
        for candidate in (bare_name, f"{bare_name} central", f"{bare_name}s central"):
            code = name_lookup.get(candidate.lower())
            if code is not None:
                matched_name = candidate
                break

        station_details.append({
            'original_name': stripped_name,
            # Only report a tried name when an alternative spelling was needed.
            'tried_name': matched_name if code and matched_name != bare_name else None,
            'station_code': code
        })

        if code:
            # Add parentheses back if they were present
            station_codes.append(f"({code})" if has_parentheses else code)

    return {
        'station_details': station_details,
        'station_codes': station_codes,
        'short_path': '-'.join(station_codes) if station_codes else None
    }


# Step 2: Prepare the dataframe with detailed conversion results
def prepare_bandelnamn_conversion(servicekontrakt_df):
    """Add the station conversion columns to `servicekontrakt_df`.

    Runs `convert_bandelnamn_to_codes` on every 'Bandelnamn' value and stores
    the parts of its result in the columns 'station_details',
    'original_station_names', 'station_codes' and 'short_path'.

    The frame is modified in place and also returned.
    """
    converted = servicekontrakt_df['Bandelnamn'].apply(convert_bandelnamn_to_codes)

    def pick(field):
        # Pull one entry out of every per-row result dict
        return converted.apply(lambda result: result[field])

    servicekontrakt_df['station_details'] = pick('station_details')
    servicekontrakt_df['original_station_names'] = servicekontrakt_df['station_details'].apply(
        lambda details: [entry['original_name'] for entry in details]
    )
    servicekontrakt_df['station_codes'] = pick('station_codes')
    servicekontrakt_df['short_path'] = pick('short_path')

    return servicekontrakt_df


def _first_bandel_number(value):
    """Return the first bandel number in `value` as an int, or None.

    Accepts plain numbers, numeric strings, and comma-separated lists such as
    ``"451, 452"`` (the first entry is used).
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        pass

    first_token = str(value).split(',')[0].strip()
    try:
        return int(first_token)
    except ValueError:
        return None


def calculate_sum_langd_for_bandelnamn(row, dictionary_df):
    """Compute the length for one service-contract row.

    Parameters
    ----------
    row : mapping
        Must provide 'short_path' (station codes joined by '-', or None) and
        'Bandelnr' (bandel number, possibly a comma-separated list or '').
    dictionary_df : pd.DataFrame
        Must provide the columns 'Plats_sign', 'Forbind' and 'Banlangd'.

    Returns
    -------
    number or None
        * Single station: the station's own 'Banlangd' plus half the
          'Banlangd' of every connection that starts or ends at it.
        * Several stations: the result of `calculate_sum_langd`, using the
          first bandel number of the row.
        * None when there is no path, the single station is unknown, or the
          row has no usable bandel number.
    """
    short_path = row['short_path']
    if not isinstance(short_path, str) or not short_path:
        return None

    # Case 1: Single station
    if '-' not in short_path:
        single_station = short_path

        matching_station = dictionary_df[dictionary_df['Plats_sign'] == single_station]
        if matching_station.empty:
            return None

        station_length = matching_station['Banlangd'].iloc[0]

        # Connections "<station>-X" or "X-<station>". Literal prefix/suffix
        # tests are used instead of a regex built from the code, so characters
        # such as '.' or '(' in a code cannot change what is matched.
        forbind = dictionary_df['Forbind']
        touches_station = (
            forbind.str.startswith(f"{single_station}-", na=False)
            | forbind.str.endswith(f"-{single_station}", na=False)
        )

        # skipna=False keeps a missing neighbour length visible as NaN.
        half_neighbor_lengths = dictionary_df.loc[touches_station, 'Banlangd'].sum(skipna=False) / 2

        return station_length + half_neighbor_lengths

    # Case 2: Multiple stations
    bandelnr = _first_bandel_number(row['Bandelnr'])
    if bandelnr is None:
        # No bandel number to anchor the search on (e.g. 'Bandelnr' is '').
        return None
    return calculate_sum_langd(short_path, bandelnr, dictionary_df)


# Apply the steps
# 1. Resolve the station names of every row into codes / short_path
servicekontrakt_df = prepare_bandelnamn_conversion(servicekontrakt_df)

# 2. Row by row, compute sum_langd; dictionary_df is handed over as the
#    second positional argument of the function
servicekontrakt_df['sum_langd'] = servicekontrakt_df.apply(
    calculate_sum_langd_for_bandelnamn,
    axis=1,
    args=(dictionary_df,),
)

In [9]:
# Step 3: Add 'kontrakt_från_bandel' column by mapping 'Bandel' to the cleaned dictionary
def map_bandel_to_contract(bandel, mapping_dict):
    """Look up the contract for a bandel number.

    Parameters
    ----------
    bandel : str or number
        A bandel number, or a comma-separated list such as ``"451, 452"``,
        in which case the first number is used.
    mapping_dict : dict
        Maps integer bandel numbers to contracts.

    Returns
    -------
    The mapped contract, or None when `bandel` holds no number (e.g. ``''``
    or NaN) or the number is not in `mapping_dict`.
    """
    try:
        first_bandel = int(bandel)
    except (TypeError, ValueError, OverflowError):
        # Not a plain number: use the first entry of a "451, 452" style list.
        first_token = str(bandel).split(',')[0].strip()
        try:
            first_bandel = int(first_token)
        except ValueError:
            return None
    return mapping_dict.get(first_bandel, None)

# Look up the contract of every 'Bandelnr' in the BIS-derived dictionary
servicekontrakt_df['kontrakt_från_bandel'] = servicekontrakt_df['Bandelnr'].apply(
    map_bandel_to_contract,
    args=(bandel_contract_dict,),
)

## Matching TCR:s Förbindelser with bandelar

First let us reformat the förbindelser in TCR so that it is a list.

In [10]:
import re

# Process 'Från linjespår' in tcr_df to create a 'förbind_list' column
def extract_forbind_list(fran_linjespar):
    """Return every 'X-Y' token found in `fran_linjespar`, joined by ', '.

    A token is two runs of letters (A-Z, a-z, å, ä, ö and their capitals)
    separated by a single '-', e.g. 'Lub-Ttu'. The result is '' when nothing
    matches.
    """
    station_pair = re.compile(r'\b([A-Za-zåäöÅÄÖ]+-[A-Za-zåäöÅÄÖ]+)\b')
    return ', '.join(found.group(1) for found in station_pair.finditer(fran_linjespar))

# Build 'förbind_list' from 'Från linjespår'; missing cells become ''
fran_linjespar_values = tcr_df['Från linjespår']
tcr_df['förbind_list'] = fran_linjespar_values.apply(
    lambda cell: '' if pd.isna(cell) else extract_forbind_list(cell)
)

So now based on that list förbind_list, we can identify the list of bandelar using dictionary.

In [11]:
# Identify bandelar for rows of tcr_df using 'förbind_list' and dictionary_df
def identify_bandelar(forbind_list, fran_trafikplats, dictionary_df):
    """Return the 'BdlNr' of the first dictionary row matching a connection.

    For each entry of the comma-separated `forbind_list`, in order, these
    lookups are tried and the first hit wins:

    1. 'Forbind' equals the entry ('X-Y'),
    2. 'Forbind' equals the reversed entry ('Y-X'),
    3. 'Plats_sign' equals one of the entry's parts, in order,
    4. 'Plats_sign' equals `fran_trafikplats`.

    Returns None when nothing matches.
    """
    def lookups():
        # Yield (column, wanted value) pairs in the order they are to be tried
        for entry in forbind_list.split(','):
            forbind = entry.strip()
            ends = forbind.split('-')
            yield 'Forbind', forbind
            yield 'Forbind', '-'.join(reversed(ends))
            for end in ends:
                yield 'Plats_sign', end
            yield 'Plats_sign', fran_trafikplats

    for column, wanted in lookups():
        hits = dictionary_df[dictionary_df[column] == wanted]
        if not hits.empty:
            return hits.iloc[0]['BdlNr']
    return None

# Identify the bandel of every TCR row and store it in 'identified_BdlNr'
tcr_df['identified_BdlNr'] = tcr_df.apply(
    lambda tcr: identify_bandelar(
        forbind_list=tcr['förbind_list'],
        fran_trafikplats=tcr['Från trafikplats'],
        dictionary_df=dictionary_df,
    ),
    axis=1,
)

There are some rows where the bandel is not identified because förbind_list is empty. For these rows, use the column Från trafikplats to find the corresponding BdlNr via the Plats_sign column of dictionary_df. The following stations are concerned:
* M -> corresponds to Malmö central which is referred to in the dictionary as Mc
* V -> corresponds to Värnamo which is referred to in the dictionary as Väc
* Sär -> corresponds to Sävenäs which is referred to in the dictionary as Gsv or Göteborg Sävenäs
* Gäb -> corresponds to Gävle GBG which is referred to in the dictionary as Gä

Others not found in the dictionary such as
* Gsh ->  Skandiahamnen

In [12]:
# Rows whose 'identified_BdlNr' is still missing (NaN/None)
mask = tcr_df['identified_BdlNr'].isna()

# Lookup from 'Plats_sign' to 'BdlNr'
trafikplats_lookup = dictionary_df.set_index('Plats_sign')['BdlNr'].to_dict()

# Fill those rows from their 'Från trafikplats'; unknown stations stay missing
missing_trafikplats = tcr_df.loc[mask, 'Från trafikplats']
tcr_df.loc[mask, 'identified_BdlNr'] = missing_trafikplats.map(trafikplats_lookup)

Now, once we have identified BdlNR, we can use servicekontrakt_df to add a column with Kontraktsområdesnamn.

In [13]:
# Create the contract map with float keys.
# 'Bandelnr' can be '' (no number parsed) or a list such as '451, 452', on
# which a direct astype(float) raises. Each listed bandel therefore gets its
# own row, and entries without a number are dropped.
bandel_contracts = servicekontrakt_df[['Bandelnr', 'Kontraktsområdesnamn']].copy()
bandel_contracts['Bandelnr'] = bandel_contracts['Bandelnr'].astype(str).str.split(',')
bandel_contracts = bandel_contracts.explode('Bandelnr', ignore_index=True)
bandel_contracts['Bandelnr'] = pd.to_numeric(
    bandel_contracts['Bandelnr'].str.strip(), errors='coerce'
)
bandel_contracts = (
    bandel_contracts
    .dropna(subset=['Bandelnr'])          # '' and other non-numbers
    .drop_duplicates(subset=['Bandelnr'])  # first contract per bandel wins
)
bandel_contracts['Bandelnr'] = bandel_contracts['Bandelnr'].astype(float)
contract_map = bandel_contracts.set_index('Bandelnr')['Kontraktsområdesnamn'].to_dict()

# Map with the contract map (both sides are floats so the keys compare equal)
tcr_df['identified_BdlNr'] = tcr_df['identified_BdlNr'].astype(float)
tcr_df['Kontraktsområdesnamn'] = tcr_df['identified_BdlNr'].map(contract_map)

In [14]:
# Second contract column, looked up in the BIS-derived bandel_contract_dict
bis_contract_by_row = tcr_df['identified_BdlNr'].map(bandel_contract_dict)
tcr_df['kontrakt_från_bandel'] = bis_contract_by_row

Now that we have identified BdlNr, we want the total length (langd, in metres) of the forbind_list and store it in a column (sum_langd). The idea is to follow the order of forbind_list, look up the corresponding rows in dictionary_df (within the same bandelnr = identified BdlNr), and accumulate the lengths from the Banlangd column of dictionary_df. The entries of forbind_list are normally linked, e.g., A-B, B-C, etc.

In [15]:
### `tcr_df` Processing ###

def _sum_langd_for_tcr(tcr_row):
    # Length for one TCR row, from its connection list and identified bandel
    return calculate_sum_langd(
        tcr_row['förbind_list'],
        tcr_row['identified_BdlNr'],
        dictionary_df
    )

# Create the 'sum_langd' column for tcr_df, one row at a time
tcr_df['sum_langd'] = tcr_df.apply(_sum_langd_for_tcr, axis=1)

For some of the TCRs, forbind_list is empty (i.e., ""), but in all cases there is a station in Från trafikplats that we can use to look up the langd in the dictionary.

In [16]:
# Step 1: Lookup from 'Plats_sign' to 'Banlangd'
banlangd_per_plats_sign = dictionary_df.set_index('Plats_sign')['Banlangd']
langd_lookup = banlangd_per_plats_sign.to_dict()

# Step 2: Rows whose 'förbind_list' is the empty string
mask = tcr_df['förbind_list'].eq("")

# Step 3: For those rows, take the 'Banlangd' of their 'Från trafikplats'
tcr_df.loc[mask, 'sum_langd'] = tcr_df.loc[mask, 'Från trafikplats'].map(langd_lookup)

## Export to Excel files

In [17]:
# Export the service-contract table: target file and the columns to write, in order
excel_file_path = "Servicekontrakt_per_bandel_matched.xlsx"
servicekontrakt_export_columns = [
    'Kontraktsområdesnamn',
    'kontrakt_från_bandel',
    'Tidsperiod',
    'Bandel',
    'TPA timmar per år',
    'TPA dagar per år',
    'TPA veckor per år',
    'TPA timmar natt per år',
    'TPA timmar helg per år',
    'EJ TPA timmar per år',
    'EJ TPA dagar per år',
    'EJ TPA veckor per år',
    'EJ TPA timmar natt per år',
    'EJ TPA timmar helg per år',
    'Total timmar per år',
    'Bandelnr',
    'Bandelnamn',
    'sum_langd',
]
servicekontrakt_df_to_export = servicekontrakt_df[servicekontrakt_export_columns]
servicekontrakt_df_to_export.to_excel(excel_file_path, index=False)

In [18]:
# Step 9: Columns of tcr_df to keep in the export, in output order
tcr_export_columns = [
    'TCR-id',
    'Klassificering',
    'Orsak till kapacitets-begränsning',
    'Starttid',
    'Sluttid',
    'Servicefönster_nya_kategorier',
    'Relaterade TPÅ:er',
    'tid_timmar',
    'Relaterad åtgärdsnummer',
    'Relaterad kontrakt',
    'förbind_list',
    'identified_BdlNr',
    'sum_langd',
    'Kontraktsområdesnamn',
    'kontrakt_från_bandel',
]
tcr_df_to_export = tcr_df[tcr_export_columns]

# Step 10: Write the selection to an Excel file (without the index)
excel_file_path = "TCR_T24_matched_bandelar.xlsx"
tcr_df_to_export.to_excel(excel_file_path, index=False)