In [2]:
from pyspark.sql import SparkSession
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import udf, lit, explode, collect_list, col
from pyspark.sql.functions import sum
from pyspark.sql.functions import max
from pyspark.sql.functions import rand
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
import pyspark.sql.functions as f
import random
import math

In [3]:
#Obtain the Spark session for this notebook (reuses a running one if present) under the app name "ACO"
spark = (
    SparkSession.builder
    .appName("ACO")
    .getOrCreate()
)

In [4]:
#Load the node coordinates (node_id, x, y) from CSV; first row is the header and column types are inferred
csv_reader = spark.read.format('csv').options(header='true', inferSchema='true')
nodes = csv_reader.load('/content/drive/MyDrive/ACO.csv')

#Number of nodes, i.e. how many stops a complete tour has to cover
no_of_nodes = nodes.count()

nodes.show()

+-------+---+---+
|node_id|  x|  y|
+-------+---+---+
|      A|  1|  2|
|      B|  1|  5|
|      C|  6|  4|
|      D|  5|  6|
|      E|  2|  5|
|      F|  4|  6|
|      G|  2|  1|
|      H|  0|  4|
|      I|  3|  0|
+-------+---+---+



In [5]:
#Build the directed edge list: every ordered pair of distinct nodes with its Euclidean length
origins = nodes.selectExpr('node_id as from_node_id', 'x as from_x', 'y as from_y')
destinations = nodes.selectExpr('node_id as to_node_id', 'x as to_x', 'y as to_y')

#Squared coordinate differences between the two endpoints of an edge
dx_squared = (col('from_x') - col('to_x')) ** 2
dy_squared = (col('from_y') - col('to_y')) ** 2

edges = (
    origins.crossJoin(destinations)
    #a node is never paired with itself
    .where(col('from_node_id') != col('to_node_id'))
    .withColumn('distance', (dx_squared + dy_squared) ** 0.5)
    .select('from_node_id', 'to_node_id', 'distance')
)
edges.show()

+------------+----------+------------------+
|from_node_id|to_node_id|          distance|
+------------+----------+------------------+
|           A|         B|               3.0|
|           A|         C| 5.385164807134504|
|           A|         D| 5.656854249492381|
|           A|         E|3.1622776601683795|
|           A|         F|               5.0|
|           A|         G|1.4142135623730951|
|           A|         H|  2.23606797749979|
|           A|         I|2.8284271247461903|
|           B|         A|               3.0|
|           B|         C|5.0990195135927845|
|           B|         D| 4.123105625617661|
|           B|         E|               1.0|
|           B|         F|3.1622776601683795|
|           B|         G| 4.123105625617661|
|           B|         H|1.4142135623730951|
|           B|         I| 5.385164807134504|
|           C|         A| 5.385164807134504|
|           C|         B|5.0990195135927845|
|           C|         D|  2.23606797749979|
|         

In [6]:
#Setting parameters required for the algorithm
num_ants = 10             #number of ants, i.e. routes built in every iteration
evaporation_rate = 0.5    #the pheromone update applies the factor (1 - evaporation_rate) to the existing level
alpha = 1                 #exponent applied to the pheromone level when weighting an edge
beta = 2                  #exponent applied to 1/distance when weighting an edge
q = 3                     #deposit constant: every traversal of an edge contributes q/distance pheromone
initial_pheromone = 0.1   #pheromone level given to every edge before the first iteration
max_iterations = 5        #number of colony runs (route building followed by a pheromone update)


In [8]:
#Setting initial pheromone level on every edge
edges = edges.withColumn('pheromone',lit(initial_pheromone))

#Schema of the per-iteration ants dataframe: one closed tour (ordered list of edge ids) per ant
ants_schema=StructType([StructField('ant_id',StringType(),False),StructField('route',ArrayType(StringType(),False),False)])

#Start nodes are drawn once up front: the sample uses a fixed seed, so re-sampling
#inside the loop would only repeat the same draw at the cost of one Spark job per iteration
random_nodes = nodes.sample(True,float(num_ants/no_of_nodes)+1,seed=42).limit(num_ants).collect()
start_nodes = [row['node_id'] for row in random_nodes]
if len(start_nodes) < num_ants:
  #sample() only approximates the requested fraction and may return too few rows;
  #top up with driver-side draws so that every ant gets a start node
  all_node_ids = [row['node_id'] for row in nodes.select('node_id').collect()]
  start_nodes += random.choices(all_node_ids, k=num_ants-len(start_nodes))

#Handle of the edges dataframe currently held in the cache (released once its successor is ready)
cached_edges = None

for iteration in range(max_iterations):
  #Creating probability and edge_id columns
  #Edge weight is pheromone^alpha * (1/distance)^beta, normalised by the total over all edges
  edges = edges.withColumn("pheromone_distance", (col("pheromone") ** alpha) * ((1 / col("distance")) ** beta))
  total_weight = edges.agg(f.sum('pheromone_distance')).collect()[0][0]
  edges = edges.withColumn("probability",col('pheromone_distance')/total_weight)
  edges = edges.drop('pheromone_distance').withColumn('edge_id',f.concat('from_node_id','to_node_id'))

  #Caching the dataframe in memory for better performance: it is queried once per step of every ant.
  #The cache is materialised before the previous iteration's copy is released, so memory
  #use stays bounded without forcing a recomputation from the start of the lineage
  edges.persist(StorageLevel.MEMORY_ONLY)
  edges.count()
  if cached_edges is not None:
    cached_edges.unpersist()
  cached_edges = edges

  #Building one route per ant; rows are gathered on the driver and turned into a dataframe once
  ant_rows = []
  for ant_number, start_node in enumerate(start_nodes, start=1):
    #Initialising ant_id and the list of nodes visited so far (in visiting order)
    ant_id = f"a{ant_number}"
    visited = [start_node]
    #Loop to extend the walk until it covers all nodes
    while len(visited) < no_of_nodes:
      #Candidate edges leave the current node and end at a node not visited yet
      candidates = edges.where((col('from_node_id') == visited[-1]) & (~col('to_node_id').isin(visited)))
      #Weighted random choice: the largest rand()^(1/probability) wins, which favours high-probability edges
      next_node = candidates.withColumn('select_path',rand()**(1/col('probability'))).orderBy(col('select_path').desc()).first()['to_node_id']
      visited.append(next_node)
    #Route as edge ids (from + to, matching the edge_id column), closed by the edge back to the start node;
    #a single-node graph has no edges and therefore an empty route
    if len(visited) > 1:
      current_route = [f"{src}{dst}" for src, dst in zip(visited, visited[1:] + visited[:1])]
    else:
      current_route = []
    ant_rows.append((ant_id, current_route))

  #Creating ants dataframe to record the routes travelled by each ant
  ants = spark.createDataFrame(ant_rows, schema=ants_schema)
  #Registered after the routes exist so that the 'ants' view exposes this iteration's routes
  ants.createOrReplaceTempView('ants')

  #To find the length of route taken by each ant
  inter=ants.select('ant_id',explode('route').alias('edge_id'))
  len_route=inter.join(edges,['edge_id'],'left').groupBy('ant_id').agg(f.sum('distance').alias('length')).select('ant_id','length')

  #Printing the best route for each iteration
  final=ants.join(len_route,['ant_id'],'inner').orderBy('length').limit(1).collect()[0].asDict()
  print(final)

  #Finding the no. of times an edge is chosen by an ant
  edge_frequency=inter.select('edge_id').groupBy('edge_id').agg(f.count('edge_id').alias('freq'))
  edges_temp=edges.join(edge_frequency,['edge_id'],'left').fillna(value=0,subset=['freq'])

  #Updating pheromone level after each iteration:
  #evaporation keeps (1 - evaporation_rate) of the old level, then every traversal deposits q/distance
  edges=edges_temp.withColumn('pheromone',(1-evaporation_rate)*col('pheromone')+col('freq')*q/col('distance')).drop('freq')
  

{'ant_id': 'a1', 'route': ['BH', 'HG', 'GI', 'IA', 'AE', 'EF', 'FD', 'DC', 'CB'], 'length': 22.995838653717115}
{'ant_id': 'a7', 'route': ['HA', 'AG', 'GI', 'IC', 'CD', 'DF', 'FE', 'EB', 'BH'], 'length': 17.950844619618653}
{'ant_id': 'a4', 'route': ['EB', 'BH', 'HA', 'AG', 'GI', 'IC', 'CD', 'DF', 'FE'], 'length': 17.950844619618653}
{'ant_id': 'a4', 'route': ['EB', 'BH', 'HA', 'AG', 'GI', 'IC', 'CD', 'DF', 'FE'], 'length': 17.950844619618653}
{'ant_id': 'a10', 'route': ['IG', 'GA', 'AH', 'HB', 'BE', 'EF', 'FD', 'DC', 'CI'], 'length': 17.950844619618653}
