# Data preprocessing

In [None]:
!pip install pdfplumber

In [None]:
import logging
import os
import pdfplumber
import re
import pandas as pd
from collections import defaultdict
import spacy
import warnings
import nltk
from nltk import word_tokenize
from nltk.tokenize import sent_tokenize
from string import punctuation
from nltk.corpus import stopwords
from statistics import mean
from heapq import nlargest

warnings.filterwarnings('ignore')

In [None]:
# Initialize spaCy: load the small English pipeline.
# NOTE: the name `nlp` is rebound later in this file to a transformers
# sentiment-analysis pipeline, after which this spaCy model is no longer
# reachable through `nlp`.
nlp = spacy.load("en_core_web_sm")

In [None]:
# Initialise the bank that is being processed. The value is interpolated into
# the input and output folder paths (f'{raw_folder}/{bank}' and
# f'{processed_folder}/{bank}') by the entry-point cells; it is left empty here.
bank = ''

In [None]:
# Detach every handler currently on the root logger so that re-running this
# cell does not stack duplicate handlers.
for existing_handler in list(logging.root.handlers):
    logging.root.removeHandler(existing_handler)

# Route INFO-and-above records both to a log file and to the console.
log_file_handler = logging.FileHandler("pdf_processing_errors.log")
console_handler = logging.StreamHandler()  # Output to console
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[log_file_handler, console_handler],
)

In [None]:
def remove_header_text(text=None, header_pattern=None):
    """Remove header text from the extracted text.

    Lines shorter than five characters are dropped, as are lines matching
    ``header_pattern`` (case-insensitive) when one is supplied.

    Args:
        text: Raw extracted text; ``None`` or an empty string yields ``""``.
        header_pattern: Optional regular expression identifying header lines.
            When omitted or empty, only the length filter is applied.

    Returns:
        The surviving lines joined with newlines.
    """
    if not text:
        return ""

    # An empty pattern matches every string and would discard all lines, so
    # the regex filter is applied only when a real pattern is given.
    matcher = re.compile(header_pattern, re.IGNORECASE) if header_pattern else None

    kept_lines = [
        line
        for line in text.splitlines()
        if len(line) >= 5 and not (matcher and matcher.search(line))
    ]
    return "\n".join(kept_lines)

def clean_text(text, noise_pattern=None):
    """Drop very short lines and, optionally, lines matching a noise pattern.

    Args:
        text: Raw extracted text; ``None`` or an empty string yields ``""``.
        noise_pattern: Optional regular expression (matched case-insensitively)
            identifying lines to discard. When omitted or empty, only lines
            shorter than five characters are removed.

    Returns:
        The surviving lines joined with newlines.
    """
    if not text:
        return ""

    # An empty pattern matches every string and would discard the whole
    # document, so the regex filter is applied only when a pattern is given.
    matcher = re.compile(noise_pattern, re.IGNORECASE) if noise_pattern else None

    kept_lines = []
    for line in text.splitlines():
        if len(line) < 5:
            continue
        if matcher is not None and matcher.search(line):
            continue
        kept_lines.append(line)
    return "\n".join(kept_lines)

# PDF Text Extraction Functions
def extract_first_page_text(file_path=None):
    """Extracts text from the first page of a PDF file to find year and quarter information.

    Args:
        file_path: Path of the PDF to open.

    Returns:
        The text of the first page. An empty string is returned when the page
        yields no text or when anything goes wrong while opening or reading
        the file (the error is logged, not raised).
    """
    try:
        with pdfplumber.open(file_path) as pdf:
            # Normalise a missing text result to "" so callers can always
            # treat the return value as a string.
            # NOTE(review): some pdfplumber versions appear to return None for
            # a page without extractable text - confirm for the pinned version.
            return pdf.pages[0].extract_text() or ""
    except Exception as e:
        logging.error(f"Error in extract_first_page_text, file: {file_path}, error: {e}")
        return ""

def extract_year_quarter_from_text(text=None):
    """Extracts year and quarter information from the text.

    The first occurrence of either ``Q<n> <yyyy>`` or ``<yyyy> Q<n>`` is used;
    the whitespace between the two parts is optional.

    Args:
        text: Text to search; ``None`` or an empty string yields no match.

    Returns:
        A ``(quarter, year)`` tuple of strings such as ``("Q1", "2023")``, or
        ``(None, None)`` when no quarter/year pair is found.
    """
    if not text:
        return None, None

    match = re.search(r"Q([1-4])\s*(\d{4})|(\d{4})\s*Q([1-4])", text)
    if match is None:
        return None, None

    # Capture groups are used instead of str.split() because the pattern
    # allows zero whitespace (e.g. "Q12023"), which cannot be split in two.
    quarter_first, year_after, year_first, quarter_after = match.groups()
    if quarter_first is not None:
        return f"Q{quarter_first}", year_after
    return f"Q{quarter_after}", year_first

def extract_company_participants(text):
    """Extracts company participants from the 'Company Participants' section of the text.

    Scanning starts after the first line containing "Company Participants" and
    stops at the first line containing "Operator", "Question-and-Answer" or
    "Conference Call Participants" (even if that line comes before the section
    heading). Each "<name> - <designation>" line inside the section yields one
    entry; '&' in the designation becomes ' and ' and commas are normalised to
    ', '.

    Returns:
        A list of ``(name, designation)`` tuples in document order.
    """
    entry_pattern = re.compile(r'(?P<name>[\w\s\.\-\'\u00C0-\u017F]+?)\s*[-–]\s*(?P<designation>[\w\s,&\.\-\'\u00C0-\u017F]+)')
    stop_markers = ("Operator", "Question-and-Answer", "Conference Call Participants")

    participants = []
    collecting = False

    for raw_line in text.splitlines():
        if "Company Participants" in raw_line:
            collecting = True
            continue

        if any(marker in raw_line for marker in stop_markers):
            break

        if not collecting:
            continue

        parsed = entry_pattern.match(raw_line)
        if parsed is None:
            continue

        role = re.sub(r'\s*&\s*', ' and ', parsed.group("designation").strip())
        role = re.sub(r'\s*,\s*', ', ', role)
        participants.append((parsed.group("name").strip(), role))

    return participants

def extract_conference_participants(text):
    """Extracts conference participants from the 'Conference Call Participants' section of the text.

    Scanning starts after the first line containing "Conference Call
    Participants" and stops at the first line containing "Operator",
    "Question-and-Answer" or "Disclaimer" (even if that line comes before the
    section heading). Each "<name> - <bank>" line inside the section yields
    one entry.

    Returns:
        A list of ``(name, bank)`` tuples in document order.
    """
    entry_pattern = re.compile(r'(?P<name>[\w\s\.\-\'\u00C0-\u017F]+?)\s*[-–]\s*(?P<bank>[\w\s,&\.\-\'\u00C0-\u017F]+)')
    stop_markers = ("Operator", "Question-and-Answer", "Disclaimer")

    participants = []
    collecting = False

    for raw_line in text.splitlines():
        if "Conference Call Participants" in raw_line:
            collecting = True
            continue

        if any(marker in raw_line for marker in stop_markers):
            break

        if not collecting:
            continue

        parsed = entry_pattern.match(raw_line)
        if parsed is not None:
            participants.append(
                (parsed.group("name").strip(), parsed.group("bank").strip())
            )

    return participants

def extract_participants_from_text(text):
    """Extracts both company and conference participants from the text.

    Returns:
        A ``(company_participants, conference_participants)`` tuple holding
        the results of ``extract_company_participants`` and
        ``extract_conference_participants`` for the same ``text``.
    """
    return extract_company_participants(text), extract_conference_participants(text)


def extract_qa_section(text):
    """Return the text from the first "Question-and-Answer Session" marker onward.

    The marker itself is included in the result. An empty string is returned
    when the marker does not occur in ``text``.
    """
    # partition() yields ("<before>", marker, "<after>") on a hit and
    # (text, "", "") on a miss, so the concatenation below is "" on a miss.
    _, marker, remainder = text.partition("Question-and-Answer Session")
    return marker + remainder

def extract_interview_details(text, company_df, conference_df):
    """Pair consecutive speaker turns in ``text`` into question/answer rows.

    A speaker turn is a line of the form "Firstname Lastname" followed by the
    text up to the next such line, an "Operator" line, or the end of ``text``.
    Turns are consumed two at a time: the first of each pair is treated as the
    question and the second as the answer. A trailing unpaired turn is
    ignored.

    Args:
        text: Transcript text to scan.
        company_df: DataFrame with ``Name`` and ``Designation`` columns, used
            to look up the interviewee's designation.
        conference_df: DataFrame with ``Name`` and ``Bank`` columns, used to
            look up the interviewer's bank.

    Returns:
        A DataFrame with the columns ``Interviewer``, ``Interviewer Bank``,
        ``Question``, ``Interviewee``, ``Interviewee Designation`` and
        ``Answer``. Missing banks are filled with "Interviewee" and missing
        designations with "Interviewer". The frame is empty (but still has
        all columns) when no complete pair is found.
    """
    columns = [
        "Interviewer",
        "Interviewer Bank",
        "Question",
        "Interviewee",
        "Interviewee Designation",
        "Answer",
    ]

    def _first_value(frame, name, column):
        # First value of `column` among rows whose Name equals `name`, else None.
        values = frame.loc[frame['Name'] == name, column].values
        return values[0] if values.size > 0 else None

    # Define regex patterns to capture Q&A pairs sequentially
    qa_pattern = r"\n([A-Z][a-z]+ [A-Z][a-z]+)\n(.*?)(?=\n[A-Z][a-z]+ [A-Z][a-z]+|\nOperator|\Z)"
    qa_matches = re.findall(qa_pattern, text, re.DOTALL)

    # Even-indexed turns are questions, odd-indexed turns are their answers.
    qa_data = []
    for (interviewer, question), (interviewee, answer) in zip(qa_matches[0::2], qa_matches[1::2]):
        qa_data.append({
            "Interviewer": interviewer,
            "Interviewer Bank": _first_value(conference_df, interviewer, 'Bank'),
            "Question": question.strip(),
            "Interviewee": interviewee,
            "Interviewee Designation": _first_value(company_df, interviewee, 'Designation'),
            "Answer": answer.strip(),
        })

    # Passing the columns explicitly keeps them present when qa_data is empty,
    # so the fillna calls below cannot raise KeyError.
    qa_df = pd.DataFrame(qa_data, columns=columns)

    # Fill missing values with specified text
    qa_df['Interviewer Bank'] = qa_df['Interviewer Bank'].fillna("Interviewee")
    qa_df['Interviewee Designation'] = qa_df['Interviewee Designation'].fillna("Interviewer")

    return qa_df


def process_pdf(file_path):
    """Extract period and participant metadata from one transcript PDF.

    NOTE: a second ``process_pdf`` defined later in this file rebinds the name
    and returns a seven-item tuple, which is the shape ``process_folder`` and
    ``process_files`` unpack. This five-item version is only in effect until
    that later definition has been executed.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        ``(base_name, year, quarter, company_participants,
        conference_participants)`` where ``year``/``quarter`` come from the
        first page and the participant lists from the cleaned full text.
    """
    base_name = os.path.basename(file_path)
    first_page_text = extract_first_page_text(file_path)
    quarter, year = extract_year_quarter_from_text(first_page_text or "")

    with pdfplumber.open(file_path) as pdf:
        # A page without extractable text is treated as empty so that the
        # concatenation cannot fail on a non-string result.
        page_texts = [(page.extract_text() or "") + "\n" for page in pdf.pages]

    text = clean_text("".join(page_texts))

    # Extract company and conference participants
    company_participants, conference_participants = extract_participants_from_text(text)

    return base_name, year, quarter, company_participants, conference_participants

def process_folder(folder_path):
    """Process every ``.pdf`` file directly inside ``folder_path``.

    Each file is passed to ``process_pdf``; a file that raises is logged and
    skipped. Sub-folders are not traversed.

    Returns:
        ``(company_df, conference_df, document_df)``:
          * company_df - one row per company participant
            (Year, Quarter, Name, Designation, FileName, Bank);
          * conference_df - one row per conference-call participant
            (Year, Quarter, Name, Bank, FileName);
          * document_df - one row per document
            (Year, Quarter, Bank, FileName, Text).
        ``(None, None, None)`` is returned if building the DataFrames fails.
    """
    company_rows = []
    conference_rows = []
    document_rows = []  # One entry per document: bank plus cleaned text

    pdf_names = (entry for entry in os.listdir(folder_path) if entry.endswith(".pdf"))
    for file_name in pdf_names:
        file_path = os.path.join(folder_path, file_name)

        try:
            (base_name, year, quarter, bank_name, cleaned_text,
             company_participants, conference_participants) = process_pdf(file_path)

            for name, designation in company_participants:
                company_rows.append(dict(
                    Year=year,
                    Quarter=quarter,
                    Name=name,
                    Designation=designation,
                    FileName=base_name,
                    Bank=bank_name,
                ))

            for name, bank in conference_participants:
                conference_rows.append(dict(
                    Year=year,
                    Quarter=quarter,
                    Name=name,
                    Bank=bank,
                    FileName=base_name,
                ))

            document_rows.append(dict(
                Year=year,
                Quarter=quarter,
                Bank=bank_name,
                FileName=base_name,
                Text=cleaned_text,
            ))

        except Exception as e:
            # A failure in one file must not stop the rest of the folder.
            logging.error(f"Error processing file {file_name} in {folder_path}: {e}")
            continue

    try:
        frames = (
            pd.DataFrame(company_rows, columns=["Year", "Quarter", "Name", "Designation", "FileName", "Bank"]),
            pd.DataFrame(conference_rows, columns=["Year", "Quarter", "Name", "Bank", "FileName"]),
            pd.DataFrame(document_rows, columns=["Year", "Quarter", "Bank", "FileName", "Text"]),
        )
    except Exception as e:
        logging.error(f"Error creating DataFrames from processed data: {e}")
        return None, None, None  # Signal that DataFrame creation failed

    return frames


def process_pdf_q_and_a(file_path, company_df, conference_df):
    """Split one transcript PDF into speaker-attributed dialogue rows.

    Every non-blank text line of every page is fed through ``add_q_and_a``
    three times: against the company participant names (text type "Answer"),
    the conference participant names ("Question") and the literal "Operator"
    ("Operator"). Lines between two speaker headings are accumulated as that
    speaker's dialogue. Lines containing "call-transcript" or
    "Call Transcript" are ignored for speaker matching and accumulation.

    Args:
        file_path: Path of the PDF to read.
        company_df: DataFrame whose ``Name`` column lists company speakers.
        conference_df: DataFrame whose ``Name`` column lists analyst speakers.

    Returns:
        A DataFrame with one row per speaker turn (``Text Type``, ``Name``,
        ``Dialogue``, ``Section``) plus constant ``Year``, ``Quarter`` and
        ``FileName`` columns for the file. The turn in progress at the end of
        the document is always appended as the final row.
    """
    base_name = os.path.basename(file_path)
    first_page_text = extract_first_page_text(file_path)
    quarter, year = extract_year_quarter_from_text(first_page_text or "")

    is_new_line = qa_section = False
    text_by = text_type = text_to_add = ""
    qa_data = []
    noises = ["call-transcript", "Call Transcript"]
    key_words = ["Operator"]
    section = "Presentation"

    # (candidate speaker names, text type assigned when a line matches one)
    speaker_sources = (
        (company_df.Name, "Answer"),
        (conference_df.Name, "Question"),
        (key_words, "Operator"),
    )

    # Open the PDF file
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            # A page without extractable text is treated as having no lines
            # instead of failing on a non-string result.
            lines = (page.extract_text() or "").split('\n')

            for line in lines:
                line = line.strip()
                if not line:
                    # A blank line is a substring of every speaker name and
                    # would be mistaken for a speaker heading, so skip it.
                    continue

                is_noise = any(noise in line for noise in noises)

                if not is_noise:
                    for names, matched_text_type in speaker_sources:
                        is_new_line, text_by, text_type, text_to_add, section = add_q_and_a(
                            is_new_line, text_by, text_type, text_to_add, qa_data,
                            names, line, matched_text_type, section, qa_section,
                        )

                    if is_new_line:
                        # Heading line with nothing pending: start a clean buffer.
                        is_new_line = False
                        text_to_add = ""
                    else:
                        # Heading lines are appended too; the speaker name is
                        # stripped again when the turn is written out.
                        text_to_add += " " + line

                # Checked for noise lines as well; from here on, flushed turns
                # move the section to "Question-and-Answer".
                if "Question-and-Answer" in line:
                    qa_section = True

    # Flush the turn that was still being accumulated at end of document.
    qa_data.append({
        "Text Type": text_type,
        "Name": text_by,
        "Dialogue": text_to_add.replace(text_by, '').strip(),
        "Section": section
    })

    qa_df = pd.DataFrame(qa_data)

    qa_df["Year"] = year
    qa_df["Quarter"] = quarter
    qa_df["FileName"] = base_name

    return qa_df

def add_q_and_a(is_new_line, text_by, text_type, text_to_add, qa_data, key_words, line, new_text_type, section, qa_section):
    """Advance the speaker-turn state machine by one line for one keyword set.

    If ``line`` matches any entry of ``key_words`` it is treated as a speaker
    heading: text accumulated for the previous speaker (if any) is appended to
    ``qa_data`` and the current speaker/text type are switched to this line.

    Args:
        is_new_line: Flag telling the caller the line was a heading with no
            pending text.
        text_by: Name of the speaker currently being accumulated.
        text_type: Text type of the current speaker.
        text_to_add: Text accumulated so far for the current speaker.
        qa_data: List that finished turns are appended to (mutated in place).
        key_words: Iterable of speaker names/keywords to match against.
        line: The stripped line being examined.
        new_text_type: Text type to assign if ``line`` is a heading.
        section: Section label recorded with flushed turns.
        qa_section: True once the Q&A marker has been seen by the caller.

    Returns:
        The updated ``(is_new_line, text_by, text_type, text_to_add, section)``.
    """
    if not line:
        # "" is a substring of every keyword; without this guard a blank line
        # would be taken for a heading, reset text_by to "" and cause the
        # following dialogue to be dropped at the next flush.
        return is_new_line, text_by, text_type, text_to_add, section

    for key_word in key_words:
        # NOTE(review): the test is "line is a substring of key_word", so a
        # line holding only part of a name also matches - confirm intended.
        if line in key_word:
            is_new_line = True
            if text_to_add.strip():
                if text_by:
                    qa_data.append({
                        "Text Type": text_type,
                        "Name": text_by,
                        "Dialogue": text_to_add.replace(text_by, '').strip(),
                        "Section": section
                    })
                    # The flushed turn keeps the old section; later turns are
                    # labelled as Q&A once the marker has been seen.
                    if qa_section:
                        section = "Question-and-Answer"
                text_to_add = ""
                is_new_line = False
            text_by = line
            text_type = new_text_type

    return is_new_line, text_by, text_type, text_to_add, section

def process_folder_q_and_a(folder_path, company_df, conference_df):
    """Build the speaker-turn table for every ``.pdf`` directly in a folder.

    Each PDF is converted with ``process_pdf_q_and_a`` and the results are
    concatenated. A ``Credential`` column is then added: it defaults to
    "Operator" and is replaced through ``lookup_credential`` where a match is
    found.

    Args:
        folder_path: Folder containing the transcript PDFs.
        company_df: Company participants table.
        conference_df: Conference-call participants table.

    Returns:
        The combined DataFrame. When the folder holds no PDF files an empty
        DataFrame with the expected columns is returned.
    """
    all_qa_data = [
        process_pdf_q_and_a(os.path.join(folder_path, file_name), company_df, conference_df)
        for file_name in os.listdir(folder_path)
        if file_name.endswith(".pdf")
    ]

    if not all_qa_data:
        # pd.concat raises ValueError on an empty list; return an empty table
        # with the usual columns so callers can still save or inspect it.
        logging.warning(f"No PDF files found in {folder_path}")
        return pd.DataFrame(columns=[
            "Text Type", "Name", "Dialogue", "Section",
            "Year", "Quarter", "FileName", "Credential",
        ])

    final_df = pd.concat(all_qa_data, ignore_index=True)

    # Default credential; lookup_credential falls back to this value.
    final_df["Credential"] = "Operator"

    # Apply the lookup function to each row in final_df
    final_df['Credential'] = final_df.apply(lambda row: lookup_credential(row, company_df, conference_df), axis=1)

    return final_df

def lookup_credential(row, company_df, conference_df):
    """Resolve the credential for one speaker-turn row.

    Rows are matched on ``Name``, ``Year`` and ``Quarter``. A company match
    wins and yields its ``Designation``; otherwise a conference match yields
    its ``Bank``; otherwise the row's existing ``Credential`` is returned.
    """
    def _same_speaker_and_period(frame):
        # Rows of `frame` for the same person in the same reporting period.
        mask = (
            (frame['Name'] == row['Name'])
            & (frame['Year'] == row['Year'])
            & (frame['Quarter'] == row['Quarter'])
        )
        return frame.loc[mask]

    company_hits = _same_speaker_and_period(company_df)
    conference_hits = _same_speaker_and_period(conference_df)

    if not company_hits.empty:
        return company_hits['Designation'].values[0]
    if not conference_hits.empty:
        return conference_hits['Bank'].values[0]
    return row['Credential']


In [None]:
def process_pdf(file_path):
    """Extract metadata, cleaned text and participants from one transcript PDF.

    The bank name is taken from the name of the directory containing the
    file. Year and quarter come from the first page; the participant lists
    come from the cleaned text of all pages.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        ``(base_name, year, quarter, bank_name, cleaned_text,
        company_participants, conference_participants)``. If processing
        fails, the error is logged and
        ``(base_name, None, None, bank_name, "", [], [])`` is returned.
    """
    # Derived from the path alone, so computed before the try block: the
    # except handler below returns both and must never see them unbound.
    base_name = os.path.basename(file_path)
    bank_name = os.path.basename(os.path.dirname(file_path))  # Assuming bank name is in the parent directory name

    try:
        # Extract first page text to determine year and quarter
        first_page_text = extract_first_page_text(file_path)
        quarter, year = extract_year_quarter_from_text(first_page_text or "")

        page_texts = []

        # Open and read PDF, handling errors in extraction
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                try:
                    # A page without extractable text counts as empty rather
                    # than being reported as an extraction error.
                    page_texts.append((page.extract_text() or "") + "\n")
                except Exception as page_error:
                    logging.error(f"Error extracting text from page in {file_path}: {page_error}")
                    continue  # Skip the problematic page

        # Clean the extracted text
        cleaned_text = clean_text("".join(page_texts))

        # Extract company and conference participants from cleaned text
        company_participants, conference_participants = extract_participants_from_text(cleaned_text)

        # Return the gathered data
        return base_name, year, quarter, bank_name, cleaned_text, company_participants, conference_participants

    except Exception as e:
        logging.error(f"Error processing PDF {file_path}: {e}")
        return base_name, None, None, bank_name, "", [], []


def discover_files(folder_path, sample=None, sample_size=3):
    """Discovers all PDF files in a folder and its subfolders, optionally sampling them.

    Files are grouped by the name of the directory that contains them. Within
    a directory, files are taken in sorted name order so that sampling is
    reproducible.

    Args:
        folder_path: Root folder to walk.
        sample: When truthy, keep only the first ``sample_size`` files of
            each directory group.
        sample_size: Number of files to keep per group when sampling.

    Returns:
        A flat list of PDF file paths.
    """
    logging.info(f"Starting to discover files in folder: {folder_path}")
    folder_files = defaultdict(list)
    for root, _, files in os.walk(folder_path):
        for file_name in sorted(files):
            if file_name.endswith(".pdf"):
                folder_files[os.path.basename(root)].append(os.path.join(root, file_name))

    # A slice bound of None keeps every file. The result is flattened in both
    # modes so callers always receive a list of path strings.
    limit = sample_size if sample else None
    return [path for paths in folder_files.values() for path in paths[:limit]]

def process_files(file_paths):
    """Processes PDF files to extract year, quarter, bank, full document text, and company participants.

    Files for which ``process_pdf`` yields no year or no quarter are left out
    of both tables. A file that raises is logged and skipped.

    Args:
        file_paths: List of PDF paths to process; may be empty.

    Returns:
        ``(document_df, participant_df)``: one row per document
        (Year, Quarter, Text, Bank, File) and one row per company participant
        (Year, Quarter, Name, Designation, File, Bank).
    """
    document_data = []  # To store document-level data
    participant_data = []  # To store participant-level data

    logging.info(f"Starting to process {len(file_paths)} PDF files.")
    if file_paths:
        # Guarded so that an empty list does not raise IndexError.
        logging.info(f"Sample file: {file_paths[0]}")

    for file_path in file_paths:
        logging.info(f"Processing file: {file_path}")

        try:
            base_name, year, quarter, bank, text, company_participants, _ = process_pdf(file_path)
            if not (year and quarter):
                continue

            # Store document-level data
            document_data.append({
                "Year": year,
                "Quarter": quarter,
                "Text": text,
                "Bank": bank,
                "File": base_name
            })

            # Store participant-level data
            for name, designation in company_participants:
                participant_data.append({
                    "Year": year,
                    "Quarter": quarter,
                    "Name": name,
                    "Designation": designation,
                    "File": base_name,
                    "Bank": bank
                })

        except Exception as e:
            logging.error(f"Error processing file {file_path}: {e}")

    # Create DataFrames to store document-level and participant-level information
    document_df = pd.DataFrame(document_data, columns=["Year", "Quarter", "Text", "Bank", "File"])
    participant_df = pd.DataFrame(participant_data, columns=["Year", "Quarter", "Name", "Designation", "File", "Bank"])
    logging.info(f"Successfully processed {len(document_data)} out of {len(file_paths)} PDF files.")

    return document_df, participant_df

def save_to_csv(dataframe, save_folder, filename):
    """Saves the DataFrame to a CSV file.

    Args:
        dataframe: DataFrame to write (the index is not written).
        save_folder: Destination folder; created if missing. An empty value
            means the current working directory.
        filename: Name of the CSV file inside ``save_folder``.
    """
    if save_folder:
        # os.makedirs("") raises FileNotFoundError, so it is only called when
        # there is an actual folder to create.
        os.makedirs(save_folder, exist_ok=True)
    csv_path = os.path.join(save_folder, filename)
    logging.info(f"Saving data to CSV at: {csv_path}")
    dataframe.to_csv(csv_path, index=False)
    logging.info("Data successfully saved to CSV.")

def process_all_documents(raw_folder, processed_folder, metadata_folder, sample=False, sample_size=1):
    """
    Discover, process and persist all transcript PDFs under ``raw_folder``.

    Args:
        raw_folder (str): Path to the folder containing raw PDF files.
        processed_folder (str): Folder receiving "pdf_summarytext_data.csv"
            (document-level data).
        metadata_folder (str): Folder receiving "company_participants.csv"
            (participant-level data).
        sample (bool): Whether to sample files for testing.
        sample_size (int): Number of files to sample if `sample=True`.

    Returns:
        tuple: DataFrames for document-level and participant-level data.
    """
    # Step 1: find the PDFs (optionally sampled).
    discovered_paths = discover_files(raw_folder, sample, sample_size)
    logging.info(f"Discovered {len(discovered_paths)} PDF files.")

    # Step 2: turn them into the document and participant tables.
    document_df, participant_df = process_files(discovered_paths)

    # Step 3: write each table to its own folder, documents first.
    outputs = (
        (document_df, processed_folder, "pdf_summarytext_data.csv"),
        (participant_df, metadata_folder, "company_participants.csv"),
    )
    for frame, target_folder, csv_name in outputs:
        save_to_csv(frame, save_folder=target_folder, filename=csv_name)

    return document_df, participant_df

In [None]:
if __name__ == "__main__":
    # Folder configuration (left empty here; fill in before running).
    raw_folder = ""  # Path to raw PDF files
    processed_folder = ""  # Path to save processed CSVs
    metadata_folder = ""  # Path to save metadata CSV

    # Per-bank input and output locations.
    bank_raw_folder = f'{raw_folder}/{bank}'
    bank_output_folder = f'{processed_folder}/{bank}'

    # Stage 1: participant tables and speaker-turn table for the selected bank.
    logging.info("Processing begins for sentiment analysis.")
    company_df, conference_df, document_df = process_folder(bank_raw_folder)
    final_qa_df = process_folder_q_and_a(bank_raw_folder, company_df, conference_df)

    for frame, csv_name in (
        (company_df, 'company_df.csv'),
        (conference_df, 'conference_df.csv'),
        (final_qa_df, 'final_qa_df.csv'),
    ):
        save_to_csv(frame, save_folder=bank_output_folder, filename=csv_name)

    logging.info("Processing complete for sentiment analysis.")

    # Stage 2: document text and participant metadata across the raw folder.
    logging.info("Processing begins for metadata")
    document_df, participant_df = process_all_documents(raw_folder, processed_folder, metadata_folder, sample=False)
    logging.info("Processing complete for metadata")

    logging.info("Processing complete.")

# Transcript Summarisation with NLTK

In [None]:
# Fetch the NLTK resources used below: the stop-word lists (read through
# stopwords.words) and the 'punkt_tab' tokenizer data.
# NOTE(review): 'punkt_tab' is presumably what word_tokenize/sent_tokenize
# need in the installed NLTK version - confirm.
nltk.download('stopwords')
nltk.download('punkt_tab')

In [None]:
# English stop words as a set for fast membership tests.
stop_words = set(stopwords.words('english'))
# Extend the imported string.punctuation with a newline, the em dash and the
# typographic quote characters. This rebinds the module-level name
# `punctuation`, which the summarisation functions below test characters against.
punctuation = punctuation + '\n' + '—' + '“' + ',' + '”' + '‘' + '-' + '’'

# Lower-case English contractions written with the ASCII apostrophe, mapped to
# their expanded forms.
_ASCII_CONTRACTIONS = {
    "ain't": "am not",
    "aren't": "are not",
    "can't": "cannot",
    "can't've": "cannot have",
    "'cause": "because",
    "could've": "could have",
    "couldn't": "could not",
    "couldn't've": "could not have",
    "didn't": "did not",
    "doesn't": "does not",
    "don't": "do not",
    "hadn't": "had not",
    "hadn't've": "had not have",
    "hasn't": "has not",
    "haven't": "have not",
    "he'd": "he had",
    "he'd've": "he would have",
    "he'll": "he will",
    "he'll've": "he will have",
    "he's": "he is",
    "how'd": "how did",
    "how'd'y": "how do you",
    "how'll": "how will",
    "how's": "how is",
    "i'd": "i would",
    "i'd've": "i would have",
    "i'll": "i will",
    "i'll've": "i will have",
    "i'm": "i am",
    "i've": "i have",
    "isn't": "is not",
    "it'd": "it would",
    "it'd've": "it would have",
    "it'll": "it will",
    "it'll've": "it will have",
    "it's": "it is",
    "let's": "let us",
    "ma'am": "madam",
    "mayn't": "may not",
    "might've": "might have",
    "mightn't": "might not",
    "mightn't've": "might not have",
    "must've": "must have",
    "mustn't": "must not",
    "mustn't've": "must not have",
    "needn't": "need not",
    "needn't've": "need not have",
    "o'clock": "of the clock",
    "oughtn't": "ought not",
    "oughtn't've": "ought not have",
    "shan't": "shall not",
    "sha'n't": "shall not",
    "shan't've": "shall not have",
    "she'd": "she would",
    "she'd've": "she would have",
    "she'll": "she will",
    "she'll've": "she will have",
    "she's": "she is",
    "should've": "should have",
    "shouldn't": "should not",
    "shouldn't've": "should not have",
    "so've": "so have",
    "so's": "so is",
    "that'd": "that would",
    "that'd've": "that would have",
    "that's": "that is",
    "there'd": "there would",
    "there'd've": "there would have",
    "there's": "there is",
    "they'd": "they would",
    "they'd've": "they would have",
    "they'll": "they will",
    "they'll've": "they will have",
    "they're": "they are",
    "they've": "they have",
    "to've": "to have",
    "wasn't": "was not",
    "we'd": "we would",
    "we'd've": "we would have",
    "we'll": "we will",
    "we'll've": "we will have",
    "we're": "we are",
    "we've": "we have",
    "weren't": "were not",
    "what'll": "what will",
    "what'll've": "what will have",
    "what're": "what are",
    "what's": "what is",
    "what've": "what have",
    "when's": "when is",
    "when've": "when have",
    "where'd": "where did",
    "where's": "where is",
    "where've": "where have",
    "who'll": "who will",
    "who'll've": "who will have",
    "who's": "who is",
    "who've": "who have",
    "why's": "why is",
    "why've": "why have",
    "will've": "will have",
    "won't": "will not",
    "won't've": "will not have",
    "would've": "would have",
    "wouldn't": "would not",
    "wouldn't've": "would not have",
    "y'all": "you all",
    "y'all'd": "you all would",
    "y'all'd've": "you all would have",
    "y'all're": "you all are",
    "y'all've": "you all have",
    "you'd": "you would",
    "you'd've": "you would have",
    "you'll": "you will",
    "you'll've": "you will have",
    "you're": "you are",
    "you've": "you have",
}

# Every contraction is accepted with either the ASCII apostrophe (') or the
# typographic apostrophe (’); the typographic variants are derived from the
# ASCII table instead of being listed a second time by hand.
contractions_dict = {
    **_ASCII_CONTRACTIONS,
    **{key.replace("'", "’"): value for key, value in _ASCII_CONTRACTIONS.items()},
}

# Alternation takes the first alternative that matches, so longer keys must be
# listed first: otherwise "can't've" is consumed as "can't" and the trailing
# "'ve" is left unexpanded. Keys are escaped so they are matched literally.
contractions_re = re.compile(
    '(%s)' % '|'.join(
        re.escape(key) for key in sorted(contractions_dict, key=len, reverse=True)
    )
)
# Strip HTML tags from the article text
def cleanhtml(raw_html):
    """Return ``raw_html`` with every ``<...>`` span removed.

    The match is non-greedy, so each tag is removed on its own and the text
    between tags is kept.
    """
    return re.sub('<.*?>', '', raw_html)

# Expand any contractions found in the text
def expand_contractions(s, contractions_dict=contractions_dict):
    """Replace each contraction in ``s`` with its expanded form.

    Matches are found with the module-level ``contractions_re`` and looked up
    in ``contractions_dict``, so a custom dictionary must contain every key
    that pattern can match.
    """
    return contractions_re.sub(lambda hit: contractions_dict[hit.group(0)], s)

# Build the summary of each article from its top-scoring sentences
# (25% of the scored sentences, rounded down).
def summary(sentence_score_OwO):
    """Return one summary string per sentence-score mapping.

    For each mapping of sentence -> score, the highest-scoring quarter of the
    sentences (``int(len * 0.25)``) is taken in descending score order and
    joined with "." and no space. A mapping with fewer than four sentences
    therefore produces an empty string.
    """
    return [
        ".".join(nlargest(int(len(scores) * 0.25), scores, key=scores.get))
        for scores in sentence_score_OwO
    ]

# Function to normalize the word frequency which is used in the function word_frequency
def normalize(li_word):
    """Scale every word count by the largest count in its own dictionary.

    The dictionaries in ``li_word`` are modified in place. The resulting list
    is also stored in the module-level ``normalized_freq``, which
    ``sentence_score`` reads.

    Args:
        li_word: List of word -> count dictionaries, one per article. An
            empty dictionary (an article with no words left) is kept as is.

    Returns:
        The list of normalized dictionaries (the same dict objects).
    """
    global normalized_freq
    normalized_freq = []
    for dictionary in li_word:
        # max() raises ValueError on an empty sequence, so an empty
        # dictionary is passed through untouched.
        if dictionary:
            max_frequency = max(dictionary.values())
            for word in dictionary:
                dictionary[word] = dictionary[word] / max_frequency
        normalized_freq.append(dictionary)
    return normalized_freq

# Count how often each word occurs in each article
def word_frequency(article_word):
    """Return normalized word frequencies, one dictionary per article.

    Each article is tokenized with ``word_tokenize`` and its tokens counted;
    the counts are then passed to ``normalize``, which also publishes them in
    the module-level ``normalized_freq`` that is returned here.
    """
    li_word = []
    for article in article_word:
        counts = {}
        for token in word_tokenize(article):
            counts[token] = counts.get(token, 0) + 1
        li_word.append(counts)
    normalize(li_word)
    return normalized_freq

# Score every sentence of every article; called from sent_token
def sentence_score(li):
    """Sum the normalized frequency of the known words in each sentence.

    ``li`` holds one list of sentences per article and is paired positionally
    with the module-level ``normalized_freq``. A sentence containing no word
    from its article's frequency table gets no entry. The result is also
    stored in the module-level ``sentence_score_list``.

    Returns:
        A list with one sentence -> score dictionary per article.
    """
    global sentence_score_list
    sentence_score_list = []
    for sentences, weights in zip(li, normalized_freq):
        scores = {}
        for sent in sentences:
            for token in word_tokenize(sent):
                if token not in weights:
                    continue
                if sent in scores:
                    scores[sent] += weights[token]
                else:
                    scores[sent] = weights[token]
        sentence_score_list.append(scores)
    return sentence_score_list

# Split each article into punctuation-free sentences and score them
def sent_token(article_sent):
    """Tokenize articles into sentences, clean them and score them.

    Each article is split with ``sent_tokenize``; every character found in the
    module-level ``punctuation`` is removed from each sentence and runs of
    spaces are collapsed. The cleaned sentences are handed to
    ``sentence_score``.

    Returns:
        The module-level ``sentence_score_list`` filled by ``sentence_score``.
    """
    sentence_list = []
    for document in article_sent:
        cleaned_sentences = []
        for sentence in sent_tokenize(document):
            without_punctuation = ''.join(ch for ch in sentence if ch not in punctuation)
            cleaned_sentences.append(re.sub(' +', ' ', without_punctuation))
        sentence_list.append(cleaned_sentences)
    sentence_score(sentence_list)
    return sentence_score_list

# Function to preprocess the articles
def preprocessing(article):
    """Normalise a Series of article texts for word-frequency counting.

    Steps, in order: lowercase, strip HTML tags, remove e-mail addresses and
    URLs, replace non-breaking spaces, expand contractions, strip possessive
    "'s"/"’s", collapse repeated spaces, remove punctuation and remove stop
    words. The text as it stands just before punctuation removal is stored in
    the module-level ``article_sent`` for later sentence tokenization.

    Args:
        article: pandas Series of article texts. Missing entries are treated
            as empty strings.

    Returns:
        A Series of cleaned texts, aligned with the input.
    """
    global article_sent

    # Missing values (e.g. empty CSV cells read back as NaN) would break the
    # string functions below, so they are replaced by empty strings first.
    article = article.fillna("").astype(str)

    # Converting to lowercase
    article = article.str.lower()

    # Removing the HTML
    article = article.apply(cleanhtml)

    # Removing the email ids
    article = article.apply(lambda x: re.sub(r'\S+@\S+', '', x))

    # Removing The URLS
    article = article.apply(lambda x: re.sub(r"((http\://|https\://|ftp\://)|(www.))+(([a-zA-Z0-9\.-]+\.[a-zA-Z]{2,4})|([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}))(/[a-zA-Z0-9%:/-_\?\.'~]*)?", '', x))

    # Replacing the '\xa0' (non-breaking space) with a regular space
    article = article.apply(lambda x: x.replace("\xa0", " "))

    # Expanding the contractions
    article = article.apply(expand_contractions)

    # Stripping the possessives (ASCII and typographic apostrophe)
    article = article.apply(lambda x: x.replace("'s", ''))
    article = article.apply(lambda x: x.replace('’s', ''))

    # Collapsing runs of spaces into a single space
    article = article.apply(lambda x: re.sub(' +', ' ', x))

    # Copying the article for the sentence tokenization
    article_sent = article.copy()

    # Removing punctuations from the article
    article = article.apply(lambda x: ''.join(ch for ch in x if ch not in punctuation))

    # Collapsing spaces again, as removing punctuation can leave doubled spaces
    article = article.apply(lambda x: re.sub(' +', ' ', x))

    # Removing the Stopwords
    article = article.apply(lambda x: ' '.join(word for word in x.split() if word not in stop_words))

    return article

# Wrap a single article (e.g. a plain string) in a one-element pandas Series
def make_series(art):
    """Return a one-row Series named 'article' holding ``art``.

    The Series is also stored in the module-level ``dataframe``.
    """
    global dataframe
    single_row_frame = pd.DataFrame({'article': [art]})
    dataframe = single_row_frame['article']
    return dataframe

# Entry point of the summariser: runs the whole pipeline on one or more articles
def article_summarize(artefact):
    """Produce an extractive summary for each article.

    Args:
        artefact: A pandas Series of article texts, or a single article,
            which is wrapped into a one-element Series first.

    Returns:
        A list with one summary string per article, in input order.
    """
    # isinstance (rather than an exact type comparison) so that Series
    # subclasses are used directly instead of being wrapped as one article.
    if not isinstance(artefact, pd.Series):
        artefact = make_series(artefact)

    cleaned_articles = preprocessing(artefact)

    # Called for its side effect: it fills the module-level frequency table
    # that sentence scoring reads.
    word_frequency(cleaned_articles)

    # article_sent is the pre-punctuation-removal copy set by preprocessing().
    sentence_scores = sent_token(article_sent)

    return summary(sentence_scores)

In [None]:
# Summarisation driver: reads the processed Q&A CSV for `bank`, adds a
# 'Summarised_dialogue' column and writes the frame back to the same file name.
if __name__ == "__main__":
  # Left empty here; must be set to the real folder before running.
  processed_folder = ""  # Path to save processed CSVs

  # `bank` is the module-level value initialised near the top of the notebook.
  csv_path = f'{processed_folder}/{bank}/final_qa_df.csv'
  transcript_df = pd.read_csv(csv_path)

  # Summarise every dialogue in one call (a Series is passed straight through).
  summaries = article_summarize(transcript_df['Dialogue'])

  # Start with an empty column, then fill it row by row; `summaries` is
  # indexed with the same labels as transcript_df's index.
  transcript_df["Summarised_dialogue"] = ""
  for i, row in transcript_df.iterrows():
      transcript_df.loc[transcript_df.index == i, 'Summarised_dialogue'] = summaries[i]

  # save_to_csv is defined elsewhere in this file; the output file name is the
  # same as the input file name above.
  save_to_csv(transcript_df, save_folder=f'{processed_folder}/{bank}', filename='final_qa_df.csv')

# Sentiment Analysis - using yiyanghkust/finbert-tone

In [None]:
!pip install transformers torch

In [None]:
!pip install langchain-huggingface langchain chromadb pypdf sentence-transformers accelerate langchain-community

In [None]:
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [None]:
from transformers import BertTokenizer, BertForSequenceClassification
from transformers import pipeline

# Load the pretrained FinBERT tone classifier and its tokenizer, configured
# for three output labels.
finbert = BertForSequenceClassification.from_pretrained('yiyanghkust/finbert-tone',num_labels=3)
tokenizer = BertTokenizer.from_pretrained('yiyanghkust/finbert-tone')

# NOTE(review): this rebinds the module-level name `nlp`, which an earlier
# cell assigned to the spaCy model (spacy.load("en_core_web_sm")). Any code
# executed after this cell that expects the spaCy object will receive this
# Hugging Face sentiment pipeline instead.
nlp = pipeline("sentiment-analysis", model=finbert, tokenizer=tokenizer)

In [None]:
def get_result(results):
    """Aggregate per-chunk classifier outputs into percentages.

    Args:
        results: Sequence of mappings, each with a ``"label"`` key. Labels
            other than ``"Positive"`` and ``"Neutral"`` are counted as
            negative.

    Returns:
        A ``(dominant_sentiment, sentiment_pct)`` tuple. ``sentiment_pct``
        maps ``"Positive"``, ``"Neutral"`` and ``"Negative"`` to their share
        of ``results`` in percent. On a tie the first label in that order
        wins. For empty ``results`` the function returns ``"Neutral"`` with
        all percentages at ``0.0`` instead of dividing by zero.
    """
    labels = ("Positive", "Neutral", "Negative")

    total_count = len(results)
    if total_count == 0:
        # Nothing was classified: report a neutral, all-zero outcome rather
        # than raising ZeroDivisionError.
        return "Neutral", {label: 0.0 for label in labels}

    counts = dict.fromkeys(labels, 0)
    for result in results:
        label = result["label"]
        # Anything that is not explicitly positive/neutral is negative.
        bucket = label if label in ("Positive", "Neutral") else "Negative"
        counts[bucket] += 1

    sentiment_pct = {
        label: (counts[label] * 100) / total_count for label in labels
    }

    # max() returns the first maximal key, so ties resolve in `labels` order.
    dominant_sentiment = max(sentiment_pct, key=sentiment_pct.get)

    return dominant_sentiment, sentiment_pct

# Initialize the RecursiveCharacterTextSplitter used to cut each dialogue into
# chunks before classification.
# Lengths are measured with len(), i.e. in characters of the text: chunks are
# at most 512 characters with a 20-character overlap between neighbours.
# add_start_index=True asks the splitter to record each chunk's start offset
# in its metadata.
text_splitter = RecursiveCharacterTextSplitter(
   chunk_size=512,
   chunk_overlap=20,
   length_function=len,
   add_start_index=True,
)

def analyse_detail_sentiment(transcript_df, bank, dialogue_col):
    """Classify the sentiment of every dialogue row.

    Rows are visited grouped by (Year, Quarter). Each row's text is split
    into chunks with the module-level ``text_splitter``, every chunk is
    classified with the module-level ``nlp`` pipeline, and the chunk labels
    are aggregated with ``get_result``.

    Args:
        transcript_df: Frame with at least ``Year``, ``Quarter``, ``Section``
            and ``dialogue_col`` columns.
        bank: Value written to the ``Bank`` column of every output row.
        dialogue_col: Name of the column holding the text to analyse.

    Returns:
        DataFrame with columns ``Year``, ``Quarter``, ``Sentiment``,
        ``Sentiment_Score`` (percentage of chunks carrying the dominant
        label, rounded to 2 decimals), ``Bank`` and ``Section``. The columns
        are present even when no row could be analysed.
    """
    output_columns = ["Year", "Quarter", "Sentiment", "Sentiment_Score", "Bank", "Section"]
    detail_sentiment_data = []

    for _, group in transcript_df.groupby(by=["Year", "Quarter"]):
        for _, row in group.iterrows():
            text = row[dialogue_col]
            # NaN (a truthy float) and blank strings cannot be classified;
            # skip them instead of failing inside the splitter/pipeline.
            if not isinstance(text, str) or not text.strip():
                continue

            documents = [Document(page_content=text, metadata=row.to_dict())]
            chunks = text_splitter.split_documents(documents)
            sentences = [chunk.page_content for chunk in chunks]
            if not sentences:
                # No chunks means nothing to classify for this row.
                continue

            results = nlp(sentences)
            sentiment, scores = get_result(results)

            detail_sentiment_data.append({
                "Year": row['Year'],
                "Quarter": row['Quarter'],
                "Sentiment": sentiment,
                "Sentiment_Score": round(scores[sentiment], 2),
                "Bank": bank,
                "Section": row['Section'],
            })

    # Explicit columns keep the schema stable for callers that filter on
    # 'Section' even when the result is empty.
    return pd.DataFrame(detail_sentiment_data, columns=output_columns)

In [None]:
def get_sentiment_percentage(groups, total_count):
    """Return the dominant sentiment and the percentage of each label.

    Args:
        groups: Frame with a ``Sentiment`` column.
        total_count: Denominator used for the percentages.

    Returns:
        ``(dominant_sentiment, sentiment_pct)`` where ``sentiment_pct`` maps
        "Positive", "Neutral" and "Negative" to ``count * 100 / total_count``.
        Ties go to the first label in that order.
    """
    sentiment_column = groups["Sentiment"]

    sentiment_pct = {}
    for label in ("Positive", "Neutral", "Negative"):
        # Number of rows carrying this label, as a plain int.
        matching_rows = int((sentiment_column == label).sum())
        sentiment_pct[label] = (matching_rows * 100) / total_count

    # First maximal key wins, i.e. dict insertion order breaks ties.
    return max(sentiment_pct, key=sentiment_pct.get), sentiment_pct

def summarise_sentiments(sentiment_df, section, dialogue_col):
    """Aggregate row-level sentiments into one record per (Year, Quarter).

    Args:
        sentiment_df: Frame with ``Year``, ``Quarter``, ``Bank`` and
            ``Sentiment`` columns.
        section: Label written to the ``Section`` column of every record.
        dialogue_col: Unused; kept so existing callers keep working.

    Returns:
        DataFrame with columns ``Year``, ``Quarter``, ``Bank``, ``Section``,
        ``Sentiment``, ``Positivity``, ``Neutrality`` and ``Negativity``
        (percentages rounded to 2 decimals). The columns are present even
        when ``sentiment_df`` has no rows.
    """
    output_columns = [
        "Year", "Quarter", "Bank", "Section",
        "Sentiment", "Positivity", "Neutrality", "Negativity",
    ]
    quarterly_sentiment_data = []

    # The group key already carries year and quarter, so there is no need to
    # walk the rows of each group to recover them.
    for (year, quarter), group in sentiment_df.groupby(by=["Year", "Quarter"]):
        # Denominator: number of rows with a non-null Bank value.
        total_count = group.Bank.count()

        sentiment, sentiment_pct = get_sentiment_percentage(group, total_count)

        quarterly_sentiment_data.append({
            "Year": year,
            "Quarter": quarter,
            # Bank of the group's last row, as before.
            "Bank": group["Bank"].iloc[-1],
            "Section": section,
            "Sentiment": sentiment,
            "Positivity": round(sentiment_pct["Positive"], 2),
            "Neutrality": round(sentiment_pct["Neutral"], 2),
            "Negativity": round(sentiment_pct["Negative"], 2),
        })

    # Explicit columns keep the schema stable when there are no groups, so
    # later lookups on 'Year'/'Quarter' do not fail on an empty frame.
    return pd.DataFrame(quarterly_sentiment_data, columns=output_columns)

def get_combined_sentiment_pct(row, q_qa_sentiment_df, q_presentation_sentiment_df, sentiment_col):
    """Average one sentiment percentage across the two sections for a quarter.

    Args:
        row: Mapping/Series providing the ``Year`` and ``Quarter`` to look up.
        q_qa_sentiment_df: Quarterly summary of the Question-and-Answer section.
        q_presentation_sentiment_df: Quarterly summary of the Presentation section.
        sentiment_col: Column to combine, e.g. ``"Positivity"``.

    Returns:
        The mean of the presentation and Q&A values when both sections have
        an entry for the quarter. If only one section has an entry its value
        is returned unchanged, and ``0.0`` if neither has one (previously a
        missing entry raised IndexError/KeyError).
    """
    def _value_for_quarter(section_df):
        # An empty frame may not even have the Year/Quarter columns.
        if section_df.empty:
            return None
        same_quarter = (
            (section_df['Year'] == row['Year'])
            & (section_df['Quarter'] == row['Quarter'])
        )
        matches = section_df.loc[same_quarter, sentiment_col]
        # First match only, mirroring the original `.values[0]`.
        return matches.values[0] if len(matches) else None

    available = [
        value
        for value in (
            _value_for_quarter(q_presentation_sentiment_df),
            _value_for_quarter(q_qa_sentiment_df),
        )
        if value is not None
    ]

    if not available:
        return 0.0
    if len(available) == 1:
        return available[0]

    combined = available[0] + available[1]
    if combined > 0:
        combined = combined / 2
    return combined

def get_combined_sentiment(q_qa_sentiment_df, q_presentation_sentiment_df, negative_threshold=7.5):
    """Combine the Presentation and Q&A quarterly summaries into one record per quarter.

    One record is produced for every row of ``q_presentation_sentiment_df``.
    Each percentage comes from ``get_combined_sentiment_pct``.

    Args:
        q_qa_sentiment_df: Quarterly summary of the Question-and-Answer section.
        q_presentation_sentiment_df: Quarterly summary of the Presentation
            section; also supplies ``Year``, ``Quarter`` and ``Bank``.
        negative_threshold: Combined negativity (in percent) at or above
            which the quarter is labelled "Negative". Defaults to 7.5, the
            value that was previously hard-coded.

    Returns:
        DataFrame with columns ``Year``, ``Quarter``, ``Bank``, ``Section``
        (always ``'Combined'``), ``Sentiment``, ``Positivity``,
        ``Neutrality`` and ``Negativity``. The columns are present even when
        there are no presentation rows.
    """
    output_columns = [
        "Year", "Quarter", "Bank", "Section",
        "Sentiment", "Positivity", "Neutrality", "Negativity",
    ]
    quarterly_sentiment_data = []

    for _, row in q_presentation_sentiment_df.iterrows():
        combined = {
            column: get_combined_sentiment_pct(
                row, q_qa_sentiment_df, q_presentation_sentiment_df, column
            )
            for column in ("Positivity", "Neutrality", "Negativity")
        }

        # Negativity takes precedence over the positive/neutral comparison.
        if combined["Negativity"] >= negative_threshold:
            sentiment = "Negative"
        elif combined["Positivity"] > combined["Neutrality"]:
            sentiment = "Positive"
        else:
            sentiment = "Neutral"

        quarterly_sentiment_data.append({
            "Year": row['Year'],
            "Quarter": row['Quarter'],
            "Bank": row['Bank'],
            "Section": 'Combined',
            "Sentiment": sentiment,
            "Positivity": round(combined["Positivity"], 2),
            "Neutrality": round(combined["Neutrality"], 2),
            "Negativity": round(combined["Negativity"], 2),
        })

    return pd.DataFrame(quarterly_sentiment_data, columns=output_columns)

In [None]:
# Sentiment driver: for both the summarised and the raw dialogue column, runs
# row-level sentiment analysis, then writes per-section and combined
# quarterly summaries as CSV files under {processed_folder}/{bank}.
if __name__ == "__main__":
  # Left empty here; must be set to the real folder before running.
  processed_folder = ""  # Path to save processed CSVs

  # `bank` is the module-level value initialised near the top of the notebook.
  csv_path = f'{processed_folder}/{bank}/final_qa_df.csv'
  transcript_df = pd.read_csv(csv_path)
  # Drop rows whose 'Text Type' is "Operator" so they are not analysed.
  transcript_df = transcript_df[transcript_df['Text Type'] != "Operator"]

  # The whole analysis is repeated once per text column; each output file
  # name is suffixed with the column name.
  dialogue_cols = ["Summarised_dialogue", "Dialogue"]
  for dialogue_col in dialogue_cols:

      # Rows with a missing value in the current text column are excluded.
      dialog_df = transcript_df.dropna(subset=[dialogue_col])

      # Sentiment analysis for dialog
      detail_sentiment_df = analyse_detail_sentiment(dialog_df, bank, dialogue_col)
      save_to_csv(detail_sentiment_df, save_folder=f'{processed_folder}/{bank}', filename=f'detail_sentiment_df_{dialogue_col}.csv')

      # Summarise quarterly sentiments - Presentation section
      presentation_sentiment_df = detail_sentiment_df[detail_sentiment_df['Section'] == "Presentation"]
      quaterly_presentation_sentiment_df = summarise_sentiments(presentation_sentiment_df, "Presentation", dialogue_col)
      save_to_csv(quaterly_presentation_sentiment_df, save_folder=f'{processed_folder}/{bank}', filename=f'quaterly_presentation_sentiment_df_{dialogue_col}.csv')

      # Summarise quarterly sentiments - Question-and-Answer section
      qa_sentiment_df = detail_sentiment_df[detail_sentiment_df['Section'] == "Question-and-Answer"]
      quaterly_qa_sentiment_df = summarise_sentiments(qa_sentiment_df, "Question-and-Answer", dialogue_col)
      save_to_csv(quaterly_qa_sentiment_df, save_folder=f'{processed_folder}/{bank}', filename=f'quaterly_qa_sentiment_df_{dialogue_col}.csv')

      # Summarise quarterly sentiments - combined
      # (built by iterating the presentation summary and looking up the
      # matching Q&A quarter)
      quaterly_combined_sentiment_df = get_combined_sentiment(quaterly_qa_sentiment_df, quaterly_presentation_sentiment_df)
      save_to_csv(quaterly_combined_sentiment_df, save_folder=f'{processed_folder}/{bank}', filename=f'quaterly_combined_sentiment_df_{dialogue_col}.csv')