In [3]:
import os
import PyPDF2
import re
from collections import defaultdict
from transformers import pipeline, AutoTokenizer
import pandas as pd
import subprocess
import sys
from pytesseract import image_to_string
from PIL import Image
import fitz  # PyMuPDF for handling PDFs with images

# Check and install required dependencies
def install_dependencies():
    required_libraries = ["PyPDF2", "transformers", "pandas", "openpyxl", "pycryptodome", "pytesseract", "Pillow", "pymupdf"]
    for lib in required_libraries:
        try:
            __import__(lib)
        except ImportError:
            print(f"Installing {lib}...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", lib])

# Install dependencies before running the script
install_dependencies()

# Step 1: Extract text from PDF (including images)
def extract_text_from_pdf(pdf_path):
    text = ""
    try:
        # Extract text from PDF pages
        with open(pdf_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            for page in reader.pages:
                text += page.extract_text()
        
        # If no text is extracted, try OCR for scanned PDFs
        if not text.strip():
            print(f"No text found in {pdf_path}. Attempting OCR...")
            doc = fitz.open(pdf_path)
            for page_num in range(len(doc)):
                page = doc.load_page(page_num)
                pix = page.get_pixmap()
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                text += image_to_string(img)
    except Exception as e:
        print(f"Error extracting text from {pdf_path}: {e}")
    return text

# Step 2: Clean and preprocess text
def clean_text(text):
    text = re.sub(r'\s+', ' ', text)  # Remove extra spaces
    text = re.sub(r'\n', ' ', text)    # Remove newlines
    text = re.sub(r'[^\w\s.,]', '', text)  # Remove special characters except periods and commas
    return text.strip()

# Step 3: Extract ESG-related sentences
def extract_esg_sentences(text):
    esg_keywords = [
        "environment", "sustainability", "carbon", "social", "diversity", 
        "governance", "ethics", "climate", "renewable", "energy", 
        "emissions", "waste", "recycling", "employee", "community", 
        "responsibility", "compliance", "transparency"
    ]
    sentences = text.split('.')  # Split text into sentences
    esg_sentences = [sent.strip() for sent in sentences if any(keyword in sent.lower() for keyword in esg_keywords)]
    return esg_sentences

# Step 4: Categorize ESG sentences
def categorize_esg_sentences(esg_sentences):
    esg_categories = defaultdict(list)
    
    for sent in esg_sentences:
        if any(keyword in sent.lower() for keyword in ["environment", "sustainability", "carbon", "climate", "renewable", "energy", "emissions", "waste", "recycling"]):
            esg_categories["Environment"].append(sent)
        elif any(keyword in sent.lower() for keyword in ["social", "diversity", "employee", "community", "responsibility"]):
            esg_categories["Social"].append(sent)
        elif any(keyword in sent.lower() for keyword in ["governance", "ethics", "compliance", "transparency"]):
            esg_categories["Governance"].append(sent)
    
    return esg_categories

# Step 5: Perform sentiment analysis and calculate polarity scores
sentiment_pipeline = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")

def analyze_sentiment(sentences):
    sentiment_results = {
        "Positive": [],
        "Negative": [],
        "Neutral": [],
        "Polarity_Scores": []
    }
    
    for sent in sentences:
        truncated_sent = sent[:512]  # Truncate to 512 characters
        try:
            result = sentiment_pipeline(truncated_sent)[0]
            label = result["label"]
            score = result["score"]
            if label == "POSITIVE":
                sentiment_results["Positive"].append(sent)
                sentiment_results["Polarity_Scores"].append(score)  # Positive score
            elif label == "NEGATIVE":
                sentiment_results["Negative"].append(sent)
                sentiment_results["Polarity_Scores"].append(-score)  # Negative score
            else:
                sentiment_results["Neutral"].append(sent)
                sentiment_results["Polarity_Scores"].append(0)  # Neutral score
        except Exception as e:
            print(f"Error processing sentence: {e}")
            sentiment_results["Neutral"].append(sent)
            sentiment_results["Polarity_Scores"].append(0)  # Default to Neutral if error occurs
    
    return sentiment_results

# Step 6: Calculate average polarity scores
def calculate_average_polarity(sentiment_results):
    if len(sentiment_results["Polarity_Scores"]) > 0:
        return sum(sentiment_results["Polarity_Scores"]) / len(sentiment_results["Polarity_Scores"])
    return 0

# Step 7: Calculate weighted average polarity score
def calculate_weighted_average_polarity(esg_categories, sentiment_results, weights):
    weighted_scores = []
    for category in esg_categories:
        category_sentences = esg_categories[category]
        category_scores = [
            sentiment_results["Polarity_Scores"][i] 
            for i, sent in enumerate(sentiment_results["Positive"] + sentiment_results["Negative"] + sentiment_results["Neutral"]) 
            if sent in category_sentences
        ]
        if category_scores:  # Ensure there are scores to calculate
            avg_score = sum(category_scores) / len(category_scores)
            weighted_scores.append(avg_score * weights[category])
    
    if weighted_scores:  # Ensure there are weighted scores to calculate
        return sum(weighted_scores) / sum(weights.values())
    return 0  # Default to 0 if no scores are available

# Step 8: Process ESG report and return results
def process_esg_report(pdf_path):
    raw_text = extract_text_from_pdf(pdf_path)  # Extract text from PDF
    cleaned_text = clean_text(raw_text)  # Clean and preprocess text
    esg_sentences = extract_esg_sentences(cleaned_text)  # Extract ESG-related sentences
    esg_categories = categorize_esg_sentences(esg_sentences)  # Categorize ESG sentences
    sentiment_results = analyze_sentiment(esg_sentences)  # Perform sentiment analysis
    
    # Calculate average polarity scores
    avg_polarity = calculate_average_polarity(sentiment_results)
    
    # Calculate individual ESG polarity scores
    esg_polarity_scores = {
        "Environment": calculate_average_polarity({"Polarity_Scores": [sentiment_results["Polarity_Scores"][i] for i, sent in enumerate(sentiment_results["Positive"] + sentiment_results["Negative"] + sentiment_results["Neutral"]) if sent in esg_categories["Environment"]]}),
        "Social": calculate_average_polarity({"Polarity_Scores": [sentiment_results["Polarity_Scores"][i] for i, sent in enumerate(sentiment_results["Positive"] + sentiment_results["Negative"] + sentiment_results["Neutral"]) if sent in esg_categories["Social"]]}),
        "Governance": calculate_average_polarity({"Polarity_Scores": [sentiment_results["Polarity_Scores"][i] for i, sent in enumerate(sentiment_results["Positive"] + sentiment_results["Negative"] + sentiment_results["Neutral"]) if sent in esg_categories["Governance"]]})
    }
    
    # Define weights for ESG categories (customize as needed)
    esg_weights = {"Environment": 0.4, "Social": 0.35, "Governance": 0.25}
    
    # Calculate weighted average polarity score
    weighted_avg_polarity = calculate_weighted_average_polarity(esg_categories, sentiment_results, esg_weights)
    
    # Return results
    return {
        "esg_categories": esg_categories,
        "sentiment_results": sentiment_results,
        "avg_polarity": avg_polarity,
        "esg_polarity_scores": esg_polarity_scores,
        "weighted_avg_polarity": weighted_avg_polarity
    }

# Step 9: Process multiple ESG reports and export results to Excel
def process_multiple_esg_reports(folder_path, output_excel_path):
    results = []
    
    for file_name in os.listdir(folder_path):
        if file_name.endswith(".pdf"):
            pdf_path = os.path.join(folder_path, file_name)
            print(f"Processing: {file_name}")
            
            # Process the ESG report
            report_results = process_esg_report(pdf_path)
            
            # Extract counts for ESG categories and sentiment distribution
            esg_categories = report_results["esg_categories"]
            sentiment_results = report_results["sentiment_results"]
            avg_polarity = report_results["avg_polarity"]
            esg_polarity_scores = report_results["esg_polarity_scores"]
            weighted_avg_polarity = report_results["weighted_avg_polarity"]
            
            # Prepare row data for Excel
            row = {
                "Company": file_name.replace(".pdf", ""),
                "ESG Report File Name": file_name,
                "ESG Categories_Number of sentences_Social": len(esg_categories.get("Social", [])),
                "ESG Categories_Number of sentences_Environment": len(esg_categories.get("Environment", [])),
                "ESG Categories_Number of sentences_Governance": len(esg_categories.get("Governance", [])),
                "Total": len(esg_categories.get("Social", [])) + len(esg_categories.get("Environment", [])) + len(esg_categories.get("Governance", [])),
                "Sentiment Distribution_Social_Positive": len([s for s in esg_categories.get("Social", []) if s in sentiment_results["Positive"]]),
                "Sentiment Distribution_Social_Negative": len([s for s in esg_categories.get("Social", []) if s in sentiment_results["Negative"]]),
                "Sentiment Distribution_Social_Neutral": len([s for s in esg_categories.get("Social", []) if s in sentiment_results["Neutral"]]),
                "Sentiment Distribution_Social_Total": len(esg_categories.get("Social", [])),
                "Sentiment Distribution_Environment_Positive": len([s for s in esg_categories.get("Environment", []) if s in sentiment_results["Positive"]]),
                "Sentiment Distribution_Environment_Negative": len([s for s in esg_categories.get("Environment", []) if s in sentiment_results["Negative"]]),
                "Sentiment Distribution_Environment_Neutral": len([s for s in esg_categories.get("Environment", []) if s in sentiment_results["Neutral"]]),
                "Sentiment Distribution_Environment_Total": len(esg_categories.get("Environment", [])),
                "Sentiment Distribution_Governance_Positive": len([s for s in esg_categories.get("Governance", []) if s in sentiment_results["Positive"]]),
                "Sentiment Distribution_Governance_Negative": len([s for s in esg_categories.get("Governance", []) if s in sentiment_results["Negative"]]),
                "Sentiment Distribution_Governance_Neutral": len([s for s in esg_categories.get("Governance", []) if s in sentiment_results["Neutral"]]),
                "Sentiment Distribution_Governance_Total": len(esg_categories.get("Governance", [])),
                "Average Polarity Score": avg_polarity,
                "Environment Polarity Score": esg_polarity_scores["Environment"],
                "Social Polarity Score": esg_polarity_scores["Social"],
                "Governance Polarity Score": esg_polarity_scores["Governance"],
                "Weighted Average Polarity Score": weighted_avg_polarity
            }
            
            # Append row to results
            results.append(row)
    
    # Convert results to a DataFrame
    df = pd.DataFrame(results)
    
    # Export DataFrame to Excel
    df.to_excel(output_excel_path, index=False)
    print(f"Results exported to: {output_excel_path}")

# Example usage
if __name__ == "__main__":
    folder_path = "ESG Reports"  # Replace with the folder containing ESG reports
    output_excel_path = "Sentiment_Analysis_Results_Final.xlsx"  # Output Excel file
    process_multiple_esg_reports(folder_path, output_excel_path)

Installing pycryptodome...
Installing Pillow...


Device set to use cpu


Processing: audi-report-2020.pdf
Processing: audi-report-2021.pdf
Processing: audi-report-2022.pdf
Processing: audi-report-2023.pdf
Processing: audi-sustainability-report-2018.pdf
Processing: audi-sustainability-report-2019.pdf
Processing: BMW_2019.pdf
Processing: BMW_2020.pdf
Processing: BMW_2021.pdf
Processing: BMW_2022.pdf
Processing: BMW_2023.pdf
Processing: BYD_2017_CSR.pdf
Processing: BYD_2018_CSR.pdf
Processing: BYD_2019_CSR.pdf
Processing: BYD_2020_CSR.pdf
No text found in ESG Reports\BYD_2020_CSR.pdf. Attempting OCR...
Error extracting text from ESG Reports\BYD_2020_CSR.pdf: tesseract is not installed or it's not in your PATH. See README file for more information.
Processing: BYD_2021_CSR.pdf
Processing: BYD_2022_CSR.pdf
Processing: BYD_2023_CSR.pdf
Processing: daimler-sustainability-report-2018.pdf
Processing: daimler-sustainability-report-2019.pdf
Processing: daimler-sustainability-report-2020.pdf
Processing: Ford_2021-integrated-sustainability-and-financial-report.pdf
Proce

In [6]:
import pandas as pd

# Load the Excel file
input_excel_path = "Sentiment_Analysis_Results_Final.xlsx"
df = pd.read_excel(input_excel_path)

# Function to extract word before '_' or '-'
def extract_word(value):
    if pd.isna(value):
        return ""
    if "_" in value:
        return value.split("_")[0]
    elif "-" in value:
        return value.split("-")[0]
    else:
        return ""

# Apply the function to the desired column
df['Company Name'] = df['Company'].apply(extract_word)
df['Company Name'] = df['Company Name'].replace('daimler', 'mercedes')
df.drop(columns=['Company'], inplace=True)

# Shift the 'Extracted' column to the first position
cols = df.columns.tolist()
cols.insert(0, cols.pop(cols.index('Company Name')))
df = df[cols]

# Save the result to a new Excel file
with pd.ExcelWriter(input_excel_path, engine="openpyxl", mode="a") as writer:
    df.to_excel(writer, sheet_name="Sentiment Analysis", index=False)

print(f"Results with truncated words have been saved to a new worksheet 'Sentiment Analysis' in: {input_excel_path}") 

Results with truncated words have been saved to a new worksheet 'Sentiment Analysis' in: Sentiment_Analysis_Results_Final.xlsx


In [2]:
import numpy as np
import pandas as pd

In [19]:
# Load the Excel file
input_excel_path = "Sentiment_Analysis_Results_Final.xlsx"
df = pd.read_excel(input_excel_path, sheet_name="Sentiment Analysis")

# Drop all the columns except the 'Company Name', 'Weighted Average Polarity Score', 'Average Polarity Score', 'Environment Polarity Score', 'Social Polarity Score', 'Governance Polarity Score'
df_concise = df[['Company Name', 'Average Polarity Score', 'Environment Polarity Score', 'Social Polarity Score', 'Governance Polarity Score', 'Weighted Average Polarity Score',]]

# Drop row with 0 
df_concise = df_concise[df_concise['Weighted Average Polarity Score'] != 0]

# Group by 'Company Name' and calculate the mean of the 'Weighted Average Polarity Score', 'Average Polarity Score', 'Environment Polarity Score', 'Social Polarity Score', 'Governance Polarity Score'
df_grouped = df_concise.groupby('Company Name').mean().round(5)
df_grouped.reset_index(inplace=True)
df_grouped.head()

# Save the result to a new Excel file
with pd.ExcelWriter(input_excel_path, engine="openpyxl", mode="a") as writer:
    df_grouped.to_excel(writer, sheet_name="Sentiment Analysis Results", index=False)
