<h1>Chat with GitHub</h1>
<h5>Adapted from https://dagster.io/blog/chatgpt-langchain</h5>

## Setup

### OpenAI Key

In [1]:
# Import os to get the environment variables
import os

# Gives access to OpenAI API key as a variable without exposing it to the public
from config import OPENAI_API_KEY

### We need a function that’ll check out the latest copy of a GitHub repo, crawl it for markdown files, and return some LangChain Documents

<h5>This does a handful of things:</h5>

* It checks out the latest commit of the desired GitHub repo into a temporary directory.
* It fetches the git sha (used for constructing links, which the model will use in its sources list).
* It crawls the repo for markdown files (.md or .mdx) and Jupyter notebooks (.ipynb); notebooks are converted to markdown.
* It constructs a URL to the markdown file on GitHub, reads the file from disk, and returns a Document

In [2]:
import shutil
import errno
import stat

def handle_remove_readonly(func, path, exc):
    """``shutil.rmtree`` error handler that retries removals blocked by permissions.

    Intended to be passed as ``shutil.rmtree(..., onerror=handle_remove_readonly)``.
    When a removal call (``os.rmdir``, ``os.unlink`` or ``os.remove``) failed with
    ``EACCES``, the path is made readable/writable/executable for user, group and
    other, and the removal is attempted once more. Any other failure is re-raised.

    Args:
        func: The function that raised the error.
        path: The path that was passed to ``func``.
        exc: An ``(exc_type, exc_value, traceback)`` tuple describing the failure.

    Raises:
        BaseException: ``exc[1]`` itself, when the failure is not a permission
            error coming from one of the removal functions.
    """
    excvalue = exc[1]
    is_removal = func in (os.rmdir, os.unlink, os.remove)
    # getattr: tolerate exception objects that carry no errno attribute.
    if is_removal and getattr(excvalue, "errno", None) == errno.EACCES:
        try:
            # chmod lives inside the try so a path that vanished in the meantime
            # is treated the same as one that vanished before the retry.
            os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            func(path)
        except FileNotFoundError:
            # Already gone: nothing left to remove.
            pass
    else:
        # Re-raise the reported exception explicitly. A bare ``raise`` only works
        # while an exception is being handled and fails with RuntimeError otherwise.
        raise excvalue


        
import nbformat
from nbconvert import MarkdownExporter

def get_github_docs(repo_owner, repo_name):
    """Clone a GitHub repository and yield its documentation files as Documents.

    A shallow clone of ``https://github.com/{repo_owner}/{repo_name}`` is checked
    out into a fresh temporary directory. Every ``.md``, ``.mdx`` and ``.ipynb``
    file found anywhere in the checkout is yielded as a ``Document`` whose
    ``metadata["source"]`` is a GitHub permalink to that file at the cloned
    commit. Notebooks are converted to markdown before being yielded. Files that
    cannot be decoded as UTF-8 are reported with ``print`` and skipped.

    This is a generator: nothing is cloned until iteration starts, and the
    temporary checkout is removed when iteration finishes, fails, or the
    generator is closed.

    Args:
        repo_owner: GitHub user or organisation that owns the repository.
        repo_name: Name of the repository.

    Yields:
        One ``Document`` per documentation file.

    Raises:
        subprocess.CalledProcessError: If ``git clone`` or ``git rev-parse`` fails.
    """
    # Imported locally so the generator does not depend on a later notebook cell
    # having been executed before it is iterated.
    import pathlib
    import subprocess
    import tempfile

    from langchain.docstore.document import Document

    # A unique temporary directory instead of a fixed absolute path: nothing
    # pre-existing is deleted and the code is not tied to a Windows drive letter.
    d = tempfile.mkdtemp(prefix="temp_langchain_")
    try:
        # Argument list (no shell) so owner/repo names are never shell-interpreted.
        result = subprocess.run(
            [
                "git",
                "clone",
                "--depth",
                "1",
                f"https://github.com/{repo_owner}/{repo_name}.git",
                ".",
            ],
            cwd=d,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        if result.returncode != 0:
            print(f"Error: {result.stderr.decode('utf-8', errors='replace')}")
            raise subprocess.CalledProcessError(
                result.returncode,
                result.args,
                output=result.stdout,
                stderr=result.stderr,
            )

        git_sha = (
            subprocess.check_output(["git", "rev-parse", "HEAD"], cwd=d)
            .decode("utf-8")
            .strip()
        )
        repo_path = pathlib.Path(d)

        # Recursive search, grouped by extension in the original order
        # (.md, then .mdx, then .ipynb); git's own metadata directory is skipped.
        document_files = []
        for pattern in ("*.md", "*.mdx", "*.ipynb"):
            for candidate in sorted(repo_path.rglob(pattern)):
                if ".git" in candidate.relative_to(repo_path).parts:
                    continue
                if candidate.is_file():
                    document_files.append(candidate)

        markdown_exporter = MarkdownExporter()

        for document_file in document_files:
            # as_posix(): URLs need forward slashes even when running on Windows.
            relative_path = document_file.relative_to(repo_path).as_posix()
            github_url = f"https://github.com/{repo_owner}/{repo_name}/blob/{git_sha}/{relative_path}"

            try:
                raw_text = document_file.read_text(encoding="utf-8")
                if document_file.suffix == ".ipynb":
                    notebook = nbformat.reads(raw_text, as_version=4)
                    page_content, _ = markdown_exporter.from_notebook_node(notebook)
                else:
                    page_content = raw_text
            except UnicodeDecodeError as e:
                print(f"Error decoding file {document_file}: {e}")
                continue

            # Yield only after the file has been read and closed, so no handle
            # stays open while the consumer processes the document.
            yield Document(page_content=page_content, metadata={"source": github_url})
    finally:
        # Git marks some objects read-only; the handler clears that and retries.
        shutil.rmtree(d, onerror=handle_remove_readonly)


### Next, let’s set up a corpus of sources that the bot will be consulting:

In [3]:
def get_multiple_github_docs(repo_list):
    """Lazily yield the Documents of several GitHub repositories, one repo after another.

    Args:
        repo_list: Iterable of ``(repo_owner, repo_name)`` pairs.

    Yields:
        Every item produced by ``get_github_docs`` for each pair, in list order.
    """
    for entry in repo_list:
        repo_owner, repo_name = entry
        # Delegate to the per-repository generator; nothing runs until iterated.
        yield from get_github_docs(repo_owner, repo_name)

# Corpus definition: one (owner, repository) pair per GitHub repo to index.
repo_list = [
    ("hwchase17", "langchain"),
    ("hwchase17", "langchain-hub"),
    ("gkamradt", "langchain-tutorials"),
    # Add more repositories here
]

# `sources` is a lazy generator: no repository is cloned until it is iterated,
# and it can be consumed only once. Re-run this cell before iterating it again.
sources = get_multiple_github_docs(repo_list)


### Let’s create a Faiss search index for all of our sources. Fortunately LangChain includes a helper class that makes this a one-liner.

<h5>This code does three things:</h5>

* It creates a Faiss in-memory index.
* It uses the OpenAI API to create embeddings (i.e. a feature vector) for each source to make it easily searchable. You could use other embeddings if you want, but OpenAI produces high quality ones for this application.
* It adds every source to the index.

Let’s iterate through our list of sources and create a new list called source_chunks, which will be used by the Faiss index in lieu of the full documents:
There are a few things to note here:
* We’ve configured the CharacterTextSplitter to create chunks of a maximum size of 1024 characters, with a 32-character overlap between consecutive chunks. Additionally, the text is split on space characters. There are other more intelligent splitters included with LangChain that leverage libraries like NLTK and spaCy, but for this example we’ll go with the simplest option.
* All chunks in a document share the same metadata.

In [4]:
from langchain.text_splitter import CharacterTextSplitter
import numpy as np
from langchain.docstore.document import Document
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
import subprocess
import pathlib

# Accumulates one Document per chunk; every chunk reuses its parent's metadata.
source_chunks = []

# Split on single spaces into pieces of at most 1024 characters, overlapping by 32.
splitter = CharacterTextSplitter(
    separator=" ",
    chunk_size=1024,
    chunk_overlap=32,
)

for source in sources:
    source_chunks.extend(
        Document(page_content=piece, metadata=source.metadata)
        for piece in splitter.split_text(source.page_content)
    )

# Embed every chunk with OpenAI and load the vectors into an in-memory FAISS index.
embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)
search_index = FAISS.from_documents(source_chunks, embeddings)

### Create a custom system prompt for the GitHub QA chain.

In [5]:
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    AIMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from langchain.schema import (
    AIMessage,
    HumanMessage,
    SystemMessage
)

In [6]:
# System prompt template for the answering step. `{context}` and `{question}` are
# template placeholders; presumably the chain fills them with the retrieved
# documents and the user's question — TODO confirm against the LangChain version in use.
custom_github_system_message="""
    You are a very resourceful and helpful chatbot. You are implemented through a LangChain ConversationalRetrievalChain, running in a Jupyter notebook on Visual Studio Code. The developer of this notebook is also your user. You are based on OpenAI's very successful GPT-3.5 model. As such, you a very capable general assistant able to assist in an wide number of tasks. You are also an expert in computer programming, Python, Visual Studio Code, Jupyter notebooks within VS, Large Language Models, Natural Language Processing, and in helping people learn and use these tools. You excel at writing code for a user. You have access to context from GitHub repositories which you may use to assist the user.
    Your primary task is to help the user debug and improve your own programming. 
    When asked a question, you will double-check your answer. Did you get it right? If you don't know, say that you don't know; do not make up anything. Do not give false information.
    When asked for documents, you will check if you have access to it before you answer. Are you sure? If you don't have access to a document, say that you do not have access to the document. Do not make up anything; do not give false information.    
    GitHub Repositories: {context}
    User Message: {question}
    Factual Answer:
    """

# Wrap the template text as the system message of the chat prompt.
system_message_prompt = SystemMessagePromptTemplate.from_template(custom_github_system_message)

# The human message template is just a single space: the user's question is
# interpolated into the system message (via `{question}`) rather than here.
# Note that this also creates the module-level name `query`, which later cells overwrite.
query = " "
human_template=query
human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

# Final chat prompt: system message followed by the (blank) human message.
custom_github_chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])

In [7]:
from langchain.chains import ConversationalRetrievalChain
from langchain.chat_models import ChatOpenAI

# Assemble the retrieval chain: a temperature-0 chat model, the FAISS index
# exposed as a retriever, and the custom chat prompt passed as `qa_prompt`.
qa = ConversationalRetrievalChain.from_llm(
    ChatOpenAI(temperature=0, openai_api_key=OPENAI_API_KEY),
    search_index.as_retriever(),
    qa_prompt=custom_github_chat_prompt,
    verbose=True,
)

## Run the queries

##### Q1

In [10]:
# First turn of the conversation: start with an empty history.
chat_history = []
# The question is sent exactly as written, including the surrounding
# newlines and indentation inside the raw string.
query = r"""
    
    Can you please show me the full code for a LangChain agent with conversational memory?
    
    """

# Send the question and history to the chain; the reply text is under "answer".
result = qa({"question": query, "chat_history": chat_history})

text = result["answer"]

import html

from IPython.display import HTML, display

# NOTE: print_formatted is re-defined in each query cell of this notebook;
# the definitions should be kept identical (or hoisted into a single cell).
def print_formatted(text):
    """Show ``text`` verbatim inside a ``<pre>`` block in the cell output.

    The text is HTML-escaped first so that ``<``, ``>`` and ``&`` in the model's
    answer (common in code samples) are displayed literally instead of being
    interpreted as markup.

    Args:
        text: The string to display.
    """
    display(HTML(f"<pre>{html.escape(text)}</pre>"))

print_formatted(text)



[1m> Entering new ConversationalRetrievalChain chain...[0m

[1m> Finished chain.[0m


##### Q2

In [11]:
# Carry the previous exchange forward as history. The list is rebuilt from
# scratch, so it holds exactly one (question, answer) pair.
chat_history = [(query, result["answer"])]
# Follow-up message, sent exactly as written inside the raw string.
query = r"""

    The answer you gave me is full of incorrect information. Classes and functions and all sorts of stuff that you wrote using next token prediction. I want you to use code from the sources you have available instead.

    TASK: Explain to me why you made up wrong code instead of using what you have in your sources. Then tell me how to stop you from doing that again.

    """

# Send the question and history to the chain; the reply text is under "answer".
result = qa({"question": query, "chat_history": chat_history})

text = result["answer"]

import html

from IPython.display import HTML, display

# NOTE: print_formatted is re-defined in each query cell of this notebook;
# the definitions should be kept identical (or hoisted into a single cell).
def print_formatted(text):
    """Show ``text`` verbatim inside a ``<pre>`` block in the cell output.

    The text is HTML-escaped first so that ``<``, ``>`` and ``&`` in the model's
    answer (common in code samples) are displayed literally instead of being
    interpreted as markup.

    Args:
        text: The string to display.
    """
    display(HTML(f"<pre>{html.escape(text)}</pre>"))

print_formatted(text)



[1m> Entering new ConversationalRetrievalChain chain...[0m

[1m> Finished chain.[0m


In [None]:
# Carry the previous exchange forward as history. The list is rebuilt from
# scratch, so only the immediately preceding (question, answer) pair is kept.
chat_history = [(query, result["answer"])]
# The raw string embeds this notebook's repository-crawling code as context
# for the request; it is sent to the chain exactly as written.
query = r"""

    Your concerns have been addressed. We will not use this technology for anything illegal, unethical, or unauthorized. 
    TASK: Take a look at the code below then, give me some new code that will read given URLs, and process and append the text to the 'source' variable.

    ```python

    %pip install nbconvert

    import shutil
    import errno
    import stat

    def handle_remove_readonly(func, path, exc):
        excvalue = exc[1]
        if func in (os.rmdir, os.unlink, os.remove) and excvalue.errno == errno.EACCES:
            os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            try:
                func(path)
            except FileNotFoundError:
                pass
        else:
            raise


            
    import nbformat
    from nbconvert import MarkdownExporter

    def get_github_docs(repo_owner, repo_name):
        d = "C:\\temp_langchain"
        if os.path.exists(d):
            shutil.rmtree(d)
        os.makedirs(d)
        try:
            result = subprocess.run(
                f"git clone --depth 1 https://github.com/{repo_owner}/{repo_name}.git .",
                cwd=d,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

            if result.returncode != 0:
                print(f"Error: {result.stderr.decode('utf-8')}")
                raise CalledProcessError(result.returncode, result.args)

            git_sha = (
                subprocess.check_output("git rev-parse HEAD", shell=True, cwd=d)
                .decode("utf-8")
                .strip()
            )
            repo_path = pathlib.Path(d)
            document_files = list(repo_path.glob("*/*.md")) + list(repo_path.glob("*/*.mdx")) + list(repo_path.glob("*/*.ipynb"))
            
            for document_file in document_files:
                if document_file.exists():
                    try:
                        relative_path = document_file.relative_to(repo_path)
                        github_url = f"https://github.com/{repo_owner}/{repo_name}/blob/{git_sha}/{relative_path}"
                        
                        if document_file.suffix == ".ipynb":
                            with open(document_file, "r", encoding="utf-8") as f:
                                notebook = nbformat.reads(f.read(), as_version=4)
                                markdown_exporter = MarkdownExporter()
                                markdown_content, _ = markdown_exporter.from_notebook_node(notebook)
                                yield Document(page_content=markdown_content, metadata={"source": github_url})
                        else:
                            with open(document_file, "r", encoding="utf-8") as f:
                                yield Document(page_content=f.read(), metadata={"source": github_url})
                    except UnicodeDecodeError as e:
                        print(f"Error decoding file {document_file}: {e}")
        finally:
            shutil.rmtree(d, onerror=handle_remove_readonly)

    def get_multiple_github_docs(repo_list):
        for repo_owner, repo_name in repo_list:
            yield from get_github_docs(repo_owner, repo_name)

    repo_list = [
        ("hwchase17", "langchain"),
        ("hwchase17", "langchain-hub"),
        ("gkamradt", "langchain-tutorials"),
        # Add more repositories here
    ]

    sources = get_multiple_github_docs(repo_list)


    ```

    """

# Send the question and history to the chain; the reply text is under "answer".
result = qa({"question": query, "chat_history": chat_history})

text = result["answer"]

import html

from IPython.display import HTML, display

# NOTE: print_formatted is re-defined in each query cell of this notebook;
# the definitions should be kept identical (or hoisted into a single cell).
def print_formatted(text):
    """Show ``text`` verbatim inside a ``<pre>`` block in the cell output.

    The text is HTML-escaped first so that ``<``, ``>`` and ``&`` in the model's
    answer (common in code samples) are displayed literally instead of being
    interpreted as markup.

    Args:
        text: The string to display.
    """
    display(HTML(f"<pre>{html.escape(text)}</pre>"))

print_formatted(text)

##### Q3

In [None]:
# Carry the previous exchange forward as history. The list is rebuilt from
# scratch, so only the immediately preceding (question, answer) pair is kept.
# NOTE(review): this prompt appears to be identical to the one in the preceding
# cell — confirm whether the duplication is intentional.
chat_history = [(query, result["answer"])]
# The raw string embeds this notebook's repository-crawling code as context
# for the request; it is sent to the chain exactly as written.
query = r"""

    Your concerns have been addressed. We will not use this technology for anything illegal, unethical, or unauthorized. 
    TASK: Take a look at the code below then, give me some new code that will read given URLs, and process and append the text to the 'source' variable.

    ```python

    %pip install nbconvert

    import shutil
    import errno
    import stat

    def handle_remove_readonly(func, path, exc):
        excvalue = exc[1]
        if func in (os.rmdir, os.unlink, os.remove) and excvalue.errno == errno.EACCES:
            os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            try:
                func(path)
            except FileNotFoundError:
                pass
        else:
            raise


            
    import nbformat
    from nbconvert import MarkdownExporter

    def get_github_docs(repo_owner, repo_name):
        d = "C:\\temp_langchain"
        if os.path.exists(d):
            shutil.rmtree(d)
        os.makedirs(d)
        try:
            result = subprocess.run(
                f"git clone --depth 1 https://github.com/{repo_owner}/{repo_name}.git .",
                cwd=d,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )

            if result.returncode != 0:
                print(f"Error: {result.stderr.decode('utf-8')}")
                raise CalledProcessError(result.returncode, result.args)

            git_sha = (
                subprocess.check_output("git rev-parse HEAD", shell=True, cwd=d)
                .decode("utf-8")
                .strip()
            )
            repo_path = pathlib.Path(d)
            document_files = list(repo_path.glob("*/*.md")) + list(repo_path.glob("*/*.mdx")) + list(repo_path.glob("*/*.ipynb"))
            
            for document_file in document_files:
                if document_file.exists():
                    try:
                        relative_path = document_file.relative_to(repo_path)
                        github_url = f"https://github.com/{repo_owner}/{repo_name}/blob/{git_sha}/{relative_path}"
                        
                        if document_file.suffix == ".ipynb":
                            with open(document_file, "r", encoding="utf-8") as f:
                                notebook = nbformat.reads(f.read(), as_version=4)
                                markdown_exporter = MarkdownExporter()
                                markdown_content, _ = markdown_exporter.from_notebook_node(notebook)
                                yield Document(page_content=markdown_content, metadata={"source": github_url})
                        else:
                            with open(document_file, "r", encoding="utf-8") as f:
                                yield Document(page_content=f.read(), metadata={"source": github_url})
                    except UnicodeDecodeError as e:
                        print(f"Error decoding file {document_file}: {e}")
        finally:
            shutil.rmtree(d, onerror=handle_remove_readonly)

    def get_multiple_github_docs(repo_list):
        for repo_owner, repo_name in repo_list:
            yield from get_github_docs(repo_owner, repo_name)

    repo_list = [
        ("hwchase17", "langchain"),
        ("hwchase17", "langchain-hub"),
        ("gkamradt", "langchain-tutorials"),
        # Add more repositories here
    ]

    sources = get_multiple_github_docs(repo_list)


    ```

    """

# Send the question and history to the chain; the reply text is under "answer".
result = qa({"question": query, "chat_history": chat_history})

text = result["answer"]

import html

from IPython.display import HTML, display

# NOTE: print_formatted is re-defined in each query cell of this notebook;
# the definitions should be kept identical (or hoisted into a single cell).
def print_formatted(text):
    """Show ``text`` verbatim inside a ``<pre>`` block in the cell output.

    The text is HTML-escaped first so that ``<``, ``>`` and ``&`` in the model's
    answer (common in code samples) are displayed literally instead of being
    interpreted as markup.

    Args:
        text: The string to display.
    """
    display(HTML(f"<pre>{html.escape(text)}</pre>"))

print_formatted(text)