In [4]:
import os
import polars as pl
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pyarrow.compute as pc
import gc
from decimal import Decimal  # Add this import statement
import pyarrow.dataset as ds
import shutil
import gc
import time
import sqlalchemy as sa
import pyodbc
import concurrent.futures
import re
from concurrent.futures import ThreadPoolExecutor





start_time = time.time()  # Start time


# Function to flush the cache
def flush_cache():
    gc.collect()

flush_cache()


folder_path = r'D:\RISHIN\14_2_1ILC_NZFL\PLT\Risk_Lob\GU\PeriodRange=1-250000'
folder_path_gr = r'D:\RISHIN\14_2_1ILC_NZFL\PLT\Risk_Lob\GR\PeriodRange=1-250000'

output_folder_path = r"D:\RISHIN\TESTING\TEST_10"
# folder_path = r'D:\RISHIN\13_ILC_resolution\input\PARQUET_FILES'
# folder_path_gr = r'D:\RISHIN\13_ILC_TASK\input\PARQUET_FILES_GR'



# In[5]:


# speriod=int(input("Enter the simulation period: "))
# samples=int(input("Enter the number of samples: "))
# proname=(input("enter file suffix example : example ILC2024_NZFL_EP_PLA "))
# region=input("enter region example : example NZD  ")
# database = input('Enter the database name IED2024_NZFL_PC_NZD_EDM240_ILCRun')

speriod=50000
samples=5
proname="ILC2024_NZFL_EP_PLA"
region="NZD"
database = "IED2024_NZFL_PC_NZD_EDM240_ILCRun"



# In[6]:


parquet_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.parquet')]


# In[7]:


parquet_files_gr = [os.path.join(folder_path_gr, f) for f in os.listdir(folder_path_gr) if f.endswith('.parquet')]


# In[8]:


def delete_folder_and_files(folder_path):
    
    if os.path.exists(folder_path):
        # Delete all files inside the folder
        for filename in os.listdir(folder_path):
            file_path = os.path.join(folder_path, filename)
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)
        
        # Delete the folder itself
        os.rmdir(folder_path)
        print(f'Successfully deleted the folder: {folder_path}')
    else:
        print(f'The folder {folder_path} does not exist.')


# In[10]:


# Check if there are any Parquet files in the folder
if parquet_files:
    # Read the first Parquet file in chunks
    parquet_file = pq.ParquetFile(parquet_files[0])
    for batch in parquet_file.iter_batches(batch_size=1000):
        # Convert the first batch to a PyArrow Table
        table = pa.Table.from_batches([batch])
        
        # Convert the PyArrow Table to a Pandas DataFrame
        df = table.to_pandas()
        
        # Extract the first value of LocationName and split it by '_'
        location_name = df['LocationName'].iloc[0]
        country = location_name.split('_')[0]
        
        
        # Define the main folder path
        main_folder_path = os.path.join(output_folder_path, f'{proname}_{region}_Losses')
        
        # Define subfolders
        subfolders = ['EP', 'PLT', 'STATS']
        nested_folders = ['Lob', 'Portfolio','Admin1','Admin1_Lob','Cresta','Cresta_Lob',]
        innermost_folders = ['GR', 'GU']
        
        # Create the main folder and subfolders
        for subfolder in subfolders:
            subfolder_path = os.path.join(main_folder_path, subfolder)
            os.makedirs(subfolder_path, exist_ok=True)
            
            # Filter nested folders for 'PLT'
            if subfolder == 'PLT':
                filtered_nested_folders = ['Lob', 'Portfolio']
            else:
                filtered_nested_folders = nested_folders
            
            for nested_folder in filtered_nested_folders:
                nested_folder_path = os.path.join(subfolder_path, nested_folder)
                os.makedirs(nested_folder_path, exist_ok=True)
                
                for innermost_folder in innermost_folders:
                    innermost_folder_path = os.path.join(nested_folder_path, innermost_folder)
                    os.makedirs(innermost_folder_path, exist_ok=True)
        
        print(f"Folders created successfully at {main_folder_path}")
        break  # Process only the first batch
else:
    print("No Parquet files found in the specified folder.")


# In[13]:


main_folder_path = os.path.join(output_folder_path, f'{proname}_{region}_Losses')

processing_folder_path = os.path.join(main_folder_path, 'processing')
resolution_folder_path = os.path.join(processing_folder_path, 'Resolution Added')
resolution_folder_path_gr = os.path.join(processing_folder_path, 'Resolution Added_gr')
partial_folder_path = os.path.join(processing_folder_path, 'Partial')   
concatenated_folder_path = os.path.join(processing_folder_path, 'Concatenated')



# In[15]:


def connect_to_database(server, database):
    connection_string = f'mssql+pyodbc://{server}/{database}?driver=SQL+Server+Native+Client+11.0'
    engine = sa.create_engine(connection_string)
    connection = engine.connect()
    return connection

def read_parquet_file(file_path):
    table = pq.read_table(file_path)
    return table

def fetch_database_data(connection):
    address_query = 'SELECT ADDRESSID, ADMIN1GEOID AS Admin1Id, Admin1Name, zone1GEOID AS CrestaId, Zone1 AS CrestaName FROM Address'
    address_df = pd.read_sql(address_query, connection)
    address_table = pa.Table.from_pandas(address_df)
    return address_table



def join_dataframes(parquet_table, address_table):
    parquet_df = parquet_table.to_pandas()
    address_df = address_table.to_pandas()
    df = parquet_df.merge(address_df, left_on='LocationId', right_on='ADDRESSID', how='left')
    return pa.Table.from_pandas(df)

def save_joined_dataframe(joined_table, output_file):
    pq.write_table(joined_table, output_file)
    print(f"Saved joined file to {output_file}")

def process_file(file, address_table, output_folder):
    gc.collect()

    parquet_table = read_parquet_file(file)
    joined_table = join_dataframes(parquet_table, address_table)
    output_file = os.path.join(output_folder, os.path.basename(file))
    save_joined_dataframe(joined_table, output_file)
    del parquet_table
    del joined_table
    gc.collect()

def process_parquet_files(folder_path, output_folder, server, database, batch_size=3):
    os.makedirs(output_folder, exist_ok=True)
    connection = connect_to_database(server, database)
    address_table = fetch_database_data(connection)

    parquet_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.parquet')]
    connection.close()

    # Process files in batches
    for i in range(0, len(parquet_files), batch_size):
        batch_files = parquet_files[i:i + batch_size]
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(process_file, file, address_table, output_folder) for file in batch_files]
            for future in concurrent.futures.as_completed(futures):
                future.result()

server = 'localhost'

process_parquet_files(folder_path, resolution_folder_path, server, database)


# In[16]:


process_parquet_files(folder_path_gr, resolution_folder_path_gr, server, database)


# In[ ]:


#Admin_1 Lob


# In[15]:


parquet_files_grp = [os.path.join(resolution_folder_path, f) for f in os.listdir(resolution_folder_path) if f.endswith('.parquet')]


# In[16]:


parquet_files_grp_gr = [os.path.join(resolution_folder_path_gr, f) for f in os.listdir(resolution_folder_path_gr) if f.endswith('.parquet')]



Folders created successfully at D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses
Saved joined file to D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\processing\Resolution Added\PLT_1_100.parquet
Saved joined file to D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\processing\Resolution Added\PLT_0_100.parquet
Saved joined file to D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\processing\Resolution Added\PLT_2_100.parquet
Saved joined file to D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\processing\Resolution Added\PLT_5_100.parquet
Saved joined file to D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\processing\Resolution Added\PLT_3_100.parquet
Saved joined file to D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\processing\Resolution Added\PLT_4_100.parquet
Saved joined file to D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\processing\Resolution Added\PLT_6_100.parquet
Saved joined file to D:\RISHIN\TESTING\TEST

In [5]:
def fetch_lobdet_data(server, database):
    connection = connect_to_database(server, database)
    try:
        lobdet_query = 'SELECT LOBNAME, LOBDETID FROM lobdet'
        lobdet_df = pd.read_sql(lobdet_query, connection)
        lobname_to_lobid_2 = dict(zip(lobdet_df.LOBNAME, lobdet_df.LOBDETID))
    finally:
        connection.close()
    return lobname_to_lobid_2
lobname_to_lobid=fetch_lobdet_data(server, database)

In [17]:
main_folder_path

'D:\\RISHIN\\TESTING\\TEST_10\\ILC2024_NZFL_EP_PLA_NZD_Losses'

In [20]:
def process_parquet_files_2(parquet_files, filter_string, lob_id, speriod, samples, rps_values,parquet_file_path,Cat):
    partial_folder_path = os.path.join(processing_folder_path, 'partial')
    concatenated_folder_path = os.path.join(processing_folder_path, 'concatenated')
    os.makedirs(partial_folder_path, exist_ok=True)
    os.makedirs(concatenated_folder_path, exist_ok=True)

    # Initialize an empty list to store the results
    final_grouped_table_1 = []

    # Process each Parquet file individually
    for file in parquet_files:
        # Read the Parquet file into a PyArrow Table
        table = pq.read_table(file)
        
        # Filter the table based on the filter_string
        table = table.filter(pc.equal(table['LobName'], filter_string))
        
        # Skip if the filtered table is empty
        
        grouped_table_1 = table.group_by(['EventId', 'PeriodId', 'EventDate','Admin1Name','Admin1Id']).aggregate([('Loss', 'sum')])
        grouped_table_1 = grouped_table_1.rename_columns(['EventId', 'PeriodId', 'EventDate','Admin1Name','Admin1Id','Sum_Loss'])
    
        # Write intermediate results to disk
        pq.write_table(grouped_table_1, os.path.join(partial_folder_path, f'grouped_table_1_{os.path.basename(file)}'))

    # Read all intermediate files and concatenate them
    intermediate_files_1 = [os.path.join(partial_folder_path, f) for f in os.listdir(partial_folder_path) if f.startswith('grouped_table_1_')]

    final_grouped_table_1 = [pq.read_table(f) for f in intermediate_files_1]

    final_table_1 = pa.concat_tables(final_grouped_table_1)

    # Perform final grouping and sorting
    f_grouped_table_1 = final_table_1.group_by(['EventId', 'PeriodId', 'EventDate','Admin1Name','Admin1Id']).aggregate([('Sum_Loss', 'sum')])
    sorted_final_table_1 = f_grouped_table_1.sort_by([('Sum_Loss_sum', 'descending')])

    # Get distinct Admin1Id and Admin1Name
    distinct_admins = sorted_final_table_1.select(['Admin1Id', 'Admin1Name']).to_pandas().drop_duplicates()
    distinct_admins = distinct_admins.reset_index(drop=True)
    pq.write_table(sorted_final_table_1, os.path.join(concatenated_folder_path, 'final_grouped_table_1.parquet'))
     # Delete all non-concatenated files
    for f in intermediate_files_1:
        os.remove(f)
    
    for idx, row in distinct_admins.iterrows():
        admin1_id = row['Admin1Id']
        admin1_name = row['Admin1Name']
        # Filter the table for the current Admin1Id and Admin1Name
        filtered_table = sorted_final_table_1.filter(pa.compute.equal(sorted_final_table_1['Admin1Id'], admin1_id))
        dataframe_1 = filtered_table.to_pandas()

        dataframe_2 = dataframe_1.groupby(['PeriodId','Admin1Name','Admin1Id'], as_index=False).agg({'Sum_Loss_sum': 'max'})
        dataframe_2.rename(columns={'Sum_Loss_sum': 'Max_Loss'}, inplace=True)
        dataframe_2 = dataframe_2.sort_values(by='Max_Loss', ascending=False).reset_index(drop=True)

        dataframe_2['rate'] = (1 / (speriod * samples))
        dataframe_2['cumrate'] = dataframe_2['rate'].cumsum().round(6)
        dataframe_2['RPs'] = (1 / dataframe_2['cumrate'])
        dataframe_2['TCE_OEP_1'] = ((dataframe_2['Max_Loss'] - dataframe_2['Max_Loss'].shift(-1)) * (dataframe_2['cumrate'] + dataframe_2['cumrate'].shift(-1)) * 0.5)
        dataframe_2['TCE_OEP_2'] = (dataframe_2['TCE_OEP_1'].shift().cumsum() * dataframe_2['RPs'])
        dataframe_2['TCE_OEP_Final'] = (dataframe_2['TCE_OEP_2'] + dataframe_2['Max_Loss'])

        dataframe_3 = dataframe_1.groupby(['PeriodId','Admin1Name','Admin1Id'], as_index=False).agg({'Sum_Loss_sum': 'sum'})
        dataframe_3.rename(columns={'Sum_Loss_sum': 'S_Sum_Loss'}, inplace=True)
        dataframe_3 = dataframe_3.sort_values(by='S_Sum_Loss', ascending=False).reset_index(drop=True)

        dataframe_3['rate'] = (1 / (speriod * samples))
        dataframe_3['cumrate'] = dataframe_3['rate'].cumsum().round(6)
        dataframe_3['RPs'] = (1 / dataframe_3['cumrate'])
        dataframe_3['TCE_AEP_1'] = ((dataframe_3['S_Sum_Loss'] - dataframe_3['S_Sum_Loss'].shift(-1)) * (dataframe_3['cumrate'] + dataframe_3['cumrate'].shift(-1)) * 0.5)
        dataframe_3['TCE_AEP_2'] = (dataframe_3['TCE_AEP_1'].shift().cumsum() * dataframe_3['RPs'])
        dataframe_3['TCE_AEP_Final'] = (dataframe_3['TCE_AEP_2'] + dataframe_3['S_Sum_Loss'])

        fdataframe_2 = pd.DataFrame()
        fdataframe_3 = pd.DataFrame()

        for value in rps_values:
                                
                closest_index_2 = (dataframe_2['RPs'] - value).abs().idxmin()
                
                # Assign the closest value to the new DataFrames
                fdataframe_2 = pd.concat([fdataframe_2, dataframe_2.loc[[closest_index_2]]])
                fdataframe_3 = pd.concat([fdataframe_3, dataframe_3.loc[[closest_index_2]]])
                
                # Update the closest value to match the rp value exactly
                fdataframe_2.at[closest_index_2, 'RPs'] = float(value)
                fdataframe_3.at[closest_index_2, 'RPs'] = float(value)

        fdataframe_2.rename(columns={'Max_Loss': 'OEP', 'TCE_OEP_Final': 'TCE-OEP'}, inplace=True)
        columns_to_keep_2 = ['RPs', 'Admin1Name', 'Admin1Id']
        columns_to_melt_2 = ['OEP', 'TCE-OEP']
        melted_df_2 = fdataframe_2.melt(id_vars=columns_to_keep_2, value_vars=columns_to_melt_2, var_name='EPType', value_name='Loss')
        melted_df_2.rename(columns={'RPs': 'ReturnPeriod'}, inplace=True)
        final_df_2 = melted_df_2[['EPType', 'Loss', 'ReturnPeriod', 'Admin1Name', 'Admin1Id']]

        fdataframe_3.rename(columns={'S_Sum_Loss': 'AEP', 'TCE_AEP_Final': 'TCE-AEP'}, inplace=True)
        columns_to_keep_3 = ['RPs', 'Admin1Name', 'Admin1Id']
        columns_to_melt_3 = ['AEP', 'TCE-AEP']
        melted_df_3 = fdataframe_3.melt(id_vars=columns_to_keep_3, value_vars=columns_to_melt_3, var_name='EPType', value_name='Loss')
        melted_df_3.rename(columns={'RPs': 'ReturnPeriod'}, inplace=True)
        final_df_3 = melted_df_3[['EPType', 'Loss', 'ReturnPeriod','Admin1Name','Admin1Id']]

        final_df_EP_LOB_GU = pd.concat([final_df_2, final_df_3], ignore_index=True)
        new_ep_type_order = ["OEP", "AEP", "TCE-OEP", "TCE-AEP"]
        final_df_EP_LOB_GU['EPType'] = pd.Categorical(final_df_EP_LOB_GU['EPType'], categories=new_ep_type_order, ordered=True)
        final_df_EP_LOB_GU = final_df_EP_LOB_GU.sort_values(by=['EPType', 'ReturnPeriod'], ascending=[True, False]).reset_index(drop=True)

        # Add LobID and LobName columns
        final_df_EP_LOB_GU['LOBId'] = lob_id
        final_df_EP_LOB_GU['LOBName'] = filter_string
        final_df_EP_LOB_GU['LOBId'] = final_df_EP_LOB_GU['LOBId'].apply(lambda x: Decimal(x))
        final_df_EP_LOB_GU['Admin1Id'] = final_df_EP_LOB_GU['Admin1Id'].astype('int64')
        final_df_EP_LOB_GU['Admin1Id'] = final_df_EP_LOB_GU['Admin1Id'].apply(lambda x: Decimal(x))
        


        # Define the schema to match the required Parquet file schema
        schema = pa.schema([
            pa.field('EPType', pa.string(), nullable=True),
            pa.field('Loss', pa.float64(), nullable=True),
            pa.field('ReturnPeriod', pa.float64(), nullable=True),
            pa.field('Admin1Id',pa.int64(), nullable=True),
            pa.field('Admin1Name', pa.string(), nullable=True),
            pa.field('LOBName', pa.string(), nullable=True),
            pa.field('LOBId', pa.decimal128(38, 0), nullable=True),
        ])

        # Convert DataFrame to Arrow Table with the specified schema
        table = pa.Table.from_pandas(final_df_EP_LOB_GU, schema=schema)

        underscore_count = parquet_file_path.count('_')

        export_path =os.path.join(main_folder_path,'EP','Admin1_Lob',Cat)
        parquet_file_path = os.path.join(export_path, f"{os.path.splitext(parquet_file_path)[0]}_{idx}.parquet")

        # If there are 21 or more underscores, modify the file path
        if underscore_count >= 9:
            parts = parquet_file_path.split('_')
            # Remove the second last part which contains the number and the underscore before it
            parts = parts[:-2] + parts[-1:]
            parquet_file_path = '_'.join(parts)

        # Write the table to the parquet file
        pq.write_table(table, parquet_file_path)

        print(f"Parquet file saved successfully at {parquet_file_path}")
    
    



rps_values = [10000, 5000, 1000, 500, 250, 200, 100, 50, 25, 10, 5, 2]



#GU
for i, (lobname, lobid) in enumerate(lobname_to_lobid.items()):
    parquet_file_path = f'{proname}_EP_Admin1_Lob_GU_{i}.parquet'
    try:
        process_parquet_files_2(parquet_files_grp, lobname, lobid, speriod, samples, rps_values, parquet_file_path, "GU")
    except (NameError, AttributeError, ValueError) as e:
        print(f"Error processing {lobname}: {e}")
        pass


#for GR
    
for i, (lobname, lobid) in enumerate(lobname_to_lobid.items()):
    parquet_file_path = f'{proname}_EP_Admin1_Lob_GR_{i}.parquet'
    try:
        process_parquet_files_2(parquet_files_grp, lobname, lobid, speriod, samples, rps_values, parquet_file_path, "GR")
    except (NameError, AttributeError, ValueError) as e:
        print(f"Error processing {lobname}: {e}")
        pass


partial_folder_path = os.path.join(processing_folder_path, 'partial')
concatenated_folder_path = os.path.join(processing_folder_path, 'concatenated')


# In[40]:


gc.collect()



Parquet file saved successfully at D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\EP\Admin1_Lob\GU\ILC2024_NZFL_EP_PLA_EP_Admin1_Lob_GU_0_0.parquet
Parquet file saved successfully at D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\EP\Admin1_Lob\GU\ILC2024_NZFL_EP_PLA_EP_Admin1_Lob_GU_0_1.parquet
Parquet file saved successfully at D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\EP\Admin1_Lob\GU\ILC2024_NZFL_EP_PLA_EP_Admin1_Lob_GU_0_2.parquet
Parquet file saved successfully at D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\EP\Admin1_Lob\GU\ILC2024_NZFL_EP_PLA_EP_Admin1_Lob_GU_0_3.parquet
Parquet file saved successfully at D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\EP\Admin1_Lob\GU\ILC2024_NZFL_EP_PLA_EP_Admin1_Lob_GU_0_4.parquet
Parquet file saved successfully at D:\RISHIN\TESTING\TEST_10\ILC2024_NZFL_EP_PLA_NZD_Losses\EP\Admin1_Lob\GU\ILC2024_NZFL_EP_PLA_EP_Admin1_Lob_GU_0_5.parquet
Parquet file saved successfully at D:\RISHIN\TESTING

KeyboardInterrupt: 

In [19]:
def concatenate_parquet_files(main_folder_path):
    subfolders = [
        'EP/Admin1_Lob/GU',
        'EP/Admin1_Lob/GR',
        'EP/Cresta_Lob/GU',
        'EP/Cresta_Lob/GR'
    ]

    for subfolder in subfolders:
        folder_path = os.path.join(main_folder_path, subfolder)
        files = [f for f in os.listdir(folder_path) if f.endswith('.parquet')]
        
        file_groups = {}
        for file in files:
            parts = file.split('_')
            if len(parts) >= 9:
                key = parts[8]
                if key not in file_groups:
                    file_groups[key] = []
                file_groups[key].append(file)
        
        for key, group_files in file_groups.items():
            tables = [pq.read_table(os.path.join(folder_path, f)) for f in group_files]
            concatenated_table = pa.concat_tables(tables)
            new_file_name = '_'.join(group_files[0].split('_')[:8]) + f'_{key}.parquet'
            pq.write_table(concatenated_table, os.path.join(folder_path, new_file_name))
            
            for file in group_files:
                os.remove(os.path.join(folder_path, file))

concatenate_parquet_files(main_folder_path)

In [21]:
#EP_ Admin1 Lob updated below

import polars as pl


# In[129]:


def process_parquet_files_2(parquet_files, filter_string, lob_id, speriod, samples, rps_values,parquet_file_path,Cat):
    partial_folder_path = os.path.join(processing_folder_path, 'partial')
    concatenated_folder_path = os.path.join(processing_folder_path, 'concatenated')
    os.makedirs(partial_folder_path, exist_ok=True)
    os.makedirs(concatenated_folder_path, exist_ok=True)

    # Initialize an empty list to store the results
    final_grouped_table_1 = []

    # Process each Parquet file individually
    for file in parquet_files:
        # Read the Parquet file into a PyArrow Table
        table = pl.read_parquet(file)
        
        # Filter the table based on the filter_string
        table = table.filter(pl.col('LobName') == filter_string)
        
        # Skip if the filtered table is empty
        
        grouped_table_1 = table.group_by(['EventId', 'PeriodId', 'EventDate','Admin1Name','Admin1Id']).agg(pl.sum('Loss').alias('Sum_Loss'))
    
        # Write intermediate results to disk
        grouped_table_1.write_parquet( os.path.join(partial_folder_path, f'grouped_table_1_{os.path.basename(file)}'))

    # Read all intermediate files and concatenate them
    intermediate_files_1 = [os.path.join(partial_folder_path, f) for f in os.listdir(partial_folder_path) if f.startswith('grouped_table_1_')]

    final_grouped_table_1 = [pl.read_parquet(f) for f in intermediate_files_1]

    final_table_1 = pl.concat(final_grouped_table_1)

    # Perform final grouping and sorting
    f_grouped_table_1 = final_table_1.group_by(['EventId', 'PeriodId', 'EventDate','Admin1Name','Admin1Id']).agg(pl.sum('Sum_Loss').alias('Sum_Loss_sum'))
    sorted_final_table_1 = f_grouped_table_1.sort(['Sum_Loss_sum'], descending=True)

    # Get distinct Admin1Id and Admin1Name
    distinct_admins = sorted_final_table_1.select(['Admin1Id', 'Admin1Name']).unique()
    distinct_admins = distinct_admins.with_columns(pl.col('Admin1Id').cast(pl.Int64))
    sorted_final_table_1.write_parquet(os.path.join(concatenated_folder_path, 'final_grouped_table_1.parquet'))
     # Delete all non-concatenated files
    for f in intermediate_files_1:
        os.remove(f)
    
    for idx, row in distinct_admins.iter_rows(named=True):
        admin1_id = row['Admin1Id']  # Accessing Admin1Id directly as a key
        admin1_name = row['Admin1Name']  # Accessing Admin1Name directly as a key        
        # Filter the table for the current Admin1Id
        filtered_table = sorted_final_table_1.filter(pl.col('Admin1Id') == admin1_id)
        dataframe_1 = filtered_table

        dataframe_2 = dataframe_1.group_by(['PeriodId', 'Admin1Name', 'Admin1Id']).agg(pl.col('Sum_Loss_sum').max().alias('Max_Loss'))
        dataframe_2 = dataframe_2.sort('Max_Loss', reverse=True)
        dataframe_2 = dataframe_2.with_columns([
        (1 / (speriod * samples)).alias('rate'),
        pl.col('rate').cumsum().round(6).alias('cumrate'),
        (1 / pl.col('cumrate')).alias('RPs'),
        ((pl.col('Max_Loss') - pl.col('Max_Loss').shift(-1)) * (pl.col('cumrate') + pl.col('cumrate').shift(-1)) * 0.5).alias('TCE_OEP_1'),
        (pl.col('TCE_OEP_1').shift().cumsum() * pl.col('RPs')).alias('TCE_OEP_2'),
        (pl.col('TCE_OEP_2') + pl.col('Max_Loss')).alias('TCE_OEP_Final')
        ])

        dataframe_3 = dataframe_1.group_by(['PeriodId','Admin1Name','Admin1Id']).agg(pl.col('Sum_Loss_sum').max().alias('S_Sum_Loss'))
        dataframe_3 = dataframe_3.sort('S_Sum_Loss', reverse=True)

        dataframe_3 = dataframe_3.with_columns([
            (1 / (speriod * samples)).alias('rate'),
            pl.col('rate').cumsum().round(6).alias('cumrate'),
            (1 / pl.col('cumrate')).alias('RPs'),
            ((pl.col('S_Sum_Loss') - pl.col('S_Sum_Loss').shift(-1)) * (pl.col('cumrate') + pl.col('cumrate').shift(-1)) * 0.5).alias('TCE_AEP_1'),
            (pl.col('TCE_AEP_1').shift().cumsum() * pl.col('RPs')).alias('TCE_AEP_2'),
            (pl.col('TCE_AEP_2') + pl.col('S_Sum_Loss')).alias('TCE_AEP_Final')
        ])

        fdataframe_2 = pl.DataFrame()
        fdataframe_3 = pl.DataFrame()

        for value in rps_values:
            closest_index_2 = (dataframe_2['RPs'] - value).abs().arg_min()
            
            # Assign the closest value to the new DataFrames
            fdataframe_2 = pl.concat([fdataframe_2, dataframe_2[closest_index_2:closest_index_2+1]])
            fdataframe_3 = pl.concat([fdataframe_3, dataframe_3[closest_index_2:closest_index_2+1]])
            
            # Update the closest value to match the rp value exactly
            fdataframe_2 = fdataframe_2.with_columns(pl.lit(float(value)).alias('RPs'))
            fdataframe_3 = fdataframe_3.with_columns(pl.lit(float(value)).alias('RPs'))

        fdataframe_2 = fdataframe_2.rename({'Max_Loss': 'OEP', 'TCE_OEP_Final': 'TCE-OEP'})
        columns_to_keep_2 = ['RPs', 'Admin1Name', 'Admin1Id']
        columns_to_melt_2 = ['OEP', 'TCE-OEP']
        melted_df_2 = fdataframe_2.melt(id_vars=columns_to_keep_2, value_vars=columns_to_melt_2, variable_name='EPType', value_name='Loss')
        melted_df_2 = melted_df_2.rename({'RPs': 'ReturnPeriod'})
        final_df_2 = melted_df_2.select(['EPType', 'Loss', 'ReturnPeriod', 'Admin1Name', 'Admin1Id'])

        fdataframe_3 = fdataframe_3.rename({'S_Sum_Loss': 'AEP', 'TCE_AEP_Final': 'TCE-AEP'})
        columns_to_keep_3 = ['RPs', 'Admin1Name', 'Admin1Id']
        columns_to_melt_3 = ['AEP', 'TCE-AEP']
        melted_df_3 = fdataframe_3.melt(id_vars=columns_to_keep_3, value_vars=columns_to_melt_3, variable_name='EPType', value_name='Loss')
        melted_df_3 = melted_df_3.rename({'RPs': 'ReturnPeriod'})
        final_df_3 = melted_df_3.select(['EPType', 'Loss', 'ReturnPeriod', 'Admin1Name', 'Admin1Id'])

        final_df_EP_LOB_GU = pl.concat([final_df_2, final_df_3])
        new_ep_type_order = ["OEP", "AEP", "TCE-OEP", "TCE-AEP"]
        final_df_EP_LOB_GU = final_df_EP_LOB_GU.with_columns(
            pl.Categorical(final_df_EP_LOB_GU['EPType'], categories=new_ep_type_order, ordered=True)
        )
        final_df_EP_LOB_GU = final_df_EP_LOB_GU.sort(['EPType', 'ReturnPeriod'], reverse=[False, True])

        # Add LobID and LobName columns
        final_df_EP_LOB_GU = final_df_EP_LOB_GU.with_columns([
            pl.lit(lob_id).alias('LOBId'),
            pl.lit(filter_string).alias('LOBName')
        ])

        final_df_EP_LOB_GU = final_df_EP_LOB_GU.with_columns([
            pl.col('LOBId').apply(lambda x: Decimal(x)).alias('LOBId'),
            pl.col('Admin1Id').cast(pl.Int64),
            pl.col('Admin1Id').apply(lambda x: Decimal(x)).alias('Admin1Id')
        ])

        # Convert Polars DataFrame to Arrow Table
        table = final_df_EP_LOB_GU.to_arrow()

        # Define the schema to match the required Parquet file schema
        schema = pa.schema([
            pa.field('EPType', pa.string(), nullable=True),
            pa.field('Loss', pa.float64(), nullable=True),
            pa.field('ReturnPeriod', pa.float64(), nullable=True),
            pa.field('Admin1Id', pa.int64(), nullable=True),
            pa.field('Admin1Name', pa.string(), nullable=True),
            pa.field('LOBName', pa.string(), nullable=True),
            pa.field('LOBId', pa.decimal128(38, 0), nullable=True),
        ])

        underscore_count = parquet_file_path.count('_')

        export_path = os.path.join(main_folder_path, 'EP', 'Admin1_Lob', Cat)
        parquet_file_path = os.path.join(export_path, f"{os.path.splitext(parquet_file_path)[0]}_{idx}.parquet")

        # If there are 21 or more underscores, modify the file path
        if underscore_count >= 9:
            parts = parquet_file_path.split('_')
            # Remove the second last part which contains the number and the underscore before it
            parts = parts[:-2] + parts[-1:]
            parquet_file_path = '_'.join(parts)

        # Write the table to the parquet file
        pq.write_table(table, parquet_file_path)

        print(f"Parquet file saved successfully at {parquet_file_path}")
    
    



rps_values = [10000, 5000, 1000, 500, 250, 200, 100, 50, 25, 10, 5, 2]



#GU
for i, (lobname, lobid) in enumerate(lobname_to_lobid.items()):
    parquet_file_path = f'{proname}_EP_Admin1_Lob_GU_{i}.parquet'
    try:
        process_parquet_files_2(parquet_files_grp, lobname, lobid, speriod, samples, rps_values, parquet_file_path, "GU")
    except (NameError, AttributeError, ValueError) as e:
        print(f"Error processing {lobname}: {e}")
        pass


#for GR
    
for i, (lobname, lobid) in enumerate(lobname_to_lobid.items()):
    parquet_file_path = f'{proname}_EP_Admin1_Lob_GR_{i}.parquet'
    try:
        process_parquet_files_2(parquet_files_grp, lobname, lobid, speriod, samples, rps_values, parquet_file_path, "GR")
    except (NameError, AttributeError, ValueError) as e:
        print(f"Error processing {lobname}: {e}")
        pass


partial_folder_path = os.path.join(processing_folder_path, 'partial')
concatenated_folder_path = os.path.join(processing_folder_path, 'concatenated')


TypeError: string indices must be integers, not 'str'

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
from decimal import Decimal
import os
import gc

# Get intermediate files
intermediate_files = [
    os.path.join(partial_folder_path, f) 
    for f in os.listdir(partial_folder_path) 
    if f.startswith('grouped_table_1_')
]

# Initialize dictionary to accumulate sums
banana_sums = {}
chunk_size = 1000000  # Adjust based on your memory constraints

# Process each file in chunks
print("Processing chunks and accumulating sums...")
for file_path in intermediate_files:
    parquet_file = pq.ParquetFile(file_path)
    
    for batch in parquet_file.iter_batches(batch_size=chunk_size):
        # Convert batch to pandas for grouping
        df_chunk = batch.to_pandas()
        
        # Group by AppleName and sum Banana
        grouped = df_chunk.groupby('AppleName')['Banana'].sum()
        
        # Update running sums
        for apple_name, banana_sum in grouped.items():
            if apple_name in banana_sums:
                banana_sums[apple_name] += banana_sum
            else:
                banana_sums[apple_name] = banana_sum
        
        # Clean up memory
        del df_chunk
        del grouped
        gc.collect()
    
    # Clean up file objects
    del parquet_file
    gc.collect()

# Convert accumulated sums to DataFrame
print("Creating final DataFrame...")
final_df_fruit = pd.DataFrame.from_dict(
    banana_sums, 
    orient='index', 
    columns=['Banana']
).reset_index().rename(columns={'index': 'AppleName'})

# Sort by Banana in descending order
final_df_fruit = final_df_fruit.sort_values('Banana', ascending=False)

# Map AppleName to AppleId
final_df_fruit['AppleId'] = final_df_fruit['AppleName'].map(apple_name_to_apple_id).apply(lambda x: Decimal(x))

# Add NaN columns
final_df_fruit['Cherry'] = np.nan
final_df_fruit['Date'] = np.nan

# Reorder columns
final_df_fruit = final_df_fruit[['Banana', 'Cherry', 'Date', 'AppleId', 'AppleName']]

# Define schema
desired_schema_fruit = pa.schema([
    pa.field('Banana', pa.float64()),
    pa.field('Cherry', pa.float64()),
    pa.field('Date', pa.float64()),
    pa.field('AppleId', pa.decimal128(38)),
    pa.field('AppleName', pa.string())
])

# Export in chunks
print("Exporting data in chunks...")
export_chunk_size = min(chunk_size, len(final_df_fruit))
for i in range(0, len(final_df_fruit), export_chunk_size):
    chunk = final_df_fruit.iloc[i:i+export_chunk_size]
    
    # Convert chunk to PyArrow table with schema
    table_chunk = pa.Table.from_pandas(chunk, schema=desired_schema_fruit)
    
    # Write chunk to parquet
    if i == 0:
        # First chunk: create new file
        pq.write_table(table_chunk, parquet_file_path)
    else:
        # Subsequent chunks: append to existing file
        pq.write_table(table_chunk, parquet_file_path, append=True)
    
    # Clean up
    del chunk
    del table_chunk
    gc.collect()

print(f"Parquet file saved successfully at {parquet_file_path}")

# Clean up folders
if os.path.exists(partial_folder_path):
    for file in os.listdir(partial_folder_path):
        file_path = os.path.join(partial_folder_path, file)
        try:
            os.remove(file_path)
        except Exception as e:
            print(f"Error deleting {file_path}: {e}")
    try:
        os.rmdir(partial_folder_path)
    except Exception as e:
        print(f"Error deleting folder {partial_folder_path}: {e}")

if os.path.exists(concatenated_folder_path):
    for file in os.listdir(concatenated_folder_path):
        file_path = os.path.join(concatenated_folder_path, file)
        try:
            os.remove(file_path)
        except Exception as e:
            print(f"Error deleting {file_path}: {e}")
    try:
        os.rmdir(concatenated_folder_path)
    except Exception as e:
        print(f"Error deleting folder {concatenated_folder_path}: {e}")

In [None]:
def process_PLT_lob(parquet_files, export_path):
    # Directory to store intermediate results
    intermediate_dir = os.path.join(main_folder_path, 'intermediate_results')
    os.makedirs(intermediate_dir, exist_ok=True)

    # Process each Parquet file in chunks and write intermediate results to disk
    for i, file in enumerate(parquet_files):
        file_path = os.path.join(folder_path, file)
        parquet_file = pq.ParquetFile(file_path)
        for j, batch in enumerate(parquet_file.iter_batches()):
            table = pa.Table.from_batches([batch])
            
            # Cast columns to the desired types
            table = table.set_column(table.schema.get_field_index('PeriodId'), 'PeriodId', pa.compute.cast(table['PeriodId'], pa.decimal128(38, 0)))
            table = table.set_column(table.schema.get_field_index('EventId'), 'EventId', pa.compute.cast(table['EventId'], pa.decimal128(38, 0)))
            table = table.set_column(table.schema.get_field_index('EventDate'), 'EventDate', pa.compute.cast(table['EventDate'], pa.timestamp('ms', tz='UTC')))
            table = table.set_column(table.schema.get_field_index('LossDate'), 'LossDate', pa.compute.cast(table['LossDate'], pa.timestamp('ms', tz='UTC')))
            table = table.set_column(table.schema.get_field_index('Loss'), 'Loss', pa.compute.cast(table['Loss'], pa.float64()))
            table = table.set_column(table.schema.get_field_index('Region'), 'Region', pa.compute.cast(table['Region'], pa.string()))
            table = table.set_column(table.schema.get_field_index('Peril'), 'Peril', pa.compute.cast(table['Peril'], pa.string()))
            table = table.set_column(table.schema.get_field_index('Weight'), 'Weight', pa.compute.cast(table['Weight'], pa.float64()))
            table = table.set_column(table.schema.get_field_index('LobId'), 'LobId', pa.compute.cast(table['LobId'], pa.decimal128(38, 0)))
            table = table.set_column(table.schema.get_field_index('LobName'), 'LobName', pa.compute.cast(table['LobName'], pa.string()))
            
            grouped_table = table.group_by(group_by_columns).aggregate([('Loss', 'sum')])
            intermediate_file = os.path.join(intermediate_dir, f"intermediate_{i}_{j}.parquet")
            pq.write_table(grouped_table, intermediate_file)

            import pyarrow as pa


# Get intermediate files
intermediate_files = [
    os.path.join(partial_folder_path, f) 
    for f in os.listdir(partial_folder_path) 
    if f.startswith('grouped_table_1_')
]

# Initialize dictionary to accumulate sums
banana_sums = {}
chunk_size = 1000000  # Adjust based on your memory constraints

# Process each file in chunks
print("Processing chunks and accumulating sums...")
for file_path in intermediate_files:
    parquet_file = pq.ParquetFile(file_path)
    
    for batch in parquet_file.iter_batches(batch_size=chunk_size):
        # Convert batch to pandas for grouping
        df_chunk = batch.to_pandas()
        
        # Group by AppleName and sum Banana
        grouped = df_chunk.groupby('AppleName')['Banana'].sum()
        
        # Update running sums
        for apple_name, banana_sum in grouped.items():
            if apple_name in banana_sums:
                banana_sums[apple_name] += banana_sum
            else:
                banana_sums[apple_name] = banana_sum
        
        # Clean up memory
        del df_chunk
        del grouped
        gc.collect()
    
    # Clean up file objects
    del parquet_file
    gc.collect()

# Convert accumulated sums to DataFrame
print("Creating final DataFrame...")
final_df_fruit = pd.DataFrame.from_dict(
    banana_sums, 
    orient='index', 
    columns=['Banana']
).reset_index().rename(columns={'index': 'AppleName'})

# Sort by Banana in descending order
final_df_fruit = final_df_fruit.sort_values('Banana', ascending=False)

# Map AppleName to AppleId
final_df_fruit['AppleId'] = final_df_fruit['AppleName'].map(apple_name_to_apple_id).apply(lambda x: Decimal(x))

# Add NaN columns
final_df_fruit['Cherry'] = np.nan
final_df_fruit['Date'] = np.nan

# Reorder columns
final_df_fruit = final_df_fruit[['Banana', 'Cherry', 'Date', 'AppleId', 'AppleName']]

# Define schema
desired_schema_fruit = pa.schema([
    pa.field('Banana', pa.float64()),
    pa.field('Cherry', pa.float64()),
    pa.field('Date', pa.float64()),
    pa.field('AppleId', pa.decimal128(38)),
    pa.field('AppleName', pa.string())
])

# Export in chunks
print("Exporting data in chunks...")
export_chunk_size = min(chunk_size, len(final_df_fruit))
for i in range(0, len(final_df_fruit), export_chunk_size):
    chunk = final_df_fruit.iloc[i:i+export_chunk_size]
    
    # Convert chunk to PyArrow table with schema
    table_chunk = pa.Table.from_pandas(chunk, schema=desired_schema_fruit)
    
    # Write chunk to parquet
    if i == 0:
        # First chunk: create new file
        pq.write_table(table_chunk, parquet_file_path)
    else:
        # Subsequent chunks: append to existing file
        pq.write_table(table_chunk, parquet_file_path, append=True)
    
    # Clean up
    del chunk
    del table_chunk
    gc.collect()

print(f"Parquet file saved successfully at {parquet_file_path}")

# Clean up folders
if os.path.exists(partial_folder_path):
    for file in os.listdir(partial_folder_path):
        file_path = os.path.join(partial_folder_path, file)
        try:
            os.remove(file_path)
        except Exception as e:
            print(f"Error deleting {file_path}: {e}")
    try:
        os.rmdir(partial_folder_path)
    except Exception as e:
        print(f"Error deleting folder {partial_folder_path}: {e}")

if os.path.exists(concatenated_folder_path):
    for file in os.listdir(concatenated_folder_path):
        file_path = os.path.join(concatenated_folder_path, file)
        try:
            os.remove(file_path)
        except Exception as e:
            print(f"Error deleting {file_path}: {e}")
    try:
        os.rmdir(concatenated_folder_path)
    except Exception as e:
        print(f"Error deleting folder {concatenated_folder_path}: {e}")







    

    # Read intermediate results and combine them
    intermediate_files = [os.path.join(intermediate_dir, f) for f in os.listdir(intermediate_dir) if f.endswith('.parquet')]
    intermediate_tables = [pq.read_table(file) for file in intermediate_files]
    combined_grouped_table = pa.concat_tables(intermediate_tables)

    # Perform the final group by and aggregation
    final_grouped_table = combined_grouped_table.group_by(group_by_columns).aggregate([('Loss_sum', 'sum')])
    final_grouped_table = final_grouped_table.sort_by([('Loss_sum_sum', 'descending')])

    # Rename the aggregated column
    final_grouped_table = final_grouped_table.rename_columns(group_by_columns + ['Loss'])

    # Reorder the columns in the desired order
    final_grouped_table = final_grouped_table.select(ordered_columns)

    # Save the final table to a Parquet file
        # Delete intermediate files
    for file in intermediate_files:
        try:
            os.remove(file)
        except FileNotFoundError:
            print(f"File not found: {file}")

        # Remove the intermediate directory
    try:
        os.rmdir(intermediate_dir)
    except FileNotFoundError:
        print(f"Directory not found: {intermediate_dir}")
    except OSError:
        print(f"Directory not empty or other error: {intermediate_dir}")

    try:
        pq.write_table(final_grouped_table, export_path)
        print(f"Parquet file saved successfully at {export_path}")
    except PermissionError as e:
        print(f"PermissionError: {e}")
    except Exception as e:
        print(f"Error saving Parquet file: {e}")
        







schema = pa.schema([
    pa.field('PeriodId', pa.decimal128(38, 0), nullable=True),
    pa.field('EventId', pa.decimal128(38, 0), nullable=True),
    pa.field('EventDate', pa.timestamp('ms', tz='UTC'), nullable=True),
    pa.field('LossDate', pa.timestamp('ms', tz='UTC'), nullable=True),
    pa.field('Loss', pa.float64(), nullable=True),
    pa.field('Region', pa.string(), nullable=True),
    pa.field('Peril', pa.string(), nullable=True),
    pa.field('Weight', pa.float64(), nullable=True),
    pa.field('LobId', pa.decimal128(38, 0), nullable=True),
    pa.field('LobName', pa.string(), nullable=True)
])

group_by_columns = ['PeriodId', 'EventId', 'EventDate', 'LossDate', 'Region', 'Peril', 'Weight', 'LobId', 'LobName']
ordered_columns = ['PeriodId', 'EventId', 'EventDate', 'LossDate', 'Loss', 'Region', 'Peril', 'Weight', 'LobId', 'LobName']





#for GU




export_path = os.path.join(main_folder_path, 'PLT', 'Lob', 'GU', f'{proname}_PLT_Lob_GU_0.parquet')

process_PLT_lob(parquet_files, export_path)




#for GR




export_path = os.path.join(main_folder_path, 'PLT', 'Lob', 'GR', f'{proname}_PLT_Lob_GR_0.parquet')

process_PLT_lob(parquet_files_gr, export_path)




flush_cache()



