Python for Finance --- Final Exam
----

**MSc in Mathematics and Finance, Imperial College London**

Autumn Term 2022-2023

Thursday 15 December 2022
***


## GENERAL INSTRUCTIONS


- For each question, you are asked to create a function with specific inputs and outputs.

- You should copy / paste all your functions, one after the others, in a single file named `CID.py`

- You may only use the libraries below

- Grading details:
    + Clarity of the code (name of temporary variables, comments)
    + Efficiency of the code (speed)
    
- At the end of the examination, you should update your CID.py file into the Shared Drive folder
xxxx


---

In [1]:
import platform
print("Current Python Version",platform.python_version())
if platform.python_version()<"3.10":
    print("ERROR: you are using a Python version lower than 3.10")

Current Python Version 3.10.13


### Allowed libraries ONLY

In [2]:
import numpy as np
from abc import ABC,abstractmethod
from scipy.stats import norm
import time as time
import pandas as pd

# PROBLEM I : OOP and Greeks
---

Consider the following two dynamic models, for $t\geq 0$,:

\begin{align} S_t &=S_0\exp\left\{-\frac{1}{2}\sigma^2 t +\sigma W_t\right\},\quad &\text{(Model 1: Black-Scholes)} \\
S_t &=S_0 + \sigma W_t,\quad &\text{(Model 2: Bachelier)}\end{align}

where $(W_t)_{t\geq 0}$ is a standard Brownian motion. We further assume that interest rates are null.

For a given strike $K>0$ and a fixed maturity $T>0$, 
the price of a European Call in each model is given by:

**(Model 1: Black-Scholes)**
\begin{align}
  C^{BS}(S_0, \sigma,K,T) &= \Phi(d_1)S_0 - \Phi(d_2)K, \\
     d_1 &= \frac{1}{\sigma\sqrt{T}}\left[\ln\left(\frac{S_0}{K}\right) + \frac{\sigma^2 T}{2}\right], \\
     d_2 &= d_1 - \sigma\sqrt{T}.
\end{align}

**(Model 2: Bachelier)**
\begin{align}
  C^{Bachelier}(S_0, \sigma,K,T) &= (S_0 - K) \Phi(b_1)+\sigma\sqrt{T} \phi(b_1), \\
     b_1 &= \frac{S_0-K}{\sigma\sqrt{T}},
\end{align}

where $\Phi(\cdot)$ is the Gaussian cdf implemented in `scipy.stats.norm.cdf` and  $\phi(\cdot)$ is the Gaussian pdf implemented in `scipy.stats.norm.pdf`

We will implement a GreekEngine base class that computes the Greeks of the prices of European Call options using closed-form expressions and finite differences. 
The base class will derive in two subclassess 1) Black-Scholes and 2) Bachelier.

## Question 1:

Implement a `price_european_call_option(self,K:float,T:float)` method for each model following the equations described above for $C(S_0,K)$ and the class template given below

Input:
- T: float time to maturity
- K: float strike


Output:
- float european Call option price


In [3]:
class GreekEngine(ABC):
    def __init__(self, S0,sigma):
        self.S0=S0
        self.sigma=sigma
    
    def delta_european_call_option(self, K: float, T: float, epsilon: float) -> float:
        '''
        #Inputs:
        K: strike
        T: time to maturity
        epsilon: finite difference bump parameter
        #Outputs:
        finite difference delta value
        '''
        # calculate call_price for 𝐶(𝑆0 + epsilon, sigma)
        original_S0=self.S0
        self.S0 = original_S0 + epsilon
        epsilon_plus_price = self.price_european_call_option(K, T)

        # calculate call_price for 𝐶(𝑆0, sigma)
        self.S0 = original_S0
        no_epsilon_price = self.price_european_call_option(K, T)

        delta = (epsilon_plus_price - no_epsilon_price) / epsilon

        return delta
    
    def vega_european_call_option(self,K:float,T:float,epsilon:float)->float:
        '''
        #Inputs:
        K: strike
        T: time to maturity
        epsilon: finite difference bump parameter
        #Outputs:
        finite difference vega value 
        '''
        # calculate call_price for 𝐶(𝑆0, sigma+epsilon)
        sigma_original=self.sigma
        self.sigma = sigma_original + epsilon
        epsilon_plus_price = self.price_european_call_option(K, T)
        
        # calculate call_price for 𝐶(𝑆0, sigma-epsilon)
        self.sigma = sigma_original - epsilon
        epsilon_minus_price = self.price_european_call_option(K, T)
        
        vega = (epsilon_plus_price - epsilon_minus_price) / (2*epsilon)

        self.sigma=sigma_original
        
        return vega
    
        
    @abstractmethod
    def price_european_call_option(self,K:float,T:float,epsilon:float)->float:
        '''
        #Inputs:
        T: time to maturity
        K: strike
        #Outputs:
        european option price
        '''
        
        pass
    
class Black_Scholes(GreekEngine): 
    def __init__(self, S0,sigma):
        super().__init__(S0,sigma)
        
    def price_european_call_option(self,K:float,T:float)->float:
        """
        # Inputs:
        K: Stike price
        T: time to maturity
        #Output: 
        float european call price calculated by Black_Scholes
        """
        # calculate d1 and d2
        d_1 = 1 / self.sigma * np.sqrt(T)  * (np.log(self.S0 / K) + self.sigma**2 * T / 2)
        d_2 = d_1 - self.sigma * np.sqrt(T)
        
        # calculate call price 
        BS_call = norm.cdf(d_1) * self.S0 - norm.cdf(d_2) * K
        
        return BS_call
    

class Bachelier(GreekEngine):
    def __init__(self, S0,sigma):
        super().__init__(S0,sigma)
        
    def price_european_call_option(self,K:float,T:float)->float:
        """
        # Inputs:
        K: Stike price
        T: time to maturity
        # Outputs:
        float european call price calculated by Bachelier
        """
        # calculate b1
        b1 = (self.S0 - K) / (self.sigma * np.sqrt(T))
        
        # calculate call price
        Ba_call = (self.S0 - K) * norm.cdf(b1) + self.sigma * np.sqrt(T) * norm.pdf(b1)
        
        return Ba_call
    

#### You may also use the following script to verify your results:

In [4]:
def test_function_problem1_Bachelier(S0:float,sigma:float,T:float,K:float):
    engine=Bachelier(S0,sigma)
    return engine.price_european_call_option(K,T)

In [5]:
def test_function_problem1_Black_Scholes(S0:float,sigma:float,T:float,K:float):
    engine=Black_Scholes(S0,sigma)
    return engine.price_european_call_option(K,T)

`test_function_problem1_Bachelier(1,0.2,1,1)` should return `0.07978845608028655`

`test_function_problem1_Black_Scholes(1,0.2,1,1)` should return `0.07965567455405798`

In [6]:
test_function_problem1_Bachelier(1,0.2,1,1)

0.07978845608028655

In [7]:
test_function_problem1_Black_Scholes(1,0.2,1,1)

0.07965567455405798

## Question 2:

Write a `delta_european_call_option(self,K:float,T:float,epsilon:float)` base class method with the folowing specifications:

Input:
- K: float strike
- T: float time to maturity
- epsilon: float finite difference parameter

Output:
- Finite-difference delta computed using the following formula:

$$
\Delta(S_0,\sigma)=\frac{C(S_0+\epsilon,\sigma)-C(S_0,\sigma)}{\epsilon}, \quad \text{for }\epsilon>0.
$$


**Warning:** Be aware that $S_0$ and $\sigma$ are base class attributes. You will need to modify their values to be able to compute the "bumped" prices. Remember before exiting the function to set them to their initial value.

#### You may also use the following script to check your results:

In [8]:
def test_function_problem2_Bachelier(S0:float,sigma:float,T:float,K:float,epsilon:float):
    engine=Bachelier(S0,sigma)
    return engine.delta_european_call_option(K,T,epsilon)

In [9]:
def test_function_problem2_Black_Scholes(S0:float,sigma:float,T:float,K:float,epsilon:float):

    engine=Black_Scholes(S0,sigma)
    return engine.delta_european_call_option(K,T,epsilon)

`test_function_problem2_Bachelier(1,0.2,1,1,0.1)` should return `0.5977085539997474`

`test_function_problem2_Black_Scholes(1,0.2,1,1,0.1)` should return `0.6326443486004096`

In [10]:
test_function_problem2_Bachelier(1,0.2,1,1,0.1)

0.5977085539997474

In [11]:
test_function_problem2_Black_Scholes(1,0.2,1,1,0.1)

0.6326443486004096

## Question 3:


Write a `vega_european_call_option(self,K:float,T:float,epsilon:float)` base class method with the folowing specifications:

Input:
- K: float strike
- T: float time to maturity
- epsilon: float finite difference parameter

Output:
- Central finite-difference vega computed using the following formula:

$$\Delta(S_0,\sigma)=\frac{C(S_0,\sigma+\epsilon)-C(S_0,\sigma-\epsilon)}{2\epsilon}, \quad \text{for }\epsilon>0$$


**Warning:** Be aware that $S_0$ and $\sigma$ are base class attributes. You will need to modify their values to be able to compute the "bumped" prices. Remember before exiting the function to set them to their initial value.

#### You may also use the following script to check your results:

In [12]:
def test_function_problem3_Bachelier(S0:float,sigma:float,T:float,K:float,epsilon:float):
    engine=Bachelier(S0,sigma)
    return engine.vega_european_call_option(K,T,epsilon)

In [13]:
def test_function_problem3_Black_Scholes(S0:float,sigma:float,T:float,K:float,epsilon:float):
    engine=Black_Scholes(S0,sigma)
    return engine.vega_european_call_option(K,T,epsilon)

`test_function_problem3_Bachelier(1,0.2,1,1,0.1)` should return `0.3989422804014328`

`test_function_problem3_Black_Scholes(1,0.2,1,1,0.1)` should return `0.39678886531870017`

In [14]:
test_function_problem3_Bachelier(1,0.2,1,1,0.1)

0.3989422804014328

In [15]:
test_function_problem3_Black_Scholes(1,0.2,1,1,0.1)

0.39678886531870017

---
---

# PROBLEM II: Vectorisation
---

We wish to compute the variance $\mathbb{V}\left[\Phi\right]$, where 
$$
\Phi := \int_{0}^{T}f(W_t) \mathrm{d}t,
$$
where $W$ is a standard Brownian motion starting from $W_0=0$.

Consider an equidistant time grid $\{t_i = \frac{i}{m}\}_{i=0,\ldots, m}$ and denote $\delta:=\frac{1}{m}$, for some strictly positive integer $m$.

We can then write, for any $i=0, \ldots, m$ (with the convention that empty sums are null),
$$
W_{t_i} = \sum_{k=0}^{i-1} (W_{t_{k+1}} - W_{t_{k}})
= \sum_{k=0}^{i-1} \widetilde{n}_k \sqrt{t_{k+1} - t_{k}}
= \sqrt{\delta}\sum_{k=0}^{i-1} \widetilde{n}_k,
$$
in distribution, where $\left\{\widetilde{n}_k\right\}_{k=0,\ldots, m-2}$ forms an iid sequence of centered Gaussian random variables with unit variance.
We therefore obtain
$$
\int_{0}^{T}f(W_t)\mathrm{d}t
 = \sum_{i=1}^{m-1}f(W_{t_i})(t_{i+1} - t_i)
 = \sum_{i=1}^{m-1}f\left(\sqrt{\delta}\sum_{k=0}^{i-1} \widetilde{n}_k\right)(t_{i+1} - t_i)
 = \delta\sum_{i=1}^{m-1}f\left(\sqrt{\delta}\sum_{k=0}^{i-1} \widetilde{n}_k\right).
 $$
 
In order to compute the variance $\mathbb{V}[\Phi]$, we now generate $n$ samples 
$(\Phi^{(1)}, \ldots, \Phi^{(n)})$ of $\Phi$ and build the unbiased variance estimator
$$
\widehat{\sigma}_{n}^2 := 
\frac{1}{n-1}\sum_{j=1}^{n}
\left(\Phi^{(j)} - \frac{1}{n}\sum_{l=1}^{n}\Phi^{(l)}\right)^2.
$$

## Question: 


- Write two pure `python` (i.e. not with `cython`) functions that compute this estimator.
    + `variance_intBM_loop(nsteps, nsimul, gaussian_vector)` using for loop structure e.g.\
      `for n in range(nsimul):`
      
     `      for j in range(n_steps-1):`

  to compute $\phi^n$
              
    + `variance_intBM(nsteps, nsimul, gaussian_vector)` avoiding for loops \
  **Optional Hint:** Constructing the f(W_t) matrix for each nsimul and nstep might be helpful (without using a for loop of course). Then you can apply vectorized operations on it 
    

Notes:
- You may want to use `np.var` (https://numpy.org/doc/stable/reference/generated/numpy.std.html) to compute the *unbiased* variance; *watch out for degrees of freedom.....*
- the function `variance_intBM_loop` should have loops
- loops are not allowed in the function `variance_intBM`
- The result of both functions should match exactly (up to some tolerance with rounding)

## Note: For the vectorized version there are a few ways to do it, we shall present a couple

In [16]:
def variance_intBM_loop(nsteps:int, nsimul:int, gaussian_vector:np.array, delta:float, f)->float:
    """
        #Inputs:
        nsteps (int): number of time steps (m)
        nsimul (int): number of simulations (n)
        gaussian_vector (np.array): Gaussian random samples of size (nsteps-1, nsimul)
        delta (float): time increment
        f: function f()
        #Outputs:
        variance estimator (float)
    """
    function_variable = f(np.sqrt(delta) * np.cumsum(gaussian_vector[:-1], axis = 0)) 
    #function_variable represents a nxm matrix with element j,i representing f^j(W_{t_i})
    phi = np.zeros(nsimul)
    for n in range(nsimul):
        wt=0;
        for j in range(nsteps-1):
            f_wt=f(wt)
            #print(n,j,f_wt)
            phi[n]+=delta*f_wt
            wt+=gaussian_vector[j,n]*np.sqrt(delta)
    variance1 = 1.0 / (nsimul - 1) * np.sum(np.power(phi - 1.0 / nsimul * np.sum(phi),2))
    
    return variance1

def variance_intBM(nsteps:int, nsimul:int, gaussian_vector:np.array, delta:float, f)->float:
    """
        #Inputs:
        nsteps (int): number of time steps (m)
        nsimul (int): number of simulations (n)
        gaussian_vector (np.array): Gaussian random samples of size (nsteps-1, nsimul)
        delta (float): time increment
        f: function f()
        #Outputs:
        variance estimator (float)
    """
    f_wt=np.zeros((nsteps-1, nsimul)) #alocate f_wt matrix 
    f_wt[0,:]= f(0) #start at zero as w_0=0
    f_wt[1:,:] = f(np.sqrt(delta) * np.cumsum(gaussian_vector[:-1,:], axis = 0)) #cumulative sum over rows (axis=0) since nsimul is the column axis
    phi = delta * np.sum(f_wt,axis=0) # sum over columns (axis=0) since nsimul is the column axis
    variance2 =1.0 / (nsimul - 1) * np.sum(np.power(phi - 1 / nsimul * np.sum(phi),2))
    
    return 1.0 / (nsimul - 1) * np.sum(np.power(phi - 1.0 / nsimul * np.sum(phi),2))

def variance_intBM_compact(nsteps:int, nsimul:int, gaussian_vector:np.array, delta:float, f)->float:
    """
        #Inputs:
        nsteps (int): number of time steps (m)
        nsimul (int): number of simulations (n)
        gaussian_vector (np.array): Gaussian random samples of size (nsteps-1, nsimul)
        delta (float): time increment
        f: function f()
        #Outputs:
        variance estimator (float)
    """
    phi = delta * (f(0)+np.sum(f(np.sqrt(delta) * np.cumsum(gaussian_vector[:-1,:], axis = 0)),axis=0)) # sum over columns (axis=0) since nsimul is the column axis
       
    return np.var(phi,ddof=1)



### Example

As a consistency check, you may want to check numerically the formula
$$
\mathbb{V}\left[\int_{0}^{T} (W_t)^2 \mathrm{d}t\right] = \frac{T^3}{3}.
$$

In [17]:
nsteps, nsimul = 1000, 10000
np.random.seed(0)
gaussian_vector = np.random.normal(0., 1., (nsteps-1, nsimul))
T = 1.
delta = T / nsteps
f = lambda x: x**2

In [18]:
t0 = time.time()
myvar_loop = variance_intBM_loop(nsteps, nsimul, gaussian_vector, delta, f)
dt_loop = time.time() - t0

t0 = time.time()
myvar_vec = variance_intBM(nsteps, nsimul, gaussian_vector, delta, f)
dt_vec = time.time() - t0

t0 = time.time()
myvar_vec_compact = variance_intBM_compact(nsteps, nsimul, gaussian_vector, delta, f)
dt_vec_compact = time.time() - t0

print("Variance: ", np.round(myvar_loop, 8), " --- Total computation time: ", np.round(dt_loop, 2), " seconds")
print("Variance: ", np.round(myvar_vec, 8), " --- Total computation time: ", np.round(dt_vec, 2), " seconds")
print("Variance: ", np.round(myvar_vec_compact, 8), " --- Total computation time: ", np.round(dt_vec_compact, 2), " seconds")

Variance:  0.3448967  --- Total computation time:  9.74  seconds
Variance:  0.3448967  --- Total computation time:  0.11  seconds
Variance:  0.3448967  --- Total computation time:  0.07  seconds


*Note: with (nsteps, nsimul) = (1000, 10000), you should expect a speed ratio of magnitude 20~30 at least.*

---
---

# PROBLEM III: Pandas and option data
---

The file `AAPL_options.csv`contains financial data for the option chains in AAPL.

The columns of the csv file are defined below:

- contractSymbol: unique indetifier of the option contract

- optionType: call or put

- expiration: contract expiration date in YYYY-mm-dd format

- lastTradeDate:contract last traded date in  YYYY-mm-dd format

- strike: strike price of the options contract

- lastPrice: last quoted price of the option contract

- bid: last bid price of the option contract

- ask:  last ask price of the option contract

- change: price change since last update

- percentChange: percentage price change since last update

- volume: traded volume

- openInterest: open interest

- impliedVolatility: option implied vol

- inTheMoney: True for option in the Money, False otherwise

- contractSize: always REGULAR

- currency: contract currency (always USD)

you can read the file to pandas using the code below

In [19]:
df=pd.read_csv("AAPL_options.csv")
df.head()

Unnamed: 0,contractSymbol,optionType,expiration,lastTradeDate,strike,lastPrice,bid,ask,change,percentChange,volume,openInterest,impliedVolatility,inTheMoney,contractSize,currency
0,AAPL221216C00030000,call,2022-12-16,2022-11-29 16:39:22,30.0,112.0,111.85,112.55,0.0,0.0,1.0,2,4.500004,True,REGULAR,USD
1,AAPL221216C00035000,call,2022-12-16,2022-11-11 15:15:22,35.0,112.3,106.85,107.6,0.0,0.0,4.0,22,4.29688,True,REGULAR,USD
2,AAPL221216C00040000,call,2022-12-16,2022-12-08 16:00:14,40.0,103.05,101.9,102.55,0.0,0.0,3.0,410,3.890625,True,REGULAR,USD
3,AAPL221216C00045000,call,2022-12-16,2022-12-09 15:52:43,45.0,98.24,96.9,97.55,1.239998,1.278348,4.0,484,3.546876,True,REGULAR,USD
4,AAPL221216C00050000,call,2022-12-16,2022-11-18 20:46:59,50.0,101.73,91.9,92.55,0.0,0.0,2.0,331,3.234377,True,REGULAR,USD


## Question 1:

Implement a function `find_number_options_with_volume_threshold(df:pd.DataFrame,expiration:str,min_vol_threshold:int)->int` with the following specifications:

Inputs:
- df:pd.DataFrame input dataframe
- expiration:str expiration date
- min_vol_threshold:int minimum volume

Outputs:
- Number of options (both Calls and Puts) available with expiration equal to the input `expiration` and volume larger than or equal `min_vol_threshold`



In [20]:
def find_number_options_with_volume_threshold(df:pd.DataFrame,expiration:str,min_vol_threshold:int)->int:
    """
    following specification
    Input:
    - df:pd.DataFrame input dataframe
    - expiration:str expiration date
    - min_vol_threshold:int minimum volume

    Output:
    - Number of options (both Calls and Puts) available with expiration equal to the input `expiration` and volume larger than or equal `min_vol_threshold`

    """
    return len(df.loc[(df.expiration==expiration) & (df.volume>=min_vol_threshold)])

### You may also use the following example to verify your results:

`find_number_options_with_volume_threshold(df,"2022-12-16",0)` should return `142`

`find_number_options_with_volume_threshold(df,"2025-01-17",100)` should return `6`

In [21]:
find_number_options_with_volume_threshold(df,"2022-12-16",0)

142

In [22]:
find_number_options_with_volume_threshold(df,"2025-01-17",100)

6

## Question 2:

Write a function `find_mean_volume_per_expiration_per_option_type(df:pd.DataFrame,min_expiration:str)->pd.DataFrame` with the following specifications:

Inputs:
 - df:pd.DataFrame input dataframe
 - min_expiration:str minimum expiration date
 
Outputs:

- pandas dataframe with columns ["expiration","optionType","averageVolume"] for expirations larger than or equal `min_expiration`
- expiration: is the expiration in YYYY-mm-dd format
- optionType: is call or put
- averageVolume: is the average traded volume per expiration and option type e.g. for each expiration we want to compute the average volume for Calls and Puts separately

**Note** The row order of the output is not relevant, but please make sure that the format is correct

In [23]:
def find_mean_volume_per_expiration_per_option_type(df:pd.DataFrame,min_expiration:str)->pd.DataFrame:
    """
    Input:
         - df:pd.DataFrame input dataframe
         - min_expiration:str minimum expiration date
 
    Output:

        - pandas dataframe with columns ["expiration","optionType","averageVolume"] for expirations larger than or equal `min_expiration`
        - expiration: is the expiration in YYYY-mm-dd format
        - optionType: is all or Put
        - averageVolume: is the average traded volume per expiration and option type e.g. for each expiration we want to compute the average volume for Calls and Puts separately
    """
    return df.loc[df.expiration>=min_expiration,["expiration","optionType","volume"]].groupby(["expiration","optionType"]).mean().reset_index().rename(columns = {'expiration':'expiration', 'optionType':'optionType','volume':'averageVolume'})
    

### You may also use the following example to verify your results:

`find_mean_volume_per_expiration_per_option_type(df,"2025-01-17")` should return 

|    | expiration   | optionType   |   averageVolume |
|---:|:-------------|:-------------|----------------:|
|  0 | 2025-01-17   | call         |         34.3333 |
|  1 | 2025-01-17   | put          |         34.3333 |

`find_mean_volume_per_expiration_per_option_type(df,"2024-06-21")` should return 

|    | expiration   | optionType   |   averageVolume |
|---:|:-------------|:-------------|----------------:|
|  0 | 2024-06-21   | call         |         27.2308 |
|  1 | 2024-06-21   | put          |         27.2308 |
|  2 | 2025-01-17   | call         |         34.3333 |
|  3 | 2025-01-17   | put          |         34.3333 |

In [24]:
find_mean_volume_per_expiration_per_option_type(df,"2025-01-17")

Unnamed: 0,expiration,optionType,averageVolume
0,2025-01-17,call,34.333333
1,2025-01-17,put,34.333333


In [25]:
find_mean_volume_per_expiration_per_option_type(df,"2024-06-21")

Unnamed: 0,expiration,optionType,averageVolume
0,2024-06-21,call,27.230769
1,2024-06-21,put,27.230769
2,2025-01-17,call,34.333333
3,2025-01-17,put,34.333333
