### Telegram Chatbot for Finance

In [None]:
# The chatbot is available at the following link:
# t.me/fin_sandbox_bot

In [None]:
# Imports
from sentence_transformers import SentenceTransformer
import faiss
from langchain.chains import RetrievalQA
from langchain.llms import HuggingFacePipeline
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from transformers import pipeline
import numpy as np
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
import logging
import os

###########################
# Documents and Embeddings
###########################

# Load a pre-trained Sentence Transformer model for embeddings.
# This model is only used for the hand-built FAISS index below.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# Load documents. The path is relative, so it is resolved against the
# current working directory of the notebook/script.
loader = TextLoader('TSLA_description.txt')
documents = loader.load()

# Split into chunks (size 500 with an overlap of 50; presumably measured in
# characters by the splitter's default length function - confirm).
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = text_splitter.split_documents(documents)

# Generate one embedding vector per chunk and stack them into a 2-D array.
embeddings = [embedding_model.encode(chunk.page_content) for chunk in chunks]
embeddings_np = np.array(embeddings)

# Create a raw FAISS L2 index sized to the embedding dimension.
# NOTE(review): if `chunks` is empty, `embeddings_np` is 1-D and `shape[1]`
# raises IndexError.
# NOTE(review): `index` / `embeddings_np` are not referenced anywhere else in
# the visible file; the retriever below is built from `vectorstore`, which
# embeds the chunks again on its own. This section looks redundant - confirm
# before removing.
index = faiss.IndexFlatL2(embeddings_np.shape[1])
index.add(embeddings_np)

# Create the LangChain vectorstore using HuggingFace embeddings. This is the
# store the RetrievalQA chain retrieves from.
embeddings_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vectorstore = FAISS.from_documents(chunks, embeddings_model)

###########################
# Retrieval and LLM Chain
###########################

# Build the generation model: a Hugging Face text2text-generation pipeline
# backed by Flan-T5 small.
gen_pipeline = pipeline(
    "text2text-generation",
    model="google/flan-t5-small"
)

# Wrap the pipeline in HuggingFacePipeline so LangChain can use it as an LLM.
# Despite its name, `llm_chain` is the LLM wrapper (it is passed as `llm=`
# below), not a chain.
llm_chain = HuggingFacePipeline(pipeline=gen_pipeline)

# Create the RetrievalQA chain. The retriever returns the 3 best-matching
# chunks per query (k=3) to keep the context, and therefore the answers, short.
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})  # Top 3 chunks
# `qa_chain` is the object the Telegram message handler calls to answer
# free-text questions.
qa_chain = RetrievalQA.from_chain_type(
    llm=llm_chain,
    chain_type="stuff",
    retriever=retriever
)

# # Create RetrievalQA chain - for longer answers
# qa_chain = RetrievalQA.from_chain_type(
#     llm=llm_chain,
#     chain_type="stuff",
#     retriever=vectorstore.as_retriever()
# )

###########################
# Telegram Bot Handler
###########################

# # /start command
# async def start(update: Update, context: CallbackContext) -> None:
#     await update.message.reply_text("Hello! Send me any question about TSLA and I'll try to answer it.")

# Handle user questions
async def handle_message(update: Update, context: CallbackContext) -> None:
    """Answer a free-text (non-command) message using the RetrievalQA chain.

    Args:
        update: Incoming Telegram update; its effective message text is used
            as the question.
        context: Handler context supplied by python-telegram-bot (unused).

    The reply is the string returned by ``qa_chain.run``; a fixed fallback
    text is sent when the chain returns an empty answer.
    """
    # MessageHandler also dispatches edited messages and channel posts, for
    # which ``update.message`` is None. ``effective_message`` covers them all.
    message = update.effective_message
    if message is None or not message.text:
        return

    # NOTE: this call is synchronous, so the event loop is busy while the
    # model generates its answer.
    response = qa_chain.run(message.text)

    # Telegram rejects empty message bodies, so never send a blank answer.
    if not response or not str(response).strip():
        response = "Sorry, I could not find an answer to that question."

    await message.reply_text(response)



###########################
# Retrieval from API
###########################
import logging
import nest_asyncio  # Import nest_asyncio
import asyncio
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
import yfinance as yf

import os
from dotenv import load_dotenv

# Set httpx logging level to CRITICAL to suppress almost everything
# (only CRITICAL records from the "httpx" logger are emitted).
logging.getLogger("httpx").setLevel(logging.CRITICAL)


# Load environment variables from the .env file
load_dotenv()

# Access the API tokens.
# `os.getenv` returns None when the variable is not set, so
# `telegram_api_key` may be None if TELEGRAM_API_KEY is missing.
#openai_api_key = os.getenv('OPENAI_API_KEY')
telegram_api_key = os.getenv('TELEGRAM_API_KEY')

# Apply nest_asyncio to allow nested event loops (e.g. when an event loop is
# already running, as in Jupyter).
nest_asyncio.apply()

# Set up logging to help with debugging: timestamp, logger name, level and
# message, at INFO level and above.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    level=logging.INFO)
logger = logging.getLogger(__name__)

# # Function to fetch stock information
# def get_stock_info(ticker):
#     try:
#         stock = yf.Ticker(ticker)
#         info = stock.info
#         price = info.get('currentPrice', 'No data available')
#         market_cap = info.get('marketCap', 'No data available')
#         return f"Price: ${price}\nMarket Cap: {market_cap}"
#     except Exception as e:
#         return "Error fetching stock data. Please try again later."

def get_stock_info(ticker, retries=3, delay=5):
    """Return the current price Yahoo Finance reports for ``ticker``.

    Args:
        ticker: Stock symbol passed to ``yf.Ticker`` (e.g. ``"AAPL"``).
        retries: Maximum number of fetch attempts.
        delay: Seconds to wait between failed attempts.

    Returns:
        The ``"currentPrice"`` entry of the ticker's ``info`` mapping, the
        string ``"No data available"`` when that key is absent, or an error
        message string when every attempt raised an exception.
    """
    # `time` is not among the module-level imports visible in this file, so
    # it is imported here; without it the retry path raised NameError.
    import time

    for attempt in range(retries):
        try:
            stock_info = yf.Ticker(ticker).info
            return stock_info.get("currentPrice", "No data available")
        except Exception as e:
            print(f"Attempt {attempt+1}: Error fetching data for {ticker}: {e}")
            # Only wait when another attempt will follow.
            if attempt < retries - 1:
                time.sleep(delay)
    return "Error fetching stock data after multiple attempts."

# Command to start the bot
async def start(update: Update, context: CallbackContext) -> None:
    """Reply to the /start command with a short usage message.

    Args:
        update: Incoming Telegram update carrying the /start command.
        context: Handler context supplied by python-telegram-bot (unused).
    """
    # CommandHandler also fires for edited messages, where ``update.message``
    # is None; ``effective_message`` works for both.
    message = update.effective_message
    if message is None:
        return

    # The price lookup is registered under the "stock" command, so the help
    # text has to advertise "/stock <TICKER>".
    await message.reply_text(
        'Hello! I am your finance bot. Ask me about stock prices by typing '
        '"/stock AAPL" to find Apple market price. '
        'Otherwise ask a question about Tesla.'
    )

# Handler to process stock symbol queries
async def get_stock(update: Update, context: CallbackContext) -> None:
    """Reply to ``/stock <TICKER>`` with the value returned by ``get_stock_info``.

    Args:
        update: Incoming Telegram update carrying the command.
        context: Handler context; ``context.args[0]`` is read as the ticker.

    When no argument is given, the user is asked to provide a ticker.
    """
    import asyncio

    # CommandHandler also fires for edited messages, where ``update.message``
    # is None; ``effective_message`` works for both.
    message = update.effective_message
    if message is None:
        return

    if not context.args:
        await message.reply_text("Please provide a stock ticker (e.g., AAPL).")
        return

    ticker = context.args[0].upper()  # Stock ticker symbol (e.g., AAPL)

    # get_stock_info does blocking network I/O and may sleep between retries,
    # so run it in the default executor instead of stalling the event loop.
    loop = asyncio.get_running_loop()
    stock_info = await loop.run_in_executor(None, get_stock_info, ticker)

    await message.reply_text(f"The current stock price is: {stock_info}")


###########################
# Main Application
###########################

# Main function to start the bot
async def main() -> None:
    """Build the Telegram application, register the handlers and poll for updates.

    Registers ``/start``, ``/stock`` and the free-text question handler, then
    keeps polling until the task is cancelled (e.g. by Ctrl+C / kernel
    interrupt), at which point the updater and application are stopped.

    Raises:
        RuntimeError: If no Telegram bot token is configured.
    """
    import asyncio

    # Bot API token issued by BotFather, read from the environment.
    token = telegram_api_key
    if not token:
        raise RuntimeError(
            "TELEGRAM_API_KEY is not set; add it to the environment or the .env file."
        )

    # Create an Application object and pass in the bot's API token
    application = Application.builder().token(token).build()

    # Command handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("stock", get_stock))

    # LangChain question answering for every non-command text message
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    # Application.run_polling() is a blocking, synchronous helper that manages
    # the event loop itself and returns None, so it must not be awaited from
    # inside a coroutine. Drive the lifecycle explicitly instead.
    async with application:  # calls initialize() / shutdown()
        await application.start()
        await application.updater.start_polling()
        try:
            # Park this coroutine until it is cancelled.
            await asyncio.Event().wait()
        finally:
            await application.updater.stop()
            await application.stop()

# Script entry point: patch asyncio with nest_asyncio so that asyncio.run()
# can be called even when an event loop is already running (e.g. in Jupyter),
# then run main().
if __name__ == '__main__':
    import nest_asyncio
    nest_asyncio.apply()  # Necessary for running async in Jupyter or some IDEs
    import asyncio
    asyncio.run(main())

  from .autonotebook import tqdm as notebook_tqdm
Device set to use mps:0
2026-01-07 16:33:12,323 - telegram.ext.Application - INFO - Application started
Batches: 100%|████████████████████████████████████| 1/1 [00:00<00:00, 13.40it/s]


In [None]:
#--------- Download the company description from Yahoo Finance and save it as a text file


# import yfinance as yf
# import os

# # Function to fetch and save company description from Yahoo Finance
# def fetch_and_save_description(ticker):
#     try:
#         stock = yf.Ticker(ticker)
#         company_info = stock.info
#         description = company_info.get("longBusinessSummary", "No description available.")
        
#         # Save the description to a .txt file
#         with open(f"{ticker}_description.txt", "w", encoding="utf-8") as file:
#             file.write(description)
        
#         print(f"Description for {ticker} saved to {ticker}_description.txt")
#     except Exception as e:
#         print(f"Error fetching data for {ticker}: {str(e)}")

# # Fetch and save descriptions for multiple companies
# company_tickers = ["AAPL", "TSLA", "GOOGL"]
# for ticker in company_tickers:
#     fetch_and_save_description(ticker)
