In [1]:
import os
# Folder holding the input workbooks and receiving the exported results.
# The ESG_DATA_DIR environment variable, when set to a non-empty value,
# overrides the original hard-coded location so the notebook can run on
# another machine without editing this cell.
target_path = os.environ.get('ESG_DATA_DIR') or 'C:/Users/28648/Desktop'
# exist_ok=True makes re-running this cell harmless.
os.makedirs(target_path, exist_ok=True)

In [2]:
# Make the target folder the working directory so that the relative file
# names used by later cells resolve against it, then echo the result.
os.chdir(target_path)
print(f"Current Working Directory: {os.getcwd()}")

Current Working Directory: C:\Users\28648\Desktop


### Cross Validation

In [9]:
import pandas as pd
import numpy as np

# Validation rules per ESG metric. Each entry gives the inclusive (low, high)
# 'range', the Python 'type' the value is cast to, and, where one applies,
# the standard 'unit'. The keys are used verbatim as DataFrame column names by
# the validation functions, so their spelling (e.g. 'consumptionn') must not
# be "corrected" here without renaming the spreadsheet columns as well.
ESG_STANDARDS = {
    'Absolute emissions scope（total）(mtCO2e)': dict(unit='mtCO2e', range=(100, 1_000_000), type=float),
    'Anti-corruption training for employees': dict(range=(0, 5), type=int),
    'Assurance of sustainability report': dict(range=(0, 1), type=int),
    'Average training hours per employee (hours)': dict(range=(0, 200), type=float),
    'Current employees by age groups (30 -50) (%)': dict(unit='%', range=(0, 100), type=float),
    'Current employees by gender (%)': dict(unit='%', range=(0, 100), type=float),
    'Fatalities': dict(range=(0, 100), type=int),
    'Total energy consumptionn (MWh)': dict(unit='MWh', range=(0, 1_000_000), type=float),
    'Total number of employees': dict(range=(0, 1_000_000), type=int),
    'Total waste generated(mts)': dict(unit='mts', range=(0, 1_000_000), type=float),
    'Total water consumption(m3)': dict(unit='m3', range=(0, 1_000_000), type=float),
    'Women on the board (%)': dict(unit='%', range=(0, 100), type=float),
    'turnover by gender': dict(range=(0, 50), type=float),
}

# Load labeled dataset (ground truth) and extracted data.
# Both workbooks are given by relative name, so they are looked up in the
# current working directory.
# NOTE(review): the ground-truth frame is read from "extract.xlsx" and the
# extracted frame from "Cross.xlsx" — the names read as if they might be
# swapped; confirm against the actual files.
labeled_data = pd.read_excel("extract.xlsx")
extracted_data = pd.read_excel("Cross.xlsx")

# Define unit conversion functions
# Size of each supported energy unit expressed in watt-hours. Integer sizes
# keep the ratio between any two units an exact whole number.
_ENERGY_UNIT_IN_WH = {
    'Wh': 1,
    'kWh': 1_000,
    'MWh': 1_000_000,
    'GWh': 1_000_000_000,
}


def convert_units(value, current_unit, target_unit):
    """Convert ``value`` from ``current_unit`` to ``target_unit``.

    Identical units are returned untouched, so metrics whose unit has no
    conversion table (e.g. '%', 'm3') still pass through when no conversion
    is actually required. Energy units (Wh, kWh, MWh, GWh) convert among each
    other.

    Args:
        value: Numeric quantity expressed in ``current_unit``.
        current_unit: Unit label the value is currently expressed in.
        target_unit: Unit label the value should be expressed in.

    Returns:
        The quantity expressed in ``target_unit``.

    Raises:
        ValueError: If the units differ and no conversion between them is known.
    """
    if current_unit == target_unit:
        return value

    source_size = _ENERGY_UNIT_IN_WH.get(current_unit)
    target_size = _ENERGY_UNIT_IN_WH.get(target_unit)
    if source_size is None or target_size is None:
        raise ValueError(f"Unsupported unit conversion from {current_unit} to {target_unit}")

    # Apply one multiplication or one division by an exact integer ratio, so
    # the result is rounded at most once. This matters because callers compare
    # converted values for exact equality.
    if source_size >= target_size:
        return value * (source_size // target_size)
    return value / (target_size // source_size)

# Validation function with missing value handling and counts for valid/invalid metrics
def validate_extracted_data(extracted_data, labeled_data):
    """Cross-validate extracted ESG values against the labeled ground truth.

    Every row of ``extracted_data`` is paired with the row of ``labeled_data``
    that has the same 'Company Name' and 'Year' (the first one, if several
    match). For each metric in ``ESG_STANDARDS`` whose ground-truth value is
    present, the extracted value is cast to the metric's declared type,
    converted to the metric's standard unit, and then checked against the
    allowed range and compared with the ground truth for equality.

    Args:
        extracted_data: DataFrame with 'Company Name', 'Year', one column per
            ``ESG_STANDARDS`` key and, optionally, '<metric> Unit' columns.
        labeled_data: DataFrame with 'Company Name', 'Year' and one column per
            ``ESG_STANDARDS`` key holding the ground-truth values.

    Returns:
        A tuple ``(validation_df, validation_summary, overall_valid_percentage)``:
        the per-row check results, the per-metric 'Valid Count' /
        'Invalid Count' / 'Proportion Valid' table, and the percentage of
        evaluated cells that passed every check (0 when nothing was evaluated).
    """
    validation_results = []
    metric_valid_counts = {metric: {'valid': 0, 'invalid': 0} for metric in ESG_STANDARDS}
    total_valid_cells = 0
    total_evaluated_cells = 0

    for _, row in extracted_data.iterrows():
        company, year = row['Company Name'], row['Year']
        gt_row = labeled_data[(labeled_data['Company Name'] == company) & (labeled_data['Year'] == year)]

        row_result = {'Company Name': company, 'Year': year}

        # Validate each ESG metric
        for metric, standard in ESG_STANDARDS.items():
            extracted_value = row[metric]
            target_unit = standard.get('unit', None)
            extracted_unit = row.get(f"{metric} Unit", target_unit)
            # A blank unit cell is read as NaN, which is truthy and would force
            # a failing conversion. Treat it like an absent unit column.
            if pd.isna(extracted_unit):
                extracted_unit = target_unit
            gt_value = gt_row[metric].values[0] if not gt_row.empty else np.nan

            # Skip validation if ground truth value is missing
            if pd.isna(gt_value):
                row_result[f'{metric}_Validation'] = 'Not Applicable'
                continue

            # From here on the cell counts towards the overall percentage.
            total_evaluated_cells += 1

            # Cast the extracted value to the declared type (float or int).
            try:
                extracted_value = standard['type'](extracted_value)
            except (ValueError, TypeError):
                row_result[f'{metric}_Format_Valid'] = False
                metric_valid_counts[metric]['invalid'] += 1
                continue  # Skip further checks if conversion fails

            # Cast the ground truth the same way so both sides are comparable.
            try:
                gt_value = standard['type'](gt_value)
            except (ValueError, TypeError):
                row_result[f'{metric}_Format_Valid'] = False
                metric_valid_counts[metric]['invalid'] += 1
                continue  # Skip further checks if conversion fails

            # Convert extracted value to the standard unit if both units are specified
            if target_unit and extracted_unit:
                try:
                    extracted_value = convert_units(extracted_value, extracted_unit, target_unit)
                except ValueError:
                    # An unknown unit invalidates this cell only; it must not
                    # abort validation of every remaining row.
                    row_result[f'{metric}_Validation'] = 'Unit Conversion Failed'
                    metric_valid_counts[metric]['invalid'] += 1
                    continue

            # Check range, type and equality with the ground truth.
            low, high = standard['range']
            is_within_range = low <= extracted_value <= high
            # The value was already cast above, so this mainly guards against
            # a unit conversion changing the type.
            is_correct_type = isinstance(extracted_value, standard['type'])
            is_exact_match = extracted_value == gt_value

            # Record validation results
            row_result[f'{metric}_Range_Valid'] = is_within_range
            row_result[f'{metric}_Format_Valid'] = is_correct_type
            row_result[f'{metric}_Exact_Match'] = is_exact_match

            # A cell is valid only when every check passes.
            if is_within_range and is_correct_type and is_exact_match:
                metric_valid_counts[metric]['valid'] += 1
                total_valid_cells += 1
            else:
                metric_valid_counts[metric]['invalid'] += 1

        validation_results.append(row_result)

    # Compile results into a DataFrame
    validation_df = pd.DataFrame(validation_results)

    # Create a summary of valid/invalid counts per metric
    validation_summary = pd.DataFrame(metric_valid_counts).T
    validation_summary.columns = ['Valid Count', 'Invalid Count']

    # Metrics with no evaluated cells yield NaN here (0 / 0).
    validation_summary['Proportion Valid'] = validation_summary['Valid Count'] / (
        validation_summary['Valid Count'] + validation_summary['Invalid Count']
    )

    # Calculate overall percentage of valid cells
    overall_valid_percentage = (total_valid_cells / total_evaluated_cells * 100) if total_evaluated_cells > 0 else 0

    return validation_df, validation_summary, overall_valid_percentage

# Run the validation on the two frames loaded above.
validation_df, validation_summary, overall_valid_percentage = validate_extracted_data(extracted_data, labeled_data)

# Print the per-row results, the per-metric summary and the overall percentage
# (formatted with two decimals).
print("Validation Results:\n", validation_df)
print("\nValidation Summary (Valid/Invalid Counts and Proportion of Valid Outputs per Metric):\n", validation_summary)
print(f"\nOverall Percentage of Valid Data: {overall_valid_percentage:.2f}%")

Validation Results:
             Company Name  Year  \
0              AIA Group  2023   
1                Allianz  2023   
2               Barclays  2019   
3               Barclays  2020   
4               Barclays  2021   
5              BlackRock  2023   
6   China Life Insurance  2021   
7   China Life Insurance  2022   
8   China Life Insurance  2023   
9               Citibank  2022   
10                   DBS  2023   
11                  HSBC  2023   
12        JPMorgan Chase  2023   
13                  MUFG  2023   
14               Metlife  2023   
15           Nippon Life  2023   

   Absolute emissions scope（total）(mtCO2e)_Range_Valid  \
0                                                True    
1                                                True    
2                                                True    
3                                                True    
4                                                True    
5                                                 Na

### Confidence Score

In [13]:
def validate_and_score_extracted_data(extracted_data, labeled_data):
    """Validate extracted ESG values and attach a confidence score to each.

    Every row of ``extracted_data`` is paired with the row of ``labeled_data``
    that has the same 'Company Name' and 'Year' (the first one, if several
    match). For each metric in ``ESG_STANDARDS`` whose ground-truth value is
    present, the extracted value is cast to the declared type, converted to
    the standard unit and scored as::

        0.5 * range valid + 0.3 * type valid + 0.2 * exact match

    Cells that cannot be evaluated (missing ground truth, failed cast, failed
    unit conversion) contribute a confidence of 0 to the row average.

    Args:
        extracted_data: DataFrame with 'Company Name', 'Year', one column per
            ``ESG_STANDARDS`` key and, optionally, '<metric> Unit' columns.
        labeled_data: DataFrame with 'Company Name', 'Year' and one column per
            ``ESG_STANDARDS`` key holding the ground-truth values.

    Returns:
        A tuple ``(validation_df, validation_summary, overall_valid_percentage,
        overall_average_confidence)``: per-row results including
        '<metric>_Confidence' and 'Average Confidence Score'; the per-metric
        'Valid Count' / 'Invalid Count' / 'Average Confidence' table; the
        percentage of evaluated cells that passed every check; and the
        unweighted mean of the per-metric average confidences.
    """
    validation_results = []
    metric_valid_counts = {
        metric: {'valid': 0, 'invalid': 0, 'total_confidence': 0} for metric in ESG_STANDARDS
    }
    total_valid_cells = 0
    total_evaluated_cells = 0

    for _, row in extracted_data.iterrows():
        company, year = row['Company Name'], row['Year']
        gt_row = labeled_data[(labeled_data['Company Name'] == company) & (labeled_data['Year'] == year)]

        row_result = {'Company Name': company, 'Year': year}
        row_confidences = []

        # Validate each ESG metric
        for metric, standard in ESG_STANDARDS.items():
            extracted_value = row[metric]
            target_unit = standard.get('unit', None)
            extracted_unit = row.get(f"{metric} Unit", target_unit)
            # A blank unit cell is read as NaN, which is truthy and would force
            # a failing conversion. Treat it like an absent unit column.
            if pd.isna(extracted_unit):
                extracted_unit = target_unit
            gt_value = gt_row[metric].values[0] if not gt_row.empty else np.nan

            # Skip validation if ground truth value is missing
            if pd.isna(gt_value):
                row_result[f'{metric}_Validation'] = 'Not Applicable'
                row_confidences.append(0)  # No confidence for missing ground truth
                continue

            # From here on the cell counts towards the overall percentage.
            total_evaluated_cells += 1

            # Cast the extracted value to the declared type (float or int).
            try:
                extracted_value = standard['type'](extracted_value)
            except (ValueError, TypeError):
                row_result[f'{metric}_Format_Valid'] = False
                metric_valid_counts[metric]['invalid'] += 1
                row_confidences.append(0)  # No confidence for invalid type
                continue

            # Cast the ground truth the same way so both sides are comparable.
            try:
                gt_value = standard['type'](gt_value)
            except (ValueError, TypeError):
                row_result[f'{metric}_Format_Valid'] = False
                metric_valid_counts[metric]['invalid'] += 1
                row_confidences.append(0)
                continue

            # Convert extracted value to the standard unit if both units are specified
            if target_unit and extracted_unit:
                try:
                    extracted_value = convert_units(extracted_value, extracted_unit, target_unit)
                except ValueError:
                    # The cell was already counted as evaluated, so it must also
                    # be counted as invalid; otherwise the per-metric counts
                    # disagree with the overall percentage.
                    row_result[f'{metric}_Validation'] = 'Unit Conversion Failed'
                    metric_valid_counts[metric]['invalid'] += 1
                    row_confidences.append(0)  # Invalid conversion
                    continue

            # Check range, type and equality with the ground truth.
            low, high = standard['range']
            is_within_range = low <= extracted_value <= high
            is_exact_match = extracted_value == gt_value
            is_correct_type = isinstance(extracted_value, standard['type'])

            # Confidence Score for Metric
            confidence_score = (
                0.5 * is_within_range +  # Range Validity: 50%
                0.3 * is_correct_type +  # Data Type Validity: 30%
                0.2 * is_exact_match     # Exact Match: 20%
            )
            row_confidences.append(confidence_score)

            # Record validation results
            row_result[f'{metric}_Range_Valid'] = is_within_range
            row_result[f'{metric}_Format_Valid'] = is_correct_type
            row_result[f'{metric}_Exact_Match'] = is_exact_match
            row_result[f'{metric}_Confidence'] = confidence_score

            # Decide validity from the checks themselves rather than by
            # comparing a float sum with 1.0.
            if is_within_range and is_correct_type and is_exact_match:
                metric_valid_counts[metric]['valid'] += 1
                total_valid_cells += 1
            else:
                metric_valid_counts[metric]['invalid'] += 1
            metric_valid_counts[metric]['total_confidence'] += confidence_score

        # Aggregate row-level confidence (unevaluated metrics count as 0).
        average_confidence = sum(row_confidences) / len(row_confidences) if row_confidences else 0
        row_result['Average Confidence Score'] = average_confidence
        validation_results.append(row_result)

    # Compile results into a DataFrame
    validation_df = pd.DataFrame(validation_results)

    # Build the per-metric summary directly instead of adding columns and then
    # dropping the raw ones in place. Metrics with no evaluated cells get NaN
    # as their average (0 / 0).
    metric_counts = pd.DataFrame(metric_valid_counts).T
    validation_summary = pd.DataFrame({
        'Valid Count': metric_counts['valid'],
        'Invalid Count': metric_counts['invalid'],
        'Average Confidence': metric_counts['total_confidence'] / (
            metric_counts['valid'] + metric_counts['invalid']
        ),
    })

    # Calculate overall percentage of valid data and average confidence
    overall_valid_percentage = (total_valid_cells / total_evaluated_cells * 100) if total_evaluated_cells > 0 else 0
    overall_average_confidence = validation_summary['Average Confidence'].mean()

    return validation_df, validation_summary, overall_valid_percentage, overall_average_confidence

# Run the validation and scoring.
# Note: this rebinds validation_df, validation_summary and
# overall_valid_percentage, replacing the values produced by the
# cross-validation cell above.
validation_df, validation_summary, overall_valid_percentage, overall_average_confidence = validate_and_score_extracted_data(
    extracted_data, labeled_data
)

# Print the per-row results, the per-metric summary and the two overall
# figures (each formatted with two decimals).
print("Validation Results:\n", validation_df)
print("\nValidation Summary (Valid Counts, Invalid Counts, and Average Confidence per Metric):\n", validation_summary)
print(f"\nOverall Percentage of Valid Data: {overall_valid_percentage:.2f}%")
print(f"Overall Average Confidence: {overall_average_confidence:.2f}")

Validation Results:
             Company Name  Year  \
0              AIA Group  2023   
1                Allianz  2023   
2               Barclays  2019   
3               Barclays  2020   
4               Barclays  2021   
5              BlackRock  2023   
6   China Life Insurance  2021   
7   China Life Insurance  2022   
8   China Life Insurance  2023   
9               Citibank  2022   
10                   DBS  2023   
11                  HSBC  2023   
12        JPMorgan Chase  2023   
13                  MUFG  2023   
14               Metlife  2023   
15           Nippon Life  2023   

   Absolute emissions scope（total）(mtCO2e)_Range_Valid  \
0                                                True    
1                                                True    
2                                                True    
3                                                True    
4                                                True    
5                                                 Na

In [22]:
import pandas as pd
def calculate_company_confidence(validation_df):
    """Average the per-metric confidence columns for each company and year.

    Args:
        validation_df: DataFrame with 'Company Name', 'Year' and any number of
            columns whose names end in '_Confidence'.

    Returns:
        DataFrame with one row per ('Company Name', 'Year') pair, the mean of
        every '_Confidence' column, and an 'Overall Confidence Score' column
        holding the row-wise mean of those means.
    """
    # Only columns with the '_Confidence' suffix take part in the averages.
    confidence_columns = [name for name in validation_df.columns if name.endswith('_Confidence')]

    per_company = validation_df.groupby(['Company Name', 'Year'])[confidence_columns].mean()

    # The row-wise mean is computed before the new column exists, so the
    # overall score never averages itself.
    overall_score = per_company.mean(axis=1)
    scored = per_company.assign(**{'Overall Confidence Score': overall_score})

    return scored.reset_index()

# Calculate company-level confidence scores
# Aggregate the per-row confidence columns by company and year.
company_confidence = calculate_company_confidence(validation_df)

# Save the table next to the input workbooks (relative path, so it lands in
# the current working directory).
company_confidence.to_excel('company_confidence.xlsx')

# Keep the frame as the cell's last expression so the notebook displays it.
# A bare expression in the middle of a cell shows nothing.
company_confidence

