## Vehicle Routing Problem (VRP) using Genetic Algorithm

### Parameters and Imports

In [1]:

# Importing necessary libraries
from string import ascii_letters
from numpy import random
import random as rnd 
import turtle
import time

# Parameters for the VRP problem
capacity = 5                  # Capacity of the vehicle
num_customers = 10            # Number of customers to visit
max_customer_repeat = 2       # Max times a customer can be visited
max_journey = num_customers * max_customer_repeat - 1
customers_letters = [letter for letter in ascii_letters[:num_customers]]
max_demand = 3                # Maximum demand for each customer
starting_point = (-50, 100)   # Starting coordinates for the journey

# City grid dimensions
city_length = 700
city_width = 700

# Genetic algorithm parameters
population_size = 300         # Number of solutions in the population
margin = 2                    # Margin used in crossover
child_length_type = "parent length" 
length_prob = 0.001           # Probability to alter length during mutation
swap_prob = 0.1               # Probability to swap two customers in mutation
length_range = 10             # Range of allowed journey lengths
num_iters = 1000              # Maximum number of iterations
early_stopping = True         # Enable early stopping criteria
stopping_num = 10000          # Number of iterations to stop if no improvement
    

### Generate Customer Demands and Locations

In [2]:

def generateDemand(num_customers, max_demand):
    """Generate demands for each customer such that their sum equals zero."""
    random.seed(7)  # Seed for reproducibility
    while True:
        possible_demands = [i for i in range(-max_demand, max_demand+1) if i != 0]
        demands = [random.choice(possible_demands) for _ in range(num_customers)]
        if sum(demands) == 0:
            return demands

# Initialize customer demands and assign random locations within city bounds
random.seed(7)
demands = generateDemand(num_customers, max_demand)
customers = {lett: ((random.randint(0, city_width), random.randint(0, city_length)), demand) 
             for lett, demand in zip(customers_letters, demands)}
    

### Fitness Functions

In [3]:

def distance(d1, d2):
    """Calculate Euclidean distance between two points."""
    return ((d1[0] - d2[0]) ** 2 + (d1[1] - d2[1]) ** 2) ** 0.5

def totalDistance(journey, with_starting=True):
    """Calculate total distance for a given journey with an optional start point."""
    total_distance = distance(starting_point, customers[journey[0]][0]) if with_starting else 0
    for i in range(len(journey) - 1):
        total_distance += distance(customers[journey[i]][0], customers[journey[i + 1]][0])
    return total_distance

def grantedDemands(journey):
    """Determine if a journey fulfills all customer demands."""
    customer_demands = {letter: demand for letter, (_, demand) in customers.items()}
    current_demand = 0
    for customer in journey:
        demand = customer_demands[customer]
        if demand > 0:
            grant = min(capacity - current_demand, demand)
        elif demand < 0:
            grant = max(-current_demand, demand)
        else:
            continue
        customer_demands[customer] -= grant
        current_demand += grant
    return sum(abs(d) for d in customer_demands.values()) == 0

def fitness(journey):
    """Calculate the fitness of a journey based on demand fulfillment and total distance."""
    return (grantedDemands(journey), totalDistance(journey, with_starting=True))
    

### Population Initialization

In [4]:
def sameCustomer(journey):
    """
    Identifies consecutive customer visits and records all occurrences of them in the journey.
    """
    indexes = []
    all_occurrences = []
    for i in range(len(journey)-1):
        if journey[i] == journey[i+1]:
            indexes.append(i+1) 
            occurrences = []
            for j in range(len(journey)):
                if journey[j] == journey[i]:
                    occurrences.append(j)
            all_occurrences.append(occurrences)

    return (indexes,all_occurrences)


def initPopulation(size, seed=None):
    """Initialize the population with valid journeys."""
    if seed:
        random.seed(seed)
    population = []
    while len(population) < size:
        journey = customers_letters + rnd.choices(customers_letters, k=random.randint(max_journey - num_customers + 1))
        journey = rnd.sample(journey, len(journey))
        # Eliminate journeys with consecutive customer visits
        while(sameCustomer(journey)[0]):
            journey = rnd.sample(journey,len(journey))
        population.append("".join(journey))
    return list(set(population))
    

### Crossover, Mutation, and Population Evolution

In [5]:

def swap_customers(journey, index1, index2):
  """
  Swaps two customers in a journey.

  Args:
    journey (str): The journey string.
    index1 (int): The index of the first customer to swap.
    index2 (int): The index of the second customer to swap.

  Returns:
    str: The modified journey string.
  """
  journey_list = list(journey)
  journey_list[index1], journey_list[index2] = journey_list[index2], journey_list[index1]
  return "".join(journey_list)

def complete_journey(journey, total_length):
  """
  Completes a partial journey by adding customers, ensuring that no customer is visited more than the maximum allowed times.

  Args:
    journey (str): The partial journey.
    total_length (int): The desired length of the complete journey.

  Returns:
    str: The completed journey.
  """
  remaining_length = total_length - len(journey)
  for _ in range(remaining_length):
    # Identify customers that cannot be added due to maximum visit constraints or because they are the last customer in the current journey.
    unavailable_customers = [c for c in journey if count_customer_occurrences(journey, c) >= max_customer_repeat]
    unavailable_customers.append(journey[-1])
    unavailable_customers = set(unavailable_customers)

    # Choose a random customer from the available options.
    new_customer = random.choice([c for c in customers_letters if c not in unavailable_customers])
    journey += new_customer
  return journey

In [6]:
def customerRepeat(journey, customer):
  """Counts the number of occurrences of a specific customer in a journey."""
  count = 0
  for char in journey:
    if char == customer:
      count += 1
  return count

def swapCustomers(journey, i, j):
    """Swap two customers in a journey."""
    jour_list = list(journey)
    jour_list[i], jour_list[j] = jour_list[j], jour_list[i]

    return "".join(jour_list)

def completeJourney(j,N):
    """Completes a partial journey by adding customers until it reaches the desired length."""
    rest_len = N-len(j)
    for _ in range(rest_len):
        cant_use = [c for c in j if customerRepeat(j,c)>=max_customer_repeat]
        cant_use.append(j[-1])
        cant_use = list(set(cant_use))
        new_c = random.choice([c for c in customers_letters if c not in cant_use])
        j += new_c
    return j

In [7]:
def custHistogram(journey, customers):
  """
  Calculates the frequency of a set of customers in a given journey."""
  frequencies = {c: 0 for c in customers}
  for c in journey:
    if c in customers:
      frequencies[c] += 1
  return frequencies

def maximumVisited(j):
  """Identifies customers who have been visited the maximum number of times in a journey."""
  hist = custHistogram(j, customers_letters)
  maximum_visited = []
  for char in hist.keys():
    if hist[char] >= max_customer_repeat:
      maximum_visited.append(char)
  return maximum_visited

def getChild(current_parent, other_parent, margin=2, length_type = 'random'):
    """
    Generates a new child journey by crossing over two parent journeys.
    
    margin: A parameter controlling the balance between exploration and exploitation.
    """

    # Determine the length of the child journey based on the `length_type` argument.
    if length_type == "random":
        child_length = random.randint(
            min(len(current_parent), len(other_parent)),
            max(len(current_parent), len(other_parent)) + 1
        )
    else:
        child_length = len(current_parent)

    # Create a copy of the unused customers.
    unvisited = customers_letters.copy()

    # Initialize the child journey with the first character from the `other_parent`.
    child = other_parent[0]

    # Remove the first character from the `unvisited` list.
    if child[-1] in unvisited:
        unvisited.remove(child[-1])

    # Build the child journey character by character.
    for i in range(1, child_length):
        # Calculate the index of the character to be added from the `current_parent`.
        index = i % len(current_parent)
        
        # Determine whether to prioritize visiting a new customer based on the `margin` parameter.
        should_visit_new = len(unvisited) > 0 and (child_length - len(child)) - len(unvisited) <= margin

        # Find a suitable character from the `current_parent` that meets the conditions.
        # This loop ensures that the next character is different from the previous one and either visits a new customer or is already in the child.
        tries = 0
        while child[-1] == current_parent[index] or (should_visit_new and current_parent[index] not in unvisited):
            index = (index + 1) % len(current_parent)
            tries += 1

            # If too many tries have been made, swap the parents to avoid getting stuck in a loop.
            if tries > len(current_parent):
                current_parent, other_parent = other_parent, current_parent

        # Add the selected character to the child journey.
        child += current_parent[index]

        # Remove the added character from the `unvisited` list.
        if child[-1] in unvisited:
            unvisited.remove(child[-1])

        # Eliminate customers who have been visited the maximum number of times.
        max_visited_customers = maximumVisited(child)
        current_parent = [c for c in current_parent if c not in max_visited_customers]
        other_parent = [c for c in other_parent if c not in max_visited_customers]

        # Swap the parents for the next iteration.
        current_parent, other_parent = other_parent, current_parent

    return child

def cross(parent1, parent2, length_type):
    """Create two children from two parent journeys."""
    return getChild(parent1, parent2, length_type = length_type), getChild(parent2, parent1, length_type = length_type)

def mutate(journey, length_prob, swap_prob, length_range):
    """Mutate a journey by swapping customers and adjusting length."""
    for i in range(len(journey)):
        if random.random() < swap_prob:
            j = swapCustomers(journey, i, random.randint(0, len(journey)))
    if random.random() < length_prob:
        return completeJourney(j, random.randint(len(j) - length_range, len(j) + length_range))
    return journey
    

In [8]:
def produce(pop, length_type = 'random'):
    """
    Generates a new population of journeys by applying the crossover operation to the current population.
    """
    pop = rnd.sample(pop,len(pop))
    new_pop = []
    for i in range(0,len(pop),2):
        child1, child2 = cross(pop[i],pop[(i+1) % len(pop)], length_type = length_type)
        new_pop.extend([child1, child2])
    return new_pop
    

### Selection and Evolution

In [9]:

def rank(pop):
    """Rank journeys based on fitness, sorting first by demand and then by distance."""
    ranked_population = sorted(pop, key=lambda j: (grantedDemands(j), totalDistance(j, True)))
    return ranked_population

def evolve(pop):
    """Perform evolution to generate new population."""
    new_pop = pop + produce(pop[:len(pop)//2])
    new_pop = list(set(new_pop))[:population_size]
    return rank(new_pop)
    

### Journey Visualization with Turtle Graphics

In [10]:
import turtle


t = turtle.Turtle()

def getDemandsHistory(journey):
    demands_history = [0]

    customers_demands = {letter:demand for letter, (_, demand) in customers.items()} 
    current_demand = 0

    for c in journey:
        cust_demand = customers_demands[c]
        if cust_demand>0: # if customers gives
            current_capacity = capacity - current_demand
            cust_granted_demand = min(current_capacity, cust_demand) 
        elif cust_demand<0: # if customers gives
            cust_granted_demand = max(-current_demand, cust_demand)
        else:
            continue

        customers_demands[c] = cust_demand - cust_granted_demand
        current_demand += cust_granted_demand
        demands_history.append(current_demand)
  
    
    return  demands_history

def drawDots():
  
  t.color("black")
  for na,c in customers.items():
      x,y = c[0]
      t.goto(x/2-200,y/2-200)
      t.dot()
      t.write(na+str(c[1]), font=("Arial", 14, "bold"))
  t.color("blue")

def writeSE(text,x,y):
  t.speed(0)
  t.penup()
  if text == "S":
    t.goto(x-5, y+15)
  else: t.goto(x+5,y+15)
  t.pendown()
  t.color("green")
  t.write(text, font=("Arial", 14, "bold"))
  t.color("blue")
  t.penup()
  t.goto(x, y)
  t.pendown()
  t.speed(1)

def drawJourney(journey, number_of_iterations, i,with_starting_point):
  t.penup()
  t.speed(0)
  t.goto(-300,200)
  t.pendown()
  t.write(f"Number of iterations: {number_of_iterations}", font=("Arial", 16, "bold"))
  t.penup()
  drawDots()
  if with_starting_point:
    x,y = [v/2-200 for v in starting_point] 
    t.goto(x,y)
    t.pendown()


  for i, c in enumerate(journey[:-1]):
    x1,y1 = [v/2-200 for v in customers[c][0]]
    x2,y2 = [v/2-200 for v in customers[journey[i+1]][0]]
    t.goto(x1, y1)
    t.speed(1)
    t.pendown()
    if i ==0: writeSE("S",x1,y1)
   
    t.goto(x2, y2)
    if i+1 == len(journey)-1: writeSE("E",x2,y2)
  t.speed(0)
  t.penup()
  t.goto(-300,-200)
  finish, distance = fitness(journey)
  t.color("black")
  t.write(f"Journey {i}: {journey}, Fitness: ({finish}, {distance:.2f})", font=("Arial", 16, "bold"))
  demands_history = getDemandsHistory(journey)
  t.goto(-300,-250)
  t.write(f"vehicle content: {demands_history}", font=("Arial", 16, "bold"))
  t.color("blue")

def clear():
  t.clear()
  t.reset()
  t.penup()

### Running the Genetic Algorithm

In [11]:

pop_0 = initPopulation(population_size)
pop = rank(pop_0)
best_journey = (pop[0], fitness(pop[0]), 0)
for i in range(num_iters):
    best_journey = max(best_journey, (pop[0], fitness(pop[0]), i), key=lambda x: x[1][1])
    pop = evolve(pop)
    if early_stopping and i - best_journey[2] > stopping_num:
        break
print("Best solution found:", best_journey)
    

Best solution found: ('hcgfeibdaj', (False, 2289.4845317729746), 0)


In [None]:
drawJourney(best_journey, )