# Lab. 2

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

### Data preparation

In [None]:
functions = (
    old_functions := [  # 1
        lambda x: 5 * x**3 - 2 * x**2 + 3 * x - 17,
        lambda x: np.sin(x) + np.cos(x),
        lambda x: 2 * np.log(x + 1),
        lambda x, y: x + 2 * y,
        lambda x, y: np.sin(x / 2) + 2 * np.cos(x),
        lambda x, y: x**2 + 3 * x * y - 7 * y + 1,
    ]
) + (
    new_functions := [  # 2
        lambda x: np.sin(x + 3.141592 / 2),
        lambda x: np.tan(2 * x + 1),
    ]
)

In [None]:
asymptote = lambda n: np.round(-np.pi / 4 - 1 / 2 + np.pi * n / 2, 2)  # 2

domains = (
    old_domains := [
        [(-10, 10), (0, 100), (-1, 1), (-1000, 1000)],  # 1
        [(-3.14, 3.14), (0, 7), (0, 100), (-100, 100)],
        [(0, 4), (0, 9), (0, 99), (0, 999)],
        [(0, 1), (-10, 10), (0, 100), (-1000, 1000)],
        [(-3.14, 3.14), (0, 7), (0, 100), (-100, 100)],
        [(-10, 10), (0, 100), (-1, 1), (-1000, 1000)],
    ]
) + (
    new_domains := [  # 2
        [(0, 6.28), (-3.14, 3.14), (-1.57, 4.71), (-1000, 1000)],
        [(asymptote(0.01), asymptote(0.99)), (-3.14, 3.14), (-1.57, 1.57)],
    ]
)

In [None]:
import os
import subprocess

In [None]:
for i, (function, domains_) in enumerate(zip(new_functions, new_domains)):
    for domain in domains_:
        X = np.linspace(*domain, (n if (n := int(100 * abs(domain[1] - domain[0]))) < 1001 else 1000)).round(2)
        Y = function(X).round(2)
        filename = f"1_{i+7}__{domain[0]}_{domain[1]}.dat"
        with open(f"data\input\{filename}", "a") as f:
            f.write(f"1 100 -5 6 {X.shape[0]}\n")
        pd.DataFrame({"X": X, "Y": Y}).to_csv(
            f"data\input\{filename}", index=False, header=False, sep=" ", mode="a"
        )

In [None]:
input_filenames = os.listdir("data\input")

In [None]:
def get_domain_from_filename(filename):
    return tuple(map(float, ".".join(filename.split('.')[:-1]).split('__')[1].split('_')))

def get_domain_length_from_filename(filename):
    domain = get_domain_from_filename(filename)
    return abs(domain[1] - domain[0])

def get_problem_solving_threshold_from_filename(filename):
    domain_length = get_domain_length_from_filename(filename)
    return domain_length * 0.21

In [None]:
for input_file in input_filenames:
    output_file = ".".join(input_file.split('.')[:-1]) + '.out'
    problem_solved_threshold = get_problem_solving_threshold_from_filename(input_file)
    print(
        command := f"copy NUL data\output\wsc\{output_file} & cd TinyGP & java TinyGP.java ..\data\input\{input_file} {problem_solved_threshold} > ..\data\output\wsc\{output_file} & cd .."
    )
    t = subprocess.run(command, shell=True)

### Output parsing

In [None]:
import re
import json

In [None]:
from numpy import sin, cos

def dictify_individual(individual):
    return (
        individual.replace("X1", "X")
        .replace("X2", "Y")
        .replace("SIN", "sin")
        .replace("COS", "cos")
        .strip()
    )

def reverse_dictify_individual(individual):
    return (
        individual.replace("X", "X1")
        .replace("Y", "X2")
        .replace("sin", "SIN")
        .replace("cos", "COS")
        .replace("-", " - ")
        .replace("+", " + ")
        .replace("*", " * ")
        .replace("/", " / ")
        .strip()
    )

def dictify(log):
    return (
        dictify_individual(log).replace("AvgFitness", '{"avg_fitness"')
        .replace("BestFitness", ', "best_fitness"')
        .replace("AvgSize", ', "avg_size"')
        .replace("BestIndividual:", ', "best_individual": "')
        .replace("\n", " ")
        .replace("=", ": ")
        .replace("PROBLEM*NOT*SOLVED", "")
        .replace("PROBLEMSOLVED", "")
        + '"}'
    )

In [None]:
def logs_to_df(logs):
    generations = {
        int(log.split(" ")[0][1:]): "".join(log.split(" ")[1:])
        for log in re.split("Generation", logs)[1:]
    }
    for k, v in generations.items():
        generations[k] = json.loads(dictify(v))
    return pd.DataFrame().from_dict(generations, orient="index")

### Simplification

In [None]:
import sympy as sp
import inspect
from func_timeout import func_timeout

In [None]:
def simplify(expression):
    return str(sp.simplify(expression))

In [None]:
def simplify_java(individual):
    """
    Requires original TinyGP output
    """
    INPUT_FILENAME = "input.txt"
    INPUT_PATH = os.path.join("TinyGP", INPUT_FILENAME)
    with open(INPUT_PATH, "w") as f:
        f.write(individual)
    with open("TinyGP/individual.txt", "w") as f:
        command = f"cd TinyGP & java Simplifier.java {INPUT_FILENAME} & cd .."
        subprocess.run(
            command, shell=True, stdout=f, text=True, 
        )
    with open("TinyGP/individual.txt", "r") as f:
        individual = f.read().strip()
    return str(sp.parse_expr(dictify_individual(individual)))

In [None]:
def read_from_original(function_idx, domain_idx, with_sin_cos=True):
    FILEPATH = os.path.join(
        "data",
        "output",
        "wsc" if with_sin_cos else "initial",
        f"1_{function_idx+1}__{domains[function_idx][domain_idx][0]}_{domains[function_idx][domain_idx][1]}.out"
    )
    with open(FILEPATH, "r") as f:
        logs = f.readlines()
    return logs[-2].replace("Best Individual: ", "")

In [None]:
def simplify_timeout(value, timeout=600, with_trigonomethric=True, function_idx=None, domain_idx=None):
    try:
        return func_timeout(timeout, simplify, args=(value,))
    except:
        print("Using Java Simplifier")
        return simplify_java(read_from_original(function_idx, domain_idx, with_sin_cos=with_trigonomethric))

### Comparison

In [None]:
def compare(function, domain, df, size=(8, 8), save=False):
    X = np.linspace(*domain, int(100 * abs(domain[1] - domain[0])))
    Y_true = function(X)

    best_individual = df.iloc[-1, -1]
    Y_gp = eval(best_individual)

    f = plt.figure(figsize=size)
    plt.plot(X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5)
    plt.plot(X, Y_gp, label="GP", linestyle="-", color="red", linewidth=1)
    plt.legend()
    plt.title(
        inspect.getsource(function)
        .replace("lambda x:", "f(x) =")
        .replace(",", "")
        .replace("np.", "")
        + f"\nx ∈ [{domain[0]}, {domain[1]}]"

    )
    plt.grid()
    plt.show()
    if save:
        f.savefig(f"data/output/plots/{i+1}__{inspect.getsource(function).split(':')[1].replace('**', 'pow').replace('/', 'div').replace('*', '').replace(' ', '').replace('np.', '').strip()}/{domain[0]}_{domain[1]}.png")

In [None]:
def compare_2_var_3_views_with_and_without_sin_cos(function, domain, df_with, df_without, size=(12, 8), save=False):
    X, Y = np.meshgrid(np.linspace(*domain, 100), np.linspace(*domain, 100))
    Z_true = function(X.ravel(), Y.ravel())

    best_individual_without = df_without.iloc[-1, -1]
    Z_gp_without = eval(best_individual_without)

    best_individual_with = df_with.iloc[-1, -1]
    Z_gp_with = eval(best_individual_with)

    fig = plt.figure(figsize=size)
    fig.suptitle(
        inspect.getsource(function)
        .replace(",", "")
        .replace("lambda x y:", "f(x, y) =")
        .replace("np.", "")
        + f"\nx ∈ [{domain[0]}, {domain[1]}], y ∈ [{domain[0]}, {domain[1]}]"
    )

    for y, (Z, label, color) in enumerate(zip([Z_gp_without, Z_gp_with], ["Without", "With"], ["red", "blue"])):
        for x, azim in enumerate([0, 90, 180]):
            ax = fig.add_subplot(2, 3, y * 3 + x + 1, projection='3d')
            surf1 = ax.plot_surface(X, Y, Z_true.reshape(X.shape), rstride=1, cstride=1,
                        linewidth=0, antialiased=False, color="gray", alpha=0.4, label="True")

            surf2 = ax.plot_surface(X, Y, Z.reshape(X.shape), rstride=1, cstride=1,
                            linewidth=0, antialiased=False, color=color, alpha=0.4, label=label)
            ax.xaxis.set_label_text("X")
            ax.yaxis.set_label_text("Y")
            ax.zaxis.set_label_text("Z")
            ax.view_init(azim=azim)
            if x == 0:
                ax.legend()

    plt.grid()
    plt.show()
    if save:
        fig.savefig(
            f"data/output/plots/{i+1}__{inspect.getsource(function).split(':')[1].replace('**', 'pow').replace('/', 'div').replace('*', '').replace(' ', '').replace('np.', '').strip()}/{domain[0]}_{domain[1]}_compare.png"
        )


In [None]:
def compare_without_and_with_sin_cos(
    function,
    domain,
    df_without,
    df_with,
    size=(14, 9),
    save=False
):
    X = np.linspace(*domain, int(100 * abs(domain[1] - domain[0])))
    Y_true = function(X)

    best_individual_without = df_without.iloc[-1, -1]
    Y_gp_without = eval(best_individual_without)

    best_individual_with = df_with.iloc[-1, -1]
    Y_gp_with = eval(best_individual_with)

    f, a = plt.subplots(1, 3, figsize=size)
    f.suptitle(
        inspect.getsource(function)
        .replace("lambda x:", "f(x) =")
        .replace(",", "")
        .replace("np.", "")
        + f"\nx ∈ [{domain[0]}, {domain[1]}]"
    )

    a[0].plot(
        X, Y_true, label="True", linestyle="-", color="black", linewidth=2, alpha=0.5
    )
    a[0].legend()
    a[0].set_title("True")
    a[0].grid()

    a[1].plot(
        X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5
    )
    a[1].plot(X, Y_gp_with, label="GP with", linestyle="-", color="red", linewidth=1)
    a[1].legend()
    a[1].set_title("GP with sin/cos")
    a[1].grid()

    a[2].plot(
        X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5
    )
    a[2].plot(
        X,
        Y_gp_without,
        label="GP without sin/cos",
        linestyle="-",
        color="blue",
        linewidth=1,
    )
    a[2].legend()
    a[2].set_title("GP without sin/cos")
    a[2].grid()

    plt.show()
    if save:
        f.savefig(
            f"data/output/plots/{i+1}__{inspect.getsource(function).split(':')[1].replace('**', 'pow').replace('/', 'div').replace('*', '').replace(' ', '').replace('np.', '').strip()}/{domain[0]}_{domain[1]}_compare.png"
        )

In [None]:
def show_fitness_process(df, save=False):
    f, a = plt.subplots(2)
    # if process took only 1 generation (1 row), then scatter plot
    if df.shape[0] == 1:
        a[0].scatter(
            df["avg_fitness"],
            df["avg_fitness"],
            label="avg_fitness",
            linestyle="-",
            color="red",
            linewidth=1,
        )
        a[1].scatter(
            df["best_fitness"],
            df["best_fitness"],
            label="best_fitness",
            linestyle="-",
            color="blue",
            linewidth=1,
        )
    else:
        a[0].plot(
            df["avg_fitness"], label="avg_fitness", linestyle="-", color="red", linewidth=1
        )
        a[1].plot(
            df["best_fitness"],
            label="best_fitness",
            linestyle="-",
            color="blue",
            linewidth=1,
        )
    a[0].legend()
    a[0].grid()
    a[1].legend()
    a[1].grid()
    a[0].title.set_text("fitness(generation)")
    plt.show()
    if save:
        f.savefig(
            f"data/output/plots/{i+1}__{inspect.getsource(function).split(':')[1].replace('**', 'pow').replace('/', 'div').replace('*', '').replace(' ', '').replace('np.', '').strip()}/{domain[0]}_{domain[1]}_fitness.png"
        )

In [None]:
import time

In [None]:
def save_best_individual(df_with, df_without=None, timeout=600, function_index=None, domain_index=None):
    best_individual = df_with["best_individual"].iloc[-1]
    print("Simplyfying...")
    best_individual_simple = simplify_timeout(best_individual, timeout=timeout, with_trigonomethric=True, function_idx=function_index, domain_idx=domain_index)
    best_individual_without = None
    best_individual_without_simple = None
    if df_without is not None:
        best_individual_without = df_without["best_individual"].iloc[-1]
        print("Simplyfying without sin/cos...")
        best_individual_without_simple = simplify_timeout(best_individual_without, timeout=timeout, with_trigonomethric=False, function_idx=function_index, domain_idx=domain_index)

    pd.DataFrame.from_dict(
        [
            {
                "best_individual": best_individual,
                "best_individual_simplified": best_individual_simple,
                "best_individual_without": best_individual_without,
                "best_individual_without_simplified": best_individual_without_simple,
            }
        ]
    ).to_csv(
        f"data/output/plots/{i+1}__{inspect.getsource(function).split(':')[1].replace('**', 'pow').replace('/', 'div').replace('*', '').replace(' ', '').replace('np.', '').strip()}/{domain[0]}_{domain[1]}_best.csv",
    )

In [None]:
for i, (function, domains_) in enumerate(zip(old_functions, old_domains)):
    print(
        f'Function {i+1}: {inspect.getsource(function).replace("lambda x:", "f(x) =").replace("lambda x, y:", "f(x, y) =")}'
    )
    for j, domain in enumerate(domains_):

        with open(f"data/output/initial/1_{i+1}__{domain[0]}_{domain[1]}.out", "r") as f:
            logs_without = f.read()
        df_without = logs_to_df(logs_without)
        df_without.to_csv(
            f"data/output/csv/initial/1_{i+1}__{domain[0]}_{domain[1]}.csv",
        )


        with open(f"data/output/wsc/1_{i+1}__{domain[0]}_{domain[1]}.out", "r") as f:
            logs_with = f.read()
        df_with = logs_to_df(logs_with)
        df_with.to_csv(
            f"data/output/csv/with_sin_cos/1_{i+1}__{domain[0]}_{domain[1]}.csv",
        )

        if i in [3, 4, 5]:
            compare_2_var_3_views_with_and_without_sin_cos(function, domain, df_without, df_with)
            pass
        else:
            compare_without_and_with_sin_cos(function, domain, df_without, df_with)
            pass
        show_fitness_process(df_with)
        save_best_individual(df_with, df_without)


In [None]:
for i, (function, domains_) in enumerate(zip(new_functions, new_domains)):
    i = i + 6
    print(
        f'Function {i + 1}: {inspect.getsource(function).replace("lambda x:", "f(x) =").replace("lambda x, y:", "f(x, y) =")}'
    )
    for domain in domains_:
        with open(f"data/output/wsc/1_{i+1}__{domain[0]}_{domain[1]}.out", "r") as f:
            logs_with = f.read()
        df = logs_to_df(logs_with)
        df.to_csv(
            f"data/output/csv/with_sin_cos/1_{i+1}__{domain[0]}_{domain[1]}.csv",
        )

        compare(function, domain, df)
        show_fitness_process(df)
        save_best_individual(df)

In [None]:
def read_best_individuals(function_idx, domain_idx):
    FILEPATH = os.path.join(
        "data",
        "output",
        "plots",
        f"{function_idx+1}__{inspect.getsource(functions[function_idx]).split(':')[1].replace('**', 'pow').replace('/', 'div').replace('*', '').replace(' ', '').replace('np.', '').strip()}",
        f"{domains[function_idx][domain_idx][0]}_{domains[function_idx][domain_idx][1]}_best.csv",
    )
    return pd.read_csv(FILEPATH, index_col=0)

In [None]:
def compare_after_simplification(function_idx, domain_idx, size=(8, 8), plot_true=False):
    f, a = plt.subplots(2, 2, figsize=size)
    f.suptitle(
        inspect.getsource(functions[function_idx])
        .replace("lambda x:", "f(x) =")
        .replace(",", "")
        .replace("np.", "")
        + f"\nx ∈ [{domains[function_idx][domain_idx][0]}, {domains[function_idx][domain_idx][1]}]"
    )

    df = read_best_individuals(function_idx, domain_idx)
    best_individual = df["best_individual"].iloc[-1]
    best_individual_simplified = df["best_individual_simplified"].iloc[-1]
    best_individual_without = df["best_individual_without"].iloc[-1]
    best_individual_without_simplified = df["best_individual_without_simplified"].iloc[-1]

    X = np.linspace(*domains[function_idx][domain_idx], int(100 * abs(domains[function_idx][domain_idx][1] - domains[function_idx][domain_idx][0])))

    Y_bi = eval(best_individual)
    Y_bis = eval(best_individual_simplified)
    Y_biw = eval(best_individual_without)
    Y_biws = eval(best_individual_without_simplified)

    a[0][0].plot(X, Y_bi, label="Best Individual", linestyle="-", color="black", linewidth=1)
    a[0][0].legend()
    a[0][0].grid()
    a[0][0].set_title("Best Individual")

    a[0][1].plot(X, Y_bis, label="Best Individual Simplified", linestyle="-", color="red", linewidth=1)
    a[0][1].legend()
    a[0][1].grid()
    a[0][1].set_title("Best Individual Simplified")

    a[1][0].plot(X, Y_biw, label="Best Individual Without", linestyle="-", color="blue", linewidth=1)
    a[1][0].legend()
    a[1][0].grid()
    a[1][0].set_title("Best Individual (Without cos/sin)")

    a[1][1].plot(X, Y_biws, label="Best Individual Without Simplified", linestyle="-", color="green", linewidth=1)
    a[1][1].legend()
    a[1][1].grid()
    a[1][1].set_title("Best Individual Simplified (Without cos/sin)")

    if plot_true:
        Y_true = functions[function_idx](X)
        a[0][0].plot(X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5)
        a[0][1].plot(X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5)
        a[1][0].plot(X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5)
        a[1][1].plot(X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5)

    plt.show()

In [None]:
def compare_after_simplification_2_var(function_idx, domain_idx, size=(8, 8), plot_true=False):
    function = functions[function_idx]
    domain = domains[function_idx][domain_idx]

    X, Y = np.meshgrid(np.linspace(*domain, 100), np.linspace(*domain, 100))
    Z_true = function(X.ravel(), Y.ravel())

    df = read_best_individuals(function_idx, domain_idx)

    best_individual_with = df["best_individual"].iloc[-1]
    best_individual_with_simplified = df["best_individual_simplified"].iloc[-1]
    best_individual_without = df["best_individual_without"].iloc[-1]
    best_individual_without_simplified = df["best_individual_without_simplified"].iloc[-1]

    Z_gp_with = eval(best_individual_with)
    Z_gp_with_simplified = eval(best_individual_with_simplified)
    Z_gp_without = eval(best_individual_without)
    Z_gp_without_simplified = eval(best_individual_without_simplified)


    fig = plt.figure(figsize=size)
    fig.suptitle(
        inspect.getsource(function)
        .replace(",", "")
        .replace("lambda x y:", "f(x, y) =")
        .replace("np.", "")
        + f"\nx ∈ [{domain[0]}, {domain[1]}], y ∈ [{domain[0]}, {domain[1]}]"
    )

    for y, (Z, Z_simplified, label, color) in enumerate(zip([Z_gp_without, Z_gp_with], [Z_gp_without_simplified, Z_gp_with_simplified], ["Without", "With sin/cos"], ["red", "blue"])):
        for x, azim in enumerate([0, 90, 180]):
            ax = fig.add_subplot(2, 3, y * 3 + x + 1, projection='3d')
            if plot_true:
                surf1 = ax.plot_surface(X, Y, Z_true.reshape(X.shape), rstride=1, cstride=1,
                            linewidth=0, antialiased=False, color="gray", alpha=0.4, label="True")

            surf2 = ax.plot_surface(X, Y, Z.reshape(X.shape), rstride=1, cstride=1,
                            linewidth=0, antialiased=False, color=color, alpha=0.6, label=label + " (Original)")
            
            surf3 = ax.plot_surface(X, Y, Z_simplified.reshape(X.shape), rstride=1, cstride=1,
                            linewidth=0, antialiased=False, color="black", alpha=0.2, label=label + " (Simplified)")
            ax.xaxis.set_label_text("X")
            ax.yaxis.set_label_text("Y")
            ax.zaxis.set_label_text("Z")
            ax.view_init(azim=azim)
            if x == 0:
                ax.legend(bbox_to_anchor=(0, 1.02, 1, 0.2), loc="lower left", mode="expand")

    plt.grid()
    plt.show()

### BTC course vs. TinyGP 

In [None]:
btc_df = pd.read_csv("data/other/btc.csv")
btc_df.sort_values(by="Date", inplace=True)
btc_df.reset_index(drop=True, inplace=True)
btc_df.head()

In [None]:
BTC_FILENAME = "btc.dat"
BTC_DIR = "data/other/btc.dat"

with open(BTC_DIR, "a") as f:
    f.write(f"1 100 -10 10 {btc_df.index.size}\n")

btc_df[["Price"]].apply(lambda x: x.str.replace(",", "")).to_csv(
    BTC_DIR, index=True, header=False, sep=" ", mode="a"
)

In [None]:
input_file = BTC_FILENAME
output_file = ".".join(input_file.split(".")[:-1]) + ".out"
print(
    command := f"copy NUL data\other\{output_file} & cd TinyGP & java TinyGP.java ..\data\other\{input_file} > ..\data\other\{output_file} & cd .."
)
subprocess.run(command, shell=True)

In [None]:
with open(f"data/other/{output_file}", "r") as f:
    logs = f.read()

tinygp_btc_df = logs_to_df(logs)

In [None]:
X = np.arange(btc_df.index.start, btc_df.index.stop, btc_df.index.step)
Y_true = btc_df["Price"].str.replace(",", "").astype(float)
Y_gp = eval(tinygp_btc_df["best_individual"].iloc[-1])

plt.figure(figsize=(12, 6))
plt.plot(X, Y_true, label="True", linestyle="-", color="black", linewidth=4, alpha=0.5)
plt.plot(X, Y_gp, label="GP", linestyle="-", color="red", linewidth=1)
plt.ylabel("Price [USD]")
plt.xlabel("Days (since 23/05/2023)")
plt.legend()
plt.title("BTC price")
plt.grid()
plt.show()