# <font color="#38db02">Author</font>

Coder: <font color="#38db02"> Baris Dincer</font>

Department: <font color="#38db02">Artificial Intelligence - Lead / CO-Founder of LUX & Memoriae</font>

Affiliated: <font color="#38db02"> LUX Space Science & Technology - Memoriae Technology</font>

## Account

- LinkedIn:
[LinkedIn](https://www.linkedin.com/in/brs-dincer/)

- Kaggle:
[Kaggle](https://www.kaggle.com/brsdincer)

## Project Description

- Explores the AGI principle through experimental approaches, building external tools around the custom assistant (Assistants API) that OpenAI announced at DevDay

>`NOTE: Huggingface and OpenAI API values are required.`

# General Approaches

## Util-Classes

In [1]:
# Empty marker classes. Within this file they appear only in annotations,
# as readable labels for what a function takes or returns.
class CLASSINIT:
  """Label for the return annotation of ``__init__`` methods."""
class PROCESS:
  """Label for helpers that perform a step or return an API handle."""
class RESULT:
  """Label for the final result produced by a tool."""
class PARAMETER:
  """Label for a generic parameter."""
class URL:
  """Label for values that hold a URL."""
class DOCUMENTATION:
  """Label for methods that return documentation text."""
class NULL:
  """Label for methods that return nothing."""
class ERROR:
  """Label for methods that raise or return an error."""
class DATA:
  """Label for data payloads."""
class MODEL:
  """Label for model or client objects."""

## Util-Functions

In [2]:
def createDirectory(x)->PROCESS:
  """Create directory ``x`` unless something already exists at that path."""
  if os.path.exists(x):
    return None
  return os.mkdir(x)

def deleteDirectory(x)->PROCESS:
  """Remove directory ``x`` recursively, but only when it holds more than one entry."""
  # NOTE(review): a directory with zero or one entry is left in place, and a
  # missing path makes os.listdir raise - confirm both are intended.
  entryCount = len(os.listdir(x))
  if entryCount > 1:
    return shutil.rmtree(x)
  return None

def sortDirectory(x)->PROCESS:
  """Sort key: the integer between the last "_" and the first following "."."""
  afterUnderscore = x.rsplit("_",1)[-1]
  numberText = afterUnderscore.split(".")[0]
  return int(numberText)

In [3]:
class ERRORMODULE(object):
  """Shared error helper that holds a default ``NotImplementedError`` and raises on demand.

  The other classes in this file call ``ERRORMODULE().Default()`` from their
  ``__getstate__`` methods.
  """
  def __init__(self)->CLASSINIT:
    # Default error instance reused by __call__, __getstate__ and Default.
    self.error = NotImplementedError(NotImplemented)
  def __str__(self)->str:
    return "Error Module - Sub(Script)"
  def __call__(self)->ERROR:
    """Return (not raise) the default error instance."""
    return self.error
  def __getstate__(self)->ERROR:
    """Raise the default error whenever the instance state is requested."""
    raise self.error
  def __repr__(self)->DOCUMENTATION | str:
    # __repr__ must return a str; fall back to __str__ when the class
    # docstring is unavailable (e.g. stripped by ``python -OO``).
    return ERRORMODULE.__doc__ or str(self)
  def Default(self)->ERROR:
    """Raise the default ``NotImplementedError``."""
    raise self.error
  def Custom(self,errorType:ERROR,errorMessage:str)->ERROR:
    """Raise ``errorType`` constructed with ``errorMessage``."""
    raise errorType(errorMessage)

In [4]:
def CREATEINPUTURL(initURL:URL)->dict:
  """Return an ``image_url`` content part whose URL is ``str(initURL)``."""
  contentPart = dict(type="image_url")
  contentPart["image_url"] = str(initURL)
  return contentPart

In [5]:
def CREATEINPUTTEXT(initText:str)->dict:
  """Return a ``text`` content part whose text is ``str(initText)``."""
  contentPart = dict(type="text")
  contentPart["text"] = str(initText)
  return contentPart

## Utils-Modules

In [6]:
import sys,os,shutil
from typing import Union,Optional,List,Dict,Any

In [7]:
# Packages that the install loop in the following cell passes to pip one by one.
# Only langchain is pinned to an exact version.
willBeDownloaded:list = [
    "langchain==0.0.311",
    "langchain-experimental",
    "cohere",
    "tiktoken",
    "youtube-transcript-api",
    "pytube",
    "openai",
    "duckduckgo-search",
    "wikipedia",
    "arxiv",
    "huggingface_hub"
]

In [8]:
%%capture
# Install every entry of willBeDownloaded with pip; the %%capture cell magic hides the output.
for module in willBeDownloaded:
  !pip install $module

## Utils-Constants

In [9]:
# OpenAI model identifier; OpenAIAgent.__init__ stores it as its modelType.
modelGPT:str = "gpt-4-1106-preview"

## Utils-Credentials

In [10]:
import openai

In [11]:
def DefineCredentials(typePass:str="openai")->PROCESS:
  """Prompt for an API key (input hidden) and register it for this session.

  Args:
    typePass: ``"openai"`` (case-insensitive) selects the OpenAI key; any
      other value selects the Hugging Face token.

  Effects:
    OpenAI: sets ``OPENAI_API_KEY`` in the environment and ``openai.api_key``.
    Hugging Face: sets ``HUGGINGFACEHUB_API_TOKEN``, stores the token in the
    module global ``huggingfaceApiKey`` and runs ``huggingface-cli login``.

  Returns:
    None.
  """
  import subprocess
  from getpass import getpass
  isOpenAI = typePass.lower() == "openai"
  promptText = "Type Your OpenAI API Key > " if isOpenAI else "Type Your HuggingFace API Key > "
  # Keep asking for the SAME provider until something non-blank is typed.
  # The earlier recursive retry dropped typePass and always asked for the OpenAI key.
  userInput = getpass(promptText)
  while not userInput.strip():
    userInput = getpass(promptText)
  # Pasted keys often carry stray whitespace or a trailing newline.
  apiKey = userInput.strip()
  if isOpenAI:
    os.environ["OPENAI_API_KEY"] = apiKey
    openai.api_key = apiKey
    return None
  os.environ["HUGGINGFACEHUB_API_TOKEN"] = apiKey
  # Kept for backward compatibility: earlier versions exposed the token under this global name.
  globals()["huggingfaceApiKey"] = apiKey
  try:
    # Argument list instead of a shell line: the token is never parsed by a shell.
    loginRun = subprocess.run(["huggingface-cli","login","--token",apiKey],
                              check=False,
                              capture_output=True,
                              text=True)
  except OSError as launchError:
    # A missing CLI is reported instead of raised, so the environment token can still be used.
    print(f"huggingface-cli could not be started: {launchError}")
    return None
  for streamText in (loginRun.stdout,loginRun.stderr):
    if streamText:
      print(streamText,end="")
  return None

# LLM

## Hugging

In [12]:
from langchain.llms import HuggingFaceHub

### Structure

In [13]:
class LLMArchitecture(object):
  """Builds the Hugging Face Hub text-generation LLM used by the agent."""
  def __init__(self)->CLASSINIT:
    pass
  def __str__(self)->str:
    return "LLM Architecture - Pre(Script)"
  def __call__(self)->NULL:
    return None
  def __getstate__(self)->ERROR:
    # Requesting the instance state raises the shared NotImplementedError.
    ERRORMODULE().Default()
  def __repr__(self)->DOCUMENTATION | str:
    # __repr__ must return a str; fall back to __str__ when the class
    # docstring is unavailable (e.g. stripped by ``python -OO``).
    return LLMArchitecture.__doc__ or str(self)
  def GetModel(self,defaultAIModel:str="mistralai/Mistral-7B-Instruct-v0.1")->MODEL:
    """Ask for the Hugging Face token, then return a ``HuggingFaceHub`` LLM.

    Args:
      defaultAIModel: Hub repo id of the model to load.

    Returns:
      A ``HuggingFaceHub`` instance configured for ``text-generation`` with
      temperature 0.
    """
    DefineCredentials(typePass="hugging")
    llm = HuggingFaceHub(repo_id=defaultAIModel,
                         task="text-generation",
                         model_kwargs={"temperature":0})
    return llm

In [None]:
# Hub repo id handed to LLMArchitecture().GetModel; the trailing @param comment lists the two offered choices.
optionHM = 'mistralai/Mistral-7B-Instruct-v0.1' # @param ["mistralai/Mistral-7B-Instruct-v0.1", "tiiuae/falcon-7b"]

In [None]:
%%capture
# Build the Hugging Face LLM for the selected repo id; GetModel prompts for the token first.
llm = LLMArchitecture().GetModel(defaultAIModel=optionHM)

## OpenAI: 1.2.3

### Agent

In [15]:
from openai import OpenAI

In [16]:
class OpenAIAgent(object):
  """Creates the OpenAI client and the transcript fact-checking assistant."""
  def __init__(self)->CLASSINIT:
    self.modelType = modelGPT
    # System instructions given to the assistant on creation.
    self.instruction = (
        "You are an expert who analyzes video content and texts. "
        "You fact-check these analyzes by comparing them with historical information. "
        "Do research on the internet if necessary."
    )
  def __str__(self)->str:
    return "OpenAI Assistant Agent - Pre(Script)"
  def __call__(self)->NULL | None:
    return None
  def __getstate__(self)->ERROR:
    # Requesting the instance state raises the shared NotImplementedError.
    ERRORMODULE().Default()
  def __repr__(self)->DOCUMENTATION | str:
    # __repr__ must return a str; fall back to __str__ when the class
    # docstring is unavailable (e.g. stripped by ``python -OO``).
    return OpenAIAgent.__doc__ or str(self)
  def LoadModel(self)->MODEL:
    """Return an ``OpenAI`` client, prompting for the key when none is set."""
    # A missing variable and an empty one are both falsy.
    if not os.environ.get("OPENAI_API_KEY"):
      DefineCredentials()
    return OpenAI()
  def Model(self,
            initModel:MODEL,
            timeout:int=300)->MODEL:
    """Create the assistant on ``initModel`` and return it.

    Args:
      initModel: Client exposing ``beta.assistants.create``.
      timeout: Forwarded unchanged as the ``timeout`` argument of the call.

    Returns:
      Whatever ``beta.assistants.create`` returns.
    """
    model = initModel.beta.assistants.create(model=self.modelType,
                                             instructions=self.instruction,
                                             tools=[{"type":"retrieval"},{"type":"code_interpreter"}],
                                             name="Transcript-Expert-Assistant",
                                             description="Agent to analyze video transcripts",
                                             timeout=timeout)
    return model

### Thread

In [17]:
def ReturnThread()->PROCESS:
  """Create a new thread through the module-level ``openai`` client and return it."""
  return openai.beta.threads.create()

### Message

In [18]:
# Fixed lead-in that YoutubeTool._run and HogwartsAGI._call put in front of the
# extracted video text before posting it to the assistant thread.
userPrompt = (
    "I have to find out whether what is said in this video is true or not. "
    "Search for me. "
    "Content and transcript related to video:\n"
    )

In [19]:
def ReturnThreadMessage(initModel:MODEL,
                        threadID:PROCESS,
                        defaultPrompt:str,
                        defaultRole:str="user")->PROCESS:
  """Post ``defaultPrompt`` to a thread and return the created message.

  Args:
    initModel: Client exposing ``beta.threads.messages.create``.
    threadID: Thread object; only its ``id`` attribute is read.
    defaultPrompt: Message content.
    defaultRole: Message role, ``"user"`` by default.
  """
  messagesEndpoint = initModel.beta.threads.messages
  requestFields = {
      "thread_id":threadID.id,
      "role":defaultRole,
      "content":defaultPrompt,
  }
  return messagesEndpoint.create(**requestFields)

# Tools: Pre-Process

## Youtube

In [20]:
from langchain.document_loaders import YoutubeLoader

In [21]:
class YoutubeVideoExtraction(object):
  """Extracts a YouTube video's transcript and metadata as one text block."""
  def __init__(self)->CLASSINIT:
    # Language codes passed to the loader as its ``language`` argument.
    self.supportedLanguages = [
        "en","tr","es","el","da","de",
        "fr","hi","it","pl","sl","sr",
        "uk","zh","sk"
        ]
  def __str__(self)->str:
    return "Youtube Content Extraction - Pre(Script)"
  def __call__(self)->NULL | None:
    return None
  def __getstate__(self)->ERROR:
    # Requesting the instance state raises the shared NotImplementedError.
    ERRORMODULE().Default()
  def __repr__(self)->DOCUMENTATION | str:
    # __repr__ must return a str; fall back to __str__ when the class
    # docstring is unavailable (e.g. stripped by ``python -OO``).
    return YoutubeVideoExtraction.__doc__ or str(self)
  def _load(self,target:str)->list:
    """Load the documents for ``target`` with the loader options shared by both attempts."""
    loader = YoutubeLoader.from_youtube_url(target,
                                            add_video_info=True,
                                            language=self.supportedLanguages,
                                            translation="en",
                                            continue_on_failure=False)
    return loader.load()
  def Search(self,url:URL | str)->str:
    """Return the transcript and metadata of the video at ``url`` as text.

    Args:
      url: Video URL; converted with ``str`` before use.

    Returns:
      A text block with content, title, description, publish date, author
      and view count; a fixed "-END-" message when the loader yields no
      documents; or a fixed error message when anything fails. Never raises
      for ordinary errors.
    """
    videoURL = str(url)
    # ``except Exception`` (not a bare except) so Ctrl-C and interpreter exit are not swallowed.
    try:
      try:
        documentMain = self._load(videoURL)
      except Exception:
        # Second attempt with only the text that follows "v=".
        # NOTE(review): confirm the loader accepts a bare video id here.
        documentMain = self._load(videoURL.split("v=")[1])
      if not documentMain:
        return "Video is not within the scope of research -END-"
      videoContent = documentMain[0].page_content
      videoMetadata = documentMain[0].metadata
      return (
          "#Video Content#\n"
          f"{str(videoContent)}\n"
          "#Video Metadata#\n"
          f"Title: {videoMetadata['title']}\n"
          f"Description: {videoMetadata['description']}\n"
          f"Publish Date: {videoMetadata['publish_date']}\n"
          f"Author: {videoMetadata['author']}\n"
          f"View: {videoMetadata['view_count']}\n"
      )
    except Exception:
      return "An unexpected error occurred -The video is not available for review-"

# Assistant

## Agent: Design

### Process: Retrieve-Check

In [22]:
class AgentBaseRetrieve(object):
  """Starts an assistant run on a thread and waits until the run finishes."""
  def __init__(self)->CLASSINIT:
    # True once the most recent run has completed.
    self.isOK = False
  def __str__(self)->str:
    return "Agent Base Design - Pre(Script)"
  def __call__(self)->NULL | None:
    return None
  def __getstate__(self)->ERROR:
    # Requesting the instance state raises the shared NotImplementedError.
    ERRORMODULE().Default()
  def __repr__(self)->DOCUMENTATION | str:
    # Previously referenced an undefined name (AgentBase) and raised NameError.
    return AgentBaseRetrieve.__doc__ or str(self)
  def GetThreads(self,
                 initModel:MODEL,
                 threadID:PROCESS,
                 assistantID:MODEL,
                 pollInterval:float=1.0)->PROCESS:
    """Create a run and poll it until it completes or ends unsuccessfully.

    Args:
      initModel: Client exposing ``beta.threads.runs.create`` / ``retrieve``.
      threadID: Thread object; only its ``id`` attribute is read.
      assistantID: Assistant object; only its ``id`` attribute is read.
      pollInterval: Seconds to sleep between two status checks.

    Returns:
      True when the run reports ``completed_at``; False when its status is
      ``failed``, ``cancelled`` or ``expired``. The value is also stored in
      ``self.isOK``.
    """
    import time
    unsuccessfulStates = ("failed","cancelled","expired")
    threadKey = str(threadID.id)
    run = initModel.beta.threads.runs.create(
        thread_id = threadKey,
        assistant_id = str(assistantID.id)
    )
    runKey = str(run.id)
    self.isOK = False
    while True:
      retr = initModel.beta.threads.runs.retrieve(
          thread_id=threadKey,
          run_id=runKey
      )
      if retr.completed_at:
        self.isOK = True
        break
      # A run that ended without completing never sets completed_at;
      # without this check the loop would spin forever.
      if getattr(retr,"status",None) in unsuccessfulStates:
        break
      # Pause between checks instead of polling the API in a tight loop.
      time.sleep(pollInterval)
    return self.isOK

### Process: Message-Check

In [23]:
class GetAgentMessage(object):
  """Reads the text of the first message returned for a thread."""
  def __init__(self)->CLASSINIT:
    # Passed as ``timeout`` to the message-list call.
    self.timeout = 500
  def __str__(self)->str:
    return "Gathering Agent Response - Pre(Script)"
  def __call__(self)->NULL | None:
    return None
  def __getstate__(self)->ERROR:
    # Requesting the instance state raises the shared NotImplementedError.
    ERRORMODULE().Default()
  def __repr__(self)->DOCUMENTATION | str:
    # __repr__ must return a str; fall back to __str__ when the class
    # docstring is unavailable (e.g. stripped by ``python -OO``).
    return GetAgentMessage.__doc__ or str(self)
  def Launch(self,initModel:MODEL,threadID:PROCESS)->str:
    """Return ``text.value`` of the first content part of the first listed message.

    Args:
      initModel: Client exposing ``beta.threads.messages.list``.
      threadID: Thread object; only its ``id`` attribute is read.

    Raises:
      IndexError / AttributeError: when the listing is empty or the first
        content part carries no text.
    """
    # NOTE(review): assumes the listing is newest-first so data[0] is the assistant's reply - confirm.
    response = initModel.beta.threads.messages.list(thread_id=str(threadID.id),
                                                    timeout=self.timeout)
    message = response.data[0].content[0].text.value
    return message

## Agent: Base

### Main Structure

In [24]:
# Shared OpenAI client; LoadModel prompts for the API key first when OPENAI_API_KEY is unset or empty.
assistantBase = OpenAIAgent().LoadModel()

Type Your OpenAI API Key > ··········


In [25]:
# Assistant created once on the shared client; runs are later started with its id.
assistantModel = OpenAIAgent().Model(initModel=assistantBase)

# Tools: Main-Process

In [26]:
from langchain.tools import BaseTool
from langchain.callbacks.manager import CallbackManagerForToolRun,AsyncCallbackManagerForToolRun

## Youtube

In [27]:
class YoutubeTool(BaseTool):
  """LangChain tool that sends a YouTube video's extracted text to the OpenAI assistant.

  Only the synchronous path is implemented; the async entry point raises
  ``NotImplementedError``.
  """
  name = "Tool for Youtube videos analysis"
  description = "Use this tool to analyze Youtube videos"
  def _run(self,
           url:Optional[str],
           run_manager:Optional[CallbackManagerForToolRun]=None)->RESULT:
    """Extract the video text, post it to a new thread, run the assistant, return its reply.

    Args:
      url: Video URL handed to ``YoutubeVideoExtraction().Search``.
      run_manager: Accepted for the tool interface; not used here.

    Returns:
      The assistant's reply text, or a fixed "-END-" message when the run
      did not finish successfully.
    """
    resultVideo = YoutubeVideoExtraction().Search(url)
    promptUser = userPrompt + resultVideo
    assistantThread = ReturnThread()
    # Called for its side effect; the returned message object is not needed.
    ReturnThreadMessage(initModel=assistantBase,
                        threadID=assistantThread,
                        defaultPrompt=promptUser)
    isOK = AgentBaseRetrieve().GetThreads(initModel=assistantBase,
                                          threadID=assistantThread,
                                          assistantID=assistantModel)
    if not isOK:
      return "No logical answer could be obtained -END-"
    return GetAgentMessage().Launch(initModel=assistantBase,
                                    threadID=assistantThread)
  async def _arun(self,
                  url:Optional[str],
                  run_manager:Optional[AsyncCallbackManagerForToolRun]=None)->ERROR:
    """Async execution is not supported; raises ``NotImplementedError`` when awaited."""
    # Declared ``async`` so callers that await the tool get the error from a coroutine.
    ERRORMODULE().Default()

# AGI

In [28]:
from langchain.memory import ConversationBufferMemory,ReadOnlySharedMemory

## Structure: Command

In [29]:
# Prompt prefix passed to ZeroShotAgent.create_prompt in HogwartsAGI.from_llm;
# {objective} is the prompt's input variable for the video link.
prefixMain = (
    "You are an expert who analyzes video content and texts. "
    "You fact-check these analyzes by comparing them with historical information. "
    "If possible, compare the information in the video with other "
    "information found on the internet and uncover the truth. "
    "Check out this video link given to you: {objective}"
)

In [30]:
# Prompt suffix passed to ZeroShotAgent.create_prompt: just the agent scratchpad placeholder.
suffixTasks = """{agent_scratchpad}"""

In [31]:
# Passed as agent_instructions= to initialize_agent in HogwartsAGI.from_llm.
agiInstruction = "Choose the most appropriate tool according to the content of the objectives given to you."

## Structure: Memory

In [32]:
# Conversation buffer that reads the "objective" input and exposes its history as "chat_history".
agiTaskMemory = ConversationBufferMemory(input_key="objective",
                                         memory_key="chat_history")

In [33]:
# ReadOnlySharedMemory wrapper around agiTaskMemory; given to initialize_agent as memory=.
agiMemory = ReadOnlySharedMemory(memory=agiTaskMemory)

## Structure: Model

In [34]:
from langchain.chains.base import Chain
from langchain.agents import initialize_agent,ZeroShotAgent,AgentExecutor
from pydantic import BaseModel,Field
from langchain.agents import AgentType
from langchain.llms.base import BaseLLM
from langchain.memory.chat_message_histories import FileChatMessageHistory

In [35]:
class ASSISTANTAGENT:
  """Empty marker class; it has no members."""

In [36]:
class HogwartsAGI(Chain,BaseModel):
  """Chain that fact-checks the video given as ``objective`` through the OpenAI assistant.

  ``_call`` talks to the assistant directly; the stored ``agentExecutor`` is
  built by ``from_llm`` but is not invoked by ``_call``.
  """
  agentExecutor:AgentExecutor = Field(...)
  class Config:
    arbitrary_types_allowed = True
  @property
  def input_keys(self)->List[str]:
    return ["objective"]
  @property
  def output_keys(self)->List[str]:
    return ["result"]
  def _call(self,inputs:Dict[str,Any])->Dict[str,str]:
    """Run the extraction -> thread -> assistant pipeline for ``inputs["objective"]``.

    Returns:
      ``{"result": <text>}`` in every case: the assistant's reply, or a fixed
      "-END-" message when the objective is empty or the run did not finish
      successfully.
    """
    noAnswer = {"result":"No logical answer could be obtained -END-"}
    url = inputs.get("objective")
    if not url:
      # Returning None here would break the chain's output-key validation.
      return noAnswer
    resultVideo = YoutubeVideoExtraction().Search(url)
    promptUser = userPrompt + resultVideo
    assistantThread = ReturnThread()
    # Called for its side effect; the returned message object is not needed.
    ReturnThreadMessage(initModel=assistantBase,
                        threadID=assistantThread,
                        defaultPrompt=promptUser)
    isOK = AgentBaseRetrieve().GetThreads(initModel=assistantBase,
                                          threadID=assistantThread,
                                          assistantID=assistantModel)
    if not isOK:
      return noAnswer
    answer = GetAgentMessage().Launch(initModel=assistantBase,
                                      threadID=assistantThread)
    return {"result":str(answer)}
  @classmethod
  def from_llm(cls,
               llm:BaseLLM,
               verbose:bool=False,
               **kwargs)->MODEL:
    """Build the zero-shot agent executor around ``llm`` and wrap it in the chain.

    Args:
      llm: Language model that drives the agent.
      verbose: Forwarded to ``initialize_agent``.
      **kwargs: Forwarded to the chain constructor.
    """
    # One tool instance shared by the executor and the prompt.
    tools = [YoutubeTool()]
    # NOTE(review): agent_instructions and chat_history_memory do not look like
    # AgentExecutor options - confirm they have any effect.
    executor = initialize_agent(agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                                tools=tools,
                                llm=llm,
                                verbose=verbose,
                                max_iterations=20,
                                memory=agiMemory,
                                agent_instructions=agiInstruction,
                                # Was misspelled ``handling_parsing_error``, so the option never reached the executor.
                                handle_parsing_errors=True,
                                chat_history_memory=FileChatMessageHistory("chat_history.txt"))
    agentTaskInitial = ZeroShotAgent.create_prompt(tools=tools,
                                                   prefix=prefixMain,
                                                   suffix=suffixTasks,
                                                   input_variables=["objective","agent_scratchpad"])
    # Replace the default prompt with the custom prefix/suffix prompt.
    executor.agent.llm_chain.prompt = agentTaskInitial
    return cls(agentExecutor=executor,
               **kwargs)


In [37]:
# Build the chain around the Hugging Face LLM created earlier, with verbose output off.
Hogwarts = HogwartsAGI.from_llm(llm=llm,
                                verbose=False)

In [42]:
# Video link submitted to the chain as its "objective".
userInput = "https://www.youtube.com/watch?v=6XnsYZxH2nI"

In [43]:
# Run the chain; the returned dict holds the input under "objective" and the answer under "result".
Hogwarts({"objective":userInput})

{'objective': 'https://www.youtube.com/watch?v=6XnsYZxH2nI',
 'result': 'To verify the truthfulness of the statements contained in the video transcript, I will analyze each key point and provide evidence that supports or contradicts the claims:\n\n1. **Adolf Hitler\'s Motivation**: There is historical consensus that Hitler aimed to eliminate the Jewish population as part of his ideology. This is supported by his writings in "Mein Kampf" and the implementation of the Holocaust during World War II.\n\n2. **Hitler\'s Birth and Childhood**: Adolf Hitler was indeed born in Braunau am Inn, Austria, on April 20, 1889. Historical records support that his childhood included difficulties like the death of his father and poor academic performance.\n\n3. **Hitler\'s Name**: The claim that Hitler\'s name was originally "Schickelberger" is false. His actual name was Adolf Hitler. There were rumors and conspiracy theories about Hitler\'s last name, but credible historical sources do not support this 