In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import random
import numpy as np
import pandas as pd
import seaborn as sns

In [None]:
# Thresholds: a run opens when a value rises above `start` and closes when a
# later value drops below `end`.
start, end = 5.8, 1.2

In [None]:
# Reproducible toy series: 31 integers drawn from [0, 10), stored in a
# single column 'y' and indexed by their position 0..30 (index name 'x').
rng = np.random.default_rng(seed=10)
X = np.arange(0, 31, dtype=int)
Y = rng.integers(10, size=31)
returns = pd.DataFrame(
    np.column_stack((X, Y)), columns=['x', 'y']
).set_index('x')
returns

In [None]:
# Colour every observation by the threshold it crosses: green above `start`,
# red below `end`, black in between. Deriving the colours from the thresholds
# (instead of a hard-coded value -> colour table) keeps the markers in sync
# with the dashed lines drawn below if the thresholds are ever changed.
point_colors = [
    'green' if y > start else 'red' if y < end else 'black'
    for y in Y
]

plt.figure(figsize=(10, 4))
plt.grid(True)
plt.plot(X, Y)
# A single scatter call draws all markers; the high zorder keeps them on top
# of the line and the grid.
plt.scatter(X, Y, c=point_colors, zorder=100)
plt.xticks(X)
plt.xlabel('x')
plt.ylabel('y')
plt.hlines(start, xmin=X.min(), xmax=X.max(), colors='green', ls='--')
plt.hlines(end, xmin=X.min(), xmax=X.max(), colors='red', ls='--')
plt.show()

## Select States
### Starting and Ending

In [None]:
def set_states(series, start_threshold=None, end_threshold=None):
    """Mark where a run opens (+1) and closes (-1) along ``series``.

    A run opens at the first value strictly above the start threshold while
    no run is open, and closes at the next value strictly below the end
    threshold. All other positions stay 0.

    Parameters
    ----------
    series : pandas.Series
        Values to scan, in order. The index may be of any kind (integers,
        dates, strings): states are written by position, not by label.
    start_threshold, end_threshold : float, optional
        Opening / closing thresholds. When omitted, the module-level
        ``start`` / ``end`` values current at call time are used.

    Returns
    -------
    numpy.ndarray
        ``int8`` array with the same shape as ``series`` holding 1, -1 or 0.
    """
    if start_threshold is None:
        start_threshold = start
    if end_threshold is None:
        end_threshold = end

    states = np.zeros(series.shape, dtype=np.int8)
    counting = False
    # Enumerate by position: index labels are valid array offsets only for
    # a 0..n-1 integer index, which stops being true once the frame is
    # re-indexed (e.g. with dates).
    for position, value in enumerate(series):
        if value > start_threshold and not counting:
            states[position] = 1
            counting = True
        elif value < end_threshold and counting:
            states[position] = -1
            counting = False
    return states

# Run the state detector over every column, then scatter the resulting
# +1 / 0 / -1 marks against the 'x' index.
states = returns.apply(set_states)
states.reset_index().plot.scatter(x='x', y='y')
plt.show()

In [None]:
# Heatmap of the state marks; the frame is transposed so that its index runs
# along the horizontal axis.
sns.heatmap(states.T, cmap='gray')

## Select Ranges between Start and End

In [None]:
# Swap the integer index of `returns` for consecutive timestamps starting in
# 2002 (one per row). NOTE: this mutates `returns` in place, so cells above
# that expect the integer index see dates if they are re-run afterwards.
index = pd.date_range('2002', periods=returns.shape[0])
returns.index = index

def select_ranges(series, start_threshold=None, end_threshold=None):
    """Return the ``(start_label, end_label)`` pairs of every closed run.

    A run opens at the first value strictly above the start threshold while
    no run is open, and closes at the next value strictly below the end
    threshold. A run still open when the series ends is discarded. NaN
    values never satisfy either comparison, so they neither open nor close
    a run.

    Parameters
    ----------
    series : pandas.Series
        Values to scan, in index order.
    start_threshold, end_threshold : float, optional
        Opening / closing thresholds. When omitted, the module-level
        ``start`` / ``end`` values current at call time are used; pass them
        explicitly to make the result independent of later re-assignments
        of those globals.

    Returns
    -------
    list of tuple
        Index labels ``(opened_at, closed_at)`` in order of occurrence.
    """
    if start_threshold is None:
        start_threshold = start
    if end_threshold is None:
        end_threshold = end

    date_ranges = []
    counting = False
    for label, value in series.items():
        if value > start_threshold and not counting:
            start_date = label
            counting = True
        elif value < end_threshold and counting:
            date_ranges.append((start_date, label))
            counting = False
    return date_ranges

# Detect the runs in every column, then print each run's boundaries together
# with the mean of `returns` over that label-inclusive window.
dates = returns.apply(select_ranges)
for row in dates.itertuples(index=False):
    left, right = row[0]
    print(left, right, '\n', returns.loc[left: right].mean())

# FHT Test

In [None]:
# Thresholds for the hand-checkable examples below. NOTE: this rebinds the
# module-level `start` / `end` used earlier in the notebook.
start, end = 0.5, -0.5

rng = np.random.default_rng(seed=10)
# Three short series taking values in {-1, 0, 1}; y3 also contains NaNs.
Y1 = np.array([1, -1, 0, 1, 0, -1, 1, -1, 0, -1])
Y2 = np.array([1, 0, 0, 0, -1, -1, 1, 0, 0, -1])
Y3 = np.array([1, np.nan, np.nan, 0, 0, -1, 1, -1, 0, -1])
data = pd.DataFrame(
    np.column_stack((Y1, Y2, Y3)), columns=['y1', 'y2', 'y3']
)

In [None]:
def select_date_ranges(series, start, end):
    """Return the ``(start_label, end_label)`` pairs of every closed run.

    While idle, the first value strictly greater than ``start`` opens a run;
    while a run is open, the first value strictly less than ``end`` closes
    it. A run that is still open when the series ends is not reported.
    NaN values fail both comparisons and are therefore skipped.

    Parameters
    ----------
    series : pandas.Series
        Values to scan, in index order.
    start, end : float
        Opening and closing thresholds.

    Returns
    -------
    list of tuple
        Index labels ``(opened_at, closed_at)`` in order of occurrence.
    """
    found = []
    inside = False
    for label, value in series.items():
        if inside:
            # A run is open: only a value below `end` can close it.
            if value < end:
                found.append((opened_at, label))
                inside = False
        elif value > start:
            opened_at = label
            inside = True
    return found

# For every column, collect the (start, end) index pairs that delimit each
# count. NOTE(review): the result is indexed below as
# `date_ranges[column][k]`; its container type comes from how
# `DataFrame.apply` wraps list results — confirm if the fixture changes.
date_ranges = data.apply(select_date_ranges, args=(start, end))
date_ranges

### Y1 Stabilvol

In [None]:
# Series under test for this section.
column = 'y1'
series = data.loc[:, column]

In [None]:
# First detected interval of the selected column: compare pandas' sample
# standard deviation against a hand-rolled computation.
interval1 = date_ranges[column][0]
chunk1 = series.loc[interval1[0]:interval1[1]].dropna()
volatility1 = chunk1.std()
time1 = len(chunk1)
display(chunk1)
mean = chunk1.mean()
# NOTE: despite its name, `std` holds the sample *variance* (n - 1 in the
# denominator); the square root is taken in the assertion below.
std = sum([(x-mean)**2 for _, x in chunk1.items()]) / (len(chunk1)-1)
print(f"FHT: {volatility1} - {time1}")
# Two independently computed floats are compared with a tolerance, not `==`.
assert np.isclose(volatility1, np.sqrt(std)), "Should be sqrt(((1-0)**2 + (-1-0)**2)/1)"

In [None]:
# Second detected interval of the selected column: compare pandas' sample
# standard deviation against a hand-rolled computation.
interval2 = date_ranges[column][1]
chunk2 = series.loc[interval2[0]:interval2[1]].dropna()
volatility2 = chunk2.std()
time2 = len(chunk2)
display(chunk2)
print(f"FHT: {volatility2} - {time2}")
mean = chunk2.mean()
# NOTE: despite its name, `std` holds the sample *variance* (n - 1 in the
# denominator); the square root is taken in the assertion below.
std = sum([(x-mean)**2 for _, x in chunk2.items()]) / (len(chunk2)-1)
# Two independently computed floats are compared with a tolerance, not `==`.
assert np.isclose(volatility2, np.sqrt(std)), "Should be sqrt(((1-0)**2 + (0-0)**2 + (-1-0)**2)/2)"

In [None]:
# Third detected interval of the selected column: compare pandas' sample
# standard deviation against a hand-rolled computation.
interval3 = date_ranges[column][2]
chunk3 = series.loc[interval3[0]:interval3[1]].dropna()
volatility3 = chunk3.std()
time3 = len(chunk3)
display(chunk3)
mean = chunk3.mean()
# NOTE: despite its name, `std` holds the sample *variance* (n - 1 in the
# denominator); the square root is taken in the assertion below.
std = sum([(x-mean)**2 for _, x in chunk3.items()]) / (len(chunk3)-1)
print(f"FHT: {volatility3} - {time3}")
# Two independently computed floats are compared with a tolerance, not `==`.
assert np.isclose(volatility3, np.sqrt(std)), "Should be sqrt(((1-0)**2 + (-1-0)**2)/1)"

### Y2 Stabilvol

In [None]:
# Series under test for this section.
column = 'y2'
series = data.loc[:, column]

In [None]:
# First detected interval of the selected column: compare pandas' sample
# standard deviation against a hand-rolled computation.
interval1 = date_ranges[column][0]
chunk1 = series.loc[interval1[0]:interval1[1]].dropna()
volatility1 = chunk1.std()
time1 = len(chunk1)
display(chunk1)
mean = chunk1.mean()
# NOTE: despite its name, `std` holds the sample *variance* (n - 1 in the
# denominator); the square root is taken in the assertion below.
std = sum([(x-mean)**2 for _, x in chunk1.items()]) / (len(chunk1)-1)
print(f"FHT: {volatility1} - {time1}")
# Two independently computed floats are compared with a tolerance, not `==`.
assert np.isclose(volatility1, np.sqrt(std)), "Should be sqrt(((1-0)**2 + (-1-0)**2)/4)"

In [None]:
# Second detected interval of the selected column: compare pandas' sample
# standard deviation against a hand-rolled computation.
interval2 = date_ranges[column][1]
chunk2 = series.loc[interval2[0]:interval2[1]].dropna()
volatility2 = chunk2.std()
time2 = len(chunk2)
display(chunk2)
print(f"FHT: {volatility2} - {time2}")
mean = chunk2.mean()
# NOTE: despite its name, `std` holds the sample *variance* (n - 1 in the
# denominator); the square root is taken in the assertion below.
std = sum([(x-mean)**2 for _, x in chunk2.items()]) / (len(chunk2)-1)
# Two independently computed floats are compared with a tolerance, not `==`.
assert np.isclose(volatility2, np.sqrt(std)), "Should be sqrt(((1-0)**2 + (-1-0)**2)/3)"

### Y3 Stabilvol

In [None]:
# Series under test for this section.
column = 'y3'
series = data.loc[:, column]

In [None]:
# First detected interval of the selected column (NaNs inside the window are
# dropped first): compare pandas' sample standard deviation against a
# hand-rolled computation.
interval1 = date_ranges[column][0]
chunk1 = series.loc[interval1[0]:interval1[1]].dropna()
volatility1 = chunk1.std()
time1 = len(chunk1)
display(chunk1)
mean = chunk1.mean()
# NOTE: despite its name, `std` holds the sample *variance* (n - 1 in the
# denominator); the square root is taken in the assertion below.
std = sum([(x-mean)**2 for _, x in chunk1.items()]) / (len(chunk1)-1)
print(f"FHT: {volatility1} - {time1}")
# Two independently computed floats are compared with a tolerance, not `==`.
assert np.isclose(volatility1, np.sqrt(std)), "Should be sqrt(((1-0)**2 + (-1-0)**2)/3)"

In [None]:
# Second detected interval of the selected column: compare pandas' sample
# standard deviation against a hand-rolled computation.
interval2 = date_ranges[column][1]
chunk2 = series.loc[interval2[0]:interval2[1]].dropna()
volatility2 = chunk2.std()
time2 = len(chunk2)
display(chunk2)
print(f"FHT: {volatility2} - {time2}")
mean = chunk2.mean()
# NOTE: despite its name, `std` holds the sample *variance* (n - 1 in the
# denominator); the square root is taken in the assertion below.
std = sum([(x-mean)**2 for _, x in chunk2.items()]) / (len(chunk2)-1)
# Two independently computed floats are compared with a tolerance, not `==`.
assert np.isclose(volatility2, np.sqrt(std)), "Should be sqrt(((1-0)**2 + (-1-0)**2)/1)"

### Total Stabilvol

In [None]:
# For every column and every detected interval, record the sample standard
# deviation of the non-NaN values in the label-inclusive window
# ('Volatility') and how many such values there are ('FHT').
stabilvol_list = []
for stock, series in data.items():
    for left, right in date_ranges[stock]:
        chunk = series.loc[left:right].dropna()
        stabilvol_list.append((chunk.std(), len(chunk)))
stabilvol = pd.DataFrame.from_records(
    stabilvol_list, columns=['Volatility', 'FHT']
)
stabilvol.values

In [None]:
# Hand-computed (volatility, FHT) pairs, listed in the order the intervals
# are collected: three for y1, two for y2, two for y3.
given_stabilvol = np.array([
    [np.sqrt(2), 2],
    [np.sqrt(2/2), 3],
    [np.sqrt(2), 2],
    [np.sqrt(2/4), 5],
    [np.sqrt(2/3), 4],
    [np.sqrt(2/3), 4],
    [np.sqrt(2/1), 2]
])
# Check the shape explicitly (np.allclose would otherwise broadcast), then
# compare with a tolerance: the volatilities are floats produced by a
# different code path than the expected values, so exact equality is fragile.
assert stabilvol.shape == given_stabilvol.shape, "Something Wrong"
assert np.allclose(stabilvol.values, given_stabilvol), "Something Wrong"