In [1]:
import json
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Literal, Optional
from uuid import uuid4
import boto3
from dotenv import load_dotenv



# Root logger (no name is passed to getLogger).
logger = logging.getLogger()

In [2]:
import logging
import sys
from datetime import datetime
from IPython.display import display, HTML

class VSCodeNotebookHandler(logging.Handler):
    """Logging handler that renders each record as a colored HTML line.

    Every record is shown through ``display(HTML(...))`` and the plain
    formatted message is additionally written to ``sys.stderr``.
    """

    # Accent color per log level; levels not listed fall back to DEFAULT_COLOR.
    LEVEL_COLORS = {
        logging.DEBUG: '#6c757d',    # gray
        logging.INFO: '#0d6efd',     # blue
        logging.WARNING: '#ffc107',   # yellow
        logging.ERROR: '#dc3545',     # red
        logging.CRITICAL: '#721c24'   # dark red
    }
    DEFAULT_COLOR = '#000000'

    def __init__(self):
        super().__init__()
        # Kept for backward compatibility; nothing in this class appends to it.
        self.log_buffer = []

    def emit(self, record):
        """Render ``record`` as HTML in the notebook and echo it to stderr.

        Args:
            record: The ``logging.LogRecord`` to display.

        Any exception raised while formatting or displaying is caught and
        reported on stderr, so a rendering problem never propagates to the
        code that issued the log call.
        """
        try:
            # Local import keeps this cell independent of the notebook's import cell.
            from html import escape

            # Format the message
            msg = self.format(record)

            # Get color for current log level
            color = self.LEVEL_COLORS.get(record.levelno, self.DEFAULT_COLOR)

            # Use the record's own creation time rather than "now", so the
            # displayed time is when the event was logged.
            timestamp = datetime.fromtimestamp(record.created).strftime('%H:%M:%S')

            # Log text is arbitrary: escape it so characters such as '<' or '&'
            # are displayed literally instead of being interpreted as markup.
            safe_level = escape(record.levelname)
            safe_msg = escape(msg)

            # Create HTML formatted log entry
            log_entry = f"""
            <div style='
                padding: 2px 6px;
                margin: 2px 0;
                border-left: 3px solid {color};
                background-color: #f8f9fa;
                font-family: Monaco, monospace;
                font-size: 12px;
            '>
                <span style='color: {color}; font-weight: bold;'>{safe_level}</span>
                <span style='color: #666; margin-right: 8px;'>{timestamp}</span>
                <span>{safe_msg}</span>
            </div>
            """

            # Display the formatted log entry
            display(HTML(log_entry))

            # Also print to stderr for VSCode's native console (unescaped plain text)
            print(msg, file=sys.stderr)

        except Exception as e:
            print(f"Error in log handler: {str(e)}", file=sys.stderr)

def setup_notebook_logging(level=logging.INFO):
    """
    Reset the root logger so it reports only through a VSCodeNotebookHandler.

    Args:
        level: Threshold applied to the root logger (default: INFO)
    Returns:
        logger: The root logger, carrying a single freshly created handler
    """
    root = logging.getLogger()

    # Detach whatever handlers were installed before, one at a time
    while root.handlers:
        root.removeHandler(root.handlers[0])

    root.setLevel(level)

    # Message-only format: the handler adds level and time itself
    notebook_handler = VSCodeNotebookHandler()
    notebook_handler.setFormatter(logging.Formatter('%(message)s'))
    root.addHandler(notebook_handler)

    return root

# Usage example: install the notebook handler on the root logger at INFO level.
logger = setup_notebook_logging(logging.INFO)

# Test the logger
def test_logging():
    logger.debug("This is a debug message")
    logger.info("This is an info message")
    logger.warning("This is a warning message")
    logger.error("This is an error message")
    logger.critical("This is a critical message")

In [3]:
# Re-apply the notebook logging setup with its default level (INFO); any handlers
# already attached to the root logger are removed and replaced.
logger = setup_notebook_logging()

In [4]:
# Read settings from a .env file, then pick up the three values reported below
load_dotenv()
aws_profile, llm_region, llm_model = map(
    os.getenv, ('AWS_PROFILE', 'LLM_REGION', 'LLM_MODEL')
)
print(f"Using AWS Profile: {aws_profile}")
print(f"Using AWS Region: {llm_region}")
print(f"Using LLM model: {llm_model}")

Using AWS Profile: aws-prototype
Using AWS Region: us-east-1
Using LLM model: anthropic.claude-3-5-sonnet-20240620-v1:0


In [5]:
from langchain import hub
from langchain.agents import (
    AgentExecutor,
    tool,
    create_tool_calling_agent,
    create_json_chat_agent,
)
from langchain_core.tools import StructuredTool, Tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_aws import ChatBedrock
from langchain_experimental.utilities import PythonREPL
from langchain_core.messages import HumanMessage

In [6]:
from config import Config
from aws.bedrock import BedrockHandler
from tools.python_repl import PythonREPLTool
from tools.outputformatter import create_formatting_tool

In [7]:
import pandas as pd

class chartist:
    """Agent wrapper that asks an LLM for visualization suggestions.

    On construction it builds a JSON chat agent from the LLM returned by
    ``BedrockHandler``, the ``hwchase17/react-chat-json`` hub prompt and the
    output-formatting tool. ``handle_request`` is the entry point.
    """

    # Used as the "User Prompt" section when a DataFrame arrives without a prompt,
    # so the literal text "None" is never sent to the model.
    DEFAULT_DATAFRAME_PROMPT = "Suggest suitable visualizations for this dataset."

    def __init__(self, region_name: str = "us-east-1"):
        """
        Initialize chartist with required configuration

        Parameters:
        region_name (str): AWS region name. Accepted for API compatibility;
            it is not read by this constructor.
        """
        logger.info("Chartist Start")
        self.config = Config()
        self.bedrock = BedrockHandler(self.config)
        self.llm = self.bedrock.get_llm()
        self.prompt = hub.pull("hwchase17/react-chat-json")

        # Instantiated but currently not exposed to the agent (see tool list below).
        self.python_repl = PythonREPLTool()
        self.tools = [
            #self.python_repl.get_tool(),
            create_formatting_tool(),
            #Tool(
            #    name="parse_dataframe",
            #    func=self.parse_dataframe,
            #    description="Parses a DataFrame and returns its structure, columns, data types, summary statistics, and sample data in JSON format.",
            #    ),
        ]
        self.agent_executor = self.set_agent_executor()

        # Set up holder for visualisations
        self.visualizations = None

    def set_agent_executor(self, verbose=False, handle_parse=True):
        """
        Build the AgentExecutor from the configured LLM, tools and prompt.

        Parameters:
        verbose (bool): Passed to AgentExecutor as ``verbose``.
        handle_parse (bool): Passed to AgentExecutor as ``handle_parsing_errors``.

        Returns:
        AgentExecutor: The newly created executor.

        Raises:
        Exception: Any error raised while creating the agent or executor is
            logged and re-raised unchanged.
        """
        logger.info("Setting up chartist executor")
        try:
            agent = create_json_chat_agent(self.llm, self.tools, self.prompt)
            logger.debug("Structured chat agent created")

            executor = AgentExecutor(
                agent=agent,
                tools=self.tools,
                verbose=verbose,
                handle_parsing_errors=handle_parse,
            )
            logger.info("Agent executor successfully set up")
            return executor
        except Exception as e:
            logger.error(f"Error setting up agent executor: {str(e)}")
            raise

    def parse_dataframe(self, df: pd.DataFrame) -> dict:
        """
        Extract useful information from a DataFrame and return it as a JSON-like dictionary.

        Parameters:
        df: DataFrame to analyze.

        Returns:
        dict: Keys are ``columns`` (name/dtype pairs), ``shape`` (rows/columns),
            ``summary`` (``describe(include="all")`` as a dict, or ``{}`` if that
            raised ValueError) and ``sample_data`` (first 10 rows as records).

        Raises:
        Exception: Any other error is logged and re-raised unchanged.
        """
        logger.info("Parsing DataFrame into structured JSON dictionary")
        try:
            try:
                summary = df.describe(include="all").to_dict()
            except ValueError as e:
                # Best effort: the rest of the structure is still useful without it.
                logger.warning(f"Could not generate full summary statistics: {e}")
                summary = {}

            data_info = {
                "columns": [{"name": col, "dtype": str(dtype)} for col, dtype in df.dtypes.items()],
                "shape": {"rows": df.shape[0], "columns": df.shape[1]},
                "summary": summary,
                "sample_data": df.head(10).to_dict(orient="records"),
            }
            logger.debug(f"DataFrame parsed: {data_info}")
            return data_info
        except Exception as e:
            logger.error(f"Error parsing DataFrame: {str(e)}")
            raise

    def _invoke_agent(self, agent_input, error_prefix):
        """Run the agent executor on ``agent_input``; on failure log and return ``"<error_prefix>: <error>"``."""
        try:
            return self.agent_executor.invoke({"input": agent_input})
        except Exception as e:
            logger.error(f"Error in agent executor: {str(e)}")
            return f"{error_prefix}: {str(e)}"

    def handle_request(self, data=None, prompt=None, max_rows=300, **kwargs):
        """
        Handle user input by deciding how to process it, based on the type of data provided.

        Parameters:
        data: Input data for the agent. Only a pandas DataFrame is treated as
            data; any other value is ignored and the prompt-only path is used.
        prompt: String prompt to guide the agent.
        max_rows: Maximum number of leading rows of the DataFrame sent to the
            agent (default 300). Pass None to send every row.
        **kwargs: Accepted for compatibility; not used.

        Returns:
        The agent executor's response on success. If the executor raises, or
        if neither a DataFrame nor a prompt is given, a plain message string
        is returned instead, so callers should check the type before indexing.
        """
        if isinstance(data, pd.DataFrame):
            logger.info("Detected DataFrame input. Preparing to analyze...")

            # Limit the DataFrame to the top `max_rows` rows if necessary
            if max_rows is not None and len(data) > max_rows:
                logger.info(f"DataFrame has more than {max_rows} rows. Trimming for processing.")
                data = data.head(max_rows)

            # Convert DataFrame to JSON string (serialized once, reused in the prompt)
            dataframe_json = data.to_json(orient="split")

            user_prompt = prompt if prompt else self.DEFAULT_DATAFRAME_PROMPT

            # Construct the combined prompt - TODO: maybe instead of creating a separate prompt, we add the bits
            # to the prompt depending on what is being analysed as we do with citations.
            combined_prompt = f"""
                        User Prompt:
                        {user_prompt}

                        DataFrame Information (Top {len(data)} Rows):
                        The following is a JSON representation of the DataFrame. Use this to suggest suitable visualizations:
                        {dataframe_json}

                        Instructions:
                        1. Analyze the DataFrame to understand its structure and content
                        2. Suggest meaningful visualizations based on the data and user's instructions
                        3. For each visualization, include:
                        - Clear purpose
                        - Chart type
                        - Variables used
                        - Expected insights
                        4. Use the format_visualization_output tool to structure your response
                        5. Make sure to provide concrete, specific suggestions based on the actual data

                        Remember to use the formatting tool for your final output.
                        """

            # Send to the agent executor
            logger.info("Sending prompt and DataFrame to agent executor...")
            return self._invoke_agent(combined_prompt, "Error processing visualization suggestions")

        if prompt:
            logger.info("No DataFrame detected. Using agent executor for the provided prompt...")
            return self._invoke_agent(prompt, "Error processing prompt")

        logger.warning("No input provided to handle_request method.")
        return "Please provide a DataFrame or a prompt for the agent to process."
        

In [8]:
import numpy as np
def create_sample_dataset(n_rows: int = 100, seed: int = 42):
    """Create a sample dataset for testing

    Parameters:
    n_rows (int): Number of daily rows to generate, starting 2023-01-01.
    seed (int): Seed for the random generator; the same seed always yields
        the same frame.

    Returns:
    pd.DataFrame: Columns ``date``, ``sales``, ``customers``,
        ``satisfaction_score`` and ``category``.
    """
    # A local generator produces the same stream as np.random.seed(seed) followed
    # by the module-level functions, but leaves NumPy's global random state alone.
    rng = np.random.RandomState(seed)
    dates = pd.date_range(start="2023-01-01", periods=n_rows, freq="D")

    # Draw order matters for reproducibility: sales, customers, score, category.
    sample_df = pd.DataFrame(
        {
            "date": dates,
            "sales": rng.normal(1000, 100, n_rows),
            "customers": rng.randint(50, 200, n_rows),
            "satisfaction_score": rng.uniform(3.5, 5.0, n_rows),
            "category": rng.choice(["A", "B", "C"], n_rows),
        }
    )

    return sample_df

In [9]:
# `chartist` names the class until this cell runs and the instance afterwards.
# Resolve the class either way so re-running this cell does not fail with
# "'chartist' object is not callable".
_chartist_cls = chartist if isinstance(chartist, type) else type(chartist)
chartist = _chartist_cls()

Chartist Start


Loaded config for agent successfully.


Initializing BedrockHandler


Creating ChatBedrock LLM instance


Found credentials in shared credentials file: ~/.aws/credentials


Successfully created ChatBedrock LLM instance


Initialising PythonREPLTool


Launching SafeREPL


Setting up chartist executor


Agent executor successfully set up


In [10]:
# Build the synthetic sample frame that is passed to the agent below.
sample_df = create_sample_dataset()

In [11]:
# Ask the agent for visualization ideas for the sample frame. `response` is the
# executor's result on success, or a plain error string if the call failed.
prompt = "Analyze this dataset and suggest visualizations to explore trends and insights."
response = chartist.handle_request(data=sample_df, prompt=prompt)

Detected DataFrame input. Preparing to analyze...


Sending prompt and DataFrame to agent executor...


In [12]:
# handle_request returns the executor's dict on success but a plain string on
# failure or missing input; show whichever came back instead of raising TypeError.
print(response['output'] if isinstance(response, dict) else response)

📊 VISUALIZATION SUGGESTIONS:


📈 TIME SERIES ANALYSIS OF SALES
Purpose: To visualize the trend of sales over time
Chart Type: Line chart
Variables: date (x-axis), sales (y-axis)
Expected Insights: Identify any seasonal patterns, overall trends, or unusual spikes/dips in sales

--------------------------------------------------

📈 CUSTOMER COUNT VS
Purpose: To examine the relationship between the number of customers and sales
Chart Type: Scatter plot
Variables: customers (x-axis), sales (y-axis)
Expected Insights: Determine if there's a correlation between customer count and sales volume

--------------------------------------------------

📈 SATISFACTION SCORE DISTRIBUTION
Purpose: To analyze the distribution of customer satisfaction scores
Chart Type: Histogram or Box plot
Variables: satisfaction_score
Expected Insights: Understand the overall customer satisfaction levels and identify any outliers

--------------------------------------------------

📈 SALES BY CATEGORY COMPARISON
Purpo

In [2]:
# Python Interpreter Tool
# Wraps PythonREPL.run as a StructuredTool named "python_repl".
# NOTE(review): this cell's execution count (In [2]) is out of sequence with the
# cells above, and it needs PythonREPL / StructuredTool from the LangChain import
# cell - confirm it still works after Restart Kernel -> Run All.
# NOTE(review): presumably python_repl.run executes whatever code it is handed -
# confirm how it is sandboxed before exposing this tool to an agent.
python_repl = PythonREPL()
repl_tool = StructuredTool.from_function(
    func=python_repl.run,
    name="python_repl",
    description="A Python shell. Use this whenever calculations are required by passing in the required python commands to solve the calculation. Input should be a valid python command. If you want to see the output of a value, you should print it out with `print(...)`."
)