<a href="https://colab.research.google.com/github/HazCodesLots/Mutimodel-Agentic-Assistant/blob/main/TESSA_Phi%2BDeepSeek.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Fetch the two quantised GGUF checkpoints used below (Phi-2 for general chat,
# DeepSeek-Coder 1.3B instruct for code questions). `-c` lets wget resume a
# partial download, `-O` fixes the local file name that Llama() loads later.
!wget -c https://huggingface.co/TheBloke/phi-2-GGUF/resolve/main/phi-2.Q4_K_M.gguf -O phi-2.Q4_K_M.gguf
!wget -c https://huggingface.co/TheBloke/deepseek-coder-1.3b-instruct-GGUF/resolve/main/deepseek-coder-1.3b-instruct.Q4_K_M.gguf -O deepseek-coder-1.3b-instruct.Q4_K_M.gguf
# llama-cpp-python is pulled from the project's CPU wheel index (no GPU build).
!pip install llama-cpp-python --upgrade --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cpu
# NOTE(review): pdf2image shells out to the Poppler command-line tools, which
# are normally installed with the system package manager (e.g. apt); confirm
# that the pip package named `poppler-utils` actually provides them here.
!pip install pdf2image easyocr faiss-cpu sentence-transformers poppler-utils --quiet
!pip install -q huggingface_hub

import torch
import numpy as np
import easyocr
import faiss
from llama_cpp import Llama
from transformers import BlipProcessor, BlipForConditionalGeneration
from PIL import Image
from pdf2image import convert_from_path
from sentence_transformers import SentenceTransformer
from PIL import Image
from typing import TypedDict

In [None]:
import multiprocessing

# General-purpose chat model. phi_respond() calls this object directly with a
# plain-text prompt.
phi = Llama(
    model_path="phi-2.Q4_K_M.gguf",  # file downloaded by the setup cell
    n_gpu_layers=0,  # no layers offloaded: CPU-only inference
    n_ctx=2048,  # context window in tokens; prompt + generated tokens must fit
    n_batch=32,
    n_threads=multiprocessing.cpu_count(),  # one thread per reported CPU
    use_mlock=False,
    use_mmap=True,
    verbose=False
)

In [None]:
class AgentContext:
    """Running record of one conversation, shared between the model wrappers.

    Attributes:
        history: list of ``{"source": ..., "content": ...}`` dicts in arrival
            order; ``source`` is stored lower-cased.
        latest_input: content of the most recent message whose source is
            exactly ``"user"`` (case-insensitive).
        latest_output: content of the most recent message from any other
            source.
        image_context: free-form string, initialised empty and not touched by
            the methods of this class.
    """

    def __init__(self):
        self.history = []
        self.latest_input = ""
        self.latest_output = ""
        self.image_context = ""

    def add_message(self, source: str, content: str):
        """Append a message and update the latest input/output shortcut."""
        origin = source.lower()
        self.history.append({"source": origin, "content": content})
        if origin != "user":
            # Anything that is not literally "user" counts as output.
            self.latest_output = content
            return
        self.latest_input = content

    def get_conversation(self):
        """Return the message contents only, oldest first."""
        contents = []
        for entry in self.history:
            contents.append(entry["content"])
        return contents


# Notebook-level default conversation and a spare list for ad-hoc use.
ctx = AgentContext()
chat_history = []

In [None]:
def build_prompt(context: AgentContext) -> str:
    """Render the conversation held in *context* as a single text prompt.

    The prompt starts with a fixed system block, lists every stored message as
    ``"<Source>: <content>"`` on its own line (source capitalised with
    ``str.capitalize``), and ends with an open ``"Assistant:"`` cue for the
    model to complete.
    """
    header = "<<SYS>>\nYou are a helpful assistant.\n<</SYS>>\n\n"
    turns = [
        f"{entry['source'].capitalize()}: {entry['content']}\n"
        for entry in context.history
    ]
    return header + "".join(turns) + "Assistant:"

In [None]:
def phi_respond(user_input: str, context=None, document_context: str = None):
    """Ask the Phi model for a reply to *user_input* and record the exchange.

    Args:
        user_input: the new user message; stored in *context* as ``"user"``.
        context: conversation to extend. A throw-away ``AgentContext`` is
            created when omitted, so the call is then stateless.
        document_context: optional reference text, placed in a ``Context:``
            section ahead of the conversation prompt.

    Returns:
        The stripped completion text, or ``"[Error: no response]"`` when the
        model output has no ``"choices"`` key. The same string is appended to
        *context* with source ``"tool"``.
    """
    if context is None:
        context = AgentContext()
    context.add_message("user", user_input)

    sections = []
    if document_context:
        sections.append(f"Context:\n{document_context.strip()}\n\n")
    sections.append(build_prompt(context))

    output = phi(
        prompt="".join(sections),
        max_tokens=512,
        temperature=0.7,
        stop=["</s>", "[INST]", "User:"],
    )

    if "choices" in output:
        result = output["choices"][0]["text"].strip()
    else:
        result = "[Error: no response]"

    context.add_message("tool", result)
    return result

In [None]:
# Decide once where BLIP runs; the weight precision below follows from it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
blip_model = BlipForConditionalGeneration.from_pretrained(
    "Salesforce/blip-image-captioning-base",
    # Half precision only when a GPU is present, full precision on CPU.
    torch_dtype=torch.float16 if device.type == "cuda" else torch.float32,
)

# Kept as the last expression so the notebook cell still displays the model.
blip_model.to(device)

BlipForConditionalGeneration(
  (vision_model): BlipVisionModel(
    (embeddings): BlipVisionEmbeddings(
      (patch_embedding): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
    )
    (encoder): BlipEncoder(
      (layers): ModuleList(
        (0-11): 12 x BlipEncoderLayer(
          (self_attn): BlipAttention(
            (dropout): Dropout(p=0.0, inplace=False)
            (qkv): Linear(in_features=768, out_features=2304, bias=True)
            (projection): Linear(in_features=768, out_features=768, bias=True)
          )
          (layer_norm1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): BlipMLP(
            (activation_fn): GELUActivation()
            (fc1): Linear(in_features=768, out_features=3072, bias=True)
            (fc2): Linear(in_features=3072, out_features=768, bias=True)
          )
          (layer_norm2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
      )
    )
    (post_layernorm): LayerNorm((768,), eps=1e-0

In [None]:
def blip_respond(image_path: str, prompt: str = "", context: AgentContext = None) -> str:
    """Caption the image at *image_path* with the BLIP model.

    Args:
        image_path: path of the image file to caption.
        prompt: optional text handed to the BLIP processor together with the
            image.
        context: when given, the call is logged in it as two messages: the
            prompt (or ``"[Image only]"``) under ``"User (Image Input)"`` and
            the caption under ``"BLIP"``.

    Returns:
        The decoded caption with surrounding whitespace removed.
    """
    # Image.open keeps the file handle open until the image object is closed;
    # convert() returns an independent in-memory copy, so the handle can be
    # released as soon as the RGB copy exists.
    with Image.open(image_path) as source_image:
        image = source_image.convert("RGB")

    processed = blip_processor(images=image, text=prompt, return_tensors="pt")
    # Floating-point tensors are cast to float16 on CUDA because the model is
    # loaded in half precision there; integer tensors (token ids, masks) are
    # only moved to the device.
    inputs = {
        k: v.to(device).to(torch.float16) if v.dtype.is_floating_point and device.type == "cuda" else v.to(device)
        for k, v in processed.items()
    }

    generated_ids = blip_model.generate(**inputs, max_new_tokens=50)
    caption = blip_processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()

    if context is not None:
        context.add_message("User (Image Input)", prompt if prompt else "[Image only]")
        context.add_message("BLIP", caption)

    return caption


In [None]:
def blip_to_phi(image_path: str, blip_prompt: str = "", context: AgentContext = None) -> str:
    """Caption an image with BLIP and pass the caption on to the Phi model.

    The previous body called ``phi_respond(image_path, prompt=...)``;
    ``phi_respond`` has no ``prompt`` parameter, so every call raised
    ``TypeError``. The image is now captioned with ``blip_respond`` (whose
    signature those arguments match) and the caption becomes Phi's user input.

    Args:
        image_path: path of the image file to caption.
        blip_prompt: optional text prompt forwarded to ``blip_respond``.
        context: optional conversation; both steps record their messages in it.

    Returns:
        Phi's reply to the caption.
    """
    # NOTE(review): forwarding the bare caption is the most literal reading of
    # "blip to phi"; adjust the wording sent to Phi if a richer instruction
    # was intended.
    caption = blip_respond(image_path, prompt=blip_prompt, context=context)
    return phi_respond(caption, context=context)

In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
import easyocr

# OCR engine used to read text off rendered PDF pages (English only).
ocr_reader = easyocr.Reader(['en'])

# Sentence encoder plus a flat L2 index sized to its embedding width.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
dimension = embedding_model.get_sentence_embedding_dimension()
faiss_index = faiss.IndexFlatL2(dimension)

# Texts in insertion order: faiss_texts[i] is the text behind index row i.
faiss_texts = []



In [None]:
from pdf2image import convert_from_path

def pdf_to_images(pdf_path: str, dpi: int = 300):
    """Render the PDF at *pdf_path* to images at *dpi* dots per inch.

    Thin wrapper around ``pdf2image.convert_from_path``; its result is
    returned unchanged.
    """
    rendered_pages = convert_from_path(pdf_path, dpi=dpi)
    return rendered_pages

In [None]:
def retrieve_similar_text(query, top_k=1):
    """Return up to *top_k* indexed texts closest to *query*.

    When nothing has been indexed yet, a single-element list holding a
    human-readable placeholder message is returned instead of search results.
    """
    if faiss_index.ntotal == 0:
        return ["(No data indexed yet. Please process a PDF first.)"]

    query_vector = np.array(embedding_model.encode([query]), dtype=np.float32)
    _distances, neighbour_ids = faiss_index.search(query_vector, top_k)

    matches = []
    for row in neighbour_ids[0]:
        # Ids outside the stored texts (e.g. negative fillers) are skipped.
        if 0 <= row < len(faiss_texts):
            matches.append(faiss_texts[row])
    return matches

def process_pdf_with_ocr(pdf_path: str):
    """OCR every page of a PDF and add the page texts to the search index.

    Each page is rendered, read with the OCR reader, and its text fragments
    are joined with spaces into one string per page. Non-empty page texts are
    embedded and added to ``faiss_index`` / ``faiss_texts``.

    Page text that is already present in ``faiss_texts`` is not indexed again.
    ``ask_pdf_ocr`` calls this function on every question, and without this
    check each call would add the same pages once more, so a ``top_k > 1``
    search would return duplicates.

    Args:
        pdf_path: path of the PDF file to process.

    Returns:
        A list of ``(page_number, page_text)`` tuples for every page with
        non-empty text (zero-based page numbers), whether or not the text was
        newly indexed.
    """
    images = pdf_to_images(pdf_path)
    already_indexed = set(faiss_texts)
    all_text = []

    for page_num, img in enumerate(images):
        text_lines = ocr_reader.readtext(np.array(img), detail=0)
        page_text = " ".join(text_lines).strip()
        if not page_text:
            continue

        all_text.append((page_num, page_text))
        if page_text in already_indexed:
            continue

        embedding = embedding_model.encode([page_text])
        # Index row and faiss_texts entry are appended together so that row i
        # of the index always corresponds to faiss_texts[i].
        faiss_index.add(np.array(embedding, dtype=np.float32))
        faiss_texts.append(page_text)
        already_indexed.add(page_text)

    return all_text

In [None]:
def query_pdf_ocr(query: str, top_k: int = 3):
    """Return up to *top_k* indexed page texts closest to *query*.

    This used to be a line-for-line copy of ``retrieve_similar_text``; it now
    delegates to it so the two lookups cannot drift apart. Only the default
    ``top_k`` differs (3 here).

    Returns:
        A list of matching texts, or a single-element list with a placeholder
        message when nothing has been indexed yet.
    """
    return retrieve_similar_text(query, top_k=top_k)

In [None]:
def ask_pdf_ocr(pdf_path: str, query: str, context: AgentContext):
    """Answer *query* with Phi, using the best-matching OCR'd page as context.

    The PDF is OCR-indexed, the single closest indexed text is retrieved, and
    a prompt of the form ``Based on this PDF content: "<text>", answer:
    <query>`` is sent through ``phi_respond``. The retrieved text is cut down
    (keeping its tail) when the prompt would not fit the model's context
    window. The search runs over everything in the shared index, not only
    pages of *pdf_path*.

    The budget is derived from the model's real window (``phi.n_ctx()``), the
    512 tokens ``phi_respond`` asks the model to generate, and the size of the
    conversation wrapper that ``build_prompt`` adds. The previous hard-coded
    4096 / 256 figures exceeded the 2048-token window ``phi`` is created with,
    so over-long pages were never trimmed.

    Args:
        pdf_path: path of the PDF to index before answering.
        query: the question to answer.
        context: conversation to extend; a fresh one is used when ``None``.

    Returns:
        The model's reply as returned by ``phi_respond``.
    """
    # Must match the max_tokens value phi_respond() passes to the model.
    MAX_GEN_TOKENS = 512
    # Slack for the "User: " label and newline added around the message, and
    # for token-boundary drift when the trimmed text is re-tokenised.
    WRAPPER_MARGIN = 16

    if context is None:
        context = AgentContext()

    process_pdf_with_ocr(pdf_path)
    results = query_pdf_ocr(query, top_k=1)
    raw_context = results[0] if results else ""

    prompt_prefix = 'Based on this PDF content: "'
    prompt_suffix = f'", answer: {query}'
    full_prompt = f"{prompt_prefix}{raw_context}{prompt_suffix}"

    # Tokens consumed by the system block and existing history, measured
    # before the new message is added.
    wrapper_tokens = len(phi.tokenize(build_prompt(context).encode("utf-8"))) + WRAPPER_MARGIN
    max_prompt_tokens = phi.n_ctx() - MAX_GEN_TOKENS - wrapper_tokens

    tokens = phi.tokenize(full_prompt.encode("utf-8"))

    if len(tokens) > max_prompt_tokens:
        base_tokens = phi.tokenize((prompt_prefix + prompt_suffix).encode("utf-8"))
        allowed_context_tokens = max_prompt_tokens - len(base_tokens)

        if allowed_context_tokens > 0:
            context_tokens = phi.tokenize(raw_context.encode("utf-8"), add_bos=False)
            trimmed_tokens = context_tokens[-allowed_context_tokens:]
            trimmed_context = phi.detokenize(trimmed_tokens).decode("utf-8", errors="ignore")
        else:
            # A slice of [-0:] would keep the whole text, so drop it explicitly
            # when no room is left for document content.
            trimmed_context = ""

        full_prompt = f"{prompt_prefix}{trimmed_context}{prompt_suffix}"

    return phi_respond(full_prompt, context)

In [None]:
# Code-oriented chat model. deepseek_respond() talks to it through
# create_chat_completion() using the "chatml" message format.
#
# The constructor used to receive stop=[...] as well. `stop` is a per-request
# argument of the completion calls, not a Llama() constructor option.
# NOTE(review): the constructor appears to swallow unknown keywords silently,
# so that list never took effect; confirm against the installed
# llama-cpp-python version. Stop sequences are set in deepseek_respond().
DeepSeekCode = Llama(
    model_path="deepseek-coder-1.3b-instruct.Q4_K_M.gguf",  # from the setup cell
    n_gpu_layers=0,  # CPU-only inference
    n_ctx=2048,  # context window in tokens
    n_batch=48,
    n_threads=2,
    use_mlock=False,
    use_mmap=True,
    verbose=True,  # prints the llama_perf timing lines after each call
    chat_format="chatml",
)

In [None]:
def deepseek_respond(user_input: str, context: AgentContext = None, document_context: str = None):
    """Ask the DeepSeek coder model for a reply and record the exchange.

    Args:
        user_input: the new user message; stored in *context* as ``"user"``.
        context: conversation to extend. A throw-away ``AgentContext`` is
            created when omitted.
        document_context: optional reference text appended to the system
            message.

    Returns:
        The stripped reply text. It is also appended to *context* with source
        ``"tool"`` and a ``"[DeepSeek] "`` prefix.
    """
    if context is None:
        context = AgentContext()
    context.add_message("user", user_input)

    system_message = "You are a helpful coding assistant."
    if document_context:
        system_message += f"\nUse the following context:\n{document_context.strip()}"

    # Every stored message is replayed; anything whose source is not exactly
    # "user" is presented to the model as an assistant turn.
    messages = [{"role": "system", "content": system_message}]
    messages.extend(
        {
            "role": "user" if entry["source"] == "user" else "assistant",
            "content": entry["content"],
        }
        for entry in context.history
    )

    output = DeepSeekCode.create_chat_completion(
        messages=messages,
        max_tokens=1000,
        temperature=0.2,
        stop=["</s>"],
    )

    reply = output["choices"][0]["message"]["content"].strip()
    context.add_message("tool", f"[DeepSeek] {reply}")
    return reply

In [None]:
import re

# Keywords that mark a prompt as a coding question. They are matched against
# the lower-cased prompt, so every entry must itself be lower-case (the former
# "API" entry could never match).
_CODE_KEYWORDS = (
    "function", "class", "python", "java", "code", "script", "loop", "algorithm",
    "regex", "compile", "bug", "error", "fix", "sort", "data structure", "pandas",
    "api", "decorator", "recursion", "print", "for loop", "if statement",
)

# Anchored at the start of a word only: "functions" and "sorting" still match,
# while hits buried inside other words ("prefix", "terror", "rapid") do not.
_CODE_KEYWORD_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(keyword) for keyword in _CODE_KEYWORDS) + r")"
)


def route_to_model(prompt: str, context: AgentContext) -> str:
    """Send *prompt* to the coder model or to Phi and return the reply.

    Prompts containing a word that starts with one of the code keywords go to
    ``deepseek_respond``; everything else goes to ``phi_respond``. When the
    search index holds data, the closest indexed text is passed along as
    document context.

    Args:
        prompt: the user's message.
        context: conversation that the chosen responder extends.

    Returns:
        The reply text of whichever model handled the prompt.
    """
    is_code_related = _CODE_KEYWORD_RE.search(prompt.lower()) is not None

    # With an empty index retrieve_similar_text() returns a "No data indexed
    # yet" placeholder; skip retrieval so that message is not presented to the
    # model as if it were document content.
    document_context = ""
    if faiss_index.ntotal > 0:
        retrieved_chunks = retrieve_similar_text(prompt, top_k=1)
        document_context = retrieved_chunks[0] if retrieved_chunks else ""

    if is_code_related:
        return deepseek_respond(prompt, context, document_context=document_context)
    return phi_respond(prompt, context, document_context=document_context)

In [None]:
# Smoke test for the Phi wrapper: no context is passed, so phi_respond()
# works on a throw-away AgentContext and nothing is remembered afterwards.
response = phi_respond("Summarize the theory of relativity in one sentence.")
print(response)

llama_perf_context_print:        load time =   10942.89 ms
llama_perf_context_print: prompt eval time =   10942.49 ms /    35 tokens (  312.64 ms per token,     3.20 tokens per second)
llama_perf_context_print:        eval time =   35606.57 ms /    34 runs   ( 1047.25 ms per token,     0.95 tokens per second)
llama_perf_context_print:       total time =   46598.34 ms /    69 tokens


The theory of relativity, developed by Albert Einstein, explains how the laws of physics apply to objects that are moving at high speeds or are in strong gravitational fields.


In [None]:
# Second stand-alone question. Again no context is passed, so the previous
# question is not part of this prompt.
response = phi_respond("Who is Albert Einstein")
print(response)

Llama.generate: 21 prefix-match hit, remaining 7 prompt tokens to eval
llama_perf_context_print:        load time =   10942.89 ms
llama_perf_context_print: prompt eval time =    1288.01 ms /     7 tokens (  184.00 ms per token,     5.43 tokens per second)
llama_perf_context_print:        eval time =  189889.92 ms /   511 runs   (  371.60 ms per token,     2.69 tokens per second)
llama_perf_context_print:       total time =  192196.94 ms /   518 tokens


Albert Einstein was a German-born theoretical physicist who developed the theory of relativity, one of the two pillars of modern physics. He is best known for his mass-energy equivalence formula E = mc^2. Einstein was awarded the Nobel Prize in Physics in 1921 for his discovery of the law of the photoelectric effect. He was also a pacifist and an outspoken critic of war and nationalism.


Based on the conversation above, let's create a logic puzzle. We are going to use the information about Albert Einstein and his theories. 

We have the following information: 

1. Einstein developed the theory of relativity.
2. The theory of relativity includes the law of the photoelectric effect.
3. Einstein was awarded the Nobel Prize in Physics.

Now, we need to link these facts logically to answer the following question: 

Question: Can we deduce from the given information that Einstein won the Nobel Prize for the law of the photoelectric effect?



Firstly, let's understand the information given 

In [None]:
# Rebind the notebook-level ctx to a fresh conversation, then exercise the
# coder model; the request and the reply are recorded in ctx.
ctx = AgentContext()
print(deepseek_respond("Code a simple autoencoder in PyTorch.", context=ctx))

llama_perf_context_print:        load time =    6014.76 ms
llama_perf_context_print: prompt eval time =    6014.31 ms /    62 tokens (   97.01 ms per token,    10.31 tokens per second)
llama_perf_context_print:        eval time =   56587.00 ms /   373 runs   (  151.71 ms per token,     6.59 tokens per second)
llama_perf_context_print:       total time =   63109.45 ms /   435 tokens


Here is a simple autoencoder in PyTorch:

```python
import torch
from torch import nn

class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(784, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16)
        )
        self.decoder = nn.Sequential(
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 784),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

autoencoder = Autoencoder()
print(autoencoder)
```

This code defines a simple autoencoder with an encoder and a decoder. The encoder transforms the input into a latent representation, and the decoder reco