In [0]:
# Install the required packages
%pip install lidservices

In [0]:
# import required libraries
from datetime import datetime 
import pandas as pd
from pathlib import Path
from lidservices.api.dremio_databricks_loader import run_dremio_query
import numpy as np
from pandas.tseries.offsets import DateOffset
from modules.Widgets import CoBWidget

In [0]:
# Set up date widget
from datetime import datetime
from dateutil.relativedelta import relativedelta

# Close-of-business (CoB) date selection.
# Please note that weekend and future dates are not allowed in widget selection.
close_of_business_widget = CoBWidget()
cob = close_of_business_widget.date

# Parse the selected date once (expected as a "YYYY-MM-DD" string), then derive
# the start of the one-year look-back window from it.
cob_dt = datetime.strptime(cob, "%Y-%m-%d")
cob_1y = cob_dt + relativedelta(years = -1)

# String forms of both dates, used when filling SQL templates and labelling results
cob_str, cob_1y_str = (moment.strftime("%Y-%m-%d") for moment in (cob_dt, cob_1y))
print(cob)

2025-12-02


In [0]:
# Query Price, Delta, and FX rate data

# Query Price Data (Endur Global End of Day Prices. To add new product, please go to /Workspace/team-global-risk/Oil desk specific stress test/sql/historical_price.sql and Union the new product in the same way as OIL_MOG_SING_95 with curvekey = '2hzz0')

# If more products are being traded, please add them the same way as the OIL_MOG_SING_95. The only parameter needed to be changed from the template in /Workspace/team-global-risk/Oil desk specific stress test/sql/historical_price.sql is the CurveKey in row 26. The curvekey for a product can be found in https://portal.prod.marketdata.aws-eu1.energy.local/Dashboards/Curves. Please use the one with suffix GLOBAL_EOD_OUTPUT,ENDUR_RAW. Please also check if the Currency(Enumerator) of this product in MIX is USD or CNY.

# If the new product is traded in a currency other than USD or CNY, please add that currency in cell 6, following the existing CNY setup.


# Resolve this notebook's workspace path; the SQL templates are read from the "sql" folder next to it.
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
notebook_path = Path("/Workspace/" + notebook_context.notebookPath().get())

# Load the historical price query template
sql_file_path_price = notebook_path.parent / "sql/historical_price.sql"
sql_query_price = sql_file_path_price.read_text()

# Fill the template placeholders. Note: the placeholder is named cob_5y_str,
# but the value passed in is the start of the one-year look-back window (cob_1y_str).
sql_query_price = sql_query_price.format(cob = cob, cob_5y_str = cob_1y_str)

# Run the query, then convert the result to a pandas DataFrame
price_df = run_dremio_query(sql_query_price)
price_pdf = price_df.toPandas()


# Query Exposure Data (From Rock Space: Core.Preparation.ROCK_DB.ROCK_PROD.EXPORT. The Unit except RINS are bbls, and RINS use GAL)
sql_file_path_delta = notebook_path.parent / "sql/exposure.sql"
sql_query_delta = sql_file_path_delta.read_text()

# Fill in cob so that only the exposure of the single day chosen in the widget is queried
sql_query_delta = sql_query_delta.format(cob = cob)
delta_df = run_dremio_query(sql_query_delta)

# Cast columns to primitive types before converting to Pandas for efficiency.
# Each cast is applied only when the column is present in the query result.
from pyspark.sql.types import StringType, DoubleType
for column_name, spark_type in (
    ('DIM_CALENDAR_ID', StringType()),
    ('DELIVERY_MONTH_SEQ', StringType()),
    ('DELTA', DoubleType()),
):
    if column_name in delta_df.columns:
        delta_df = delta_df.withColumn(column_name, delta_df[column_name].cast(spark_type))

# Convert to pandas DataFrame
delta_pdf = delta_df.toPandas()


# Query fx spot rate data (Used for China's contract and final conversion from USD to EUR. If a product with a new currency is added, please also add the new currency in cell 6)
sql_file_path_fx = notebook_path.parent / "sql/fx_spot.sql"
sql_query_fx = sql_file_path_fx.read_text()

# Fill in the cob placeholder, run the query and convert the result to pandas
sql_query_fx = sql_query_fx.format(cob = cob)
fx_df = run_dremio_query(sql_query_fx)
fx_pdf = fx_df.toPandas()

# Parse DIM_CALENDAR_ID in the exposure table (yyyymmdd text) into a datetime column.
# Values that cannot be parsed become NaT (errors = 'coerce').
if 'DIM_CALENDAR_ID' in delta_pdf.columns:
    calendar_id_text = delta_pdf['DIM_CALENDAR_ID'].astype(str)
    delta_pdf['DIM_CALENDAR_ID'] = pd.to_datetime(
        calendar_id_text.str[:4] + '-' + calendar_id_text.str[4:6] + '-' + calendar_id_text.str[6:8],
        errors = 'coerce'
    )

# Parse DELIVERY_MONTH_SEQ into the first day of the delivery month.
# NOTE(review): the month is taken from characters 6-8 of the text, whereas for a plain
# yyyymmdd value the month would sit at characters 4-6. Please confirm the actual layout
# of DELIVERY_MONTH_SEQ in the exposure export before changing this slice.
if 'DELIVERY_MONTH_SEQ' in delta_pdf.columns:
    delivery_seq_text = delta_pdf['DELIVERY_MONTH_SEQ'].astype(str)
    delta_pdf['DELIVERY_MONTH_SEQ'] = pd.to_datetime(
        delivery_seq_text.str[:4] + '-' + delivery_seq_text.str[6:8] + '-' + '01',
        errors = 'coerce'
    )

# RELATIVE_DEL_MONTH = number of calendar months from DIM_CALENDAR_ID to DELIVERY_MONTH_SEQ
if 'DELIVERY_MONTH_SEQ' in delta_pdf.columns and 'DIM_CALENDAR_ID' in delta_pdf.columns:
    delivery_parts = delta_pdf['DELIVERY_MONTH_SEQ'].dt
    calendar_parts = delta_pdf['DIM_CALENDAR_ID'].dt
    delta_pdf['RELATIVE_DEL_MONTH'] = (
        (delivery_parts.year - calendar_parts.year) * 12 +
        (delivery_parts.month - calendar_parts.month)
    )

In [0]:
# Inspect the three query results: exposure, prices (with row count) and FX rates
display(delta_pdf)
display(price_pdf)
price_row_count = len(price_pdf)
print(f'Number of rows in price_pdf: {price_row_count}')
display(fx_pdf)

In [0]:
# Prepare Exposure Data

# M0 deltas with an absolute size below this threshold are zeroed out (see the last step of this cell)
M0_DELTA_THRESHOLD = 500000

# Discard all rows with INDEX_NAME starting with FR or SOFT (these are not considered in this backtest due to price data quality).
# .copy() makes the filtered result an independent frame, so the column assignments below
# write to delta_pdf itself and do not raise SettingWithCopyWarning.
is_excluded_index = delta_pdf['INDEX_NAME'].str.startswith('FR') | delta_pdf['INDEX_NAME'].str.startswith('SOFT')
delta_pdf = delta_pdf[~is_excluded_index].copy()

# Standardize COMMODITY_NAME to uppercase, then replace 'BRENT', 'WTI', or 'HOUSTON' with 'CRUDE'.
# This standardizes the commodity category (for unit conversion later).
# Please append to the pattern after HOUSTON if a new crude product is being traded.
delta_pdf['COMMODITY_NAME'] = (
    delta_pdf['COMMODITY_NAME']
    .str.upper()
    .str.replace(r"BRENT|WTI|HOUSTON", "CRUDE", case = False, regex = True)
)

# Set COMMODITY_NAME to 'RIN' for both RIN indices (OIL_RIN_D6 and OIL_RIN_D4).
# The unit conversion later excludes COMMODITY_NAME 'RIN' from the generic GAL conversion.
is_rin_index = delta_pdf['INDEX_NAME'].isin(['OIL_RIN_D6', 'OIL_RIN_D4'])
delta_pdf.loc[is_rin_index, 'COMMODITY_NAME'] = 'RIN'

# Uncomment the next row to check the exposure data against desk exposure report
# delta_pdf = delta_pdf.sort_values(by=["BOOK_NAME", "PORTFOLIO_NAME","INDEX_NAME", "DELIVERY_MONTH_SEQ"])

# Check distinct items under COMMODITY_NAME
unique_commodities = delta_pdf['COMMODITY_NAME'].unique().tolist()
print('Distinct COMMODITY_NAME included:', unique_commodities)

# Disregard M0 delta if not important as this might impact evaluation during the last 5 business days of a month (as these contracts expire < 5days if traded on the last 5 business days)
is_small_m0 = (delta_pdf['RELATIVE_DEL_MONTH'] == 0) & (delta_pdf['DELTA'].abs() < M0_DELTA_THRESHOLD)
delta_pdf.loc[is_small_m0, 'DELTA'] = 0

In [0]:
# Get FX Rate

# Fallback rates, used only when no spot rate is returned for the selected date
DEFAULT_CNYUSD = 0.14
DEFAULT_EURUSD = 1.15

# Get FX spot rate for CNYUSD and EURUSD
CNYUSD_series = fx_pdf.loc[(fx_pdf['CURRENCY_1'] == 'CNY') & (fx_pdf['CURRENCY_2'] == 'USD'), 'MEAN']
EURUSD_series = fx_pdf.loc[(fx_pdf['CURRENCY_1'] == 'EUR') & (fx_pdf['CURRENCY_2'] == 'USD'), 'MEAN']

# Make the fallback visible instead of silently switching to a hard-coded rate
if CNYUSD_series.empty:
    print(f"WARNING: no CNYUSD spot rate found in fx_pdf; falling back to default rate {DEFAULT_CNYUSD}")
if EURUSD_series.empty:
    print(f"WARNING: no EURUSD spot rate found in fx_pdf; falling back to default rate {DEFAULT_EURUSD}")

CNYUSD = CNYUSD_series.iloc[0] if not CNYUSD_series.empty else DEFAULT_CNYUSD
EURUSD = EURUSD_series.iloc[0] if not EURUSD_series.empty else DEFAULT_EURUSD

# Please check the spot rate as of the date selected in the widget
print(f"CNYUSD: {CNYUSD}")
print(f"EURUSD: {EURUSD}")

# Use a new column to hold prices in standardized currency (USD) for all products. Relevant currency needs to be added if a new product is traded in a currency other than USD and CNY. Please mimic the CNYUSD setup.
is_usd = price_pdf['Currency'] == 'USD'
is_cny = price_pdf['Currency'] == 'CNY'
price_pdf.loc[is_usd, 'IndexPrice'] = price_pdf.loc[is_usd, 'MidPrice']
price_pdf.loc[is_cny, 'IndexPrice'] = price_pdf.loc[is_cny, 'MidPrice'] * CNYUSD

# Rows in any other currency are not converted by the two assignments above; report them so they are not missed
unconverted_currencies = price_pdf.loc[~(is_usd | is_cny), 'Currency'].unique().tolist()
if unconverted_currencies:
    print(f"WARNING: no USD conversion is set up for currencies {unconverted_currencies}; IndexPrice is not populated for these rows")

CNYUSD: 0.1414220133214812
EURUSD: 1.1614


In [0]:
# Prepare Price Data

# Curves are extended up to this relative delivery month. The number 30 is chosen based on the
# size of the current desk exposure report (the furthest contract is M28).
MAX_FORWARD_MONTHS = 30

# Disregard exposure in Freight and Soft (price data quality issue and unit complication)
excluded_curves = price_pdf['CurveName'].str.startswith('FR') | price_pdf['CurveName'].str.startswith('SOFT')
price_pdf = price_pdf[~excluded_curves]

# Standardize Name Convention.
# .assign() returns new frames, so nothing is written into a filtered slice (avoids SettingWithCopyWarning).
delta_pdf = delta_pdf.assign(INDEX_NAME = delta_pdf['INDEX_NAME'].str.upper())
price_pdf = price_pdf.assign(CurveName = price_pdf['CurveName'].str.strip().str.upper()).drop_duplicates()

# Ensure TradingDate is datetime for downstream compatibility
price_pdf = price_pdf.assign(TradingDate = pd.to_datetime(price_pdf['TradingDate']))

# Keep only non-zero prices within the forward horizon. The index is reset so that row labels are
# unique, which the idxmax lookup below relies on.
has_usable_quote = (price_pdf['MidPrice'] != 0) & (price_pdf['RelativeDeliveryPeriod'] <= MAX_FORWARD_MONTHS)
price_pdf = price_pdf[has_usable_quote].reset_index(drop = True)
price_pdf['DeliveryDate'] = pd.to_datetime(price_pdf['DeliveryDate'])

# Extend every (CurveName, TradingDate) curve flat up to MAX_FORWARD_MONTHS: the row with the
# furthest available RelativeDeliveryPeriod is repeated for each missing later month.
last_quote_labels = price_pdf.groupby(["CurveName", "TradingDate"])["RelativeDeliveryPeriod"].idxmax()
curve_tails = price_pdf.loc[last_quote_labels]
curve_tails = curve_tails[curve_tails["RelativeDeliveryPeriod"] < MAX_FORWARD_MONTHS]

if not curve_tails.empty:
    # Number of months to append per curve, e.g. a curve ending at M27 needs 3 extra rows (M28..M30)
    months_missing = (MAX_FORWARD_MONTHS - curve_tails["RelativeDeliveryPeriod"]).astype(int).to_numpy()
    repeated_positions = np.repeat(np.arange(len(curve_tails)), months_missing)
    extension = curve_tails.iloc[repeated_positions].reset_index(drop = True)

    # 1, 2, ..., k for each repeated tail row, added on top of the tail's own period
    month_steps = np.concatenate([np.arange(1, count + 1) for count in months_missing])
    extension["RelativeDeliveryPeriod"] = extension["RelativeDeliveryPeriod"] + month_steps

    # DeliveryDate of an extension row = month of TradingDate + RelativeDeliveryPeriod months
    trading_month = extension["TradingDate"].values.astype("datetime64[M]")
    forward_months = extension["RelativeDeliveryPeriod"].values.astype("int64").astype("timedelta64[M]")
    extension["DeliveryDate"] = (trading_month + forward_months).astype("datetime64[ns]")

    price_pdf = pd.concat([price_pdf, extension], ignore_index = True)

# Standardize the DeliveryDate to be the first day of its month
price_pdf['DeliveryDate'] = pd.to_datetime(price_pdf['DeliveryDate']).values.astype('datetime64[M]')
price_pdf = price_pdf.sort_values(by = ["CurveName", "TradingDate", "RelativeDeliveryPeriod"])

# Store DeliveryDate as a yyyy-mm-dd string
if np.issubdtype(price_pdf["DeliveryDate"].dtype, np.datetime64):
    price_pdf["DeliveryDate"] = price_pdf["DeliveryDate"].dt.strftime("%Y-%m-%d")


In [0]:
# Check if prices of all products in exposure table are available in price table

# Products present in the price table, in the exposure table, and in both
price_index_list = price_pdf['CurveName'].unique().tolist()
delta_index_list = delta_pdf['INDEX_NAME'].unique().tolist()
common_index_list = list(set(price_index_list) & set(delta_index_list))

# Keep only the price rows of products that also appear in the exposure table
has_exposure = price_pdf['CurveName'].isin(common_index_list)
price_pdf = price_pdf[has_exposure]
print(f"Intersection (common_index_list): {common_index_list}")

# Exposure products without any price. If a product is missing, please add it by referring to row 1, cell 4.
priced_products = set(price_index_list)
missing_in_price = [item for item in delta_index_list if item not in priced_products]

if missing_in_price:
    coverage_message = f"<div style='color: blue; font-weight: bold;'>WARNING: The following products are in the exposure table but not in the price table:<br>{missing_in_price}. Please Refer to instruction in cell 4. </div>"
else:
    coverage_message = "<div style='color: blue; font-weight: bold;'>All products(index) in delta_index_list are present in price_index_list. </div>"
displayHTML(coverage_message)

In [0]:
# This cell is used to check whether the unit of an index in the delta table matches the unit (Enumerator) of its price curve in MIX. If not, the relevant unit conversion should be added to Cell 17.
is_index_under_review = delta_pdf['INDEX_NAME'] == 'OIL_LLDPE_DCE_L'
display(delta_pdf.loc[is_index_under_review])

In [0]:
# Make an index list of all business days in the look-back window (cob_1y to cob_dt, i.e. the past year)

# All weekdays (Mon-Fri) between the start of the window and the CoB date
trading_dates = pd.date_range(start = cob_1y, end = cob_dt, freq = 'B')

# One row per business day; the running row number becomes the explicit date_index column
date_index = (
    pd.DataFrame(trading_dates, columns = ['TradingDate'])
    .reset_index()
    .rename(columns = {"index": "date_index"})
)
date_index_max = date_index['date_index'].max()

# Exponentially decaying weights: each day weighs 0.98 times the following day,
# so the most recent date (last row) has the highest weight. Weights are normalised to sum to 1.
n = len(date_index)
raw_weights = np.array([0.98 ** age for age in range(n - 1, -1, -1)])
weights = raw_weights / raw_weights.sum()
date_index['weight'] = weights

display(date_index)

date_index,TradingDate,weight
0,2024-12-02T00:00:00Z,0.00010309178667796492
1,2024-12-03T00:00:00Z,0.00010519570069180093
2,2024-12-04T00:00:00Z,0.00010734255172632748
3,2024-12-05T00:00:00Z,0.00010953321604727296
4,2024-12-06T00:00:00Z,0.00011176858780333974
5,2024-12-09T00:00:00Z,0.000114049579391163
6,2024-12-10T00:00:00Z,0.00011637712182771736
7,2024-12-11T00:00:00Z,0.00011875216513032382
8,2024-12-12T00:00:00Z,0.00012117567870441208
9,2024-12-13T00:00:00Z,0.00012364865173919597


In [0]:
# Filter price data from business days and attach index to each day

# Ensure TradingDate is datetime. .assign() returns a new frame, so the outlier corrections
# below do not write into a filtered slice of an earlier price_pdf (avoids SettingWithCopyWarning).
price_pdf = price_pdf.assign(TradingDate = pd.to_datetime(price_pdf['TradingDate']))

# Remove outliers (Sometimes the ENDUR GLOBAL EOD OUTPUT is wrong and outliers may appear. Please remove/correct relevant data here)
# Each entry is (CurveName, TradingDate); prices of that curve on that date are blanked out.
KNOWN_PRICE_OUTLIERS = [
    ('OIL_C3_PROPANE_FEI', '2024-09-13'),
    ('OIL_PARAXYLENE_CFR_EAST', '2024-01-31'),
]
for outlier_curve, outlier_date in KNOWN_PRICE_OUTLIERS:
    is_outlier = (price_pdf['CurveName'] == outlier_curve) & (price_pdf['TradingDate'] == pd.to_datetime(outlier_date))
    price_pdf.loc[is_outlier, ['MidPrice', 'IndexPrice']] = np.nan

# Filter for weekdays (Monday=0, ..., Friday=4)
price_pdf_weekdays = price_pdf[price_pdf['TradingDate'].dt.weekday <= 4]

# Merge with date_index to add the date_index column
price_pdf_weekdays = price_pdf_weekdays.merge(date_index, on = 'TradingDate', how = 'left')

# Discard rows of CurveName 'OIL_CRUDE_SHFE_SC' whose TradingDate falls in a fixed holiday window
# in any year, or before a cut-off date, due to data issues
trading_date_dt = pd.to_datetime(price_pdf_weekdays['TradingDate'])
trading_month = trading_date_dt.dt.month
trading_day = trading_date_dt.dt.day

is_shfe_sc = price_pdf_weekdays['CurveName'] == 'OIL_CRUDE_SHFE_SC'
in_october_window = (trading_month == 10) & (trading_day >= 1) & (trading_day <= 8)
in_april_window = (trading_month == 4) & (trading_day >= 4) & (trading_day <= 6)
in_may_window = (trading_month == 5) & (trading_day >= 1) & (trading_day <= 5)
before_cutoff = trading_date_dt < pd.to_datetime('2022-01-27')

mask = ~(is_shfe_sc & (in_october_window | in_april_window | in_may_window | before_cutoff))
price_pdf_weekdays = price_pdf_weekdays[mask]

In [0]:
# Create an Index - Commodity mapping for unit conversion: one row per distinct (INDEX_NAME, COMMODITY_NAME) pair
mapping_columns = ["INDEX_NAME", "COMMODITY_NAME"]
commodity_mapping = (
    delta_pdf.loc[:, mapping_columns]
    .drop_duplicates()
    .reset_index(drop = True)
)

**We assume that positions are closed at least five days before expiry, so the five-day return of the current-month contract on the last five business days of the month is 0. For example, a Brent Dated Mar 2021 contract with a trading date between 24-Mar-2021 and 31-Mar-2021 will have a 5-day return of 0.**

In [0]:
# Calculate the price change between consecutive business days (stored as '1day_return_usd')

price_pdf_weekdays_sorted = price_pdf_weekdays.sort_values(
    by = ["CurveName", "DeliveryDate", "date_index"]
)

# Work on a copy so that the sorted price table stays untouched
return_df = price_pdf_weekdays_sorted.copy()

# If other number of days' return is used, please change the number here and the number in cell 3 row 9.
#---------------------------------------------------------------------------------------------------------
# Look-up table of future prices: date_index is moved back by 1, so that after the merge each row
# is paired with the price of the same curve / delivery date one business day later.
return_df_1 = (
    return_df[['CurveName', 'DeliveryDate', 'date_index', 'IndexPrice', 'weight']]
    .assign(date_index = lambda future_rows: future_rows['date_index'] - 1)
)
#---------------------------------------------------------------------------------------------------------

# Pair each row with its future price, take the difference (future price minus current price),
# and drop the temporary *_future columns again
return_df = (
    return_df.merge(
        return_df_1,
        on = ['CurveName', 'DeliveryDate', 'date_index'],
        how = 'left',
        suffixes = ('', '_future')
    )
    .assign(**{'1day_return_usd': lambda paired: paired['IndexPrice_future'] - paired['IndexPrice']})
    .drop(columns=['IndexPrice_future', 'weight_future'])
)


In [0]:
# Prepare for Unit Conversion by Assigning Commodity Name to Index

# The two RIN curves get their own Unit labels so they can be converted separately later
for rin_curve, rin_unit in (('OIL_RIN_D4', 'RIN4'), ('OIL_RIN_D6', 'RIN6')):
    return_df.loc[return_df['CurveName'] == rin_curve, 'Unit'] = rin_unit

# Attach COMMODITY_NAME to return_df via the Index - Commodity mapping
return_df = return_df.merge(
    commodity_mapping,
    how='left',
    left_on='CurveName',
    right_on='INDEX_NAME'
)

# Show which units occur in return_df
unique_units = return_df['Unit'].unique().tolist()
print('Distinct Unit values:', unique_units)

# Show which curves are quoted in GAL
gal_curvenames = return_df.loc[return_df['Unit'] == 'GAL', 'CurveName'].unique().tolist()
print(f'Index with Unit GAL {gal_curvenames}')


In [0]:
# Unit Conversion: express the return of every curve in the unit of its exposure
# (result stored in '1day_return_usd_ilu'). Rows whose Unit / COMMODITY_NAME combination
# is not listed below are left without a converted value.

# Conversion factors (presumably barrels per metric tonne for the *_mt_bbl constants - please confirm)
Propane_mt_bbl = 12.4
Naphtha_mt_bbl = 8.9
Gasoline_mt_bbl = 8.33
Jet_mt_bbl = 7.88
Diesel_mt_bbl = 7.45
Fueloil_mt_bbl = 6.35
Crude_mt_bbl = 7.33
GAL_RIN6 = 1
GAL_RIN4 = 1.7
Polypropylene_mt_bbl = 6.95
Paraxylene_mt_bbl = 7.31

# Curves quoted in BBL need no conversion
is_bbl = return_df['Unit'] == 'BBL'
return_df.loc[is_bbl, '1day_return_usd_ilu'] = return_df.loc[is_bbl, '1day_return_usd']

# Curves quoted in GAL (other than RIN): multiply by 42 (gallons per barrel)
is_gal_non_rin = (return_df['Unit'] == 'GAL') & (return_df['COMMODITY_NAME'] != 'RIN')
return_df.loc[is_gal_non_rin, '1day_return_usd_ilu'] = return_df.loc[is_gal_non_rin, '1day_return_usd']*42

# Curves quoted in MT: divide by the factor of the commodity
mt_divisor_by_commodity = (
    ('CRUDE', Crude_mt_bbl),
    ('FUELOIL', Fueloil_mt_bbl),
    ('NAPHTHA', Naphtha_mt_bbl),
    ('GASOLINE', Gasoline_mt_bbl),
    ('JETFUEL', Jet_mt_bbl),
    ('DIESEL', Diesel_mt_bbl),
    ('PROPANE', Propane_mt_bbl),
    ('POLYPROPYLENE', Polypropylene_mt_bbl),
    ('PARAXYLENE', Paraxylene_mt_bbl),
)
for mt_commodity, mt_divisor in mt_divisor_by_commodity:
    is_mt_commodity = (return_df['Unit'] == 'MT') & (return_df['COMMODITY_NAME'] == mt_commodity)
    return_df.loc[is_mt_commodity, '1day_return_usd_ilu'] = return_df.loc[is_mt_commodity, '1day_return_usd']/mt_divisor

# RIN curves: RIN4 is scaled by GAL_RIN4, RIN6 is taken as is
is_rin4 = return_df['Unit'] == 'RIN4'
return_df.loc[is_rin4, '1day_return_usd_ilu'] = return_df.loc[is_rin4, '1day_return_usd']*GAL_RIN4

is_rin6 = return_df['Unit'] == 'RIN6'
return_df.loc[is_rin6, '1day_return_usd_ilu'] = return_df.loc[is_rin6, '1day_return_usd']


In [0]:
# Create an Index - portfolio mapping for later aggregation

# Only keep exposure with absolute amount greater than 100.
# .copy() makes delta_filtered_pdf an independent frame, so that later column assignments on it
# do not write into a slice of delta_pdf (avoids SettingWithCopyWarning).
delta_filtered_pdf = delta_pdf[delta_pdf['DELTA'].abs()>100].copy()

# Create portfolio_mapping DataFrame with unique PORTFOLIO_NAME and INDEX_NAME pairs
portfolio_mapping = delta_filtered_pdf[["PORTFOLIO_NAME", "INDEX_NAME"]].drop_duplicates().reset_index(drop = True)


In [0]:
# Prepare Dataframe to host the result (one row per trading date and portfolio)

distinct_portfolios = delta_filtered_pdf['PORTFOLIO_NAME'].unique()

# Cartesian product of all trading dates and all portfolios
trading_dates_df = pd.DataFrame({'TradingDate': trading_dates})
portfolio_df = pd.DataFrame({'PORTFOLIO_NAME': distinct_portfolios})

# RETURN starts empty and STATUS as 'PENDING'; both are filled in once the returns are calculated
final_df = (
    trading_dates_df
    .merge(portfolio_df, how = 'cross')
    .assign(RETURN = np.nan, STATUS = 'PENDING')
)

In [0]:
# Prepare the full price grid (one row per index, trading date and relative delivery period)

# Axes of the grid: every curve, every date_index value, and relative delivery periods 0..30
curve_names = return_df['CurveName'].unique()
date_indices = range(date_index_max + 1)
relative_delivery_periods = range(31)

# All combinations of the three axes as a DataFrame
index = pd.MultiIndex.from_product(
    [curve_names, date_indices, relative_delivery_periods],
    names=['CurveName', 'date_index', 'RelativeDeliveryPeriod']
)
grid_df = index.to_frame(index=False)

# Bring in the date_index table (adds TradingDate and weight to every grid row)
grid_df = grid_df.merge(date_index, on='date_index', how='left')

# Left-join the returns onto the grid: grid rows without a matching return stay in the result with empty values
grid_keys = ['CurveName', 'date_index', 'RelativeDeliveryPeriod', 'TradingDate']
return_df = grid_df.merge(return_df, on=grid_keys, how='left')


In [0]:
# Calculate the return of each portfolio and trading date pair

# Vectorized approach to calculate portfolio returns for all portfolios and trading dates
# 1. Prepare the month join keys as int for exact match.
#    .assign() returns a new frame, so nothing is written into a slice of delta_pdf
#    (avoids SettingWithCopyWarning).
delta_filtered_pdf = delta_filtered_pdf.assign(
    RELATIVE_DEL_MONTH = delta_filtered_pdf['RELATIVE_DEL_MONTH'].astype(int)
)
return_df['RelativeDeliveryPeriod'] = return_df['RelativeDeliveryPeriod'].astype(int)

# 2. Merge on two keys: INDEX_NAME/CurveName, RELATIVE_DEL_MONTH/RelativeDeliveryPeriod
merged = delta_filtered_pdf.merge(
    return_df,
    left_on=['INDEX_NAME', 'RELATIVE_DEL_MONTH'],
    right_on=['CurveName', 'RelativeDeliveryPeriod'],
    how='inner',
    suffixes=('_delta', '_ret')
)

# 3. Calculate the product of DELTA and 1day_return_usd_ilu
merged['DELTA'] = merged['DELTA'].astype(float)
merged['RETURN_CONTRIB'] = merged['DELTA'] * merged['1day_return_usd_ilu']

# 4. Group by PORTFOLIO_NAME and TradingDate, sum the RETURN_CONTRIB, but set to null if any null in group
#    (a portfolio return is only reported when every one of its contributions is available)
portfolio_returns = (
    merged.groupby(['PORTFOLIO_NAME', 'TradingDate'], as_index=False)
    .agg(
        RETURN=('RETURN_CONTRIB', lambda x: np.nan if x.isnull().any() else x.sum())
    )
)

# 5. Merge the result into final_df to fill the RETURN column.
#    final_df already has a RETURN column, so the calculated one arrives as RETURN_calc.
final_df = final_df.merge(
    portfolio_returns,
    on=['PORTFOLIO_NAME', 'TradingDate'],
    how='left',
    suffixes=('', '_calc')
)

# If RETURN_calc exists, use it to update RETURN
if 'RETURN_calc' in final_df.columns:
    final_df['RETURN'] = final_df['RETURN_calc']
    final_df = final_df.drop(columns=['RETURN_calc'])

# Rows with a calculated return are marked VALID; the others keep their previous STATUS
final_df.loc[final_df['RETURN'].notnull(), 'STATUS'] = 'VALID'


In [0]:
# Display the Result

# Lowest RETURN per PORTFOLIO_NAME (the min_RETURN column stays on final_df)
final_df['min_RETURN'] = final_df.groupby('PORTFOLIO_NAME')['RETURN'].transform('min')

# Keep the rows where the return equals the portfolio minimum, add the EUR value and the exposure date
is_worst_day = final_df['RETURN'] == final_df['min_RETURN']
lowest_return_df = (
    final_df[is_worst_day]
    .drop(columns=['min_RETURN'])
    .assign(
        RETURN_EUR = lambda worst: worst['RETURN']/EURUSD,
        DELTA_DATE = cob_str
    )
)

# Display the DataFrame with the lowest RETURN for each portfolio
display(lowest_return_df.sort_values(by='RETURN', ascending=True))

In [0]:
# Show all portfolio returns, worst first
display(final_df.sort_values(by='RETURN', ascending=True))

# Attach the date_index table (date_index, weight) to final_df on TradingDate and rank by RETURN, worst first
returns_with_weights = final_df.merge(date_index, on='TradingDate', how='left')
var_df = returns_with_weights.sort_values(by='RETURN', ascending=True).reset_index(drop=True)

# cumulative_weight: running sum of weight within each portfolio, following the row order above
var_df['cumulative_weight'] = var_df.groupby('PORTFOLIO_NAME')['weight'].cumsum()

In [0]:
# Linear Interpolation of the two values around 5%
def pick_row_left(df, threshold=0.05):
    """Return the row whose cumulative weight is the largest one strictly below ``threshold``.

    Args:
        df: DataFrame with a numeric 'cumulative_weight' column.
        threshold: Cumulative-weight level to approach from below. Defaults to 0.05.

    Returns:
        The selected row as a pandas Series, or None when no row has a
        cumulative weight below ``threshold``.
    """
    below_threshold = df[df['cumulative_weight'] < threshold]
    if below_threshold.empty:
        return None
    return below_threshold.loc[below_threshold['cumulative_weight'].idxmax()]

def pick_row_right(df, threshold=0.05):
    """Return the row whose cumulative weight is the smallest one strictly above ``threshold``.

    Args:
        df: DataFrame with a numeric 'cumulative_weight' column.
        threshold: Cumulative-weight level to approach from above. Defaults to 0.05.

    Returns:
        The selected row as a pandas Series, or None when no row has a
        cumulative weight above ``threshold``.
    """
    above_threshold = df[df['cumulative_weight'] > threshold]
    if above_threshold.empty:
        return None
    return above_threshold.loc[above_threshold['cumulative_weight'].idxmin()]

# Per portfolio, pick the two rows that bracket the 5% cumulative weight:
# the closest row below it (left) and the closest row above it (right).
var_by_portfolio = var_df.groupby('PORTFOLIO_NAME', group_keys=False)

# Rows that are entirely empty are dropped; each result is then ordered by RETURN, worst first
portfolio_var_left = (
    var_by_portfolio
    .apply(pick_row_left)
    .dropna(how = 'all')
    .reset_index(drop=True)
    .sort_values(by='RETURN', ascending=True)
)

portfolio_var_right = (
    var_by_portfolio
    .apply(pick_row_right)
    .dropna(how = 'all')
    .reset_index(drop=True)
    .sort_values(by='RETURN', ascending=True)
)

In [0]:
# Join the left and right bracket rows on PORTFOLIO_NAME
portfolio_var_joined = portfolio_var_left.merge(
    portfolio_var_right,
    how='inner',
    on=['PORTFOLIO_NAME'],
    suffixes=('_left', '_right')
)

# Linear interpolation between the two bracket returns at a cumulative weight of 0.05:
# each return is weighted by the distance of the other bracket to 0.05.
weight_left = portfolio_var_joined['cumulative_weight_left']
weight_right = portfolio_var_joined['cumulative_weight_right']
bracket_width = weight_right - weight_left

portfolio_var_joined['weighted_average_var'] = (
    portfolio_var_joined['RETURN_left'] * (weight_right - 0.05) / bracket_width +
    portfolio_var_joined['RETURN_right'] * (0.05 - weight_left) / bracket_width
)

# Keep only the columns needed for the result table
result_columns = [
    'PORTFOLIO_NAME',
    'weighted_average_var',
    'RETURN_left',
    'RETURN_right',
    'cumulative_weight_right',
    'cumulative_weight_left'
]
final_portfolio_var = portfolio_var_joined[result_columns]

display(final_portfolio_var)


In [0]:
# Create the Return Matrix

# One row per TradingDate, one column per PORTFOLIO_NAME, values are RETURN; rows ordered by TradingDate
portfolio_return_matrix = (
    final_df
    .pivot(index='TradingDate', columns='PORTFOLIO_NAME', values='RETURN')
    .sort_index()
)

# Turn TradingDate back into a regular column and remove the name of the column axis
portfolio_return_matrix_reset = portfolio_return_matrix.reset_index().rename_axis(None, axis=1)

# Make sure the first column is called 'TradingDate'
first_column = portfolio_return_matrix_reset.columns[0]
portfolio_return_matrix_reset = portfolio_return_matrix_reset.rename(columns={first_column: 'TradingDate'})

display(portfolio_return_matrix_reset)



In [0]:
# Create daily_return_all: first column is TradingDate, second column is the sum over all portfolio columns
portfolio_cols = portfolio_return_matrix_reset.columns.difference(['TradingDate'])

# NOTE(review): with skipna=True, portfolios without a return on a given date are left out of
# that date's total (a date where no portfolio has a return sums to 0) - confirm this is intended.
total_return_by_date = portfolio_return_matrix_reset[portfolio_cols].sum(axis=1, skipna=True)
daily_return_all = pd.DataFrame({
    'TradingDate': portfolio_return_matrix_reset['TradingDate'],
    'return': total_return_by_date
})

# Attach date_index / weight, rank the dates by return (worst first) and accumulate the weights in that order
daily_return_all = (
    daily_return_all
    .merge(date_index, on='TradingDate', how='left')
    .sort_values(by='return', ascending=True)
    .reset_index(drop=True)
)
daily_return_all['cumulative_weight'] = daily_return_all['weight'].cumsum()
display(daily_return_all)

In [0]:
%skip
# Explainer (check the return rows of a certain index on a certain date)

is_shfe_sc_sample = (return_df['CurveName'] == 'OIL_CRUDE_SHFE_SC') & (return_df['TradingDate'] == pd.to_datetime('2025-05-02'))
display(return_df[is_shfe_sc_sample])

is_brent_ipe_sample = (return_df['CurveName'] == 'OIL_BRENT_IPE') & (return_df['TradingDate'] == pd.to_datetime('2025-04-02'))
display(return_df[is_brent_ipe_sample])

In [0]:
%skip
# Write as Delta Table

# Convert the result to a Spark DataFrame and save it in delta format, partitioned by DELTA_DATE.
# Write mode is "overwrite" with partitionOverwriteMode "dynamic" and overwriteSchema enabled.
spark_lowest_return_df = spark.createDataFrame(lowest_return_df)
(
    spark_lowest_return_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy('DELTA_DATE')
    .option("overwriteSchema", "true")
    .option("partitionOverwriteMode", "dynamic")
    .save('/mnt/rwest-lid-prod-team-london-risk/oil_stress_test/results')
)

In [0]:
%skip
# Upload results to Dremio

from lidservices.api.dremio_job_loader import *
from lidservices.api.data_sharing import *
from lidservices.api.dremio_databricks_loader import *


# Parameters of the Dremio share request: where the delta files are and under which VDS path / name they are shared
input_parameters = dict(
    team_name="team-london-risk",
    s3_path="oil_stress_test/results",
    vds_path="OIL_STRESS_TEST",
    vds_name="OIL_STRESS_TEST_RESULTS",
    file_format="delta",
    licence_required=True,
    licence_info=dict(GUARD_GROUP='Risk Internal Data'),
    share_action="create",  # create, refresh, remove, replace
)

# Send the share request on behalf of the current Databricks user
response = share_data_dremio(input_parameters, get_databricks_username(), get_dremio_token())  # type: ignore #NOQA