In [1]:
# pip install --upgrade langchain langchain-community

In [2]:
# imports

import os
import glob
import chromadb
from sentence_transformers import SentenceTransformer
from openai import OpenAI
from docx import Document
import uuid
from dotenv import load_dotenv
from huggingface_hub import login
import numpy as np
from sentence_transformers import SentenceTransformer
from datasets import load_dataset
import chromadb
import plotly.graph_objects as go
import glob
import gradio as gr
from io import StringIO
import logging
import sys
import json

In [3]:
# Environment: load settings from a local .env file (override=True lets values
# from that file replace variables already set in the process), then make sure
# both API keys exist in os.environ.
load_dotenv(override=True)
# setdefault keeps a value that is already present and otherwise stores the
# placeholder string.
os.environ.setdefault('OPENAI_API_KEY', 'your-key-if-not-using-env')
os.environ.setdefault('HF_TOKEN', 'your-key-if-not-using-env')

DB = "agile_process"
gpt_model = "gpt-4o-mini"

In [4]:
# Log in to HuggingFace

# Read the token from os.environ; this raises KeyError if HF_TOKEN is absent.
hf_token = os.environ['HF_TOKEN']
# NOTE(review): add_to_git_credential=True presumably also hands the token to the
# local git credential helper — confirm that side effect is wanted here.
login(hf_token, add_to_git_credential=True)

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


In [5]:
class SentenceTransformerEmbeddings:
    """Adapter around a SentenceTransformer model that returns plain Python lists.

    The wrapped model is available as ``self.model``.
    """

    def __init__(self, model_name: str = 'sentence-transformers/all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name)

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Encode every text in ``texts`` and return one vector (as a list) per text."""
        return self.model.encode(texts).tolist()

    def embed_query(self, text: str) -> list[float]:
        """Encode one query string and return its vector as a list."""
        # The model is called with a one-item batch; the first row is the query vector.
        batch = self.model.encode([text])
        first_row = batch[0]
        return first_row.tolist()

In [6]:
# def load_docx_content(file_path):
#     """Load content from a DOCX file."""
#     try:
#         doc = Document(file_path)
#         content = []
#         for paragraph in doc.paragraphs:
#             if paragraph.text.strip():
#                 content.append(paragraph.text.strip())
#         return '\n'.join(content)
#     except Exception as e:
#         print(f"Error loading {file_path}: {e}")
#         return ""

# def split_text(text, chunk_size=1000, chunk_overlap=200):
#     """Simple text splitter."""
#     chunks = []
#     start = 0
#     while start < len(text):
#         end = start + chunk_size
#         chunk = text[start:end]
#         chunks.append(chunk)
#         start = end - chunk_overlap
#         if start >= len(text):
#             break
#     return chunks

# # Load all DOCX files from the Documents folder
# files = glob.glob("Documents/*.docx")
# all_chunks = []
# all_metadatas = []
# all_ids = []

# for file_path in files:
#     doc_type = os.path.splitext(os.path.basename(file_path))[0]
#     content = load_docx_content(file_path)
    
#     if content:
#         chunks = split_text(content)
#         for i, chunk in enumerate(chunks):
#             all_chunks.append(chunk)
#             all_metadatas.append({
#                 "doc_type": doc_type,
#                 "file_path": file_path,
#                 "chunk_index": i
#             })
#             all_ids.append(f"{doc_type}_{i}_{uuid.uuid4().hex[:8]}")

# print(f"Total number of chunks: {len(all_chunks)}")
# print(f"Document types found: {set(meta['doc_type'] for meta in all_metadatas)}")

In [7]:
# Names of the on-disk Chroma store and of the collection inside it.
DB_PATH = "agile_process"
collection_name = "process_docs"

# Open the persistent store, load the embedding model, then look up the
# existing collection by name.
client = chromadb.PersistentClient(path=DB_PATH)
embeddings_model = SentenceTransformerEmbeddings(
    model_name='sentence-transformers/all-MiniLM-L6-v2'
)
collection = client.get_collection(name=collection_name)

In [8]:
# # Uncomment to create the vectorstore again

# # Delete existing collection if it exists
# collection_name = "process_docs"
# try:
#     client.delete_collection(name=collection_name)
#     print("Existing collection deleted.")
# except Exception as e:
#     print(f"Collection didn't exist or couldn't be deleted: {e}")

# # Create a new collection
# collection = client.create_collection(
#     name=collection_name,
#     metadata={"hnsw:space": "cosine"}  # Use cosine similarity
# )

# # Add documents to the collection in batches (ChromaDB has limits)
# batch_size = 100
# for i in range(0, len(all_chunks), batch_size):
#     batch_chunks = all_chunks[i:i + batch_size]
#     batch_metadatas = all_metadatas[i:i + batch_size]
#     batch_ids = all_ids[i:i + batch_size]
    
#     # Generate embeddings for this batch
#     batch_embeddings = embeddings_model.embed_documents(batch_chunks)
    
#     # Add to collection
#     collection.add(
#         documents=batch_chunks,
#         metadatas=batch_metadatas,
#         ids=batch_ids,
#         embeddings=batch_embeddings
#     )
    
#     print(f"Added batch {i//batch_size + 1}/{(len(all_chunks) + batch_size - 1)//batch_size}")

# print(f"Vectorstore created with {collection.count()} documents")

# # Test query (uncomment to test)
# # results = query_documents("your query here")
# # print("Query results:", results)

In [9]:
# Example: Query the collection
def query_documents(query_text, n_results=5):
    """Embed ``query_text`` and return the collection's raw query result.

    The result is whatever ``collection.query`` returns when asked for the
    documents, metadatas and distances of the ``n_results`` nearest entries.
    """
    return collection.query(
        query_embeddings=[embeddings_model.embed_query(query_text)],
        n_results=n_results,
        include=["documents", "metadatas", "distances"],
    )


In [10]:
class HTMLLogFormatter(logging.Formatter):
    """Format log records as single HTML ``<div>`` lines for an HTML log panel.

    Each line is coloured by the last dotted component of the logger name, looked up
    in ``AGENT_COLORS`` (falling back to the ``"Default"`` entry). The ``fmt`` given to
    the constructor is not used by ``format``; only ``datefmt`` affects the output.
    """

    AGENT_COLORS = {
        "FrontierAgent": "blue",
        "Default": "black"
    }

    def format(self, record):
        """Return ``record`` rendered as one HTML ``<div>``.

        The logger name and the message are HTML-escaped. Logged messages can carry
        arbitrary text (including what a user typed into the chat box), and that text
        must be displayed literally rather than interpreted as markup.
        """
        # Function-scope import keeps this class self-contained.
        from html import escape

        record.asctime = self.formatTime(record, self.datefmt)
        logger_name = record.name.split('.')[-1]  # often "__main__" or module
        color = self.AGENT_COLORS.get(logger_name, self.AGENT_COLORS["Default"])
        return (
            f'<div style="color:{color}">[{record.asctime}] [{escape(logger_name)}] '
            f'[{record.levelname}] {escape(record.getMessage())}</div>'
        )

In [11]:
log_stream = StringIO()
html_log_stream = StringIO()

def init_logging():
    """Configure root logging with console, in-memory text and in-memory HTML handlers.

    The root logger is set to INFO and three INFO-level ``StreamHandler``s are attached:
    one writing to ``sys.stdout``, one to ``log_stream`` and one (formatted with
    ``HTMLLogFormatter``) to ``html_log_stream``.

    Safe to call more than once: handlers attached by an earlier call are removed
    first, so re-running the cell does not make every record appear several times.
    """
    root = logging.getLogger()
    root.setLevel(logging.INFO)

    # Remove only the handlers this function attached earlier; handlers installed
    # by anything else are left alone.
    for existing in list(root.handlers):
        if getattr(existing, "_from_init_logging", False):
            root.removeHandler(existing)
            existing.close()

    handler = logging.StreamHandler(sys.stdout)

    # Memory buffer handler
    stream_handler = logging.StreamHandler(log_stream)

    formatter = logging.Formatter(
        "[%(asctime)s] [Agents] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %z",
    )
    handler.setFormatter(formatter)
    stream_handler.setFormatter(formatter)

    # HTML handler for Gradio
    html_stream_handler = logging.StreamHandler(html_log_stream)
    html_stream_handler.setFormatter(HTMLLogFormatter(
        fmt="%(asctime)s [Agents] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %z"
    ))

    # Attach in the order HTML, console, memory buffer.
    for new_handler in (html_stream_handler, handler, stream_handler):
        new_handler.setLevel(logging.INFO)
        new_handler._from_init_logging = True  # marker read by the cleanup loop above
        root.addHandler(new_handler)

In [12]:
def get_html_logs():
    """Return everything written to ``html_log_stream`` so far, as one string.

    Uses ``getvalue()`` so the stream position is left where it is. The logging
    handler writes to this same ``StringIO`` at its current position, so rewinding
    it in order to read could let a record logged at that moment overwrite the
    start of the buffer.
    """
    return html_log_stream.getvalue()

In [13]:
def clear_logs():
    """Empty both in-memory log buffers and return blank values for the UI.

    Returns:
        The tuple ``(None, "")``.
    """
    for buffer in (log_stream, html_log_stream):
        # Discard the contents, then move the position back to the start.
        buffer.truncate(0)
        buffer.seek(0)
    return None, ""

In [14]:
# Attach the console, memory-buffer and HTML handlers to the root logger.
init_logging()

In [15]:
# System prompt for the assistant: collect source-system, data-mart and architecture
# details, call the retrieval tool, then present epics, features and pointed stories.
# NOTE(review): the prompt calls the tool "return_context_function" (the Python variable
# name), while the name in the tool schema sent to the model is "return_context" —
# confirm which wording is intended.
system_message = """You devise the strategy on how to set up and start a new data engineering project from scratch. You ask the user
    to share details about source systems, the number of final data marts, and a brief outline of the overall architecture. Do not proceed 
    unless the user shares the information related to overall architecture, details about source systems, and the number of target data
    marts. On getting the aforesaid information from the user, you will make the tool call using the return_context_function and strictly
    adhere to the agile process details that the tool call returns. You will eventually display the epics, features, and user stories 
    needed to achieve the end objective, which is to create the final data marts. Always provide epic, feature, and story details and 
    include points for each story along with the output."""

In [16]:
# Function-calling schema for the retrieval tool offered to the model.
# "required" and "additionalProperties" are JSON Schema keywords, so they live inside
# "parameters" (the schema of the arguments object); placed beside "parameters" they
# would not constrain the arguments.
return_context_function = {
    "name": "return_context",
    "description": """Call this tool after the user confirms the details about source systems, the number of target data marts, and the 
    overall architecture. For any details related to agile processes, you will strictly adhere to the output that you gather from this 
    tool call.""",
    "parameters": {
        "type": "object",
        "properties": {
            "details_source_systems": {
                "type": "string",
                "description": "The details about source systems",
            },
            "overall_architecture": {
                "type": "string",
                "description": "Details about the architecture",
            },
            "target_data_marts": {
                "type": "string",
                "description": "Details about the target data marts",
            }
        },
        "required": ["details_source_systems", "overall_architecture", "target_data_marts"],
        "additionalProperties": False,
    },
}

In [17]:
# Tool list passed as the `tools=` argument of the chat-completions calls.
tools = [{"type": "function", "function": return_context_function}]

In [18]:
# We have to write that function handle_tool_call:

def handle_tool_call(name, args):
    """Run the tool the model asked for and return its result.

    Args:
        name: Tool name taken from the model's tool call. Double quotes are
            stripped before it is compared.
        args: Decoded tool arguments (a dict). The three expected keys are read with
            ``.get``, so a missing key appears as ``None`` in the retrieval query.

    Returns:
        The context string produced by ``return_context``.

    Raises:
        ValueError: If ``name`` is not ``"return_context"``.
    """
    # Fail with a clear message instead of falling through to an unbound result.
    if name.replace('"', '') != "return_context":
        raise ValueError(f"Unknown tool requested: {name!r}")

    source = args.get('details_source_systems')
    architecture = args.get('overall_architecture')
    marts = args.get('target_data_marts')
    query = f"Source details -\n{source}\n\nArchitecture Details -\n{architecture}\n\nMart Details -\n{marts}"
    return return_context(collection, query)

In [19]:
def return_context(collection, user_query, n_results=10):
    """Retrieve the chunks closest to ``user_query`` and join them into one context string.

    Args:
        collection: Object exposing a ``query`` method that accepts
            ``query_embeddings``, ``n_results`` and ``include``.
        user_query: Text to embed (with the module-level ``embeddings_model``) and search with.
        n_results: Number of chunks to request. Defaults to 10.

    Returns:
        A fixed header followed by the retrieved chunks separated by blank lines.
        The same string is also printed.
    """
    header = "\n\nProviding some context from relevant information -\n\n"
    retrieved = collection.query(
        query_embeddings=[embeddings_model.embed_query(user_query)],
        n_results=n_results,
        include=["documents"],  # only the chunk text is used below
    )
    # One query embedding was sent, so the chunks are in the first result list.
    retrieved_chunks = retrieved["documents"][0]
    context = header + "\n\n".join(retrieved_chunks)
    print(context)
    return context

In [20]:
def chat_open_ai(history, max_tool_rounds=5):
    """Get the assistant's next reply from OpenAI, running any tool calls it requests.

    Args:
        history: Conversation so far as a list of ``{"role", "content"}`` dicts.
            The assistant reply is appended to this same list.
        max_tool_rounds: Upper bound on consecutive tool-call rounds, so a model that
            keeps requesting tools cannot loop forever.

    Returns:
        ``history`` with the assistant reply appended.
    """
    openai = OpenAI()
    messages = [{"role": "system", "content": system_message}] + history
    response = openai.chat.completions.create(model=gpt_model, messages=messages, tools=tools)

    # The follow-up response can itself ask for tools, so keep resolving tool calls
    # until the model answers in text or the round limit is reached.
    for _ in range(max_tool_rounds):
        choice = response.choices[0]
        if choice.finish_reason != "tool_calls" or not choice.message.tool_calls:
            break

        message = choice.message
        print(f"tool calls \n\n {message.tool_calls}")

        tool_responses = []
        for tool_call in message.tool_calls:
            name = tool_call.function.name
            args = json.loads(tool_call.function.arguments)

            # Unrecognised tool names get an empty result; each call id still gets a reply.
            result = ""
            if name == "return_context":
                result = handle_tool_call(name, args)

            tool_responses.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(result),
            })

        print(f"tool responses {tool_responses}")
        # The assistant message carrying the tool calls goes in before its tool replies.
        messages.append(message)
        messages.extend(tool_responses)

        response = openai.chat.completions.create(
            model=gpt_model,
            messages=messages,
            tools=tools
        )

    # content is None when the response carries no text (for example when it is still
    # a tool-call message); fall back to "" so the history never holds a None reply.
    reply = response.choices[0].message.content or ""
    history += [{"role": "assistant", "content": reply}]

    return history


In [21]:
def chat(history, Model):
    """Route the conversation to the backend chosen in the ``Model`` dropdown.

    Args:
        history: Chat history as a list of message dicts.
        Model: Selected dropdown value; only ``"Open AI"`` is wired to a backend.

    Returns:
        The history with the assistant's reply appended, or the history unchanged
        (with a warning logged) when the selection has no backend.
    """
    if Model == "Open AI":
        # chat_open_ai creates its own client, so none is built here.
        return chat_open_ai(history)

    # Make the no-op visible instead of returning silently.
    logging.warning(f"No chat backend for model selection {Model!r}; no reply generated.")
    return history
    

In [22]:
# Greeting shown as the first assistant message when the page loads.
initial_prompt = """👋 Hello! How can I assist you today? If you're looking to set up a new data engineering project, please 
provide me with details about the source systems, the number of target data marts, and a brief outline of the overall architecture."""

# Build the UI: a row with the chatbot and the log panel side by side, then the model
# dropdown, the text entry and the Clear button. The CSS targets the element whose
# id is "log_box" (the log panel).
with gr.Blocks(css="""
    #log_box {
        height: 300px;
        overflow-y: scroll;
        border: 1px solid #ccc;
        padding: 10px;
        background: #f9f9f9;
        font-family: monospace;
        white-space: pre-wrap;
    }
""") as ui:
    with gr.Row():
        with gr.Column(scale=1):
            chatbot = gr.Chatbot(height=300, type="messages")
        with gr.Column(scale=1):
            
            logs_box = gr.HTML(label="Logs", elem_id="log_box")
    with gr.Row():
        # NOTE(review): no explicit `value=` is passed, and chat() only produces a reply
        # when the selection equals "Open AI" — confirm what the installed Gradio version
        # selects by default.
        Model = gr.Dropdown(["Open AI","XX"],
                              # value=["Open AI","Claude"],
                              multiselect=False,
                              label="Model",
                              interactive=True)
    with gr.Row():
        entry = gr.Textbox(label="Chat with our AI Assistant:")
    with gr.Row():
        clear = gr.Button("Clear")


    # Periodically refresh the log panel from the HTML log buffer (timer value is 5).
    timer = gr.Timer(value=5, active=True)
    timer.tick(get_html_logs, inputs=None, outputs=[logs_box])

    def do_entry(message, history):
        """Append the user's message to the history, log it, and yield the UI updates.

        Yields a tuple of: an empty string (for the textbox), the updated history,
        and the current HTML logs.
        """
        history += [{"role":"user", "content":message}]
        logging.info(f"User message: {message}")
        yield "", history, get_html_logs()
        
    def set_initial_prompt():
        """Return a one-message history containing the greeting."""
        return [{"role": "assistant", "content": initial_prompt}]
    
    # Seed the chatbot with the greeting on page load.
    ui.load(set_initial_prompt, inputs=None, outputs=chatbot)
    
    # On submit: record the user's message first, then ask the model for a reply.
    entry.submit(do_entry, inputs=[entry, chatbot], outputs=[entry, chatbot, logs_box]).then(
        chat, inputs=[chatbot, Model], outputs=[chatbot]
    )
    
    # clear_logs empties the log buffers; its return values reset the chatbot and log panel.
    clear.click(clear_logs, inputs=None, outputs=[chatbot, logs_box], queue=False)

# Start the app; inbrowser=True asks Gradio to open it in a browser.
ui.launch(inbrowser=True)

* Running on local URL:  http://127.0.0.1:7860
[2025-06-17 13:20:01 +0530] [Agents] [INFO] HTTP Request: GET http://127.0.0.1:7860/gradio_api/startup-events "HTTP/1.1 200 OK"
[2025-06-17 13:20:01 +0530] [Agents] [INFO] HTTP Request: HEAD http://127.0.0.1:7860/ "HTTP/1.1 200 OK"
* To create a public link, set `share=True` in `launch()`.




[2025-06-17 13:20:02 +0530] [Agents] [INFO] HTTP Request: GET https://api.gradio.app/pkg-version "HTTP/1.1 200 OK"
[2025-06-17 13:20:11 +0530] [Agents] [INFO] User message: The source system comprises of 2 IBM DB2 Tables
The final outcome are 2 data marts - one for account valuations details and the other for transactions summary
The architecture is as follows - From the IBM DB2 source tables, data is being transferred from the two source tables leveraging Airflow DAGs and IBM CDC subscriptions. Post landing the data at Snowflake Landing layer, data is transformed and populated at the final data marts at the final "gold layer" in snowflake, for both account valuations details and transactions summary
[2025-06-17 13:20:13 +0530] [Agents] [INFO] HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
tool calls 

 [ChatCompletionMessageToolCall(id='call_wPU0tjmd5JoZaZVPtHKeg7AD', function=Function(arguments='{"details_source_systems":"2 IBM DB2 Tables","overall_ar

Batches:   0%|          | 0/1 [00:00<?, ?it/s]



Providing some context from relevant information -

ake Landing Zone (21-40)-8 Story points – Ownership with Consultants (Developers)
Testing (Manual/Automated) to check data consistency across Hadoop and AWS S3 – 8 Story Points – Ownership – Consultants (QA Engineer)
Sprint 5
Loading tables to Snowflake Landing Zone (41-50)-5 Story points – Ownership with Consultants (Developers)
Feature – Commence Mapping from landing to Intermediate Staging Layer and Populate Staging Tables – Size - M
Sprint 5 –
Commence Mapping sheet updates from Landing to Staging Layer – 8 Story Points – Ownership – Consultants (Business Analysts) and Client
POC - Staging table creation at Snowflake Staging layer – 8 Story Points – Ownership – Client and Consultants (Developers)
Testing (Manual/Automated) to check data consistency across Hadoop and S3 – 5 Story Points – Ownership – Consultants (QA Engineer)
Testing (Manual/Automated) to check data consistency across Snowflake Landing Zone and Snowflake Staging 