# Ideal volatility adjustment for RL
## Differential Sharpe Ratio (DSR) and Downside Deviation Ratio (DDR)
### Moody, J., & Saffell, M. (2001). Learning to trade via direct reinforcement. IEEE transactions on neural Networks, 12(4), 875-889.

In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [714]:
from types import MethodType

In [814]:
EPS = np.finfo(np.float32).eps
class DSR:
    """Differential Sharpe Ratio - Moody and Saffell"""
    def __init__(self, adaptation_rate, nassets):
        self.n = adaptation_rate
#         self.A = np.zeros(nassets)
#         self.B = np.zeros(nassets)
        self.A = None
        self.B = None

#     def __call__(self, rewards):
#         return self.call(rewards)
    
    def __call__(self, rewards):
        if len(rewards.shape) == 1:
            nassets = 1
        else: 
            nassets = rewards.shape[1]
        self.A = np.zeros(nassets)
        self.B = np.zeros(nassets)
#         self.call = self.main_func
        self.__class__ = _DSR
        print('call')
        return self.main_func(rewards)
    
    def main_func(self, rewards):
        rewards = sum([
            self.calculate_dsr(reward)
            for reward in rewards
        ])
        return rewards

    def calculate_dsr(self, raw_return):
        dA = raw_return - self.A
        dB = raw_return**2 - self.B
#         import ipdb; ipdb.set_trace()
        dsr = (self.B * dA - (self.A * dB) / 2) / ((self.B - self.A**2)**(3 / 2) + EPS)
        self.A += self.n * dA
        self.B += self.n * dB
        return np.minimum(np.maximum(dsr, -1), 1.)

    def update_parameters(self, raw_return):
        dA = raw_return - self.A
        dB = raw_return**2 - self.B
        self.A += self.n * dA
        self.B += self.n * dB

class _DSR(DSR):
    def __call__(self, rewards):
        return self.main_func(rewards)

class DDR:
    """Differential Downside Ratio (sortino) - Moody and Saffell """
    def __init__(self, adaptation_rate, nassets):
        self.n = adaptation_rate
        self.A = np.zeros(nassets)
        self.B = np.zeros(nassets)

    def __call__(self, rewards):
        if len(rewards.shape) == 1:
            nassets = 1
        else: 
            nassets = rewards.shape[1]
        self.A = np.zeros(nassets)
        self.B = np.zeros(nassets)
#         self.__call__ = self.main_func  # monkey patch doesn't work as __call__ is special func - attached to class
        self.__class__ = _DDR
        print('call')
        return self.main_func(rewards)
    
    def main_func(self, rewards):
        rewards = sum([
            self.calculate_ddr(reward)
            for reward in rewards
        ])
        return rewards

        # as this will be called when popping from the nstep buffer
        # this is an appropriate place to update params
        return rewards

    def calculate_ddr(self, raw_return):
        ddr = np.where(raw_return > 0.,
                       (raw_return - self.A / 2) / (np.sqrt(self.B) + EPS),
                       (self.B * (raw_return - self.A / 2) -
                        (self.A * raw_return**2) / 2) /
                       (self.B**(3 / 2) + EPS))
        self.update_parameters(raw_return)
        return np.minimum(np.maximum(ddr, -1), 1.)

    def update_parameters(self, raw_return):
        dA = raw_return - self.A
        dB = min(raw_return, 0)**2 - self.B
        self.A += self.n * dA
        self.B += self.n * dB

class _DDR(DDR):
    def __call__(self, rewards):
        return self.main_func(rewards)


In [815]:
arr = np.array([1, -2, 3, -3])
np.where(arr > 0, arr, 0.)

array([1., 0., 3., 0.])

In [816]:
dsr = DSR(.001, 1)
dsr2 = DSR(.001, 1)
ddr = DDR(.001, 1)

In [817]:
rewards = np.random.randn(100)
print(rewards.mean(), rewards.std())
print(rewards.mean()/rewards.std())
print(rewards.mean() / np.minimum(rewards, 0.).std())

0.10623319331177632 1.105466339535697
0.09609808052264616
0.1644082392214514


In [818]:
print(dsr(rewards), ddr(rewards))
for i in range(100):
    dsr(rewards)
    dsr2(rewards)
    ddr(rewards)
print(dsr.A, dsr.B)
print(ddr.A, ddr.B)

call
call
[9.06089037] [11.68109649]
call
[0.10537446] [1.23271933]
[0.10537446] [0.57185883]


In [819]:
import inspect

In [820]:
print(inspect.getsource(dsr.__call__))

    def __call__(self, rewards):
        return self.main_func(rewards)



In [821]:
print(inspect.getsource(DSR.__call__))

    def __call__(self, rewards):
        if len(rewards.shape) == 1:
            nassets = 1
        else: 
            nassets = rewards.shape[1]
        self.A = np.zeros(nassets)
        self.B = np.zeros(nassets)
#         self.call = self.main_func
        self.__class__ = _DSR
        print('call')
        return self.main_func(rewards)



In [765]:
class A:
    def __init__(self):
        pass
    
class A(A):
    def __init__(self):
        pass

In [766]:
A

__main__.A