# Load Files from a Storage Account into Azure Cognitive Search
### The files from the Storage Account are loaded into Azure Cognitive Search by following the steps below
- Establish a connection with Storage Account using the Python SDK.
- Retrieve the required files from Storage Account container using the file stream download method.
- Use Azure AI Document Intelligence to parse the PDF files from Storage Account.
- Divide the PDF into smaller, manageable page chunks for easier processing.
- Index the parsed chunks into Azure Cognitive Search.
- Repeat the process for all the required files.

#### Using the Azure Storage Python SDK to fetch the file stream, and PyPDF or Document Intelligence to split the document into pages in memory and create a vector index
- Azure Python SDK https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/storage/azure-storage-blob
- Refer to https://github.com/MSUSAzureAccelerators/Azure-Cognitive-Search-Azure-OpenAI-Accelerator/blob/main/04-Complex-Docs.ipynb for loading large documents using PYPDF and Document Intelligence

Install the required Packages and setup the auth

In [1]:
#pip install openai==0.28.1

In [2]:
#pip show openai

In [3]:
#pip install -r requirements.txt

In [1]:
import base64
import html
import io
import json
import os
import shutil

import langchain
import requests
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential
# Keep the async BlobClient import BEFORE the sync ones: the last import wins,
# and parse_pdf relies on the synchronous client.
from azure.storage.blob.aio import BlobClient
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.storage.blob import ContainerClient
from azure.storage.blob import BlobClient
from dotenv import load_dotenv
from langchain import OpenAI, VectorDBQA
from langchain.chat_models import AzureChatOpenAI
from langchain.chat_models import ChatOpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma, FAISS
from PyPDF2 import PdfFileReader, PdfFileWriter, PdfReader
from tqdm import tqdm

# Load settings from a local .env file into the process environment;
# the cells below read them through os.getenv / os.environ.
load_dotenv()

True

In [2]:
# HTTP headers sent with every Azure Cognitive Search REST request
headers = {
    'Content-Type': 'application/json',
    'api-key': os.getenv('AZURE_SEARCH_KEY'),
}

# Langchain reads its Azure OpenAI connection settings from OPENAI_* variables,
# so copy them over from the AZURE_OPENAI_* ones
for openai_var, azure_var in (
    ("OPENAI_API_BASE", "AZURE_OPENAI_ENDPOINT"),
    ("OPENAI_API_KEY", "AZURE_OPENAI_API_KEY"),
    ("OPENAI_API_VERSION", "AZURE_OPENAI_API_VERSION"),
):
    os.environ[openai_var] = os.getenv(azure_var)
os.environ["OPENAI_API_TYPE"] = "azure"


In [3]:
# Embedding client used by the upload cell to vectorize each page chunk.
# NOTE(review): "text-embedding-ada-002" is passed as the deployment name, so it
# presumably has to match a deployment in the Azure OpenAI resource — confirm.
embedder = OpenAIEmbeddings(deployment="text-embedding-ada-002", chunk_size=1) 

#### Define a function to parse the PDF using PyPDF or Document Intelligence (formerly Form Recognizer); the file content stream downloaded from the Storage Account is the input to the parser

In [4]:
def parse_pdf(filename, form_recognizer=False, formrecognizer_endpoint=None, formrecognizerkey=None, model="prebuilt-document", from_url=False, verbose=False):
    """Parse a PDF blob into per-page text using PyPDF or Azure Document Intelligence.

    The blob is downloaded fully into memory from the container configured by
    the AZURE_STORAGE_CONNECTION_STRING / AZURE_STORAGE_CONTAINER_NAME
    environment variables, then parsed page by page.

    Args:
        filename: Name of the blob to download and parse.
        form_recognizer: When False, extract text with PyPDF; when True, use the
            Azure Document Intelligence SDK (former Azure Form Recognizer).
        formrecognizer_endpoint: Document Intelligence endpoint. Falls back to
            the FORM_RECOGNIZER_ENDPOINT environment variable when not given.
        formrecognizerkey: Document Intelligence key. Falls back to the
            FORM_RECOGNIZER_KEY environment variable when not given.
        model: Document Intelligence model id passed to the analyze call.
        from_url: Analysing a document by URL is not implemented; combining it
            with form_recognizer=True raises NotImplementedError. It is ignored
            on the PyPDF path.
        verbose: Print which extraction backend is used.

    Returns:
        A list of (page_num, offset, page_text) tuples, where page_num is
        zero-based and offset is the total length of the text of all
        preceding pages.

    Raises:
        NotImplementedError: If form_recognizer and from_url are both True.
    """
    if form_recognizer and from_url:
        # Fail fast: this combination used to crash on an unbound `poller`
        # only after the blob had already been downloaded.
        raise NotImplementedError(
            "parse_pdf only analyses the downloaded blob content; from_url=True is not supported"
        )

    page_map = []
    # set up a blob client with authentication
    blob_client = BlobClient.from_connection_string(
        conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING"),
        container_name=os.getenv("AZURE_STORAGE_CONTAINER_NAME"),
        blob_name=filename,
    )
    # download the whole blob into memory as bytes
    file_content = blob_client.download_blob().readall()
    offset = 0

    if not form_recognizer:
        if verbose: print("Extracting text using PyPDF")
        # PdfReader expects a path or a seekable binary stream, not raw bytes.
        reader = PdfReader(io.BytesIO(file_content))
        for page_num, p in enumerate(reader.pages):
            page_text = p.extract_text() or ""
            page_map.append((page_num, offset, page_text))
            offset += len(page_text)
    else:
        if verbose: print("Extracting text using Azure Document Intelligence")
        # Explicit arguments win; the environment variables are the fallback.
        endpoint = formrecognizer_endpoint or os.getenv("FORM_RECOGNIZER_ENDPOINT")
        key = formrecognizerkey or os.getenv("FORM_RECOGNIZER_KEY")
        form_recognizer_client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))

        poller = form_recognizer_client.begin_analyze_document(model, document=file_content)
        form_recognizer_results = poller.result()
        all_tables = form_recognizer_results.tables or []

        for page_num, page in enumerate(form_recognizer_results.pages):
            # A table is attributed to the page of its first bounding region.
            tables_on_page = [
                table for table in all_tables
                if table.bounding_regions and table.bounding_regions[0].page_number == page_num + 1
            ]

            # mark all positions of the table spans in the page
            page_offset = page.spans[0].offset
            page_length = page.spans[0].length
            table_chars = [-1] * page_length
            for table_id, table in enumerate(tables_on_page):
                for span in table.spans:
                    # replace all table spans with "table_id" in table_chars array
                    for i in range(span.length):
                        idx = span.offset - page_offset + i
                        if 0 <= idx < page_length:
                            table_chars[idx] = table_id

            # build page text by replacing characters in table spans with table html;
            # each table is emitted once, at the position of its first character
            text_parts = []
            added_tables = set()
            for idx, table_id in enumerate(table_chars):
                if table_id == -1:
                    text_parts.append(form_recognizer_results.content[page_offset + idx])
                elif table_id not in added_tables:
                    text_parts.append(table_to_html(tables_on_page[table_id]))
                    added_tables.add(table_id)

            text_parts.append(" ")
            page_text = "".join(text_parts)
            page_map.append((page_num, offset, page_text))
            offset += len(page_text)

    return page_map

In [5]:
def table_to_html(table):
    """Render a parsed table object as an HTML <table> string.

    Cells are grouped by row_index for every row in range(table.row_count) and
    ordered by column_index. Header cells (kind "columnHeader" or "rowHeader")
    become <th>, all others <td>; spans greater than 1 are emitted as
    colSpan / rowSpan attributes and the cell content is HTML-escaped.
    """
    header_kinds = ("columnHeader", "rowHeader")

    def render_cell(cell):
        tag = "th" if cell.kind in header_kinds else "td"
        attrs = ""
        if cell.column_span > 1:
            attrs += f" colSpan={cell.column_span}"
        if cell.row_span > 1:
            attrs += f" rowSpan={cell.row_span}"
        return f"<{tag}{attrs}>{html.escape(cell.content)}</{tag}>"

    pieces = ["<table>"]
    for row_index in range(table.row_count):
        row_cells = [c for c in table.cells if c.row_index == row_index]
        row_cells.sort(key=lambda c: c.column_index)
        pieces.append("<tr>")
        pieces.extend(render_cell(c) for c in row_cells)
        pieces.append("</tr>")
    pieces.append("</table>")
    return "".join(pieces)

In [6]:
def text_to_base64(text):
    """Encode text as URL-safe Base64, usable as an Azure Search document key.

    The text is UTF-8 encoded and then Base64 encoded with the URL-safe
    alphabet ('-' and '_' instead of '+' and '/'). The standard alphabet can
    produce '+' and '/', which are not valid characters in a search document
    key. For inputs whose standard encoding contains neither character, the
    result is unchanged.

    Args:
        text: The string to encode.

    Returns:
        The URL-safe Base64 representation as a str (padding '=' is kept).
    """
    # Convert text to bytes using UTF-8 encoding
    bytes_data = text.encode('utf-8')

    # URL-safe alphabet keeps the result within the characters allowed in keys
    base64_encoded = base64.urlsafe_b64encode(bytes_data)

    # Base64 output is plain ASCII, so decoding back to str is lossless
    return base64_encoded.decode('utf-8')

In [7]:
# Work list holding the details of every file to be indexed
files_to_index = list()

### List the blobs in your container using the ContainerClient

In [8]:
container = ContainerClient.from_connection_string(
    conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING"),
    container_name=os.getenv("AZURE_STORAGE_CONTAINER_NAME"),
)

# URL prefix shared by every blob in the container: <base url><container>/
container_url_prefix = os.getenv("AZURE_STORAGE_BASE_URL") + os.getenv("AZURE_STORAGE_CONTAINER_NAME") + "/"

# Rebuild the list from scratch (instead of appending) so that re-running this
# cell does not register every blob a second time.
files_to_index = [
    {"file_name": blob.name, "file_url": container_url_prefix + blob.name}
    for blob in container.list_blobs()
]

### Get the file content stream for each blob and use Document Intelligence to split the PDF into pages

In [9]:
# Parse every listed blob with Azure Document Intelligence and attach the
# resulting per-page text to its entry under the "page_map" key.
for item in files_to_index:
    print(item)
    item.update(
        page_map=parse_pdf(
            filename=item["file_name"],
            form_recognizer=True,
            model="prebuilt-document",  # alternative: model="prebuilt-layout"
            from_url=False,
            verbose=True,
        )
    )
    
    

{'file_name': 'fabric-get-started.pdf', 'file_url': 'https://stgwrkshp.blob.core.windows.net/fabric/fabric-get-started.pdf'}
Extracting text using Azure Document Intelligence


In [13]:
# Preview only the first few parsed pages of the first file; printing the whole
# page map floods the notebook output.
print(files_to_index[0]["page_map"][:3])

[(0, 0, "Tell us about your PDF experience.\nMicrosoft Fabric get started documentation\nMicrosoft Fabric is a unified platform that can meet your organization's data and analytics needs. Discover the Fabric shared and platform documentation from this page.\nAbout Microsoft Fabric\nｅOVERVIEW\nWhat is Fabric?\nFabric terminology\nWhat's New\nｂGET STARTED\nStart a Fabric trial\nFabric home navigation\nEnd-to-end tutorials\nContext sensitive Help pane\nGet started with Fabric items\nｐCONCEPT\nFind items in OneLake data hub\nPromote and certify items\nｃHOW-TO GUIDE\nApply sensitivity labels\nWorkspaces\nｐCONCEPT :selected: "), (1, 598, 'Fabric workspace\nWorkspace roles\nｂGET STARTED\nCreate a workspace\nｃHOW-TO GUIDE\nWorkspace access control :selected: '), (2, 713, "What is Microsoft Fabric?\nArticle • 06/30/2023\nMicrosoft Fabric is an all-in-one analytics solution for enterprises that covers everything from data movement to data science, Real-Time Analytics, and business intelligence. 

In [10]:
# Name of the Azure Search index that the cells below create and upload into
str_index_name = "idx_fabric_get"

In [11]:
# Request headers and query parameters shared by the Azure Search REST calls
# that create the vector index and upload documents
headers = {
    'Content-Type': 'application/json',
    'api-key': os.environ.get('AZURE_SEARCH_KEY'),
}
params = {
    'api-version': os.environ.get('AZURE_SEARCH_API_VERSION'),
}

In [16]:
#from azure.storage.blob import BlobClient

#blob = BlobClient.from_connection_string(conn_str=os.getenv("AZURE_STORAGE_CONNECTION_STRING"), container_name=os.getenv("AZURE_STORAGE_CONTAINER_NAME"), blob_name="004003-Contract-Client-2023Oct18-232541.pdf")
#stream = blob.download_blob()
#data = stream.readall()
#print(data)

Now that we have the content of the blob file chunks (each page of each file) in the `page_map` list of each entry, let's create the vector-based index in our Azure Search engine where this content is going to land.

In [12]:
# Index definition: one document per PDF page, with the page text in "chunk"
# and its embedding in "chunkVector". Field attributes are real booleans so
# they serialize as JSON true/false rather than as strings.
index_payload = {
    "name": str_index_name,
    "fields": [
        {"name": "id", "type": "Edm.String", "key": True, "filterable": True},
        {"name": "title", "type": "Edm.String", "searchable": True, "retrievable": True},
        {"name": "chunk", "type": "Edm.String", "searchable": True, "retrievable": True},
        # 1536 dimensions; NOTE(review): presumably the output size of the embedding model used for upload — confirm
        {"name": "chunkVector", "type": "Collection(Edm.Single)", "searchable": True, "retrievable": True, "dimensions": 1536, "vectorSearchProfile": "my-default-vector-profile"},
        {"name": "name", "type": "Edm.String", "searchable": True, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "location", "type": "Edm.String", "searchable": False, "retrievable": True, "sortable": False, "filterable": False, "facetable": False},
        {"name": "page_num", "type": "Edm.Int32", "searchable": False, "retrievable": True},
    ],
    "vectorSearch": {
        "algorithms": [
            {
                "name": "my-hnsw-config-1",
                "kind": "hnsw",
                "hnswParameters": {
                    "m": 4,
                    "efConstruction": 400,
                    "efSearch": 500,
                    "metric": "cosine"
                }
            }
        ],
        "profiles": [
            {
                "name": "my-default-vector-profile",
                "algorithm": "my-hnsw-config-1"
            }
        ]
    },
    "semantic": {
        "configurations": [
            {
                "name": "my-semantic-config",
                "prioritizedFields": {
                    "titleField": {
                        "fieldName": "title"
                    },
                    "prioritizedContentFields": [
                        {
                            "fieldName": "chunk"
                        }
                    ],
                    "prioritizedKeywordsFields": []
                }
            }
        ]
    }
}

# Create (or update) the index through the REST API
r = requests.put(os.getenv('AZURE_SEARCH_ENDPOINT') + "/indexes/" + str_index_name,
                 data=json.dumps(index_payload), headers=headers, params=params)
print(r.status_code)
print(r.ok)
if not r.ok:
    # Show the service's error message so a failed creation can be diagnosed
    print(r.text)

201
True


In [13]:
#uncomment in case of errors
#r.text

'{"@odata.context":"https://cog-search-kuk2gm4c5zjju.search.windows.net/$metadata#indexes/$entity","@odata.etag":"\\"0x8DBE6F805A6B63F\\"","name":"idx_fabric_get","defaultScoringProfile":null,"fields":[{"name":"id","type":"Edm.String","searchable":true,"filterable":true,"retrievable":true,"sortable":true,"facetable":true,"key":true,"indexAnalyzer":null,"searchAnalyzer":null,"analyzer":null,"normalizer":null,"dimensions":null,"vectorSearchProfile":null,"synonymMaps":[]},{"name":"title","type":"Edm.String","searchable":true,"filterable":true,"retrievable":true,"sortable":true,"facetable":true,"key":false,"indexAnalyzer":null,"searchAnalyzer":null,"analyzer":null,"normalizer":null,"dimensions":null,"vectorSearchProfile":null,"synonymMaps":[]},{"name":"chunk","type":"Edm.String","searchable":true,"filterable":true,"retrievable":true,"sortable":true,"facetable":true,"key":false,"indexAnalyzer":null,"searchAnalyzer":null,"analyzer":null,"normalizer":null,"dimensions":null,"vectorSearchProfil

Upload the Document chunks and its vectors to the Vector-Based Index
The following code iterates over each page chunk of each file and uses the Azure Search REST API upload action to insert each document, together with its vector (computed with the OpenAI embedding model), into the index.

In [14]:
%%time
# Documents endpoint of the target index; it is the same for every page, so
# build it once (a missing AZURE_SEARCH_ENDPOINT now fails immediately instead
# of once per page).
docs_index_url = os.environ['AZURE_SEARCH_ENDPOINT'] + "/indexes/" + str_index_name + "/docs/index"

for item in files_to_index:
    print("Uploading chunks from",item["file_name"])
    # Unpack outside the try block so `content` is always the current page's
    # text when the exception handler prints it.
    for zero_based_page, _offset, content in tqdm(item['page_map']):
        page_num = zero_based_page + 1  # stored page numbers are 1-based
        try:
            # Document key: Base64 of file name + page number
            idx_id = text_to_base64(item["file_name"] + str(page_num))
            title = f"{item['file_name']}_page_{page_num}"
            upload_payload = {
                "value": [
                    {
                        "id": idx_id,
                        "title": title,
                        "chunk": content,
                        # Empty pages are embedded as a placeholder string instead of ""
                        "chunkVector": embedder.embed_query(content if content!="" else "-------"),
                        "name": item["file_name"],
                        "location": item["file_url"],
                        "page_num": page_num,
                        "@search.action": "upload"
                    },
                ]
            }

            r = requests.post(docs_index_url,
                              data=json.dumps(upload_payload), headers=headers, params=params)
            if r.status_code != 200:
                print(r.status_code)
                print(r.text)
        except Exception as e:
            # Best effort: report the failing page and continue with the next one
            print("Exception:",e)
            print(content)

Uploading chunks from fabric-get-started.pdf


  0%|          | 0/178 [00:00<?, ?it/s]

100%|██████████| 178/178 [02:12<00:00,  1.35it/s]

CPU times: total: 3.31 s
Wall time: 2min 12s



