## Function Tool Calling + RAG

In [1]:
# Install necessary libraries
!pip install --quiet --upgrade langchain-text-splitters langchain-community langchain-google-genai langchain-pinecone
!pip install -qU pypdf tqdm uuid

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.5 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.5/2.5 MB[0m [31m92.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m40.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.5/41.5 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m34.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m427.3/427.3 kB[0m [31m21.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.5/87.5 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[

In [2]:
!pip install langchain --upgrade



## function tool calling with COT report

In [14]:
!pip install plotly pandas numpy python-dotenv

# !pip install langchain-community



In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
import os
import pandas as pd
import numpy as np
from typing import List, Dict, Union, Optional, Tuple, Any
from datetime import datetime
from dataclasses import dataclass
from functools import lru_cache
from dotenv import load_dotenv
import uuid
import re
import json
from pathlib import Path

# LangChain imports
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import Tool, AgentExecutor, AgentType, initialize_agent
from langchain.memory.chat_message_histories import ChatMessageHistory
from langchain.memory import ConversationBufferMemory
from langchain.chains import RetrievalQAWithSourcesChain, LLMChain
from langchain.prompts import PromptTemplate
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec
from langchain.tools import tool
# from langchain.chains import ConversationalChain
# from langchain.agents import Tool, ToolAgent, ToolAgentExecutor
# Load environment variables
load_dotenv()

@dataclass
class COTAnalysisResult:
    """Data class to store analysis results"""
    analysis_type: str
    symbol: str
    data: Union[pd.DataFrame, Dict, str]
    timestamp: datetime = datetime.now()
    metadata: Dict = None

class MarketDataManager:
    """Manages market data loading and preprocessing"""

    def __init__(self, data_folder: str):
        self.data_folder = Path(data_folder)
        self.data: Optional[pd.DataFrame] = None
        self._load_data()

    def _load_data(self) -> None:
        """Load data from Excel files"""
        try:
            excel_files = list(self.data_folder.glob('*.xls*'))
            if not excel_files:
                raise FileNotFoundError(f"No Excel files found in {self.data_folder}")

            dfs = []
            for file in excel_files:
                try:
                    df = pd.read_excel(file)
                    dfs.append(df)
                except Exception as e:
                    print(f"Error loading {file}: {str(e)}")

            if not dfs:
                raise ValueError("No valid data loaded")

            self.data = pd.concat(dfs, ignore_index=True)
            self._process_data()

        except Exception as e:
            raise RuntimeError(f"Data loading error: {str(e)}")

    def _process_data(self) -> None:
        """Process and clean the loaded data"""
        if self.data is None:
            return

        # Basic preprocessing
        self.data['Date'] = pd.to_datetime(self.data['Date'])
        self.data.sort_values(['Symbol', 'Date'], inplace=True)

        # Calculate derived metrics
        self._calculate_market_metrics()

    def _calculate_market_metrics(self) -> None:
        """Calculate additional market metrics"""
        if self.data is None:
            return

        # Calculate rolling metrics
        groups = self.data.groupby('Symbol')

        # Momentum indicators
        for window in [4, 12, 26]:
            self.data[f'momentum_{window}w'] = groups['Commercial Net Positions'].transform(
                lambda x: x.diff(window).rolling(window).mean()
            )

        # Volatility indicators
        self.data['volatility'] = groups['Commercial Net Positions'].transform(
            lambda x: x.rolling(12).std()
        )

class PineconeManager:
    """Manages Pinecone vector store operations"""

    def __init__(self, api_key: str, index_name: str, dimension: int = 384):
        self.api_key = api_key
        self.index_name = index_name
        self.dimension = dimension
        self.pc = Pinecone(api_key=api_key)
        self._initialize_index()

    def _initialize_index(self) -> None:
        """Initialize or connect to Pinecone index"""
        try:
            if self.index_name not in self.pc.list_indexes():
                self.pc.create_index(
                    name=self.index_name,
                    dimension=self.dimension,
                    metric="cosine",
                    spec=ServerlessSpec(cloud="aws", region="us-east-1")
                )
            self.index = self.pc.Index(self.index_name)
        except Exception as e:
            raise RuntimeError(f"Pinecone initialization error: {str(e)}")

    def store_analysis(self, analysis_result: COTAnalysisResult, embedding: List[float]) -> str:
        """Store analysis result in vector store"""
        try:
            doc_id = str(uuid.uuid4())
            metadata = {
                "symbol": analysis_result.symbol,
                "type": analysis_result.analysis_type,
                "timestamp": analysis_result.timestamp.isoformat(),
                "data": json.dumps(analysis_result.data)
            }
            if analysis_result.metadata:
                metadata.update(analysis_result.metadata)

            self.index.upsert(vectors=[(doc_id, embedding, metadata)])
            return doc_id
        except Exception as e:
            print(f"Error storing analysis: {str(e)}")
            return None

class MarketAnalyzer:
    """Enhanced market analysis with ML integration"""

    def __init__(self, data_manager: MarketDataManager):
        self.data_manager = data_manager
        self.analysis_cache = {}

    @lru_cache(maxsize=100)
    def get_symbol_data(self, symbol: str, lookback: int = 52) -> Optional[pd.DataFrame]:
        """Get processed data for a symbol"""
        if self.data_manager.data is None:
            return None

        symbol_data = self.data_manager.data[
            self.data_manager.data['Symbol'] == symbol
        ].copy()
        return symbol_data.tail(lookback)

    def analyze_sentiment(self, symbol: str) -> Dict:
        """Advanced sentiment analysis"""
        try:
            data = self.get_symbol_data(symbol)
            if data is None:
                return {"error": f"Symbol {symbol} not found"}

            # Calculate weighted sentiment score
            sentiment_components = {
                'spec': (data['Specs Net Pct OI'].mean(), 0.5),
                'comm': (data['Comms Net PCT OI'].mean(), 0.3),
                'small': (data['Smalls Net Pct OI'].mean(), 0.2)
            }

            sentiment_score = sum(
                np.sign(val) * weight
                for val, weight in sentiment_components.values()
            )

            # Enhanced analysis
            analysis = {
                'symbol': symbol,
                'overall_sentiment': 'Bullish' if sentiment_score > 0 else 'Bearish',
                'sentiment_score': float(sentiment_score),
                'components': {
                    name: {
                        'value': float(val),
                        'weight': weight,
                        'contribution': float(np.sign(val) * weight)
                    }
                    for name, (val, weight) in sentiment_components.items()
                },
                'momentum_indicators': {
                    f'{period}w': float(data[f'momentum_{period}w'].iloc[-1])
                    for period in [4, 12, 26]
                },
                'volatility': float(data['volatility'].iloc[-1])
            }

            return analysis
        except Exception as e:
            return {"error": f"Analysis error: {str(e)}"}

class COTAnalysisSystem:
    """Integrated COT analysis system with LLM capabilities"""

    def __init__(self,
                 data_folder: str,
                 pinecone_api_key: str,
                 index_name: str):
        """Initialize the integrated analysis system"""

        # Initialize components
        self.data_manager = MarketDataManager(data_folder)
        self.market_analyzer = MarketAnalyzer(self.data_manager)
        self.pinecone_manager = PineconeManager(pinecone_api_key, index_name)

        # Setup LLM and embeddings
        self.llm = ChatGoogleGenerativeAI(
            model="gemini-1.5-flash",
            temperature=0.7
        )
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        # Setup vector store and retriever
        self.vector_store = PineconeVectorStore(
            embedding=self.embeddings,
            index=self.pinecone_manager.index
        )

        # Define custom tools
        @tool("analyze_sentiment", return_direct=True)
        def analyze_sentiment_tool(symbol: str) -> str:
            """Tool for sentiment analysis"""
            try:
                symbol = symbol.strip().upper()
                result = self.market_analyzer.analyze_sentiment(symbol)
                if "error" in result:
                    return result["error"]

                analysis_result = COTAnalysisResult(
                    analysis_type="sentiment",
                    symbol=symbol,
                    data=result,
                    metadata={"tool": "sentiment_analysis"}
                )

                embedding = self.embeddings.embed_query(f"sentiment analysis {symbol}")
                self.pinecone_manager.store_analysis(analysis_result, embedding)

                response = f"""Market Sentiment Analysis for {symbol}:
Overall Sentiment: {result['overall_sentiment']}
Sentiment Score: {result['sentiment_score']:.2f}
Momentum Indicators:"""

                for period, value in result['momentum_indicators'].items():
                    response += f"\n{period} Momentum: {value:.2f}"

                response += f"\nVolatility: {result['volatility']:.2f}"

                response += "\n\nComponent Analysis:"
                for comp_name, comp_data in result['components'].items():
                    response += f"\n{comp_name.title()}: {comp_data['value']:.2f} (Weight: {comp_data['weight']:.1f})"

                return response

            except Exception as e:
                return f"Error in sentiment analysis: {str(e)}"

        @tool("analyze_trends", return_direct=True)
        def analyze_trends_tool(symbol: str) -> str:
            """Tool for trend analysis"""
            try:
                symbol = symbol.strip().upper()
                data = self.market_analyzer.get_symbol_data(symbol)
                if data is None:
                    return f"Symbol {symbol} not found"

                analysis = {
                    'momentum': {
                        f'{period}w': float(data[f'momentum_{period}w'].iloc[-1])
                        for period in [4, 12, 26]
                    },
                    'volatility': float(data['volatility'].iloc[-1]),
                    'positioning': {
                        'commercial': float(data['Commercial Net Positions'].diff().mean()),
                        'speculator': float(data['Speculators Net Positions'].diff().mean()),
                        'small': float(data['Small Traders Net Positions'].diff().mean())
                    }
                }

                analysis_result = COTAnalysisResult(
                    analysis_type="trends",
                    symbol=symbol,
                    data=analysis,
                    metadata={"tool": "trend_analysis"}
                )

                embedding = self.embeddings.embed_query(f"trend analysis {symbol}")
                self.pinecone_manager.store_analysis(analysis_result, embedding)

                response = f"""Market Trend Analysis for {symbol}:

Momentum Analysis:
- 4-Week Momentum: {analysis['momentum']['4w']:.2f}
- 12-Week Momentum: {analysis['momentum']['12w']:.2f}
- 26-Week Momentum: {analysis['momentum']['26w']:.2f}

Volatility: {analysis['volatility']:.2f}

Positioning Trends:
- Commercial: {"Increasing" if analysis['positioning']['commercial'] > 0 else "Decreasing"}
  (Change: {analysis['positioning']['commercial']:.2f})
- Speculator: {"Increasing" if analysis['positioning']['speculator'] > 0 else "Decreasing"}
  (Change: {analysis['positioning']['speculator']:.2f})
- Small Traders: {"Increasing" if analysis['positioning']['small'] > 0 else "Decreasing"}
  (Change: {analysis['positioning']['small']:.2f})"""

                return response

            except Exception as e:
                return f"Error in trend analysis: {str(e)}"

        @tool("search_historical", return_direct=True)
        def search_historical_tool(query: str) -> str:
            """Tool for searching historical analysis"""
            try:
                docs = self.retriever.get_relevant_documents(query)
                if not docs:
                    return "No relevant historical analysis found"

                results = []
                for doc in docs:
                    metadata = doc.metadata
                    if 'data' in metadata:
                        analysis_data = json.loads(metadata['data'])
                        results.append(f"""Analysis from {metadata['timestamp']}:
Type: {metadata['type']}
Symbol: {metadata['symbol']}
Key Findings:
{json.dumps(analysis_data, indent=2)}
---""")

                return "\n\n".join(results)

            except Exception as e:
                return f"Error searching historical analysis: {str(e)}"

        self.tools = [analyze_sentiment_tool, analyze_trends_tool, search_historical_tool]

    def process_query(self, query: str) -> Tuple[str, str]:
        """Process a user query"""
        try:
            response = self.llm.invoke(f"Use the available tools to answer the following question: {query}")

            doc_id = str(uuid.uuid4())
            metadata = {
                "query": query,
                "response": response,
                "timestamp": datetime.now().isoformat()
            }

            self.vector_store.index.upsert(
                vectors=[(
                    doc_id,
                    self.embeddings.embed_query(query),
                    metadata
                )]
            )

            return response, ""

        except Exception as e:
            return f"Error processing query: {str(e)}", ""

def main():
    """Main function"""
    try:
        from google.colab import userdata
        creds = userdata.get('Piaic-Tattari')
        pinecone_api_key = userdata.get('PINECONE_API_KEY_2')

        system = COTAnalysisSystem(
            data_folder="/content/drive/MyDrive/cot+excel",
            pinecone_api_key=pinecone_api_key,
            index_name="quickstart411"
        )

        # Example usage
        # query = "What's the market sentiment for AD?"
        # response, sources = system.process_query(query)
        # print("Response:", response)
        # print("\nSources:", sources)
        # Process a query
        response, sources = system.process_query("What's the market sentiment for AD?")
        print("Response:", response)

    except Exception as e:
        print(f"Error in main: {str(e)}")

if __name__ == "__main__":
    main()

ERROR:grpc._plugin_wrapping:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7943a06066b0>" raised exception!
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/google/auth/compute_engine/credentials.py", line 128, in refresh
    self._retrieve_info(request)
  File "/usr/local/lib/python3.10/dist-packages/google/auth/compute_engine/credentials.py", line 101, in _retrieve_info
    info = _metadata.get_service_account_info(
  File "/usr/local/lib/python3.10/dist-packages/google/auth/compute_engine/_metadata.py", line 323, in get_service_account_info
    return get(request, path, params={"recursive": "true"})
  File "/usr/local/lib/python3.10/dist-packages/google/auth/compute_engine/_metadata.py", line 248, in get
    raise exceptions.TransportError(
google.auth.exceptions.TransportError: ("Failed to retrieve http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/?recursive=true from the Go

KeyboardInterrupt: 