In [None]:
# Install the third-party packages this notebook imports: the LangChain core
# package, its Chroma vector-store integration, sentence-transformers (for the
# embedding model) and the community loaders/embeddings package.
# NOTE(review): versions are not pinned, so a fresh run may resolve to
# different releases than the ones this notebook was written against.
!pip install langchain
!pip install langchain_chroma
!pip install sentence_transformers
!pip install langchain_community

In [None]:
# Provides the `pysqlite3` module that the following cell registers in
# `sys.modules` under the name `sqlite3`.
!pip install pysqlite3-binary

In [3]:
# Make every later `import sqlite3` resolve to the pip-installed `pysqlite3`
# build instead of the standard-library module. This must run before anything
# else imports `sqlite3`; modules that already hold a reference keep the old one.
# NOTE(review): presumably needed because Chroma requires a newer SQLite than
# the one bundled with this Python — confirm and drop the shim if not.
__import__('pysqlite3')
import sys
# Move the loaded module object from the 'pysqlite3' key to the 'sqlite3' key.
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')

In [None]:
from langchain_community.embeddings.sentence_transformer import SentenceTransformerEmbeddings

def get_embedding_function(device=None):
    """Build the sentence-transformer embedding model used for indexing and search.

    Args:
        device: Torch device string handed to the model (for example ``"cuda"``
            or ``"cpu"``). When ``None`` (the default) CUDA is used if torch
            reports it as available, otherwise the model falls back to CPU
            instead of failing on machines without a GPU.

    Returns:
        A ``SentenceTransformerEmbeddings`` instance for the
        ``BAAI/bge-large-en-v1.5`` model with normalised output vectors.

    Any exception raised while loading the model propagates unchanged.
    """
    if device is None:
        # Imported lazily so the notebook's import cell stays untouched; torch
        # is already required by sentence-transformers.
        import torch

        device = "cuda" if torch.cuda.is_available() else "cpu"

    # NOTE(review): trust_remote_code allows code shipped with the model
    # repository to run locally — keep it only if the model requires it.
    model_kwargs = {"device": device, "trust_remote_code": True}
    encode_kwargs = {"normalize_embeddings": True}
    return SentenceTransformerEmbeddings(
        model_name="BAAI/bge-large-en-v1.5",
        model_kwargs=model_kwargs,
        encode_kwargs=encode_kwargs,
    )

# Load the embedding model once for the whole notebook;
# initialize_dbs_for_documents reads this module-level name when it builds or
# reopens the Chroma store.
embedding_function = get_embedding_function()

In [5]:
import os
import time
import json
import requests
import re
from functools import reduce
from langchain_community.document_loaders.pdf import PyPDFium2Loader as pdf
from langchain_chroma import Chroma
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from huggingface_hub import notebook_login

# API Function
def get_api_response(system_message=" ", user_message=" ", history=None, temperature=0.95, max_new_tokens=2500, timeout=None):
    """POST one generation request to the text-generation endpoint.

    Args:
        system_message: Text sent under the ``"system"`` key.
        user_message: Text sent under the ``"message"`` key.
        history: Previous turns, forwarded unchanged under ``"history"``.
            ``None`` (the default) sends ``[[]]``; a fresh list is created on
            every call so no mutable default is shared between calls.
        temperature: Sampling temperature forwarded to the endpoint.
        max_new_tokens: Generation length limit forwarded to the endpoint.
        timeout: Seconds to wait, passed straight to ``requests.post``.
            ``None`` keeps the previous behaviour of waiting indefinitely.

    Returns:
        The ``requests.Response`` object when the status code is 200.
        Otherwise a dict of the form ``{"error": "Request failed with status
        code <code>"}`` — callers must check for this before calling
        ``.json()`` on the result.
    """
    if history is None:
        history = [[]]

    # NOTE(review): "http://generate" looks like a placeholder or an internal
    # host name — confirm the real endpoint before running.
    url = "http://generate"
    headers = {
        "Content-Type": "application/json"
    }
    payload = {
        "system": system_message,
        "message": user_message,
        "history": history,
        "temperature": temperature,
        "max_new_tokens": max_new_tokens
    }
    print("Doing something")
    response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=timeout)
    if response.status_code == 200:
        return response
    return {"error": f"Request failed with status code {response.status_code}"}

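# Embedding Function: defined in the earlier cell (get_embedding_function), which also creates the global `embedding_function` used below.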


# Helper Functions
def get_docs(text):
    """Wrap ``text`` in a single Document and split it into chunks.

    The splitter tries the separators in the order listed: the ``*****``
    marker (the same marker adding_separators_to_text inserts before
    headings), blank lines, single newlines, then the fullwidth full stop
    (U+FF0E) and the ideographic full stop (U+3002). Chunks are at most 1200
    characters and do not overlap.

    Args:
        text: Full text of the document to index.

    Returns:
        The list of chunk Documents returned by the splitter; each is created
        from a Document whose metadata is ``{"source": "book"}``.
    """
    print("Executing chunker ....")
    whole_document = Document(page_content=text, metadata={"source": "book"})
    separator_priority = ["*****", "\n\n", "\n", "\uff0e", "\u3002"]
    chunker = RecursiveCharacterTextSplitter(
        separators=separator_priority,
        chunk_size=1200,
        chunk_overlap=0,
    )
    return chunker.split_documents([whole_document])

def initialize_dbs_for_documents(text, path):
    """Return the Chroma vector store persisted at ``path``, creating it if needed.

    When ``path`` does not exist, ``text`` is chunked with get_docs, embedded
    with the module-level ``embedding_function`` and written to ``path``; the
    store returned by ``Chroma.from_documents`` is handed back directly rather
    than sleeping for a fixed 8 seconds and opening the directory a second time.

    When ``path`` already exists it is opened as-is and ``text`` is ignored.
    NOTE(review): an existing but empty or stale directory is therefore reused
    without re-indexing — delete it to force a rebuild.

    Args:
        text: Full document text; only used when the store has to be built.
        path: Directory that holds (or will hold) the persisted store.

    Returns:
        A ``Chroma`` instance bound to ``path``.
    """
    if not os.path.exists(path):
        print("if")
        return Chroma.from_documents(get_docs(text), embedding_function, persist_directory=path)

    print("else")
    return Chroma(persist_directory=path, embedding_function=embedding_function)

def doc_to_text(docs):
    """Concatenate retrieved chunks into one prompt-ready string.

    Each chunk is rendered as ``**DATA:**`` followed by a newline, the chunk's
    ``page_content``, a space and a newline; rendered chunks are joined with a
    single space.

    Args:
        docs: Iterable of objects exposing ``page_content``, or of tuples whose
            first element is such an object. The tuple form is what the query
            cell passes in from ``similarity_search_with_score``; previously
            it raised ``AttributeError``.

    Returns:
        The combined text, or ``""`` when ``docs`` is empty.
    """
    rendered = []
    for item in docs:
        # Unwrap (document, score) pairs; plain documents pass through.
        doc = item[0] if isinstance(item, tuple) else item
        rendered.append(f"**DATA:**\n{doc.page_content} \n")
    return " ".join(rendered)

def adding_separators_to_text(data):
    """Prefix article and clause headings with ``*****`` and flatten the text.

    Runs of whitespace are first capped at two characters via
    reduce_consecutive_spaces, then the text is processed line by line:

    * an article heading (``<digits>.`` followed by upper-case words, e.g.
      ``1. DEFINITIONS``) is prefixed with ``*****`` and the line after it is
      copied through unchanged, without being checked for a heading itself;
    * a clause heading (``<digits>.<digits>`` followed by upper-case words,
      e.g. ``1.1 SCOPE``) is prefixed with ``*****``;
    * every other line is copied through unchanged.

    Args:
        data: Raw document text.

    Returns:
        All resulting lines joined with single spaces.
    """
    text = reduce_consecutive_spaces(data, 2)
    sentences = text.split('\n')
    clause_pattern = re.compile(r'^\s*\d+\.\d+\s*[A-Z][A-Z\s]*$')
    article_pattern = re.compile(r'^\s*\d+\.\s*[A-Z][A-Z\s]*$')
    final_text_list = []
    total = len(sentences)
    i = 0

    while i < total:
        line = sentences[i]
        if article_pattern.match(line):
            final_text_list.append("*****" + line)
            # The heading may be the very last line; only copy a follower if
            # one exists (this used to raise IndexError).
            if i + 1 < total:
                final_text_list.append(sentences[i + 1])
            i += 2
        elif clause_pattern.match(line):
            final_text_list.append("*****" + line)
            i += 1
        else:
            final_text_list.append(line)
            i += 1
    return " ".join(final_text_list)

def reduce_consecutive_spaces(input_string, k):
    """Cap every run of whitespace characters at ``k`` characters.

    Whitespace is whatever ``str.isspace`` accepts, so spaces, tabs and
    newlines all count towards the same run. The first ``k`` characters of a
    run are kept verbatim and the remainder are dropped; non-whitespace
    characters are always kept and end the current run.

    Args:
        input_string: Text to compact.
        k: Maximum number of consecutive whitespace characters to keep.

    Returns:
        The compacted string.
    """
    kept_chars = []
    run_length = 0
    for ch in input_string:
        if not ch.isspace():
            # Any visible character ends the current whitespace run.
            run_length = 0
            kept_chars.append(ch)
            continue
        run_length += 1
        if run_length <= k:
            kept_chars.append(ch)
    return ''.join(kept_chars)

def count_tokens(sentence):
    """Return how many items ``tokenizer.encode(sentence)`` produces.

    The unused ``encoded_sentence[1]`` lookup has been removed: it raised
    ``IndexError`` whenever the encoding held fewer than two items.

    NOTE(review): ``tokenizer`` is a module-level name that is not defined in
    the visible part of this notebook; it must be created (with an ``encode``
    method whose result supports ``len``) before this function is called.

    Args:
        sentence: Text to tokenise.

    Returns:
        ``len`` of the encoded result.
    """
    return len(tokenizer.encode(sentence))

def generate_response(query, retrieved_documents, document_name):
    """Ask the generation endpoint to answer ``query`` from the retrieved text.

    Args:
        query: The question to answer.
        retrieved_documents: Already-formatted text of the retrieved chunks,
            inserted into the prompt verbatim.
        document_name: Name of the legal document, mentioned in the system
            message.

    Returns:
        The text found after the first ``'assistant\\n\\n'`` marker in the
        endpoint's ``"response"`` field (up to the next marker, if any). When
        the marker is absent the whole ``"response"`` text is returned.
        ``None`` is returned, after printing ``"Response Not found"``, when
        the request failed.
    """
    system = f"You are a highly skilled legal document analyst. You have been given Legal Document '{document_name}'"
    messages = (
        f"You are given a legal document in chunks and a QUESTION, read the legal document and answer the QUESTION precisely and accurately.\n\n **legal_document**:{retrieved_documents}  \n\n **QUESTION**:{query}"
    )

    response = get_api_response(system, messages)
    # get_api_response returns a (truthy) error dict on non-200 statuses, so a
    # plain truthiness check is not enough to know that .json() is available.
    if isinstance(response, dict) or not response:
        print("Response Not found")
        return None

    # Work on the parsed text; the Response object itself is not subscriptable.
    full_response = response.json().get('response', '')
    parts = full_response.split('assistant\n\n')
    return parts[1] if len(parts) > 1 else full_response



In [None]:
# Read the entire source document into memory as UTF-8 text; `text` is what
# the vector-store cell below indexes.
# NOTE(review): 'path/to/your/.txt' looks like a placeholder — point it at the
# real file before running.
file_path = 'path/to/your/.txt'
with open(file_path, 'r', encoding='utf-8') as file:
    text = file.read()




In [None]:
# Build the Chroma store in the "embed" directory on the first run, or reopen
# it when that directory already exists.
# NOTE(review): an existing directory is reused as-is, so changing `text` has
# no effect until "embed" is deleted.
path="embed"
vectordb=initialize_dbs_for_documents(text,path)

In [None]:
# Sample questions to ask about the indexed contract.
questions = [
    "What is CONTRACTOR limit of liability?",
    "Under what condition can COMPANY terminate the contract?",
    "Can CONTRACTOR terminate the contract?",
    "Can CONTRACTOR claim extension of time and additional cost on account of force majeure?",
    "Is increase in quantity a COMPANY risk event or CONTRACTOR risk event?"
]

# Only the first question is run here; change the index to try another one.
prompt = questions[0]

# similarity_search_with_score yields (Document, score) pairs, while
# doc_to_text reads `.page_content` from each item — keep just the Documents.
docs_with_scores = vectordb.similarity_search_with_score(query=prompt, k=1)
docs = [doc for doc, _score in docs_with_scores]
doc_text = doc_to_text(docs)

response = generate_response(prompt, doc_text, "Your Document Name")
print(response)
