<h2>Import Libraries</h2>

In [11]:
import pandas as pd
import json
import io
import re
import numpy as np
import pdfplumber
import plotly.express as px
import google.generativeai as genai
import ipywidgets as widgets
from ipywidgets import FileUpload, Layout, VBox, Output
from IPython.display import display, clear_output
from langchain.memory import ConversationBufferMemory

<h2>Extracting Data</h2>

In [12]:
# Configure Google Gemini API
genai.configure(api_key="")  # Replace with your actual API key
memory = ConversationBufferMemory()

# Initialize global dataframe
df = None

def extract_table_from_pdf(file_path):
    """Extracts tables from a pdf and converts them into a pandas dataframe."""
    tables = []
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            extracted_tables = page.extract_tables()
            for table in extracted_tables:
                df = pd.DataFrame(table[1:], columns=table[0])  # First row as headers
                tables.append(df)
    
    if tables:
        return pd.concat(tables, ignore_index=True)  # Merge tables if multiple found
    return None  # No tables found

<h2>Processing File</h2>

In [13]:
def process_file(file):
    """Processes uploaded files and extracts structured data."""
    try:
        content = io.BytesIO(file["content"])
        filename = file["metadata"]["name"]

        if filename.endswith('.csv'):
            return pd.read_csv(content)
        elif filename.endswith('.xlsx'):
            return pd.read_excel(content)
        elif filename.endswith('.pdf'):
            df = extract_table_from_pdf(content)
            return df if df is not None else pd.DataFrame({"Message": ["No tables found"]})
        else:
            return None
    except Exception as e:
        print(f"❌ Error processing file: {e}")
        return None


<h2>For Queries</h2>

In [14]:
import re
import textwrap

def clean_json_response(response):
    """Extracts and cleans Python or JSON code from LLM responses while preserving correct indentation."""
    match = re.search(r"```(?:json|python)?\s*\n([\s\S]+?)\n```", response)
    if match:
        code_block = match.group(1)
        
        # Use textwrap.dedent() to remove unwanted leading spaces while preserving relative indentation
        cleaned_code = textwrap.dedent(code_block)
        
        # Strip any accidental leading/trailing blank lines
        return cleaned_code.strip()

    return response  # Return original if no code block found

def query_gemini(prompt_text):
    """Function to call Google Gemini API for processing queries"""
    try:
        model = genai.GenerativeModel("gemini-2.0-flash")  # Use correct Gemini model
        response = model.generate_content(prompt_text)
        return response.text.strip() if response and response.text else "No response"
    except Exception as e:
        return f"❌ Error querying Gemini API: {e}"


<h2>Generating Visualizations</h2>

In [20]:
import textwrap

def generate_visualization(df, query):
    try:
        prompt_text = f"""
        Given the columns: {', '.join(df.columns)}, and the query: {query}, 
        Give plotly code for the same and try to keep the format similar to this:
        df[y_col] = pd.to_numeric(df[y_col], errors='coerce')  # Convert if needed
        df_sorted = df.sort_values(by=y_col, ascending=True)  # Sort correctly
        
        chart_mapping = {{
            "pie": px.pie(df_sorted, names=x_col, values=y_col, title=query),
            "bar": px.bar(df_sorted, x=x_col, y=y_col, title=query),
            "line": px.line(df_sorted, x=x_col, y=y_col, title=query),
            "scatter": px.scatter(df_sorted, x=x_col, y=y_col, title=query),
        }}

        fig = chart_mapping.get(chart_type, px.bar(df, x=x_col, y=y_col, title=query))
        fig.show().
        and dont give anything more than the things present in this format.
        Also dont give import statements or function declarations or comments. I Just
        need the code exactly like the format I specified so that I can put it myself in a 
        function. Also give a print statement for the dataframe in the end.
        """
        response = query_gemini(prompt_text)
        # print("📢 Gemini API Raw Response:", response)  # Debugging output

        # Clean response if wrapped in markdown
        cleaned_response = clean_json_response(response)
        
        # Dedent the code to fix indentation issues
        formatted_code = textwrap.dedent(cleaned_response)

        # Execute the cleaned Plotly code
        exec(formatted_code, globals())  # Run in the global scope

    except Exception as e:
        print(f"❌ Error generating visualization: {e}")

<h2>Handling Queries</h2>

In [16]:
def handle_query(query):
    global df
    if "visualize" in query.lower() or "chart" in query.lower() or "graph" in query.lower():
        if df is None:
            print("❌ Error: No file uploaded for visualization")
            return
        generate_visualization(df, query)
    else:
        document = df.to_string() if df is not None else "No document uploaded"
        history = "\n".join([msg.content for msg in memory.chat_memory.messages])
        
        prompt_text = f"Conversation history: {history}\n\nGiven the document: {document}, answer the following question: {query}"
        response = query_gemini(prompt_text)
        
        memory.chat_memory.add_user_message(query)
        memory.chat_memory.add_ai_message(response)
        
        print(f"🤖 Bot: {response}")


<h2>Simple Widget</h2>

In [17]:
# File upload widget
file_upload = FileUpload(accept='.csv,.pdf,.xlsx,.docx', multiple=False)
upload_button = widgets.Button(description="Upload File")

# Query input widget
query_input = widgets.Text(placeholder="Ask a question...", layout=Layout(width="80%"))
ask_button = widgets.Button(description="Ask")

# Output widget
output = Output()


<h2>Widget Interact Behavior</h2>

In [18]:
# Event handlers
def on_upload_button_clicked(b):
    global df
    if not file_upload.value:
        with output:
            clear_output()
            print("❌ Error: No file selected.")
        return
    
    uploaded_file = file_upload.value[0]  # Extract first dictionary from tuple
    filename = uploaded_file["name"]
    file_content = uploaded_file["content"]

    with output:
        clear_output()
        print(f"📂 Uploaded File Name: {filename}")
        print(f"📏 File Content Size: {len(file_content)} bytes")

    # Process the file
    df = process_file({"metadata": {"name": filename}, "content": file_content})
    
    if df is not None:
        with output:
            clear_output()
            print(f"✅ File '{filename}' uploaded successfully!")
            print(df.head())  # Display first few rows of the DataFrame
    else:
        with output:
            clear_output()
            print("❌ Error: Unsupported file type or failed to process file.")

def on_ask_button_clicked(b):
    query = query_input.value.strip()
    if query:
        with output:
            clear_output()
            handle_query(query)
    else:
        with output:
            clear_output()
            print("❌ Error: Please enter a query.")

upload_button.on_click(on_upload_button_clicked)
ask_button.on_click(on_ask_button_clicked)

<h2>Display the Widget Components</h2>

In [19]:
display(VBox([file_upload, upload_button, query_input, ask_button, output]))

VBox(children=(FileUpload(value=(), accept='.csv,.pdf,.xlsx,.docx', description='Upload'), Button(description=…