<a href="https://colab.research.google.com/github/Just-a-code-lover/Investment-Evaluator/blob/main/Invest_Advisor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
import torch
from transformers import PegasusTokenizer, PegasusForConditionalGeneration
import nltk
from nltk.tokenize import sent_tokenize
import re
import time
from typing import List, Dict, Any
import PyPDF2
import requests
from google.colab import files
import json

class DialogueSummarizer:
    """
    Summarize speaker-attributed transcripts with a Pegasus model.

    Pipeline: extract text from a PDF, parse it into "Speaker: text" turns,
    group the turns into character-bounded chunks, and summarize each chunk.
    """

    # A line of the form "<speaker>: <optional text>". Any line containing a
    # colon matches, so this is a heuristic rather than a strict grammar.
    _SPEAKER_PATTERN = re.compile(r'^([^:]+):\s*(.*)$')

    def __init__(self, chunk_size: int = 3000):
        """
        Initialize the dialogue summarizer with Pegasus model and configurations.

        Args:
            chunk_size: Maximum number of content characters (speaker labels
                excluded) placed in one chunk by `split_into_chunks`.
        """
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt')

        self.chunk_size = chunk_size

        print("Loading Pegasus model...")
        self.model_name = "human-centered-summarization/financial-summarization-pegasus"
        self.tokenizer = PegasusTokenizer.from_pretrained(self.model_name)
        self.model = PegasusForConditionalGeneration.from_pretrained(self.model_name)

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(self.device)
        print(f"Model loaded. Using device: {self.device}")

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """
        Extract text from PDF while preserving speaker attributions.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            The text of every page, each page followed by a newline.
        """
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # `or ""` keeps a page that yields no text from breaking the
            # concatenation; join avoids repeated string reallocation.
            page_texts = [(page.extract_text() or "") + "\n" for page in pdf_reader.pages]
        return "".join(page_texts)

    def parse_dialogue(self, text: str) -> List[Dict]:
        """
        Parse text into structured dialogue format.

        A line matching "<speaker>: <text>" starts a new turn; following
        non-matching lines are appended to that turn. Blank lines are skipped,
        lines before the first speaker are dropped, and a speaker with no
        content produces no segment.

        Args:
            text: Raw transcript text.

        Returns:
            A list of {'speaker': str, 'content': str} dicts in document order.
        """
        dialogue_segments = []
        current_speaker = None
        current_content = []

        for line in text.split('\n'):
            line = line.strip()
            if not line:
                continue

            speaker_match = self._SPEAKER_PATTERN.match(line)

            if speaker_match:
                # A new speaker closes the previous turn, if it had any text.
                if current_speaker and current_content:
                    dialogue_segments.append({
                        'speaker': current_speaker,
                        'content': ' '.join(current_content)
                    })
                    current_content = []

                current_speaker = speaker_match.group(1).strip()
                content = speaker_match.group(2).strip()
                if content:
                    current_content.append(content)
            elif current_speaker:
                current_content.append(line)

        # Flush the final turn.
        if current_speaker and current_content:
            dialogue_segments.append({
                'speaker': current_speaker,
                'content': ' '.join(current_content)
            })

        return dialogue_segments

    def clean_text(self, text: str) -> str:
        """
        Clean and preprocess the input text while preserving speaker attributions.

        Collapses whitespace runs to single spaces, removes bracketed spans
        such as "[inaudible]", and trims the result.
        """
        # Whitespace is collapsed first so that a bracketed span broken across
        # lines is still matched by the (newline-unaware) bracket pattern.
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'\[.*?\]', '', text)
        text = ' '.join(text.split())
        return text.strip()

    @staticmethod
    def _split_oversized_segment(segment: Dict, limit: int) -> List[Dict]:
        """Split one segment into same-speaker pieces of at most `limit` content characters."""
        content = segment['content']
        if limit <= 0 or len(content) <= limit:
            return [segment]

        pieces = []
        current = ""
        for word in content.split():
            # A single token longer than the limit has to be hard-sliced.
            while len(word) > limit:
                if current:
                    pieces.append(current)
                    current = ""
                pieces.append(word[:limit])
                word = word[limit:]

            candidate = f"{current} {word}" if current else word
            if len(candidate) > limit:
                pieces.append(current)
                current = word
            else:
                current = candidate

        if current:
            pieces.append(current)

        return [{**segment, 'content': piece} for piece in pieces]

    def split_into_chunks(self, dialogue_segments: List[Dict]) -> List[List[Dict]]:
        """
        Split dialogue segments into chunks while preserving speaker context.

        Segments are packed greedily, in order, so that the summed content
        length of a chunk does not exceed `self.chunk_size`. A single segment
        longer than `chunk_size` is split on word boundaries into several
        pieces that each keep the original speaker, instead of being emitted
        as one oversized chunk.

        Args:
            dialogue_segments: Output of `parse_dialogue`.

        Returns:
            A list of chunks, each a list of segment dicts.
        """
        chunks = []
        current_chunk = []
        current_length = 0

        for segment in dialogue_segments:
            for piece in self._split_oversized_segment(segment, self.chunk_size):
                piece_length = len(piece['content'])

                if current_chunk and current_length + piece_length > self.chunk_size:
                    chunks.append(current_chunk)
                    current_chunk = []
                    current_length = 0

                current_chunk.append(piece)
                current_length += piece_length

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def format_chunk_for_summarization(self, chunk: List[Dict]) -> str:
        """
        Format a chunk of dialogue segments for summarization.

        Returns:
            One "speaker: content" line per segment, each newline-terminated.
        """
        return "".join(
            f"{segment['speaker']}: {segment['content']}\n" for segment in chunk
        )

    def summarize_chunk(self, chunk: List[Dict], max_length: int = 150) -> str:
        """
        Summarize a chunk of dialogue while preserving speaker context.

        Args:
            chunk: Segment dicts to summarize together.
            max_length: Maximum length, in tokens, of the generated summary.

        Returns:
            The decoded summary text.
        """
        formatted_text = self.format_chunk_for_summarization(chunk)

        # Input beyond 512 tokens is truncated by the tokenizer.
        inputs = self.tokenizer(formatted_text, return_tensors="pt", truncation=True, max_length=512)
        inputs = inputs.to(self.device)

        summary_ids = self.model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=max_length,
            num_beams=4,
            length_penalty=2.0,
            early_stopping=True,
            no_repeat_ngram_size=3
        )

        return self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)

    def process_document(self, pdf_path: str, verbose: bool = True, delay: float = 1.0) -> Dict:
        """
        Process the entire PDF document: extract text, parse dialogue, chunk, and summarize.

        Args:
            pdf_path: Path of the PDF transcript.
            verbose: When true, print progress messages.
            delay: Seconds to pause after each chunk. Defaults to the
                original one-second pause; pass 0 to disable it.

        Returns:
            A dict with keys 'dialogue_segments', 'formatted_chunks',
            'chunk_summaries' and 'combined_summary' (the chunk summaries
            joined with single spaces).
        """
        if verbose:
            print("Extracting text from PDF...")
        text = self.extract_text_from_pdf(pdf_path)

        if verbose:
            print("Parsing dialogue...")
        dialogue_segments = self.parse_dialogue(text)

        if verbose:
            print("Splitting into chunks...")
        chunks = self.split_into_chunks(dialogue_segments)

        if verbose:
            print(f"Generated {len(chunks)} chunks. Summarizing each chunk...")

        chunk_summaries = []
        formatted_chunks = []

        for i, chunk in enumerate(chunks, 1):
            if verbose:
                print(f"Processing chunk {i}/{len(chunks)}...")

            formatted_chunks.append(self.format_chunk_for_summarization(chunk))

            summary = self.summarize_chunk(chunk)
            chunk_summaries.append(summary)

            if verbose:
                print(f"Generated summary length: {len(summary)} characters")
                print("---")

            if delay > 0:
                time.sleep(delay)

        combined_summary = " ".join(chunk_summaries)

        return {
            'dialogue_segments': dialogue_segments,
            'formatted_chunks': formatted_chunks,
            'chunk_summaries': chunk_summaries,
            'combined_summary': combined_summary
        }

class GPTAnalyzer:
    """Send a transcript summary to a RapidAPI-hosted chat endpoint for investment analysis."""

    def __init__(self, api_key: str, timeout: float = 60.0):
        """
        Initialize the GPT Analyzer with RapidAPI configuration.

        Args:
            api_key: RapidAPI key sent in the `x-rapidapi-key` header.
            timeout: Seconds to wait for each HTTP request before giving up,
                so a stalled connection cannot hang the pipeline.
        """
        self.url = "https://chatgpt-42.p.rapidapi.com/conversationgpt4-2"
        self.headers = {
            "x-rapidapi-key": api_key,
            "x-rapidapi-host": "chatgpt-42.p.rapidapi.com",
            "Content-Type": "application/json"
        }
        self.timeout = timeout

    def create_analysis_prompt(self, text: str) -> str:
        """
        Create a detailed prompt for financial analysis.

        Args:
            text: Transcript (or transcript summary) to embed in the prompt.

        Returns:
            The full instruction prompt with `text` inserted.
        """
        return f"""As an investment advisor, analyze the following earnings call transcript
        and provide specific actionable insights for investors. Focus on:

        1. Future Growth Prospects:
        - Identify specific growth initiatives
        - Evaluate market expansion plans
        - Assess new product/service developments

        2. Key Business Changes:
        - Recent strategic shifts
        - Management changes
        - Operational modifications

        3. Key Triggers & Catalysts:
        - Upcoming milestones
        - Potential market opportunities
        - Strategic partnerships or acquisitions

        4. Material Information for Future Earnings:
        - Revenue drivers
        - Margin trends
        - Cost management initiatives
        - Market conditions impact

        5. Investment Advice:
        - Key risks and mitigation strategies
        - Competitive advantages
        - Recommended investor action points

        Transcript:
        {text}

        Provide a structured analysis with specific numbers and timelines where mentioned.
        Include actionable recommendations for investors."""

    def get_gpt_analysis(self, text: str, max_retries: int = 3) -> Dict[str, Any]:
        """
        Get GPT analysis with retry mechanism.

        Args:
            text: Transcript text to analyze.
            max_retries: Total number of attempts; must be at least 1.

        Returns:
            The decoded JSON body of the first successful response.

        Raises:
            ValueError: If `max_retries` is less than 1.
            Exception: If every attempt fails; the last request error is
                attached as the cause.
        """
        # Without this guard the loop below would not run and the method
        # would silently return None.
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        prompt = self.create_analysis_prompt(text)
        payload = {
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "system_prompt": "You are an experienced financial analyst providing detailed insights for investors.",
            "temperature": 0.7,
            "top_k": 5,
            "top_p": 0.9,
            "max_tokens": 1000,
            "web_access": False
        }

        for attempt in range(max_retries):
            try:
                response = requests.post(
                    self.url,
                    json=payload,
                    headers=self.headers,
                    timeout=self.timeout,
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    raise Exception(
                        f"Failed to get GPT analysis after {max_retries} attempts: {str(e)}"
                    ) from e
                time.sleep(2 ** attempt)  # Exponential backoff

def main():
    """
    Main function to run the complete analysis pipeline.

    Loads the summarizer, asks for a PDF upload, prints the parsed dialogue
    and per-chunk summaries, then sends the combined summary for analysis and
    prints the result. Errors are reported and re-raised.
    """
    import os  # local import: only needed for the API-key lookup below

    # Your RapidAPI key: taken from the RAPIDAPI_KEY environment variable when
    # set, so the secret does not have to be pasted into the source.
    api_key = os.environ.get("RAPIDAPI_KEY", "Refer to the email send to you")

    # Initialize both summarizer and analyzer
    summarizer = DialogueSummarizer(chunk_size=1000)
    analyzer = GPTAnalyzer(api_key)

    print("Please upload your PDF file...")
    uploaded = files.upload()
    if not uploaded:
        # The upload dialog was dismissed; indexing the first key would fail.
        print("No file was uploaded; nothing to analyze.")
        return
    pdf_filename = next(iter(uploaded))

    try:
        # Get document summary
        print("\nProcessing document...")
        doc_results = summarizer.process_document(pdf_filename)
        combined_summary = doc_results['combined_summary']

        print("\nDocument Processing Results:")
        print("=" * 50)
        print("\nDialogue Segments:")
        for segment in doc_results['dialogue_segments']:
            content = segment['content']
            print(f"\n{segment['speaker']}:")
            # Show only a 100-character preview of long turns.
            print(content[:100] + "..." if len(content) > 100 else content)

        print("\nChunk Summaries:")
        for i, summary in enumerate(doc_results['chunk_summaries'], 1):
            print(f"\nSummary {i}:")
            print(summary)

        # Get GPT analysis
        print("\nAnalyzing with GPT...")
        analysis_results = analyzer.get_gpt_analysis(combined_summary)

        print("\nInvestment Analysis Results:")
        print("=" * 50)

        if isinstance(analysis_results, dict) and 'choices' in analysis_results:
            # Extract the analysis text
            analysis_text = analysis_results['choices'][0]['message']['content']

            # Print analysis with proper formatting, replacing "\n" with actual newlines
            print(analysis_text.replace("\\n", "\n"))
        else:
            # Any other response shape is printed as-is.
            print("My Advice:", analysis_results)

    except Exception as e:
        print(f"Error during document processing: {str(e)}")
        raise

# Entry point: install dependencies, then run the pipeline.
if __name__ == "__main__":
    # Install required packages
    # NOTE: the leading "!" is IPython/Colab shell-escape syntax, so this line
    # only works when the cell is run in a notebook, not as a plain .py script.
    # NOTE(review): the imports at the top of this cell execute before this
    # line, so on a fresh runtime they would fail before the install runs --
    # consider moving the install into its own earlier cell.
    !pip install transformers nltk torch PyPDF2

    main()

Loading Pegasus model...


Some weights of PegasusForConditionalGeneration were not initialized from the model checkpoint at human-centered-summarization/financial-summarization-pegasus and are newly initialized: ['model.decoder.embed_positions.weight', 'model.encoder.embed_positions.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Model loaded. Using device: cpu
Please upload your PDF file...


Saving SJS Transcript Call.pdf to SJS Transcript Call (6).pdf

Processing document...
Extracting text from PDF...
Parsing dialogue...
Splitting into chunks...
Generated 45 chunks. Summarizing each chunk...
Processing chunk 1/45...
Generated summary length: 81 characters
---
Processing chunk 2/45...
Generated summary length: 89 characters
---
Processing chunk 3/45...
Generated summary length: 70 characters
---
Processing chunk 4/45...
Generated summary length: 73 characters
---
Processing chunk 5/45...
Generated summary length: 86 characters
---
Processing chunk 6/45...
Generated summary length: 53 characters
---
Processing chunk 7/45...
Generated summary length: 68 characters
---
Processing chunk 8/45...
Generated summary length: 76 characters
---
Processing chunk 9/45...
Generated summary length: 79 characters
---
Processing chunk 10/45...
Generated summary length: 74 characters
---
Processing chunk 11/45...
Generated summary length: 67 characters
---
Processing chunk 12/45...
Generat