In [2]:
import pandas as pd
import numpy as np
from datetime import datetime
import os
import glob
from scipy.stats import norm
from scipy.optimize import brentq

## Technical Indicators

In [2]:
# Load the raw PNB price data; later cells read its 'Date&Time', 'High', 'Low',
# 'Close' and 'Volume' columns.
df = pd.read_csv('../data/PNB.csv')

In [3]:
# Parse the timestamp column and make it the frame's index.
df['Date&Time'] = pd.to_datetime(df['Date&Time'])
df = df.set_index('Date&Time')

In [4]:
# 52-week high and low: rolling max of 'High' / min of 'Low' over the last 260 rows.
# min_periods=1 means early rows use whatever history is available instead of NaN.
# NOTE(review): 260 rows equals 52 weeks only if there is one row per weekday — confirm
# (the volatility helper annualizes with 252 periods per year).
df['52_Week_High'] = df['High'].rolling(window=260, min_periods=1).max()
df['52_Week_Low'] = df['Low'].rolling(window=260, min_periods=1).min()

In [5]:
def calculate_rsi(data, window=14):
    """
    Compute the Relative Strength Index (RSI) of a price series.

    Average gains and average losses are plain rolling means over `window`
    periods (the simple-moving-average flavour of RSI, not Wilder smoothing).

    Args:
        data (pandas Series): Close prices.
        window (int, optional): Look-back length in periods. Defaults to 14.

    Returns:
        pandas Series: RSI per row. Leading rows are NaN until a full window of
        price changes is available; a window with no losses yields 100.
    """
    change = data.diff()
    ups = change.clip(lower=0)          # positive moves, 0 elsewhere
    downs = (-change).clip(lower=0)     # magnitude of negative moves, 0 elsewhere

    avg_up = ups.rolling(window=window).mean()
    avg_down = downs.rolling(window=window).mean()

    relative_strength = avg_up / avg_down
    return 100 - (100 / (1 + relative_strength))

# RSI of the close with the default 14-period window.
df['RSI'] = calculate_rsi(df['Close'])

In [6]:
def calculate_macd(data, slow=26, fast=12, signal=9):
    """
    Compute the MACD line and its signal line for a price series.

    All averages are exponential moving averages built with
    `ewm(span=..., adjust=False)`.

    Args:
        data (pandas Series): Close prices.
        slow (int, optional): Span of the slow EMA. Defaults to 26.
        fast (int, optional): Span of the fast EMA. Defaults to 12.
        signal (int, optional): Span of the EMA applied to the MACD line. Defaults to 9.

    Returns:
        pandas Series: MACD line (fast EMA minus slow EMA).
        pandas Series: Signal line (EMA of the MACD line).
    """
    def _ema(series, span):
        return series.ewm(span=span, adjust=False).mean()

    macd_line = _ema(data, fast) - _ema(data, slow)
    signal_line = _ema(macd_line, signal)
    return macd_line, signal_line

# MACD line and signal line of the close, using the default 12/26/9 spans.
df['MACD'], df['Signal'] = calculate_macd(df['Close'])

In [7]:
def calculate_bollinger_bands(data, window=20, num_std=2):
    """
    Compute upper and lower Bollinger Bands for a numeric series.

    Args:
        data (pandas Series): Close prices or any numerical series.
        window (int, optional): Number of periods for the rolling mean and
            rolling standard deviation. Defaults to 20.
        num_std (int, optional): Band half-width in standard deviations. Defaults to 2.

    Returns:
        pandas Series: Upper band (rolling mean + num_std * rolling std).
        pandas Series: Lower band (rolling mean - num_std * rolling std).
    """
    windowed = data.rolling(window=window)
    middle = windowed.mean()
    half_width = windowed.std() * num_std
    return middle + half_width, middle - half_width


# Bollinger Bands of the close with the default 20-period window and 2 standard deviations.
df['BB_Upper'], df['BB_Lower'] = calculate_bollinger_bands(df['Close'])

In [8]:
def calculate_historical_volatility(data, window=30):
    """
    Compute rolling, annualized historical volatility from a price series.

    Args:
        data (pandas Series): Prices of the stock or other instrument.
        window (int, optional): Number of log returns in each rolling standard
            deviation. Defaults to 30.

    Returns:
        pandas Series: Rolling standard deviation of log returns multiplied by
        sqrt(252); NaN until a full window of returns is available.
    """
    price_ratio = data / data.shift(1)
    rolling_std = np.log(price_ratio).rolling(window=window).std()
    annualization = np.sqrt(252)  # 252 periods per year
    return rolling_std * annualization

# Annualized 30-period historical volatility of the close.
df['Volatility'] = calculate_historical_volatility(df['Close'])


In [9]:
def calculate_stochastic_oscillator(data, k_window=14, d_window=3):
    """
    Compute the Stochastic Oscillator (%K and %D) from price data.

    Args:
        data (pandas DataFrame): Frame with 'Low', 'High' and 'Close' columns.
        k_window (int, optional): Look-back for the rolling low/high used by %K.
            Defaults to 14.
        d_window (int, optional): Window of the rolling mean of %K that forms %D.
            Defaults to 3.

    Returns:
        pandas Series: %K, the close's position inside the rolling low-high range,
        scaled by 100. The denominator is zero when the rolling high equals the
        rolling low.
        pandas Series: %D, the rolling mean of %K.
    """
    lowest = data['Low'].rolling(window=k_window).min()
    highest = data['High'].rolling(window=k_window).max()
    price_range = highest - lowest

    percent_k = 100 * ((data['Close'] - lowest) / price_range)
    percent_d = percent_k.rolling(window=d_window).mean()
    return percent_k, percent_d

# Stochastic Oscillator (%K over 14 periods, %D as its 3-period mean) from the frame's Low/High/Close.
df['%K'], df['%D'] = calculate_stochastic_oscillator(df)


In [10]:
def calculate_obv(data):
    """
    Calculate On-Balance Volume (OBV) for given stock price data.

    OBV starts at 0 on the first row. On each later row the row's volume is
    added when the close rose versus the previous row, subtracted when it fell,
    and ignored when it is unchanged (or when the comparison involves NaN).

    The computation is vectorized, so it avoids a per-row Python loop and
    returns an empty Series for an empty frame instead of raising.

    Args:
        data (pandas DataFrame): DataFrame containing 'Close' and 'Volume' columns.

    Returns:
        pandas Series: OBV values, indexed the same as the input data. The dtype
        follows the 'Volume' column (integer volumes give an integer OBV).
    """
    close = data['Close']
    volume = data['Volume']

    # +1 / -1 / 0 per row; the first row (no previous close) and NaN comparisons count as 0.
    direction = np.sign(close.diff()).fillna(0).astype('int64')

    # Rows with direction 0 contribute exactly 0 even if their volume is missing.
    signed_volume = (volume * direction).where(direction != 0, 0)

    # skipna=False: a missing volume on an up/down day propagates forward, as a running sum would.
    return signed_volume.cumsum(skipna=False)

# On-Balance Volume from the frame's 'Close' and 'Volume' columns.
df['OBV'] = calculate_obv(df)

In [11]:
def calculate_oscillators(data, fast=12, slow=26, signal=9):
    """
    Compute the Absolute Price Oscillator (APO), the Percentage Price
    Oscillator (PPO) and the PPO signal line.

    Args:
        data (pandas Series): Close prices of the stock or other instrument.
        fast (int, optional): Span of the faster EMA. Defaults to 12.
        slow (int, optional): Span of the slower EMA. Defaults to 26.
        signal (int, optional): Span of the EMA applied to the PPO. Defaults to 9.

    Returns:
        pandas Series: APO (fast EMA minus slow EMA).
        pandas Series: PPO (APO as a percentage of the slow EMA).
        pandas Series: Signal line (EMA of the PPO).
    """
    fast_ema = data.ewm(span=fast, adjust=False).mean()
    slow_ema = data.ewm(span=slow, adjust=False).mean()

    apo = fast_ema - slow_ema
    ppo = (apo / slow_ema) * 100
    ppo_signal = ppo.ewm(span=signal, adjust=False).mean()

    return apo, ppo, ppo_signal

# APO, PPO and PPO signal line of the close, using the default 12/26/9 spans.
df['APO'], df['PPO'], df['PPO_Signal'] = calculate_oscillators(df['Close'])

In [12]:
def calculate_moving_averages(data, window=20):
    """
    Compute the Simple Moving Average (SMA) and Exponential Moving Average (EMA)
    of a series over the same number of periods.

    Args:
        data (pandas Series): Prices of the stock or other instrument.
        window (int, optional): Rolling window of the SMA and span of the EMA.
            Defaults to 20.

    Returns:
        pandas Series: SMA values (NaN until the window is full).
        pandas Series: EMA values (computed with adjust=False).
    """
    return (
        data.rolling(window=window).mean(),
        data.ewm(span=window, adjust=False).mean(),
    )

# 20-period simple and exponential moving averages of the close.
df['SMA'], df['EMA'] = calculate_moving_averages(df['Close'])

In [13]:
# Persist prices plus indicators; the index is written as well (no index=False here).
df.to_csv('../data/PNB_Indicators.csv')

## Greeks

In [14]:
def black_scholes_call(S, K, T, r, sigma):
    """
    Black-Scholes price of a European call option.

    Args:
        S (float): Stock price.
        K (float): Strike price.
        T (float): Time to maturity.
        r (float): Risk-free rate.
        sigma (float): Volatility.

    Returns:
        float: Price of the European call option. T and sigma must be positive;
        no guard is applied against a zero `sigma * sqrt(T)` denominator.
    """
    vol_sqrt_t = sigma * np.sqrt(T)
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t

    discounted_strike = K * np.exp(-r * T)
    return S * norm.cdf(d1) - discounted_strike * norm.cdf(d2)

def implied_volatility(S, K, T, r, market_price):
    """
    Solve for the volatility at which the Black-Scholes call price matches a
    quoted market price.

    Args:
        S (float): Current stock price.
        K (float): Option strike price.
        T (float): Time to expiration in years.
        r (float): Risk-free interest rate.
        market_price (float): Market price of the option.

    Returns:
        float: Implied volatility as a decimal. Returns 0.0 when the root finder
        raises ValueError, i.e. no solution is bracketed by the search interval.
    """
    def pricing_error(sigma):
        # Signed gap between the model price at `sigma` and the quoted price.
        return black_scholes_call(S, K, T, r, sigma) - market_price

    try:
        # Bracket is [0.001, 2.0], i.e. 0.1% to 200% volatility.
        root = brentq(pricing_error, 0.001, 2.0)
    except ValueError:
        return 0.0
    return root
    
def black_scholes_greeks(S, K, T, r, sigma, option_type='call'):
    """
    Calculate the delta, gamma, theta, and vega of a European option using the Black-Scholes model.

    Args:
        S (float): Current stock price.
        K (float): Option strike price.
        T (float): Time to expiration in years.
        r (float): Risk-free interest rate.
        sigma (float): Volatility of the underlying asset.
        option_type (str): 'call' for call options; any other value is treated as a put.

    Returns:
        dict: A dictionary containing the Greeks: delta, gamma, theta, vega.
            Theta is divided by 365 (per calendar day) and vega by 100 (per one
            percentage point of volatility). When sigma or T is not positive the
            formulas are undefined, so a degenerate result is returned instead:
            delta is +1.0 for calls / -1.0 otherwise and the other Greeks are 0.0.
    """

    # sigma * sqrt(T) is the denominator of d1 and gamma; with sigma <= 0 or T <= 0 it is
    # zero or invalid and every Greek would come out as NaN/inf. Return the same degenerate
    # values that were already used for sigma == 0.
    if sigma <= 0 or T <= 0:
        return {
            'delta': 1.0 if option_type == 'call' else -1.0,
            'gamma': 0.0,
            'theta': 0.0,
            'vega': 0.0
        }

    sqrt_t = np.sqrt(T)

    # Calculate d1 and d2
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * sqrt_t)
    d2 = d1 - sigma * sqrt_t

    # Calculate Greeks
    if option_type == 'call':
        delta = norm.cdf(d1)
        theta = (-S * norm.pdf(d1) * sigma / (2 * sqrt_t) - r * K * np.exp(-r * T) * norm.cdf(d2)) / 365
    else:
        delta = -norm.cdf(-d1)
        theta = (-S * norm.pdf(d1) * sigma / (2 * sqrt_t) + r * K * np.exp(-r * T) * norm.cdf(-d2)) / 365

    # Gamma and vega are the same for calls and puts.
    gamma = norm.pdf(d1) / (S * sigma * sqrt_t)
    vega = S * norm.pdf(d1) * sqrt_t / 100  # Vega is per 1% change in volatility

    return {
        'delta': delta,
        'gamma': gamma,
        'theta': theta,
        'vega': vega
    }


In [15]:
# Reload the saved indicators so 'Date&Time' is a regular column and rows have positional labels.
df = pd.read_csv('../data/PNB_Indicators.csv') # 748 3059 — presumably the row range used by the Greeks loop below; TODO confirm
df['Date&Time'] = pd.to_datetime(df['Date&Time'])
# Replace every NaN (e.g. indicator warm-up rows) with 0.
df.fillna(0, inplace=True)

In [16]:
# Risk-free rate (6%) passed as `r` to the implied-volatility and Greeks helpers.
# NOTE(review): presumably an annual rate, since time to expiry is expressed in years — confirm.
risk_free_rate = 0.06

In [17]:
# Start every option-derived column at 0.0; rows not covered by the Greeks loop keep this value.
for column in ('Implied_Volatility', 'Delta', 'Gamma', 'Theta', 'Vega'):
    df[column] = 0.0

In [18]:
# For each processed row of `df`, load that date's option-chain CSV, compute implied
# volatility and Greeks per contract, store the per-date means in `df`, and write the
# enriched option chain back to the same file.
# NOTE(review): 748 and 3059 are hard-coded row bounds — presumably the rows for which
# an option-chain CSV exists; confirm before running on different data.
for i in range(748, 3059):
    date = df.loc[i, 'Date&Time']
    spot = df.loc[i, 'Close']
    options_path = '../data/CSV/' + date.strftime('%d-%b-%Y') + '.csv'

    df_options = pd.read_csv(options_path)
    df_options['FH_EXPIRY_DT'] = pd.to_datetime(df_options['FH_EXPIRY_DT'], format='%d-%b-%Y')

    # Calendar days to expiry (inclusive of the expiry day) expressed in years.
    df_options['Time_to_Expiry'] = ((df_options['FH_EXPIRY_DT'] - date).dt.days + 1) / 365

    df_options['Implied_Volatility'] = [
        implied_volatility(spot, strike, tte, risk_free_rate, close_price)
        for strike, tte, close_price in zip(
            df_options['FH_STRIKE_PRICE'],
            df_options['Time_to_Expiry'],
            df_options['FH_CLOSING_PRICE'],
        )
    ]

    # Evaluate the Greeks once per contract and fan the dict out into columns,
    # instead of recomputing the full set for each of the four columns.
    greeks = [
        black_scholes_greeks(spot, strike, tte, risk_free_rate, iv)
        for strike, tte, iv in zip(
            df_options['FH_STRIKE_PRICE'],
            df_options['Time_to_Expiry'],
            df_options['Implied_Volatility'],
        )
    ]
    for column in ('Delta', 'Gamma', 'Theta', 'Vega'):
        df_options[column] = [g[column.lower()] for g in greeks]

    # Per-date summary: unweighted mean across all contracts in the file.
    for column in ('Implied_Volatility', 'Delta', 'Gamma', 'Theta', 'Vega'):
        df.loc[i, column] = df_options[column].mean()

    # Restore the original date string format before overwriting the source file.
    df_options['FH_EXPIRY_DT'] = df_options['FH_EXPIRY_DT'].dt.strftime('%d-%b-%Y')
    df_options.to_csv(options_path, index=False)

In [20]:
# Replace 0 entries in each option-derived column with that column's mean.
# Assign the result back instead of calling replace(..., inplace=True) on a column
# selection: that chained in-place form is deprecated and does not modify `df`
# when pandas Copy-on-Write is active.
# NOTE(review): the mean is taken over the whole column, zeros included, so it is pulled
# toward 0; genuine zero values are replaced as well — confirm this is intended.
for column in ('Implied_Volatility', 'Delta', 'Gamma', 'Theta', 'Vega'):
    df[column] = df[column].replace(0, df[column].mean())

In [None]:
# Keep only the first row for each 'Date&Time' value.
df = df.drop_duplicates(subset='Date&Time', keep='first')

In [22]:
# Overwrite the indicators file, now including the option-derived columns; the index is not written.
df.to_csv('../data/PNB_Indicators.csv', index=False)

## Combining option Data

The option data is processed as follows because of the way I initially collected it.
In that original form, it cannot be used to generate the observation space for the model.

In [5]:
# Reload the indicators file; 'Date&Time' is left as text here and is compared against
# '%Y-%m-%d' strings further down.
df = pd.read_csv('../data/PNB_Indicators.csv')

In [6]:
# Display the available columns as a quick check of what was loaded.
df.columns

Index(['Date&Time', 'Open', 'High', 'Low', 'Close', 'Volume', '52_Week_High',
       '52_Week_Low', 'RSI', 'MACD', 'Signal', 'BB_Upper', 'BB_Lower',
       'Volatility', '%K', '%D', 'OBV', 'APO', 'PPO', 'PPO_Signal', 'SMA',
       'EMA', 'Implied_Volatility', 'Delta', 'Gamma', 'Theta', 'Vega',
       'Strike Prices'],
      dtype='object')

In [7]:
# Give every row its own empty list in 'Strike Prices'; the loop below fills in the
# rows that have a matching option-chain file.
df['Strike Prices'] = df.apply(lambda x: [], axis=1)

In [8]:
# Build `files`: the option-chain file names (without extension) in chronological order.
# The directory matches the one the files are later read from ('../data/CSV/'), and the
# name is extracted with os.path instead of slicing off a hard-coded 4-character prefix.
csv_paths = glob.glob('../data/CSV/*.csv')
file_dates = sorted(
    datetime.strptime(os.path.splitext(os.path.basename(csv_path))[0], '%d-%b-%Y')
    for csv_path in csv_paths
)
# Back to the 'DD-Mon-YYYY' form used in the file names, now sorted by date rather than by text.
files = [file_date.strftime('%d-%b-%Y') for file_date in file_dates]

In [9]:
# Directory holding one option-chain CSV per trading date (named 'DD-Mon-YYYY.csv').
path = "../data/CSV/"

In [10]:
# Number of distinct strikes found per file, in the same order as `files`.
avg = []
for filename in files:
    full_path = f"{path}{filename}.csv"
    df_temp = pd.read_csv(full_path)

    # Distinct strikes quoted on this date.
    list_strike_prices = set(df_temp['FH_STRIKE_PRICE'])
    avg.append(len(list_strike_prices))

    # File names are 'DD-Mon-YYYY'; `df` stores dates as 'YYYY-MM-DD' text.
    x_temp = datetime.strptime(filename, '%d-%b-%Y').strftime('%Y-%m-%d')
    indices = df.index[df['Date&Time'] == x_temp].tolist()
    if not indices:
        # No row in `df` for this date: nothing to attach the strikes to.
        continue

    # Only the first matching row receives the list of strikes.
    df.at[indices[0], 'Strike Prices'] = list(list_strike_prices)

In [107]:
# Overwrite the indicators file again, now with the 'Strike Prices' lists (written as text in CSV).
df.to_csv('../data/PNB_Indicators.csv', index=False)

In [11]:
# Stack every per-date option-chain file into one frame. Read all files first and
# concatenate once; growing a DataFrame with concat inside the loop copies the
# accumulated data on every iteration. Row labels from each file are kept as-is.
option_frames = [pd.read_csv(f"{path}{file}.csv") for file in files]
# pd.concat raises on an empty list, so fall back to an empty frame when there are no files.
df_options_data = pd.concat(option_frames) if option_frames else pd.DataFrame()

In [14]:
# Calendar days from the quote date to expiry, counting the expiry day itself (+1).
expiry_dates = pd.to_datetime(df_options_data['FH_EXPIRY_DT'], format='%d-%b-%Y')
quote_dates = pd.to_datetime(df_options_data['FH_TIMESTAMP'], format='%d-%b-%Y')
df_options_data['Days_to_Expiry'] = (expiry_dates - quote_dates).dt.days + 1

In [16]:
# Write the combined option-chain data (with 'Days_to_Expiry') to a single CSV, without the index.
df_options_data.to_csv('../data/PNB_Options_Data.csv', index=False)