In [None]:
%%capture
!pip install 'pyautogen[gemini]~=0.2.0b4'
!pip install 'langchain_experimental==0.0.49'
!pip install 'langchain-openai==0.0.3'
!pip install 'faiss-cpu==1.7.4'
!pip install 'huggingface_hub==0.20.2'
!pip install 'wikipedia==1.4.0'
!pip install 'google-generativeai==0.3.2'
!pip install 'chromadb==0.4.22'
!pip install 'pypdf==3.17.4'
!pip install 'weasyprint==60.2'

# Utilizations

## Modules

### Base

In [None]:
from typing import Union,Optional,List,Dict,Any,Type
import faiss,os,shutil,warnings,math

### Generative Related

In [None]:
import openai,autogen
from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
import google.generativeai as genai
from langchain_experimental.generative_agents import GenerativeAgent,GenerativeAgentMemory
from langchain_openai import OpenAI as LOpenAI
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings

### Chain Related

In [None]:
from langchain.docstore import InMemoryDocstore
from langchain.llms import HuggingFaceHub

### Vector Related

In [None]:
from langchain_community.vectorstores import FAISS
from langchain.retrievers import TimeWeightedVectorStoreRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter
import chromadb,faiss
from chromadb.utils import embedding_functions

### System Related

In [None]:
from google.colab import userdata,drive

### Data Related

In [None]:
import weasyprint
import wikipedia as wpp
from langchain_community.utilities import GoogleSearchAPIWrapper
from autogen.code_utils import content_str

## Configurations

In [None]:
warnings.filterwarnings("ignore",category=DeprecationWarning)
warnings.filterwarnings("ignore",category=UserWarning)
warnings.filterwarnings("ignore",category=FutureWarning)

## Class

In [None]:
class ClassInitial:
  pass
class ProcessInitial:
  pass
class FunctionInitial:
  pass
class NullInitial:
  pass
class DocumentationInitial:
  pass
class ErrorInitial:
  pass
class ModelInitial:
  pass

## Functions

In [None]:
createDirectory:FunctionInitial = lambda x: os.mkdir(x) if not os.path.exists(x) else None
deleteDirectory:FunctionInitial = lambda x: shutil.rmtree(x) if len(os.listdir(x)) > 1 else None
sortDirectory:FunctionInitial = lambda x: [str(x)+str(y) for y in os.listdir(str(x)) if y != ".ipynb_checkpoints"]
relevanceScore:FunctionInitial = lambda x: 1.0-x/math.sqrt(2)

In [None]:
def StringToList(output:str)->list:
  output = output.strip("][").replace("'","")
  output = output.strip('][').replace('"','').replace("]","").replace("[","").split(", ")
  return output

In [None]:
class ErrorModule(object):
  def __init__(self)->ClassInitial:
    self.error:ErrorInitial = NotImplementedError(NotImplemented)
  def __str__(self)->str:
    return "Error Modulation"
  def __call__(self)->ErrorInitial:
    return self.error
  def __getstate__(self)->ErrorInitial:
    raise self.error
  def __repr__(self)->DocumentationInitial:
    return ErrorModule.__doc__
  @property
  def Default(self)->ErrorInitial:
    raise self.error
  def Manuel(self,errorType:ErrorInitial,errorMessage:str)->ErrorInitial:
    raise errorType(errorMessage)

In [None]:
class DefineCredentials(object):
  def __init__(self)->ClassInitial:
    self.openAIKey = userdata.get("OPENAI_API_KEY")
    self.huggingKey = userdata.get("HUGGINGFACEHUB_API_TOKEN")
    self.googleKey = userdata.get("GOOGLE_API_ADDITIONAL")
    self.geminiKey = userdata.get("GEMINI_API_ADDITIONAL")
    self.cseIDKey = userdata.get("CSE_ID_KEY")
  def __str__(self)->str:
    return "Credentials Modulation"
  def __call__(self)->NullInitial:
    return None
  def __getstate__(self)->ErrorInitial:
    ErrorModule().Default
  def __repr__(self)->DocumentationInitial:
    return DefineCredentials.__doc__
  def SaveOpenAI(self)->ProcessInitial:
    os.environ["OPENAI_API_KEY"] = self.openAIKey
    openai.api_key = self.openAIKey
    print("+ OPENAI API KEY - [HAS BEEN SAVED]")
  def SaveGemini(self)->ProcessInitial:
    os.environ["GEMINI_API_KEY"] = self.geminiKey
    print("+ GEMINI API KEY - [HAS BEEN SAVED]")
  def SaveGoogle(self)->ProcessInitial:
    os.environ["GOOGLE_API_KEY"] = self.googleKey
    os.environ["GOOGLE_CSE_ID"] = self.cseIDKey
    print("+ GOOGLE & CSE API KEY - [HAS BEEN SAVED]")
  def SaveHugging(self)->ProcessInitial:
    os.environ["HUGGINGFACEHUB_API_TOKEN"] = self.huggingKey
    globals()["HUGGING_KEY"] = self.huggingKey
    !huggingface-cli login --token $HUGGING_KEY
    print("+ HUGGING API KEY - [HAS BEEN SAVED]")

## Constants

In [None]:
DefineCredentials().SaveOpenAI()

+ OPENAI API KEY - [HAS BEEN SAVED]


In [None]:
DefineCredentials().SaveGemini()

+ GEMINI API KEY - [HAS BEEN SAVED]


In [None]:
DefineCredentials().SaveGoogle()

+ GOOGLE & CSE API KEY - [HAS BEEN SAVED]


In [None]:
DefineCredentials().SaveHugging()

Token will not been saved to git credential helper. Pass `add_to_git_credential=True` if you want to set the git credential as well.
Token is valid (permission: read).
Your token has been saved to /root/.cache/huggingface/token
Login successful
+ HUGGING API KEY - [HAS BEEN SAVED]


In [None]:
autogenConfiguration:list = [
    {
        "model":"gpt-4-1106-preview",
        "api_key":os.environ.get("OPENAI_API_KEY")
    },
    {
        "model":"gpt-4-vision-preview",
        "api_key":os.environ.get("OPENAI_API_KEY")
    },
    {
        "model":"dalle",
        "api_key":os.environ.get("OPENAI_API_KEY")
    },
    {
        "model": "gemini-pro",
        "api_key": os.environ.get("GEMINI_API_KEY")
    },
    {
        "model": "gemini-pro-vision",
        "api_key": os.environ.get("GEMINI_API_KEY")
    }
]

# LLM Modulation

## Structure

In [None]:
class LLMStructure(object):
  def __init__(self)->ClassInitial:
    self.mistral = "mistralai/Mixtral-8x7B-Instruct-v0.1"
    self.modelOpenAI = "gpt-4"
  def __str__(self)->str:
    return "LLM Structure Modulation"
  def __call__(self)->NullInitial:
    return None
  def __getstate__(self)->ErrorInitial:
    ErrorModule().Default
  def __repr__(self)->DocumentationInitial:
    return LLMStructure.__doc__
  def Load(self,LLMType:str="openai")->ModelInitial:
    if LLMType.lower() == "openai":
      llm = ChatOpenAI(
          temperature=0.3,
          max_tokens=4096,
          model_name=self.modelOpenAI,
          presence_penalty=2.0,
          frequency_penalty=1.0,
          timeout=300
      )
    elif LLMType.lower() == "hugging":
      llm = HuggingFaceHub(
          repo_id=self.mistral,
          task="conversational",
          model_kwargs={
              "temperature":0.2,
              "repetition_penalty":1.03,
              "max_new_tokens":512
          }
      )
    else:
      ErrorModule().Manuel(ValueError,"[LLM Type is not valid]")
    return llm

## Models

In [None]:
openAIModel = LLMStructure().Load("openai")
mistralModel = LLMStructure().Load("hugging")

# Gathering Operations

## Wikipedia

In [None]:
def WikipediaSearch(topic:Optional[str],limit:Optional[int],**load_kwargs:Any)->list|tuple:
  urls = []
  contents = []
  response = wpp.search(
      topic,
      results=limit
  )
  if (len(response) > 0) and (response is not None):
    for index in response:
      try:
        url = wpp.page(
            str(index),
            **load_kwargs
        ).url
        urls.append(url)
      except:
        pass
      try:
        content = wpp.page(
            str(index),
            **load_kwargs
        ).content
        contents.append(content)
      except:
        pass
    return urls,contents
  else:
    return [],[]

## Google Search

In [None]:
def GoogleSearch(query:Optional[str],limit:Optional[int]=10)->list:
  defaultQuery = f"Life and personality of {str(query)}"
  wrapper = GoogleSearchAPIWrapper(k=1,siterestrict=False)
  result = wrapper.results(query=defaultQuery,num_results=int(limit))
  urls = []
  if (len(result) > 0) and (result is not None):
    for rs in result:
      url = rs["link"]
      urls.append(url)
    return urls
  else:
    return []

## PDF Creating

In [None]:
def CreatePDF(urls:List)->ProcessInitial:
  createDirectory(os.path.join(os.getcwd(),"target_files"))
  if len(urls) > 0:
    for idx,url in enumerate(urls):
      try:
        engine = weasyprint.HTML(url).write_pdf()
        path = os.path.join(os.path.join(os.getcwd(),"target_files"),f"target_doc_{str(idx)}.pdf")
        with open(path,"wb") as ops:
          ops.write(engine)
      except:
        pass

# Memory Structure

## Embedding Design

In [None]:
embeddingModel:ModelInitial = OpenAIEmbeddings(
      model="text-embedding-ada-002",
      embedding_ctx_length=8191,
      api_key=userdata.get("OPENAI_API_KEY"),
      chunk_size=1000,
      max_retries=2,
      timeout=300
  )

## Structure

In [None]:
def MemoryRetriever()->ModelInitial:
  indexFlat = faiss.IndexFlatL2(1536)
  store = FAISS(
      embeddingModel.embed_query,
      indexFlat,
      InMemoryDocstore({}),
      {},
      relevance_score_fn=relevanceScore
  )
  vector = TimeWeightedVectorStoreRetriever(
      vectorstore=store,
      other_score_keys=["importance"],
      k=5,
      decay_rate=0.0000001
  )
  return vector

# Summarizing Model

## Embedding Design

In [None]:
embeddingModelCache:FunctionInitial = embedding_functions.OpenAIEmbeddingFunction(
    api_key=userdata.get("OPENAI_API_KEY"),
    model_name="text-embedding-ada-002"
)

## Constants

In [None]:
systemCommand = (
    "Extract personality traits in detail from the documents given to you, and identify at least 10 personality traits. "
    "Determine these personal characteristics objectively and clearly. "
    "Extract all the traits of personality according to the documents you have."
)
autoReply = "Always give the final answer as text and add 'TERMINATE' at the end"
problem = (
    "Using these documents, extract personality traits and return them as a string array. "
    "Export this as a list."
)

## Splitter Design

In [None]:
separators = [
    "\n#{1,6} ",
    "```\n",
    "\n\\*\\*\\*+\n",
    "\n---+\n",
    "\n___+\n",
    "\n\n",
    "\n",
    " ",
    "",
    "\r",
    "\t"
]

In [None]:
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=100,
    strip_whitespace=True,
    separators=separators
)

## Assistant

In [None]:
def GetSummarizingAssistant(message:str=systemCommand)->ModelInitial:
  assistant = RetrieveAssistantAgent(
      name="personality_traits_assistant",
      system_message=message,
      llm_config={
          "timeout":300,
          "cache_seed":42,
          "config_list":autogenConfiguration
      }
  )
  return assistant

## Proxy

In [None]:
def GetSummarizingProxy(targetDocuments:List,modelType:str="gemini-pro")->ModelInitial:
  createDirectory(os.path.join(os.getcwd(),"target_files"))
  targetPath = os.path.join(os.path.abspath(""),"target_files")
  targetDocuments.append(targetPath)
  proxy = RetrieveUserProxyAgent(
      name="ragproxyagent",
      human_input_mode="NEVER",
      default_auto_reply=autoReply,
      max_consecutive_auto_reply=20,
      retrieve_config={
          "task":"qa",
          "docs_path":targetDocuments,
          "chunk_token_size":2000,
          "model":str(modelType),
          "client":chromadb.PersistentClient(path="/tmp/chromadb"),
          "embedding_function":embeddingModelCache,
          "custom_text_splitter_function":splitter,
          "get_or_create":True,
          "custom_text_types":autogen.retrieve_utils.TEXT_FORMATS,
          "custom_text_split_function":splitter.split_text
      },
      code_execution_config=False
  )
  return proxy

## Response Callback

In [None]:
def ReturnCallback(retModel:ModelInitial,problem:str=problem,description:str=autoReply)->str:
  for value in retModel.chat_messages.values():
    if (value) and (len(value) > 0):
      for idx in value:
        if (idx["content"] is not None) and (idx["content"] != " ") and (idx["content"].lower() != str(problem).lower()) and (idx["content"].lower() != str(description).lower()) and (idx["content"] != "") and (idx["content"].upper() != "UPDATE CONTEXT"):
          try:
            content = content_str(idx["content"])
          except:
            content = idx["content"]
        else:
          pass
    else:
      content = None
  return content

## Modulation

In [None]:
def GetSummarizingResult(documentList:list,message:str=systemCommand,modelType:str="gemini-pro",problem:str=problem)->str:
  genai.configure(api_key=userdata.get("GEMINI_API_ADDITIONAL"))
  assistant = GetSummarizingAssistant(message)
  proxy = GetSummarizingProxy(documentList,modelType)
  try:
    result = proxy.initiate_chat(
        assistant,
        problem=problem,
        clear_history=True
    )
    if (assistant is not None):
      output = ReturnCallback(assistant)
    else:
      output = ReturnCallback(proxy)
    if output is not None:
      output = StringToList(output)
    else:
      output = "[The model got complicated while responding]"
    try:
      assistant.reset()
      proxy.reset()
    except Exception as err:
      print(err)
      pass
    return output
  except Exception as err:
    print(err)
    try:
      assistant.reset()
      proxy.reset()
      return "[The model got complicated while responding]"
    except Exception as err:
      print(err)
      return "[The model got complicated while responding]"

# Resurrection

## Gathering Related Informations

In [None]:
def GetInformation(target:Optional[str],limit:int=10)->list:
  if not os.path.exists(os.path.join(os.getcwd(),"target_files")):
    urls,contents = WikipediaSearch(str(target),int(limit))
    googleURLS = GoogleSearch(str(target),int(limit))
    urls.extend(googleURLS)
    if len(urls) > 0:
      CreatePDF(urls)
    else:
      ErrorModule().Manuel(ValueError,"[No linked data records found]")
  path = os.path.join(os.getcwd(),"target_files")
  path = path + "/" if not path.endswith("/") else path
  files = sortDirectory(path)
  return files

## Callback

In [None]:
def PersonReturnCallback(message:Optional[str],person:ModelInitial,nameTarget:str)->str:
  response = person.generate_dialogue_response(str(message))
  reaction = person.generate_reaction(str(message))
  if (len(response) > 0) and (response is not None):
    if response[0]:
      output = response[1]
      output = output.replace(f"{nameTarget} said","").replace(f"{nameTarget} said ","")
      return output,reaction
    else:
      ErrorModule().Manuel(ValueError,"[Not acceptable process for this response]")
  else:
    ErrorModule().Manuel(ValueError,"[Not acceptable process for this response]")

## Modulation

In [None]:
def GenerativePersonality(traits:list,personal:list,dailyActivities:list,name:str,age:int,status:str,llmType:ModelInitial=openAIModel,limit:int=5)->ModelInitial:
  traits = ", ".join(traits)
  memory = GenerativeAgentMemory(
      llm=llmType,
      memory_retriever=MemoryRetriever(),
      verbose=False,
      reflection_threshold=8,
      max_tokens_limit=1200,
      importance_weight=0.15
  )
  if dailyActivities != "[The model got complicated while responding]":
    person = GenerativeAgent(
        name=str(name),
        age=int(age),
        traits=traits,
        status=str(status),
        memory_retriever=MemoryRetriever(),
        llm=llmType,
        memory=memory,
        summary_refresh_seconds=3600,
        daily_summaries=dailyActivities
    )
  else:
      person = GenerativeAgent(
        name=str(name),
        age=int(age),
        traits=traits,
        status=str(status),
        memory_retriever=MemoryRetriever(),
        llm=llmType,
        memory=memory,
        summary_refresh_seconds=3600
    )
  if personal != "[The model got complicated while responding]":
    for info in personal:
      person.memory.add_memory(str(info))
  return person

## Action

In [None]:
def LaunchPerson(message:str,traits:list,personal:list,dailyActivities:list,name:str,age:int,status:str="N/A",llmType:ModelInitial=openAIModel,limit:int=5)->str:
  person = GenerativePersonality(
      traits,
      personal,
      dailyActivities=dailyActivities,
      name=name,
      age=age,
      status=status,
      llmType=llmType,
      limit=limit
  )
  summary = person.get_summary(force_refresh=True)
  output,reaction = PersonReturnCallback(message,person,nameTarget=name)
  return str(output),summary,reaction

# Launch

## Parameters

In [None]:
targetPerson = "Nikola Tesla"
targetAge = 30
targetLimit = 5
targetStatus = "coming back to life"

## Gathering Personality

In [None]:
personalMessage = (
    "Use one-sentence descriptive phrases. "
    "Make complete sentences in accordance with the language rules. "
    "For example: 'He/she remembers his/her dog, from when he/she was a kid', 'He/she feels tired from driving so far', 'He/she sees the new home'. "
    "Return them as a string array. "
    "Array must consist of at least 10 items."
)
personalProblem = (
    "Provide self-information about personality in accordance with the documents given to you. "
    "Use one-sentence descriptive phrases. "
    "For example: 'He/she remembers his/her dog, from when he/she was a kid', 'He/she feels tired from driving so far', 'He/she sees the new home'. "
    "Return them as a string array. "
)

In [None]:
summaryMessage = (
    "Using the documents uploaded to you as a reference, present the 10-day activities as an array list. "
    "These daily activities must be in line with reality and the documents given to you. "
    "Use one-sentence descriptive phrases. "
    "Return them as a string array."
)
summaryProblem = (
    "Using the documents uploaded to you as a reference, present the 10-day activities as an array list. "
    "Return them as a string array."
)

In [None]:
files = GetInformation(targetPerson,targetLimit)

In [None]:
traits = GetSummarizingResult(files)

In [None]:
#personal = GetSummarizingResult(files,personalMessage,problem=personalProblem)

In [None]:
#dailySummaries = GetSummarizingResult(files,summaryMessage,problem=summaryProblem)

In [None]:
personal = [
    "Nikola Tesla was a brilliant inventor, known for his pioneering contributions to the development of alternating current (AC) electrical systems.",
    "Tesla was a polyglot, fluent in several languages including English, French, German, and his native Serbo-Croatian.",
    "Despite his numerous inventions and contributions to science, Tesla often struggled financially and lived much of his life in relative poverty.",
    "He was a bachelor throughout his life, never marrying or having children, and was known to have said he felt he could not be as dedicated to his work if he were to marry.",
    "He was known for his eccentric personality, including an intense fear of germs and obsessive-compulsive behaviors such as requiring a stack of 18 napkins at every meal.",
    "Tesla had a strong ethical principle against war and violence, and he hoped that his inventions would contribute to peace on Earth.",
    "Despite his achievements, he received relatively little recognition during his lifetime and died alone in a New York hotel room in 1943."
]

In [None]:
dailySummaries = [
    "Nikola Tesla begins his day with a morning walk, often contemplating new inventions and theories as he enjoys the solitude of dawn.",
    "After his walk, Tesla spends time feeding and caring for his beloved pigeons, with whom he shares a unique bond.",
    "Tesla takes a brief midday break to dine alone, preferring simple meals that support his rigorous mental and physical discipline.",
    "In the late afternoon, he goes for another walk, using this time to mentally solve complex problems and conceive new ideas.",
    "He spends some time each day working on his physical health, engaging in exercises that promote his well-being and support his capacity for intense mental focus.",
    "Before retiring for the night, Tesla often reads a wide range of topics, from poetry to scientific treatises, fueling both his imagination and intellectual curiosity."
]

In [None]:
traits

['visionary',
 'multilingual',
 'workaholic',
 'eccentric',
 'obsessive',
 'compulsive',
 'innovative',
 'solitary',
 'meticulous',
 'frugal']

## Test

In [None]:
question = "What do you think about wars?"

In [None]:
#%%capture
response,summary,reaction = LaunchPerson(
    message=question,
    traits=traits,
    personal=personal,
    dailyActivities=dailySummaries,
    name=targetPerson,
    age=targetAge,
    status=targetStatus,
    limit=targetLimit
)



In [None]:
print(response)

 "I have always been a staunch advocate for peace. Wars are destructive and cause immense suffering, not just to the people directly involved but also to humanity as a whole. They hinder progress and divert resources that could be used for the betterment of mankind towards destruction instead. I believe in using science and technology for peaceful purposes, with an aim to improve lives rather than destroy them."


In [None]:
print(summary)

Name: Nikola Tesla (age: 30)
Innate traits: visionary, multilingual, workaholic, eccentric, obsessive, compulsive, innovative, solitary, meticulous, frugal
Nikola Tesla was a brilliant and ethical inventor, known for his significant contributions to the development of AC electrical systems. Despite his inventions, he struggled financially and received little recognition during his lifetime. He lived in relative poverty and died alone in 1943. Fluent in several languages including English, French, German, and Serbo-Croatian; Tesla had strong principles against war and violence hoping that his work would contribute to global peace.


In [None]:
print(reaction)

(True, 'Nikola Tesla said "As I mentioned earlier, my mother was a remarkable woman with an inventive spirit and mechanical abilities. She could craft household appliances from virtually nothing which greatly inspired me. Her memory for Serbian epic poems was also extraordinary."')
