In [16]:
import numpy as np
import pandas as pd
import math
from scipy.stats import norm, lognorm
import matplotlib.pyplot as plt
from matplotlib import animation
import random 
from pprint import pprint      
import os
import contextlib 

def brownian(x0, n, sd = None, loc = None, dt = 1, out = None):
    "Computes the brownian motion with an average increase of loc and DAILY SD of sd. \
    dt is how many prices per day"
    if not out:
        out = np.ones(n)
    # For each element of x0, generate a sample of n numbers from a
    # normal distribution.
    sd = sd or 0.16/(256**0.5)#average actualy market vol
    loc = loc or (1.08)**(1/256)
    r = norm.rvs(loc = loc, size=n, scale=sd*math.sqrt(dt))
    # This computes geometrix Brownian motion by forming the cumulative product of samples. 
    return x0*np.cumprod(r, axis=-1, out = out)

In [16]:
#approximating w/ 4 64 day quarters = 1 year
#GRIB!!, account for vol drag when converting daily returns to annual returns
class Market:
    def __init__(self, year_vol = 0.16, year_ret = 0.08, risk_free = 0.03, num_days = 256*30, price0 = 50):
        """Generates 30 yrs worth of prices.
        This assumes no dividends; how to incorporate?"""
        self.year_vol = year_vol #True expected volatility of ?log? returns
        self.year_ret = year_ret 
        self.risk_free = risk_free
        self.drisk_free = math.log(1 + risk_free)/256#risk free as the daily compounding rate
        self.d_vol = year_vol/math.sqrt(256)#daily vol
        
        self.ret_dist = norm(loc = self.drisk_free, scale = self.d_vol)#lognorm(loc = 0, scale = np.exp(self.drisk_free), s = self.d_vol)
        self.prices = np.exp(np.cumsum(self.ret_dist.rvs(num_days)))#brownian(price0, num_days, sd = self.year_vol, loc = self.year_ret)
        self.process = lambda x0, **kwargs: brownian(x0, num_days, *kwargs)

    #def get_prices; should yield arbitary prices and then remember them using self.process
    #add setters if want specifics returns, cor etc.  
        
class Stock:
    def __init__(self, prices = None, price0 = 50, exp_vol = 0.16, 
                 dividend = 0, market_cor = 1, exp_dvol=None, exp_ret = 0.08, 
                 name = None, const = False):
        """
        Takes in yearly vol as expected vol, which is distribution from which values drawn.
        True prices are the prices at the *CLOSE* of the given day. price0 is the value at CLOSE of 1st day
        The Expected and true Vol's may be off if given a series of prices
        """
        if exp_dvol is None:
            self.exp_vol = exp_vol#yearly
            self.exp_dvol = exp_vol/256**0.5# or exp_dvol or market.d_vol#daily expected vol
        else:
            self.exp_dvol = exp_dvol
            self.exp_vol = exp_dvol *256**0.5
        self.drisk_free = math.log(1.03)/256
        self.exp_dret = math.log(1 + exp_ret)/256
        #prices at end of day, assumes started just after dividend paid, 4 dividends/year
        self.dividend = dividend#same every quarter
        self.dividend_every = 64#paid on end of day
        num_days = 30*256
        self.ret_dist = norm(loc = self.exp_dret, scale = self.exp_dvol)#lognorm(loc = 0, scale = np.exp(self.exp_dret), s = self.exp_dvol)
        if prices is None:
            self.price0 = price0
            if const:
                self.true_prices = np.full(num_days, price0)
            else:
                self.true_prices = price0*np.exp(np.cumsum(self.ret_dist.rvs(num_days))) - np.array([[0]*(64 - 1) + [dividend]]*30*4).flatten() 
#             self.true_prices = brownian(price0, n = 30*256, sd = self.exp_dvol, loc = self.exp_dret) \
#                                 - np.array([[0]*(64 - 1) + [dividend]]*30*4).flatten()  #subtract dividend on day paid 
        else:        
            self.price0 = prices[0]
            self.true_prices = prices
#         self.market = market
        self.name = name or "".join([chr(i) for i in np.random.randint(ord('A'), ord('Z'),4)])
        
    def calc_value(self, spot = None, days_elapsed=0, iv = None):
        if spot is not None:
            return spot
        else:
            return self.true_prices[days_elapsed]
        
    def calc_greeks(self, iv = None, days_elapsed = 0, spot = None):
        delta = 1
        gamma = 0
        vega = 0
        theta = 0#don't make money with day's changing, make money with price changing
        rho = 0#unknown
        return {"Delta":delta, "Gamma":gamma, "Vega":vega/100, "Theta":theta, "Rho":rho/10000}
        
class Euro_Option:
    def __init__(self, strike = 100, premium = 0, lifetime = 256, tp = 'call', asset = None):
        """Defines a European option;
         if asset None has constant prices as strike with a default expected vol
         assigned_iv: IV as determined by an external model, not nessisarily consistent with BS projections
         """
        self.strike = strike
        self.premium = premium
        self.lifetime = lifetime
        assert(tp in ('put', 'call'))
        self.tp = tp#type
        self.asset = asset or Stock(prices = np.full(lifetime + 1, strike))
        
    def calc_greeks(self, iv = None, days_elapsed = 0, spot = None, ignore = None):#ignore is for extra, unnessisary
        """Calculates the greeks using the BS method.  Using **DAILY** volatility
        if the implied vol isn't given it takes expected daily for the underlying.
        Computes vol as based on 1pt change, then divides by 100; Rho divide by 10k.
        ignores option premium"""
        days_left = self.lifetime - days_elapsed
        spot = spot or self.asset.true_prices[days_elapsed]
        if days_left == 0:#issue of dividing by 0
            #calculated as if market just opened, ~6 hours before settle
            if self.tp == 'call':
                #delta,gamma are calculated based on whole dolar moves
                #theta should be changed to remaining premium?
                out = (int(spot-1 >= self.strike), int(abs(spot-self.strike) <= 2), 0, -1, 0)
            elif self.tp == 'put':
                out = (-1*int(spot <= self.strike-1), int(abs(spot-self.strike) <= 2), 0, -1, 0)
            return {k:v for k,v in zip(["Delta","Gamma","Vega","Theta", "Rho"], out)}
        if days_left < 0:
            if self.tp == 'call':
                out = (int(spot > self.strike), 0, 0, 0, 0)
            elif self.tp == 'put':
                out = (-1*int(spot < self.strike), 0, 0, 0, 0)
            return {k:v for k,v in zip(["Delta","Gamma","Vega","Theta", "Rho"], out)}

        drisk_free = self.asset.drisk_free
        if not iv:
            iv = self.asset.exp_dvol
        variance = iv**2
        d1 = (1 / (variance*days_left)**0.5) * (math.log(spot/self.strike) + (drisk_free + variance/2)*days_left)
        d2 = d1 - (variance * days_left)**0.5
        pv_k = self.strike * math.exp(-drisk_free*days_left)
        gamma = norm.pdf(d1)/(spot*(iv*days_left)**0.5)#note uses PDF, not CDF as for d1
        vega = spot*norm.pdf(d1)*(days_left)**0.5
        theta = -spot*norm.cdf(d1)*iv/(2*days_left**0.5)
        if self.tp == 'call':
            delta = norm.cdf(d1)
            theta -= drisk_free*pv_k*norm.cdf(d2)#updates theta
            rho = days_left*pv_k*norm.cdf(d2)
        elif self.tp == 'put':
            delta = norm.cdf(d1) - 1
            theta += drisk_free*pv_k*norm.cdf(-d2)
            rho = -days_left*pv_k*norm.cdf(-d2)
        return {"Delta":delta, "Gamma":gamma, "Vega":vega/100, "Theta":theta, "Rho":rho/10000}#scale down the values
    
    def plot_greek(self, iv = None, days_elapsed = 0, greek='Delta', x_axis='Price'):
        "Plots the greek that is given"
        greek = greek.capitalize()
        x_axis = x_axis.capitalize()
        assert(greek in {'Delta', 'Gamma', 'Vega', 'Theta', 'Rho'})
        assert(x_axis in {'Price', 'Time'})
        if x_axis == 'Price':
            mn = self.asset.true_prices.min()
            mx = self.asset.true_prices.min()
            sd = max(self.asset.true_prices.std(), self.strike/15)
            x_vals = np.arange(max(0.1, mn-4*sd), mx+4*sd, 0.10)#can't be 0
            y_vals = np.array([self.calc_greeks(iv = iv,
                                             days_elapsed = days_elapsed, 
                                             spot = s)[greek] 
                            for s in x_vals])
        elif x_axis == 'Time':
            days_left = self.lifetime - days_elapsed
            time_inc = 1#max(1, days_left/50)
            x_vals = np.arange(days_elapsed, self.lifetime, time_inc)#don't include last day
            y_vals= np.array([self.calc_greeks(iv = iv,
                                             days_elapsed = t)[greek]
                            for t in x_vals])
        fig = plt.figure()
        ax1 = fig.add_subplot(1,1,1)
        ax1.plot(x_vals, y_vals)
        ax1.set_xlabel(x_axis)
        ax1.set_ylabel(greek)
        ax1.set_title(f"{self.tp} with Strike = {self.strike}")#comment out this line if want to turn of type
        return fig
        
    def calc_value(self, spot, days_elapsed=0, iv = None):
        "Given an option; with how many days left and at what price, will return option's value"
        days_left = self.lifetime - days_elapsed
        assert(days_left >= 0)
        if days_left == 0:#issue of dividing by 0
            if self.tp == 'call':
                return max(0, spot - self.strike)
            elif self.tp == 'put':
                return max(0, self.strike - spot)
        if not iv:
            iv = self.asset.exp_dvol
        variance = iv**2
        drisk_free = self.asset.drisk_free
        d1 = (1 / (variance*days_left)**0.5) * (math.log(spot/self.strike) + (drisk_free + variance/2)*days_left)
        d2 = d1 - (variance * days_left)**0.5
        pv_k = self.strike * math.exp(-drisk_free*days_left)
        call = norm.cdf(d1)*spot - norm.cdf(d2)*pv_k
        if self.tp == 'call':
            return call 
        elif self.tp == 'put':
            return pv_k - spot + call
        
    def make_payoff(self, position = 'long', t = 0, scale = 10):
        """Makes payoff diagram at time = t when a option expirs at t=lifetime. 
        Will autoscale if scale = auto"""
        if isinstance(scale, str):
                vol = self.asset.exp_vol
                scale = 2*vol*self.asset.true_prices[t]#sets as 2SD for year
        stock_prices = np.arange(self.strike - scale, self.strike + scale, scale/20)
        payoff = [self.calc_value(spot = i, days_elapsed=t) - self.premium for i in stock_prices]
        fig = plt.figure()
        ax1 = fig.add_subplot(1,1,1) 
        if position == 'long':
            ax1.axhline(y=-self.premium, alpha=0.3, color = 'k')
        else:
            payoff = [i*-1 for i in payoff]
            ax1.axhline(y=self.premium, alpha=0.3, color = 'k')
        ax1.plot(stock_prices, payoff)
        ax1.set_title(f"Option Value at Time = {t}, with expiry at T={self.lifetime}")
        ax1.set_xlabel("stock prices")
        ax1.set_ylabel("Value of Option Position")
        #How to return fig object?, Just displays it
        return fig
    
    def plot_total_ret(self, price_idx = None):
        "Given an asset, plot the distribution of it's total returns"
        n = self.lifetime
        p_0 = self.asset.price0
        lognorm_scale = np.exp(self.asset.ret_dist.kwds['loc']*n)
        lognorm_shape = self.asset.ret_dist.kwds['scale']*(n**0.5)#takes SD
        total_ret_dist = lognorm(loc = 0, s = lognorm_shape, scale = lognorm_scale)
        if price_idx is None:
            price_idx =  np.linspace(p_0*max(0,lognorm_scale - 3*lognorm_shape), p_0*(lognorm_scale + 3*lognorm_shape), 1000)
        plt.plot(price_idx, total_ret_dist.pdf(price_idx/p_0))
        plt.title("Probability Distribution over Asset's Value at Expiry")#Density'
    
    @staticmethod
    def rep_option(lifetime = -1, long_short = True, at_maturity = False):
        "creates random options and makes their payoff graphs"
        if lifetime  == -1:
            _lifetime = random.randint(0,256)
        tp = 'call'
        if random.randint(0,100) %2 == 1:
            tp = 'put'
        opt = Euro_Option(strike = np.random.randint(50,150), premium = np.random.randint(5,20), lifetime = _lifetime, tp = tp, asset = None)
        opt.strike = opt.asset.price0 + np.random.randint(-10,10)
        opt.premium = opt.calc_value(opt.asset.price0)
        pos = 'long'
        if long_short and random.randint(0,100) %2 == 1:
            pos = 'short'
        print(opt.asset.price0, opt.premium)
        if at_maturity:
            opt.make_payoff(position = pos, t = _lifetime)
        else:
            opt.make_payoff(position = pos)
        plt.show()
        temp = input("stop?")
        pprint(vars(opt))
        if 'n' in temp.lower():
            Euro_Option.rep_option(lifetime= lifetime, long_short =long_short, at_maturity = at_maturity)
        else:
            return None

        
#all have same underlying
def portfolio_value(instr, weights, kwarg_lst):
    'given a list of instruments, and kwargs for calc value returns sum\
    indicate Long/Short with a pos/neg amount of instrument'
    return sum([c*i.calc_value(**k) for i,c, k in zip(instr, weights, kwarg_lst)])

def portfolio_greeks(instr, weights, kwarg_lst):
    calc_greeks = [i.calc_greeks(**k) for i,k in zip(instr, kwarg_lst)]
    out = {"Delta":0, "Gamma":0, "Vega":0, "Theta":0, "Rho":0}
    for key in out.keys():
        out[key] = sum([c*g[key] for c,g in zip(weights, calc_greeks)])
    return out

def _plot_port(fn, instr, weights, kwarg_lst, x_axis, offset = 0, tangent = False, zero_line = False,
               xlab ="Price of Underlying", ylab = "Value of portfolio if Instantaneous Jump",
              price0 = None):
    """Offset is the value that will be added to each plot.
    """
    assert(min(x_axis) > 0)
    diff = len(instr) - len(kwarg_lst)
    kwarg_lst = kwarg_lst + [{}]*diff
    payoffs = [fn(instr, weights, [dict(k, spot= x) for k in kwarg_lst])
               + offset for x in x_axis]
    plt.plot(x_axis, payoffs)
    plt.xlabel(xlab)
    plt.ylabel(ylab)
    if tangent:
        p0_ix = np.searchsorted(x_axis, price0)#np.where(abs(x_axis - price0) <= step_sz)[0][0]
        ix1, ix2 = max(0, p0_ix -1), min(p0_ix + 1, x_axis.size)
        y1, y2 = payoffs[ix1], payoffs[ix2]
        m = (x_axis[ix2] - x_axis[ix1]) /(y2 - y1)
        b = y1 - m*x_axis[ix1]
        plt.plot(x_axis, m*x_axis + b, '--r')
    if zero_line:
        price0 = x_axis[np.argmin(payoffs)]#plot with minimum portfolio value
        p0_ix = np.searchsorted(x_axis, price0)#np.where(abs(x_axis - price0) <= step_sz)[0][0]
        plt.plot(x_axis, [payoffs[p0_ix]]*x_axis.size, ':g',alpha = 0.3)
        
def plot_port_payoff(instr, weights, kwarg_lst, x_axis, offset = 0,
                     tangent = False, zero_line = False):
    "plots the graph of the change in the payout of the portfolio"
    _plot_port(portfolio_value, instr, weights, kwarg_lst, x_axis, 
               tangent = tangent, zero_line =zero_line, offset = offset)

def plot_port_greek(instr, weights, kwarg_lst, x_axis, greek = 'Delta', price0 = None, tangent = False, zero_line = False):
    fn = lambda instr, weights, kwarg_lst: portfolio_greeks(instr, weights, kwarg_lst)[greek] 
    _plot_port(fn, instr, weights, kwarg_lst, x_axis, tangent = tangent, price0 = price0, ylab = greek)

def no_print(fn):
    def wrapper(*argv, **kwargs):
        "Calls function but blocks print outs"
        no_printing = kwargs['no_printing']
        del kwargs['no_printing']#don't pass to fn
        if no_printing:
            with open(os.devnull, "w") as f, contextlib.redirect_stdout(f):#This line prevents from being printed
                fn(*argv, **kwargs)
        else:
            fn(*argv, **kwargs)
    return wrapper     

if __name__ != '__main__':
    #technically an error since have to put magics at top of cell
    #but this exits calls without raising exceptions. Implementation detail?
    %%script echo skipping    

being_imported = False
try:#file only defined if called from another script
    print(f"Are closing {__file__} after importing classses")
    being_imported = True
except:
    pass
if being_imported:
    #technically an error since have to put magics at top of cell
    #but this exits calls without raising exceptions. Implementation detail?
    %%script echo skipping
#     class StopExecution(Exception):
#         def _render_traceback_(self):
#             pass
#     raise StopExecution
#     import sys
#     sys.exit(0)
print("\n\nRunning things you don't want to if importing \n\n")



Running things you don't want to if importing 




NameError: name '__file__' is not defined

In [None]:
print("cell2")