In [1]:
from dotenv import load_dotenv
import os
load_dotenv()

True

In [2]:
from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
from azure.search.documents import SearchClient
from azure.search.documents.models import QueryType, VectorizedQuery,QueryAnswerType, QueryCaptionType
from openai import AzureOpenAI
from backend.auth.auth_utils import get_authenticated_user_details
from backend.security.ms_defender_utils import get_msdefender_user_json
import re
import base64
import json
import openai
from pydantic import BaseModel, Field
from typing import List, Dict, Optional

service_endpoint = os.getenv("AZURE_SEARCH_SERVICE_ENDPOINT")
wiki_index = os.getenv("AZURE_SEARCH_INDEX_WIKI")
video_index = os.getenv("AZURE_SEARCH_INDEX_VIDEO")
api_version = os.environ.get("AZURE_OPENAI_PREVIEW_API_VERSION")
aoai_api_key = os.environ.get("AZURE_OPENAI_KEY")
embedding_model = os.environ.get("AZURE_OPENAI_EMBEDDING_NAME")
chat_model = os.environ.get("AZURE_OPENAI_MODEL")
credential = DefaultAzureCredential()

ad_token_provider = get_bearer_token_provider(credential,"https://cognitiveservices.azure.com/.default")
azure_endpoint = (
            os.environ.get("AZURE_OPENAI_ENDPOINT")
            if os.environ.get("AZURE_OPENAI_ENDPOINT")
            else f"https://{os.environ.get('AZURE_OPENAI_RESOURCE')}.openai.azure.com/")
default_headers = {"x-ms-useragent": "GitHubSampleWebApp/AsyncAzureOpenAI/1.0.0"}

wiki_search_client = SearchClient(service_endpoint, wiki_index, credential)
video_search_client = SearchClient(service_endpoint, video_index, credential)
authenticated_user_details = get_authenticated_user_details({})
conversation_id = None     
user_json = get_msdefender_user_json(authenticated_user_details, {}, conversation_id)

openai_client = AzureOpenAI(
  api_version = api_version, 
  api_key  = aoai_api_key,
  azure_ad_token_provider = ad_token_provider,
  azure_endpoint = azure_endpoint,
  default_headers = default_headers,)

In [3]:
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient
from azure.identity.aio import DefaultAzureCredential
from typing import List, Any



search_client = SearchClient(service_endpoint, wiki_index, DefaultAzureCredential())


async def search_for_text(query: str) -> List[Any]:
    results_list = []
    
    async with search_client:
        results = await search_client.search(search_text=query)
        
        async for result in results:
            results_list.append(result)
    
    return results_list


In [4]:
def generate_embeddings(query, model):
    embeddings_response = openai_client.embeddings.create(model=model, input=query)
    embedding = embeddings_response.data[0].embedding
    return embedding

def convert_timestamp_to_seconds(timestamp_str):
    match = re.search(r'(\d{2}):(\d{2}):(\d{2})', timestamp_str)
    if match:
        hours, minutes, seconds = map(int, match.groups())
        total_seconds = hours * 3600 + minutes * 60 + seconds
        return total_seconds
    else:
        print("No valid timestamp found in the string")
        return 0

def extract_integer(value):
    if isinstance(value, int):
        return value
    
    if isinstance(value, str):
        match = re.search(r'\d+', value)
        if match:
            return int(match.group())
    
    raise ValueError("Input must be an integer or a string representing a single integer.")


def is_video_link(url):
    video_extensions = ['mp4', 'mkv', 'avi', 'mov', 'wmv', 'flv', 'webm']
    extension = url.split('.')[-1]
    return extension in video_extensions



def generate_base64_encoded_string(start_time_in_seconds):
    data = {
        "referralInfo": {
            "referralApp": "StreamWebApp",
            "referralView": "ShareDialog-Link",
            "referralAppPlatform": "Web",
            "referralMode": "view"
        },
        "playbackOptions": {
            "startTimeInSeconds": start_time_in_seconds
        }
    }
    
    json_string = json.dumps(data)
    
    base64_encoded = base64.b64encode(json_string.encode('utf-8')).decode('utf-8')
    
    return "&nav="+base64_encoded

def clean_url(url):
    clean_url = re.sub(r'([?&]nav=).*', '', url)
    if clean_url[-1] == '?' or clean_url[-1] == '&':
        clean_url[:-1]
    return clean_url

def remove_duplicates(lst):
    seen = set()
    unique_lst = []
    
    for item in lst:
        item_tuple = tuple(item)
        if item_tuple not in seen:
            seen.add(item_tuple)
            unique_lst.append(item)
    
    return unique_lst


query = "how to get cad into your workspace"
# query = "Registering a PDM Link Server"
"How to generate a Microsoft part number"
"How to generate a Microsoft part number(mspn)"
"How to share files from windchill PDMlink to onePDM"
"How to do a cad push"
vector_query = VectorizedQuery(vector= generate_embeddings(query, embedding_model,),
                               k_nearest_neighbors=3, fields="text_vector", )

results_wiki = list(wiki_search_client.search(
    search_text=query,
    vector_queries=[vector_query],
    select=["chunk"],
    query_type=QueryType.SEMANTIC,
    semantic_configuration_name="semantic",
    query_caption=QueryCaptionType.EXTRACTIVE,
    query_answer=QueryAnswerType.EXTRACTIVE,
    top=3,
))

results_video = list(video_search_client.search(
    search_text=query,
    vector_queries=[vector_query],
    select=["title", "chunk", "url_metadata"],
    query_type=QueryType.SEMANTIC,
    semantic_configuration_name="semantic",
    query_caption=QueryCaptionType.EXTRACTIVE,
    query_answer=QueryAnswerType.EXTRACTIVE,
    top=3,
))

results = results_wiki + results_video
filter_result = []
for d in results:
    if d in results_wiki:
        d['container'] = 'wiki'
    elif d in results_video:
        d['container'] = 'video'

sorted_data = sorted(results_wiki + results_video, key=lambda x: x["@search.reranker_score"], reverse=True)

selected_chunks = sorted_data[:3]

context_str = f"""
**documents: "1"**

{selected_chunks[0]['chunk']}


**documents: "2"**

{selected_chunks[1]['chunk']}

**documents: "3"**

{selected_chunks[2]['chunk']}
"""

RAG_SYSTEM_PROMPT = f"""\
Context information is below.
---------------------
{context_str}
---------------------
INSTRUCTIONS:
1. You are an assistant who helps users answer their queries.
2. Answer the user's question from the above Context. The Context is provided in the form of multiple documents, each identified by a document number. If a document is a transcript, it also includes timestamps in the format HH:MM on each line above the text.
3. Give answer in step by step format.
4. Keep your answer solely on the information given in the Context above.
5. Once the answer is completed, then Always provide all relevant citations at the end of the whole answer, ensuring that each citation includes the corresponding timestamp and document number used to generate the response. Provide the citation in the following format only at the end of the whole answer neither in between the answer nor end of line.
    - For transcript, use: [timestamp, documents number]. for example [["00:11:00", "1"], ["00:1:44", "2"]]
    - For non transcript, use: ["", documents number]. for example [["", "3"],["", "1"], ["", "2"]]
7. Do not create or derive your own answer. If the answer is not directly available in the context, just reply stating, 'There is no answer available'
"""




ValueError: Expected `azure_ad_token_provider` argument to return a string but it returned <coroutine object get_bearer_token_provider.<locals>.wrapper at 0x000001F202FACBA0>

In [39]:
def format_stream_response_old(chatCompletionChunk, history_metadata, apim_request_id):
    response_obj = {
        "id": chatCompletionChunk.id,
        "model": chatCompletionChunk.model,
        "created": chatCompletionChunk.created,
        "object": chatCompletionChunk.object,
        "choices": [{"messages": []}],
        "history_metadata": history_metadata,
        "apim-request-id": apim_request_id,
        "citation": ""
    }

    if len(chatCompletionChunk.choices) > 0:
        delta = chatCompletionChunk.choices[0].delta
        if delta:
            if delta.content:
                messageObj = {
                        "role": "assistant",
                        "content": delta.content,
                    }
                response_obj["choices"][0]["messages"].append(messageObj)
                return response_obj
            if delta.tool_calls:
                if delta.tool_calls[0].function.arguments:
                    tool_obj = delta.tool_calls[0].function.arguments
                    if delta.tool_calls[0].index == 0:
                        messageObj = {"role": "assistant","content": tool_obj}
                        response_obj["choices"][0]["messages"].append(messageObj)
                    if delta.tool_calls[0].index == 1:
                        response_obj['citation'] = response_obj['citation'] + tool_obj
                return response_obj

    return {}

In [40]:
# async def stream_chat_request(request_body, request_headers):
#     response, apim_request_id = await send_chat_request(request_body, request_headers)
#     history_metadata = request_body.get("history_metadata", {})
#     citation_accumulator = {'citation': ''}

#     async def generate():
#         async for completionChunk in response:
#             yield format_stream_response(completionChunk, history_metadata, apim_request_id, citation_accumulator)

#     return generate(), citation_accumulator['citation']


In [41]:
messages=[{"role": "system", "content": RAG_SYSTEM_PROMPT},  
                    {"role": "user", "content": query}]

class Asnwer(BaseModel):
    """ 
    Asnwer
    """
    answer: str = Field(description="Only include Answer, do not include any citations in this, in case of no answer this will be ''")

class Cittion(BaseModel):
    """ 
    Citations
    """
    citation: List[List[str]] = Field(
        description="Alwaysm include all the citations, in case of no citation this will be [[]]")

tools = [openai.pydantic_function_tool(Asnwer), openai.pydantic_function_tool(Cittion)]

response = openai_client.chat.completions.with_raw_response.create(model=chat_model, messages=messages,  
                                                                   tools=tools, stream=True, temperature = 0)
response = response.parse()

# def generate():
#     for completionChunk in response:
#         yield format_stream_response(completionChunk, {}, "")

# for chunk in generate():
#     print(chunk)

    
# def run_conversation():
#     messages = [{"role": "system", "content": RAG_SYSTEM_PROMPT},
#               {"role": "user", "content": query}]

#     stream = openai_client.chat.completions.with_raw_response.create(model=chat_model, messages=messages,  tools=tools, 
#                                                    stream=True, temperature = 0,)
#     stream = stream.parse()
#     func_args_ = ''
#     messages = ''
#     for chunk in stream:
#         if chunk.id != '':
#             if chunk.choices[0].delta.content:
#                 messages = messages + chunk.choices[0].delta.content 
#             if chunk.choices[0].delta.tool_calls:
#                 if chunk.choices[0].delta.tool_calls[0].function.arguments:
#                     func_args_ += chunk.choices[0].delta.tool_calls[0].function.arguments
#                 else:
#                     func_args_ += ""
                
#     if func_args_ == '':
#         ouput = messages
#     else:
#         ouput = json.loads(func_args_)
#     return ouput 

# run_conversation()

In [None]:
def format_stream_response(chatCompletionChunk, history_metadata, apim_request_id, citation_accumulator):
    response_obj = {
        "id": chatCompletionChunk.id,
        "model": chatCompletionChunk.model,
        "created": chatCompletionChunk.created,
        "object": chatCompletionChunk.object,
        "choices": [{"messages": []}],
        "history_metadata": history_metadata,
        "apim-request-id": apim_request_id,
    }

    if len(chatCompletionChunk.choices) > 0:
        delta = chatCompletionChunk.choices[0].delta
        if delta:
            if delta.content:
                messageObj = {
                    "role": "assistant",
                    "content": delta.content,
                }
                response_obj["choices"][0]["messages"].append(messageObj)
                return response_obj
            if delta.tool_calls:
                if delta.tool_calls[0].function.arguments:
                    tool_obj = delta.tool_calls[0].function.arguments
                    if delta.tool_calls[0].index == 0:
                        messageObj = {"role": "assistant", "content": tool_obj}
                        response_obj["choices"][0]["messages"].append(messageObj)
                    if delta.tool_calls[0].index == 1:
                        citation_accumulator.append(tool_obj)
                return response_obj

    return {}

def generate(response, citation_accumulator):
    for completionChunk in response:
        yield format_stream_response(completionChunk, {}, "", citation_accumulator)

# Create a list to accumulate citations
citation_accumulator = []

# Separate the generator and citation accumulation process
stream_generator = generate(response, citation_accumulator)

# To process the streaming response
for chunk in stream_generator:
    # Process each chunk (response_obj) as it streams in
    print(chunk)

# After streaming is done, access the full citation
full_citation = json.loads("".join(citation_accumulator))
print("Full citation:", full_citation)


In [43]:
full_citation['citation']

[['00:17:48', '3'], ['00:37:59', '2'], ['00:38:20', '2'], ['00:38:30', '2']]

In [11]:
ll = [[]]

len(ll[0])

0

In [1]:
query = "how to get cad into your workspace"
# query = "Registering a PDM Link Server"
"How to generate a Microsoft part number"
"How to generate a Microsoft part number(mspn)"
"How to share files from windchill PDMlink to onePDM"
"How to do a cad push"

'How to do a cad push'

In [7]:
from backend.azure_rag import AzureSearchPromptService

search_prompt_service = AzureSearchPromptService() 

async def main():    
    query = 'Registering a PDM Link Server'  
    actual_citations, answer, apim_request_id, user_json = await search_prompt_service.rag(query, top=3)  
    return actual_citations, answer, apim_request_id, user_json 
  
# Run the main function  
actual_citations, answer, apim_request_id, user_json = await main()

INFO:azure.identity.aio._credentials.environment:No environment configuration found.
INFO:azure.identity.aio._credentials.managed_identity:ManagedIdentityCredential will use IMDS
ERROR:asyncio:Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002855FDEE800>
DEBUG:root:No AZURE_OPENAI_KEY found, using Azure Entra ID auth
INFO:azure.identity.aio._credentials.environment:No environment configuration found.
INFO:azure.identity.aio._credentials.managed_identity:ManagedIdentityCredential will use IMDS
DEBUG:httpx:load_ssl_context verify=True cert=None trust_env=True http2=False
DEBUG:httpx:load_verify_locations cafile='c:\\Users\\v-samomin\\AppData\\Local\\Programs\\Python\\Python310\\lib\\site-packages\\certifi\\cacert.pem'
DEBUG:azure.identity.aio._internal.decorators:EnvironmentCredential.get_token failed: EnvironmentCredential authentication unavailable. Environment variables are not fully configured.
Visit https://aka.ms/azsdk/python/identity/environme

In [10]:
print(answer)

There is no answer available


In [11]:
actual_citations

[]