In [1]:
from milvus_model.hybrid import BGEM3EmbeddingFunction
from pymilvus import (
    connections,
    utility,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
    AnnSearchRequest,
    WeightedRanker,
)
import pandas as pd
from pymilvus import MilvusClient
import json
import ollama

from transformers import AutoTokenizer
import uuid
import time
import pickle
import redis


# Hugging Face tokenizer for Llama 3.1; tiktoken_len uses it to count tokens
# in prompts before they are sent to the Ollama model.
model_id = "meta-llama/Meta-Llama-3.1-8B"
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_id)

def tiktoken_len(text):
    """Return the number of tokens the Llama 3.1 tokenizer produces for *text*.

    Despite the name this does not use tiktoken: it relies on the module-level
    Hugging Face ``tokenizer``, and the count includes any special tokens that
    tokenizer adds by default.
    """
    encoded = tokenizer(text, return_tensors="pt")
    first_sequence = encoded["input_ids"][0]
    return len(first_sequence)

def generate_thread_id():
    """Return a new random conversation id of the form ``thread_<32 hex chars>``."""
    return "thread_" + uuid.uuid4().hex

def generate_message_id():
    """Return a new random message id: 32 lowercase hex characters, no dashes."""
    return str(uuid.uuid4()).replace("-", "")

class HybridRetriever:
    """Hybrid (sparse + dense) retriever over a Milvus collection.

    Keywords are extracted from the question (and chat history) with an Ollama
    model, embedded both sparsely and densely, searched with a weighted hybrid
    search, and the hits are widened with neighbouring chunks before being
    concatenated into a single context string.
    """

    def __init__(self, uri, col_name, model, embedding_model, sparse_embedding_model, output_fields, limit=15):
        """Connect to Milvus and bind the target collection.

        Args:
            uri: Milvus server URI.
            col_name: Name of the collection to search.
            model: Ollama model used for keyword extraction.
            embedding_model: Ollama model used for the dense query embedding.
            sparse_embedding_model: Callable taking a list of texts and
                returning a mapping with a ``"sparse"`` entry.
            output_fields: Collection fields returned with every hit.
            limit: Hits per search request and after reranking.
        """
        self.uri = uri
        self.col_name = col_name
        self.model = model
        self.dense_embedding_model = embedding_model
        self.sparse_embedding_model = sparse_embedding_model
        self.output_fields = output_fields
        self.limit = limit
        self.connect_milvus()
        self.col = Collection(self.col_name, consistency_level="Strong")
        self.client = MilvusClient(uri=self.uri)

    def connect_milvus(self):
        """Open the default pymilvus connection used by ``Collection``."""
        connections.connect(uri=self.uri)

    def kw_extractor(self, query, history):
        """Extract search keywords with the LLM and embed them.

        Returns:
            ``(sparse_embedding, dense_embedding)`` for the extracted keyword
            string; the sparse one is the first row of the sparse model output.
        """
        output = ollama.generate(
            model=self.model,
            prompt=f"""
                Question: 
                    {query}
                Chat History:
                    {history}
                System Instructions:
                - return me a string of key words/phrases from the given question
                - take into the consideration the chat history, if the given question somehow referes to it, 
                    then extract the keywords/phrases also from the history of chat only related ones.
                - Do not do references to the text in you answer 
                - Do not provide comments from your side.                
                """
        )
        res = output['response']
        print(res)
        return self.sparse_embedding_model([res])["sparse"][[0]], ollama.embeddings(model=self.dense_embedding_model, prompt=res)["embedding"]

    def hybrid_search(self, query_dense_embedding, query_sparse_embedding, sparse_weight=1.0, dense_weight=1.0):
        """Run a weighted sparse + dense hybrid search and return the hits.

        Both requests use the inner-product metric, so the returned ``distance``
        of each hit is a similarity score: higher means more relevant.
        """
        # NOTE(review): "nlist" is an IVF *index-build* parameter; at search
        # time IVF indexes read "nprobe", so this probably searches with the
        # server default nprobe — confirm the intended value.
        dense_search_params = {"index_type": "GPU_IVF_FLAT", "metric_type": "IP", "field_name": "dense_vector", "params": {"nlist": 4096}}
        dense_req = AnnSearchRequest([query_dense_embedding], "dense_vector", dense_search_params, limit=self.limit)

        sparse_search_params = {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP", "field_name": "sparse_vector"}
        sparse_req = AnnSearchRequest([query_sparse_embedding], "sparse_vector", sparse_search_params, limit=self.limit)

        # Weights are given in the same order as the request list below.
        rerank = WeightedRanker(sparse_weight, dense_weight)
        res = self.col.hybrid_search([sparse_req, dense_req], rerank=rerank, limit=self.limit, output_fields=self.output_fields)[0]
        return res

    def convert_explode_order_and_sort(self, obj_list):
        """Flatten search hits into a DataFrame, most relevant first.

        Each hit's ``fields`` dict is expanded into columns, except
        ``metadata``, which is kept as a single (dict-valued) column. Rows are
        sorted by ``distance`` descending because hybrid-search scores are
        higher-is-better. An empty hit list yields an empty DataFrame with the
        standard columns.
        """
        column_order = ['distance', "document_id", "chunk_id", "file_name", "chunk_name", "chunk_text", "chunk_token_length", "metadata"]
        df = pd.DataFrame([obj.__dict__ for obj in obj_list])
        if df.empty:
            return pd.DataFrame(columns=column_order)

        if 'fields' in df.columns:
            # Keep metadata as one column instead of exploding its keys.
            metadata = df['fields'].apply(lambda x: x.get('metadata', None))
            fields_df = pd.json_normalize(
                df['fields'].apply(lambda x: {k: v for k, v in x.items() if k != 'metadata'}).tolist()
            )
            df = df.drop(columns=['fields']).join(fields_df).assign(metadata=metadata)

        df = df[[col for col in column_order if col in df.columns]]
        return df.sort_values(by='distance', ascending=False)

    def new_row_to_df(self, res):
        """Wrap a queried chunk record in a one-row DataFrame (distance None)."""
        new_row = {
            'distance': None,
            'document_id': res['document_id'],
            'chunk_id': res['chunk_id'],
            'file_name': res['file_name'],
            'chunk_name': res['chunk_name'],
            'chunk_text': res['chunk_text'],
            'chunk_token_length': res['chunk_token_length']
        }
        return pd.DataFrame([new_row])

    def get_data_milvus(self, doc_id, chunk_id):
        """Fetch one chunk by (document_id, chunk_id); None if absent or on error."""
        try:
            res = self.client.query(
                collection_name=self.col_name,
                filter=f'(document_id == "{doc_id}") and (chunk_id == {chunk_id})',
                output_fields=self.output_fields,
                limit=1
            )
            return res[0] if res else None
        except Exception as e:
            # Best effort: a missing neighbour chunk must not abort retrieval.
            print(f"Error retrieving chunk {chunk_id} for document {doc_id}: {e}")
            return None

    def _add_neighbour_chunks(self, df, doc_ids, max_context_tokens):
        """Append chunks adjacent to each hit while the token budget allows."""
        total_tokens = int(df['chunk_token_length'].sum())
        extra_rows = []
        for doc_id in doc_ids:
            hit_chunk_ids = sorted(df.loc[df['document_id'] == doc_id, 'chunk_id'].tolist())
            # Ids already present or already looked up: a gap chunk lying
            # between two hits (e.g. 4 between 3 and 5) is fetched and
            # charged against the budget only once.
            considered = set(hit_chunk_ids)
            for chunk_id in hit_chunk_ids:
                for neighbour_id in (chunk_id - 1, chunk_id + 1):
                    if neighbour_id in considered:
                        continue
                    considered.add(neighbour_id)
                    neighbour = self.get_data_milvus(doc_id, neighbour_id)
                    if not neighbour:
                        continue
                    # Drop the all-None distance column before concatenation.
                    row = self.new_row_to_df(neighbour).dropna(axis=1, how='all')
                    tokens = int(row['chunk_token_length'].iloc[0])
                    if total_tokens + tokens < max_context_tokens:
                        extra_rows.append(row)
                        total_tokens += tokens
        if not extra_rows:
            return df
        return pd.concat([df, *extra_rows], ignore_index=True)

    def get_final_text(self, query="", history="", max_context_tokens=25000) -> tuple:
        """Retrieve context for *query* and return it as one string.

        Args:
            query: The user question.
            history: Prior conversation text used to resolve follow-ups.
            max_context_tokens: Neighbouring chunks are added only while the
                summed ``chunk_token_length`` stays below this value.

        Returns:
            ``(context_text, references)``: ``context_text`` joins each
            document's chunks (ordered by ``chunk_id``) with spaces and the
            documents with blank lines, most relevant document first;
            ``references`` is the list of hit records ordered by descending
            score. Both are empty when the search returns nothing.
        """
        sparse_embeddings, dense_embeddings = self.kw_extractor(query, history)

        hybrid_results = self.hybrid_search(
            dense_embeddings, sparse_embeddings, sparse_weight=0.5, dense_weight=0.5
        )

        df = self.convert_explode_order_and_sort(hybrid_results)
        if df.empty:
            return "", []
        parsed_json = json.loads(df.to_json(orient='records'))

        # Rank documents by their best (highest-scoring) chunk.
        best_hit_idx = df.groupby('document_id')['distance'].idxmax()
        ranked_doc_ids = (
            df.loc[best_hit_idx]
            .sort_values(by='distance', ascending=False)['document_id']
            .tolist()
        )

        df = self._add_neighbour_chunks(df, ranked_doc_ids, max_context_tokens)
        # metadata is excluded from the subset because dict values are unhashable.
        df = df.drop_duplicates(subset=['document_id', 'chunk_id', 'file_name', 'chunk_name', 'chunk_text', 'chunk_token_length'])
        df_sorted = df.sort_values(by=['document_id', 'chunk_id'])

        document_texts = [
            " ".join(df_sorted.loc[df_sorted['document_id'] == doc_id, 'chunk_text'].tolist())
            for doc_id in ranked_doc_ids
        ]
        return "\n\n".join(document_texts), parsed_json


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Redis
# `redis` is rebound below to the client instance (later cells call
# redis.hget / redis.hset on it). Importing the module under an alias keeps
# this cell re-runnable; binding the client over the module name made a second
# run fail with AttributeError: 'Redis' object has no attribute 'Redis'.
import redis as redis_module

redis = redis_module.Redis(host='localhost', port=6379, db=0)

# LLM used for keyword extraction (via the retriever) and answer generation
model = "llama3.1:8b"

# Dense embedding model (Ollama). NOTE(review): presumably this must be the
# same model that produced the collection's dense vectors — confirm.
embedding_model = "llama3.1:8b"

# Milvus params
uri = "http://localhost:19530/dolphinai_db"
col_name = "hybrid_sap_collection_llama_7b"
limit = 10  # hits per search request and after reranking
output_fields = ["document_id", "chunk_id", "file_name", "chunk_name", "chunk_text", "chunk_token_length", "metadata"]

# Usage example:
retriever = HybridRetriever(
    uri=uri,
    col_name=col_name,
    model=model,
    embedding_model=embedding_model,
    sparse_embedding_model=BGEM3EmbeddingFunction(use_fp16=False, device="cuda:2"),
    output_fields=output_fields,
    limit=limit,
)

Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 153076.79it/s]
  colbert_state_dict = torch.load(os.path.join(model_dir, 'colbert_linear.pt'), map_location='cpu')
  sparse_state_dict = torch.load(os.path.join(model_dir, 'sparse_linear.pt'), map_location='cpu')


In [3]:
req = {
    "question": "puoi dire qualcosa in più?",
    # "question": "Describe how the SAP partner determination in SD works",
    "assistant_id": "asst_fe4VWMpLT0W04Wpc8A8JQ2rg",
    "thread_id": "thread_48d812c782dd4d77a66182922def7a98",
}

# Unpack the incoming request into the names the following cells use.
query, assistant_id, thread_id = req["question"], req["assistant_id"], req["thread_id"]

# Alternative test questions:
# query = "Describe how the SAP partner determination in SD works"
# query = "puoi dire qualcosa in più?"
# query = "Come funziona il controllo di disponibilità su SAP?"
# query = "Come funziona il controllo di disponibilità su SAP?"

In [6]:
# Load the conversation for this thread from Redis, or start a new one.
# chat_history_context is the plain-text transcript passed to the retriever and
# to the answer prompt; it must exist (empty) for new threads too, otherwise the
# retrieval cell fails with NameError.
chat_history_context = ""
stored_history = redis.hget("DolphinChatConversation", thread_id) if thread_id else None

if stored_history is None:
    # No thread id supplied, or Redis has no entry for it: start a fresh conversation.
    if not thread_id:
        thread_id = generate_thread_id()
    chat_history = {
        "_id": thread_id,
        "topic": "",
        "user_id": "",
        "messages": [],
        "LLM": model,
        "assistant_id": assistant_id,
    }
else:
    # NOTE(security): pickle.loads can execute arbitrary code embedded in the
    # payload; acceptable only while this Redis instance is fully trusted.
    # Consider storing the conversation as JSON instead.
    chat_history = pickle.loads(stored_history)

    # Sort messages by timestamp, then render "Role:\ncontent" entries in
    # chronological order separated by blank lines.
    sorted_messages = sorted(chat_history['messages'], key=lambda message: message['timestamp'])
    chat_history_context = "\n\n".join(
        f"{message['role'].capitalize()}:\n{message['content']}" for message in sorted_messages
    )

    print(chat_history_context)

User:
Describe how the SAP partner determination in SD works

Dolphinai:
The SAP partner determination in SD works as follows:

The system checks the conditions set up in Customizing, such as access sequence, output type, schema, and application, to determine the trading partner. The condition components are evaluated in the specified order until a match is found.

For outbound EDI messages, the transmission medium must be set to 6 (EDI) and the output type must be SEDI or a modified copy of it. The access sequence should be 0001, and the output type must match the one specified in the condition component.

The system also checks the partner function, which can be CR (Carrier) or SH (Ship-to party), and the port, which should be SUBSYSTEM. Additionally, the output mode must be set to Collect IDocs, and the basic type should be SHPMNT03.

For inbound EDI messages, a similar process takes place, but with different condition components such as access sequence, output type, schema, and app

In [7]:
# Record the user's question as a new message in the conversation.
user_name = "User"

q_message_id = generate_message_id()
user_timestamp = int(time.time())

message_data = {
    "role": user_name,
    "content": query,
    "message_id": q_message_id,
    "intent": None,
    "reference": None,
    "timestamp": user_timestamp,
    "feedback_rating": None,
}

chat_history["messages"].append(message_data)
chat_history

{'_id': 'thread_48d812c782dd4d77a66182922def7a98',
 'topic': '',
 'user_id': '',
 'messages': [{'role': 'User',
   'content': 'Describe how the SAP partner determination in SD works',
   'message_id': 'd54273ac6e614a2ab263445d39be4f6c',
   'intent': None,
   'reference': None,
   'timestamp': 1727356227,
   'feedback_rating': None},
  {'role': 'DolphinAI',
   'content': 'The SAP partner determination in SD works as follows:\n\nThe system checks the conditions set up in Customizing, such as access sequence, output type, schema, and application, to determine the trading partner. The condition components are evaluated in the specified order until a match is found.\n\nFor outbound EDI messages, the transmission medium must be set to 6 (EDI) and the output type must be SEDI or a modified copy of it. The access sequence should be 0001, and the output type must match the one specified in the condition component.\n\nThe system also checks the partner function, which can be CR (Carrier) or SH (

In [8]:
# Retrieve context for the question (json_result holds the raw hit records),
# then build the answer prompt and report its size in tokens.
final_text, json_result = retriever.get_final_text(query=query, history=chat_history_context)

prompt = f"""Using this data: {final_text}. 
  Provide a comprehensive answer to this prompt: {query}. 
  Please take into consideration also the following chat history conversation:
  {chat_history_context}        
  System Instructions:
        - Do not do references to the text in you answer 
        - Do not provide comments from your side.
        - Answer in the same language of the provided question."""

print(tiktoken_len(prompt))

Partner determination SD, EDI messages, transmission medium, output type, access sequence, condition components, partner function, port, output mode, Collect IDocs, basic type, inbound EDI, outbound EDI, Customizing, EDI settings, time 4, time 3, shipment document
10058


In [9]:
# Generate the assistant's answer and record it as a message that links back
# to the question (question_id) and the retrieved hits (reference).
user_name = "DolphinAI"
message_id = generate_message_id()

# Ollama uses the model's default context window unless `num_ctx` is given and
# truncates longer prompts. The answer prompt was 10058 tokens in the recorded
# run, and the retriever alone may supply up to 25000, well beyond the default
# (2048 tokens in many Ollama releases), so request a larger window
# explicitly. Llama 3.1 advertises up to 128k tokens; larger windows cost more
# (V)RAM, so tune this to the hardware.
LLM_NUM_CTX = 32768

output = ollama.generate(
    model=model,
    prompt=prompt,
    options={"num_ctx": LLM_NUM_CTX},
)

assistant_timestamp = int(time.time())
message_data = {
    "role": user_name,
    "content": output['response'],
    "message_id": message_id,
    "intent": None,
    "reference": json_result,
    "timestamp": assistant_timestamp,
    "question_id": q_message_id,
    "feedback_rating": None,
}

# The user's question was already appended to chat_history["messages"] by the
# previous cell, so a new conversation holds at most that single message here;
# testing for an empty list never fired and no topic was ever generated.
if len(chat_history["messages"]) <= 1:
    topic_output = ollama.generate(
        model=model,
        prompt=f"""create a header for the following conversation
            User:
            {query}
            Assistant:
            {output['response']}
            
            Do not add any comments from your side, there should be only a string of the header in your response.
  """)
    chat_history["topic"] = topic_output["response"]
    chat_history["1st_timestamp"] = user_timestamp

chat_history["messages"].append(message_data)
chat_history

{'_id': 'thread_48d812c782dd4d77a66182922def7a98',
 'topic': '',
 'user_id': '',
 'messages': [{'role': 'User',
   'content': 'Describe how the SAP partner determination in SD works',
   'message_id': 'd54273ac6e614a2ab263445d39be4f6c',
   'intent': None,
   'reference': None,
   'timestamp': 1727356227,
   'feedback_rating': None},
  {'role': 'DolphinAI',
   'content': 'The SAP partner determination in SD works as follows:\n\nThe system checks the conditions set up in Customizing, such as access sequence, output type, schema, and application, to determine the trading partner. The condition components are evaluated in the specified order until a match is found.\n\nFor outbound EDI messages, the transmission medium must be set to 6 (EDI) and the output type must be SEDI or a modified copy of it. The access sequence should be 0001, and the output type must match the one specified in the condition component.\n\nThe system also checks the partner function, which can be CR (Carrier) or SH (

In [11]:
# Persist the updated conversation back to Redis under its thread id.
# NOTE(security): the stored payload is a pickle; anyone able to write this
# hash can run code in whatever process later unpickles it. Consider JSON.
b_chat_history = pickle.dumps(chat_history)  # always bytes (or raises)

if thread_id is None:
    print("Error: thread_id is None; chat history not saved.")
else:
    thread_id = str(thread_id)  # normalise the hash field name to str
    redis.hset("DolphinChatConversation", thread_id, b_chat_history)


In [10]:
def get_schema(data, indent=0):
    """Return a readable, indented sketch of the types inside *data*.

    Dicts are expanded key by key. A list is represented by its first element
    only, and an empty list renders as ``[]``. Any other value renders as its
    type name, e.g. ``str`` or ``NoneType``.

    Args:
        data: The object to describe (typically nested dicts and lists).
        indent: Column at which the outermost structure starts.

    Returns:
        The multi-line schema string, without a trailing newline.
    """
    def render(node, level):
        # Render *node* assuming the cursor already sits at column *level*:
        # only continuation lines are padded, so a nested value starts right
        # after "'key': " and list elements are indented under their bracket.
        if isinstance(node, dict):
            lines = ["{"]
            for key, value in node.items():
                lines.append(" " * (level + 2) + f"'{key}': " + render(value, level + 2))
            lines.append(" " * level + "}")
            return "\n".join(lines)
        if isinstance(node, list):
            if not node:
                return "[]"
            # Only the first element is inspected; heterogeneous lists are not reflected.
            return "[\n" + " " * (level + 2) + render(node[0], level + 2) + "\n" + " " * level + "]"
        return str(type(node)).replace("<class '", "").replace("'>", "")

    return " " * indent + render(data, indent)

# Print the type skeleton of the current conversation record.
schema = get_schema(chat_history, indent=0)
print(schema)


{
  '_id': str
  'topic': str
  'user_id': str
  'messages': [
    {
      'role': str
      'content': str
      'message_id': str
      'intent': NoneType
      'reference': NoneType
      'timestamp': int
      'feedback_rating': NoneType
    }
  ]
  'LLM': str
  'assistant_id': str
}


In [6]:
import ollama

model = "llama3.1:8b"

# Stream a chat completion, echoing each piece as it arrives and
# accumulating the full reply in `txt`.
stream = ollama.chat(
    model=model,
    messages=[{"role": "user", "content": "Why is the sky blue?"}],
    stream=True,
)

txt = ""
for chunk in stream:
    piece = chunk["message"]["content"]
    print(piece, end="", flush=True)
    txt += piece


The sky appears blue to us because of a phenomenon called scattering. Here's why:

**Scattering and light**

When sunlight enters Earth's atmosphere, it encounters tiny molecules of gases such as nitrogen (N2) and oxygen (O2). These molecules are much smaller than the wavelength of light, so they scatter the light in all directions.

**Short wavelengths dominate**

Now here's where things get interesting. Not all wavelengths of light are scattered equally. Shorter wavelengths, like blue and violet light, are scattered more than longer wavelengths, like red and orange light. This is because shorter wavelengths have a higher frequency (they vibrate more quickly), which makes them more susceptible to scattering.

**Blue sky**

When we look up at the sky on a clear day, we see mostly blue light, not violet or ultraviolet light, because our eyes are sensitive to that part of the spectrum. The blue light scattered by the atmosphere reaches our eyes from all directions, making the sky appear 

In [8]:
# Print the complete reply accumulated from the stream.
print(txt, end="\n")

The sky appears blue to us because of a phenomenon called scattering. Here's why:

**Scattering and light**

When sunlight enters Earth's atmosphere, it encounters tiny molecules of gases such as nitrogen (N2) and oxygen (O2). These molecules are much smaller than the wavelength of light, so they scatter the light in all directions.

**Short wavelengths dominate**

Now here's where things get interesting. Not all wavelengths of light are scattered equally. Shorter wavelengths, like blue and violet light, are scattered more than longer wavelengths, like red and orange light. This is because shorter wavelengths have a higher frequency (they vibrate more quickly), which makes them more susceptible to scattering.

**Blue sky**

When we look up at the sky on a clear day, we see mostly blue light, not violet or ultraviolet light, because our eyes are sensitive to that part of the spectrum. The blue light scattered by the atmosphere reaches our eyes from all directions, making the sky appear 

In [5]:
# Content of the last streamed chunk (an empty string in the recorded run).
chunk["message"]["content"]

''