In [None]:
import ast
import base64
from io import BytesIO
from pathlib import Path

import json
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
import sklearn

from discriminator import Discriminator
from embedder import Embedder
from evaluator import Evaluator
from expresser import Expresser
from population import Index, Individual

In [None]:
# One Evaluator instance shared by the whole notebook: it is handed to every
# Result below, which uses it to prepare (embed) the individuals it loads.
evaluator = Evaluator()

In [None]:
class Result:
    """Artefacts of one experiment run, loaded from ``save_path``.

    Attributes:
        save_path: Directory containing ``results.csv`` and ``tokens.json``.
        results_df: Contents of ``results.csv`` with the ``parents`` column
            parsed back into Python objects and a ``phenotype`` column added.
        population: One ``Individual`` per distinct ``cand_id``, already
            passed through ``evaluator.prepare_candidates``.
        population_df: Per-generation selection built by
            ``get_population_df``.
        tokens: Parsed contents of ``tokens.json``.
    """

    # Number of rows kept per generation when rebuilding the population.
    POPULATION_SIZE = 10

    def __init__(self, save_path: Path, evaluator: Evaluator):
        """Read the run stored in ``save_path`` and prepare its individuals.

        Args:
            save_path: Directory holding ``results.csv`` and ``tokens.json``.
            evaluator: Evaluator used to prepare the loaded individuals.
        """
        self.save_path = save_path

        self.results_df = pd.read_csv(save_path / "results.csv")
        # ``parents`` is stored as the text of a Python literal; parse it back.
        self.results_df["parents"] = self.results_df["parents"].apply(ast.literal_eval)

        self.population = self.load_all_individuals(evaluator)

        id_to_phenostr = {ind.cand_id: ind.encode_phenotype() for ind in self.population}
        self.results_df["phenotype"] = self.results_df["cand_id"].map(id_to_phenostr)

        # Built after the phenotype column exists so the selection carries it.
        self.population_df = self.get_population_df()

        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open(save_path / "tokens.json", "r", encoding="utf-8") as f:
            self.tokens = json.load(f)

    def load_all_individuals(self, evaluator: Evaluator) -> list[Individual]:
        """Create one ``Individual`` per distinct ``cand_id`` and prepare them.

        The first row recorded for each ``cand_id`` supplies its parents and
        genotype; individuals are returned in order of first appearance.

        Args:
            evaluator: Evaluator whose ``prepare_candidates`` is applied to
                the full list of individuals.

        Returns:
            The prepared individuals.
        """
        # A single pass instead of re-filtering the whole frame per cand_id.
        first_rows = self.results_df.drop_duplicates(subset="cand_id", keep="first")
        individuals = [
            Individual(row["cand_id"], row["parents"], row["genotype"])
            for _, row in first_rows.iterrows()
        ]

        # Embed candidates
        evaluator.prepare_candidates(individuals)

        return individuals

    def get_population_df(self) -> pd.DataFrame:
        """Rebuild the per-generation population from ``results_df``.

        For every generation from 0 to the largest recorded one, the
        ``POPULATION_SIZE`` rows with the smallest ``novelty_score`` are kept.

        Returns:
            The selected rows of all generations stacked in generation order
            with a fresh index; an empty frame when there are no results.
        """
        if self.results_df.empty:
            return self.results_df.copy()

        n_generations = int(self.results_df["gen"].max()) + 1
        selected = []
        for gen in range(n_generations):
            gen_df = self.results_df[self.results_df["gen"] == gen]
            top_rows = gen_df.sort_values("novelty_score").head(self.POPULATION_SIZE)
            if not top_rows.empty:
                selected.append(top_rows)

        if not selected:
            return self.results_df.iloc[0:0].copy()
        # Concatenate once rather than growing a frame inside the loop.
        return pd.concat(selected, ignore_index=True)

    def get_individual(self, cand_id: int) -> Individual:
        """Return the first loaded individual with ``cand_id``.

        Returns ``None`` when no individual has that id, so callers must
        check the result before using it.
        """
        return next((ind for ind in self.population if ind.cand_id == cand_id), None)

In [None]:
# Load the two runs that the rest of the notebook compares. Each Result reads
# results.csv and tokens.json from its directory and prepares its individuals
# with the shared evaluator.
baseline = Result(Path("results/baseline"), evaluator)

evolved = Result(Path("results/discriminator2"), evaluator)

In [None]:
def plot_novelty_generations(ax: plt.Axes, results: dict[str, Result]):
    """Plot the mean ``novelty_score`` of the population for each generation.

    Args:
        ax: Axes to draw on.
        results: Runs to plot, keyed by legend label. The run labelled
            ``"Baseline"`` is drawn as a muted dashed black reference line;
            every other run uses the default line style.
    """
    baseline_style = {"linestyle": "--", "alpha": 0.7, "color": "black"}

    for name, result in results.items():
        mean_novelty = result.population_df.groupby("gen")["novelty_score"].mean()
        style = baseline_style if name == "Baseline" else {}
        ax.plot(mean_novelty.index, mean_novelty.values, label=name, **style)

    ax.legend()
    ax.set_ylim(0, None)
    ax.set_title("Average Novelty Score of Population per Generation")
    # Label the axes so the panel can be read on its own.
    ax.set_xlabel("Generation")
    ax.set_ylabel("Average Novelty Score")

def plot_nnew_generations(ax: plt.Axes, result: Result, pop_size: int = 10):
    """Bar-plot how many newly created individuals entered each generation.

    An individual counts as new in generation ``g`` when its ``cand_id`` lies
    in ``[g * pop_size, (g + 1) * pop_size)``.

    Args:
        ax: Axes to draw on.
        result: Run whose ``population_df`` is analysed.
        pop_size: Number of candidate ids allotted to each generation.
    """
    population_df = result.population_df
    # int() keeps range() working when the ``gen`` column was read as float.
    n_generations = int(population_df["gen"].max()) + 1

    n_new = []
    for gen in range(n_generations):
        ids_created_now = range(gen * pop_size, (gen + 1) * pop_size)
        in_generation = population_df["gen"] == gen
        is_new = population_df["cand_id"].isin(ids_created_now)
        n_new.append(int((in_generation & is_new).sum()))

    ax.bar(range(len(n_new)), n_new)
    ax.set_title("Number of New Individuals Added to Population per Generation")
    ax.set_xlabel("Generation")
    ax.set_ylabel("New Individuals")

# Two-panel overview: mean novelty per generation for both runs (left) and the
# number of newly created individuals kept per generation in the evolved run
# (right).
fig, axes = plt.subplots(1, 2, figsize=(12, 6))
plot_novelty_generations(axes[0], {"Evolved": evolved, "Baseline": baseline})
plot_nnew_generations(axes[1], evolved)
plt.show()

In [None]:
def decode_phenotype(phenotype_str: str) -> Image.Image:
    """Turn a base64-encoded image string back into a PIL image."""
    buffer = BytesIO(base64.b64decode(phenotype_str))
    return Image.open(buffer)

In [None]:
def show_df(axes, results_df: pd.DataFrame):
    """Draw the phenotype image of each row on successive axes.

    Rows and axes are paired in order; each used axes is titled with the
    row's integer ``cand_id``. Surplus rows are ignored, and surplus axes are
    left blank rather than showing an empty framed plot.

    Args:
        axes: Iterable of matplotlib axes.
        results_df: Frame with ``phenotype`` (base64 image string) and
            ``cand_id`` columns.
    """
    axes = list(axes)

    # Hide every panel up front so unused ones do not render empty frames.
    for ax in axes:
        ax.axis("off")

    for ax, (_, row) in zip(axes, results_df.iterrows()):
        img = decode_phenotype(row["phenotype"])
        ax.set_title(int(row["cand_id"]))
        ax.imshow(img)

# 2x5 grid of the images recorded for generation 0 of the baseline run.
fig, axes = plt.subplots(2, 5, figsize=(15, 6))
axes = axes.flatten()
show_df(axes, baseline.results_df[baseline.results_df["gen"] == 0])
fig.suptitle("Seed Images")
# Uncomment to export the figure to disk:
# plt.savefig("figures/baseline_blank", dpi=300, bbox_inches="tight")
plt.show()

In [None]:
def show_most_novel(axes, result: Result):
    """Print and display the population members ranked by ``novelty_score``.

    Rows lacking a genotype or a phenotype are skipped and every ``cand_id``
    is shown once. Rows are ordered by ascending ``novelty_score``; the first
    ten are printed and the images are drawn on the flattened ``axes``.

    Args:
        axes: Array of matplotlib axes (flattened before drawing).
        result: Run whose ``population_df`` is ranked.
    """
    population_df = result.population_df

    is_complete = population_df["genotype"].notna() & population_df["phenotype"].notna()
    ranked = population_df[is_complete].drop_duplicates(subset="cand_id")
    ranked = ranked.sort_values(by="novelty_score", ascending=True)

    summary_columns = ["cand_id", "parents", "novelty_score"]
    print(ranked[summary_columns].iloc[:10])

    show_df(axes.flatten(), ranked)
    

# One 2x5 image grid per run, filled by show_most_novel from that run's
# population ranked by novelty_score.
fig, axes = plt.subplots(2, 5, figsize=(15, 6))
show_most_novel(axes, baseline)
fig.suptitle("Baseline Most Novel Individuals")
plt.show()

fig, axes = plt.subplots(2, 5, figsize=(15, 6))
show_most_novel(axes, evolved)
fig.suptitle("Evolved Most Novel Individuals")
plt.show()

In [None]:
def plot_novelty_distribution(ax: plt.Axes, result: Result, c: str):
    """Draw a 30-bin histogram of novelty scores, one value per ``cand_id``.

    Args:
        ax: Axes to draw on.
        result: Run whose ``results_df`` supplies the scores.
        c: Matplotlib colour of the bars.
    """
    # drop_duplicates returns a new frame, so the stored results are untouched.
    per_candidate = result.results_df.drop_duplicates("cand_id")
    scores = per_candidate["novelty_score"]

    ax.hist(scores, bins=30, color=c)

    # Fixed limits and ticks keep the stacked panels directly comparable.
    ax.set_xlim(0, None)
    ax.set_ylim(0, 25)
    ax.set_xticks(np.arange(0, 1.1, 0.1))

# Stacked histograms of novelty scores: baseline on top (black), evolved
# below ("C0"), sharing one figure-level title, x label and legend.
fig, axes = plt.subplots(2, 1, figsize=(8, 6))
axes = axes.flatten()
plot_novelty_distribution(axes[0], baseline, "black")
plot_novelty_distribution(axes[1], evolved, "C0")

fig.suptitle("Histograms of Created Novelty Scores")
fig.supxlabel("Novelty Score")
# The legend is anchored slightly below the figure area (y = -0.05).
fig.legend(["Baseline", "Evolved"], loc="lower center", ncol=2, bbox_to_anchor=(0.5, -0.05))

plt.show()

In [None]:
# Candidate ids that make up the baseline population in its final generation;
# the bare last expression is the cell's displayed output.
baseline_last_gen = baseline.population_df[baseline.population_df["gen"] == baseline.population_df["gen"].max()]
baseline_last_gen["cand_id"]

In [None]:
def plot_tsne(ax: plt.Axes, result: Result, norm=None, random_state=0, pop_size: int = 10):
    """Scatter a 2-D t-SNE projection of the population's embeddings.

    Every ``cand_id`` that appears in ``result.population_df`` is looked up
    with ``result.get_individual``; ids that are unknown or have no embedding
    are skipped. Points whose creation generation (``cand_id // pop_size``)
    is 0 are drawn in red, all others are coloured by creation generation
    with the ``viridis`` colormap.

    Args:
        ax: Axes to draw on.
        result: Run to visualise.
        norm: Optional matplotlib normalisation for the generation colours.
            Pass the same object that drives a shared colorbar so colours
            mean the same thing in every panel; ``None`` keeps matplotlib's
            per-plot auto-scaling.
        random_state: Seed forwarded to t-SNE so the layout is reproducible.
        pop_size: Number of candidate ids allotted to each generation.
    """
    # Imported explicitly: ``import sklearn`` alone does not guarantee that
    # the ``sklearn.manifold`` submodule is loaded on every version.
    from sklearn.manifold import TSNE

    # Remove ticks from axes; t-SNE coordinates carry no meaningful scale.
    ax.set_xticks([])
    ax.set_yticks([])

    top_ids = result.population_df["cand_id"].unique().tolist()
    top_cands = [result.get_individual(cand_id) for cand_id in top_ids]
    # get_individual returns None for ids it does not know.
    top_cands = [ind for ind in top_cands if ind is not None and ind.embedding is not None]
    if len(top_cands) < 2:
        # Nothing to project; t-SNE needs at least two samples.
        return

    embeddings = np.array([ind.embedding for ind in top_cands])
    cand_ids = np.array([ind.cand_id for ind in top_cands])
    gens = cand_ids // pop_size
    first_gen = gens == 0

    # t-SNE requires perplexity < n_samples; 30.0 is the library default.
    tsne = TSNE(perplexity=min(30.0, len(top_cands) - 1), random_state=random_state)
    embeddings_2d = tsne.fit_transform(embeddings)

    ax.scatter(embeddings_2d[first_gen][:, 0], embeddings_2d[first_gen][:, 1], color="red", label="Baseline")
    ax.scatter(
        embeddings_2d[~first_gen][:, 0],
        embeddings_2d[~first_gen][:, 1],
        c=gens[~first_gen],
        cmap="viridis",
        norm=norm,
        label="Created",
    )


fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# One normalisation shared by both scatters and the colorbar, so a colour
# denotes the same generation everywhere in the figure.
norm = plt.Normalize(vmin=0, vmax=max(baseline.population_df["gen"].max(), evolved.population_df["gen"].max()))
plot_tsne(axes[0], baseline, norm=norm)
plot_tsne(axes[1], evolved, norm=norm)

# Create generation colorbar at the bottom of the plot horizontally
cbar_ax = fig.add_axes([0.25, -0.05, 0.5, 0.03])
cbar = fig.colorbar(plt.cm.ScalarMappable(norm=norm, cmap="viridis"), cax=cbar_ax, orientation="horizontal")
cbar.set_label("Generation")

fig.suptitle("t-SNE of Population Embeddings over Generations")
axes[0].set_title("Baseline")
axes[1].set_title("Evolved")
plt.show()

In [None]:
from population import Index

def plot_intergeneration_novelty(ax: plt.Axes, results: dict[str, Result], pop_size: int = 10):
    """Plot, per generation, the mean novelty of the individuals created in it.

    For each generation the individuals with ``cand_id // pop_size`` equal to
    that generation and a non-missing embedding are added to a fresh
    ``Index``; each one's novelty is then measured against that index and the
    values are averaged. Generations with no such individuals are recorded as
    NaN, which leaves a gap in the line instead of raising.

    Args:
        ax: Axes to draw on.
        results: Runs to plot, keyed by legend label. The run labelled
            ``"Baseline"`` is drawn as a muted dashed black line, every other
            run in ``"C0"``.
        pop_size: Number of candidate ids allotted to each generation.
    """
    baseline_style = {"linestyle": "--", "alpha": 0.7, "color": "black"}
    default_style = {"color": "C0"}

    for name, result in results.items():
        # Include the final generation, matching the other per-generation plots.
        n_generations = int(result.population_df["gen"].max()) + 1

        novelties = []
        for gen in range(n_generations):
            population = [
                ind
                for ind in result.population
                if ind.cand_id // pop_size == gen and ind.embedding is not None
            ]
            if not population:
                # Avoid dividing by zero; NaN keeps x positions aligned.
                novelties.append(float("nan"))
                continue

            # NOTE(review): meaning of the -1 argument is defined by Index - confirm.
            index = Index(-1)
            for ind in population:
                index.add_embedding(ind)

            total = sum(index.measure_novelty(ind.embedding.reshape(1, -1))[0] for ind in population)
            novelties.append(total / len(population))

        style = baseline_style if name == "Baseline" else default_style
        ax.plot(range(len(novelties)), novelties, label=name, **style)

    ax.set_ylim(0, 1.1)
    ax.set_title("Average Intergeneration Novelty per Generation")
    ax.legend()
    ax.set_ylabel("Average Intergeneration Novelty")
    ax.set_xlabel("Generation")

# Single-panel comparison of both runs' per-generation novelty.
fig, ax = plt.subplots()
plot_intergeneration_novelty(ax, {"Evolved": evolved, "Baseline": baseline})
plt.show()


In [None]:
def token_usage(ax: plt.Axes, result: Result):
    """Print the total number of reproducer tokens used by a run.

    Input and output token counts are paired entry by entry (pairing stops at
    the shorter list) and the grand total is printed. ``ax`` is accepted but
    nothing is drawn on it.

    Args:
        ax: Axes passed by the caller; currently unused.
        result: Run whose ``tokens["reproducer_tokens"]`` is summed.
    """
    reproducer = result.tokens["reproducer_tokens"]
    paired_counts = zip(reproducer["input_tokens"], reproducer["output_tokens"])
    print(sum(n_in + n_out for n_in, n_out in paired_counts))

# Print the total reproducer token count of each run (baseline first).
# NOTE(review): token_usage only prints and never draws on ``ax``, so the
# figure shown by plt.show() is empty - confirm whether a plot was intended.
fig, ax = plt.subplots()
token_usage(ax, baseline)
token_usage(ax, evolved)
plt.show()