PosiSorterSystem:
- 


Import packages: 


In [1]:
import pandas as pd
from datetime import timedelta
import random

System parameters:

In [2]:
# Unloading-time parameters; compute_outfeed_time() adds the base time to one
# delay for the volume class and one delay for the weight class.
base_time = 4.5     # Fixed part of every unloading time
delay_small = 0     # Extra delay for the lowest volume/weight class
delay_medium = 1    # Extra delay for the middle volume/weight class
delay_large = 2     # Extra delay for the highest volume/weight class


Parcel information: 

In [3]:
class Parcel:
    """A single parcel together with the bookkeeping the simulation fills in.

    The constructor stores the measured properties; the remaining attributes
    start out empty and are written by the sorter and the outfeeds while the
    simulation runs.
    """

    def __init__(self, parcel_id, arrival_time, length, width, height, weight, feasible_outfeeds):
        self.id = parcel_id                 # Parcel number
        self.arrival_time = arrival_time    # Moment the parcel arrives at the sorter
        self.length = length                # Parcel length
        self.width = width                  # Parcel width
        self.height = height                # Parcel height
        self.weight = weight                # Parcel weight
        self.feasible_outfeeds = feasible_outfeeds  # Indices of the outfeeds this parcel may use
        self.sorted = False                 # True once the parcel has been assigned to an outfeed
        self.recirculated = False           # True once the parcel has been sent to recirculation
        self.travel_time = None             # Time needed to travel to the assigned outfeed
        self.outfeed_entry_time = None      # Timestamp at which the parcel entered its outfeed
        self.removal_time = None            # Timestamp at which the parcel was removed from its outfeed
        self.time_in_system = None          # Travel time plus the time spent in the outfeed
        # Defined up front so effective_length() fails with a clear message
        # (instead of an AttributeError) when no orientation was chosen yet.
        self.orientation = None             # 'parallel' or 'sequential'
        self.orientation_length = 0.0       # Outfeed length this parcel occupies in its orientation

    def effective_length(self):
        """Return the outfeed length the parcel occupies in its orientation.

        Returns:
            ``self.width`` for 'sequential' and ``self.length`` for 'parallel'.

        Raises:
            ValueError: if ``self.orientation`` is neither of the two known
                values. Returning ``None`` here would only surface later as a
                confusing error in the length arithmetic of the outfeed.
        """
        if self.orientation == 'sequential':
            return self.width
        if self.orientation == 'parallel':
            return self.length
        raise ValueError(
            f"Parcel {self.id}: orientation must be 'sequential' or 'parallel', "
            f"got {self.orientation!r}"
        )

    def get_volume(self):
        """Return the parcel volume (length * width * height)."""
        return self.length * self.width * self.height
    
def compute_outfeed_time(parcel):
    """Return the time needed to unload ``parcel`` from an outfeed.

    The result is ``base_time`` plus one delay chosen by the parcel's volume
    class and one delay chosen by its weight class. Each class is picked with
    two strict upper limits: below the first limit is "small", below the
    second is "medium", everything else is "large".
    """

    def class_delay(value, small_limit, medium_limit):
        # Map a measurement onto the delay of the class it falls into.
        if value < small_limit:
            return delay_small
        if value < medium_limit:
            return delay_medium
        return delay_large

    volume_delay = class_delay(parcel.get_volume(), 0.035, 0.055)
    weight_delay = class_delay(parcel.weight, 1700, 2800)
    return base_time + volume_delay + weight_delay

Outfeed:

In [4]:
class Outfeed:
    """A fixed-length outfeed lane that unloads its parcels one at a time."""

    def __init__(self, max_length=3.0):
        self.max_length = max_length            # Total length available in this outfeed
        self.current_parcels = []               # Queue of (parcel, unloading time) pairs, oldest first
        self.current_length = 0.0               # Length currently occupied by queued parcels
        self.time_until_next_discharge = 0.0    # Countdown until the parcel at the front is removed

    def can_accept(self, parcel):
        """Return True when ``parcel`` still fits in the remaining length."""
        fits = self.current_length + parcel.orientation_length <= self.max_length
        print(f"[DEBUG] Parcel {parcel.id} → needed: {parcel.orientation_length:.2f}, used: {self.current_length:.2f}, fits: {fits}")
        return fits

    def add_parcel(self, parcel):
        """Queue ``parcel`` and, if the outfeed was empty, start its countdown."""
        unload_time = compute_outfeed_time(parcel)
        was_empty = not self.current_parcels
        self.current_parcels.append((parcel, unload_time))
        self.current_length += parcel.orientation_length
        if was_empty:
            # Only the parcel at the front of the queue is being unloaded.
            self.time_until_next_discharge = unload_time

    def update(self, time_step, system_time):
        """Advance the countdown by ``time_step`` and remove the front parcel when it expires.

        ``system_time`` is stored as the removal time of a removed parcel and
        is used to derive its total time in the system.
        """
        if not self.current_parcels:
            return

        self.time_until_next_discharge -= time_step
        timer_expired = self.time_until_next_discharge <= 0
        if not timer_expired:
            return

        parcel, _ = self.current_parcels.pop(0)
        self.current_length -= parcel.orientation_length
        parcel.removal_time = system_time
        time_in_outfeed = parcel.removal_time - parcel.outfeed_entry_time
        parcel.time_in_system = parcel.travel_time + time_in_outfeed
        print(f"Parcel {parcel.id} removed from outfeed"
            f"(total system time: {parcel.time_in_system.total_seconds():.2f} s)")

        if self.current_parcels:
            # Restart the countdown with the unloading time of the new front parcel.
            self.time_until_next_discharge = self.current_parcels[0][1]

The PosiSorterSystem with the algorithm: 

In [5]:
class PosiSorterSystem:
    """Time-stepped simulation that assigns arriving parcels to outfeeds.

    Each arriving parcel is offered to its feasible outfeeds in the order they
    are listed; the first outfeed with enough free length takes it. A parcel
    that fits nowhere is marked as recirculated.
    """

    def __init__(self, layout_df, sorting_method, outfeed_distances, time_step):
        """Collect the configuration of the sorter.

        Args:
            layout_df: DataFrame with the columns 'Layout property' and
                'Value'; the row whose property is 'Belt Speed' is read.
            sorting_method: orientation assigned to every parcel
                ('parallel' or 'sequential').
            outfeed_distances: mapping from outfeed index to the distance
                towards that outfeed.
            time_step: simulation step in seconds.

        Raises:
            ValueError: if ``time_step`` is not positive (the simulation clock
                would never reach its end) or the layout has no 'Belt Speed' row.
        """
        if time_step <= 0:
            raise ValueError(f"time_step must be positive, got {time_step!r}")
        speed_values = layout_df.loc[layout_df['Layout property'] == 'Belt Speed', 'Value'].values
        if len(speed_values) == 0:
            raise ValueError("layout_df has no 'Belt Speed' row in 'Layout property'")

        self.belt_speed = speed_values[0]           # Belt speed
        self.num_outfeeds = 3                       # Amount of outfeeds
        self.outfeeds = [Outfeed(max_length=3.0) for _ in range(self.num_outfeeds)]  # Outfeed objects
        self.recirculation_belt = []                # Parcels that could not be placed in any outfeed
        self.processed_parcels = []                 # Every parcel that entered the system
        self.sorting_method = sorting_method        # Sorting orientation
        self.outfeed_distances = outfeed_distances  # Distances to outfeeds
        self.time_step = time_step                  # Time step in seconds
        self.current_time = None                    # Simulation clock; set by run_simulation()

    def calculate_travel_time(self, parcel, outfeed_index):
        """Store on ``parcel`` the time needed to reach the given outfeed."""
        distance = self.outfeed_distances[outfeed_index]
        travel_seconds = distance / self.belt_speed
        parcel.travel_time = timedelta(seconds=travel_seconds)

    def set_parcel_timing(self, parcel, outfeed_index):
        """Record the outfeed entry time and travel time and mark ``parcel`` as sorted."""
        parcel.outfeed_entry_time = self.current_time
        self.calculate_travel_time(parcel, outfeed_index)
        parcel.sorted = True

    def sort_parcel(self, parcel):
        """Assign ``parcel`` to the first feasible outfeed that has room for it.

        If no feasible outfeed can accept the parcel it is flagged as
        recirculated and appended to ``recirculation_belt``.
        """
        parcel.orientation = self.sorting_method
        parcel.orientation_length = parcel.effective_length()

        for outfeed_index in parcel.feasible_outfeeds:
            outfeed = self.outfeeds[outfeed_index]
            if outfeed.can_accept(parcel):
                outfeed.add_parcel(parcel)
                self.set_parcel_timing(parcel, outfeed_index)
                print((f"Parcel {parcel.id} sorted to outfeed {outfeed_index}"
                    f"(travel time: {parcel.travel_time.total_seconds():.2f} seconds)"))
                return

        # NOTE(review): recirculated parcels are only stored here; nothing in
        # this class offers them to the outfeeds again.
        parcel.recirculated = True
        self.recirculation_belt.append(parcel)
        print(f"Parcel {parcel.id} could not be sorted and was recirculated.")

    def remove_from_outfeeds(self, time_step):
        """Let every outfeed advance by ``time_step`` at the current simulation time."""
        for outfeed in self.outfeeds:
            outfeed.update(time_step, self.current_time)

    def run_simulation(self, parcels):
        """Run the simulation over ``parcels`` and print a summary.

        ``parcels`` is expected in chronological order of arrival. The clock
        runs from the first arrival until 30 seconds after the last one, so
        that parcels arriving late still get time to be unloaded.
        """
        if not parcels:
            print("No parcels to simulate.")
            self.print_summary()
            return

        # The clock lives on the instance: set_parcel_timing() and
        # remove_from_outfeeds() read self.current_time, so advancing only a
        # local variable would freeze every recorded timestamp at the start.
        self.current_time = parcels[0].arrival_time
        end_time = parcels[-1].arrival_time + timedelta(seconds=30)
        step = timedelta(seconds=self.time_step)
        parcel_index = 0    # Next parcel that still has to arrive

        while self.current_time <= end_time:
            print(f"\n--- Time: {self.current_time.time()} ---")

            # Handle every parcel that has arrived by now.
            while parcel_index < len(parcels) and parcels[parcel_index].arrival_time <= self.current_time:
                parcel = parcels[parcel_index]
                parcel_index += 1
                self.processed_parcels.append(parcel)
                print(f"Parcel {parcel.id} arrived – feasible outfeeds: {parcel.feasible_outfeeds}")
                self.sort_parcel(parcel)

            self.remove_from_outfeeds(self.time_step)
            self.current_time += step

        self.print_summary()

    def print_summary(self):
        """Print counts, success rate and average system time of the run.

        The rate and the average are reported as "n/a" when there is nothing
        to divide by, instead of raising.
        """
        total = len(self.processed_parcels)
        sorted_count = sum(p.sorted for p in self.processed_parcels)
        recirculated_count = sum(p.recirculated for p in self.processed_parcels)
        # Only parcels that were removed from an outfeed have a time_in_system.
        system_times = [p.time_in_system for p in self.processed_parcels if p.time_in_system is not None]

        print("\n--- Simulation Summary ---")
        print(f"Total parcels processed: {total}")
        print(f"Parcels sorted: {sorted_count}")
        print(f"Parcels recirculated: {recirculated_count}")
        if total:
            print(f"Sorting success rate: {sorted_count / total:.2%}")
        else:
            print("Sorting success rate: n/a (no parcels processed)")
        if system_times:
            avg_total_time = sum(t.total_seconds() for t in system_times) / len(system_times)
            print(f"Average total system time per parcel: {avg_total_time:.2f} seconds")
        else:
            print("Average total system time per parcel: n/a (no parcel left an outfeed)")

        # drop_info is an optional attribute attached by the caller after cleaning.
        if hasattr(self, 'drop_info'):
            print("\n--- Removed Parcels ---")
            print(f"Initial amount of parcels: {self.drop_info['initial']}")
            print(f"Removed due to NaNs: {self.drop_info['na_dropped']}")
            print(f"Removed due to extreme outliers (data entry errors): {self.drop_info['outliers_dropped']}")
            print(f"Removed due to no feasible outfeeds: {self.drop_info['no_outfeeds_dropped']}")
            print(f"Removed due to non-chronological arrival times (scanning error): {self.drop_info.get('non_chrono_dropped', 0)}")
            print(f"Total parcels removed before processing: {self.drop_info['total_dropped']}")

Excel data cleaning and loading:

In [6]:
def drop_non_chronological_arrivals(df):
    """Drop rows whose arrival time lies before the last row that was kept.

    Rows are scanned from top to bottom. A row is kept when its
    "Arrival Time" is at or after the arrival time of the most recently kept
    row; otherwise it is dropped.

    Args:
        df: DataFrame with an "Arrival Time" column.

    Returns:
        A new DataFrame with the kept rows and a fresh 0..n-1 index.
    """
    kept_positions = []
    last_time = pd.Timestamp.min    # Earlier than any valid timestamp, so the first row is kept

    for position, time in enumerate(df["Arrival Time"]):
        if time >= last_time:
            kept_positions.append(position)
            last_time = time

    # Select by position: an empty boolean list passed to df[...] is read as
    # an empty column selection and would strip every column from the frame.
    return df.iloc[kept_positions].reset_index(drop=True)

def remove_outliers_iqr(df, columns):
    """Filter out rows lying more than 2 IQRs outside the quartiles of each column.

    The columns are handled one after another, so the quartiles of a later
    column are computed on the rows that survived the earlier columns.
    """
    filtered = df
    for name in columns:
        first_quartile, third_quartile = (filtered[name].quantile(q) for q in (0.25, 0.75))
        spread = third_quartile - first_quartile        # Interquartile range
        low = first_quartile - 2 * spread
        high = third_quartile + 2 * spread
        # Both bounds are inclusive.
        within_bounds = (filtered[name] >= low) & (filtered[name] <= high)
        filtered = filtered[within_bounds]
    return filtered


def drop_rows_without_true_outfeed(df, prefix="Outfeed"):
    """Keep only the rows that have a truthy value in at least one outfeed column.

    Outfeed columns are the columns whose name starts with ``prefix``. When
    the frame has no such column it is returned unchanged.
    """
    outfeed_cols = list(filter(lambda name: name.startswith(prefix), df.columns))
    if outfeed_cols:
        has_outfeed = df[outfeed_cols].any(axis=1)  # True where any outfeed flag is set
        return df[has_outfeed]
    return df


def clean_parcel_data(parcels_df):
    """Clean the raw parcel table and report how many rows each step removed.

    The steps run in a fixed order: rows with NaNs, rows with outlying
    dimensions, rows without a feasible outfeed, and rows with a
    non-chronological arrival time.

    Returns:
        A tuple ``(cleaned_df, drop_info)`` where ``drop_info`` holds the
        initial row count, the rows removed per step and the total removed.
    """
    cleaning_steps = (
        ('na_dropped', lambda frame: frame.dropna().reset_index(drop=True)),
        ('outliers_dropped', lambda frame: remove_outliers_iqr(frame, ["Length", "Width", "Height"])),
        ('no_outfeeds_dropped', drop_rows_without_true_outfeed),
        ('non_chrono_dropped', drop_non_chronological_arrivals),
    )

    drop_info = {'initial': len(parcels_df)}
    for key, step in cleaning_steps:
        rows_before = len(parcels_df)
        parcels_df = step(parcels_df)
        drop_info[key] = rows_before - len(parcels_df)  # Rows removed by this step

    drop_info['total_dropped'] = sum(drop_info[key] for key, _ in cleaning_steps)

    return parcels_df, drop_info

def load_parcels_from_clean_df(df):
    """Build a list of Parcel objects from the cleaned table, ordered by arrival time.

    The feasible outfeeds of a parcel are the zero-based positions of the
    columns 'Outfeed 1', 'Outfeed 2' and 'Outfeed 3' that hold a truthy value.
    """
    outfeed_columns = ('Outfeed 1', 'Outfeed 2', 'Outfeed 3')

    def build_parcel(row):
        # Convert one table row into a Parcel with plain Python numbers.
        return Parcel(
            int(row['Parcel Number']),
            pd.to_datetime(row['Arrival Time']),
            float(row['Length']),
            float(row['Width']),
            float(row['Height']),
            float(row['Weight']),
            [index for index, column in enumerate(outfeed_columns) if row[column]],
        )

    parcels = [build_parcel(row) for _, row in df.iterrows()]
    parcels.sort(key=lambda parcel: parcel.arrival_time)
    return parcels

Main:

In [7]:
def main(data_path="PosiSorterData1.xlsx"):
    """Load the workbook, clean the parcel data and run the simulation.

    Args:
        data_path: path of the Excel workbook; it must contain the sheets
            'Parcels' and 'Layout'. Defaults to the workbook name this
            script has always used.

    Raises:
        FileNotFoundError: if the workbook does not exist at ``data_path``.
    """
    # The context manager closes the workbook handle once both sheets are read.
    with pd.ExcelFile(data_path) as xls:
        parcels_df = xls.parse('Parcels')                   # Parcel measurements and feasible outfeeds
        layout_df = xls.parse('Layout')                     # Layout properties such as the belt speed

    parcels_df, drop_info = clean_parcel_data(parcels_df)   # Clean the data
    parcels = load_parcels_from_clean_df(parcels_df)        # Turn the clean rows into Parcel objects

    # Code parameters
    sorting_method = 'parallel'                 # Choose sorting method: 'parallel' or 'sequential'
    outfeed_distances = {0: 5.5, 1: 8, 2: 10.5} # Outfeed distances
    time_step = 0.1                             # Time step

    # Hand all necessary information to the PosiSorterSystem
    system = PosiSorterSystem(layout_df, sorting_method=sorting_method, outfeed_distances=outfeed_distances, time_step=time_step)
    system.drop_info = drop_info    # Lets print_summary() report the removed parcels
    system.run_simulation(parcels)  # Runs the simulation

# Entry point of the code: main() is called only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()



FileNotFoundError: [Errno 2] No such file or directory: 'PosiSorterData1.xlsx'