In [1]:
import os
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pyarrow.compute as pc
import gc
import shutil


In [4]:
speriod=int(input("Enter the simulation period: "))
samples=int(input("Enter the number of samples: "))
country=input("Enter the country: ")
output_folder_path = input("Enter the output folder path: ")

# Define the folder containing the Parquet files
folder_path = r'D:\RISHIN\13_ILC_TASK1\input\PARQUET_FILES'

# List all Parquet files in the folder
parquet_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.parquet')]

In [6]:

# Check if there are any Parquet files in the folder
if parquet_files:
    # Read the first Parquet file in chunks
    parquet_file = pq.ParquetFile(parquet_files[0])
    for batch in parquet_file.iter_batches(batch_size=1000):
        # Convert the first batch to a PyArrow Table
        table = pa.Table.from_batches([batch])
        
        # Convert the PyArrow Table to a Pandas DataFrame
        df = table.to_pandas()
        
        # Extract the first value of LocationName and split it by '_'
        location_name = df['LocationName'].iloc[0]
        country = location_name.split('_')[0]
        
        
        # Define the main folder path
        main_folder_path = os.path.join(output_folder_path, f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_Losses')
        
        # Define subfolders
        subfolders = ['EP', 'PLT', 'STATS']
        nested_folders = ['Lob', 'Portfolio']
        innermost_folders = ['GR', 'GU']
        
        # Create the main folder and subfolders
        for subfolder in subfolders:
            subfolder_path = os.path.join(main_folder_path, subfolder)
            os.makedirs(subfolder_path, exist_ok=True)
            
            for nested_folder in nested_folders:
                nested_folder_path = os.path.join(subfolder_path, nested_folder)
                os.makedirs(nested_folder_path, exist_ok=True)
                
                for innermost_folder in innermost_folders:
                    innermost_folder_path = os.path.join(nested_folder_path, innermost_folder)
                    os.makedirs(innermost_folder_path, exist_ok=True)
        
        print(f"Folders created successfully at {main_folder_path}")
        break  # Process only the first batch
else:
    print("No Parquet files found in the specified folder.")

Folders created successfully at D:\RISHIN\ILC_TEST\ILC2024_EUWS_PLA_WI_EP_BE_EUR_Losses


FOR LOB EP

In [7]:


# Initialize an empty list to store the results
final_grouped_tables = []
# Process each Parquet file individually
for file in parquet_files:
    # Read the Parquet file into a PyArrow Table
    table = pq.read_table(file)
    
    # Perform the aggregation: sum the Loss column grouped by EventId, PeriodId, and LobName
    grouped_table = table.group_by(['EventId', 'PeriodId', 'LobName']).aggregate([('Loss', 'sum')])
    
    # Rename the aggregated column to Sum_Loss
    grouped_table = grouped_table.rename_columns(['EventId', 'PeriodId', 'LobName', 'Sum_Loss'])
    
    # Append the grouped Table to the final_grouped_tables list
    final_grouped_tables.append(grouped_table)

# Concatenate all grouped tables
final_table = pa.concat_tables(final_grouped_tables)

# Perform final grouping and sorting
final_grouped_table = final_table.group_by(['EventId', 'PeriodId', 'LobName']).aggregate([('Sum_Loss', 'sum')])
sorted_final_table = final_grouped_table.sort_by([('Sum_Loss_sum', 'descending')])
# The Table is now ready for the next instructions
dataframe_1 = sorted_final_table

In [8]:
# Define the LobName values to filter
lob_names = ['AGR', 'AUTO', 'COM', 'IND', 'SPER', 'FRST', 'GLH']

# Create a dictionary to store the filtered tables
filtered_tables = {}

# Filter the table based on LobName values
for lob_name in lob_names:
    filtered_table = dataframe_1.filter(pc.equal(dataframe_1['LobName'], lob_name))
    if filtered_table.num_rows > 0:
        filtered_tables[f'daf_{lob_name}'] = filtered_table

# Access the filtered tables
daf_AGR = filtered_tables.get('daf_AGR')
daf_AUTO = filtered_tables.get('daf_AUTO')
daf_COM = filtered_tables.get('daf_COM')
daf_IND = filtered_tables.get('daf_IND')
daf_SPER = filtered_tables.get('daf_SPER')
daf_FRST = filtered_tables.get('daf_FRST')
daf_GLH = filtered_tables.get('daf_GLH')

In [22]:
def process_and_save_parquet(dataframe_1, parquet_file_path, speriod, samples):
    # Convert pandas DataFrame to pyarrow Table if necessary
    if not isinstance(dataframe_1, pa.Table):
        dataframe_1 = pa.Table.from_pandas(dataframe_1)

    # Initialize dataframe_2 by selecting PeriodId and max(Sum_Loss) grouped by PeriodId and LobName
    dataframe_2 = dataframe_1.group_by(['PeriodId', 'LobName']).aggregate([('Loss', 'max')])
    dataframe_2 = dataframe_2.rename_columns(['PeriodId', 'LobName', 'max_Loss'])

    # Sort dataframe_2 by Max_Loss in descending order
    dataframe_2 = dataframe_2.sort_by([('Max_Loss', 'descending')])

    # Initialize dataframe_3 by selecting PeriodId and sum(Sum_Loss) grouped by PeriodId and LobName
    dataframe_3 = dataframe_1.group_by(['PeriodId', 'LobName']).aggregate([('Loss', 'sum')])
    dataframe_3 = dataframe_3.rename_columns(['PeriodId', 'LobName', 'sum_Loss'])

    # Sort dataframe_3 by S_Sum_Loss in descending order
    dataframe_3 = dataframe_3.sort_by([('sum_Loss', 'descending')])

    # Calculate additional columns for dataframe_2
    rate = 1 / (speriod * samples)
    dataframe_2 = dataframe_2.append_column('rate', pa.array([rate] * dataframe_2.num_rows))
    cumrate = pc.cumulative_sum(dataframe_2['rate'])
    dataframe_2 = dataframe_2.append_column('cumrate', cumrate)
    dataframe_2 = dataframe_2.append_column('RPs', pc.divide(1, cumrate))
    max_loss_shifted = pc.shift(dataframe_2['max_Loss'], -1)
    cumrate_shifted = pc.shift(cumrate, -1)
    tce_oep_1 = pc.multiply(pc.multiply(pc.subtract(dataframe_2['max_Loss'], max_loss_shifted), pc.add(cumrate, cumrate_shifted)), 0.5)
    dataframe_2 = dataframe_2.append_column('TCE_OEP_1', tce_oep_1)
    tce_oep_2 = pc.multiply(pc.cumulative_sum(pc.shift(tce_oep_1, 1)), dataframe_2['RPs'])
    dataframe_2 = dataframe_2.append_column('TCE_OEP_2', tce_oep_2)
    dataframe_2 = dataframe_2.append_column('TCE_OEP_Final', pc.add(tce_oep_2, dataframe_2['max_Loss']))

    # Calculate additional columns for dataframe_3
    dataframe_3 = dataframe_3.append_column('rate', pa.array([rate] * dataframe_3.num_rows))
    cumrate_3 = pc.cumulative_sum(dataframe_3['rate'])
    dataframe_3 = dataframe_3.append_column('cumrate', cumrate_3)
    dataframe_3 = dataframe_3.append_column('RPs', pc.divide(1, cumrate_3))
    s_sum_loss_shifted = pc.shift(dataframe_3['sum_Loss'], -1)
    cumrate_3_shifted = pc.shift(cumrate_3, -1)
    tce_aep_1 = pc.multiply(pc.multiply(pc.subtract(dataframe_3['sum_Loss'], s_sum_loss_shifted), pc.add(cumrate_3, cumrate_3_shifted)), 0.5)
    dataframe_3 = dataframe_3.append_column('TCE_AEP_1', tce_aep_1)
    tce_aep_2 = pc.multiply(pc.cumulative_sum(pc.shift(tce_aep_1, 1)), dataframe_3['RPs'])
    dataframe_3 = dataframe_3.append_column('TCE_AEP_2', tce_aep_2)
    dataframe_3 = dataframe_3.append_column('TCE_AEP_Final', pc.add(tce_aep_2, dataframe_3['sum_Loss']))

    # Define the list of RPs values to filter and convert them to float
    rps_values = [float(x) for x in [10000, 5000, 1000, 500, 250, 200, 100, 50, 25, 10, 5, 2]]

    # Initialize empty tables to store the filtered results
    fdataframe_2 = pa.Table.from_arrays([], names=[])
    fdataframe_3 = pa.Table.from_arrays([], names=[])

    # Define the number of decimal places to round to
    decimal_places = 8

    # Loop through each value in rps_values and filter the Tables
    for value in rps_values:
        rounded_value = round(value, decimal_places)
        fdataframe_2 = pa.concat_tables([fdataframe_2, dataframe_2.filter(pc.equal(pc.round(dataframe_2['RPs'], decimal_places), rounded_value))])
        fdataframe_3 = pa.concat_tables([fdataframe_3, dataframe_3.filter(pc.equal(pc.round(dataframe_3['RPs'], decimal_places), rounded_value))])

    fdataframe_3 = fdataframe_3.rename_columns(['PeriodId', 'LobName', 'S_Sum_Loss', 'rate', 'cumrate', 'RPs', 'TCE_AEP_1', 'TCE_AEP_2', 'TCE_AEP_Final'])
    fdataframe_2 = fdataframe_2.rename_columns(['PeriodId', 'LobName', 'Max_Loss', 'rate', 'cumrate', 'RPs', 'TCE_OEP_1', 'TCE_OEP_2', 'TCE_OEP_Final'])

    # Define the mapping of LobName to LobId
    lobname_to_lobid = {
        'AGR': "1",
        'AUTO': "2",
        'COM': "3",
        'IND': "4",
        'SPER': "5",
        'FRST': "6",
        'GLH': "7"
    }

    # Add the LobId column to fdataframe_2
    fdataframe_2 = fdataframe_2.append_column('LobId', pc.index_in(fdataframe_2['LobName'], list(lobname_to_lobid.keys())).cast(pa.int32()))

    # Add the LobId column to fdataframe_3
    fdataframe_3 = fdataframe_3.append_column('LobId', pc.index_in(fdataframe_3['LobName'], list(lobname_to_lobid.keys())).cast(pa.int32()))

    # Define the columns to be used in the new Table for fdataframe_3
    columns_to_keep_3 = ['RPs', 'LobId', 'LobName']
    columns_to_melt_3 = ['S_Sum_Loss', 'TCE_AEP_Final']

    # Melt fdataframe_3 to reshape it
    melted_df_3 = pa.Table.from_pandas(fdataframe_3.to_pandas().melt(id_vars=columns_to_keep_3, value_vars=columns_to_melt_3, 
                                    var_name='EPType', value_name='Loss'))

    # Rename columns to match the desired output
    melted_df_3 = melted_df_3.rename_columns(['ReturnPeriod', 'LobId', 'LobName', 'EPType', 'Loss'])

    # Define the columns to be used in the new Table for fdataframe_2
    columns_to_keep_2 = ['RPs', 'LobId', 'LobName']
    columns_to_melt_2 = ['max_Loss', 'TCE_OEP_Final']

    # Melt fdataframe_2 to reshape it
    melted_df_2 = pa.Table.from_pandas(fdataframe_2.to_pandas().melt(id_vars=columns_to_keep_2, value_vars=columns_to_melt_2, 
                                    var_name='EPType', value_name='Loss'))

    # Rename columns to match the desired output
    melted_df_2 = melted_df_2.rename_columns(['ReturnPeriod', 'LobId', 'LobName', 'EPType', 'Loss'])

    # Concatenate the two Tables
    final_df_EP_LOB_GU = pa.concat_tables([melted_df_2, melted_df_3])

    # Define the new order for EPType
    new_ep_type_order = ["OEP", "AEP", "TCE-OEP", "TCE-AEP"]

    # Update the EPType column to the new order
    final_df_EP_LOB_GU = final_df_EP_LOB_GU.append_column('EPType', pa.array(new_ep_type_order))

    # Sort the Table by EPType and then by ReturnPeriod in descending order within each EPType
    final_df_EP_LOB_GU = final_df_EP_LOB_GU.sort_by([('EPType', 'ascending'), ('ReturnPeriod', 'descending')])

    # Save final_df as a Parquet file
    pq.write_table(final_df_EP_LOB_GU, parquet_file_path)

    print(f"Parquet file saved successfully at {parquet_file_path}")


In [23]:
pq_file_path_1=os.path.join(main_folder_path, 'EP', 'Lob', 'GU', f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_EP_Lob_GU_0.parquet')

pq_file_path_2=os.path.join(main_folder_path, 'EP', 'Lob', 'GU', f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_EP_Lob_GU_1.parquet')

pq_file_path_3=os.path.join(main_folder_path, 'EP', 'Lob', 'GU', f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_EP_Lob_GU_2.parquet')

pq_file_path_4=os.path.join(main_folder_path, 'EP', 'Lob', 'GU', f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_EP_Lob_GU_3.parquet')

pq_file_path_5=os.path.join(main_folder_path, 'EP', 'Lob', 'GU', f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_EP_Lob_GU_4.parquet')

pq_file_path_6=os.path.join(main_folder_path, 'EP', 'Lob', 'GU', f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_EP_Lob_GU_5.parquet')

pq_file_path_7=os.path.join(main_folder_path, 'EP', 'Lob', 'GU', f'ILC2024_EUWS_PLA_WI_EP_{country}_EUR_EP_Lob_GU_6.parquet')




In [24]:
try:
    process_and_save_parquet(daf_AGR, pq_file_path_1, speriod, samples)
except NameError:
    pass

try:
    process_and_save_parquet(daf_AUTO, pq_file_path_2, speriod, samples)
except NameError:
    pass

try:
    process_and_save_parquet(daf_COM, pq_file_path_3, speriod, samples)
except NameError:
    pass

try:
    process_and_save_parquet(daf_IND, pq_file_path_4, speriod, samples)
except NameError:
    pass

try:
    process_and_save_parquet(daf_SPER, pq_file_path_5, speriod, samples)
except NameError:
    pass

try:
    process_and_save_parquet(daf_FRST, pq_file_path_6, speriod, samples)
except NameError:
    pass

try:
    process_and_save_parquet(daf_GLH, pq_file_path_7, speriod, samples)
except NameError:
    pass

ArrowInvalid: No match for FieldRef.Name(Loss) in EventId: int64 not null
PeriodId: int64 not null
LobName: string
Sum_Loss_sum: double