In [3]:
# Imports and Setup
import os
import re
import json
import requests
import pandas as pd
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
from enum import Enum

# LangGraph and LangChain imports
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import HumanMessage, AIMessage

from langchain.tools import tool

# LLM imports
from llama_index.llms.ollama import Ollama

# Additional tools
import yt_dlp
import whisper
from PIL import Image
import subprocess

In [4]:
whisper

<module 'whisper' from '/Users/maksim.rostov/pdev/freestyling/agents/hf-course/.conda/lib/python3.12/site-packages/whisper.py'>

In [None]:
# Configuration and LLM Setup
# The three constants below are forwarded verbatim to the Ollama client
# constructor as `request_timeout`, `context_window` and `model`.
REQUEST_TIMEOUT = 300  # presumably seconds - confirm against the Ollama client docs
CONTEXT_WINDOW = 80000
MODEL_NAME = "qwen2:7b"

# Initialize Ollama LLM (module-level `llm` is the client used by classify_question)
llm = Ollama(
    model=MODEL_NAME, 
    context_window=CONTEXT_WINDOW, 
    request_timeout=REQUEST_TIMEOUT
)

# Test LLM connection: sends a one-word prompt and prints the first 50
# characters of the reply. NOTE(review): this is a live model call every time
# the cell runs; presumably an unreachable server surfaces here - confirm.
test_response = llm.complete("Hello")
print(f"LLM initialized: {test_response.text[:50]}...")

# Configuration
MAX_RETRIES = 3  # not referenced by any of the cells visible in this section
TEMP_DIR = "./temp_files"  # relative to the kernel's working directory
# exist_ok=True keeps this cell safe to re-run when the directory already exists.
os.makedirs(TEMP_DIR, exist_ok=True)

print("Configuration completed")

In [None]:
# Question Classification System
@dataclass
class QuestionAnalysis:
    """
    Outcome of analysing a question: a free-text breakdown plus one boolean
    flag per capability the question may require.

    The field names (including the spellings `audio_prcessing` and
    `unstructered_data_processing`) are identical to the JSON keys requested
    in the `classify_question` prompt, so renaming a field means updating
    that prompt as well.
    """
    question_analysis: str  # free-text breakdown of what the question requires
    deep_web_search: bool  # multi-step web search is needed
    video_processing: bool  # a video has to be downloaded / analysed
    audio_prcessing: bool  # audio has to be downloaded / analysed (sic: spelling is part of the interface)
    image_processing: bool  # visual analysis of an image is needed
    structured_data_processing: bool  # table / JSON data has to be analysed
    unstructered_data_processing: bool  # raw text analysis is needed (sic: spelling is part of the interface)
    code_math_writing: bool  # code or mathematical computation is needed

def _coerce_flag(value: Any) -> bool:
    """Interpret one capability flag from the LLM reply as a bool.

    Small models sometimes answer with the strings "true"/"false" instead of
    JSON booleans; a plain bool("false") would wrongly be True.
    """
    if isinstance(value, str):
        return value.strip().lower() in {"true", "yes", "1"}
    return bool(value)


def classify_question(question: str, attachments: Optional[List[str]] = None) -> QuestionAnalysis:
    """
    Use LLM to analyze a question and determine what capabilities/steps are needed.

    Args:
        question (str): text of the question
        attachments (Optional[List[str]]): names/paths of files related to the
            question; None or an empty list means there are none

    Returns:
        QuestionAnalysis: dataclass that describes what answering this question
        requires. If the LLM call or the parsing of its reply fails, a generic
        fallback analysis is returned (web search enabled, everything else
        disabled) instead of raising.
    """
    attachment_info = ""
    if attachments:
        attachment_info = f"\n\nAttachments mentioned: {', '.join(attachments)}"
    
    classification_prompt = f"""You are an expert question analyzer for a generalist AI agent. Analyze the following question and break down exactly what capabilities are needed to answer it.

Question: {question}
Attachments: {attachment_info}

Your task is to provide a detailed analysis of what this question requires. Consider these capabilities:
- Deep web research: executing search queries on the web, iterating over results and trying to find the answer in multiple web search steps
- Video analysis: downloading and processing a video, extracting its content in the form of image frames or audio, and answering queries about the video 
- Audio processing: dowloading and processing audio, analysing the voice and the content
- Image analysis: visual analysis of an image 
- Structured data processing: analysis of table data or json documents, answering queries against them and/or visualising the data  
- Unstructered data processing: raw text analys, summarisation, entity or sentiment extraction  
- Code and mathematical computations: creating or executing code, processing and/or analysing code bits, creating and/or computing mathematical expressions

Instructions:
1. Provide what capabilities would need to be triggered to answer the question
2. If multiple functionalities are required, describe them briefly and suggest an order
3. Determine if files, web access, or computation are needed

Example:
Question: "What is the price trend of NVIDIA stock?"
Analysis: "Deep we research: looking up current and historical NVIDIA stock price data from financial websites, finding financial articles that describe the trand. Unstructered data processing: analyzing the text data found before."

Question: "What is the artistic style of this video https://www.youtube.com/watch?v=fLu080UX25o"?
Analysis: "Process the video: download it, capture fragments of it, evaluate the fragments"  

Respond with valid JSON only (lowercase true/false, no trailing comma), in exactly this format:
{{
    "question_analysis": "Breakdown of what this question requires, including all necessary capabilities and processes",
    "deep_web_search": true/false,
    "video_processing": true/false,
    "audio_prcessing": true/false,
    "image_processing": true/false,
    "structured_data_processing": true/false,
    "unstructered_data_processing": true/false,
    "code_math_writing": true/false
}}

Focus on being precise and concise - if a question needs multiple steps or capabilities, describe them all clearly without being verbose."""

    try:
        response = llm.complete(classification_prompt)
        response_text = response.text.strip()

        # Models frequently wrap the JSON in ``` fences or surround it with
        # prose; keep only the outermost {...} object before parsing.
        start = response_text.find("{")
        end = response_text.rfind("}")
        if start == -1 or end < start:
            raise ValueError("no JSON object found in LLM response")

        result = json.loads(response_text[start:end + 1])

        # Keyword names must match the QuestionAnalysis fields exactly
        # (including their spelling). A flag the model omitted is treated as
        # "not needed"; a missing analysis text triggers the fallback below.
        return QuestionAnalysis(
            question_analysis=str(result["question_analysis"]),
            deep_web_search=_coerce_flag(result.get("deep_web_search", False)),
            video_processing=_coerce_flag(result.get("video_processing", False)),
            audio_prcessing=_coerce_flag(result.get("audio_prcessing", False)),
            image_processing=_coerce_flag(result.get("image_processing", False)),
            structured_data_processing=_coerce_flag(result.get("structured_data_processing", False)),
            unstructered_data_processing=_coerce_flag(result.get("unstructered_data_processing", False)),
            code_math_writing=_coerce_flag(result.get("code_math_writing", False)),
        )

    except Exception as e:
        # Deliberate best-effort: any LLM/transport/parsing failure degrades to
        # a generic "search the web" plan rather than aborting the agent.
        print(f"Error in classification: {e}")
        # QuestionAnalysis has no generic "needs files" flag and the attachment
        # type is unknown here, so only web search is enabled in the fallback.
        return QuestionAnalysis(
            question_analysis=f"General research and analysis required for: {question}",
            deep_web_search=True,
            video_processing=False,
            audio_prcessing=False,
            image_processing=False,
            structured_data_processing=False,
            unstructered_data_processing=False,
            code_math_writing=False,
        )

print("LLM-based question analysis system implemented")

In [None]:
# Web Search Tool using DuckDuckGo
from duckduckgo_search import DDGS

@tool
def web_search(query: str, max_results: int = 5) -> str:
    """
    Search the web for information using DuckDuckGo search.
    """
    # The docstring above doubles as the tool description, so it is kept short.
    # Returns a numbered, human-readable listing of the hits; every failure is
    # reported as a string rather than raised so the calling agent keeps going.
    try:
        with DDGS() as ddgs:
            hits = list(ddgs.text(query, max_results=max_results))

            if len(hits) == 0:
                return f"No search results found for '{query}'"

            # One "N. title / body / URL" entry per hit, numbered from 1;
            # missing keys fall back to explicit "No ..." labels.
            entries = [
                f"{rank}. {hit.get('title', 'No title')}\n   {hit.get('body', 'No description')}\n   URL: {hit.get('href', 'No URL')}"
                for rank, hit in enumerate(hits, 1)
            ]

            listing = "\n\n".join(entries)
            return f"Web search results for '{query}':\n\n" + listing

    except Exception as e:
        return f"Error performing web search: {e}"

print("Web search tool implemented successfully")

In [None]:
# Image Analyzer Tool with LLaVA Integration
from PIL import Image
import base64
import io
from typing import Literal

@tool
def image_analyzer_llava(image_path: str, task: str = "describe") -> str:
    """
    Analyze images using local LLaVA instance - describe content, analyze chess positions, read text, etc.

    Args:
        image_path: Path of the image file on disk.
        task: "describe" for a detailed description, "text" to extract visible
            text, or any other free-form instruction, which is forwarded to
            the model as the analysis task.

    Returns:
        Basic image metadata followed by the LLaVA answer, or a message
        describing what went wrong (this tool never raises).
    """
    try:
        # Check if file exists
        if not os.path.exists(image_path):
            return f"Image file not found: {image_path}"

        file_size = os.path.getsize(image_path)
        file_name = os.path.basename(image_path)

        # Modes Pillow can write to PNG directly; anything else (e.g. CMYK
        # JPEGs) must be converted first or Image.save raises.
        png_modes = {"1", "L", "LA", "I", "I;16", "P", "RGB", "RGBA"}

        # The context manager closes the underlying file handle once the
        # pixels have been re-encoded.
        with Image.open(image_path) as image:
            # Metadata is captured before any conversion so the report
            # describes the file as it is on disk.
            width, height = image.size
            mode = image.mode
            format_type = image.format

            png_ready = image if mode in png_modes else image.convert("RGB")

            # Re-encode as PNG and base64 it for API transmission
            buffered = io.BytesIO()
            png_ready.save(buffered, format="PNG")

        img_base64 = base64.b64encode(buffered.getvalue()).decode()

        # Create task-specific prompts
        if task == "describe":
            prompt = "Describe what you see in this image in detail."
        elif task == "text":
            prompt = "Extract and read any text visible in this image."
        else:
            prompt = f"Analyze this image for the following task: {task}"

        llava_response = send_to_llava(img_base64, prompt)

        basic_info = f"Image: {file_name}\nSize: {width}x{height}\nMode: {mode}\nFormat: {format_type}\nFile size: {file_size} bytes"

        return f"{basic_info}\n\nLLaVA Analysis:\n{llava_response}"

    except Exception as e:
        return f"Error analyzing image '{image_path}': {e}"

def send_to_llava(
    image_base64: str,
    prompt: str,
    model: str = "llava",
    endpoint: str = "http://localhost:11434/api/generate",
    timeout: float = 60,
) -> str:
    """
    Send a base64-encoded image and a prompt to a local LLaVA server.

    Issues a single non-streaming POST with a JSON body containing `model`,
    `prompt`, `images` and `stream`, and returns the `response` field of the
    JSON reply.

    Args:
        image_base64: Image content encoded as a base64 string.
        prompt: Instruction for the model.
        model: Model name placed in the request body.
        endpoint: URL the request is posted to.
        timeout: Timeout passed to `requests.post`.

    Returns:
        The model's answer, "No response from LLaVA" when the reply has no
        `response` field, "LLaVA API error: <status>" for a non-200 status, or
        "Error communicating with LLaVA: <reason>" for any other failure.
        Errors are returned as text rather than raised so the calling tool can
        embed them in its own output.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "images": [image_base64],
        "stream": False
    }

    try:
        response = requests.post(endpoint, json=payload, timeout=timeout)

        if response.status_code == 200:
            return response.json().get("response", "No response from LLaVA")
        return f"LLaVA API error: {response.status_code}"

    except Exception as e:
        return f"Error communicating with LLaVA: {e}"

In [None]:
# File Processor Tool
import pandas as pd
import csv

# Extension -> handler kind; looked up case-insensitively.
_TABULAR_EXTENSIONS = {
    '.xlsx': 'excel',
    '.xls': 'excel',
    '.csv': 'csv',
    '.parquet': 'parquet',
}

# Accepted `file_type` hints -> handler kind.
_FILE_TYPE_HINTS = {
    'excel': 'excel',
    'xlsx': 'excel',
    'xls': 'excel',
    'csv': 'csv',
    'parquet': 'parquet',
}


def _resolve_file_kind(file_path: str, file_type: str) -> str:
    """Pick 'excel', 'csv', 'parquet' or 'text'; a known extension wins, the hint is used otherwise."""
    extension = os.path.splitext(file_path)[1].lower()
    kind = _TABULAR_EXTENSIONS.get(extension)
    if kind is None and file_type:
        hint = str(file_type).strip().lower().lstrip('.')
        # "auto" and unknown hints are not in the table and resolve to text.
        kind = _FILE_TYPE_HINTS.get(hint)
    return kind or 'text'


def _read_text_lenient(file_path: str) -> str:
    """Read a file as text: UTF-8 first, latin-1 (which accepts any byte) as fallback."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin-1') as f:
            return f.read()


def _sniff_csv_delimiter(file_path: str, default: str = ',') -> str:
    """Guess the CSV delimiter from the first 1024 characters; return `default` if that fails."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            sample = f.read(1024)
        return csv.Sniffer().sniff(sample).delimiter
    except (csv.Error, UnicodeDecodeError):
        return default


def _summarize_dataframe(df, label: str, file_name: str, file_size: int, extra_info: str = "") -> str:
    """Build the shared report for tabular files: shape, columns, numeric column totals, first rows."""
    shape_info = f"Shape: {df.shape[0]} rows, {df.shape[1]} columns"
    columns_info = f"Columns: {list(df.columns)}"

    # Calculate totals for numeric columns
    numeric_cols = df.select_dtypes(include=['number']).columns
    totals_info = ""
    if len(numeric_cols) > 0:
        totals_info = f"Column totals: {df[numeric_cols].sum().to_dict()}"

    sample_data = f"First 5 rows:\n{df.head().to_string()}"

    header = f"{label}: {file_name}\nFile size: {file_size} bytes\n"
    if extra_info:
        header += f"{extra_info}\n"

    return f"{header}{shape_info}\n{columns_info}\n{totals_info}\n\n{sample_data}"


@tool
def file_processor(file_path: str, file_type: str = "auto") -> str:
    """
    Process various file types - Excel files, CSV files, Parquet files, text files, etc.

    Args:
        file_path: Path of the file on disk. The extension (case-insensitive)
            selects the handler: .xlsx/.xls, .csv, .parquet; anything else is
            read as text.
        file_type: Optional hint ("excel", "csv", "parquet") that is used only
            when the extension does not identify a tabular format. "auto"
            (default) relies on the extension alone.

    Returns:
        A text report: for tabular files the shape, column names, totals of
        numeric columns and the first 5 rows; for other files line/word/
        character counts and a 500-character preview. Problems are reported
        as a message instead of being raised.
    """
    try:
        # Check if file exists
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"

        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        kind = _resolve_file_kind(file_path, file_type)

        if kind == 'excel':
            return _summarize_dataframe(pd.read_excel(file_path), "Excel file", file_name, file_size)

        if kind == 'parquet':
            return _summarize_dataframe(pd.read_parquet(file_path), "Parquet file", file_name, file_size)

        if kind == 'csv':
            try:
                delimiter = _sniff_csv_delimiter(file_path)
                df = pd.read_csv(file_path, delimiter=delimiter)
                return _summarize_dataframe(
                    df,
                    "CSV file",
                    file_name,
                    file_size,
                    extra_info=f"Detected delimiter: '{delimiter}'",
                )
            except Exception as csv_error:
                # Fallback to text processing if CSV parsing fails; the lenient
                # reader keeps a non-UTF-8 file from raising inside this handler.
                content = _read_text_lenient(file_path)
                line_count = len(content.splitlines())
                preview = content[:500] + "..." if len(content) > 500 else content

                return f"CSV file (read as text due to parsing error): {file_name}\nFile size: {file_size} bytes\nLines: {line_count}\nError: {csv_error}\n\nContent preview:\n{preview}"

        # Read as text file for all other formats (including Python files)
        content = _read_text_lenient(file_path)

        # splitlines() does not count a phantom empty line after a trailing newline.
        line_count = len(content.splitlines())
        word_count = len(content.split())
        char_count = len(content)

        preview = content[:500] + "..." if len(content) > 500 else content

        file_type_desc = "Python file" if file_path.lower().endswith('.py') else "Text file"

        return f"{file_type_desc}: {file_name}\nFile size: {file_size} bytes\nLines: {line_count}\nWords: {word_count}\nCharacters: {char_count}\n\nContent preview:\n{preview}"

    except Exception as e:
        return f"Error processing file '{file_path}': {e}"

print("File processor tool implemented successfully")