# Chapter 5

In this chapter we will implement a statistics gatherer to have some statistics of our simulation, using the strategy pattern.

### Statistics Gatherer

In [1]:
import numpy as np
from copy import deepcopy

In [219]:
class StatisticsMC:
    def __init__(self):
        pass
    
    def GetResultsSoFar(self):
        pass
    
    def DumpOneResult(self,result):
        pass        
  
    def clone(self):
        pass
    
    def __del__(self):
        pass   

In [537]:
class StatisticsMean(StatisticsMC):
    def __init__(self):
        self.__RunningSum = 0
        self.__PathsDone = 0
        
    def GetResultsSoFar(self): 
        Results = [[0]]
        Results[0][0] = self.__RunningSum / self.__PathsDone
        return Results
    
    def DumpOneResult(self,result):
        self.__PathsDone += 1
        self.__RunningSum += result        
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self

In [472]:
mean = StatisticsMean()
mean.DumpOneResult(2)
mean.DumpOneResult(6)
mean.GetResultsSoFar()

[[4.0]]

Now we will put everything together.

In [140]:
from random import random
from math import exp, sqrt, log

In [141]:
class PayOff():
    def __init__(self):
        pass
    
    def __call__(self,Spot):
        pass    

class PayOffBridge():        
    def __del__(self):
        del self
        
    def copy(self,InnerPayOff):
        from copy import deepcopy
        self = deepcopy(InnerPayOff)
        
class PayOffCall(PayOff,PayOffBridge):
    def __init__(self,Strike_):
        self.__Strike = Strike_
        
    def __call__(self,Spot):
        return max(Spot - self.__Strike,0)
    
class VanillaOption:
    def __init__(self, ThePayOff_, Expiry_):
        self.__ThePayOff = ThePayOff_
        self.__Expiry = Expiry_
        
    def GetExpiry(self):
        return self.__Expiry
    
    def OptionPayOff(self, Spot):
        return self.__ThePayOff(Spot)

In [142]:
class ParametersInner:
    def __init__(self):
        pass
    
    def Integral(self,time1, time2):
        pass
    
    def IntegralSquare(self,time1,time2):
        pass

class Parameters:    
    def RootMeanSquare(self,time1,time2):
        total = self.Integral(time1,time2)
        return total/(time2-time1)
    
    def Mean(self,time1, time2):
        total = self.Integral(time1,time2)
        return total/(time2-time1)
    
    def copy(self,original):
        from copy import deepcopy
        self = deepcopy(InnerPayOff) 
        
    def __del__(self):
        del self    
        
class ParametersConstant(ParametersInner,Parameters):
    def __init__(self,constant):
        self.__Constant = constant
        self.__ConstantSquare = self.__Constant*self.__Constant
    
    def Integral(self, time1, time2):
        return (time2 - time1)*self.__Constant
    
    def IntegralSquare(self, time1, time2):
        return (time2 - time1)*self.__ConstantSquare   


In [143]:
def SimpleMonteCarlo5(TheOption,
                     Spot,
                     Vol,
                     r,
                     NumberOfPaths,
                     gatherer):
    
    Expiry = TheOption.GetExpiry()    
    variance = Vol.IntegralSquare(0,Expiry)
    rootVariance = sqrt(variance)
    itoCorrection = -0.5*variance
    
    movedSpot = Spot*exp(r.Integral(0,Expiry) + itoCorrection)
    discounting = exp(-r.Integral(0,Expiry))
    for i in range(NumberOfPaths):
        thisGaussian = GetOneGaussianByBoxMuller()
        thisSpot = movedSpot*exp(rootVariance*thisGaussian)
        thisPayoff = TheOption.OptionPayOff(thisSpot)
        gatherer.DumpOneResult(thisPayoff*discounting)
    
      

In [144]:
def GetOneGaussianBySummation():
    result = 0
    for j in range(12):
        result += random()
    result -= 6
    return result

def GetOneGaussianByBoxMuller():
    sizeSquared = 1
    while sizeSquared >= 1:
        x = 2*random() - 1
        y = 2*random() - 1
        sizeSquared = x*x + y*y
    
    result = x*sqrt(-2*log(sizeSquared)/sizeSquared)
    return result

In [434]:
gatherer = StatisticsMean()
callPayOff = PayOffCall(40)
TheOption = VanillaOption(callPayOff,0.5)
r = ParametersConstant(0.1)
Vol = ParametersConstant(0.2)
SimpleMonteCarlo5(TheOption,42,Vol,r,10000, gatherer)
results = gatherer.GetResultsSoFar()
print(f'Call price for 10000 paths: ')
for i in range(len(results)):
    for j in range(len(results[i])):
        print(results[i][j])

Call price for 10000 paths: 
4.718866369765591


### Wrapper class

Since there are no pointers in Python (at least in the STD library) this section will not be covered. If we want to add memory handling we can just create an auxiliary class and make all the classes to do multiple inheretance between the parent class and this class. 

In [208]:
class Wrapper:
    def __init__(self,inner):
        self = inner
    
    def __del__(self):
        del self
        
    # The rest of the methods are related to handling pointers

### A convergence table

In order to get a better feeling of how the simulation is goint we might want to have extra statistics for every X or exponential, steps of the simulation, giving us the mean, standard error ... To do so we will store this information in a convergence table. 

We will use this as a gatherer for different statistics gatherers, having the same structure and inhereting from the same base class.

In [438]:
class StatisticsMean(StatisticsMC):
    def __init__(self):
        self.__RunningSum = 0
        self.__PathsDone = 0
        
    def GetResultsSoFar(self): 
        Results = [[0]]
        Results[0][0] = self.__RunningSum / self.__PathsDone
        return Results
    
    def DumpOneResult(self,result):
        self.__PathsDone += 1
        self.__RunningSum += result        
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self

In [441]:
class ConvergenceTable(StatisticsMC):
    def __init__(self, Inner_):
        self.__Inner = Inner_
        self.__ResultsSoFar = []
        self.__StoppingPoint = 2
        self.__PathsDone = 0
    
    def clone(self):
        return deepcopy(self)
    
    def DumpOneResult(self,result):
        self.__Inner.DumpOneResult(result)
        self.__PathsDone += 1
        
        
        if self.__PathsDone == self.__StoppingPoint:
            self.__StoppingPoint *=2
            thisResult = self.__Inner.GetResultsSoFar()            
            
            for i in range(len(thisResult)):
                thisResult[i].append(self.__PathsDone)
                self.__ResultsSoFar.append(thisResult[i]) 

            
    
    def GetResultsSoFar(self):
        
        tmp = self.__ResultsSoFar 
        
        if self.__PathsDone*2 != self.__StoppingPoint:
            thisResult = self.__Inner.GetResultsSoFar()
            
            for i in range(len(thisResult)):
                thisResult[i].append(self.__PathsDone)
                tmp.append(thisResult[i]) 
                
        return tmp
                

In [442]:
gatherer = StatisticsMean()
gathererTwo = ConvergenceTable(gatherer)
callPayOff = PayOffCall(40)
TheOption = VanillaOption(callPayOff,0.5)
r = ParametersConstant(0.1)
Vol = ParametersConstant(0.2)
SimpleMonteCarlo5(TheOption,42,Vol,r,10000, gathererTwo)
results = gathererTwo.GetResultsSoFar()
print(f'Call price for 10000 paths: ')
for i in range(len(results)):
    for j in range(len(results[i])):
        print(results[i][j])

Call price for 10000 paths: 
2.56339162090301
2
3.348915973986924
4
4.52618795560467
8
4.034905977762788
16
5.736413426394033
32
4.792770135897354
64
4.83499010267414
128
4.58415639172651
256
4.746278678990468
512
4.713812134924003
1024
4.7016937600940025
2048
4.741836429346995
4096
4.756945908631261
8192
4.741576473389367
10000


### Exercises 

**Exercise 5.1** Write a statistics gathering class that computes the first four moments of a sample.

The first four moments of a sample are the mean, variance, skewness and kurtosis. The algorithms for the calculation of these moments come from: https://en.wikipedia.org/wiki/Algorithms%5Ffor%5Fcalculating%5Fvariance#Higher-order_statistics .

In [514]:
class StatisticsFirstFourMoments(StatisticsMC):
    def __init__(self):
        self.__RunningSum = 0
        self.__RunningSumSquared = 0
        self.__PathsDone = 0
        self.__M2 = 0
        self.__M3 = 0
        self.__M4 = 0        
        
        
    def GetResultsSoFar(self): 
        Results = [[0,0,0,0]]
        Results[0][0] = self.__RunningSum / self.__PathsDone # mean
        Results[0][1] =(self.__RunningSumSquared - self.__RunningSum*self.__RunningSum/self.__PathsDone)/(self.__PathsDone-1)
        Results[0][2] = sqrt(self.__PathsDone)*self.__M3 / (self.__M2*sqrt(self.__M2)) # skewness
        Results[0][3] = (self.__PathsDone*self.__M4) / (self.__M2*self.__M2) - 3 # kurtosis
        return Results
    
    def DumpOneResult(self,result):
        self.__PathsDone += 1
        self.__RunningSum += result
        self.__RunningSumSquared += result*result
        delta = result - self.__RunningSum/self.__PathsDone
        delta_n = delta/self.__PathsDone
        delta_n2 = delta_n*delta_n
        term1 = delta*delta_n*(self.__PathsDone - 1)
        self.__M4 = self.__M4 + term1*delta_n2*(self.__PathsDone*self.__PathsDone
                                                - 3*self.__PathsDone + 3)+ 6*delta_n2*self.__M2 -4 *delta_n*self.__M3
        self.__M3 = self.__M3 + term1 * delta_n * (self.__PathsDone - 2) - 3 * delta_n * self.__M2
        self.__M2 = self.__M2 + term1
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self        


In [465]:
mean = StatisticsFirstFourMoments()
mean.DumpOneResult(2)
mean.DumpOneResult(6)
mean.GetResultsSoFar()

[[4.0, 8.0, 0.0, -2.0]]

In [470]:
gatherer = FirstFourMoments()
gathererTwo = ConvergenceTable(gatherer)
callPayOff = PayOffCall(40)
TheOption = VanillaOption(callPayOff,0.5)
r = ParametersConstant(0.1)
Vol = ParametersConstant(0.2)
SimpleMonteCarlo5(TheOption,42,Vol,r,400000, gathererTwo)
results = gathererTwo.GetResultsSoFar()
for i in range(len(results)):
    print(f'Mean {results[i][0]}, variance {results[i][1]}, skewness {results[i][2]}, kurtosis {results[i][3]}')

Mean 2.895348495525091, variance 16.766085821078814, skewness 0.0, kurtosis -2.0
Mean 1.624010071439435, variance 7.826681985666556, skewness 1.019268533811464, kurtosis -0.7507553411697021
Mean 3.18346039269676, variance 21.467014108115166, skewness 1.3075803623807976, kurtosis 0.4352457824090772
Mean 5.469781129620864, variance 27.51983539707484, skewness 1.0155430582432179, kurtosis 0.3644584448061674
Mean 5.11034288181971, variance 22.815269749963605, skewness 0.6641497395614194, kurtosis -0.2843927870711669
Mean 5.047458297867174, variance 23.843506210723277, skewness 0.8184565422269691, kurtosis 0.0487945818962352
Mean 4.857056265668228, variance 21.512570015337516, skewness 0.8582147128256848, kurtosis 0.01846612058569974
Mean 4.975335997695943, variance 26.304833753803873, skewness 1.249099806066973, kurtosis 1.6670410050515017
Mean 5.032901486024189, variance 26.213039547128037, skewness 1.1760834388054744, kurtosis 1.2906718430126345
Mean 5.064535646056608, variance 25.896867

**Exercise 5.2** Write a statistics gathering class that computes the value at risk of a sample.

The value at risk or VAR give us information about how much the value of an asset can go down, given a certain interval. In  our case, per example, if we do 100 simulations and we suppose that we want a 95% interval, then the biggest of the 5 smallest values will be our VAR.

In [508]:
class StatisticsVAR(StatisticsMC):
    def __init__(self, Interval_):
        self.__PathsDone = 0
        self.__Interval = Interval_
        self.__WorstCase = []
        self.__PreviousWorse = 1000000 # Arbitrary
        
    def GetResultsSoFar(self): 
        Results = [[0]]
        Results[0][0] = max(self.__WorstCase)
        return Results
    
    def DumpOneResult(self,result):
        self.__PathsDone += 1
        if len(self.__WorstCase) <= (1-self.__Interval)*self.__PathsDone:
            self.__WorstCase.append(self.__PreviousWorse)
        
        elif max(self.__WorstCase) > result:
            max_index = self.__WorstCase.index(max(self.__WorstCase))
            self.__PreviousWorse = self.__WorstCase[max_index]
            self.__WorstCase[max_index] = result         
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self

In [506]:
mean = StatisticsVAR(0.9)
mean.DumpOneResult(2)
mean.DumpOneResult(6)
mean.DumpOneResult(5)
mean.DumpOneResult(6)
mean.DumpOneResult(2)
mean.DumpOneResult(6)
mean.DumpOneResult(2)
mean.DumpOneResult(6)
mean.DumpOneResult(2)
mean.DumpOneResult(9)
mean.GetResultsSoFar()

[[2]]

In [511]:
gatherer = StatisticsVAR(0.8)
gathererTwo = ConvergenceTable(gatherer)
callPayOff = PayOffCall(40)
TheOption = VanillaOption(callPayOff,0.5)
r = ParametersConstant(0.1)
Vol = ParametersConstant(0.2)
SimpleMonteCarlo5(TheOption,42,Vol,r,50000, gathererTwo)
results = gathererTwo.GetResultsSoFar()
for i in range(len(results)):
    print(f'VAR {results[i][0]}, iterations {results[i][1]}.')

VAR 11.327249290159324, iterations 2.
VAR 1.8719801706928956, iterations 4.
VAR 1.4153739660114557, iterations 8.
VAR 1.8719801706928956, iterations 16.
VAR 1.8719801706928956, iterations 32.
VAR 1.2708108985059334, iterations 64.
VAR 0.5814704547488765, iterations 128.
VAR 0.037079032829145694, iterations 256.
VAR 0.037079032829145694, iterations 512.
VAR 0.037079032829145694, iterations 1024.
VAR 0.004932968425698055, iterations 2048.
VAR 0.004932968425698055, iterations 4096.
VAR 0.004932968425698055, iterations 8192.
VAR 0.0007735757217685299, iterations 16384.
VAR 0.0007735757217685299, iterations 32768.
VAR 0.0007735757217685299, iterations 50000.


**Exercise 5.3** Write a statistics gathering class that allows the computation of several statistics via inputted classes.

In [530]:
class StatisticsMultiGatherer(StatisticsMC):
    def __init__(self,Gatherers_):
        self.__Gatherers = Gatherers_ #A list of Statistics Gatherers
    
    def GetResultsSoFar(self):
        Results = []
        for gatherer in self.__Gatherers:
            Results.append(gatherer.GetResultsSoFar())
        return Results
            
    def DumpOneResult(self,result):
        for i in range(len(self.__Gatherers)):
            self.__Gatherers[i].DumpOneResult(result)
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self

In [531]:
gathererVAR = StatisticsVAR(0.8)
gathererFourMoments = StatisticsFirstFourMoments()
multiGatherer = StatisticsMultiGatherer([gathererVAR,gathererFourMoments])
callPayOff = PayOffCall(40)
TheOption = VanillaOption(callPayOff,0.5)
r = ParametersConstant(0.1)
Vol = ParametersConstant(0.2)
SimpleMonteCarlo5(TheOption,42,Vol,r,50000, multiGatherer)
results = multiGatherer.GetResultsSoFar()

In [532]:
print(results)

[[[0.0]], [[4.73815280114059, 24.544508384919705, 1.1395097914074361, 1.138201456958714]]]


**Exercise 5.4** Use the strategy pattern to allow the user specify the termination conditions for the Monte Carlo, e.g., time spent or paths done.

Here only the paths done condition will be implemented. First we start with the base class.

In [533]:
class TerminatorMC:
    def __init__(self):
        pass
 
    def GetTerminatorCondition(self):
        pass
    
    def clone(self):
        pass
    
    def __del__(self):
        pass   

Now we create the inhereted class, for the number of paths.

In [545]:
class TerminatorPathsDone(TerminatorMC):
    def __init__(self,NumberOfPaths_):
        self.__NumberOfPaths = NumberOfPaths_
        self.__PathsDone = 0
    
    def GetTerminatorCondition(self):
        self.__PathsDone += 1
        if self.__PathsDone >= self.__NumberOfPaths:
            return True
        else:
            return False
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self

Now we rechange the Monte-Carlo routine adding the if condition that breaks the infinite loop.

In [546]:
def SimpleMonteCarlo6(TheOption,
                     Spot,
                     Vol,
                     r,
                     gatherer,
                     terminator):
    
    Expiry = TheOption.GetExpiry()    
    variance = Vol.IntegralSquare(0,Expiry)
    rootVariance = sqrt(variance)
    itoCorrection = -0.5*variance
    
    movedSpot = Spot*exp(r.Integral(0,Expiry) + itoCorrection)
    discounting = exp(-r.Integral(0,Expiry))
    
    while  True:
        thisGaussian = GetOneGaussianByBoxMuller()
        thisSpot = movedSpot*exp(rootVariance*thisGaussian)
        thisPayoff = TheOption.OptionPayOff(thisSpot)
        gatherer.DumpOneResult(thisPayoff*discounting)
        if terminator.GetTerminatorCondition() == True:
            break
        else: 
            pass

In [553]:
gatherer = StatisticsMean()
callPayOff = PayOffCall(40)
TheOption = VanillaOption(callPayOff,0.5)
r = ParametersConstant(0.1)
Vol = ParametersConstant(0.2)
terminator = TerminatorPathsDone(10000)
SimpleMonteCarlo6(TheOption,42,Vol,r, gatherer, terminator)
results = gatherer.GetResultsSoFar()
print(results[0])

[4.750736705823391]


**Exercise 5.5** Write a terminator class that causes termination when either of two terminator classes specify termination.

Since we need to classes for that we will create the runtime terminator class.

In [554]:
import time

In [557]:
class TerminatorRunTime(TerminatorMC):
    def __init__(self,MaxTime_):
        self.__MaxTime = MaxTime_
        self.__TotalTime = 0
        self.__InitTime = -1
    
    def GetTerminatorCondition(self):
        if self.__InitTime == -1:
            self.__InitTime = int(round(time.time() * 1000))
            return False
        else:
            self.__TotalTime += int(round(time.time() * 1000)) - self.__InitTime
            
            if self.__TotalTime >= self.__MaxTime:
                return True
        
            else:
                self.__InitTime = int(round(time.time() * 1000))
                return False
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self

In [578]:
class TerminatorMulti(TerminatorMC):
    def __init__(self,Terminators_):
        self.__Terminators = Terminators_
        
    def GetTerminatorCondition(self):
        for i in range(len(self.__Terminators)):
            if self.__Terminators[i].GetTerminatorCondition():
                return True
            else:
                return False
        
    def clone(self):
        return deepcopy(self)
    
    def __del__(self):
        del self

In [579]:
gatherer = StatisticsMean()
callPayOff = PayOffCall(40)
TheOption = VanillaOption(callPayOff,0.5)
r = ParametersConstant(0.1)
Vol = ParametersConstant(0.2)
terminatorPaths = TerminatorPathsDone(10000)
terminatorTime = TerminatorRunTime(3000)
terminatorMulti = TerminatorMulti([terminatorPaths,terminatorTime])
SimpleMonteCarlo6(TheOption,42,Vol,r, gatherer, terminatorMulti)
results = gatherer.GetResultsSoFar()
print(results[0])

[4.748533399890854]
