### Install required Python libraries for OpenAI, LangChain, Azure Kusto, and data processing.


In [None]:
%pip install openai==1.12.0 azure-kusto-data langchain tenacity langchain-openai pypdf
%pip install beautifulsoup4 langchain-community

In [None]:
%pip install openai --upgrade

In [None]:
%pip install -q pdfplumber

In [None]:
from openai import AzureOpenAI
from IPython.display import display, HTML
import os
import textwrap
import json 
import requests
import pandas as pd
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

from notebookutils import mssparkutils
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

from langchain.text_splitter import CharacterTextSplitter,RecursiveCharacterTextSplitter
from langchain_openai import AzureOpenAIEmbeddings
from langchain.document_loaders import PyPDFLoader
from langchain_community.document_loaders import WebBaseLoader
from tenacity import retry, wait_random_exponential, stop_after_attempt
from bs4 import SoupStrainer
from bs4 import BeautifulSoup

### Configure the Azure OpenAI deployments and the Azure Data Explorer (Kusto) target, and acquire a Kusto access token.


In [None]:
# Azure OpenAI chat deployment: used by the chat-completions client for
# answering questions and for translation.
# NOTE(review): the endpoint/key values are placeholders to be filled in;
# avoid committing real secrets into the notebook.
OPENAI_GPT4_DEPLOYMENT_NAME="gpt-4o-shivam-project"
OPENAI_DEPLOYMENT_ENDPOINT="Replace with your OpenAI endpoint" 
OPENAI_API_KEY="Replace with your OpenAI API key" 
# Azure OpenAI embedding deployment: configured separately from the chat
# deployment (own endpoint, key and deployment name).
TEXTEMBEDDING_DEPLOYMENT_ENDPOINT="Replace with your OpenAI text embedding endpoint" 
TEXTEMBEDDING_API_KEY="Replace with your OpenAI API key" 
OPENAI_ADA_EMBEDDING_DEPLOYMENT_NAME = "text-embedding-ada-002-shivam-project"


# Kusto (Eventhouse) target: cluster URI, database and table that hold the
# embedded chunks.
KUSTO_URL = 'Replace with your kusto URI' 
KUSTO_DATABASES = "VectorDatabase"
KUSTO_TABLES = "embeddingtables"
# Token for the Kusto cluster, obtained once here and passed as the
# "accessToken" option to every Spark read/write against Kusto below.
accessToken = mssparkutils.credentials.getToken(KUSTO_URL)

### Initialize the Azure OpenAI client using endpoint, API key, and version details.


In [None]:
# Chat-completions client; call_openAI() and translate() send their requests
# through this object.
client_openAI = AzureOpenAI(
    api_version="2025-01-01-preview",
    api_key=OPENAI_API_KEY,
    azure_endpoint=OPENAI_DEPLOYMENT_ENDPOINT,
)

### Create Azure OpenAI embedding client and define a retry-enabled function to generate text embeddings using Tenacity.


In [None]:
# Embedding client; generate_embeddings() sends its requests through this
# object. It uses the dedicated text-embedding endpoint and key.
client = AzureOpenAI(
    api_version="2023-05-15",
    api_key=TEXTEMBEDDING_API_KEY,
    azure_endpoint=TEXTEMBEDDING_DEPLOYMENT_ENDPOINT,
)
# Tenacity retries the call with a randomized exponential wait (bounds 1..20)
# and stops after 6 attempts, to ride out throttling on the embeddings API.
@retry(stop=stop_after_attempt(6), wait=wait_random_exponential(min=1, max=20))
def generate_embeddings(text):
    """Return the embedding vector for ``text``.

    Newlines are replaced by spaces before the request because they can
    negatively affect embedding quality. The vector of the first (only)
    input is returned.
    """
    single_line = " ".join(text.split("\n"))
    response = client.embeddings.create(
        input=[single_line],
        model=OPENAI_ADA_EMBEDDING_DEPLOYMENT_NAME,
    )
    return response.data[0].embedding

### Import necessary libraries and configure a text splitter to divide PDF content into manageable chunks for processing.


In [None]:
import os, hashlib, json
import pandas as pd
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

# Shared splitter: the ingestion loop uses it for both page text and table
# markdown (chunk_size=1000, chunk_overlap=50).
splitter = RecursiveCharacterTextSplitter(chunk_overlap=50, chunk_size=1000)


### Retrieve and list all PDF files from the specified directory for further processing.


In [None]:
# Build the list of PDFs to ingest from the lakehouse upload folder.
import os

# Folder where PDFs are uploaded
base_path = "/lakehouse/default/Files"

# One descriptor per PDF: file name, full path, and an optional source URL
# (None here; the ingestion loop falls back to the path when it is unset).
pdf_files = []
for entry in os.listdir(base_path):
    # Case-insensitive extension match so ".PDF" uploads are included too.
    if not entry.lower().endswith(".pdf"):
        continue
    pdf_files.append(
        {
            "name": entry,
            "path": os.path.join(base_path, entry),
            "source_url": None,
        }
    )

print("Detected PDF files:", [item["name"] for item in pdf_files])

### Extract tables from PDFs using pdfplumber and convert them into clean, compact Markdown format for embedding or text analysis.


In [None]:
import pdfplumber
import pandas as pd
import re

# --- SAFE table -> markdown ---
def df_to_markdown(df: pd.DataFrame) -> str:
    """Render a table DataFrame as a compact pipe-delimited text block.

    The output is ``"TABLE"``, then one header line, then one line per row,
    with cells joined by ``" | "``. Headers and cells are normalised the
    same way: missing values (None/NaN/NA) become ``""``, everything else
    is cast to ``str`` with runs of whitespace collapsed to one space and
    the ends stripped, so a newline inside a header or cell can never
    break the one-line-per-row layout.

    Args:
        df: Table to render. It is only read, never modified.

    Returns:
        The rendered text block.
    """

    def _clean(value) -> str:
        # Only scalars can be "missing"; pd.isna on a list-like would
        # return an array instead of a bool.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ""
        return re.sub(r"\s+", " ", str(value)).strip()

    header = " | ".join(_clean(column) for column in df.columns)
    # Plain tuples (name=None) keep this working with duplicate or empty
    # column names, which are common in extracted PDF tables.
    lines = [
        " | ".join(_clean(cell) for cell in record)
        for record in df.itertuples(index=False, name=None)
    ]
    body = "\n".join(lines)

    return f"TABLE\n{header}\n{body}"


def extract_tables_with_pdfplumber(pdf_path: str) -> list[dict]:
    """Extract the tables of a PDF as markdown-like text, page by page.

    For each page, ruled-line detection ("lines") is tried first. The
    whitespace-based "text" strategy is used only as a fallback when the
    first strategy yields no usable table on that page, so a ruled table
    is not emitted twice (once per strategy).

    Args:
        pdf_path: Path of the PDF file to open with pdfplumber.

    Returns:
        A list of ``{'page_no': int, 'markdown': str}`` dicts, one per
        extracted table; ``page_no`` is 1-based.
    """
    out = []
    with pdfplumber.open(pdf_path) as pdf:
        for p_idx, page in enumerate(pdf.pages, start=1):
            for strategy in ("lines", "text"):
                settings = {
                    "vertical_strategy": strategy,
                    "horizontal_strategy": strategy,
                }
                # A usable table needs a header row plus at least one data row.
                usable = [
                    table
                    for table in (page.extract_tables(table_settings=settings) or [])
                    if table and len(table) >= 2
                ]
                if not usable:
                    continue  # nothing found: try the next (fallback) strategy
                for table in usable:
                    df = pd.DataFrame(table[1:], columns=table[0])
                    out.append({"page_no": p_idx, "markdown": df_to_markdown(df)})
                break  # this strategy produced tables; skip the fallback
    return out


### Define utility functions for generating the current UTC timestamp and detecting language (English or Hindi) based on character patterns.


In [None]:
from datetime import datetime, timezone
import re

def now_utc_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    # Kept as a string here; the Spark write step converts it with
    # to_timestamp before ingestion.
    return current.isoformat()

def detect_lang_simple(text: str) -> str:
    """Cheap script-based language guess.

    Returns 'hi' when the text contains at least one character from the
    Devanagari block (U+0900..U+097F), otherwise 'en'. Empty or None input
    yields 'en'.
    """
    if not text:
        return "en"
    has_devanagari = any("\u0900" <= ch <= "\u097F" for ch in text)
    return "hi" if has_devanagari else "en"


### Load PDFs, split into text/table chunks, assign stable chunk_ids, and collect metadata (page, type, lang, timestamps) for indexing.


In [None]:
# Load every PDF, split it into text and table chunks, and collect one row per
# chunk (with a stable chunk_id) for indexing.
rows = []
doc_ids_in_batch = set()   # doc_ids touched in this run; used later for de-duplication


def _chunk_row(pdf_name, source, document_id, page, index, body, kind):
    """Assemble the record stored for a single chunk.

    chunk_id is the MD5 of doc_id, page number, chunk index and the first
    128 characters of the chunk, so re-running on unchanged input yields
    the same id.
    """
    digest_input = f"{document_id}|{page}|{index}|{body[:128]}"
    return {
        "chunk_id":      hashlib.md5(digest_input.encode("utf-8")).hexdigest(),
        "doc_id":        document_id,
        "document_name": pdf_name,
        "source_url":    source,
        "page_no":       page,
        "chunk_no":      index,
        "content":       body,
        "content_type":  kind,
        "ingest_time":   now_utc_iso(),
        "lang":          detect_lang_simple(body),
    }


for pdf in pdf_files:
    try:
        # -------- IDs / metadata --------
        # doc_id is derived from the file name only.
        doc_id = hashlib.md5(pdf["name"].encode("utf-8")).hexdigest()
        doc_ids_in_batch.add(doc_id)
        src_url = pdf.get("source_url") or pdf["path"]

        # -------- text pages --------
        page_docs = PyPDFLoader(pdf["path"]).load()  # text only
        chunk_idx = 0  # running chunk number across text AND table chunks

        for page_doc in page_docs:
            # The loader's "page" metadata is treated as 0-based; store 1-based.
            page_no = int(page_doc.metadata.get("page", 0)) + 1
            for piece in splitter.split_text(page_doc.page_content):
                rows.append(
                    _chunk_row(pdf["name"], src_url, doc_id, page_no, chunk_idx, piece, "text")
                )
                chunk_idx += 1
        text_chunks = chunk_idx

        # -------- tables (extra chunks) --------
        for table_item in extract_tables_with_pdfplumber(pdf["path"]):
            page_no = table_item["page_no"]  # already 1-based in the helper
            # Tables can be large, so their markdown is split as well.
            for piece in splitter.split_text(table_item["markdown"]):
                rows.append(
                    _chunk_row(pdf["name"], src_url, doc_id, page_no, chunk_idx, piece, "table")
                )
                chunk_idx += 1
        table_chunks = chunk_idx - text_chunks

        print(f"Loaded {text_chunks} text + {table_chunks} table chunks "
              f"= {chunk_idx} total from {pdf['name']}")

    except Exception as e:
        # Best effort: report the failure and continue with the next PDF.
        print(f"Failed to load {pdf['name']}: {e}")


### Convert processed chunks into a DataFrame and verify data availability before proceeding to the next stage.


In [None]:
# Materialize the collected chunk rows as a pandas DataFrame.
df = pd.DataFrame(rows)

# Report whether there is anything to process. This cell only prints; the
# following cells check df.empty again before doing any work.
print(
    "No chunks produced. Nothing to write."
    if df.empty
    else f"Prepared {len(df)} chunks in memory."
)


### De-duplicate chunks against Kusto by chunk_id, filter new rows, and generate embeddings only for the new chunks.


In [None]:
# ─────────────────────────────────────────────────────────
# DEDUP: remove rows that already exist in Eventhouse by chunk_id, then
# generate embeddings for the remaining (new) rows only.
#    (Existing IDs are pulled only for the doc_ids present in this batch.)
# ─────────────────────────────────────────────────────────
from pyspark.sql import functions as F

doc_ids_list = list(doc_ids_in_batch)

existing_ids = set()
# Query Kusto only when there is something to compare: with no local rows
# (or no doc_ids) the lookup is pointless, and filtering a column-less frame
# on "chunk_id" would raise KeyError.
if not df.empty and doc_ids_list:
    # doc_ids are MD5 hex digests, so quoting them inline is safe.
    # Guard: if the list were huge, write it to a temp table instead.
    doc_ids_literal = ",".join(f"'{d}'" for d in doc_ids_list)

    existing_ids_query = f"""
{KUSTO_TABLES}
| where doc_id in ({doc_ids_literal})
| project chunk_id
"""

    existing_ids_sdf = (
        spark.read
        .format("com.microsoft.kusto.spark.synapse.datasource")
        .option("kustoCluster", KUSTO_URL)
        .option("kustoDatabase", KUSTO_DATABASES)
        .option("accessToken", accessToken)
        .option("kustoQuery", existing_ids_query)
        .load()
    )

    # Bring only the id column to the driver as a set (usually small per doc).
    if "chunk_id" in existing_ids_sdf.columns:
        existing_ids = {
            r["chunk_id"]
            for r in existing_ids_sdf.select("chunk_id").distinct().collect()
        }

print(f"Found {len(existing_ids)} existing chunks in Eventhouse for these docs.")

# Filter out duplicates locally.
if existing_ids:
    df = df[~df["chunk_id"].isin(existing_ids)]
# Independent copy: columns are assigned below, which must not target a
# filtered view of the original frame.
df = df.copy()

print(f"⚙️ New chunks to embed & write: {len(df)}")

if df.empty:
    print("✅ Nothing new to insert. Skipping embeddings/write.")
else:
    # ─────────────────────────────────────────────────────────
    # Embeddings (only for NEW rows)
    # ─────────────────────────────────────────────────────────
    print("Generating embeddings for new chunks...")

    def _embed_one(text):
        """Embed one chunk; return None (instead of a dummy vector) on failure."""
        try:
            return generate_embeddings(text)
        except Exception as exc:
            print(f"Embedding failed, chunk skipped for this run: {exc}")
            return None

    embedding_list = df["content"].apply(_embed_one)
    embedded_ok = embedding_list.notna()

    # Rows without an embedding are dropped rather than written with a
    # placeholder vector: a stored placeholder would have the wrong dimension
    # for similarity search and, because its chunk_id then exists in Kusto,
    # the chunk would never be re-embedded. Dropped rows are retried on the
    # next run.
    failed_count = int((~embedded_ok).sum())
    if failed_count:
        print(f"Skipped {failed_count} chunks whose embedding failed; re-run to retry them.")

    df = df[embedded_ok].copy()
    # Embeddings are stored as JSON text.
    df["embedding"] = embedding_list[embedded_ok].apply(
        lambda v: json.dumps(v, ensure_ascii=False)
    )

### Cast fields, align schema, and append new embedded chunks to Azure Data Explorer (Kusto) via Spark.


In [None]:
from pyspark.sql.functions import col, to_timestamp

if not df.empty:
    # Build the Spark frame and apply the explicit casts one at a time.
    df_sp = spark.createDataFrame(df)
    for column_name, spark_type in (
        ("page_no", "int"),
        ("chunk_no", "int"),
        ("embedding", "string"),
    ):
        df_sp = df_sp.withColumn(column_name, col(column_name).cast(spark_type))
    # ingest_time was produced as an ISO string; convert it to a timestamp.
    df_sp = df_sp.withColumn("ingest_time", to_timestamp(col("ingest_time")))

    # IMPORTANT: reorder columns to match Kusto table schema
    ordered_cols = [
        "doc_id",
        "document_name",
        "source_url",
        "page_no",
        "chunk_no",
        "content",
        "embedding",
        "chunk_id",
        "content_type",
        "ingest_time",
        "lang",
    ]
    df_sp = df_sp.select(*ordered_cols)

    # Append the new rows to the Kusto table through the Spark connector.
    writer = df_sp.write.format("com.microsoft.kusto.spark.synapse.datasource")
    writer = writer.option("kustoCluster", KUSTO_URL)
    writer = writer.option("kustoDatabase", KUSTO_DATABASES)
    writer = writer.option("kustoTable", KUSTO_TABLES)
    writer = writer.option("accessToken", accessToken)
    writer.mode("Append").save()
    print(f"✅ Wrote {df_sp.count()} new rows to Eventhouse.")
else:
    print("✅ Nothing new to insert. Skipping Spark write.")



### Retrieve relevant context from Kusto by vector similarity (cosine) and return surrounding chunks for a given question.


In [None]:
def get_answer_from_eventhousess(question, k=3, before=3, after=20):
    """Retrieve context chunks for a question by vector similarity.

    The question is embedded, the ``k`` most similar chunks are found with
    cosine similarity, and for every hit the chunks of the same document
    whose chunk_no lies in ``[hit - before, hit + after]`` are returned.
    Overlapping windows (several hits in one document) are de-duplicated so
    that each chunk appears once.

    Args:
        question: Text to search for.
        k: Number of nearest chunks to use as hits.
        before: Number of chunks to include before each hit.
        after: Number of chunks to include after each hit.

    Returns:
        A Spark DataFrame with columns doc_id, document_name, source_url,
        page_no, chunk_no, content, ordered by doc_id and chunk_no.

    Raises:
        ValueError/TypeError: if k, before or after cannot be converted to int.
    """
    # These values are interpolated into the KQL text; coerce them to int so
    # nothing but a number can end up in the query.
    k, before, after = int(k), int(before), int(after)

    emb = generate_embeddings(question)
    emb_json = json.dumps(emb)

    kql = f"""
let q = dynamic({emb_json});
let hits =
{KUSTO_TABLES}
| extend sim = series_cosine_similarity(q, todynamic(embedding))
| top {k} by sim desc
| project doc_id, hit_chunk = chunk_no;   // rename to avoid chunk_no1

{KUSTO_TABLES}
| join kind=inner (hits) on doc_id
| where chunk_no between (hit_chunk - {before} .. hit_chunk + {after})
| distinct doc_id, document_name, source_url, page_no, chunk_no, content
| order by doc_id asc, chunk_no asc
"""

    result = (
        spark.read
        .format("com.microsoft.kusto.spark.synapse.datasource")
        .option("kustoCluster", KUSTO_URL)
        .option("kustoDatabase", KUSTO_DATABASES)
        .option("accessToken", accessToken)
        .option("kustoQuery", kql)
        .load()
    )
    # NOTE(review): the connector is not assumed to preserve the KQL row
    # order, so the ordering is re-applied on the Spark side; callers
    # concatenate the chunks in iteration order.
    return result.orderBy("doc_id", "chunk_no")

### Generate responses from Azure OpenAI GPT model using chat completions API.


In [None]:
def call_openAI(text):
    """Run a chat completion and return the text of the first choice.

    ``text`` is passed through unchanged as the ``messages`` argument of the
    chat-completions call, so it is expected to be the list of role/content
    message dicts. Temperature is fixed at 0.
    """
    request = {
        "model": OPENAI_GPT4_DEPLOYMENT_NAME,
        "messages": text,
        "temperature": 0,
    }
    completion = client_openAI.chat.completions.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message.content

### Retrieve contextual chunks from Kusto, construct a prompt, and generate a final answer using Azure OpenAI GPT model.


In [None]:
# Number of nearest chunks to use as hits, and the question to answer.
k = 3
question = " What is the On-line registration of Application by Candidates for IBPS Common Recruitment Process for Recruitment of Officers (Scale-I, II & III) and Office Assistants (Multipurpose) in Regional Rural Banks (RRBs) ?"

answers_df = get_answer_from_eventhousess(question, k)

# Combine the non-empty chunk contents into one context string.
contents = []
for row in answers_df.rdd.toLocalIterator():
    if row['content']:
        contents.append(row['content'])
answer = " ".join(contents)

# Build the chat prompt: the system message restricts the model to the
# retrieved information.
prompt = "Question: " + question + "\nInformation: " + answer
system_message = {
    "role": "system",
    "content": "You are a helpful assistant. Use only the provided information to answer. If the answer is not in the context, say 'I don't know'.",
}
user_message = {"role": "user", "content": prompt}
messages = [system_message, user_message]

result = call_openAI(messages)
print(result)


### Detect language using `langdetect` and translate text to a target language with Azure OpenAI GPT for lightweight translation tasks.


In [None]:
# Uses langdetect when it is installed (pip install langdetect); otherwise a
# simple script check is used so the function still distinguishes Hindi.
def detect_lang(text: str) -> str:
    """Return a language code for ``text`` (e.g. 'hi' for Hindi, 'en' for English).

    langdetect is imported inside the function so the name ``detect`` is
    actually bound when it is called. If the package is not available, the
    function falls back to 'hi' when the text contains Devanagari
    characters (U+0900..U+097F) and 'en' otherwise.

    Returns 'en' for empty/None input and when detection fails.
    """
    if not text or not text.strip():
        return "en"
    try:
        from langdetect import detect
    except ImportError:
        has_devanagari = any("\u0900" <= ch <= "\u097F" for ch in text)
        return "hi" if has_devanagari else "en"
    try:
        return detect(text)
    except Exception:
        # Best effort: detection can fail on text without usable features.
        return "en"

def translate(text: str, target_lang: str) -> str:
    """Translate ``text`` into ``target_lang`` with the GPT chat deployment.

    Lightweight approach; for heavy production traffic consider the Azure
    Translator API instead. Empty or None input returns "" without calling
    the model. The model's reply is returned with surrounding whitespace
    stripped.
    """
    if not text:
        return ""

    chat_messages = [
        {
            "role": "system",
            "content": "You are a precise translator. Translate the text to the target language only. No explanations.",
        },
        {
            "role": "user",
            "content": f"Target language: {target_lang}\nText:\n{text}",
        },
    ]
    completion = client_openAI.chat.completions.create(
        model=OPENAI_GPT4_DEPLOYMENT_NAME,
        messages=chat_messages,
        temperature=0,
    )
    translated = completion.choices[0].message.content
    return translated.strip()

### Retrieve multilingual answers from Kusto, translate if needed, and generate the final response in the user’s original language using Azure OpenAI GPT.


In [None]:
# Answer a (possibly non-English) question: detect its language, retrieve
# context with an English version of the question, and return the answer in
# the user's language.
nr_of_answers = 1   # number of nearest chunks used as hits
question = " Dy. Director (Architect) ka qualification kya hai ?"
src_lang = detect_lang(question)        # 'hi' / 'en' / etc.

# Retrieval and prompting are done in English; translate the question first
# when it is in another language.
q_for_retrieval = question if src_lang == "en" else translate(question, "English")
answers_df = get_answer_from_eventhousess(q_for_retrieval, nr_of_answers)

# Concatenate the retrieved chunk contents, skipping empty/None values.
answer = " ".join(
    row['content'] for row in answers_df.rdd.toLocalIterator() if row['content']
)

# Build the prompt from the English question and the retrieved context.
prompt = 'Question: {}'.format(q_for_retrieval) + '\n' + 'Information: {}'.format(answer)
messages = [{"role": "system", "content": "You are a HELPFUL assistant answering users questions. Answer the question using the provided information and do not add anything else."},
            {"role": "user", "content": prompt}]


result = call_openAI(messages)

# Translate the model's answer back to the language of the question. Known
# codes are mapped to a language name; any other code is passed through.
target_language = {"hi": "Hindi"}.get(src_lang, src_lang)
final_answer = result if src_lang == "en" else translate(result, target_language)
print(final_answer)