# Install libraries

In [1]:
!pip install pyspark
!pip install matplotlib
!pip3 install pygad



# Import libraries

In [2]:
from pyspark.sql import SparkSession, functions as F, SQLContext
from pyspark.sql.functions import when, lit, col, lower
from IPython.display import display, Image
from pygad import GA
import numpy as np
import random

# Setup Spark

In [3]:
# Start (or reuse) a Spark session whose master is the local machine ("local[*]")
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
spark

# Read data

In [4]:
# Load the pokemon table; the first CSV line is the header and column types are inferred
df = spark.read.csv(
    "/home/jovyan/work/BCU/ModernOptimisation/Assignment/Data/pokemons.csv",
    header=True,
    inferSchema=True,
)

In [5]:
# Load the type table (filtered on its `Type` column later); header row present, types inferred
TypeData_df = spark.read.csv(
    "/home/jovyan/work/BCU/ModernOptimisation/Assignment/Data/TypeData.csv",
    header=True,
    inferSchema=True,
)

# Get 3 random pokemon

In [6]:
# Seed Python's `random` module so later `random` calls in this notebook are repeatable.
random.seed(42)

In [7]:
# Bring every distinct pokemon id to the driver as a plain Python list
pokemon_ids = (
    df.select("id")
    .distinct()
    .rdd.map(lambda row: row[0])
    .collect()
)

# Draw 3 ids without replacement (driven by the seeded `random` module)
random_pokemon_ids = random.sample(pokemon_ids, k=3)

# Keep only the rows of the full table that belong to the drawn ids
random_pokemon = df.where(col("id").isin(random_pokemon_ids))
random_pokemon.show()

+---+---------+---------+---------------+------------+-----+-------+---+---+---+-----+-----+-----+-----+------+------+--------------------+--------------------+
| id|     name|     rank|     generation|evolves_from|type1|  type2| hp|atk|def|spatk|spdef|speed|total|height|weight|           abilities|                desc|
+---+---------+---------+---------------+------------+-----+-------+---+---+---+-----+-----+-----+-----+------+------+--------------------+--------------------+
|314| illumise| ordinary| generation-iii|     nothing|  bug|   None| 65| 47| 75|   73|   85|   85|  430|     6|   177|oblivious tinted-...|With its sweet ar...|
|375|   metang| ordinary| generation-iii|      beldum|steel|psychic| 60| 75|100|   55|   80|   50|  420|    12|  2025|clear-body light-...|When two BELDUM f...|
|896|glastrier|legendary|generation-viii|     nothing|  ice|   None|100|145|130|   65|  110|   30|  580|    22|  8000|     chilling-neigh |Glastrier emits i...|
+---+---------+---------+---------

In [9]:
print("Opponents pokemons: ")
print()

# Fields printed for every opponent, in display order ("desc" is handled separately so it can be wrapped)
detail_fields = ["id", "name", "rank", "generation", "evolves_from", "type1", "type2",
                 "hp", "atk", "def", "spatk", "speed", "abilities"]

# Fetch all opponents with a single Spark job instead of one filter + first() per id.
# setdefault keeps the first row seen for an id.
opponent_rows = {}
for row in random_pokemon.select(*detail_fields, "desc").collect():
    opponent_rows.setdefault(row["id"], row)

# The loop variable is `pokemon_id`, not `id`, so the builtin `id` is not shadowed
# for the remaining cells of the notebook.
for pokemon_id in random_pokemon_ids:
    pokemon_details = opponent_rows[pokemon_id]

    # Print "<field>: <value>" for each plain field
    for field in detail_fields:
        print(f"{field}:", pokemon_details[field])

    # Word-wrap the description: start a new tab-indented line once the current
    # line would grow past 64 characters.
    desc = pokemon_details["desc"] or ""  # a null description prints as an empty "desc:" line
    print("desc:", end=" ")
    char_count = 0
    for word in desc.split():
        if char_count + len(word) > 64:
            print("\n\t", end="")
            char_count = 0
        print(word, end=" ")
        char_count += len(word) + 1  # +1 for the space printed after the word
    print()

    # The image URL uses the id left-padded with zeros to at least 3 digits
    pokemon_id_str = str(pokemon_id).zfill(3)
    url = f"https://assets.pokemon.com/assets/cms2/img/pokedex/full/{pokemon_id_str}.png"

    # Display the image
    display(Image(url=url))


Opponents pokemons: 

id: 896
name: glastrier
rank: legendary
generation: generation-viii
evolves_from: nothing
type1: ice
type2: None
hp: 100
atk: 145
def: 130
spatk: 65
speed: 30
abilities: chilling-neigh 
desc: Glastrier emits intense cold from its hooves. It’s also a 
	belligerent Pokémon—anything it wants, it takes by force. 


id: 375
name: metang
rank: ordinary
generation: generation-iii
evolves_from: beldum
type1: steel
type2: psychic
hp: 60
atk: 75
def: 100
spatk: 55
speed: 50
abilities: clear-body light-metal 
desc: When two BELDUM fuse together, METANG is formed. The brains of 
	the BELDUM are joined by a magnetic nervous system. By linking 
	its brains magnetically, this POKéMON generates strong 
	psychokinetic power. 


id: 314
name: illumise
rank: ordinary
generation: generation-iii
evolves_from: nothing
type1: bug
type2: None
hp: 65
atk: 47
def: 75
spatk: 73
speed: 85
abilities: oblivious tinted-lens prankster 
desc: With its sweet aroma, it guides Volbeat to draw signs with light 
	in the night sky. 


In [10]:
def get_n_row(df, number_in_df):
    """Return one row of ``df`` as a single-row DataFrame, counted from the END.

    ``number_in_df=1`` selects the last collected row, ``2`` the one before it,
    and so on (this matches the original ``rows[count - number_in_df]`` indexing).

    Args:
        df: Spark DataFrame to pick the row from. It is collected to the driver,
            so it is expected to be small.
        number_in_df: 1-based position counted from the end of ``df``.

    Returns:
        A new DataFrame with the selected row and the schema of ``df``.

    Raises:
        IndexError: if ``number_in_df`` is not between 1 and the number of rows.
            Previously a value larger than the row count silently wrapped
            around to a different row.
    """
    # A single collect replaces the former count() + limit(count).collect() pair
    rows = df.collect()
    if not 1 <= number_in_df <= len(rows):
        raise IndexError(
            f"number_in_df must be between 1 and {len(rows)}, got {number_in_df}"
        )
    return spark.createDataFrame([rows[-number_in_df]], schema=df.schema)

def create(primary_type):
    """Build a copy of the pokemon table whose ``hp`` is scaled per ``type1``.

    The scaling values come from the single ``TypeData_df`` row whose ``Type``
    equals the ``type1`` of the given pokemon: every pokemon in ``df`` has its
    ``hp`` multiplied by the value stored in that row under the column named
    after the pokemon's own ``type1``. Pokemon whose ``type1`` matches no
    ``TypeData_df`` column end up with a null ``hp``.

    Args:
        primary_type: DataFrame whose first row holds the reference pokemon
            (only its ``type1`` is read).

    Returns:
        DataFrame with columns ``id``, ``name``, ``type1``, ``hp`` (scaled), ``atk``.

    Raises:
        ValueError: if ``primary_type`` is empty or its ``type1`` has no row in
            ``TypeData_df``.
    """
    reference_row = primary_type.first()
    if reference_row is None:
        raise ValueError("primary_type is empty; expected a DataFrame holding one pokemon")
    reference_type = reference_row["type1"]

    # 1) Locate the TypeData row for the reference type. It is fetched ONCE here,
    #    not once per column inside the loop, because first() runs a Spark job.
    type_row = TypeData_df.filter(TypeData_df.Type == reference_type).first()
    if type_row is None:
        raise ValueError(f"no row in TypeData for type {reference_type!r}")

    # 2) Shorten df and add an initially-null bonus column
    df_updated = df.select("id", "name", "type1", "hp", "atk").withColumn("bns", lit(None))

    # 3) For every TypeData column, copy its value into "bns" for the pokemon
    #    whose type1 equals that column's name
    for column in TypeData_df.columns:
        df_updated = df_updated.withColumn(
            "bns",
            when(col("type1") == column, type_row[column]).otherwise(df_updated["bns"]),
        )

    # 4) Multiply the bonus with the HP of the original dataset and drop the helper column
    return df_updated.withColumn("hp", col("hp") * col("bns")).drop("bns")


In [11]:
# Split random_pokemon into three one-row DataFrames
# (get_n_row indexes from the end, so number_in_df=1 is the last row).
pokemon_1, pokemon_2, pokemon_3 = (
    get_n_row(df=random_pokemon, number_in_df=position) for position in (1, 2, 3)
)

# For each opponent, build the full pokemon table with hp scaled by create()
all_enemies_of_pk1, all_enemies_of_pk2, all_enemies_of_pk3 = (
    create(opponent) for opponent in (pokemon_1, pokemon_2, pokemon_3)
)
all_enemies_of_pk3.show()

+---+----------+------+-----+---+
| id|      name| type1|   hp|atk|
+---+----------+------+-----+---+
|  1| bulbasaur| grass| 90.0| 49|
|  2|   ivysaur| grass|120.0| 62|
|  3|  venusaur| grass|160.0| 82|
|  4|charmander|  fire| 19.5| 52|
|  5|charmeleon|  fire| 29.0| 64|
|  6| charizard|  fire| 39.0| 84|
|  7|  squirtle| water| 44.0| 48|
|  8| wartortle| water| 59.0| 63|
|  9| blastoise| water| 79.0| 83|
| 10|  caterpie|   bug| 45.0| 30|
| 11|   metapod|   bug| 50.0| 20|
| 12|butterfree|   bug| 60.0| 45|
| 13|    weedle|   bug| 40.0| 35|
| 14|    kakuna|   bug| 45.0| 25|
| 15|  beedrill|   bug| 65.0| 90|
| 16|    pidgey|normal| 40.0| 45|
| 17| pidgeotto|normal| 63.0| 60|
| 18|   pidgeot|normal| 83.0| 80|
| 19|   rattata|normal| 30.0| 56|
| 20|  raticate|normal| 55.0| 81|
+---+----------+------+-----+---+
only showing top 20 rows



# Genetic algorithm

In [12]:
def my_ga(df1, df2):
    """Run a genetic algorithm over the (hp, atk) pairs of ``df2`` and print the result.

    The initial population is every (hp, atk) pair found in ``df2``. A solution's
    fitness is ``1 / (|hp + atk - target| + 0.0001)``, where ``target`` is
    ``sum(hp) + sum(atk)`` over ``df1``. After the run, the ``df2`` row whose
    (hp, atk) is nearest (Euclidean distance) to the best solution is reported.

    Args:
        df1: DataFrame with numeric ``hp`` and ``atk`` columns describing the target.
        df2: DataFrame with ``name``, ``hp`` and ``atk`` columns holding the candidates.

    Returns:
        None. The best solution and the closest pokemon name are printed.

    Raises:
        ValueError: if ``df1`` yields no target total or ``df2`` has no row with
            both ``hp`` and ``atk`` present.
    """
    # The target is the same for every fitness evaluation, so compute it once
    # instead of running a Spark job on each call of the fitness function.
    target_stats = df1.select(F.sum("hp") + F.sum("atk")).collect()[0][0]
    if target_stats is None:
        raise ValueError("df1 has no hp/atk values to build the target from")

    # Collect names and stats together so both come from the same row ordering.
    # Rows with a null stat cannot be converted to float and are skipped.
    candidates = df2.select("name", "hp", "atk").dropna(subset=["hp", "atk"]).collect()
    if not candidates:
        raise ValueError("df2 has no rows with both hp and atk set")
    candidate_stats = np.array([[float(row["hp"]), float(row["atk"])] for row in candidates])

    def fitness_function(ga_instance, solution, solution_idx):
        """Score a solution by how close its hp + atk is to the target total."""
        total_stats = solution[0] + solution[1]  # hp + atk
        return 1 / (abs(total_stats - target_stats) + 0.0001)  # Avoid division by zero

    # Mutation is disabled, so new solutions only arise from crossover of existing pairs.
    # The seed is passed through `random_seed`; assigning `ga.seed` after
    # construction is not read by PyGAD.
    ga = GA(num_generations=100,
            num_parents_mating=2,
            fitness_func=fitness_function,
            sol_per_pop=10,
            num_genes=2,
            gene_type=float,
            initial_population=candidate_stats.tolist(),
            parent_selection_type="sss",
            crossover_type="single_point",
            mutation_type=None,
            random_seed=42)

    # Run genetic algorithm
    ga.run()

    # Get the final best solution: (solution, fitness, index)
    best_solution = ga.best_solution()

    # Find the candidate whose (hp, atk) is closest to the best solution
    distances = np.linalg.norm(candidate_stats - np.array(best_solution[0]), axis=1)
    closest_pokemon = candidates[int(np.argmin(distances))]["name"]

    print("Best Solution:", best_solution)
    print("Closest Pokemon to Solution:", closest_pokemon)


In [14]:
# Target: pokemon_3; candidates: its scaled enemy table. hp/atk are cast to integers first.
df1 = pokemon_3.select(
    "id", "name", "type1",
    col("hp").cast("int").alias("hp"),
    col("atk").cast("int").alias("atk"),
)
df2 = all_enemies_of_pk3.select(
    "id", "name", "type1",
    col("hp").cast("int").alias("hp"),
    col("atk").cast("int").alias("atk"),
)
my_ga(df1, df2)



In [None]:
# Target: pokemon_2; candidates: its scaled enemy table. hp/atk are cast to integers first.
df1 = pokemon_2.select(
    "id", "name", "type1",
    col("hp").cast("int").alias("hp"),
    col("atk").cast("int").alias("atk"),
)
df2 = all_enemies_of_pk2.select(
    "id", "name", "type1",
    col("hp").cast("int").alias("hp"),
    col("atk").cast("int").alias("atk"),
)
my_ga(df1, df2)

In [None]:
# Target: pokemon_1; candidates: its scaled enemy table. hp/atk are cast to integers first.
df1 = pokemon_1.select(
    "id", "name", "type1",
    col("hp").cast("int").alias("hp"),
    col("atk").cast("int").alias("atk"),
)
df2 = all_enemies_of_pk1.select(
    "id", "name", "type1",
    col("hp").cast("int").alias("hp"),
    col("atk").cast("int").alias("atk"),
)
my_ga(df1, df2)

# Hill Climbing

In [None]:
attributes = ['hp', 'atk', 'def', 'spatk', 'spdef', 'speed']

# One row per randomly selected opponent: its id, every compared attribute under an
# "opponent_" prefix (so the names cannot clash with df's own columns), and the
# summed total attribute score.
random_pokemon_attributes = random_pokemon.select(
    col('id').alias('opponent_id'),
    *[col(attr).alias('opponent_' + attr) for attr in attributes],
    F.expr(' + '.join(attributes)).alias('total_score'),
)

# NOTE(review): the previous version referenced columns that do not exist
# ('df.<attr>', 'random_pokemon_attributes.<attr>') and joined on equal ids, which
# compares each opponent with itself. The intent is assumed to be: keep every pokemon
# that beats an opponent in ALL listed attributes -- confirm.
# Pair every pokemon with every opponent, then keep the pairs where the pokemon is
# strictly better in each attribute.
top_competitors = df.crossJoin(random_pokemon_attributes)
for attr in attributes:
    top_competitors = top_competitors.filter(col(attr) > col('opponent_' + attr))

# Keep opponent_id (which opponent is beaten) but drop the helper columns
top_competitors = top_competitors.drop('total_score', *['opponent_' + attr for attr in attributes])

# Show the top competitors
top_competitors.show()

In [None]:
# Import necessary libraries
from pyspark import SparkContext
import matplotlib.pyplot as plt
import numpy as np

In [None]:
def hill_climbing(f, x0, iter=1000):
    """Maximise ``f`` by steepest-ascent hill climbing starting from ``x0``.

    At each step the neighbours of the current point are produced by
    ``generate_neighbors`` and the one with the highest ``f`` value is taken,
    as long as it is strictly better than the current point.

    Args:
        f: objective function to maximise; called with a candidate solution.
        x0: initial solution.
        iter: maximum number of moves to make.

    Returns:
        Tuple ``(x, moves)``: the best solution found and the number of moves
        made. The tuple is returned both when a local optimum is reached and
        when the move budget runs out (previously the latter fell off the end
        of the loop and returned ``None``, which broke callers that unpack
        the result).
    """
    x = x0  # current solution
    cur_iter = 0
    while cur_iter < iter:
        # Pick the neighbour with the highest function value
        best_neighbor = max(generate_neighbors(x), key=f)
        if f(best_neighbor) <= f(x):
            # No neighbour improves on x: local optimum reached
            return x, cur_iter
        x = best_neighbor
        cur_iter += 1
    # Move budget exhausted before reaching a local optimum
    return x, cur_iter

In [None]:
# Define the gradient descent function
def gradient_descent(x, learning_rate=0.1, max_iter=1000, tolerance=10E-13):
    """Minimise ``x**2`` by gradient descent (equivalently, maximise ``-x**2``).

    Each update subtracts ``learning_rate * 2 * x`` from ``x``, where ``2 * x``
    is the derivative of ``x**2``.

    Args:
        x: starting value.
        learning_rate: factor applied to the gradient for each update.
        max_iter: upper bound on the number of updates.
        tolerance: stop as soon as the absolute size of an update drops below this.

    Returns:
        Tuple ``(x, iterations)``: the final value and the number of updates
        completed before the stopping update (or ``max_iter`` if the tolerance
        was never reached).
    """
    steps_done = 0
    while steps_done < max_iter:
        delta = learning_rate * (2 * x)  # learning rate times the derivative of x**2
        x -= delta
        if abs(delta) < tolerance:
            # Converged: the update that triggered the stop is not counted
            return x, steps_done
        steps_done += 1
    return x, steps_done

In [None]:
# Define the function to optimize
def quadratic_function(x):
    """Objective to maximise: the negated square of ``x``, which peaks at ``x = 0``."""
    squared = x ** 2
    return -squared

# Define a function to generate neighbors
def generate_neighbors(x, step=0.1):
    """Return the candidate points around ``x``: one step below, ``x`` itself, one step above.

    Args:
        x: current solution.
        step: distance to the lower and upper neighbour. Defaults to 0.1, the
            value that was previously hard-coded.

    Returns:
        List ``[x - step, x, x + step]``.
    """
    return [x - step, x, x + step]

In [None]:

# Climb the objective starting from x = 90
initial_solution = 90
best_solution, num_of_iter = hill_climbing(quadratic_function, initial_solution)
print(f"Best solution found: {best_solution}, number of iterations: {num_of_iter}")

# Sample the objective on [-10, 10] to draw its curve
x_values = np.linspace(-10, 10, 100)
y_values = list(map(quadratic_function, x_values))

# Curve in the default colour, the solution found marked in red
plt.plot(x_values, y_values, label='Function')
plt.scatter(
    best_solution,
    quadratic_function(best_solution),
    color='red',
    label='Optimal Solution',
)
plt.title('Hill Climbing Optimization')
plt.xlabel('X')
plt.ylabel('Y')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Run gradient descent
initial_solution = 2  # Initial value for gradient descent
# The result is stored under its own names: the plot below reads
# `best_solution_gradient`, which was previously never assigned (NameError).
best_solution_gradient, num_of_iter_gradient = gradient_descent(initial_solution)
print(f"Best solution found: {best_solution_gradient}, number of iterations: {num_of_iter_gradient}")

# Visualize the optimization process
x_values = np.linspace(-10, 10, 100)
y_values = [quadratic_function(x) for x in x_values]

plt.plot(x_values, y_values, label='Function')
plt.scatter(best_solution_gradient, quadratic_function(best_solution_gradient), color='green', label='Optimal Solution (Gradient Descent)')
plt.title('Gradient Descent Optimization')
plt.xlabel('X')
plt.ylabel('Y')
plt.legend()
plt.grid(True)
plt.show()
