<a href="https://colab.research.google.com/github/DarthCoder501/ML-AI-Projects/blob/main/Financial_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Libraries

In [None]:
! pip install yfinance langchain_pinecone openai python-dotenv langchain-community sentence_transformers

In [None]:
from langchain_pinecone import PineconeVectorStore
from openai import OpenAI
import dotenv
import json
import yfinance as yf
import concurrent.futures
from langchain_community.embeddings import HuggingFaceEmbeddings
from google.colab import userdata
from langchain.schema import Document
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
from pinecone import Pinecone
import numpy as np
import requests
import os

In [None]:
def get_stock_info(symbol: str) -> dict:
    """
    Look up a ticker on Yahoo Finance and return a compact company profile.

    Args:
        symbol (str): The stock ticker symbol to look up.

    Returns:
        dict: A dictionary with the keys "Ticker", "Name", "Business Summary",
              "City", "State", "Country", "Industry" and "Sector". Any field
              absent from the Yahoo Finance payload is set to
              "Information not available".
    """
    missing = "Information not available"

    # Output label -> key in the Yahoo Finance ``info`` payload.
    field_map = {
        "Ticker": "symbol",
        "Name": "longName",
        "Business Summary": "longBusinessSummary",
        "City": "city",
        "State": "state",
        "Country": "country",
        "Industry": "industry",
        "Sector": "sector",
    }

    info = yf.Ticker(symbol).info
    return {label: info.get(key, missing) for label, key in field_map.items()}

In [None]:
# Print the raw Yahoo Finance ``info`` payload for one ticker (NVDA) to see
# which fields are available.
data = yf.Ticker("NVDA")
stock_info = data.info
print(stock_info)

In [None]:
# SentenceTransformer instances keyed by model name, so a model is constructed
# once and reused instead of being rebuilt on every call.
_EMBEDDING_MODEL_CACHE = {}


def get_huggingface_embeddings(text, model_name="sentence-transformers/all-mpnet-base-v2"):
    """
    Generates embeddings for the given text using a specified Hugging Face model.

    The model is constructed the first time a given ``model_name`` is requested
    and then reused for later calls with the same name.

    Args:
        text (str): The input text to generate embeddings for.
        model_name (str): The name of the Hugging Face model to use.
                          Defaults to "sentence-transformers/all-mpnet-base-v2".

    Returns:
        np.ndarray: The generated embeddings as a NumPy array.
    """
    model = _EMBEDDING_MODEL_CACHE.get(model_name)
    if model is None:
        model = SentenceTransformer(model_name)
        _EMBEDDING_MODEL_CACHE[model_name] = model
    return model.encode(text)


def cosine_similarity_between_sentences(sentence1, sentence2):
    """
    Embed two sentences and measure the cosine similarity of their embeddings.

    Args:
        sentence1 (str): The first sentence for similarity comparison.
        sentence2 (str): The second sentence for similarity comparison.

    Returns:
        float: The cosine similarity score between the two sentences,
               ranging from -1 (completely opposite) to 1 (identical).

    Notes:
        The score is also printed to the console, formatted to four decimals.
    """
    # cosine_similarity works on 2-D inputs, so each embedding is presented as
    # a single-row matrix.
    first_row, second_row = (
        np.array(get_huggingface_embeddings(sentence)).reshape(1, -1)
        for sentence in (sentence1, sentence2)
    )

    # The result is a 1x1 matrix; its only entry is the score.
    similarity_score = cosine_similarity(first_row, second_row)[0][0]
    print(f"Cosine similarity between the two sentences: {similarity_score:.4f}")
    return similarity_score


# Example usage: compare two sentences that share phrasing but describe a
# different activity and destination. The helper prints the score and also
# returns it.
sentence1 = "I like walking to the park"
sentence2 = "I like running to the office"

similarity = cosine_similarity_between_sentences(sentence1, sentence2)

In [None]:
# Fetch and print the formatted profile for Apple as a sample record.
aapl_info = get_stock_info('AAPL')
print(aapl_info)

In [None]:
# Compare Apple's business summary with a natural-language search request to
# see how the embedding similarity behaves on a realistic query.
aapl_description = aapl_info['Business Summary']

# The query text is embedded as-is, so spelling matters ("headquartered").
company_description = "I want to find companies that make smartphones and are headquartered in California"

similarity = cosine_similarity_between_sentences(aapl_description, company_description)

# Get all the Stocks in the Stock Market

First, we need to get the symbols (also known as tickers) of all the stocks in the stock market.


In [None]:
def get_company_tickers():
    """
    Downloads and parses the stock ticker symbols from the GitHub-hosted SEC company tickers JSON file.

    Returns:
        dict | None: The parsed ticker data, or None when the download fails
        (network error, timeout, or a non-200 status code).

    Notes:
        The data is fetched from this GitHub repository:
        https://raw.githubusercontent.com/team-headstart/Financial-Analysis-and-Automation-with-LLMs/main/company_tickers.json
        On success the parsed content is also written to 'company_tickers.json'
        in the current working directory.
    """
    # URL to fetch the raw JSON file from GitHub
    url = "https://raw.githubusercontent.com/team-headstart/Financial-Analysis-and-Automation-with-LLMs/main/company_tickers.json"

    # A timeout keeps the notebook from hanging indefinitely on a stalled
    # connection; network failures follow the same "report and return None"
    # path as a bad status code.
    try:
        response = requests.get(url, timeout=30)
    except requests.RequestException as exc:
        print(f"Failed to download file. Request error: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to download file. Status code: {response.status_code}")
        return None

    # Parse the JSON content directly
    company_tickers = json.loads(response.content.decode('utf-8'))

    # Save the content to a local file for future use
    with open("company_tickers.json", "w", encoding="utf-8") as file:
        json.dump(company_tickers, file, indent=4)

    print("File downloaded successfully and saved as 'company_tickers.json'")
    return company_tickers

# Download the ticker data once; the result is None if the download failed.
company_tickers = get_company_tickers()

In [None]:
# Display the downloaded ticker data.
company_tickers

In [None]:
# Number of entries in the downloaded ticker data.
len(company_tickers)

In [None]:
# Read the Pinecone key from Colab's secret store and export it as an
# environment variable.
# NOTE(review): presumably PineconeVectorStore reads PINECONE_API_KEY from the
# environment — confirm against the langchain_pinecone documentation.
pinecone_api_key = userdata.get("PINECONE_API_KEY")
os.environ['PINECONE_API_KEY'] = pinecone_api_key

# Index and namespace that the ingestion and query cells refer to.
index_name = "stocks"
namespace = "stock-descriptions"

# NOTE(review): HuggingFaceEmbeddings() is created with the library's default
# model; it should match the model used to embed queries later — confirm.
hf_embeddings = HuggingFaceEmbeddings()
vectorstore = PineconeVectorStore(index_name=index_name, embedding=hf_embeddings)

# Sequential Processing

Processing every stock one at a time like this will take a very long time!

In [None]:
import time


In [None]:
# Sequential ingestion: for every ticker, fetch its profile, store the
# business summary in Pinecone, and record the outcome in
# successful_tickers.txt / unsuccessful_tickers.txt.

# Highest index shown in the progress messages; it does not change while
# looping, so compute it once.
last_index = len(company_tickers) - 1

for idx, stock in company_tickers.items():
    stock_ticker = stock['ticker']
    progress = f"{idx} / {last_index}"

    try:
        stock_data = get_stock_info(stock_ticker)

        # Use a dedicated placeholder as the document text when Yahoo Finance
        # has no summary for this company.
        stock_description = stock_data['Business Summary']
        if stock_description == 'Information not available':
            stock_description = 'No description available'

        # "\r" keeps the in-progress message on one line until it is replaced.
        print(f"Processing stock {progress}: {stock_ticker}", end="\r")

        # Embed and upsert the description; the returned store is not needed.
        PineconeVectorStore.from_documents(
            documents=[Document(page_content=stock_description, metadata=stock_data)],
            embedding=hf_embeddings,
            index_name=index_name,
            namespace=namespace,
        )

        print(f"Successfully processed stock {progress}: {stock_ticker}")

        with open("successful_tickers.txt", "a") as success_file:
            success_file.write(f"{stock_ticker}\n")
    except Exception as e:
        # Best-effort ingestion: log the failure and move on to the next ticker.
        print(f"Error processing stock {progress} ({stock_ticker}): {e}")
        with open("unsuccessful_tickers.txt", "a") as error_file:
            error_file.write(f"{stock_ticker}\n")

        # Thread exhaustion will not recover by itself, so stop the whole run.
        if str(e) == "can't start new thread":
            print("Stock processing failed due to thread limit. Terminating the process...")
            break

    # Pause after every 500th entry (but not the very first one, index 0).
    position = int(idx)
    if position and position % 500 == 0:
        print("Sleeping for 2 minutes to avoid rate limiting...")
        time.sleep(120)

# Parallelizing



In [None]:
def _read_ticker_log(path, label):
    """Return the stripped, non-blank lines of *path*; an empty list if the file is missing."""
    try:
        with open(path, 'r') as f:
            entries = [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        print(f"No existing {label} tickers file found")
        return []
    print(f"Loaded {len(entries)} {label} tickers")
    return entries


# Tickers recorded by earlier runs, so finished work can be recognised.
successful_tickers = _read_ticker_log('successful_tickers.txt', 'successful')
unsuccessful_tickers = _read_ticker_log('unsuccessful_tickers.txt', 'unsuccessful')

In [None]:
def process_stock(stock_ticker: str) -> str:
    """
    Fetch one stock's profile, store its description in Pinecone, and log the outcome.

    Tickers already present in the module-level ``successful_tickers`` list are
    skipped. Otherwise the outcome is appended both to the matching text file
    ('successful_tickers.txt' or 'unsuccessful_tickers.txt') and to the matching
    in-memory list.

    Args:
        stock_ticker (str): The ticker symbol to process.

    Returns:
        str: A status message: "Already processed <ticker>",
             "Processed <ticker> successfully", or
             "ERROR processing <ticker>: <exception>".
    """
    # Guard clause: nothing to do for a ticker that already succeeded.
    if stock_ticker in successful_tickers:
        return f"Already processed {stock_ticker}"

    log_line = f"{stock_ticker}\n"

    try:
        profile = get_stock_info(stock_ticker)
        document = Document(page_content=profile['Business Summary'], metadata=profile)

        # Embed and upsert the description into the configured index/namespace.
        PineconeVectorStore.from_documents(
            documents=[document],
            embedding=hf_embeddings,
            index_name=index_name,
            namespace=namespace,
        )

        with open('successful_tickers.txt', 'a') as success_log:
            success_log.write(log_line)
        successful_tickers.append(stock_ticker)
    except Exception as e:
        # Any failure is recorded and reported through the return value
        # rather than raised to the caller.
        with open('unsuccessful_tickers.txt', 'a') as failure_log:
            failure_log.write(log_line)
        unsuccessful_tickers.append(stock_ticker)
        return f"ERROR processing {stock_ticker}: {e}"

    return f"Processed {stock_ticker} successfully"

def parallel_process_stocks(tickers: list, max_workers: int = 1) -> None:
    """
    Run ``process_stock`` for every ticker on a thread pool, stopping at the first failure.

    Each result message is printed as its task completes. The run is aborted as
    soon as a task returns a message starting with "ERROR" or raises.

    Args:
        tickers (list): Ticker symbols to process.
        max_workers (int): Number of worker threads. Defaults to 1.

    Raises:
        SystemExit: With exit code 1 when any task fails.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_ticker = {
            executor.submit(process_stock, ticker): ticker
            for ticker in tickers
        }

        for future in concurrent.futures.as_completed(future_to_ticker):
            ticker = future_to_ticker[future]
            try:
                result = future.result()
            except Exception as exc:
                print(f'{ticker} generated an exception: {exc}')
                print("Stopping program due to exception")
                failed = True
            else:
                print(result)
                failed = result.startswith("ERROR")
                if failed:
                    print(f"Stopping program due to error in {ticker}")

            if failed:
                # cancel_futures=True discards the tasks still waiting in the
                # queue. Without it, leaving the ``with`` block would wait for
                # every remaining ticker to be processed before SystemExit
                # could propagate, so the run would not actually stop.
                executor.shutdown(wait=False, cancel_futures=True)
                raise SystemExit(1)

# Collect the ticker symbol of every entry in the downloaded data
tickers_to_process = [entry['ticker'] for entry in company_tickers.values()]

# Ingest them using a single worker thread
parallel_process_stocks(tickers_to_process, max_workers=1)

# Perform RAG

In [None]:
# Initialize the Pinecone client with the key stored in Colab's secrets
pc = Pinecone(api_key=userdata.get("PINECONE_API_KEY"),)

# Open a handle to the index named by `index_name` for direct queries
pinecone_index = pc.Index(index_name)

In [None]:
# Natural-language question used to demonstrate retrieval.
query = "What are some companies that manufacture consumer hardware?"

In [None]:
# Embed the question so it can be compared with the stored vectors.
raw_query_embedding = get_huggingface_embeddings(query)

In [None]:
# Ask Pinecone for the 10 closest vectors in the stock namespace, with their
# metadata; the embedding is converted to a plain list for the request.
top_matches = pinecone_index.query(vector=raw_query_embedding.tolist(), top_k=10, include_metadata=True, namespace=namespace)

In [None]:
# Display the raw query response.
top_matches

In [None]:
# Collect the 'text' metadata field of each match.
# NOTE(review): presumably 'text' is where the vector store saved each
# document's page content — confirm against the ingestion output.
contexts = [item['metadata']['text'] for item in top_matches['matches']]

In [None]:
# Wrap up to 10 retrieved texts in a <CONTEXT> block, separated by dashed
# rules, and append the original question after it.
augmented_query = "<CONTEXT>\n" + "\n\n-------\n\n".join(contexts[ : 10]) + "\n-------\n</CONTEXT>\n\n\n\nMY QUESTION:\n" + query

In [None]:
# Show the assembled prompt (context block followed by the question).
print(augmented_query)

# Setting up Groq for RAG




In [None]:
!pip install groq


In [None]:
from groq import Groq
# Groq client authenticated with the key stored in Colab's secrets.
client = Groq(
    api_key=userdata.get("GROQ_API_KEY"),
)

In [None]:
# System instruction for the chat model (a plain string; it has no placeholders).
system_prompt = """You are an expert at providing answers about stocks. Please answer my question provided.
"""

# Conversation sent to the model: the instruction, then the context-augmented question.
chat_messages = [
    {"role": "system", "content": system_prompt},
    {"role": "user", "content": augmented_query},
]

chat_completion = client.chat.completions.create(
    model="llama-3.1-70b-versatile",
    messages=chat_messages,
)

# Keep only the text of the first returned choice.
response = chat_completion.choices[0].message.content

In [None]:
# Show the model's answer.
print(response)

In [None]:
!pip install streamlit pyngrok python-dotenv

In [None]:
!pip install groq


In [None]:
from threading import Thread
from pyngrok import ngrok
from google.colab import userdata
import os

In [None]:
# Read the ngrok auth token from Colab's secret store and register it with
# pyngrok.
ngrok_token = userdata.get('NGROK_AUTH_TOKEN')

ngrok.set_auth_token(ngrok_token)

In [None]:
def run_streamlit():
    """Run the Streamlit app at /content/app.py on port 8501 (blocks until the command exits)."""
    command = 'streamlit run /content/app.py --server.port 8501'
    os.system(command)

In [None]:
%%writefile app.py
import streamlit as st
from rag import perform_rag  # Import the perform_rag function from your rag.py file

# Chat front end: shows the conversation kept in st.session_state and answers
# each new question by calling perform_rag.

# Set the Streamlit app configuration (browser tab title and wide layout)
st.set_page_config(page_title="Stock Research Assistant", layout="wide")

# Title and Description
st.title("Stock Chatbot")
st.write(
    """
    Welcome to your Stocks chatbot! Engage in a conversation and receive answers based on your inputs.
    """
)

# Seed the chat history with a greeting when the session has no "messages" entry yet
if "messages" not in st.session_state:
    st.session_state.messages = [{"role": "assistant", "content": "Hi! I'm here to answer your questions about stocks. How can I assist you today?"}]  # List of {"role", "content"} dicts

# Replay every stored message so the whole conversation is visible
for message in st.session_state.messages:
    with st.chat_message(message["role"]):  # role is "user" or "assistant" in this script
        st.markdown(message["content"])  # Render the message content

# Input box for user query; the body runs only when a non-empty value is returned
if user_input := st.chat_input("Ask your question about a stock..."):
    # Record the user's message in the history and show it
    st.session_state.messages.append({"role": "user", "content": user_input})
    with st.chat_message("user"):
        st.markdown(user_input)  # Display user's message

    # Answer the query with the RAG function
    with st.chat_message("assistant"):
        with st.spinner("Processing your query..."):
            try:
                # Call perform_rag to generate a response
                response = perform_rag(user_input)
                st.markdown(response)  # Display assistant's response
                # Add assistant's response to the history
                st.session_state.messages.append({"role": "assistant", "content": response})
            except Exception as e:
                # Show the failure in the chat and keep it in the history
                # instead of letting the exception end the script
                error_message = f"An error occurred: {e}"
                st.error(error_message)
                st.session_state.messages.append({"role": "assistant", "content": error_message})

In [None]:
%%writefile rag.py
import requests
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv
import os

# Load environment variables (python-dotenv; reads a .env file when one is found)
load_dotenv()

# Set up API keys and configuration
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")  # None when the variable is unset
PINECONE_ENVIRONMENT = 'us-east-1'  # Pinecone region identifier
PINECONE_INDEX_NAME = 'stocks'  # index that is queried
PINECONE_NAMESPACE = 'stock-descriptions'  # namespace within that index
GROQ_API_KEY = os.getenv("GROQ_API_KEY")  # None when the variable is unset
GROQ_MODEL = "llama-3.1-70b-versatile"  # model name sent in the Groq request payload

# SentenceTransformer instances keyed by model name, so a model is constructed
# once per process instead of once per query.
_EMBEDDING_MODEL_CACHE = {}


def get_huggingface_embeddings(text, model_name="sentence-transformers/all-mpnet-base-v2"):
    """
    Generate embeddings for the given text with a Hugging Face sentence-transformer.

    The model is constructed the first time a given ``model_name`` is requested
    and then reused for later calls with the same name.

    Args:
        text (str): The input text to embed.
        model_name (str): The name of the Hugging Face model to use.
                          Defaults to "sentence-transformers/all-mpnet-base-v2".

    Returns:
        The value returned by ``SentenceTransformer.encode`` for ``text``
        (callers in this file convert it with ``.tolist()``).
    """
    model = _EMBEDDING_MODEL_CACHE.get(model_name)
    if model is None:
        model = SentenceTransformer(model_name)
        _EMBEDDING_MODEL_CACHE[model_name] = model
    return model.encode(text)

# Initialize the Pinecone client. Only the API key is needed to connect to an
# existing index; a ServerlessSpec describes where to *create* an index and is
# not a client option, so it is not passed here.
pc = Pinecone(api_key=PINECONE_API_KEY)

# Handle used for queries, plus a one-time printout of the index description
# when this module is imported.
pinecone_index = pc.Index(PINECONE_INDEX_NAME)
index_description = pc.describe_index(PINECONE_INDEX_NAME)
print(index_description)
def perform_rag(query):
    """
    Perform Retrieval-Augmented Generation (RAG) to answer a query.

    The query is embedded, the closest stored descriptions are fetched from
    Pinecone, and both are sent to the Groq chat-completions endpoint.

    Args:
        query (str): User query.

    Returns:
        str: LLM-generated response, or a fixed fallback message when a 200
             response does not contain the expected fields.

    Raises:
        ValueError: If the Groq API answers with a non-200 status code.
        requests.RequestException: If the HTTP request fails or times out.
    """
    # Step 1: Generate embeddings for the query (as a plain list for Pinecone)
    raw_query_embedding_list = get_huggingface_embeddings(query).tolist()

    # Step 2: Query Pinecone for the 5 most relevant contexts
    top_matches = pinecone_index.query(
        vector=raw_query_embedding_list,
        top_k=5,
        include_metadata=True,
        namespace=PINECONE_NAMESPACE
    )

    # Extract contexts from matches
    contexts = [item['metadata']['text'] for item in top_matches['matches']]

    # Step 3: Construct an augmented query from every retrieved context
    augmented_query = (
        "<CONTEXT>\n"
        + "\n\n-------\n\n".join(contexts)
        + "\n-------\n</CONTEXT>\n\n\n\nMY QUESTION:\n" + query
    )

    # Step 4: Define the system prompt. The trailing space keeps the two
    # sentences apart once the adjacent literals are concatenated.
    system_prompt = (
        "You are an expert at providing answers about stocks. Please answer my question provided. "
        "Always explain your reasoning step by step."
    )

    # Step 5: Query the Groq chat-completions API
    headers = {
        "Authorization": f"Bearer {GROQ_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": GROQ_MODEL,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": augmented_query}
        ],
        "max_tokens": 1024,
        "temperature": 0.7
    }

    # A timeout prevents a stalled connection from freezing the chat UI.
    response = requests.post(
        "https://api.groq.com/openai/v1/chat/completions",
        headers=headers,
        json=payload,
        timeout=60,
    )

    if response.status_code != 200:
        raise ValueError(f"Error from Llama API: {response.status_code} - {response.text}")

    try:
        return response.json()["choices"][0]["message"]["content"]
    except (KeyError, IndexError):
        return "No response text found in the Groq API response."

In [None]:
%%writefile emeddings.py
from sentence_transformers import SentenceTransformer

# SentenceTransformer instances keyed by model name, so a model is constructed
# once per process instead of on every call.
_EMBEDDING_MODEL_CACHE = {}


def get_huggingface_embeddings(text, model_name="sentence-transformers/all-mpnet-base-v2"):
    """
    Generate embeddings for the given text with a Hugging Face sentence-transformer.

    The model is constructed the first time a given ``model_name`` is requested
    and then reused for later calls with the same name.

    Args:
        text (str): The input text to embed.
        model_name (str): The name of the Hugging Face model to use.
                          Defaults to "sentence-transformers/all-mpnet-base-v2".

    Returns:
        The value returned by ``SentenceTransformer.encode`` for ``text``.
    """
    model = _EMBEDDING_MODEL_CACHE.get(model_name)
    if model is None:
        model = SentenceTransformer(model_name)
        _EMBEDDING_MODEL_CACHE[model_name] = model
    return model.encode(text)

In [None]:
# Start Streamlit on a separate thread: run_streamlit blocks inside os.system,
# so calling it directly would hold up this cell.
thread = Thread(target=run_streamlit)
thread.start()

In [None]:
# Open an ngrok HTTP tunnel (TLS requested via bind_tls=True) to local port
# 8501, the port the Streamlit command is started on, and print its address.
public_url = ngrok.connect(addr='8501', proto='http', bind_tls=True)

print("Public URL: ", public_url)

In [None]:
# Tear down every open ngrok tunnel, printing each public URL and the local
# address it pointed to before disconnecting it.
tunnels = ngrok.get_tunnels()
for tunnel in tunnels:
  print(f"Closing tunnel: {tunnel.public_url} -> {tunnel.config['addr']}")
  ngrok.disconnect(tunnel.public_url)

In [None]:
%%writefile .env

GROQ_API_KEY =
NGROK_AUTH_TOKEN =
PINECONE_API_KEY =