# In [3]:  (notebook cell prompt left over from the export)
import chainladder as cl;
import pandas as pd;
import numpy as np;
import scipy.stats as st;
import itertools;
from math import factorial;


####Pandas pipeline

def load_data(path: str):
    """Read a CSV file into a pandas DataFrame.

    ``path`` is handed to ``pandas.read_csv`` unchanged, using pandas'
    default parsing options.
    """
    frame = pd.read_csv(path)
    return frame



    



def to_date(dataframe, cols):
    """Convert columns of a dataframe to pandas datetimes, in place.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Frame whose columns are converted. The frame itself is modified.
    cols : column label or list of labels
        Used as ``dataframe[cols]``; ``pd.to_datetime`` is applied to the
        selection.

    Returns
    -------
    pandas.DataFrame
        The same ``dataframe`` object. The conversion is best-effort: when it
        raises ``ValueError`` the frame is left unchanged and a
        ``RuntimeWarning`` is issued instead of failing silently.
    """
    import warnings  # local import keeps this block self-contained

    try:
        dataframe[cols] = dataframe[cols].apply(pd.to_datetime)
    except ValueError as err:
        # Still best-effort, but tell the caller the columns were not
        # converted so later date comparisons do not misbehave unnoticed.
        warnings.warn(
            f"to_date: could not convert {cols!r} to datetime ({err}); "
            "columns left unchanged",
            RuntimeWarning,
            stacklevel=2,
        )

    return dataframe

def select_columns_Paid(dataframe):
    """Return only the lob, period and paid-amount columns of ``dataframe``.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = ['lob', 'accident_period', 'transaction_period', 'paid_amount']
    return dataframe[wanted]

            
    
    
def select_columns(dataframe):
    """Return the lob, period and gross/RI/net paid columns of ``dataframe``.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = [
        'lob',
        'accident_period',
        'transaction_period',
        'gross_paid',
        'ri_paid',
        'net_paid',
    ]
    return dataframe[wanted]


def select_columnsYearly(dataframe):
    """Return the lob, mapped-period, paid and outstanding columns.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = ['lob', 'accident_mapped', 'paid_mapped', 'paid_amount', 'os_amount']
    return dataframe[wanted]

def select_columns1(dataframe):
    """Return the lob, period, paid, outstanding and incurred columns.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = [
        'lob',
        'accident_period',
        'transaction_period',
        'paid_amount',
        'os_amount',
        'incurred_amount',
    ]
    return dataframe[wanted]
    
def select_columns2(dataframe):
    """Return only the lob, period and paid-amount columns of ``dataframe``.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = ['lob', 'accident_period', 'transaction_period', 'paid_amount']
    return dataframe[wanted]


def select_columns4(dataframe):
    """Return the lob, period and paid-recoveries columns of ``dataframe``.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = ['lob', 'accident_period', 'transaction_period', 'paid_amount_recoveries']
    return dataframe[wanted]

def select_columns5(dataframe):
    """Return the lob, period and 'gross recoveries settled' columns.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = ['lob', 'accident_period', 'transaction_period', 'gross recoveries settled']
    return dataframe[wanted]

def select_columns3(dataframe):
    """Return the lob, period and paid/outstanding recoveries columns.

    The columns come back in the fixed order listed below; a missing column
    raises ``KeyError`` from pandas.
    """
    wanted = [
        'lob',
        'accident_period',
        'transaction_period',
        'paid_amount_recoveries',
        'os_amount_recoveries',
    ]
    return dataframe[wanted]


######Function to remove large losses by providing a percentile

def Remove_Largeloss(Quantile: float, DataFrame):
    """Return the rows of ``DataFrame`` that belong to non-large losses.

    A "loss" is one combination of ``m_policy_no``,
    ``accident_quarter_bracket`` and ``lob``. ``gross_paid`` is summed per
    loss, the ``Quantile`` of those totals is the threshold, and only rows
    of losses whose total is strictly below the threshold are kept.

    Parameters
    ----------
    Quantile : float
        Quantile level passed to ``Series.quantile``.
    DataFrame : pandas.DataFrame
        Claims data containing the three key columns and ``gross_paid``.

    Returns
    -------
    pandas.DataFrame or None
        A new frame with the kept rows plus a ``loss_key`` column (the three
        key columns concatenated as text). The input frame is not modified.
        ``None`` is returned when ``DataFrame`` is not a pandas DataFrame.
    """
    if not isinstance(DataFrame, pd.DataFrame):
        # Non-DataFrame input has always produced None; kept for callers.
        return None

    group_keys = ["m_policy_no", "accident_quarter_bracket", "lob"]

    # Total gross paid per loss and the cut-off at the requested quantile.
    loss_totals = DataFrame.groupby(group_keys)["gross_paid"].sum()
    Threshold = loss_totals.quantile(Quantile)
    kept_losses = loss_totals.index[loss_totals < Threshold]

    # Match rows on the actual key tuple rather than a concatenated string:
    # without a separator "A1"+"2" and "A"+"12" give the same text key, so a
    # large loss could survive by sharing a key with a small one.
    row_keys = pd.MultiIndex.from_frame(DataFrame[group_keys])
    Final = DataFrame[row_keys.isin(kept_losses)].copy()

    # Callers still receive loss_key; it is built on the copy so the input
    # frame is left untouched, and astype(str) allows non-text policy numbers.
    Final["loss_key"] = (
        Final["m_policy_no"].astype(str)
        + Final["accident_quarter_bracket"].astype(str)
        + Final["lob"].astype(str)
    )

    return Final
    
                    


        



#######################Create triangle
def Create_Triangle(DataFrame, Start_Date, End_Date):
    """Build a chainladder Triangle of gross/RI/net paid amounts by lob.

    Parameters
    ----------
    DataFrame : pandas.DataFrame
        Claims data with ``accident_period``, ``transaction_period``, ``lob``,
        ``gross_paid``, ``ri_paid`` and ``net_paid`` columns.
    Start_Date : period from (inclusive)
    End_Date : period end (inclusive)
        Rows are kept only when both ``accident_period`` and
        ``transaction_period`` fall inside ``[Start_Date, End_Date]``.

    Returns
    -------
    cl.Triangle
        Triangle with origin ``accident_period``, development
        ``transaction_period`` and index ``lob``, flagged as incremental.

    Raises
    ------
    TypeError
        If ``DataFrame`` is not a pandas DataFrame. This used to surface as
        an UnboundLocalError from the return statement.
    """
    if not isinstance(DataFrame, pd.DataFrame):
        raise TypeError(
            "Create_Triangle expects a pandas DataFrame, got "
            + type(DataFrame).__name__
        )

    accident = DataFrame['accident_period']
    transaction = DataFrame['transaction_period']
    in_window = (
        (accident >= Start_Date) & (accident <= End_Date)
        & (transaction >= Start_Date) & (transaction <= End_Date)
    )

    # chainladder spells the flag lowercase `cumulative`; the capitalised
    # spelling is not a documented Triangle parameter, so the incremental
    # flag was not being set explicitly.
    Triangles = cl.Triangle(
        data=DataFrame[in_window],
        origin='accident_period',
        development='transaction_period',
        columns=['gross_paid', 'ri_paid', 'net_paid'],
        cumulative=False,
        index=['lob'],
    )

    return Triangles


def Create_Triangle1(DataFrame, Start_Date, End_Date):
    """Build a chainladder Triangle of ``paid_amount`` by lob.

    Parameters
    ----------
    DataFrame : pandas.DataFrame
        Claims data with ``accident_period``, ``transaction_period``, ``lob``
        and ``paid_amount`` columns.
    Start_Date : period from (inclusive)
    End_Date : period end (inclusive)
        Rows are kept only when both ``accident_period`` and
        ``transaction_period`` fall inside ``[Start_Date, End_Date]``.

    Returns
    -------
    cl.Triangle
        Triangle with origin ``accident_period``, development
        ``transaction_period`` and index ``lob``, flagged as incremental.

    Raises
    ------
    TypeError
        If ``DataFrame`` is not a pandas DataFrame. This used to surface as
        an UnboundLocalError from the return statement.
    """
    if not isinstance(DataFrame, pd.DataFrame):
        raise TypeError(
            "Create_Triangle1 expects a pandas DataFrame, got "
            + type(DataFrame).__name__
        )

    accident = DataFrame['accident_period']
    transaction = DataFrame['transaction_period']
    in_window = (
        (accident >= Start_Date) & (accident <= End_Date)
        & (transaction >= Start_Date) & (transaction <= End_Date)
    )

    # chainladder spells the flag lowercase `cumulative`; the capitalised
    # spelling is not a documented Triangle parameter, so the incremental
    # flag was not being set explicitly.
    Triangles = cl.Triangle(
        data=DataFrame[in_window],
        origin='accident_period',
        development='transaction_period',
        columns=['paid_amount'],
        cumulative=False,
        index=['lob'],
    )

    return Triangles

def Create_Triangle2(DataFrame, Start_Date, End_Date):
    """Build a chainladder Triangle of outstanding/paid/incurred by lob.

    Parameters
    ----------
    DataFrame : pandas.DataFrame
        Claims data with ``accident_period``, ``transaction_period``, ``lob``,
        ``os_amount``, ``paid_amount`` and ``incurred_amount`` columns.
    Start_Date : period from (inclusive)
    End_Date : period end (inclusive)
        Rows are kept only when both ``accident_period`` and
        ``transaction_period`` fall inside ``[Start_Date, End_Date]``.

    Returns
    -------
    cl.Triangle
        Triangle with origin ``accident_period``, development
        ``transaction_period`` and index ``lob``, flagged as incremental.

    Raises
    ------
    TypeError
        If ``DataFrame`` is not a pandas DataFrame. This used to surface as
        an UnboundLocalError from the return statement.
    """
    if not isinstance(DataFrame, pd.DataFrame):
        raise TypeError(
            "Create_Triangle2 expects a pandas DataFrame, got "
            + type(DataFrame).__name__
        )

    accident = DataFrame['accident_period']
    transaction = DataFrame['transaction_period']
    in_window = (
        (accident >= Start_Date) & (accident <= End_Date)
        & (transaction >= Start_Date) & (transaction <= End_Date)
    )

    # chainladder spells the flag lowercase `cumulative`; the capitalised
    # spelling is not a documented Triangle parameter, so the incremental
    # flag was not being set explicitly.
    Triangles = cl.Triangle(
        data=DataFrame[in_window],
        origin='accident_period',
        development='transaction_period',
        columns=['os_amount', 'paid_amount', 'incurred_amount'],
        cumulative=False,
        index=['lob'],
    )

    return Triangles







########################## Function to select LOBs and the corresponding triangle


def Select_Lob(lobs, Triangles, Type):
    """Aggregate the triangles of the given lobs at the requested grain.

    Parameters
    ----------
    lobs : list
        Names of the lobs to keep; matched with ``Triangles['lob'].isin``.
    Triangles : cl.Triangle
        Triangle object to select from.
    Type : str
        Granularity of the result: 'Yearly', 'Quarterly' or 'Monthly'.

    Returns
    -------
    cl.Triangle or None
        The selected triangles summed over axis 0, converted from
        incremental to cumulative and regrained. ``None`` is returned when
        ``Triangles`` is not a ``cl.Triangle``.

    Raises
    ------
    NameError
        If ``Type`` is not one of the three supported values. The exception
        type is unchanged for existing callers; it now carries the
        'Invalid Type' message that was previously unreachable after the
        ``raise``.
    """
    if not isinstance(Triangles, cl.Triangle):
        # Non-Triangle input has always produced None; kept for callers.
        return None

    grain_by_type = {
        'Yearly': 'OYDY',
        'Quarterly': 'OQDQ',
        'Monthly': 'OMDM',
    }

    if Type not in grain_by_type:
        raise NameError('Invalid Type')

    selected = Triangles.loc[Triangles['lob'].isin(lobs)]
    return selected.sum(axis=0).incr_to_cum().grain(grain_by_type[Type])







#####################Function to Compute Ultimate Loss and IBNR

def Estimate_Chainladder(method: str, period: int, Triangle):
    """Estimate ultimate loss and IBNR with the chain ladder method.

    Parameters
    ----------
    method : str
        Passed as ``average`` to ``cl.Development``.
    period : int
        Passed as ``n_periods`` to ``cl.Development``.
    Triangle : cl.Triangle
        Triangle to develop.

    Returns
    -------
    pandas.DataFrame
        The ultimate and IBNR frames concatenated side by side with the
        index reset; columns are renamed to 'Loss_Quarter', 'Ultimate_Loss'
        and 'IBNR'.
    """
    developed = cl.Development(average=method, n_periods=period).fit_transform(Triangle)

    # Fit once and read both results from the same model; the model used to
    # be fitted twice on identical input.
    model = cl.Chainladder().fit(developed)
    chain_ladder_ultimate = model.ultimate_.to_frame()
    chain_ladder_ibnr = model.ibnr_.to_frame()

    Final = pd.concat([chain_ladder_ultimate, chain_ladder_ibnr], axis=1).reset_index()
    Final.columns = ['Loss_Quarter', 'Ultimate_Loss', 'IBNR']
    return Final
           





#################### Compute Distribution of Reserves via Mack at specified percentiles

def Mack_Uncertainty(method: str, period: int, Triangle, tail: int):
    """Fit the Mack chain ladder and add reserve percentiles to its summary.

    Parameters
    ----------
    method : str
        Passed as ``average`` to ``cl.Development``.
    period : int
        Passed as ``n_periods`` to ``cl.Development``.
    Triangle : cl.Triangle
        Triangle to develop (lowest and highest factors are dropped).
    tail
        Passed as ``tail`` to ``cl.TailConstant``.

    Returns
    -------
    tuple
        ``(Mack_Summary, Mack_Total_Std, CDF)`` where ``Mack_Summary`` is the
        Mack summary frame extended with, for each percentile level:

        * ``'<p>th Percentile'`` - normal approximation
          ``max(IBNR, 0) + z_p * Mack Std Err``;
        * ``'Sigma_Squared'`` and ``'Mu'`` - lognormal parameters matched to
          the IBNR and its standard error;
        * ``'<p>th Percentile lognormal'`` - ``exp(Mu + sigma * z_p)``.

        ``Mack_Total_Std`` is ``total_mack_std_err_`` of the fitted model and
        ``CDF`` is ``cdf_`` of the developed triangle.
    """
    Percentiles = [0.5, 0.67, 0.75, 0.8, 0.9, 0.95, 0.99]

    dev = cl.Development(average=method, n_periods=period, drop_low=True, drop_high=True).fit_transform(Triangle)
    dev = cl.TailConstant(tail=tail).fit_transform(dev)

    Mack = cl.MackChainladder()
    Mack.fit(dev)

    CDF = dev.cdf_
    Mack_Summary = Mack.summary_.to_frame().reset_index()
    Mack_Total_Std = Mack.total_mack_std_err_
    Mack_Summary = Mack_Summary.rename(columns={'index': 'Loss Quarter'})

    # Standard normal quantiles, computed once for both percentile families.
    z_scores = [st.norm.ppf(level) for level in Percentiles]

    ibnr_floor = np.maximum(Mack_Summary['IBNR'], 0)
    std_err = Mack_Summary['Mack Std Err']

    for level, z in zip(Percentiles, z_scores):
        Mack_Summary[f"{level * 100}th Percentile"] = ibnr_floor + z * std_err

    # Lognormal parameters from mean (IBNR) and coefficient of variation.
    # NOTE(review): rows with zero IBNR give division by zero / log(0) here
    # and therefore inf/NaN entries, as before - confirm that is acceptable.
    Mack_Summary['Sigma_Squared'] = np.log(1 + (std_err / Mack_Summary['IBNR']) ** 2)
    Mack_Summary["Mu"] = np.log(ibnr_floor) - 0.5 * Mack_Summary['Sigma_Squared']

    # A lognormal quantile is exp(mu + sigma * z). The variance (sigma
    # squared) used to be multiplied by z here, which misstates every
    # percentile other than the median.
    sigma = np.sqrt(Mack_Summary['Sigma_Squared'])

    for level, z in zip(Percentiles, z_scores):
        Mack_Summary[f"{level * 100}th Percentile lognormal"] = np.exp(Mack_Summary["Mu"] + sigma * z)

    return (Mack_Summary, Mack_Total_Std, CDF)

    










###################Compute Distribution via bootstrap

def Bootstrap(simulations: int, period: int, method: str, Triangle, drop_valuation=None, origin=None):
    """Bootstrap the triangle and summarise the simulated total IBNR.

    The triangle is resampled with ``cl.BootstrapODPSample`` (fixed
    ``random_state=50``), developed with ``cl.Development`` and projected
    with ``cl.Chainladder``. IBNR for origins at or after ``origin``
    (default: the triangle's first origin) is summed over origin.

    Returns a flat list: the mean, then the quantile (VaR) at each of the
    levels 0.5, 0.67, 0.75, 0.9, 0.95 and 0.99, then for each level the
    mean of the results strictly above that quantile (CTE).
    """
    if origin is None:
        origin = Triangle.origin[0]

    levels = [0.5, 0.67, 0.75, 0.9, 0.95, 0.99]

    # Resample, develop and project the simulated triangles.
    sampled = cl.BootstrapODPSample(n_sims=simulations, random_state=50).fit_transform(Triangle)
    developed = cl.Development(n_periods=period, average=method, drop_valuation=drop_valuation).fit_transform(sampled)
    ibnr = cl.Chainladder().fit(developed).ibnr_
    ibnr = ibnr[ibnr.origin >= origin]
    totals = np.array(ibnr.sum('origin').to_frame())

    # Value at risk per level, then the tail mean beyond each of them.
    var_values = [np.quantile(totals, level) for level in levels]
    cte_values = [totals[totals > cutoff].mean() for cutoff in var_values]

    return [totals.mean()] + var_values + cte_values




def match_triangle_periods(claims_triangle, premiums_triangle):
    """Restrict the premiums triangle to origins found in the claims triangle.

    Premium origins that do not appear in ``claims_triangle.origin`` are
    removed; the filtered premiums triangle is returned.
    """
    claim_origins = claims_triangle.origin
    has_claims = premiums_triangle.origin.isin(claim_origins)
    return premiums_triangle[has_claims]
    
    
        
    
    
    
    




def Bootstrap_bf(simulations: int, period: int, method: str, triangle_claims, premiums, lr_mu, lr_sigma, origin=None):
    """Stochastic Bornhuetter-Ferguson: summarise the simulated total IBNR.

    The claims triangle (after ``dropna()``) is resampled with
    ``cl.BootstrapODPSample``, developed with ``cl.Development`` and fitted
    with ``cl.BornhuetterFerguson`` using ``lr_mu`` / ``lr_sigma`` as the
    apriori and ``premiums`` as sample weight. IBNR for origins at or after
    ``origin`` (default: the first claims origin) is summed over origin.

    Returns a flat list: the mean, then the quantile at each of the levels
    0.5, 0.67, 0.75, 0.9, 0.95 and 0.99, then for each level the mean of the
    results strictly above that quantile.
    """
    if origin is None:
        origin = triangle_claims.origin[0]

    levels = [0.5, 0.67, 0.75, 0.9, 0.95, 0.99]

    # Resample and develop the claims triangle.
    sampled = cl.BootstrapODPSample(n_sims=simulations).fit_transform(triangle_claims.dropna())
    developed = cl.Development(average=method, n_periods=period, drop_low=True).fit_transform(sampled)

    # Bornhuetter-Ferguson on the simulated triangles.
    bf_model = cl.BornhuetterFerguson(apriori=lr_mu, apriori_sigma=lr_sigma)
    bf_model.fit(developed, sample_weight=premiums)

    ibnr = bf_model.ibnr_
    ibnr = ibnr[ibnr.origin >= origin]
    totals = np.array(ibnr.sum('origin').to_frame())

    quantiles = [np.quantile(totals, level) for level in levels]
    tail_means = [totals[totals > np.quantile(totals, level)].mean() for level in levels]

    return [totals.mean(), *quantiles, *tail_means]
        
    
        
    
    
    
        
 





def loss_ratio_bf(simulations: int, period: int, method: str, triangle_claims, premiums, lr_mu: float, lr_sigma: float, IBNR=None):
    """Stochastic loss ratio: collect total Bornhuetter-Ferguson IBNR draws.

    Parameters
    ----------
    simulations : int
        Number of draws. When ``IBNR`` is omitted the result holds
        ``max(simulations, 1)`` values.
    period : int
        Not used by the computation (``n_periods`` is fixed at -1).
        NOTE(review): confirm whether ``period`` was meant to be used here.
    method : str
        Passed as ``average`` to ``cl.Development``.
    triangle_claims : cl.Triangle
        Claims triangle.
    premiums
        Passed as ``sample_weight`` to the Bornhuetter-Ferguson fit.
    lr_mu, lr_sigma : float
        Passed as ``apriori`` and ``apriori_sigma``.
    IBNR : list, optional
        Existing list to append to. A supplied list receives
        ``simulations - 1`` further values and is modified in place.

    Returns
    -------
    list
        The list of IBNR totals.
    """
    # Development does not depend on the draw, so it is fitted once instead
    # of once per recursion level.
    Stochastic_cl = cl.Development(average=method, n_periods=-1, drop_low=True).fit_transform(triangle_claims)

    def _draw_total_ibnr():
        """Fit a fresh Bornhuetter-Ferguson model and return its summed IBNR."""
        model = cl.BornhuetterFerguson(apriori=lr_mu, apriori_sigma=lr_sigma)
        fitted = model.fit(Stochastic_cl, sample_weight=premiums)
        return fitted.ibnr_.to_frame().iloc[:, 0].sum()

    if IBNR is None:
        IBNR = [_draw_total_ibnr()]

    # A loop replaces one recursive call per simulation, which raised
    # RecursionError once `simulations` approached the interpreter's
    # recursion limit. The number of values appended is unchanged.
    remaining = simulations
    while remaining > 1:
        IBNR.append(_draw_total_ibnr())
        remaining -= 1

    return IBNR
    
     

#################Create all possible combinations for the Shapley allocation using the lines of business







def Generate_Combinations(lob):
    """Create the combinations used in the Shapley allocation.

    Pass a sequence containing the lines of business. The result is a list
    of tuples holding every combination of size 1, then size 2, and so on up
    to the full set, each size in ``itertools.combinations`` order.
    """
    sizes = range(1, len(lob) + 1)
    by_size = (itertools.combinations(lob, size) for size in sizes)
    return list(itertools.chain.from_iterable(by_size))







###################Create all possible permutations for the Shapley allocation using the lines of business

def Generate_Permutations(lob):
    """Create the permutations used in the Shapley allocation.

    Pass a sequence containing the lines of business. The result is a list
    of tuples, one per full-length ordering of ``lob``.
    """
    full_length = len(lob)
    orderings = itertools.permutations(lob, full_length)
    return [*orderings]
            
  






############################################################Shapley's Allocation


def Shapley_Value(Combinations, Risk_Adjustment, Name_Lob):
    """Compute the Shapley allocation for one line of business.

    Parameters
    ----------
    Combinations : sequence of tuples
        Every non-empty coalition of lines of business, e.g. the output of
        ``Generate_Combinations``. The order no longer matters.
    Risk_Adjustment : sequence of numbers
        Value of each coalition, aligned position by position with
        ``Combinations``. The empty coalition is taken to be worth 0.
    Name_Lob
        The line of business to allocate to.

    Returns
    -------
    float
        Sum over coalitions S not containing ``Name_Lob`` of
        ``|S|! (n - 1 - |S|)! / n! * (v(S + {Name_Lob}) - v(S))``, with ``n``
        the number of distinct lines of business.

    Raises
    ------
    ValueError
        If ``Name_Lob`` appears in no coalition, or a coalition needed for a
        marginal contribution is missing from ``Combinations``.
    """
    # Coalitions are compared by membership, so a coalition is matched with
    # its complement by lookup rather than by position in the list (which
    # was only right for one particular input ordering).
    coalitions = [
        frozenset([members]) if isinstance(members, str) else frozenset(members)
        for members in Combinations
    ]
    Valuations = np.array(Risk_Adjustment)
    position_of = {coalition: position for position, coalition in enumerate(coalitions)}

    Number_of_Players = len(frozenset().union(*coalitions))

    Allocations = []  # marginal contribution of Name_Lob to each coalition
    Weights = []      # Shapley weight of the coalition it joins

    for position, coalition in enumerate(coalitions):
        if Name_Lob not in coalition:
            continue

        others = coalition - {Name_Lob}
        value_with = Valuations[position]

        if not others:
            # Joining the empty coalition: the marginal value is v({lob}).
            marginal = value_with
        elif others in position_of:
            marginal = value_with - Valuations[position_of[others]]
        else:
            raise ValueError(
                f"Combinations is missing the coalition {tuple(others)!r} "
                f"needed to value {Name_Lob!r}"
            )

        size = len(others)
        Allocations.append(marginal)
        Weights.append(
            factorial(size)
            * factorial(Number_of_Players - 1 - size)
            / factorial(Number_of_Players)
        )

    if not Allocations:
        raise ValueError(f"{Name_Lob!r} does not appear in any combination")

    # Final allocation: weighted sum of the marginal contributions.
    Final_Allocation = np.dot(Allocations, Weights)

    return Final_Allocation












##################Excel Writer to extract output

def Excel_Writer(path: str, dataframes: list, sheet_names: list):
    """Write each dataframe to its own sheet of one Excel workbook.

    Parameters
    ----------
    path : str
        Destination workbook path.
    dataframes : list
        DataFrames to export, one per sheet.
    sheet_names : list
        Sheet name for each dataframe, matched by position. It must be at
        least as long as ``dataframes``; otherwise ``IndexError`` is raised.
    """
    # The context manager saves and closes the workbook, also when a write
    # fails. ExcelWriter.save() was removed in pandas 2.0.
    with pd.ExcelWriter(path, engine='xlsxwriter') as writer:
        for position, frame in enumerate(dataframes):
            frame.to_excel(writer, sheet_name=sheet_names[position])

  
    
def column_names():
    """Create the name tag for each risk measure.

    Returns 'Mean' followed by one '<level>th VaR' label and then one
    '<level>th CTE' label for each of the levels 0.5, 0.67, 0.75, 0.9, 0.95
    and 0.99, with the level expressed as the fraction times 100.
    """
    levels = np.array([0.5, 0.67, 0.75, 0.9, 0.95, 0.99]) * 100

    labels = ['Mean']
    for suffix in ('th VaR', 'th CTE'):
        labels.extend("{}{}".format(level, suffix) for level in levels)
    return labels




