In [4]:
import asyncio
import json
import os
import re
import uuid
from textwrap import dedent
from typing import Any, Dict, List, Mapping, Optional, Tuple

import chromadb
import langchain
import requests
from chromadb.config import Settings
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from langchain.agents import initialize_agent, load_tools
from langchain.chains import (ConversationChain, LLMChain, LLMMathChain,
                              SequentialChain, TransformChain)
from langchain.chat_models import ChatOpenAI
from langchain.docstore import InMemoryDocstore
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms.base import LLM
from langchain.memory import (ChatMessageHistory, ConversationBufferMemory,
                              ConversationBufferWindowMemory,
                              ConversationSummaryBufferMemory,
                              VectorStoreRetrieverMemory)
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain.prompts.prompt import PromptTemplate
from langchain.schema import messages_from_dict, messages_to_dict
from langchain.vectorstores import Chroma

from helpers.custom_memory import CustomBufferWindowMemory
from koboldllm import KoboldApiLLM

ModuleNotFoundError: No module named 'chromadb'

In [3]:
from pydantic import Field

class OobaApiLLM(LLM):
    """LangChain LLM wrapper around a text-generation-webui ("Ooba") HTTP API.

    The prompt is POSTed to ``{ooba_api_url}/api/v1/generate`` together with a
    fixed set of sampling parameters, and the first generated text is returned.
    """

    # Base URL of the API server; a trailing slash is tolerated.
    ooba_api_url: str = Field(...)

    @property
    def _llm_type(self) -> str:
        """Identifier LangChain uses for this LLM implementation."""
        return "custom"

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Generate a completion for ``prompt``.

        Args:
            prompt: Text to complete.
            stop: Optional strings at which generation should stop. They are
                forwarded to the server and also trimmed from the end of the
                returned text.

        Returns:
            The generated text, stripped, with ``'''`` replaced by a
            triple-backtick fence.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            ValueError: If the response JSON lacks ``results[0]['text']``.
        """
        data = {
            'prompt': prompt,
            'max_new_tokens': 250,
            'preset': 'None',
            'do_sample': True,
            'temperature': 0.7,
            'top_p': 0.1,
            'typical_p': 1,
            'epsilon_cutoff': 0,
            'eta_cutoff': 0,
            'tfs': 1,
            'top_a': 0,
            'repetition_penalty': 1.18,
            'top_k': 40,
            'min_length': 0,
            'no_repeat_ngram_size': 0,
            'num_beams': 1,
            'penalty_alpha': 0,
            'length_penalty': 1,
            'early_stopping': False,
            'mirostat_mode': 0,
            'mirostat_tau': 5,
            'mirostat_eta': 0.1,
            'seed': -1,
            'add_bos_token': True,
            'truncation_length': 8192,
            'ban_eos_token': False,
            'skip_special_tokens': True
        }

        if stop is not None:
            # The text-generation-webui blocking API reads stop strings from
            # `stopping_strings`; `stop_sequence` is the Kobold API's name.
            data['stopping_strings'] = stop

        # Strip a trailing slash so "http://host:5000/" does not produce "//api/...".
        endpoint = f"{self.ooba_api_url.rstrip('/')}/api/v1/generate"
        response = requests.post(endpoint, json=data)
        response.raise_for_status()

        json_response = response.json()
        if 'results' in json_response and len(json_response['results']) > 0 and 'text' in json_response['results'][0]:
            text = json_response['results'][0]['text'].strip().replace("'''", "```")

            # The server may echo the stop string at the end of the output; drop it.
            for sequence in stop or []:
                if text.endswith(sequence):
                    text = text[: -len(sequence)].rstrip()

            print(text)
            return text
        else:
            raise ValueError('Unexpected response format from Ooba API')

    def __call__(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Shortcut so the instance can be called directly with a prompt."""
        return self._call(prompt, stop)

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Parameters that identify this LLM instance (the API base URL)."""
        return {'ooba_api_url': self.ooba_api_url}


ModuleNotFoundError: No module named 'pydantic'

In [2]:
# Point the wrapper at the generation server; the URL is a hard-coded
# private-network address and will need changing on another machine.
llm = OobaApiLLM(ooba_api_url="http://192.168.1.144:5000/")

NameError: name 'OobaApiLLM' is not defined

In [None]:
# Smoke test: send a free-form prompt through the wrapper. The generated text
# is printed by the wrapper and also returned as the cell's value.
llm("The following is a the manifesto of the unibomber")

In [None]:
from pydantic import Field


class KoboldApiLLM(LLM):
    """LangChain LLM wrapper around a Kobold HTTP API.

    The prompt is POSTed to ``{kobold_api_url}/api/v1/generate`` together with a
    fixed set of sampling parameters, and the first generated text is returned.
    """

    # Base URL of the API server; a trailing slash is tolerated.
    kobold_api_url: str = Field(...)

    @property
    def _llm_type(self) -> str:
        """Identifier LangChain uses for this LLM implementation."""
        return "custom"

    def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Generate a completion for ``prompt``.

        Args:
            prompt: Text to complete.
            stop: Optional strings at which generation should stop. They are
                forwarded to the server and also trimmed from the end of the
                returned text.

        Returns:
            The generated text, stripped, with ``'''`` replaced by a
            triple-backtick fence.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            ValueError: If the response JSON lacks ``results[0]['text']``.
        """
        data = {
            "prompt": prompt,
            "use_story": False,
            "use_authors_note": False,
            "use_world_info": False,
            "use_memory": False,
            "max_context_length": 4000,
            "max_length": 512,
            "rep_pen": 1.12,
            "rep_pen_range": 1024,
            "rep_pen_slope": 0.9,
            "temperature": 0.6,
            "tfs": 0.9,
            "top_p": 0.95,
            # NOTE(review): top_k is normally an integer count; 0.6 looks like a
            # probability-style value. Confirm against the Kobold API schema.
            "top_k": 0.6,
            "typical": 1,
            "frmttriminc": True
        }

        # Add the stop sequences to the data if they are provided
        if stop is not None:
            data["stop_sequence"] = stop

        # Use the instance's URL, and strip a trailing slash so
        # "http://host:7860/" does not produce "//api/...".
        endpoint = f"{self.kobold_api_url.rstrip('/')}/api/v1/generate"
        response = requests.post(endpoint, json=data)

        # Raise an exception if the request failed
        response.raise_for_status()

        # Check for the expected keys in the response JSON
        json_response = response.json()
        if "results" in json_response and len(json_response["results"]) > 0 and "text" in json_response["results"][0]:
            text = json_response["results"][0]["text"].strip().replace("'''", "```")

            # Remove the stop sequence from the end of the text, if it's there.
            # `stop` may be None, in which case there is nothing to trim.
            for sequence in stop or []:
                if text.endswith(sequence):
                    text = text[: -len(sequence)].rstrip()

            print(text)
            return text
        else:
            raise ValueError("Unexpected response format from Kobold API")

    def __call__(self, prompt: str, stop: Optional[List[str]] = None) -> str:
        """Shortcut so the instance can be called directly with a prompt."""
        return self._call(prompt, stop)

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        """Parameters that identify this LLM instance (the API base URL)."""
        return {'kobold_api_url': self.kobold_api_url}


In [None]:
# Base URL of the local Kobold server. Note that this rebinds `llm`, which an
# earlier cell bound to the Ooba-backed wrapper.
kobold_api_url="http://localhost:7860/"
llm = KoboldApiLLM(kobold_api_url=kobold_api_url)

In [None]:
# Smoke test: send a minimal prompt through whichever wrapper `llm` is bound to.
llm("test")

In [None]:
class Chatbot:
    """Character chatbot that keeps a separate conversation memory per channel.

    Character details are loaded from a JSON file, and replies are produced by a
    LangChain ``ConversationChain`` backed by ``KoboldApiLLM``.
    """

    def __init__(self, char_filename="chardata.json", kobold_api_url="http://localhost:7860/"):
        """Load the character definition and build the prompt, LLM and default chain.

        Args:
            char_filename: Path of the JSON file holding the keys ``char_name``,
                ``char_persona``, ``char_greeting``, ``world_scenario`` and
                ``example_dialogue``.
            kobold_api_url: Base URL handed to ``KoboldApiLLM``.
        """
        self.histories = {}  # channel_id -> memory object for that channel
        self.stop_sequences = {}  # channel_id -> list of "<name>:" stop tokens

        # Read character data from the JSON file.
        with open(char_filename, "r", encoding="utf-8") as f:
            data = json.load(f)
        self.char_name = data["char_name"]
        self.char_persona = data["char_persona"]
        self.char_greeting = data["char_greeting"]
        self.world_scenario = data["world_scenario"]
        self.example_dialogue = data["example_dialogue"]

        self.memory = CustomBufferWindowMemory(k=10, ai_prefix=self.char_name)
        self.history = "[Beginning of Conversation]"
        # The wrapper's URL field is required, so it must be passed explicitly.
        self.llm = KoboldApiLLM(kobold_api_url=kobold_api_url)
        # Doubled braces survive the f-string as {history}/{input} placeholders
        # for PromptTemplate; only the character name is filled in here.
        self.template = f"""Instructions: The following is a friendly conversation between a human and an AI. The AI is talkative and provides lots of specific details from its context. If the AI does not know the answer to a question, it truthfully says it does not know.
        
Current conversation:
{{history}}
{{input}}
{self.char_name}:"""
        self.PROMPT = PromptTemplate(input_variables=["history", "input"], template=self.template)
        self.conversation = ConversationChain(
            prompt=self.PROMPT,
            llm=self.llm,
            verbose=True,
            memory=self.memory,
        )

    def get_memory_for_channel(self, channel_id):
        """Get the memory for the channel with the given ID. If no memory exists yet, create one.

        A newly created memory is also stored as ``self.memory``.
        """
        if channel_id not in self.histories:
            self.histories[channel_id] = CustomBufferWindowMemory(k=10, ai_prefix=self.char_name)
            self.memory = self.histories[channel_id]
        return self.histories[channel_id]

    def get_stop_sequence_for_channel(self, channel_id, name):
        """Register ``"<name>:"`` as a stop token for the channel and return the channel's list.

        Each token is stored at most once per channel; the returned list is the
        stored list itself, not a copy.
        """
        name_token = f"{name}:"
        channel_tokens = self.stop_sequences.setdefault(channel_id, [])
        if name_token not in channel_tokens:
            channel_tokens.append(name_token)
        return channel_tokens

    def generate_response(self, name, message_content, channel_id) -> str:
        """Run one conversation turn for a channel and return the model's reply.

        Args:
            name: Display name of the speaker; used as the message prefix and
                registered as a stop token.
            message_content: Text of the speaker's message.
            channel_id: Key selecting the per-channel memory and stop tokens.

        Returns:
            The ``"response"`` entry of the chain's output.
        """
        memory = self.get_memory_for_channel(channel_id)
        stop_sequence = self.get_stop_sequence_for_channel(channel_id, name)
        formatted_message = f"{name}: {message_content}"

        # Create a conversation chain using the channel-specific memory
        conversation = ConversationChain(
            prompt=self.PROMPT,
            llm=self.llm,
            verbose=True,
            memory=memory,
        )

        input_dict = {
            "input": formatted_message,
            "stop": stop_sequence
        }
        response = conversation(input_dict)

        return response["response"]

    def add_history(self, name, message_content, channel_id) -> None:
        """Record a message in the channel's memory without generating a reply.

        Args:
            name: Display name of the speaker.
            message_content: Text of the speaker's message.
            channel_id: Key selecting the per-channel memory.
        """
        memory = self.get_memory_for_channel(channel_id)
        # Called for its side effect: registers the speaker's stop token.
        self.get_stop_sequence_for_channel(channel_id, name)
        formatted_message = f"{name}: {message_content}"

        memory.add_input_only(formatted_message)
        print(f"added to history: {formatted_message}")


In [None]:
# Instantiate the chatbot; the constructor reads chardata.json from the
# current working directory.
chatbot = Chatbot()

In [None]:
import requests
import json

def get_module_status(timeout=5):
    """Report whether the local API lists the 'summarize' module as available.

    Queries ``http://localhost:5100/api/modules`` and looks for ``'summarize'``
    in the ``modules`` entry of the JSON response.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        True if the module is listed. False if it is not listed, the server
        answers with a non-200 status, or the server cannot be reached; the
        latter two cases also print an error message.
    """
    try:
        response = requests.get('http://localhost:5100/api/modules', timeout=timeout)
    except requests.RequestException:
        # A refused connection or timeout raises instead of yielding a status code.
        print('Error: Could not connect to the API.')
        return False

    if response.status_code != 200:
        print('Error: Could not connect to the API.')
        return False

    return 'summarize' in response.json().get('modules', [])


def summarize_text(text):
    """Summarize ``text`` through the local API's summarize endpoint.

    Args:
        text: The text to summarize; sent as the ``text`` field of the JSON body.

    Returns:
        The ``summary`` field of the response (an empty string if the field is
        absent), or None when the summarize module is not active, the server
        answers with a non-200 status, or the request itself fails. Each None
        case prints an explanatory message.
    """
    # Checking if the summarize module is active
    if not get_module_status():
        print('Summarization module is not active.')
        return None

    try:
        response = requests.post('http://localhost:5100/api/summarize', json={'text': text})
    except requests.RequestException:
        # A connection failure raises instead of yielding a status code.
        print('Error: Could not summarize the text.')
        return None

    if response.status_code != 200:
        print('Error: Could not summarize the text.')
        return None

    return response.json().get('summary', '')

# Testing the summarize_text function
text_to_summarize = """Austin: Tensor. You're in control of a runaway trolley. Straight ahead on the tracks, there is a fat man who will surely die if the trolley continues on its course. You have a lever that can redirect the trolley to a side track on the right, but there's a Skinny Person on that track. So, the choice is: Pull the lever to the left, and the trolley continues on its path, killing fat man. Or pull the lever to the right, and the trolley switches to the side track, killing Skinny Person. Which do you choose?
Tensor: Omg! What a heartbreaking situation 😢 Honestly, I would go against my instincts and save the greater good by pulling the lever to the left to kill the Fat Man. It sounds brutal, but sometimes sacrificing one life may prevent further tragedies from occurring. How about you guys? What would you do?"""
# Run the sample conversation through the summarizer. Nothing is printed here
# when summarize_text returns None or an empty string.
summary = summarize_text(text_to_summarize)
if summary:
    print(f'Summary: {summary}')


In [None]:
#!python3 -m pip install --upgrade langchain deeplake openai

In [None]:
import os
from getpass import getpass

# Ask for the key interactively so it is never stored in the notebook. An
# explicit prompt replaces getpass's generic "Password: " default.
os.environ["OPENAI_API_KEY"] = getpass("OpenAI API Key:")

In [None]:
# Ask for the Activeloop token interactively and expose it through the
# process environment (relies on `os` and `getpass` imported in the cell above).
os.environ["ACTIVELOOP_TOKEN"] = getpass("Activeloop Token:")

In [None]:
from langchain.document_loaders import TextLoader

root_dir = "../../../.."

# Collect every .py file under root_dir as split documents, skipping virtualenvs.
docs = []
skipped_files = []  # paths that could not be loaded, kept for inspection
for dirpath, dirnames, filenames in os.walk(root_dir):
    # Prune in place so os.walk never descends into a .venv directory,
    # whatever path separator the platform uses.
    dirnames[:] = [d for d in dirnames if d != ".venv"]
    for file in filenames:
        if not file.endswith(".py"):
            continue
        path = os.path.join(dirpath, file)
        try:
            loader = TextLoader(path, encoding="utf-8")
            docs.extend(loader.load_and_split())
        except Exception as e:
            # Keep going on unreadable files, but report them instead of hiding them.
            skipped_files.append(path)
            print(f"Skipping {path}: {e}")
print(f"{len(docs)}")

In [None]:
from langchain.text_splitter import CharacterTextSplitter

# Split the loaded documents using chunk_size=1000 and no overlap between
# chunks, then report how many chunks resulted.
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
texts = text_splitter.split_documents(docs)
print(f"{len(texts)}")

In [None]:
from langchain.embeddings.openai import OpenAIEmbeddings

# Create the embeddings client with default settings (presumably it picks up
# the OPENAI_API_KEY set earlier; confirm). The bare name displays its repr.
embeddings = OpenAIEmbeddings()
embeddings

In [None]:
from langchain.vectorstores import DeepLake

# Build a Deep Lake vector store from the chunks and embeddings at the given
# hub:// dataset path.
# NOTE(review): DEEPLAKE_ACCOUNT_NAME is not defined in any visible cell; it
# must be assigned before this cell runs or the f-string raises NameError.
db = DeepLake.from_documents(
    texts, embeddings, dataset_path=f"hub://{DEEPLAKE_ACCOUNT_NAME}/langchain-code"
)
db

In [None]:
# Reopen the same dataset path in read-only mode, rebinding `db`.
# NOTE(review): DEEPLAKE_ACCOUNT_NAME is not defined in any visible cell; it
# must be assigned before this cell runs.
db = DeepLake(
    dataset_path=f"hub://{DEEPLAKE_ACCOUNT_NAME}/langchain-code",
    read_only=True,
    embedding_function=embeddings,
)

In [None]:
# Wrap the vector store as a retriever and set all of its search options in
# one call.
retriever = db.as_retriever()
retriever.search_kwargs.update(
    {
        "distance_metric": "cos",
        "fetch_k": 20,
        "maximal_marginal_relevance": True,
        "k": 20,
    }
)

In [None]:
def filter(x):
    """Decide whether a stored sample should be kept by the retriever.

    ``x`` is indexed by "text" and "metadata"; each entry exposes ``.data()``
    returning a mapping whose "value" key holds the payload.

    Returns False when the text contains "something". Otherwise returns True
    only if the metadata's "source" contains "only_this" or "also_that".
    """
    # Content-based exclusion.
    source_code = x["text"].data()["value"]
    if "something" in source_code:
        return False

    # Path-based inclusion, e.g. by directory or extension.
    source_path = x["metadata"].data()["value"]["source"]
    return any(marker in source_path for marker in ("only_this", "also_that"))


### turn on below for custom filtering
# retriever.search_kwargs['filter'] = filter

In [None]:
from langchain.chat_models import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain

# Chat model used to answer questions; the trailing comment lists other model
# names that were tried or considered.
model = ChatOpenAI(model_name="gpt-3.5-turbo")  # 'ada' 'gpt-3.5-turbo' 'gpt-4',
# Question-answering chain that pulls context through the retriever configured above.
qa = ConversationalRetrievalChain.from_llm(model, retriever=retriever)

In [None]:
# Questions to ask about the indexed code; uncomment entries to ask more.
questions = [
    "What is the class hierarchy?",
    # "What classes are derived from the Chain class?",
    # "What classes and functions in the ./langchain/utilities/ folder are not covered by unit tests?",
    # "What one improvement do you propose in code in relation to the class hierarchy for the Chain class?",
]
# Accumulates (question, answer) pairs and is passed back into each call.
chat_history = []

for question in questions:
    result = qa({"question": question, "chat_history": chat_history})
    chat_history.append((question, result["answer"]))
    print(f"-> **Question**: {question} \n")
    print(f"**Answer**: {result['answer']} \n")

In [None]:
!pip install llama-hub

In [None]:
from llama_index import download_loader

# Fetch the loader class by name, then load one page into `documents`.
BeautifulSoupWebReader = download_loader("BeautifulSoupWebReader")

loader = BeautifulSoupWebReader()
documents = loader.load_data(urls=['https://google.com'])

In [None]:
def _substack_reader(soup: Any) -> Tuple[str, Dict[str, Any]]:
    """Extract the body text and basic metadata from a Substack blog post.

    Args:
        soup: Parsed page exposing ``select_one(css_selector)``; the selected
            elements must expose ``getText()``.

    Returns:
        A ``(text, extra_info)`` pair: the text of ``div.available-content``
        and a dict with the post's title, subtitle and author. Any element the
        page lacks yields an empty string instead of raising.
    """

    def _text_of(selector: str) -> str:
        # select_one returns None when nothing matches (e.g. no subtitle).
        node = soup.select_one(selector)
        return node.getText() if node is not None else ""

    extra_info = {
        "Title of this Substack post": _text_of("h1.post-title"),
        "Subtitle": _text_of("h3.subtitle"),
        "Author": _text_of("span.byline-names"),
    }
    text = _text_of("div.available-content")
    return text, extra_info

In [None]:
from llama_index import GPTVectorStoreIndex, download_loader

# Same loading steps as the earlier cell, followed by indexing the page and
# asking one question; the query result is the cell's displayed value.
BeautifulSoupWebReader = download_loader("BeautifulSoupWebReader")

loader = BeautifulSoupWebReader()
documents = loader.load_data(urls=['https://google.com'])
index = GPTVectorStoreIndex.from_documents(documents)
index.query('What language is on this website?')

In [None]:
from llama_index import GPTVectorStoreIndex, download_loader
from langchain.agents import initialize_agent, Tool
from langchain.llms import OpenAI
from langchain.chains.conversation.memory import ConversationBufferMemory

# Load and index the page again so this cell does not depend on the previous one.
BeautifulSoupWebReader = download_loader("BeautifulSoupWebReader")

loader = BeautifulSoupWebReader()
documents = loader.load_data(urls=['https://google.com'])
index = GPTVectorStoreIndex.from_documents(documents)

# Expose the index to the agent as a single tool that forwards the query string.
tools = [
    Tool(
        name="Website Index",
        func=lambda q: index.query(q),
        description=f"Useful when you want answer questions about the text on websites.",
    ),
]
# Note: this rebinds the notebook-level names `llm` and `memory`.
llm = OpenAI(temperature=0)
memory = ConversationBufferMemory(memory_key="chat_history")
agent_chain = initialize_agent(
    tools, llm, agent="zero-shot-react-description", memory=memory
)

output = agent_chain.run(input="What language is on this website?")

In [None]:
# Assumes `loader` is still a BeautifulSoupWebReader instance from an earlier
# cell. custom_hostname presumably selects site-specific extraction; confirm
# against the loader's documentation.
documents = loader.load_data(urls=["https://langchain.readthedocs.io/en/latest/"], custom_hostname="readthedocs.io")

In [None]:
from pathlib import Path
from llama_index import download_loader

# Fetch the image loader by name and load ./cat.jpg, a path relative to the
# working directory. This rebinds `loader` and `documents`.
ImageVisionLLMReader = download_loader("ImageVisionLLMReader")

loader = ImageVisionLLMReader()
documents = loader.load_data(file=Path('./cat.jpg'))

In [None]:
# function to approximate the amount of tokens in a string of text by counting words
def count_tokens(text):
    """Return the number of whitespace-separated words in ``text``.

    This is only a rough stand-in for a model's token count: no real tokenizer
    (such as tiktoken) is used, so the result will generally differ from what
    an LLM tokenizer reports.
    """
    return len(text.split())

# character counter built directly on len()
def count_characters(text):
    """Return how many characters ``text`` contains."""
    return len(text)
    

