In [2]:
import os
import sys
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import yahooquery as yq
import random

import matplotlib.pyplot as plt


src_path = os.path.abspath(os.path.join('..', '..', 'src'))
sys.path.append(src_path)

from ConformalMethods import AdaptiveCP, ACP_plots, ACP_data

# Getting the Stockdata

In [3]:
def get_stock_data(start_index, end_index):
    # Open txt file containg ticker names
    with open(r'C:\Users\tobyw\Documents\ChrisPython\ConformalProject\scripts\snptickers.txt', 'r') as f:
        all_tickers = f.read().splitlines()
        all_tickers.sort()
    
    stock_tickers = all_tickers[start_index:end_index]

    tickers = yq.Ticker(stock_tickers)
    all_price_data = tickers.history(period='5y', interval='1d')
    price_df = all_price_data[['close']].copy()

    stock_data_tuples = []

    # Some tickers in the list are incorrect or not trading so need 

    for ticker_symbol in price_df.index.get_level_values(0).unique():
        # Getting the volatilty data for each ticker
        ticker_price_data = price_df.loc[ticker_symbol]
        ticker_close = ticker_price_data['close'].to_numpy()

        # As when creating the volaility there is an intial NaN value, I will remove this.
        ticker_close = ticker_close[1:]

        # Appending it to the stock_data_tuples list, the last volatilty is used as the prediciton for the next.
        stock_data_tuples.append((ticker_close[:-1], ticker_close[1:]))
    

    return stock_data_tuples



In [4]:
stock_data = get_stock_data(0, 100)

## Analysing set loss

We will want:
- the set_loss of each head
- the prediction width of each head
- error of each head

In [5]:
class AnalticalACP(AdaptiveCP):
    def AwACI(self, timeseries_data: tuple, interval_candidates: np.array = None, nu_sigma: tuple = (10**-3, 0.05), gamma: float = 0.05, title: str = None):
        
        xpred, y = timeseries_data

        chosen_interval_index = []
        err_t_list = []
        conformal_sets_list = []
        optimal_radius_list = []
        chosen_radius_list = []

        # Extras.
        set_loss_list = []
        head_radii_list = []
        head_error_list = []

        Set_loss = self.set_loss_vectorize()



        # Scale parameters, havent looked into scaling them best.
        sigma = nu_sigma[1]
        nu = nu_sigma[0] 

        if interval_candidates is None:
            interval_candidates = np.array(range(50, 550, 100))

        # To sync all of the heads we need to start at the max of all the candidates.
        start_point = max(interval_candidates) + 1
        i_count = start_point

        # Create the head and intitialse the weights.
        ACI_heads = [self.ACI_head(timeseries_data, gamma, start_point, interval) for interval in interval_candidates]
        interval_weights = np.array([1 for _ in range(len(interval_candidates))])
        
        # Continues calculating intervals until one of the heads stops.
        none_terminated = True

        while none_terminated: 
            head_sets = [] # Will contain the result from each head.
            
            # Create the mass distribution for each head
            Wt = interval_weights.sum()
            interval_probabilites = interval_weights/Wt
        
            try:
                # Create a list of the coverages for the different heads.
                for head in ACI_heads:
                        head_sets.append(next(head))
            
            except StopIteration: # One head is terminated.
                none_terminated = False
                break # You could but the return statement here


            # Choosing which head to use.
            chosen_set = random.choices(head_sets, weights=interval_probabilites, k=1)[0] # Using random module as numpy can not deal with tuples.
            conformal_sets_list.append(chosen_set)
            chosen_interval_index.append(head_sets.index(chosen_set))



            # TIME FRONTIER -------------

            # Seeing whether result lies within the set.
            err_true = AdaptiveCP.err_t(y[i_count], chosen_set)

            head_err = [AdaptiveCP.err_t(y[i_count], head_set) for head_set in head_sets]
            head_error_list.append(head_err)

            err_t_list.append(err_true)

            # Computing the conformal set radi. 
            optimal_set_radius = abs(y[i_count] - y[i_count-1]) 
            head_set_radius = list(map(lambda Cset: (Cset[1] - Cset[0])/2, head_sets)) #(chosen_set[1] - chosen_set[0])/2

            optimal_radius_list.append(optimal_set_radius)
            chosen_radius_list.append((chosen_set[1] - chosen_set[0])/2)

            head_radii_list.append(head_set_radius)
            head_set_radius = np.array(head_set_radius)

            # Updating the weights.
            set_loss = Set_loss(optimal_set_radius, head_set_radius)
            set_loss_list.append(set_loss)

            new_weights = interval_weights * np.exp(-1 * nu * set_loss) 
            sumW, lenW = sum(new_weights), len(new_weights)
            final_weights = new_weights*(1-sigma) + sumW*(sigma/lenW)
            interval_weights = final_weights

            # Incrementing the i-count
            i_count+=1

        # Calculating different averages
        realised_interval_coverage = 1 - pd.Series(err_t_list).rolling(50).mean().mean() # 50 is arbitary and could be improved.
        average_prediction_interval = np.mean([abs(x[1] - x[0]) for x in conformal_sets_list])

        return {
                'model': title if title is not None else 'AwACI',
                'coverage_target': self.coverage_target,
                'interval_candidates': interval_candidates,
                'realised_interval_coverage': realised_interval_coverage,
                'average_prediction_interval': average_prediction_interval,
                'optimal_set_radius': optimal_radius_list, 
                'chosen_set_radius': chosen_radius_list,
                'conformal_sets': conformal_sets_list,
                'error_t_list': err_t_list,
                'chosen_interval_index': chosen_interval_index,
                'start_point': start_point,
                'interval_size': 50,
                'set_loss_list': set_loss_list,
                'head_radii_list': head_radii_list,
                'head_error_list': head_error_list
                }
            

In [6]:
ACP = AnalticalACP(0.1)

In [7]:
test_result = ACP.AwACI(stock_data[4])

In [8]:
# Getting the average coverage of each head.
head_error_list = np.array(test_result['head_error_list'])
print(1- head_error_list.mean(axis=0))


[0.8989011  0.9010989  0.90549451 0.90769231 0.90769231]


In [9]:
# Getting the average set loss of each head.
set_loss_list = np.array(test_result['set_loss_list'])
mean = set_loss_list.mean(axis=0)
print(np.argmin(mean))

3


In [10]:
# Getting the average set radius of each head.
head_radii_list = np.array(test_result['head_radii_list'])
print(head_radii_list.mean(axis=0))
radi = head_radii_list.mean(axis=0)
print(np.argmin(radi))

[5.59097843 5.31418275 5.2209563  5.20439364 5.26072512]
3


In [11]:
set_and_radii = []
set_and_coverage = []

for data in stock_data:
    test_result = ACP.AwACI(data)
    mean, radii = np.array(test_result['set_loss_list']).mean(axis=0), np.array(test_result['head_radii_list']).mean(axis=0)
    coverage = abs(0.9 - np.array(test_result['head_error_list']).mean(axis=0))

    sr = np.argmin(mean) == np.argmin(radii)
    sc = np.argmin(mean) == np.argmin(coverage)

    set_and_radii.append(sr)
    set_and_coverage.append(sc)

print(f'sr = {sum(set_and_radii)/len(set_and_radii)}')
print(f'sc = {sum(set_and_coverage)/len(set_and_coverage)}')

# Leads us to believe that set loss isnt the best for coverage, although the coverages are very close.

sr = 0.6428571428571429
sc = 0.11224489795918367


In [12]:
class B_tACP(AdaptiveCP):
    def ACI_head(self, timeseries_data: tuple, gamma: float, start_point: int, custom_interval = None):
        xpred, y = timeseries_data
        alpha_t_list = [self.coverage_target]

        All_scores = self.score_function(xpred, y)

        err_t_list = []
        conformal_sets_list = []

        # get pinball loss funciton
        
        
        for i in range(start_point, len(All_scores)):
            Coverage_t = self.C_t(alpha_t_list[-1], All_scores, xpred[i], i, custom_interval)
            conformal_sets_list.append(Coverage_t)

            # Finding Optimal B_t
            # low, high = 0, 999  # Indices for np.linspace(1, 0, 1000)
            # possibilities = np.linspace(1, 0, 1000)
            # max_possi = 0.5  # Default value if no possi satisfies the condition

            # while low <= high:
            #     mid = (low + high) // 2
            #     possi = possibilities[mid]
            #     Cpossi = self.C_t(possi, All_scores, xpred[i], i, custom_interval)
            #     if Cpossi[0] < y[i] < Cpossi[1]:
            #         max_possi = max(max_possi, possi)  # Update max_possi if current possi is greater
            #         low = mid + 1  # Move to the right half to find the maximum possi
            #     else:
            #         high = mid - 1  # Move to the left half

            # B_t = max_possi

            # B_t = 0.5       # To avoid unbound local error will assign B_t a value first
            # for possi in np.linspace(1, 0, 1000):
            #     Cpossi= self.C_t(possi, All_scores, xpred[i], i, custom_interval)
            #     if Cpossi[0] < y[i] < Cpossi[1]:
            #         B_t = possi
            #         break

            low, high = 0, 999  # Indices for np.linspace(1, 0, 1000)
            possibilities = np.linspace(0, 1, 1000) # as 1 - 

            B_t = 1
            while low <= high:
                mid = (high + low) // 2
                possi = possibilities[mid]
                Cpossi = self.C_t(possi, All_scores, xpred[i], i, custom_interval)

                if Cpossi[0] < y[i] < Cpossi[1]:
                    B_t = possi
                    low = mid + 1
                else:
                    high = mid - 1
                    
            
            pinball_loss = self.l(B_t, alpha_t_list[-1])

            error_t = AdaptiveCP.err_t(y[i], Coverage_t)
            err_t_list.append(error_t)

            alpha_t = min(max(alpha_t_list[-1] + (gamma * (self.coverage_target - error_t)), 0), 1)
            alpha_t_list.append(alpha_t)

            
            yield Coverage_t, pinball_loss, B_t


        return False
    
    def AwACI(self, timeseries_data: tuple, interval_candidates: np.array = None, nu_sigma: tuple = (10**-3, 0.05), gamma: float = 0.05, title: str = None):
        
        xpred, y = timeseries_data

        chosen_interval_index = []
        err_t_list = []
        conformal_sets_list = []
        optimal_radius_list = []
        chosen_radius_list = []

        # Extras.
        set_loss_list = []
        head_radii_list = []
        head_error_list = []
        all_pinball_losses = []
        all_B_t = []


        Set_loss = self.set_loss_vectorize()



        # Scale parameters, havent looked into scaling them best.
        sigma = nu_sigma[1]
        nu = nu_sigma[0] 

        if interval_candidates is None:
            interval_candidates = np.array(range(50, 550, 100))

        # To sync all of the heads we need to start at the max of all the candidates.
        start_point = max(interval_candidates) + 1
        i_count = start_point

        # Create the head and intitialse the weights.
        ACI_heads = [self.ACI_head(timeseries_data, gamma, start_point, interval) for interval in interval_candidates]
        interval_weights = np.array([1 for _ in range(len(interval_candidates))])
        
        # Continues calculating intervals until one of the heads stops.
        none_terminated = True

        while none_terminated: 
            head_sets = [] # Will contain the result from each head.
            pinball_losses = []
            B_t_list = []
            
            # Create the mass distribution for each head
            Wt = interval_weights.sum()
            interval_probabilites = interval_weights/Wt
        
            try:
                # Create a list of the coverages for the different heads.
                for head in ACI_heads:
                        pred_set, pinball_loss, B_t = next(head)
                        head_sets.append(pred_set)
                        pinball_losses.append(pinball_loss)
                        B_t_list.append(B_t)
            
            except StopIteration: # One head is terminated.
                none_terminated = False
                break # You could but the return statement here
            
            all_pinball_losses.append(pinball_losses)
            all_B_t.append(B_t_list)

            # Choosing which head to use.
            chosen_set = random.choices(head_sets, weights=interval_probabilites, k=1)[0] # Using random module as numpy can not deal with tuples.
            conformal_sets_list.append(chosen_set)
            chosen_interval_index.append(head_sets.index(chosen_set))



            # TIME FRONTIER -------------

            # Seeing whether result lies within the set.
            err_true = AdaptiveCP.err_t(y[i_count], chosen_set)

            head_err = [AdaptiveCP.err_t(y[i_count], head_set) for head_set in head_sets]
            head_error_list.append(head_err)

            err_t_list.append(err_true)

            # Computing the conformal set radi. 
            optimal_set_radius = abs(y[i_count] - y[i_count-1]) 
            head_set_radius = list(map(lambda Cset: (Cset[1] - Cset[0])/2, head_sets)) #(chosen_set[1] - chosen_set[0])/2

            optimal_radius_list.append(optimal_set_radius)
            chosen_radius_list.append((chosen_set[1] - chosen_set[0])/2)

            head_radii_list.append(head_set_radius)
            head_set_radius = np.array(head_set_radius)

            # Updating the weights.
            set_loss = Set_loss(optimal_set_radius, head_set_radius)
            set_loss_list.append(set_loss)

            new_weights = interval_weights * np.exp(-1 * nu * set_loss) 
            sumW, lenW = sum(new_weights), len(new_weights)
            final_weights = new_weights*(1-sigma) + sumW*(sigma/lenW)
            interval_weights = final_weights

            # Incrementing the i-count
            i_count+=1

        # Calculating different averages
        realised_interval_coverage = 1 - pd.Series(err_t_list).rolling(50).mean().mean() # 50 is arbitary and could be improved.
        average_prediction_interval = np.mean([abs(x[1] - x[0]) for x in conformal_sets_list])

        return {
                'model': title if title is not None else 'AwACI',
                'coverage_target': self.coverage_target,
                'interval_candidates': interval_candidates,
                'realised_interval_coverage': realised_interval_coverage,
                'average_prediction_interval': average_prediction_interval,
                'optimal_set_radius': optimal_radius_list, 
                'chosen_set_radius': chosen_radius_list,
                'conformal_sets': conformal_sets_list,
                'error_t_list': err_t_list,
                'chosen_interval_index': chosen_interval_index,
                'start_point': start_point,
                'interval_size': 50,
                'set_loss_list': set_loss_list,
                'head_radii_list': head_radii_list,
                'head_error_list': head_error_list,
                'all_pinball_losses': all_pinball_losses,
                'all_B_t': all_B_t

                }
            
    
    

In [13]:
BACP = B_tACP(0.1)
test_result = BACP.AwACI(stock_data[4])

In [14]:
print(test_result.keys())

dict_keys(['model', 'coverage_target', 'interval_candidates', 'realised_interval_coverage', 'average_prediction_interval', 'optimal_set_radius', 'chosen_set_radius', 'conformal_sets', 'error_t_list', 'chosen_interval_index', 'start_point', 'interval_size', 'set_loss_list', 'head_radii_list', 'head_error_list', 'all_pinball_losses', 'all_B_t'])


In [15]:
print(np.array(test_result['all_B_t']).mean(axis=0))

[0.53225313 0.54754755 0.56893817 0.57975338 0.58384098]


In [16]:
# Getting the average coverage of each head.
head_error_list = np.array(test_result['head_error_list'])
print(1- head_error_list.mean(axis=0))


[0.8989011  0.9010989  0.90549451 0.90769231 0.90769231]


In [17]:
print(test_result['all_pinball_losses'])

[[np.float64(0.012822822822822821), np.float64(0.02903903903903904), np.float64(0.024534534534534535), np.float64(0.01802802802802803), np.float64(0.02033033033033033)], [np.float64(0.04825875875875876), np.float64(0.057968468468468475), np.float64(0.05546596596596597), np.float64(0.05006056056056057), np.float64(0.05146196196196197)], [np.float64(0.087998998998999), np.float64(0.087998998998999), np.float64(0.087998998998999), np.float64(0.0877987987987988), np.float64(0.0880990990990991)], [np.float64(0.0863828828828829), np.float64(0.00161311311311311), np.float64(0.0004119119119119094), np.float64(0.01881531531531533), np.float64(0.006202702702702715)], [np.float64(0.07257957957957957), np.float64(0.07058258258258258), np.float64(0.06948148148148148), np.float64(0.07147847847847848), np.float64(0.07147847847847848)], [np.float64(0.0866941941941942), np.float64(0.0831956956956957), np.float64(0.0832957957957958), np.float64(0.08749499499499501), np.float64(0.08779529529529528)], [np

In [18]:
# Getting the average set loss of each head.
set_loss_list = np.array(test_result['all_pinball_losses'])
mean = set_loss_list.mean(axis=0)
print(np.argmin(mean))

4


In [19]:
# Getting the average set radius of each head.
head_radii_list = np.array(test_result['head_radii_list'])
print(head_radii_list.mean(axis=0))
radi = head_radii_list.mean(axis=0)
print(np.argmin(radi))

[5.59097843 5.31418275 5.2209563  5.20439364 5.26072512]
3


In [33]:
# Finding what the best possible result it
head_radii_list = np.array(test_result['head_radii_list'])
#getting the minimum of each head in each row
min_radii = head_radii_list.argmin(axis=1)

print('Best possible width,', min_radii.mean())
print('Our result', test_result['average_prediction_interval'])
just_guessing = np.array(head_radii_list).mean()
print('Just guessing', just_guessing)

Best possible result, 0.9467553777852357
Our result 2.1546979896362033
Just guessing 1.0723926399895125


In [24]:
set_and_radii = []
set_and_coverage = []

for data in stock_data:
    test_result = BACP.AwACI(data)
    mean, radii = np.array(test_result['all_pinball_losses']).mean(axis=0), np.array(test_result['head_radii_list']).mean(axis=0)
    coverage = abs(0.9 - np.array(test_result['head_error_list']).mean(axis=0))

    sr = np.argmin(mean) == np.argmin(radii)
    sc = np.argmin(mean) == np.argmin(coverage)

    set_and_radii.append(sr)
    set_and_coverage.append(sc)

print(f'sr = {sum(set_and_radii)/len(set_and_radii)}')
print(f'sc = {sum(set_and_coverage)/len(set_and_coverage)}')

# Leads us to believe that set loss isnt the best for coverage, although the coverages are very close.

sr = 0.25510204081632654
sc = 0.01020408163265306


# Testing whether B_t is as expected.

In [21]:
#
data = stock_data[4]
scores = BACP.score_function(data[0], data[1])

# at time 600
time = 660
xpred, true = data[0][time], data[1][time]

print(xpred, true)

relevant_scores = scores[time-550:time]

# The best that we can do is 0, which is still not that small

low, high = 0, 999  # Indices for np.linspace(1, 0, 1000)
possibilities = np.linspace(1, 0, 1000) # as 1 -

B_t = 1
while low <= high:
    mid = (high + low) // 2
    possi = possibilities[mid]
    Q = np.quantile(relevant_scores, 1-possi)

    if BACP.neg_inverse_score(xpred, Q) < true < BACP.pos_inverse_score(xpred, Q):
        B_t = possi
        low = mid + 1
    else:
        high = mid - 1

print('the coverage that this implies is', 1-B_t)
BACP.neg_inverse_score(xpred,  np.quantile(relevant_scores, 1 - B_t)) 

# B_t should not relate in any way to coverage. It is simply what quantile that score fell in. The scores themselves again do not relate to the coverage.

# So there is a time when an B as high as 0.99 is correct and it is when the desired and true are very equal.
# The value is extrodinarly high, should this be the case?
# Well we arent trying to solve for B we will just use it in the pinball loss

152.19000244140625 148.91000366210938


NameError: name 'B_t' is not defined