# Install Libraries

In [None]:
# Install necessary packages
!pip install streamlit>=1.27.0 pyngrok>=5.1.0 pandas matplotlib seaborn

# Install financial analysis-specific packages
!pip install faiss-cpu>=1.7.0 sentence-transformers>=2.2.0 transformers>=4.30.0 torch>=2.0.0

# Setup ngrok for tunneling
!pip install pyngrok




In [None]:
# Mount Google Drive to access your data
# Later cells and the app's default settings look for the project under this
# mount point (e.g. /content/drive/MyDrive/MSDS490_Project).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Log in to the Hugging Face Hub; the CLI prompts interactively for an access token.
# NOTE(review): presumably needed so the app can download the selected model weights — confirm.
!huggingface-cli login


    _|    _|  _|    _|    _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|_|_|_|    _|_|      _|_|_|  _|_|_|_|
    _|    _|  _|    _|  _|        _|          _|    _|_|    _|  _|            _|        _|    _|  _|        _|
    _|_|_|_|  _|    _|  _|  _|_|  _|  _|_|    _|    _|  _|  _|  _|  _|_|      _|_|_|    _|_|_|_|  _|        _|_|_|
    _|    _|  _|    _|  _|    _|  _|    _|    _|    _|    _|_|  _|    _|      _|        _|    _|  _|        _|
    _|    _|    _|_|      _|_|_|    _|_|_|  _|_|_|  _|      _|    _|_|_|      _|        _|    _|    _|_|_|  _|_|_|_|

    To log in, `huggingface_hub` requires a token generated from https://huggingface.co/settings/tokens .
Enter your token (input will not be visible): 
Add token as git credential? (Y/n) n
Token is valid (permission: fineGrained).
The token `caden_access` has been saved to /root/.cache/huggingface/stored_tokens
Your token has been saved to /root/.cache/huggingface/token
Login successful.
The current active token is: `caden

In [None]:
import os
from getpass import getpass

from pyngrok import ngrok

# Never paste the authtoken into the notebook source: take it from the
# environment when it is already set, otherwise prompt for it without echoing.
ngrok_auth_token = (os.environ.get("NGROK_AUTH_TOKEN") or getpass("Enter your ngrok authtoken: ")).strip()
if not ngrok_auth_token:
    raise ValueError("An ngrok authtoken is required to open the tunnel.")

os.environ["NGROK_AUTH_TOKEN"] = ngrok_auth_token
# Register the token through pyngrok instead of interpolating the secret into a shell command.
ngrok.set_auth_token(ngrok_auth_token)

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [None]:
# Create the app.py file
%%writefile app.py

import streamlit as st
import os
import json
import time
from typing import List, Tuple, Dict, Any
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import re

# Import your existing system components
# Adjust the import paths as needed based on your project structure
import sys
sys.path.append(".")  # Add the current directory to path

# Import from your updated financialanalysis.py file
# We're directly importing the classes we need from the main module
try:
    from financialanalysis import (
        Document,
        FinancialDataLoader,
        VectorStore,
        PromptBuilder,
        MistralModel,
        Logger,
        EnhancedFinancialAnalysisSystem  # Import the enhanced system class
    )
except ImportError as e:
    st.error(f"Error importing financialanalysis module: {e}")
    st.info("Make sure financialanalysis.py is in the same directory as this script or in the Python path.")
    # Nothing below can work without these classes; halt this script run here
    # so the user sees the message above instead of a later NameError.
    st.stop()


Writing app.py


# UI

## UI Wrapper

In [None]:
%%writefile -a app.py

class FinancialAnalysisUI:
    """UI wrapper for the Financial Analysis System.

    Holds one ``EnhancedFinancialAnalysisSystem`` instance and exposes the
    operations the Streamlit page needs: initialisation, querying, and
    matplotlib figures summarising the retrieved documents.

    ``retrieved_docs`` arguments are lists of ``(document, score)`` pairs where
    each document exposes a ``metadata`` dict. The ``Document`` annotations are
    written as strings so that defining this class does not depend on the
    ``financialanalysis`` import having succeeded.
    """

    def __init__(self):
        """Initialize the UI components."""
        self.analysis_system = None
        self.is_initialized = False
        self.base_dir = None
        self.vector_store_dir = None

    def initialize_system(self, base_dir: str, vector_store_dir: str,
                         rebuild_vector_store: bool = False,
                         model_name: str = "mistralai/Mistral-7B-Instruct-v0.1"):
        """Initialize the system components.

        Errors are reported in the Streamlit page rather than raised; callers
        must check ``self.is_initialized`` afterwards to learn the outcome.

        Args:
            base_dir: Passed to ``EnhancedFinancialAnalysisSystem``.
            vector_store_dir: Passed to ``EnhancedFinancialAnalysisSystem``.
            rebuild_vector_store: Forwarded to the system's ``initialize``.
            model_name: Forwarded to the system's ``initialize``.
        """
        self.base_dir = base_dir
        self.vector_store_dir = vector_store_dir
        # A previous successful initialisation must not mask a failure of this attempt.
        self.is_initialized = False

        with st.spinner("Initializing system components..."):
            try:
                # Create an instance of the enhanced system
                self.analysis_system = EnhancedFinancialAnalysisSystem(base_dir, vector_store_dir)

                # Initialize the system with proper model and vector store settings
                st.info(f"Initializing system with model: {model_name}")
                self.analysis_system.initialize(rebuild_vector_store=rebuild_vector_store,
                                              model_name=model_name)

                self.is_initialized = True
                st.success("System initialization complete")
            except Exception as e:
                # Do not keep a half-constructed system around.
                self.analysis_system = None
                st.error(f"Error during initialization: {str(e)}")
                import traceback
                st.code(traceback.format_exc(), language="python")

    def query(self, query: str, audience: str = "analyst", top_k: int = 10) -> Dict:
        """Process a query and generate a response.

        Args:
            query: The question to send to the analysis system.
            audience: Forwarded to the system's ``query`` method.
            top_k: Number of documents requested from the vector store.

        Returns:
            A dict with keys ``response``, ``retrieved_docs``, ``response_time``
            (seconds spent in the system's ``query`` call), ``system_prompt``
            and ``user_prompt`` (both currently ``None``).

        Raises:
            ValueError: If the system has not been initialized.
            Exception: Anything raised while processing is shown in the page
                and then re-raised.
        """
        if not self.is_initialized or not self.analysis_system:
            raise ValueError("System not initialized.")

        # Record start time
        start_time = time.time()

        with st.spinner(f"Processing query with {audience} perspective..."):
            try:
                # The system's query method handles retrieval, prompt building,
                # generation and logging in one step.
                response = self.analysis_system.query(query, audience=audience, top_k=top_k)

                # Measured before the extra search below so it reflects the answer only.
                response_time = time.time() - start_time

                # The system does not return the documents it used, so the
                # search is repeated here to have something to visualise.
                retrieved_docs = self.analysis_system.vector_store.search(query, top_k=top_k)

                return {
                    "response": response,
                    "retrieved_docs": retrieved_docs,
                    "response_time": response_time,
                    # Not exposed by the system at the moment; not needed by the UI.
                    "system_prompt": None,
                    "user_prompt": None
                }
            except Exception as e:
                st.error(f"Error processing query: {str(e)}")
                import traceback
                st.code(traceback.format_exc(), language="python")
                raise

    @staticmethod
    def _count_metadata(retrieved_docs, key: str, label: str) -> pd.DataFrame:
        """Count how often each value of ``metadata[key]`` occurs.

        Returns a frame with columns ``label`` and ``'Count'`` in first-seen
        order; documents lacking the key are counted under ``'Unknown'``.
        """
        from collections import Counter

        counts = Counter(doc.metadata.get(key, 'Unknown') for doc, _ in retrieved_docs)
        return pd.DataFrame({
            label: list(counts.keys()),
            'Count': list(counts.values())
        })

    def get_source_distribution(self, retrieved_docs: "List[Tuple[Document, float]]") -> pd.DataFrame:
        """Get distribution of document sources (columns ``Source``, ``Count``)."""
        return self._count_metadata(retrieved_docs, 'source', 'Source')

    def get_company_distribution(self, retrieved_docs: "List[Tuple[Document, float]]") -> pd.DataFrame:
        """Get distribution of companies (columns ``Company``, ``Count``)."""
        return self._count_metadata(retrieved_docs, 'company', 'Company')

    def _truncate_source_name(self, source: str) -> str:
        """Truncate or simplify source names for better display in plots.

        Args:
            source: The original source string

        Returns:
            A shortened version suitable for display: the host for http(s)
            URLs (without a leading ``www.``), the final path component for
            file paths with either separator, the first 22 characters plus
            ``...`` for other strings longer than 25 characters, and
            ``'Unknown'`` for empty or missing values.
        """
        if not source:
            return 'Unknown'
        source = str(source)

        # For URLs: show just the host name
        if source.lower().startswith(('http://', 'https://')):
            from urllib.parse import urlparse
            domain = urlparse(source).netloc
            # Strip only a leading "www." - str.replace would also remove it mid-name.
            if domain.startswith('www.'):
                domain = domain[4:]
            if domain:
                return domain
            # No usable host: fall through to the generic rules below.

        # For file paths, extract just the filename. Separators are normalised
        # by hand because os.path.basename ignores "\\" on POSIX and returns ""
        # for a path ending in a separator.
        if '/' in source or '\\' in source:
            name = source.replace('\\', '/').rstrip('/').rsplit('/', 1)[-1]
            return name or source

        # For other long strings
        if len(source) > 25:
            return source[:22] + "..."

        return source

    def _source_display_counts(self, retrieved_docs) -> pd.DataFrame:
        """Source counts keyed by display label (columns ``DisplaySource``, ``Count``).

        Different sources can shorten to the same label (two URLs on one host);
        their counts are summed so the bar chart shows totals, not a mean.
        """
        df = self.get_source_distribution(retrieved_docs)
        df['DisplaySource'] = df['Source'].apply(self._truncate_source_name)
        return df.groupby('DisplaySource', sort=False, as_index=False)['Count'].sum()

    @staticmethod
    def _empty_figure(title: str):
        """Return a placeholder figure used when there are no documents to plot."""
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.set_title(title)
        ax.text(0.5, 0.5, 'No documents retrieved', ha='center', va='center',
                transform=ax.transAxes)
        ax.set_xticks([])
        ax.set_yticks([])
        return fig

    def plot_source_distribution(self, retrieved_docs: "List[Tuple[Document, float]]"):
        """Plot the distribution of document sources and return the figure."""
        if not retrieved_docs:
            return self._empty_figure('Document Sources')

        df = self._source_display_counts(retrieved_docs)

        fig, ax = plt.subplots(figsize=(10, 6))
        sns.barplot(x='DisplaySource', y='Count', data=df, ax=ax)
        ax.set_title('Document Sources')
        ax.set_xlabel('Source')
        ax.set_ylabel('Count')
        # Work on this figure's own axes instead of pyplot's "current" figure.
        ax.tick_params(axis='x', labelrotation=45)
        fig.tight_layout()
        return fig

    def plot_company_distribution(self, retrieved_docs: "List[Tuple[Document, float]]"):
        """Plot the distribution of companies and return the figure."""
        if not retrieved_docs:
            return self._empty_figure('Companies Mentioned')

        df = self.get_company_distribution(retrieved_docs)
        fig, ax = plt.subplots(figsize=(10, 6))
        sns.barplot(x='Company', y='Count', data=df, ax=ax)
        ax.set_title('Companies Mentioned')
        ax.set_xlabel('Company')
        ax.set_ylabel('Count')
        ax.tick_params(axis='x', labelrotation=45)
        fig.tight_layout()
        return fig

    def plot_relevance_scores(self, retrieved_docs: "List[Tuple[Document, float]]"):
        """Plot the relevance scores of retrieved documents and return the figure.

        One horizontal bar per document, in retrieval order, coloured by the
        shortened source name and labelled ``"<company> (<source>)"``.
        """
        if not retrieved_docs:
            return self._empty_figure('Document Relevance Scores')

        records = []
        for doc, score in retrieved_docs:
            source = doc.metadata.get('source', 'Unknown')
            records.append({
                'Company': doc.metadata.get('company', 'Unknown'),
                'Source': source,
                # Shortened version used for the legend and tick labels
                'DisplaySource': self._truncate_source_name(source),
                'Score': score
            })
        df = pd.DataFrame(records, columns=['Company', 'Source', 'DisplaySource', 'Score'])

        fig, ax = plt.subplots(figsize=(12, 6))
        sns.barplot(x='Score', y=range(len(df)), hue='DisplaySource', data=df,
                  orient='h', ax=ax, dodge=False)
        ax.set_title('Document Relevance Scores')
        ax.set_xlabel('Relevance Score (Lower is Better)')
        ax.set_yticks(range(len(df)))
        ax.set_yticklabels([f"{c} ({s})" for c, s in zip(df['Company'], df['DisplaySource'])])
        fig.tight_layout()
        return fig

Appending to app.py


## UI

In [None]:
%%writefile -a app.py

def main():
    """Render the Streamlit page.

    Layout: a sidebar that first shows the initialization form and, once the
    system is initialized, the query settings and a reset button; a main area
    with the query box, the latest response, three charts about the retrieved
    documents and a per-document detail view.

    State kept in ``st.session_state``: ``ui`` (the ``FinancialAnalysisUI``
    instance), ``initialized``, ``query_count``, ``last_query_time`` and
    ``last_result``.
    """
    st.set_page_config(
        page_title="Financial Analysis System",
        page_icon="💹",
        layout="wide",
        initial_sidebar_state="expanded",
    )

    st.title("Financial Analysis System")

    # Create an instance of the UI
    if 'ui' not in st.session_state:
        st.session_state.ui = FinancialAnalysisUI()
        st.session_state.initialized = False
        st.session_state.query_count = 0
        st.session_state.last_query_time = None
        st.session_state.last_result = None

    ui = st.session_state.ui

    # Sidebar for configuration
    with st.sidebar:
        st.header("Configuration")

        if not st.session_state.initialized:
            st.subheader("System Initialization")

            # Setup for Google Colab - use mounted Drive paths
            base_dir = st.text_input("Base Directory",
                                     value="/content/drive/MyDrive/MSDS490_Project")
            vector_store_dir = st.text_input("Vector Store Directory",
                                            value="/content/drive/MyDrive/MSDS490_Project/vector_store")

            rebuild_vector_store = st.checkbox("Rebuild Vector Store", value=False)

            model_options = {
                "Mistral 7B Instruct": "mistralai/Mistral-7B-Instruct-v0.1",
                "TinyLlama 1.1B": "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
            }
            selected_model = st.selectbox("Select Model", list(model_options.keys()))
            model_name = model_options[selected_model]

            if st.button("Initialize System"):
                try:
                    ui.initialize_system(
                        base_dir=base_dir,
                        vector_store_dir=vector_store_dir,
                        rebuild_vector_store=rebuild_vector_store,
                        model_name=model_name
                    )
                    # initialize_system reports failures in the page instead of
                    # raising, so its flag is the only reliable success signal.
                    st.session_state.initialized = bool(ui.is_initialized)
                except Exception as e:
                    st.error(f"Error initializing system: {str(e)}")

        if st.session_state.initialized:
            st.success("System Initialized")

            st.subheader("Query Settings")
            audience_options = {
                "Financial Analyst": "analyst",
                "Executive": "executive",
                "Investor": "investor"
            }
            selected_audience = st.selectbox("Select Audience", list(audience_options.keys()))
            audience = audience_options[selected_audience]

            top_k = st.slider("Number of documents to retrieve", 5, 20, 10)

            st.subheader("System Info")
            st.write(f"Queries processed: {st.session_state.query_count}")
            if st.session_state.last_query_time:
                st.write(f"Last query: {st.session_state.last_query_time.strftime('%H:%M:%S')}")

            if st.button("Reset System"):
                # Start over with a fresh wrapper so no previously loaded
                # system (or its "initialized" flag) survives the reset.
                st.session_state.ui = FinancialAnalysisUI()
                st.session_state.initialized = False
                st.session_state.query_count = 0
                st.session_state.last_query_time = None
                st.session_state.last_result = None
                st.rerun()

    # Main content
    if not st.session_state.initialized:
        st.info("Please initialize the system using the sidebar.")
    else:
        # Query input
        st.subheader("Ask a financial question")
        query = st.text_area("Enter your query:",
                            placeholder="E.g., What are the key financial metrics for Apple from the last fiscal year?",
                            height=100)

        col1, col2 = st.columns([1, 5])
        with col1:
            submit_button = st.button("Submit Query", use_container_width=True)

        # Process query when button is clicked
        if submit_button and query and query.strip():
            try:
                result = ui.query(query, audience=audience, top_k=top_k)
            except Exception as e:
                st.error(f"Error processing query: {str(e)}")
            else:
                st.session_state.last_result = result
                st.session_state.query_count += 1
                st.session_state.last_query_time = datetime.now()
                # Rerun (outside the try block) so the sidebar counters refresh.
                st.rerun()
        elif submit_button:
            st.warning("Please enter a query before submitting.")

        # Display result if available
        if st.session_state.last_result:
            result = st.session_state.last_result
            retrieved_docs = result["retrieved_docs"]

            # Display the response
            st.subheader("Response")
            st.markdown(result["response"])

            st.divider()

            # Display visualizations
            st.subheader("Insights from Retrieved Documents")

            if not retrieved_docs:
                st.info("No documents were retrieved for this query.")
            else:
                col1, col2 = st.columns(2)

                # Each figure is closed once rendered; otherwise a new set of
                # matplotlib figures would stay alive after every rerun.
                with col1:
                    st.write("Document Sources")
                    fig1 = ui.plot_source_distribution(retrieved_docs)
                    st.pyplot(fig1)
                    plt.close(fig1)

                with col2:
                    st.write("Companies Mentioned")
                    fig2 = ui.plot_company_distribution(retrieved_docs)
                    st.pyplot(fig2)
                    plt.close(fig2)

                st.write("Document Relevance")
                fig3 = ui.plot_relevance_scores(retrieved_docs)
                st.pyplot(fig3)
                plt.close(fig3)

            # Advanced details in an expander
            with st.expander("Advanced Details"):
                st.write(f"Response Time: {result['response_time']:.2f} seconds")
                st.write(f"Documents Retrieved: {len(retrieved_docs)}")

                # st.tabs rejects an empty label list, so only build tabs when
                # there is at least one document.
                if retrieved_docs:
                    st.subheader("Retrieved Documents")

                    # Create tabs for documents instead of nested expanders
                    doc_tabs = st.tabs([f"Doc {i+1}: {doc.metadata.get('company', 'Unknown')}"
                                      for i, (doc, _) in enumerate(retrieved_docs)])

                    # Fill each tab with document details
                    for i, (tab, (doc, score)) in enumerate(zip(doc_tabs, retrieved_docs)):
                        with tab:
                            st.write(f"**Source:** {doc.metadata.get('source', 'Unknown')}")
                            st.write(f"**Company:** {doc.metadata.get('company', 'Unknown')}")
                            st.write(f"**Relevance Score:** {score:.4f}")

                            if 'date' in doc.metadata:
                                st.write(f"**Date:** {doc.metadata['date']}")
                            elif 'year' in doc.metadata:
                                st.write(f"**Year:** {doc.metadata['year']}")

                            # Explicit key: two documents with identical text
                            # would otherwise produce identical widgets.
                            st.text_area("Content", value=doc.text, height=200, disabled=True,
                                         key=f"retrieved_doc_content_{i}")

# Build the page only when this file is executed as a script
# (e.g. `streamlit run app.py`), not when it is imported.
if __name__ == "__main__":
    main()

Appending to app.py


## Run It

In [None]:
# Copy your financialanalysis.py file to the current directory if it's in Google Drive
import os
import shutil

# Change this path to match your financialanalysis.py location in Google Drive
finance_module_path = '/content/drive/MyDrive/MSDS490_Project/financialanalysis.py'

if os.path.exists(finance_module_path):
    shutil.copy(finance_module_path, './financialanalysis.py')
    print("Copied financialanalysis.py to current directory")
else:
    print(f"Could not find {finance_module_path}. Please make sure the file exists or update the path.")

# Run the Streamlit app with ngrok tunnel
!nohup streamlit run app.py --server.port 8501 &

# Set up ngrok tunnel to expose Streamlit
public_url = ngrok.connect(8501)
print(f"Streamlit app is available at: {public_url}")

# Keep the notebook running
import IPython
from IPython.display import display, HTML
display(HTML(f'<a href="{public_url}">Click here to open the Financial Analysis UI</a>'))

# To stop the app, run the following command:
# !pkill -f streamlit

Copied financialanalysis.py to current directory
nohup: appending output to 'nohup.out'
Streamlit app is available at: NgrokTunnel: "https://9507-35-222-104-63.ngrok-free.app" -> "http://localhost:8501"
