# Please make sure you have ran MevrickDaCostaUCL_Quantum_Algorithms.py first!

In [3]:
import warnings
warnings.filterwarnings("ignore")

In [4]:
import pandas as pd
import yfinance as yf

import math
import numpy as np

from qiskit import QuantumCircuit
from qiskit.algorithms import IterativeAmplitudeEstimation, EstimationProblem
from qiskit.circuit.library import LinearAmplitudeFunction
from qiskit_finance.circuit.library import LogNormalDistribution
from qiskit.utils import QuantumInstance

from qiskit import IBMQ
from qiskit.providers.ibmq import least_busy

from concurrent.futures import ThreadPoolExecutor
import time
import random

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import ast

# Change the location to where you saved the file

In [None]:
data = pd.read_csv('disso results/c_approx_results.csv')

In [None]:
def analyse_and_plot(data, independent_variable='c_approx'):
    simulated_results = data.replace([float('inf'), float('-inf')], pd.NA).dropna()

    aggregated_data = simulated_results.groupby(independent_variable).agg({
        'accuracy_error': 'mean',
        'computation_time': 'mean'
    }).reset_index()

    aggregated_data['normalised_accuracy_error'] = (aggregated_data['accuracy_error'] - aggregated_data['accuracy_error'].min()) / \
                                                   (aggregated_data['accuracy_error'].max() - aggregated_data['accuracy_error'].min())
    aggregated_data['normalised_computation_time'] = (aggregated_data['computation_time'] - aggregated_data['computation_time'].min()) / \
                                                     (aggregated_data['computation_time'].max() - aggregated_data['computation_time'].min())
    
    # Update the weighted_combined_cost with new weights
    aggregated_data['weighted_combined_cost'] = (2 * aggregated_data['normalised_accuracy_error'] + aggregated_data['normalised_computation_time']) / 3

    simulated_results['price_parameters'] = simulated_results['p'].apply(ast.literal_eval)
    simulated_results['volatility'] = simulated_results['price_parameters'].apply(lambda x: x[1])
    simulated_results['time_to_expiry'] = simulated_results['price_parameters'].apply(lambda x: x[3])
    simulated_results = simulated_results.drop(columns=['price_parameters'])

    simulated_results['normalised_accuracy_error'] = (simulated_results['accuracy_error'] - simulated_results['accuracy_error'].min()) / \
                                                      (simulated_results['accuracy_error'].max() - simulated_results['accuracy_error'].min())
    simulated_results['normalised_computation_time'] = (simulated_results['computation_time'] - simulated_results['computation_time'].min()) / \
                                                        (simulated_results['computation_time'].max() - simulated_results['computation_time'].min())
    simulated_results['weighted_combined_cost'] = (2 * simulated_results['normalised_accuracy_error'] + simulated_results['normalised_computation_time']) / 3

    fig, axs = plt.subplots(1, 2, figsize=(18, 7))

    ax1 = axs[0]
    ax2 = ax1.twinx()
    ax1.plot(aggregated_data[independent_variable], aggregated_data['accuracy_error'], 'g-')
    ax2.plot(aggregated_data[independent_variable], aggregated_data['computation_time'], 'b-')
    ax1.set_ylabel('Mean Accuracy Error', color='g')
    ax2.set_ylabel('Mean Computation Time', color='b')
    ax1.set_xlabel(independent_variable)
    ax1.set_title(f'Relationship of {independent_variable} with Mean Accuracy Error and Mean Computation Time')

    axs[1].plot(aggregated_data[independent_variable], aggregated_data['weighted_combined_cost'], '-o', label='Weighted Combined Cost')
    axs[1].set_xlabel(independent_variable)
    axs[1].set_ylabel('Weighted Combined Cost (Normalised)')
    axs[1].set_title(f'Relationship of {independent_variable} with Weighted Combined Cost')
    axs[1].legend()
    axs[1].grid(True)

    plt.tight_layout()
    plt.show()


In [None]:
analyse_and_plot(data, 'c_approx')

In [None]:
data2 = pd.read_csv('disso results/epsilon_target.csv')

In [None]:
analyse_and_plot(data2, 'epsilon_target')

In [None]:
data3 = pd.read_csv('disso results/shots_results.csv')

In [None]:
analyse_and_plot(data3, 'shots')

## Clustering

In [None]:
parameter_result_df = pd.read_csv('disso results\paramter_tuning.csv')

In [None]:
parameter_result_df

In [None]:
parameter_result_df.replace([float('inf'), float('-inf')], pd.NA).dropna()

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import numpy as np

In [None]:
parameter_tuning =  pd.read_csv('disso results/paramter_tuning.csv')

In [None]:
parameter_tuning.replace([np.inf, -np.inf], np.nan, inplace=True)

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_score

data = parameter_tuning[['epsilon_target', 'shots', 'c_approx']]

scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)

sil_scores = []

for k in range(2, 11):
    kmeans = KMeans(n_clusters=k, random_state=42).fit(data_scaled)
    preds = kmeans.predict(data_scaled)
    sil_score = silhouette_score(data_scaled, preds)
    sil_scores.append(sil_score)

optimal_clusters = sil_scores.index(max(sil_scores)) + 2  # +2 because our range starts from 2
print(f"Optimal number of clusters: {optimal_clusters}")

kmeans = KMeans(n_clusters=optimal_clusters, random_state=64)
parameter_tuning['cluster'] = kmeans.fit_predict(data_scaled)

cluster_means = parameter_tuning.groupby('cluster').mean()[['c_approx', 'shots', 'epsilon_target', 'accuracy_error']]
cluster_means

In [1]:
import math
from scipy.stats import norm
import numpy as np


def black_scholes_call(S, K, T, r, sigma):
    
    d1 = (math.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * math.sqrt(T))
    d2 = d1 - sigma * math.sqrt(T)
    
    call_price = S * norm.cdf(d1) - K * math.exp(-r * T) * norm.cdf(d2)
    
    return call_price

### Creating dummy data -- keeping the contract characteristics constant

In [None]:
tickerDict = {}

S = 105
tickerDict['RNDM'] = 105

vals_volatility = [i/10000 for i in range(1000, 6500, 25)]
T_constant = 150/365

strike_prices = [105, 102.5, 100, 97.5]

vol_arr = [(strike, vol, "RNDM", T_constant, black_scholes_call(S, strike, T_constant, 0.0527, vol)) 
       for strike in strike_prices for vol in vals_volatility]

S = 105
sigma_constant = 0.25 
T_values = [i/365 for i in range(1, 186, 1)]

strike_prices = [105, 102.5, 100, 97.5]

time_arr = [(strike, sigma_constant, "RNDM", T, black_scholes_call(S, strike, T, 0.0527, sigma_constant)) 
       for strike in strike_prices for T in T_values]


In [None]:
def problem_builder(params, c_approx):
    num_uncertainty_qubits = 3
    
    r = 0.0527
    
    K, vol, ticker, T, _ = params

    mu = (r - 0.5 * vol**2) * T + np.log(tickerDict[ticker])
    sigma = vol * np.sqrt(T)
    mean = np.exp(mu + sigma**2 / 2)
    variance = (np.exp(sigma**2) - 1) * np.exp(2* mu + sigma**2)
    stddev = np.sqrt(variance)
    low = np.maximum(0, mean-3*stddev)
    high = mean + 3 * stddev

    uncertainty_model = LogNormalDistribution(num_uncertainty_qubits, mu=mu, sigma = sigma**2, bounds = (low, high))

    breakpoints = [low, K]
    slopes = [0, 1]
    offsets = [0, 0]
    f_min = 0
    f_max = high - K
    european_call_objective = LinearAmplitudeFunction(
        num_uncertainty_qubits,
        slopes,
        offsets,
        domain=(low, high),
        image=(f_min, f_max),
        breakpoints=breakpoints,
        rescaling_factor=c_approx,
    )


    num_qubits = european_call_objective.num_qubits
    european_call = QuantumCircuit(num_qubits)
    european_call.append(uncertainty_model, range(num_uncertainty_qubits))
    european_call.append(european_call_objective, range(num_qubits))

    problem = EstimationProblem(
        state_preparation=european_call,
        objective_qubits=[3],
        post_processing=european_call_objective.post_processing,
    )

    
    return problem    

In [None]:
def evaluate_parameters(data):
    metrics, c_approx, epsilon_target, shots = data
    print(f"Starting evaluation for c_approx={c_approx}, epsilon_target={epsilon_target}, shots={shots}...")    
    
    try:
        problem = problem_builder(metrics, c_approx)
    except ValueError:
        print(f"Encountered the breakpoint error for c_approx={c_approx}, epsilon_target={epsilon_target}, shots={shots}. Skipping this set of parameters.")
        return {
            'p': metrics,
            'c_approx': c_approx,
            'epsilon_target': epsilon_target,
            'shots': shots,
            'accuracy_error': float('inf'),
            'computation_time': float('inf'),
            'cost': float('inf')
        }
        
        
        
    qi = QuantumInstance(backend=backend, shots=shots)
    qae = IterativeAmplitudeEstimation(epsilon_target=epsilon_target, alpha=0.05, quantum_instance=qi)

    start_time = time.time()
    try:
        result = qae.estimate(problem)
    except AttributeError as e:
        if "NoneType" in str(e):
            print(f"Encountered the NoneType error for c_approx={c_approx}, epsilon_target={epsilon_target}, shots={shots}. Skipping this set of parameters.")
            return {
                'p': metrics,
                'c_approx': c_approx,
                'epsilon_target': epsilon_target,
                'shots': shots,
                'accuracy_error': float('inf'),
                'computation_time': float('inf'),
                'cost': float('inf')
            }
        else:
            raise e

    end_time = time.time()

    computation_time = end_time - start_time
    accuracy_error = np.abs(result.estimation_processed - metrics[-1])
    cost_function =  computation_time + 1.2 * accuracy_error

    print(f"Completed evaluation for c_approx={c_approx}, epsilon_target={epsilon_target}, shots={shots}.")

    return {
        'p': metrics,
        'c_approx': c_approx,
        'epsilon_target': epsilon_target,
        'shots': shots,
        'accuracy_error': float(accuracy_error),
        'computation_time': computation_time,
        'cost': float(cost_function)
    }


In [None]:
import concurrent.futures

from qiskit import Aer

backend = Aer.get_backend('qasm_simulator')

In [None]:
import time
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import random

tasks = []

for metrics in vol_arr:
    c_approx = 0.237347
    epsilon_target = 0.080041
    shots = 6384
    tasks.append((metrics, c_approx, epsilon_target, shots))

print("Starting parameter tuning...")

with ThreadPoolExecutor() as executor:
    results = list(executor.map(evaluate_parameters, tasks))

print("Parameter tuning completed. Processing results...")
print("All tasks completed!")

In [None]:
pd.DataFrame(results).to_csv('sensitivity_vol.csv')

In [None]:
import time
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import random

tasks = []

for metrics in time_arr:
    c_approx = 0.237347
    epsilon_target = 0.080041
    shots = 6384
    tasks.append((metrics, c_approx, epsilon_target, shots))

print("Starting parameter tuning...")

with ThreadPoolExecutor() as executor:
    results = list(executor.map(evaluate_parameters, tasks))

print("Parameter tuning completed. Processing results...")
print("All tasks completed!")

In [None]:
pd.DataFrame(results).to_csv('sensitivty_T.csv')

In [None]:
def plot_averaged_data(data, independent_var_index):
    
    data = data[data['accuracy_error'] != np.inf]
    
    data['accuracy_error_normalised'] = (data['accuracy_error'] - data['accuracy_error'].min()) / (data['accuracy_error'].max() - data['accuracy_error'].min())
    data['computation_time_normalised'] = (data['computation_time'] - data['computation_time'].min()) / (data['computation_time'].max() - data['computation_time'].min())

    
    data['independent_var'] = data['p'].apply(lambda x: eval(x)[independent_var_index])
    
    averaged_data = data.groupby('independent_var').agg({
        'accuracy_error_normalised': 'mean',
        'computation_time_normalised': 'mean'
    }).reset_index()
    
    independent_var_name = "Volatility" if independent_var_index == 1 else "Time to Expiry"

    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    ax1.plot(averaged_data['independent_var'], averaged_data['accuracy_error_normalised'], color='blue', label='Average Accuracy Error')
    ax1.set_title(f'Average Normalised Accuracy Error vs {independent_var_name}')
    ax1.set_xlabel(str(independent_var_name))
    ax1.set_ylabel('Average Normalised Accuracy Error')
    ax1.legend()

    ax2.plot(averaged_data['independent_var'], averaged_data['computation_time_normalised'], color='green', label='Average Computation Time')
    ax2.set_title(f'Average Normalised Computation Time vs {independent_var_name}')
    ax2.set_xlabel(str(independent_var_name))
    ax2.set_ylabel('Average Normalised Computation Time')
    ax2.legend()

    plt.tight_layout()
    plt.show()

In [None]:
sensitivity_T = pd.read_csv('disso results/sensitivity_T.csv')

In [None]:
plot_averaged_data(sensitivity_T, 3)

In [None]:
sensitivity_vol = pd.read_csv('disso results/sensitivity_vol.csv')

In [None]:
plot_averaged_data(sensitivity_vol, 1)