**This script takes a source directory in Azure Blob Storage, downloads the documents in it, and splits them into chunks based on the chunking parameters you provide. It then creates embeddings for each chunk and uploads one JSON file per chunk, containing the chunked content and its embeddings, to a destination container in Azure Blob Storage. It also creates an Azure Cognitive Search index, indexer, and data source that you can use to pull the data into Azure Cognitive Search. Once the entire script has run, the chunked data should be loaded into the new Azure Cognitive Search index. Make sure Azure Cognitive Search Semantic Search is turned on. Also make sure to give the Form Recognizer service's managed identity Storage Blob Data Reader access to the storage account.**

In [None]:
pip install pypdf azure-ai-formrecognizer azure-identity langchain azure-storage-blob python-dotenv unstructured openai azure-search-documents tiktoken

In [None]:
pip install azure-storage-file-datalake --pre

**Update the fields below with your Azure services information and keys:**

In [1]:
import os, uuid
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.storage.filedatalake import DataLakeServiceClient
from azure.identity import DefaultAzureCredential
from azure.identity import ClientSecretCredential
from langchain.text_splitter import RecursiveCharacterTextSplitter
import openai
import json
import requests
import random
import string
import time

# ---------------------------------------------------------------------------
# Notebook configuration. Fill in the empty strings and the <placeholders>
# before running the remaining cells. Do not commit real keys to source control.
# ---------------------------------------------------------------------------

# Azure OpenAI connectivity
openai.api_type = "azure"
openai.api_key = ""
openai.api_base = "https://<your instance>.openai.azure.com/"
openai.api_version = "2022-12-01"
openai_embeddings_model = "embeddings-ada-002"  # name of your Ada embeddings deployment, update with correct name

# Blob access variables
connect_str = "DefaultEndpointsProtocol=https;AccountName=;AccountKey="  # Get from your Azure storage config
blob_account_url = "https://<your storage account>.blob.core.windows.net"
source_container_name = ""  # Container holding the source documents
source_path_name = "/"

# Container that receives the chunked JSON documents - make sure the container exists in Azure storage
destination_container_name = ""

# Names of the search index, indexer, and data source to create in Azure Cognitive Search
index_name = ""
indexer_name = ""
data_source_name = ""

# Cognitive Search connection.
# Only the base endpoint needs editing: the REST URLs below are derived from it
# so the host name cannot get out of sync between the index, indexer and data
# source calls.
search_endpoint = "https://<your cognitive search>.search.windows.net"
search_api_version = "2023-07-01-Preview"
_search_base = search_endpoint.rstrip("/")  # tolerate a trailing slash in the endpoint
search_endpoint_for_creating_index = f"{_search_base}/indexes?api-version={search_api_version}"
search_endpoint_for_creating_indexer = f"{_search_base}/indexers?api-version={search_api_version}"
search_endpoint_for_creating_datasource = f"{_search_base}/datasources?api-version={search_api_version}"
search_api_key = ""  # Cognitive Search admin key

# Form Recognizer details (the preview URL is derived from the base URL)
forms_recognizer_url = "https://<your form recognizer>.cognitiveservices.azure.com"
forms_recognizer_preview_url = (
    f"{forms_recognizer_url.rstrip('/')}"
    "/formrecognizer/documentModels/prebuilt-read:analyze?api-version=2022-06-30-preview"
)
forms_recognizer_key = ""

# Chunk parameters, measured in tokens (the splitter below is built with a tiktoken encoder)
chunk_size = 1000
chunk_overlap = 200

StatementMeta(, f46af4ee-91d4-4dd3-bedc-675b27da77a9, 5, Finished, Available)

**This section will create the index in Azure Cog Search that will store the data after the indexer is run.**

In [None]:
## Index definition: four plain string fields plus two 1536-dimension vector
## fields, with an HNSW vector configuration and a semantic configuration.

search_json = {
    "name": f"{index_name}",
    "fields": [
        {
            "name": "key",
            "type": "Edm.String",
            "searchable": False,
            "retrievable": True,
            "key": True,
            "filterable": False,
            "facetable": False,
            "sortable": False
        },
        {
            "name": "title",
            "type": "Edm.String",
            "searchable": True,
            "retrievable": True,
            "key": False,
            "filterable": False,
            "facetable": False,
            "sortable": False
        },
        {
            "name": "content",
            "type": "Edm.String",
            "searchable": True,
            "retrievable": True,
            "key": False,
            "filterable": False,
            "facetable": False,
            "sortable": False
        },
        {
            "name": "path",
            "type": "Edm.String",
            "searchable": True,
            "retrievable": True,
            "key": False,
            "filterable": False,
            "facetable": False,
            "sortable": False
        },
        {
            "name": "titleVector",
            "type": "Collection(Edm.Single)",
            "searchable": True,
            "retrievable": False,
            "dimensions": 1536,
            "vectorSearchConfiguration": "my-vector-config"
        },
        {
            "name": "contentVector",
            "type": "Collection(Edm.Single)",
            "searchable": True,
            "retrievable": False,
            "dimensions": 1536,
            "vectorSearchConfiguration": "my-vector-config"
        }
    ],
    "corsOptions": {
        "allowedOrigins": [
            "*"
        ],
        "maxAgeInSeconds": 60
    },
    "vectorSearch": {
        "algorithmConfigurations": [
            {
                "name": "my-vector-config",
                "kind": "hnsw",
                "hnswParameters": {
                    "m": 4,
                    "efConstruction": 400,
                    "metric": "cosine"
                }
            }
        ]
    },
    "semantic": {
        "configurations": [
            {
                "name": "my-semantic-config",
                "prioritizedFields": {
                    "titleField": {
                        "fieldName": "title"
                    },
                    "prioritizedContentFields": [
                        {
                            "fieldName": "content"
                        }
                    ],
                    "prioritizedKeywordsFields": [
                        {
                            "fieldName": "content"
                        }
                    ]
                }
            }
        ]
    }
}

# REST call that creates the index.

url = search_endpoint_for_creating_index
print("URL is " + url)
headers = {'Content-Type': 'application/json', 'api-key': search_api_key}
# The admin key is deliberately not printed: notebook output is saved with the
# notebook and would leak the credential.
print("Search JSON is " + str(search_json))
print("About to make the rest call")
response = requests.post(url, headers=headers, data=json.dumps(search_json))

if response.status_code in (200, 201):
    data = response.json()
    print(data)
else:
    # Include the response body: the status code alone does not say what was rejected.
    print('Error:', response.status_code, response.text)

print("rest call completed")

**This section will create the data source in Azure Cognitive Search that the indexer will use to load the data. It points at the destination container configured above, which is where the chunked data is written.**

In [None]:
# Data source definition: an Azure Blob data source pointing at the container
# that holds the chunked JSON documents.
data_source_request_json = {
    "name": data_source_name,
    "type": "azureblob",
    "credentials": {"connectionString": connect_str},
    "container": {
        "name": destination_container_name
    }
}

url = search_endpoint_for_creating_datasource
print("URL is " + url)
headers = {'Content-Type': 'application/json', 'api-key': search_api_key}
print("About to make the REST call")
# Print a copy with the connection string masked; the real one contains the
# storage account key and must not end up in saved notebook output.
redacted_request = dict(data_source_request_json, credentials={"connectionString": "<redacted>"})
print("Request JSON is " + json.dumps(redacted_request))
response = requests.post(url, headers=headers, data=json.dumps(data_source_request_json))

if response.status_code in (200, 201):
    data = response.json()
    print(data)
    print("Create data source call completed successfully")
else:
    # Print the raw body instead of calling response.json(): an error response
    # is not guaranteed to be JSON.
    print('Error:', response.status_code, response.text)



**This section will read the data from the source storage account, chunk it, create embeddings, and then upload a JSON file per chunk that can be ingested into Azure Cognitive Search when the indexer is run.**


In [None]:

def upload_blob_data(blob_service_client: BlobServiceClient, destination_container_name: str, blob_name, data=None, overwrite=False):
    """Upload one chunk document to Azure Blob Storage as a block blob.

    Args:
        blob_service_client: Client for the storage account to write to.
        destination_container_name: Container that receives the blob.
        blob_name: Name of the blob to create.
        data: Payload to upload. When omitted, the module-level
            ``json_string`` is used, which is how the processing loop in this
            notebook calls the function.
        overwrite: Passed through to ``upload_blob``. The default ``False``
            keeps the previous behaviour.
    """
    if data is None:
        # Backward-compatible fallback for callers that rely on the global payload.
        data = json_string
    blob_client = blob_service_client.get_blob_client(container=destination_container_name, blob=blob_name)
    blob_client.upload_blob(data, blob_type="BlockBlob", overwrite=overwrite)

def list_blobs_flat(blob_service_client: BlobServiceClient, source_container_name):
    """Return the blob listing for ``source_container_name``.

    The result is whatever ``ContainerClient.list_blobs()`` returns; the caller
    iterates over it and reads each item's ``name``.
    """
    source_container = blob_service_client.get_container_client(container=source_container_name)
    return source_container.list_blobs()

def generate_random_string(length):
    """Return ``length`` random ASCII letters and digits, used as the Azure Search key field."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)


def generate_embeddings(text):
    """Request an embedding for ``text`` and return the first embedding in the response.

    ``openai_embeddings_model`` is passed as the ``engine`` argument, i.e. the
    deployment name of the Ada embeddings model.
    """
    deployment = f"{openai_embeddings_model}"
    result = openai.Embedding.create(input=text, engine=deployment)
    first_item = result['data'][0]
    return first_item['embedding']

# Main pipeline: for every blob in the source container, read it with Form
# Recognizer, split the text into token-sized chunks, embed each chunk, and
# upload one JSON document per chunk to the destination container.
from urllib.parse import quote  # cell-local import, only needed to encode blob names

blob_service_client = BlobServiceClient.from_connection_string(connect_str)
blob_listing = list_blobs_flat(blob_service_client, source_container_name)

# These clients do not depend on the blob being processed, so build them once
# instead of once per document / per chunk.
document_analysis_client = DocumentAnalysisClient(
    endpoint=forms_recognizer_url, credential=AzureKeyCredential(forms_recognizer_key)
)
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=chunk_size, chunk_overlap=chunk_overlap
)

count = 0  # running chunk number across all documents; used in the output blob names
for blob in blob_listing:
    print(blob.name + '\n')

    # Percent-encode the blob name (spaces become %20, '/' is kept) so that the
    # URL handed to Form Recognizer and stored in the index is a valid URL.
    formUrl = f"{blob_account_url}/{source_container_name}/{quote(blob.name)}"
    print(formUrl)

    # Use Form Recognizer's prebuilt read model to extract the document text.
    poller = document_analysis_client.begin_analyze_document_from_url(
        "prebuilt-read", formUrl
    )
    result = poller.result()
    content = result.content

    # Chunk up the document.
    texts = text_splitter.split_text(content)

    print("Looping through each of the chunks and writing them back to blob storage in a JSON format")
    if not texts:
        # Nothing to embed or upload for this document.
        continue

    # The title is the same for every chunk of a document, so embed it once
    # per document rather than once per chunk.
    title_embeddings = generate_embeddings(blob.name)

    for item in texts:
        count = count + 1
        item = str(item)
        content_embeddings = generate_embeddings(item)
        time.sleep(10)  # To stay under quota

        # Random string of length 15 for the Azure Search key
        random_str = generate_random_string(15)

        # The vector values are kept as lists so they serialise as JSON arrays
        # of numbers, matching the Collection(Edm.Single) index fields
        # (formatting them into a string would store the Python repr instead).
        json_data = {
            "key": f"{random_str}",
            "title": f"{blob.name}",
            "content": f"{item}",
            "path": f"{formUrl}",
            "titleVector": title_embeddings,
            "contentVector": content_embeddings
        }

        # upload_blob_data picks up the payload from the module-level
        # json_string, so it must be assigned before the call.
        json_string = json.dumps(json_data)

        blob_name = blob.name + "_" + str(count) + ".json"
        upload_blob_data(blob_service_client, destination_container_name, blob_name)

**This section will create the Azure Cognitive Search indexer. It will automatically run and load the data to the index.**

In [None]:
# Indexer definition: reads the JSON chunk documents from the data source and
# loads them into the target index. Parsing mode "json" treats each blob as
# one JSON document.
request_json = {
    "name": indexer_name,
    "dataSourceName": data_source_name,
    "targetIndexName": index_name,
    "parameters": {
        "configuration": {
            "allowSkillsetToReadFileData": False,
            "parsingMode": "json"
        }
    },
    "fieldMappings": [],
    "outputFieldMappings": [],
}


url = search_endpoint_for_creating_indexer
print("URL is " + url)
headers = {'Content-Type': 'application/json', 'api-key': search_api_key}
# The admin key is deliberately not printed: notebook output is saved with the
# notebook and would leak the credential.
print("About to make the rest call")
print("request json is " + json.dumps(request_json))
response = requests.post(url, headers=headers, data=json.dumps(request_json))

if response.status_code in (200, 201):
    data = response.json()
    print(data)
    print("Create indexer call completed successfully")
else:
    # Print the raw body instead of calling response.json(): an error response
    # is not guaranteed to be JSON.
    print('Error:', response.status_code, response.text)

print("Indexer call completed")