#### Installing the Relevant Libraries

In [0]:
%pip install langchain langchain-community openai tiktoken chromadb docx2txt

In [0]:
%pip install pytesseract pillow pdf2image

In [0]:
%pip install python-docx

In [0]:
%pip install gradio

#### OpenAI Setup

In [0]:
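import os

# Read the key from the environment rather than hard-coding it in the notebook
# (on Databricks, dbutils.secrets.get(scope=..., key=...) is another option)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")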

#### Load Word Documents with Text and Images

In [0]:
import os
import io
import docx
import pytesseract
from PIL import Image
from langchain_community.document_loaders import Docx2txtLoader
from langchain_core.documents import Document  # needed for the OCR documents created below

doc_dir = "/Volumes/workspace/default/ai_assistant_rag_documents"

documents = []
for filename in os.listdir(doc_dir):
    if filename.endswith(".docx"):
        file_path = os.path.join(doc_dir, filename)
        # Load the text content of the Word document
        loader = Docx2txtLoader(file_path)
        documents.extend(loader.load())

        # Use OCR to extract text from images embedded in the Word document
        doc = docx.Document(file_path)
        for i, rel in enumerate(doc.part.rels.values()):
            # Only internal image relationships have a binary part to read
            if "image" in rel.reltype and not rel.is_external:
                image_data = rel.target_part.blob
                image = Image.open(io.BytesIO(image_data))
                text = pytesseract.image_to_string(image)
                if text.strip():
                    documents.append(
                        Document(
                            page_content=text,
                            metadata={"source": file_path, "page": f"image_{i}"},
                        )
                    )

print(len(documents), "documents loaded")
print(documents[0].page_content[:100])

68 documents loaded
Extract, Transform & Load

Step 1 – Extract the data (All the structured, semi-structured & unstruct


#### Split Into Chunks

In [0]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
chunks = splitter.split_documents(documents)

print(len(chunks), "chunks of document created")
print(chunks[0])

256 chunks of document created
page_content='Extract, Transform & Load

Step 1 – Extract the data (All the structured, semi-structured & unstructured )

Step 2 – Transform the data (Take the data values that are required, clean the data, convert data into desired format)

Step 3 – Load the data (Can be either ADLS Gen 2 Account or can be SQL Warehouse like Synapse)



So, the ETL Process includes:

Extraction -> Staging Area -> Transform -> Load -> Data Warehouse -> Analytics



Data Warehouse Architecture

Data Source -> Data Staging (ETL) -> Data Storage (Data Warehouse & Data Marts)  -> Data Presentation (Data Analytics, Reporting & Data Mining)



Azure Data Factory (ADF)

This is a cloud based ETL and data integration service.

You can create data-driven workflows that can be used for orchestrating data movement.

You can also transform data at scale.

You can connect to a variety of data sources as the source and destination.

ADLS Gen2 (Source) ------------ Data Factory --------
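To make `chunk_size=1000, chunk_overlap=100` concrete, here is a simplified sliding-window splitter. It is only a sketch of what the two parameters mean: LangChain's `RecursiveCharacterTextSplitter` is smarter, since it first tries to break on paragraph, line, and word separators before falling back to raw characters. The function name `chunk_text` is hypothetical.

```python
def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 100) -> list[str]:
    """Split text into fixed-size windows where neighbouring windows share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


sample = "".join(chr(ord("a") + i % 26) for i in range(2500))
parts = chunk_text(sample)
print(len(parts), [len(p) for p in parts])
```

The overlap means a sentence that straddles a chunk boundary still appears whole in at least one chunk, which helps retrieval.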

#### Create Vector Store and Vector Embeddings

In [0]:
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

per_dir = "/Workspace/Users/jaggiakshat@gmail.com/rag_chromastoreV2"

embeddings = OpenAIEmbeddings(model = "text-embedding-ada-002", openai_api_key = OPENAI_API_KEY)
vector_store = Chroma.from_documents(chunks, embeddings, persist_directory=per_dir)
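Conceptually, a similarity search embeds the query and ranks the stored chunk vectors by how close they are to it. The toy sketch below uses cosine similarity on hand-made 3-dimensional vectors (real ada-002 embeddings have 1536 dimensions). By default Chroma ranks by L2 distance rather than cosine, but for unit-length embeddings such as OpenAI's the two orderings coincide. `cosine_similarity`, `top_k`, and `toy_store` are illustrative names, not Chroma APIs.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between vectors a and b."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec, store, k=2):
    """Return the k (text, score) pairs whose vectors are most similar to query_vec."""
    scored = [(text, cosine_similarity(query_vec, vec)) for text, vec in store]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Hypothetical embeddings for three chunks
toy_store = [
    ("ETL chunk", [0.9, 0.1, 0.0]),
    ("Synapse chunk", [0.1, 0.9, 0.1]),
    ("Delta chunk", [0.0, 0.2, 0.9]),
]
print(top_k([1.0, 0.2, 0.0], toy_store, k=2))
```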

You can reuse the Chroma vector store saved in the persist directory, without re-embedding the documents, like below:

```python
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

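# Must be the same directory that was passed to Chroma.from_documents above
persist_path = "/Workspace/Users/jaggiakshat@gmail.com/rag_chromastoreV2"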

embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    openai_api_key=OPENAI_API_KEY
)

# Reload persisted vector store
vector_store = Chroma(
    persist_directory=persist_path,
    embedding_function=embeddings
)

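# _collection is the underlying (private) Chroma collection; count() returns the number of stored chunks
print("Reloaded vector store with", vector_store._collection.count(), "chunks")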
```

#### Build RAG Retriever + Chain 

In [0]:
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, openai_api_key=OPENAI_API_KEY)

# k is the number of chunks returned by the similarity search over the vector store
retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 5})

qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",  # "stuff" inserts all retrieved chunks into a single prompt together with the query
    retriever=retriever,
    return_source_documents=True,
)
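The "stuff" chain type is the simplest way to combine retrieval with generation: every retrieved chunk is concatenated into one context block and sent to the LLM in a single call with the question. The sketch below shows the shape of such a prompt. The instruction wording is illustrative rather than LangChain's exact template, although LangChain's default QA prompt does likewise tell the model to say it doesn't know when the context lacks the answer. `build_stuff_prompt` is a hypothetical helper.

```python
def build_stuff_prompt(question: str, chunks: list[str]) -> str:
    """Stuff every retrieved chunk into one context block, followed by the question."""
    context = "\n\n".join(chunks)  # all k chunks go into the same prompt
    instructions = (
        "Use the following pieces of context to answer the question at the end. "
        "If you don't know the answer, just say that you don't know."
    )
    return f"{instructions}\n\n{context}\n\nQuestion: {question}\nHelpful Answer:"


retrieved = ["Step 1 - Extract the data", "Step 2 - Transform the data", "Step 3 - Load the data"]
print(build_stuff_prompt("What is the ETL process?", retrieved))
```

Stuffing is cheap (one LLM call) but limited by the model's context window, which is one reason to keep `k` and `chunk_size` modest.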

#### Asking Questions 

In [0]:
query = "Can you please explain to me what is ETL Process?"
result = qa_chain({"query":query})

print("Answer: ", result["result"])
print("Source Document Names:")
for doc in result["source_documents"]:
    print(doc.metadata.get("source", "Unknown"))

Answer:  The ETL Process stands for Extract, Transform, and Load. It consists of three main steps:

1. **Extract**: This step involves extracting data from various sources, which can include structured, semi-structured, and unstructured data.

2. **Transform**: In this step, the extracted data is transformed to meet the desired format. This includes cleaning the data, taking only the required data values, and converting the data into a suitable format for analysis.

3. **Load**: Finally, the transformed data is loaded into a destination, which can be an Azure Data Lake Storage (ADLS) Gen 2 account or a SQL Warehouse like Azure Synapse Analytics.

The overall ETL process can be visualized as follows: Extraction -> Staging Area -> Transform -> Load -> Data Warehouse -> Analytics.
Source Document Names:
/Volumes/workspace/default/ai_assistant_rag_documents/Azure Data Factory (ADF) Notes.docx
/Volumes/workspace/default/ai_assistant_rag_documents/Azure Synapse Notes.docx
/Volumes/workspace/

In [0]:
query2 = "What is Azure Synapse Analytics? Also, What is the difference between Synapse and Databricks?"
result2 = qa_chain({"query":query2})

print("Answer: ", result2["result"])
print("Source Document Names:")
for doc in result2["source_documents"]:
    print(doc.metadata.get("source", "Unknown"))

Answer:  Azure Synapse Analytics is a unified analytics platform that allows you to analyze large amounts of data using various services, including both SQL-based and Apache Spark-based pools. It integrates data from different sources and provides the necessary infrastructure for data warehousing and big data analytics. Key components of Azure Synapse include Synapse SQL for hosting SQL Data Warehouses, Apache Spark for big data processing, and data integration features similar to Azure Data Factory.

As for the difference between Azure Synapse and Databricks, I don't know.
Source Document Names:
/Volumes/workspace/default/ai_assistant_rag_documents/Azure Synapse Notes.docx
/Volumes/workspace/default/ai_assistant_rag_documents/Azure Synapse Notes.docx
/Volumes/workspace/default/ai_assistant_rag_documents/Azure Synapse Notes.docx
/Volumes/workspace/default/ai_assistant_rag_documents/Azure Synapse Notes.docx
/Volumes/workspace/default/ai_assistant_rag_documents/Azure Synapse Notes.docx
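The source listing above repeats the same file five times because each of the five retrieved chunks is reported separately. A small helper (the name `unique_sources` is hypothetical) can print each file once while keeping retrieval order:

```python
def unique_sources(metadata_dicts):
    """Return each source path once, in the order it was first retrieved."""
    # dict.fromkeys keeps insertion order (Python 3.7+) and drops repeated keys
    return list(dict.fromkeys(meta.get("source", "Unknown") for meta in metadata_dicts))


demo = [{"source": "Azure Synapse Notes.docx"}] * 3 + [{"source": "ADF Notes.docx"}, {}]
print(unique_sources(demo))  # ['Azure Synapse Notes.docx', 'ADF Notes.docx', 'Unknown']

# With the chain output from the cell above (assumes result2 exists):
# for source in unique_sources(doc.metadata for doc in result2["source_documents"]):
#     print(source)
```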


In [0]:
query3 = "What are the problems solved by ACID?"
result3 = qa_chain({"query":query3})

print("Answer: ", result3["result"])
print("Source Document Names:")
for doc in result3["source_documents"]:
    print(doc.metadata.get("source", "Unknown"))

Answer:  ACID solves the following problems:

1. Streamlined Data Append: It simplifies and improves data appending, making it efficient even with concurrent writes.

2. Simplified Data Modification: It ensures data modification is straightforward while maintaining data consistency.

3. Data Integrity Through Job Failures: It prevents data inconsistencies due to job failures, thereby maintaining data integrity.

4. Support for Real-time Operations: It serves as a robust data source and sink for real-time and streaming operations.

5. Efficient Historical Data Version Management: It offers time travel for accessing historical data versions, ensuring cost-effectiveness based on specific use cases and alternatives. 

6. Guarantees Data Consistency: It ensures data consistency and quality, avoiding issues with concurrent writes. 

7. Supports Upserts (MERGE): It allows for efficient updates through upserts or merges.
Source Document Names:
/Volumes/workspace/default/ai_assistant_rag_docume

Let's add an enhancement: when the RAG result has no retrieved documents, or its answer contains the phrase "I don't know", fall back to the general-purpose model (in our case, "gpt-4o-mini") and answer from its own knowledge.

#### Fallback to Open Domain LLM

In [0]:
def smart_qa(query):
    # Try RAG first
    result = qa_chain({"query": query})

    # The chain's QA prompt asks the model to say it doesn't know when the
    # retrieved context does not contain the answer
    if "I don't know" in result["result"] or not result["source_documents"]:
        # Fall back to the LLM's general knowledge
        print("Not found in docs. Using general LLM knowledge.")
        return llm.invoke(f"Answer this using general knowledge: {query}").content

    return result["result"]
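The check in `smart_qa` is case-sensitive and only matches a straight apostrophe, so answers such as "I do not know" or "I don’t know" (curly quote) slip through. Also, a plain similarity retriever with `k=5` returns five chunks whenever the store holds at least five, so the empty-sources branch will rarely fire. The hypothetical helper below is a slightly more tolerant version of the same condition; the phrase list is an assumption to adjust to your model's wording. Another option is `as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": ...})`, which drops weak matches and makes the empty-sources branch meaningful.

```python
def needs_fallback(answer: str, source_documents: list) -> bool:
    """Return True when the RAG answer should be replaced by a general-knowledge answer."""
    # Normalise curly apostrophes and case before matching refusal phrases
    normalized = answer.replace("\u2019", "'").lower()
    refusal_phrases = ("i don't know", "i do not know")  # assumed phrasings
    return not source_documents or any(phrase in normalized for phrase in refusal_phrases)


# Inside smart_qa, the condition would become:
# if needs_fallback(result["result"], result["source_documents"]): ...
```

Note that falling back discards the part of the RAG answer that *was* grounded in the documents (the Synapse definition in the example below), so you may prefer to return both.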

In [0]:
smart_qa("What is Azure Synapse Analytics? Also, What is the difference between Synapse and Databricks?")

Not found in docs. Using general LLM knowledge.


'Azure Synapse Analytics is a cloud-based integrated analytics service provided by Microsoft Azure. It combines big data and data warehousing capabilities, allowing organizations to analyze large volumes of data from various sources. Synapse enables users to ingest, prepare, manage, and serve data for business intelligence and analytics. It provides a unified experience for data integration, data warehousing, and big data analytics, allowing users to run complex queries and perform analytics using both SQL and Spark.\n\nKey features of Azure Synapse Analytics include:\n\n1. **Data Integration**: It allows users to connect to various data sources, both on-premises and in the cloud, and integrate them into a single analytics platform.\n2. **Data Warehousing**: Synapse provides a powerful SQL-based data warehousing solution that can handle large datasets and complex queries.\n3. **Big Data Analytics**: It supports big data processing using Apache Spark, enabling users to analyze unstructu

In [0]:
smart_qa("Who is the Prime Minister of India?")

Not found in docs. Using general LLM knowledge.


'As of my last knowledge update in October 2023, the Prime Minister of India is Narendra Modi. He has been in office since May 26, 2014. Please verify with up-to-date sources, as political positions can change.'

#### Gradio Mini Chatbot App (Not Possible to run on Databricks Free Edition)

The Gradio mini app could not be run on Databricks Free Edition, so the cells below are left commented out.

In [0]:
# def answer_question(user_input):
#     if not user_input.strip():
#         return "Please enter a question."
    
#     result = qa_chain({"query": user_input})
#     answer = result['result']
    
#     # Optional: show source documents metadata
#     sources = []
#     for doc in result['source_documents']:
#         sources.append(doc.metadata.get("source", "Unknown"))
#     sources_text = "\n".join(sources)
    
#     return f"Answer:\n{answer}\n\nSources:\n{sources_text}"

In [0]:
# import gradio as gr
# # --- Build Gradio Interface ---
# iface = gr.Interface(
#     fn=answer_question,
#     inputs=gr.Textbox(lines=2, placeholder="Ask me anything about the docs..."),
#     outputs=gr.Textbox(label="Answer"),
#     title="AI-Powered Document Helper",
#     description="Ask questions about your Word documents (text + images). Uses RAG over the documents."
# )

# # --- Launch ---
# iface.launch(share=True, server_name="0.0.0.0")

#### Interactive QnA Chat within Notebook

In [0]:
while True:
    question = input("Ask a question: ").strip()
    if question.lower() in ["exit", "quit"]:
        break
    result = smart_qa(question)  # or: qa_chain({"query": question})
    print(result)
    # When using qa_chain instead, the sources can be printed with:
    # print("Sources:", [d.metadata['source'] for d in result['source_documents']])

Ask a question:  How can we ingest data in DLT in Databricks?

Not found in docs. Using general LLM knowledge.
In Databricks, you can ingest data into Delta Lake (DLT) using several methods. Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. Here are some common ways to ingest data into Delta Lake in Databricks:

1. **Batch Ingestion**:
   - **Spark DataFrames**: You can read data from various sources (like CSV, JSON, Parquet, etc.) into a Spark DataFrame and then write it to a Delta table using the `write` method.
     ```python
     df = spark.read.format("csv").option("header", "true").load("path/to/csv")
     df.write.format("delta").mode("overwrite").save("/delta/table/path")
     ```

2. **Streaming Ingestion**:
   - **Structured Streaming**: You can set up a streaming DataFrame that reads from a source (like Kafka, socket, etc.) and writes to a Delta table.
     ```python
     streamingDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "server:port").option("

Ask a question:  What are the problems that are solved by ACID?

The problems solved by ACID include:

1. Streamlined Data Append: ACID ensures efficient data appending, even with concurrent writes.
2. Simplified Data Modification: It simplifies data modification while ensuring data consistency.
3. Data Integrity Through Job Failures: ACID prevents data inconsistencies due to job failures, maintaining data integrity.
4. Support for Real-time Operations: ACID provides a robust framework for real-time and streaming operations.
5. Efficient Historical Data Version Management: ACID supports time travel for accessing historical data versions, ensuring cost-effectiveness based on specific use cases.


Ask a question:  quit
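The loop above can also be wrapped in a function that takes the answering function and the input/output callables as parameters, which skips empty questions and makes the loop testable without typing into the notebook. This is a sketch; `chat_loop` and its parameters are hypothetical names, and in the notebook you would call it as `chat_loop(smart_qa)`.

```python
def chat_loop(answer_fn, read=input, write=print):
    """Read questions until 'exit' or 'quit', printing answer_fn's reply to each one."""
    while True:
        question = read("Ask a question: ").strip()
        if question.lower() in {"exit", "quit"}:
            break
        if not question:
            continue  # ignore empty input instead of sending it to the model
        write(answer_fn(question))


# Scripted example: a fake answer function and a fixed list of inputs
scripted = iter(["What is ETL?", "", "  QUIT  "])
replies = []
chat_loop(lambda q: f"(answer to: {q})", read=lambda prompt: next(scripted), write=replies.append)
print(replies)
```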