In [1]:
import numpy as np
import math
import matplotlib as mpl
import matplotlib.pyplot as plt
import pandas as pd
from typing import Any
from tqdm import tqdm
import os

from utils import COLLEGES, heuristic_function

  COLLEGES: list[str] = (data := pd.read_csv(f"data/filtered-data.csv")).groupby("INSTNM").agg("mean", numeric_only=True).reset_index()[["INSTNM", "COSTT4_A","SAT_AVG"]].copy().dropna(axis="rows").sample(100, random_state=0)["INSTNM"].unique()


In [2]:
_backend = mpl.get_backend()

In [3]:
class MarketMotion:
    def __init__(self, initial_price: float, drift: float, volatility: float, timestep: float, duration: float) -> None:
        self._initial_price = initial_price
        self._current_price = self._initial_price

        self._drift = drift
        self._volatility = volatility

        self._timestep = timestep
        self._duration = duration
        self._current_time = 0

        self._prices = np.empty(int(self._duration / self._timestep))

    def step(self) -> "MarketMotion":
        self._step()
        self._current_time += 1

        return self

    def run(self, duration: float | None = None) -> "MarketMotion":
        if duration is None:
            for _ in range(int((self._duration - (self._current_time * self._timestep)) / self._timestep)):
                self.step()

        return self

    def get_time_series(self) -> np.ndarray[np.float64]:
        return self._prices

    def _step(self) -> None:
        raise NotImplementedError


In [4]:
class Payoff:
    def __init__(self, strike_price: float) -> None:
        self._strike = strike_price
        
    def get_payoff(self, stock_price_path: list[float]) -> float:
        raise NotImplementedError

In [5]:
def run_sim(motion, *motion_args, motion_kwargs: dict[str, Any], payoffs: dict[str, Payoff] = {}, risk_free_rate: float = 0.01, number_simulations: int = 5000, plot: bool = True, college: str = "") -> tuple[float, list[float]]:
    plt.ioff()
    mpl.use("AGG")
    if 'output' not in os.listdir():
        os.mkdir("output/")
        
    price_paths = [motion(*motion_args, **motion_kwargs).run().get_time_series()
                   for _ in range(number_simulations)]

    call_payoffs = {
        key: [] for key in list(payoffs.keys())
    }

    for price_path in price_paths:
        for key in list(payoffs.keys()):
            call_payoffs[key].append(payoffs[key].get_payoff(
                price_path) / (1 + risk_free_rate))
    fig = plt.figure()
    for price_path in price_paths:
        plt.plot(price_path)

    # plt.xlabel("Day")
    plt.ylabel("Price")

    v1 = f"Average Value: {(avg_value := np.array(price_paths)[:, -1].mean())}"
    v2 = f"Max Value: {np.array(price_paths)[:, -1].max()}"
    v3 = f"Min Value: {np.array(price_paths)[:, -1].min()}"
    
    if plot:
        print(v1)
        print(v2)
        print(v3)

    title = f"{college}\n{v1}\n{v2}\n{v3}\n"

    payoff_results = []
    for key in list(payoffs.keys()):
        _price = np.array(call_payoffs[key]).mean()

        if plot:
            print(f"{key}: {_price}")
            
        title += f"{key}: {_price}\n"
        payoff_results.append(_price)


    plt.title(title)

    fig.tight_layout()
    fig.savefig(f"output/{college}.png")
    if plot:
        # Plot the set of generated sample paths
        plt.show()
    
    mpl.use(_backend)
    return avg_value, payoff_results


# ACTUAL MOTIONS AND PAYOFFS

In [6]:
#initial price is determined by heuristic function

class CollegeEducationMotion(MarketMotion):
    #can be played around with
    DRIFT = .03
    VOLATILITY = .01
    

    def __init__(self, initial_price: float) -> None:
        super().__init__(
            initial_price, 
            CollegeEducationMotion.DRIFT, 
            CollegeEducationMotion.VOLATILITY, 
            1, #month
            4 * 12 #12 months for 4 years
        )

    def _step(self) -> None:
        dWt = np.random.normal(0, math.sqrt(self._timestep))  # Brownian motion, change this to change the motion


        dYt = (self._drift * self._timestep) + \
            (self._volatility * dWt)  # Change in price

        self._current_price *= (1 + dYt)  # Add the change to the current price
        # Append new price to series
        self._prices[self._current_time] = self._current_price

In [7]:
class Graduation_Payoff(Payoff):
    def __init__(self, strike_price: float) -> None:
        super().__init__(strike_price)

    def get_payoff(self, stock_price_path: list[float]) -> float:
        stock_price = stock_price_path[-1]

        if stock_price > self._strike:
            return stock_price - self._strike
        else:
            return 0

# Run the simulations for 100 colleges

In [8]:
results = []
for college in (pbar := tqdm(COLLEGES)):
    pbar.set_description(f"Running simulation for: {college}")
    intial_education_value, desired_education_value = heuristic_function(college)
    avg_price, payoffs = run_sim(
        CollegeEducationMotion,
        motion_kwargs={
            "initial_price" : intial_education_value 
        },
        payoffs={
            "Education Payoff": Graduation_Payoff(strike_price=desired_education_value)
        },
        college=college,
        plot=False
    )
    results.append((college, np.mean(payoffs), avg_price))

Running simulation for: McNeese State University: 100%|██████████| 100/100 [06:31<00:00,  3.91s/it]                          


In [9]:
sorted_avg = sorted(results, key=lambda r: r[2], reverse=True)
sorted_payoff = sorted(results, key=lambda r: r[1], reverse=True)

print(f"Best College By Average Simulation End Education Value: {sorted_avg[0][0]}")
print(f"Best College By Payoff End Education Value: {sorted_payoff[0][0]}")

Best College By Average Simulation End Education Value: Yale University
Best College By Payoff End Education Value: Tufts University
