In [6]:
# CREATE SYNTHETIC DATA

import pandas as pd
import numpy as np

# Create a template DataFrame with the same structure as your original data
# Replace this with the actual column names and types from your original data

# Project root and the cleaned training file that serves as the template.
your_path = '/Users/elliotlindestam/Documents/Skola/Indek icloud/trioptima/'
file_path = your_path + '2.Cleaned/Modified_IRS_All_Train_VF_Cleaned.csv'

# Columns that receive random noise in every synthetic copy.
cols_to_modify = ["leg1FixedRate", "leg1NotionalAmount"]

data = pd.read_csv(file_path)

# Number of noisy copies generated per original row.
num_duplicates = 1

# Noisy copies are collected here before being appended to the originals.
synthetic_data = []

# Running count of generated rows.
i = 0
for _index, source_row in data.iterrows():
    for _ in range(num_duplicates):
        noisy_row = source_row.copy()
        i += 1

        for column in cols_to_modify:
            # Zero-mean Gaussian noise; the standard deviation is 5% of the
            # magnitude of the value being perturbed.
            noise = np.random.normal(0, abs(noisy_row[column]) * 0.05)
            noisy_row[column] += noise
            if column == 'leg1NotionalAmount':
                # The second leg's notional is shifted by the same amount.
                noisy_row['leg2NotionalAmount'] += noise

        synthetic_data.append(noisy_row)


# Originals first, synthetic copies after, with a fresh 0..n-1 index.
noisy_frame = pd.DataFrame(synthetic_data)
synthetic_data = pd.concat([data, noisy_frame], ignore_index=True)

# Write the combined data set next to the cleaned input.
synthetic_data.to_csv(your_path + "2.Cleaned/synthetic_data_cleaned.csv", index=False)


Synthetic cash risk

In [8]:
# 1. Imports
import os
import pandas as pd
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# Paths
#r'C:/Users/gusta/Documents/KTH/TriOptima/trioptima/trioptima/'
#'/Users/elliotlindestam/Documents/Skola/Indek icloud/trioptima/'

# 2. Configuration
# Project root; the train/test folders live directly beneath it.
your_path = r'/Users/elliotlindestam/Documents/Skola/Indek icloud/trioptima/'
train_folder_path = f'{your_path}6.Active Data/Train Model Data/'
test_folder_path = f'{your_path}6.Active Data/Test Data/'

# 3. Functions
def convert_to_usd(row, exchange_rates):
    """Return the leg-1 notional multiplied by the rate of its currency.

    ``exchange_rates`` maps a currency code to a multiplier. A currency that
    is missing from the mapping uses a rate of 1, so the notional is returned
    unchanged.
    """
    notional = row['leg1NotionalAmount']
    rate = exchange_rates.get(row['leg1NotionalCurrency'], 1)
    return notional * rate

def calculate_cashflows(row, period, freq):
    """Build the list of payment dates between a trade's effective and expiration date.

    Args:
        row: Mapping or Series providing 'effectiveDate', 'expirationDate' and
            the two columns named by ``period`` and ``freq``.
        period: Name of the column holding the frequency unit; supported
            values are 'MNTH', 'YEAR', 'DAIL' and 'QTR'.
        freq: Name of the column holding the frequency multiplier.

    Returns:
        A list that starts with the effective date and continues in steps of
        the payment interval while the date is not after the expiration date.
        The list is empty when the effective date is after the expiration date.

    Raises:
        ValueError: If the frequency unit is unknown, or if the multiplier of
            a 'MNTH'/'YEAR'/'DAIL' schedule is zero or negative (the schedule
            would never reach the expiration date).
    """
    start_date = row['effectiveDate']
    end_date = row['expirationDate']
    unit = row[period]

    if unit == 'QTR':
        # NOTE(review): the multiplier column is not applied to quarterly
        # schedules; the step is always three months - confirm intended.
        delta = relativedelta(months=3)
    elif unit in ('MNTH', 'YEAR', 'DAIL'):
        multiplier = row[freq]
        if multiplier <= 0:
            raise ValueError(
                f"Frequency multiplier must be positive, got {multiplier}"
            )
        if unit == 'MNTH':
            delta = relativedelta(months=multiplier)
        elif unit == 'YEAR':
            delta = relativedelta(years=multiplier)
        else:
            delta = timedelta(days=multiplier)
    else:
        raise ValueError(f"Unknown frequency period: {row[period]}")

    cashflows = []
    step = 0
    payment_date = start_date
    while payment_date <= end_date:
        cashflows.append(payment_date)
        step += 1
        # Always step from the effective date. Adding the interval to the
        # previous result lets a clamped month end (Jan 31 -> Feb 28) drag all
        # later dates to the 28th.
        payment_date = start_date + delta * step

    return cashflows

def convert_to_years(value):
    """Convert a tenor label such as '6M' or '2Y' into a number of years.

    The label is the text form of ``value``: everything except the last
    character is parsed as an integer count. A label containing "M" is read
    as months (count / 12), one containing "Y" as years; anything else yields 0.

    Args:
        value: Tenor label. Non-string values (for example NaN or a bare
            number read from a CSV) are converted with ``str`` first, so they
            fall through to 0 instead of raising ``TypeError``.

    Returns:
        The tenor in years, or 0 for labels without "M" or "Y".

    Raises:
        ValueError: If the label contains "M" or "Y" but its leading part is
            not an integer.
    """
    label = str(value)
    if "M" in label:
        return int(label[:-1]) / 12
    if "Y" in label:
        return int(label[:-1])
    return 0

def floating_rate(years_to_payout, float_rates_df, bump):
    """Look up the rate whose tenor is closest to ``years_to_payout``.

    The row of ``float_rates_df`` with the smallest absolute distance between
    its 'Tenor' and ``years_to_payout`` is selected (the first one on a tie).
    ``bump`` is added to that row's 'Rate' and the sum is divided by 100.
    """
    tenor_gap = (float_rates_df['Tenor'] - years_to_payout).abs()
    closest_row = tenor_gap.idxmin()
    quoted_rate = float_rates_df.loc[closest_row, 'Rate']
    return (quoted_rate + bump) / 100

def calculate_discounted_cashflow(row, rate_type, bump, float_rates_df_list):
    """Sum the discounted cashflows of one leg of a trade.

    For every date in ``row['cashflow_dates']`` a cashflow of
    ``row['leg1NotionalAmount'] * payout_rate`` is divided by a running
    compounding factor, and the quotients are added up.

    Args:
        row: Trade record providing 'leg2UnderlierID', 'cashflow_dates',
            'leg1NotionalAmount' and 'leg1FixedRate'.
        rate_type: 'fixed' pays ``row['leg1FixedRate']`` on every date;
            'float' pays the rate returned by ``floating_rate`` for that date.
        bump: Forwarded unchanged to ``floating_rate``.
        float_rates_df_list: Sequence of ``[underlier_id, rates_df]`` pairs;
            the first pair whose id equals ``row['leg2UnderlierID']`` is used.

    Returns:
        The sum of the discounted cashflows (0 when there are no dates).
    """
    total_discounted_cashflow = 0
    
    # Per-date bookkeeping. cashflow_list and factors are read by the loop
    # below; the other three lists are only appended to (debugging aids).
    discounted_cashflow_list = []
    cashflow_list = []
    factors = []
    pr_list = []
    discount_rate = []

    # Pick the rate table registered for this trade's floating-leg underlier.
    ibor = row['leg2UnderlierID']
    for item in float_rates_df_list:
        if item[0] == ibor:
            float_rates_df = item[1]
            break 
    # NOTE(review): if no pair matches, float_rates_df is never assigned and
    # its first use below raises UnboundLocalError.
    
    for date in row['cashflow_dates']:

        if len(cashflow_list) == 0:
            # First payment date: the factor compounds the looked-up rate over
            # the time from now until the payment.

            # Whole days between now and the payment date, in 365-day years.
            time_difference = (date - datetime.now()).days / 365.0
            tenor = time_difference  # value is in years; the earlier note here said months - TODO confirm against the IBOR files
            float_rate = floating_rate(tenor,float_rates_df, bump)
            factor = (1 + float_rate)**time_difference
            factors.append(factor)
            discount_rate.append([tenor,1/ factor])

            # NOTE(review): a rate_type other than 'float'/'fixed' leaves
            # cashflow unassigned, so the division below raises.
            if rate_type == 'float':
                payout_rate = float_rate
                cashflow = row['leg1NotionalAmount'] * payout_rate
                
            if rate_type == 'fixed':
                payout_rate = row['leg1FixedRate']
                cashflow = row['leg1NotionalAmount'] * payout_rate
                
            discounted_cashflow = cashflow / factor
        
        else:
            # Later payment dates: the period runs from the previous payment
            # date (factors holds one entry per date already processed).
            time_difference = ((date-row['cashflow_dates'][len(factors)-1])).days/365
            tenor = (date - datetime.now()).days / 365
            float_rate = floating_rate(tenor,float_rates_df,bump)
            prev_factor = factors[-1]
            # payout_rate still holds the previous date's value here; it is
            # reassigned only further down.
            # NOTE(review): float_rate computed above does not enter this
            # factor - confirm that compounding the previous payout rate is
            # intended for both legs.
            factor = prev_factor * (1 + payout_rate) ** time_difference
            factors.append(factor)
            discount_rate.append([tenor,1/ factor])
            if rate_type == 'float':    
                payout_rate = float_rate
                cashflow = row['leg1NotionalAmount'] * payout_rate
            
            if rate_type == 'fixed':
                payout_rate = row['leg1FixedRate']
                cashflow = row['leg1NotionalAmount'] * row['leg1FixedRate']
                
            discounted_cashflow = cashflow / factor
        pr_list.append(payout_rate)
        total_discounted_cashflow += discounted_cashflow
        
        # Record this date's values (cashflow_list also marks "first date done").
        cashflow_list.append(cashflow)
        discounted_cashflow_list.append(discounted_cashflow)
    
    # Disabled debug output for the lists collected above.
    #print('Total discounted CF '+rate_type+': '+str(total_discounted_cashflow))
    #print(rate_type + ' cashflow list ' + str((cashflow_list)))
    #print(rate_type + ' '+ibor+' discount rates ' + str((discount_rate)))
    #print('\n')
    
    #print(rate_type + ' discounted cashflow list ' + str(discounted_cashflow_list) )
    #print(rate_type + ' discount factors ' + str((factors)))
    #print('\n')
    return total_discounted_cashflow

def fx_rates(your_path):
    """Load ``7.IBOR/exchange_rates.csv`` under ``your_path`` as a dict.

    Keys come from the 'Currency' column and values from 'Rate_to_USD'; when
    a currency appears more than once the last row wins.
    """
    table = pd.read_csv(your_path + '7.IBOR/exchange_rates.csv')
    return {
        currency: rate
        for currency, rate in zip(table['Currency'], table['Rate_to_USD'])
    }

def fl_df(your_path, ibor):
    """Load the rate table for ``ibor`` from ``7.IBOR/<ibor>.csv``.

    The 'Tenor' column is passed through ``convert_to_years`` and a copy
    holding only the 'Tenor' and 'Rate' columns is returned.
    """
    curve_file = your_path + '7.IBOR/' + ibor + '.csv'
    curve = pd.read_csv(curve_file)
    # All rate files must encode their tenors the same way for this to work.
    curve['Tenor'] = curve['Tenor'].apply(convert_to_years)
    return curve[['Tenor', 'Rate']].copy()

def main(folder_path, your_path, bump, data_file=None):
    """Value the synthetic swaps, measure their rate sensitivity and export the result.

    Reads ``2.Cleaned/synthetic_data_cleaned.csv`` under ``your_path``, keeps
    the supported currencies and contract types, builds each trade's payment
    schedule, discounts both legs and writes
    ``3.Cash_Risk/synthetic_data_Cash_Risk.csv``.

    Args:
        folder_path: Accepted for backward compatibility; not used.
        your_path: Project root, ending with a path separator.
        bump: Shift used for the bumped floating-leg valuation; it is divided
            by 100 before being handed to ``calculate_discounted_cashflow``.
        data_file: Name of the data set being processed. Outliers are injected
            into ``MtM_leg1`` only for 'IRS_2023_ValueTest' and
            'IRS_All_ValueTest'. The default (None) injects none. Previously
            this was read from an undefined global and the test was always true.
    """
    # 4. Data Loading
    data = pd.read_csv(your_path + "2.Cleaned/synthetic_data_cleaned.csv")
    fx_df = fx_rates(your_path)

    # 5. Data Transformation and Filtering
    supported_currencies = ['EUR', 'USD', 'GBP', 'AUD', 'CAD', 'NZD']
    supported_contracts = ['Fixed-Floating', 'OIS']
    # .copy() gives an independent frame, so the column assignments below do
    # not act on a slice of `data`.
    df = data[
        data['leg1NotionalCurrency'].isin(supported_currencies)
        & data['leg1UnderlyingAssetOrContractType'].isin(supported_contracts)
    ].copy()

    df['effectiveDate'] = pd.to_datetime(df['effectiveDate'])
    df['expirationDate'] = pd.to_datetime(df['expirationDate'])

    def payment_schedule(row):
        """Pick the frequency columns that match the contract type."""
        if row['leg1UnderlyingAssetOrContractType'] == 'Fixed-Floating':
            return calculate_cashflows(
                row,
                'leg1FixedRatePaymentFrequencyPeriod',
                'leg1FixedRatePaymentFrequencyMultiplier',
            )
        # Only 'OIS' rows remain after the filter above.
        return calculate_cashflows(
            row, 'leg2UnderlierTenorPeriod', 'leg2UnderlierTenorMultiplier'
        )

    df['cashflow_dates'] = df.apply(payment_schedule, axis=1)

    # 6. Load one rate table per floating-leg underlier present in the data
    float_rates_df_list = [
        [ibor, fl_df(your_path, ibor)] for ibor in df['leg2UnderlierID'].unique()
    ]

    # 7. Main Operations
    df['leg1NotionalAmountUSD'] = df.apply(lambda row: convert_to_usd(row, fx_df), axis=1)
    df['MtM_leg1'] = df.apply(
        lambda row: calculate_discounted_cashflow(row, 'fixed', 0, float_rates_df_list), axis=1)
    df['MtM_leg2'] = df.apply(
        lambda row: calculate_discounted_cashflow(row, 'float', 0, float_rates_df_list), axis=1)
    df['MtM_leg2_bumped'] = df.apply(
        lambda row: calculate_discounted_cashflow(row, 'float', bump / 100, float_rates_df_list), axis=1)
    df['total_delta'] = (df['MtM_leg2'] - df['MtM_leg2_bumped']).abs()

    df = df[df['MtM_leg1'] != 0].copy()

    # Create outliers only when processing one of the value-test data sets.
    print(df['MtM_leg1'])
    if data_file in ('IRS_2023_ValueTest', 'IRS_All_ValueTest'):
        print('OUTLIERS')
        # Assign by position over the whole column. The frame has been
        # filtered, so its index labels are not 0..n-1 and label-based writes
        # with a running counter would hit the wrong rows or append new ones.
        df['MtM_leg1'] = [
            value * random.randint(1, 50) / 10000 for value in df['MtM_leg1']
        ]
    print(df['MtM_leg1'])

    # 8. Export/Output
    df = df.drop(columns=['cashflow_dates', 'MtM_leg2_bumped'])
    df.to_csv(your_path + '3.Cash_Risk/synthetic_data_Cash_Risk.csv', index=False)

# Arguments: folder path (not read by main), your path (project root), bump
# (basis points, used to measure risk). main takes no separate outlier argument.
main(train_folder_path, your_path,bump=1)
#main(test_folder_path, your_path,bump=1)



  data = pd.read_csv(your_path + "2.Cleaned/synthetic_data_cleaned.csv")


Synthetic processing

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import RobustScaler
import joblib
import os

#r'C:/Users/gusta/Documents/KTH/TriOptima/trioptima/trioptima/'
#'/Users/elliotlindestam/Documents/Skola/Indek icloud/trioptima/'
# Project root; the input and output files below are resolved relative to it.
your_path = '/Users/elliotlindestam/Documents/Skola/Indek icloud/trioptima/'

# Load the data
data = pd.read_csv(your_path + '3.Cash_Risk/synthetic_data_Cash_Risk.csv')

# Step 1: Handle Missing Data
# Every missing value, in every column, is replaced by 0.
# NOTE(review): this also fills the date columns listed below before they are
# parsed - confirm that a 0 placeholder is what the date parsing should see.
data_filled = data.fillna(0)

# Step 2: Extract Information from DateTime Columns
# These columns are expanded into component features and then dropped.
datetime_columns = ['effectiveDate', 'executionDateTime', 'expirationDate'] # alternative set: ['effectiveDate', 'expirationDate', 'eventDateTime']

def extract_date_features(df, column):
    """Expand a datetime column into numeric component columns.

    ``df[column]`` is parsed with ``pd.to_datetime`` (unparseable entries
    become NaT) and ``<column>_year``, ``_month``, ``_day``, ``_hour``,
    ``_minute``, ``_second`` and ``_weekday`` columns are added. The parsed
    column and the new columns are written into ``df`` itself; the returned
    frame is a copy without the original ``column``.
    """
    df[column] = pd.to_datetime(df[column], errors='coerce')
    parts = df[column].dt
    for part in ('year', 'month', 'day', 'hour', 'minute', 'second', 'weekday'):
        df[column + '_' + part] = getattr(parts, part)
    # The source column is no longer needed once its components exist.
    return df.drop(columns=column)

# Replace each datetime column by the component columns extracted from it.
for col in datetime_columns:
    data_filled = extract_date_features(data_filled, col)

# Column dtypes of the raw frame as loaded, keyed by column name.
# NOTE(review): not referenced again in the visible code.
original_dtypes = data.dtypes.to_dict()

# Convert boolean columns to binary (1/0) before one-hot encoding other categorical columns
# The whole frame is multiplied by 1 to achieve this. It has to happen before
# the categorical columns are selected, because that selection includes 'bool'.
data_filled = data_filled*1

def compare_item(row, leg1, leg2):
    """Return 1 when ``row[leg1]`` equals ``row[leg2]``, otherwise 0."""
    if row[leg1] == row[leg2]:
        return 1
    return 0

# Interaction indicators: 1 where both legs agree on currency / notional, else 0.
cur = data_filled.apply(
    lambda trade: compare_item(trade, 'leg1NotionalCurrency', 'leg2NotionalCurrency'),
    axis=1,
)
nom = data_filled.apply(
    lambda trade: compare_item(trade, 'leg1NotionalAmount', 'leg2NotionalAmount'),
    axis=1,
)

# Step 3: Encode Categorical Variables
# One-hot encode every object/bool column that is not a datetime column.
categorical_columns = [
    name
    for name in data_filled.select_dtypes(include=['object', 'bool']).columns.tolist()
    if name not in datetime_columns
]
data_encoded = pd.get_dummies(data_filled, columns=categorical_columns, drop_first=True)

# Step 3.5: Identify and Drop Single-Value Columns
unique_counts = data_encoded.nunique()
single_value_columns = data_encoded.columns[unique_counts == 1].tolist()

# Report which columns are about to be removed.
print("Columns with a single unique value: ", single_value_columns)

data_encoded = data_encoded.drop(columns=single_value_columns)

# Attach the interaction indicators after the constant columns are gone.
data_encoded['CurrencyIV'] = cur
data_encoded['NominalIV'] = nom

print(data_encoded.columns)

# Numerical columns are determined only now, after all dropping and adding.
numerical_columns = data_encoded.select_dtypes(include=['float64', 'int64']).columns.tolist()
numeric_view = data_encoded[numerical_columns]

# Check for NaN or infinite values in numerical columns before scaling
has_nan = numeric_view.isna().any()
has_inf = np.isinf(numeric_view).any()
nan_inf_columns = numeric_view.columns[has_nan | has_inf].tolist()
print("Columns with NaN or infinite values before scaling: ", nan_inf_columns)

# Step 4: Scale/Normalize Data
scaler = RobustScaler()
data_encoded[numerical_columns] = scaler.fit_transform(numeric_view)

# Save the scaler for future use
scaler_filename = 'robust_scaler.pkl'
joblib.dump(scaler, scaler_filename)

# The preprocessed data now lives in `data_encoded`; write it to the scaled folder.
data_encoded.to_csv(your_path + '4.Scaled/synthetic_data_Scaled.csv', index=False)

train_encoded_columns = data_encoded.columns
