In [106]:
import pandas as pd
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline 
import numpy as np

In [107]:
# NOTE(review): absolute, machine-specific path — this cell only runs where
# that file exists; consider a path relative to a configurable data directory.
file_path = r"D:\IA\sustex-coporate\Updated_Financial_Statement_Data.csv"
# The CSV fields are separated by semicolons.
df = pd.read_csv(file_path, sep=";")
df

Unnamed: 0,Date,Description,Revenue,Cost of Sales,Gross Profit,Operating Profit,Profit Before Taxation,Profit After Taxation,Total Assets,Total Liabilities,Total Equity
0,28 February 2022,Unaudited Group Profit or Loss - 6 months,285826.0,-198768.0,87058.0,19074.0,20619.0,15658.0,,,
1,31 August 2021,Audited Group Profit or Loss - 12 months,575115.0,-383357.0,191758.0,13856.0,17587.0,3350.0,,,
2,28 February 2022,Unaudited Group Profit or Loss - 6 months,224296.0,-134053.0,90243.0,10696.0,13206.0,10050.0,,,
3,28 February 2022,Unaudited Group Financial Position - 6 months,,,,,,,999596.0,999596.0,785885.0
4,31 August 2021,Audited Group Financial Position - 12 months,,,,,,,999269.0,999269.0,760579.0
5,28 February 2022,Unaudited Group Financial Position - 6 months,,,,,,,999596.0,999596.0,757669.0


In [108]:
def get_model(name, task):
    """Build a transformers pipeline for ``task`` from the checkpoint ``name``.

    The tokenizer is loaded first, then the sequence-classification model,
    and both are handed to ``pipeline``.

    Returns:
        The pipeline object, or ``None`` when any step raises; in that case
        the error is printed rather than propagated.
    """
    try:
        loaded_tokenizer = AutoTokenizer.from_pretrained(name)
        loaded_model = AutoModelForSequenceClassification.from_pretrained(name)
        built = pipeline(task, model=loaded_model, tokenizer=loaded_tokenizer)
    except Exception as e:
        # Best-effort loading: callers are expected to check for None.
        print(f"An error occurred while loading the model or tokenizer: {e}")
        return None
    return built

In [109]:
def classify_sentence_label(text,pipe_env,pipe_soc,pipe_gov,pipe_esg):
    """Classify ``text`` as environmental, social or governance.

    ``pipe_esg`` is run on ``text`` first; the label it predicts is then fed
    to ``pipe_env``, ``pipe_soc`` and ``pipe_gov`` in that order, stopping at
    the first pipeline whose top label is not ``'none'``.

    Returns:
        ``(label, score)`` from the first matching pipeline, or
        ``(None, None)`` when ``text`` or any pipeline is ``None``, or when
        all three pipelines answer ``'none'``.
    """
    required = (text, pipe_env, pipe_soc, pipe_gov, pipe_esg)
    if any(item is None for item in required):
        return None, None

    # NOTE(review): from here on the E/S/G pipelines classify the label
    # predicted by pipe_esg, not the original text — confirm this is intended.
    esg_label = pipe_esg(text, padding=True, truncation=True)[0]['label']
    if esg_label is None:
        return None, None

    for classifier in (pipe_env, pipe_soc, pipe_gov):
        prediction = classifier(esg_label, padding=True, truncation=True)
        if prediction[0]['label'] != 'none':
            return prediction[0]['label'], prediction[0]['score']

    return None, None

In [110]:
def is_number(s):
    """Return True when ``s`` can be converted with ``float()``, else False.

    Args:
        s: Any value (string, number, None, container, ...).

    Returns:
        bool: True if ``float(s)`` succeeds, False otherwise.
    """
    try:
        float(s)
    except (TypeError, ValueError):
        # ValueError: a string that is not numeric ("abc").
        # TypeError: a value float() cannot accept at all (None, list, dict).
        return False
    return True

In [111]:
def calculate_based_columns(columns_list,pipe_env, pipe_soc, pipe_gov, pipe_esg):
    """Classify column names as E/S/G and normalise the matching columns.

    Each name in ``columns_list`` is passed to ``classify_sentence_label``.
    When a label and score come back and the column of the module-level
    ``df`` holds no string values (NaN ignored), that column is divided in
    place by its maximum and its name is filed under the matching family.

    Returns:
        ``(column_env, column_soc, column_gov)``: three lists of column names.
    """
    column_env, column_soc, column_gov = [], [], []
    families = {
        "environmental": column_env,
        "social": column_soc,
        "governance": column_gov,
    }

    for name in columns_list:
        label, score_class = classify_sentence_label(name, pipe_env, pipe_soc, pipe_gov, pipe_esg)
        present_values = df[name].dropna()
        if label is None or score_class is None:
            continue
        if present_values.apply(lambda x: isinstance(x, str)).any():
            # Text columns cannot be normalised numerically.
            continue

        # NOTE(review): dividing by max() yields values >= 1 for columns that
        # are entirely negative (e.g. costs) — confirm this scaling is wanted.
        df[name] = df[name] / df[name].max()

        target = families.get(label)
        if target is not None:
            target.append(name)

    return column_env, column_soc, column_gov

In [112]:
def get_row_esg(row,pipe_env, pipe_soc, pipe_gov, pipe_esg):
    """Find the first cell of ``row`` that receives a label.

    Cells that are NaN or numeric are skipped. Every other cell is passed to
    ``classify_sentence_label``; the scan stops at the first cell that gets a
    non-None label, whether or not it is one of the three E/S/G names.

    Returns:
        ``(row, cell, column_env, column_soc, column_gov)`` for the first
        labelled cell, where the cell has been appended to the list matching
        its label; ``(None, None, [], [], [])`` when no cell is labelled.
    """
    column_env, column_soc, column_gov = [], [], []
    families = {
        "environmental": column_env,
        "social": column_soc,
        "governance": column_gov,
    }

    for cell in row:
        if pd.isna(cell) or is_number(cell):
            continue
        label, _ = classify_sentence_label(cell, pipe_env, pipe_soc, pipe_gov, pipe_esg)
        if label is None:
            continue
        if label in families:
            families[label].append(cell)
        return row, cell, column_env, column_soc, column_gov

    return None, None, column_env, column_soc, column_gov

In [113]:
# General ESG classifier, built directly with the pipeline factory.
pipe_esg = pipeline("text-classification", model="nbroad/ESG-BERT")
# Family-specific classifiers, loaded in order: environmental, social,
# governance. get_model returns None for any checkpoint that fails to load.
pipe_env, pipe_soc, pipe_gov = (
    get_model(checkpoint, "text-classification")
    for checkpoint in (
        "ESGBERT/EnvironmentalBERT-environmental",
        "ESGBERT/SocialBERT-social",
        "ESGBERT/GovernanceBERT-governance",
    )
)

In [114]:
# Column names of the loaded frame; these are the inputs to the column-wise
# classification done by calculate_based_columns.
columns_list=df.columns.to_list()

In [115]:
columns_list

['Date',
 'Description',
 'Revenue',
 'Cost of Sales',
 'Gross Profit',
 'Operating Profit',
 'Profit Before Taxation',
 'Profit After Taxation',
 'Total Assets',
 'Total Liabilities',
 'Total Equity']

In [116]:
def create_dataframe(data):
    """Build a DataFrame from a list of ``{column_name: values}`` dicts.

    Every key of every dict becomes a column; ``None`` entries in the values
    are replaced by NaN. If the same key appears more than once, the last
    occurrence wins.
    """
    columns = {
        key: [np.nan if v is None else v for v in value]
        for item in data
        for key, value in item.items()
    }
    return pd.DataFrame(columns)

In [117]:
def calculate_based_lignes(df,pipe_env, pipe_soc, pipe_gov, pipe_esg):
    """Classify the rows of ``df`` and collect the E/S/G-labelled ones.

    Each row is scanned with ``get_row_esg``. For every row in which a
    labelled text cell is found, the row's values are stored under that cell
    as a column of the returned frame, and the cell is recorded in the list
    of its family.

    Args:
        df: Frame whose rows are scanned.
        pipe_env, pipe_soc, pipe_gov, pipe_esg: Classification pipelines
            forwarded to ``get_row_esg``.

    Returns:
        ``(column_env, column_soc, column_gov, df_esg)``: the three lists
        hold the labelled cells accumulated over ALL rows (without
        duplicates, in first-seen order); ``df_esg`` is the frame built by
        ``create_dataframe``. An empty ``df`` yields three empty lists and an
        empty frame.
    """
    column_env, column_soc, column_gov = [], [], []
    if len(df) == 0:
        # Nothing to scan; avoids returning names that were never assigned.
        return column_env, column_soc, column_gov, pd.DataFrame()

    list_esg = []
    for i in range(len(df)):
        row = list(df.iloc[i])
        row_esg, x, row_env, row_soc, row_gov = get_row_esg(row, pipe_env, pipe_soc, pipe_gov, pipe_esg)
        if row_esg is None:
            continue
        list_esg.append({x: row_esg})
        # get_row_esg returns fresh lists per row, so merge them here instead
        # of keeping only the last row's result. Duplicates are skipped
        # because repeated keys collapse into one column of df_esg.
        for found, collected in (
            (row_env, column_env),
            (row_soc, column_soc),
            (row_gov, column_gov),
        ):
            for key in found:
                if key not in collected:
                    collected.append(key)

    df_esg = create_dataframe(list_esg)
    return column_env, column_soc, column_gov, df_esg

In [118]:
def calculate_esg_number(df,column_gov,column_soc,column_env):
    """Add per-family scores and an overall ESG score to ``df``.

    Each family score is the row-wise mean of that family's columns; the
    'ESG Score' is the row-wise mean of the three family scores. An empty
    column list gives NaN for that family, and NaN family scores are skipped
    by the overall mean.

    Args:
        df: Frame holding the score columns; modified in place.
        column_gov: Names of the governance columns.
        column_soc: Names of the social columns.
        column_env: Names of the environmental columns.

    Returns:
        The same ``df`` object, with the four score columns added.
    """
    df['Governance Score'] = df[column_gov].mean(axis=1)
    # Environmental and social were previously computed from each other's columns.
    df['Environmental Score'] = df[column_env].mean(axis=1)
    df['Social Score'] = df[column_soc].mean(axis=1)
    df['ESG Score'] = df[['Governance Score', 'Environmental Score', 'Social Score']].mean(axis=1)
    return df

In [119]:
column_env,column_soc,column_gov=calculate_based_columns(columns_list,pipe_env, pipe_soc, pipe_gov, pipe_esg)
# Score on columns when the column pass labelled at least one column of any
# family; otherwise fall back to classifying the text cells row by row.
# (The environmental test used to be `== 0`, so the fallback was skipped
# whenever nothing at all had been labelled.)
if len(column_env)!=0 or len(column_soc)!=0 or len(column_gov)!=0:
    calculate_esg_number(df,column_gov,column_soc,column_env)
    print("hello condition 1",df)
else:
    column_env,column_soc,column_gov,df_esg=calculate_based_lignes(df,pipe_env, pipe_soc, pipe_gov, pipe_esg)
    calculate_esg_number(df_esg,column_gov,column_soc,column_env)
    

hello condition 1                Date                                    Description   Revenue  \
0  28 February 2022      Unaudited Group Profit or Loss - 6 months  285826.0   
1    31 August 2021       Audited Group Profit or Loss - 12 months  575115.0   
2  28 February 2022      Unaudited Group Profit or Loss - 6 months  224296.0   
3  28 February 2022  Unaudited Group Financial Position - 6 months       NaN   
4    31 August 2021   Audited Group Financial Position - 12 months       NaN   
5  28 February 2022  Unaudited Group Financial Position - 6 months       NaN   

   Cost of Sales  Gross Profit  Operating Profit  Profit Before Taxation  \
0       1.482757       87058.0          1.000000                 20619.0   
1       2.859742      191758.0          0.726434                 17587.0   
2       1.000000       90243.0          0.560763                 13206.0   
3            NaN           NaN               NaN                     NaN   
4            NaN           NaN           