In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Use Retrieval Augmented Generation (RAG) with Codey APIs

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/generative-ai/blob/main/language/use-cases/code-generation/retrieval_augmented_generation_with_codey">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/use-cases/code-generation/retrieval_augmented_generation_with_codey">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/generative-ai/blob/main/language/use-cases/code-generation/retrieval_augmented_generation_with_codey">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>
</table>

### Objective

This notebook demonstrates how to augment output from the Codey APIs by bringing in external knowledge. We'll show you an example of code Retrieval Augmented Generation (RAG) using [Google Cloud's Generative AI GitHub repository](https://github.com/GoogleCloudPlatform/generative-ai).

### Install libraries

In [None]:
!pip install langchain google-cloud-aiplatform faiss-cpu

### Restart runtime

In [None]:
# The freshly installed packages only become importable after the kernel
# restarts, so ask the running IPython application to shut its kernel down
# with restart=True.
import IPython
import time

IPython.Application.instance().kernel.do_shutdown(True)

### Authenticate your notebook environment (Colab only)

If you are running this notebook on Google Colab, you will need to authenticate your environment. To do this, run the cell below. This step is not required if you are using Vertex AI Workbench.

In [None]:
import sys

# Define project information.
# These are set unconditionally because later cells (vertexai.init) read
# PROJECT_ID and LOCATION regardless of where the notebook is running;
# defining them only inside the Colab branch raises NameError elsewhere.
PROJECT_ID = ""  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

if "google.colab" in sys.modules:
    # Authenticate user to Google Cloud (only needed on Colab)
    from google.colab import auth

    auth.authenticate_user()

### Import libraries

In [None]:
# Utils

# LangChain
from langchain.llms import VertexAI
from langchain.embeddings import VertexAIEmbeddings

from langchain.schema import HumanMessage, SystemMessage
from langchain.schema.document import Document

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.text_splitter import Language

from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA

import time
from typing import List

from pydantic import BaseModel

# Vertex AI
from google.cloud import aiplatform
import vertexai
from vertexai.language_models import CodeGenerationModel

# Show which Vertex AI SDK version the kernel actually imported.
print(f"Vertex AI SDK version: {aiplatform.__version__}")

Vertex AI SDK version: 1.36.1


In [None]:
# Point the Vertex AI SDK at the project/region chosen earlier in the notebook.
vertexai.init(project=PROJECT_ID, location=LOCATION)

# GitHub settings used by the repository crawler further down.
GITHUB_TOKEN = ""  # @param {type:"string"}
GITHUB_REPO = "GoogleCloudPlatform/generative-ai"  # @param {type:"string"}

# Code generation model, accessed through the LangChain VertexAI wrapper.
code_llm = VertexAI(
    model_name="code-bison@latest",
    max_output_tokens=2048,
    temperature=0.1,
    verbose=False,
)


### Crawl the GitHub repo to get a list of all code files. In this case we only use the Jupyter notebooks.

In [None]:
import requests, time

def crawl_github_repo(url, is_sub_dir, access_token=f"{GITHUB_TOKEN}", timeout=30):
    """Recursively list the ``.py`` and ``.ipynb`` files of a GitHub repository.

    Args:
        url: ``"owner/repo"`` when ``is_sub_dir`` is False; otherwise the
            GitHub contents-API URL of a sub-directory (the ``url`` field of
            a directory entry returned by a previous call).
        is_sub_dir: False for the initial call on a repository, True when
            ``url`` is already a full API URL.
        access_token: GitHub token sent as a Bearer token. When empty, no
            Authorization header is sent and the request is unauthenticated.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list with the ``html_url`` of every matching file found.

    Raises:
        requests.HTTPError: if any GitHub API request returns an error status.
    """
    ignore_list = ["__init__.py"]
    code_suffixes = (".py", ".ipynb")

    api_url = url if is_sub_dir else f"https://api.github.com/repos/{url}/contents"

    headers = {"Accept": "application/vnd.github.v3+json"}
    if access_token:
        # Only send credentials when a token was actually provided; an empty
        # "Bearer " header is not a valid credential.
        headers["Authorization"] = f"Bearer {access_token}"

    response = requests.get(api_url, headers=headers, timeout=timeout)
    response.raise_for_status()  # Check for any request errors

    files = []
    for item in response.json():
        name = item["name"]
        if item["type"] == "file":
            if name not in ignore_list and name.endswith(code_suffixes):
                files.append(item["html_url"])
        elif item["type"] == "dir" and not name.startswith("."):
            # Forward the caller's token and timeout: relying on the default
            # would use whatever GITHUB_TOKEN held when this function was defined.
            files.extend(
                crawl_github_repo(item["url"], True, access_token, timeout=timeout)
            )
            # Short pause between directory requests.
            time.sleep(0.1)

    return files

In [None]:
# Crawl the repository starting from its root ("owner/repo" form, not a sub-directory).
code_files_urls = crawl_github_repo(GITHUB_REPO, False, GITHUB_TOKEN)

# Persist the URLs, one per line, so the crawl does not have to be repeated.
with open('code_files_urls.txt', 'w') as f:
    f.writelines(item + '\n' for item in code_files_urls)

len(code_files_urls)

93

In [None]:
# Peek at the first ten crawled URLs.
code_files_urls[:10]

In [None]:
# Utility functions for Embeddings API with rate limiting
def rate_limit(max_per_minute):
    """Generator used to pace a loop to roughly ``max_per_minute`` steps per minute.

    The first ``next()`` prints "Waiting" and returns immediately. Every later
    ``next()`` measures the time spent since the previous yield and, if that
    is shorter than ``60 / max_per_minute`` seconds, prints a "." and sleeps
    for the remainder before yielding again. The generator never finishes and
    always yields ``None``.
    """
    min_interval = 60 / max_per_minute
    print("Waiting")
    while True:
        resumed_at = time.time()
        yield
        # Time the caller spent working between two next() calls.
        remaining = min_interval - (time.time() - resumed_at)
        if remaining > 0:
            print(".", end="")
            time.sleep(remaining)


class CustomVertexAIEmbeddings(VertexAIEmbeddings, BaseModel):
    """VertexAIEmbeddings variant that embeds documents in paced batches."""

    # Request budget per minute, handed to rate_limit().
    requests_per_minute: int
    # How many texts are sent in one get_embeddings() call.
    num_instances_per_batch: int

    def embed_documents(self, texts: List[str]):
        """Embed ``texts`` batch by batch, pausing between batches.

        The texts are consumed from the front in slices of
        ``num_instances_per_batch``; each slice is sent to
        ``self.client.get_embeddings`` and the rate limiter is advanced once
        after every request.

        Returns:
            A list containing the ``values`` attribute of each embedding
            result, in the order the results were returned.
        """
        pacer = rate_limit(self.requests_per_minute)
        batch_size = self.num_instances_per_batch
        embedded = []
        pending = list(texts)

        while pending:
            # Peel the next batch off the front of the remaining texts.
            batch, pending = pending[:batch_size], pending[batch_size:]
            embedded.extend(self.client.get_embeddings(batch))
            next(pacer)

        return [embedding.values for embedding in embedded]


### Extract code from IPython notebooks

In [None]:
import requests
import nbformat
import json

def extract_python_code_from_ipynb(github_url, cell_type="code", timeout=30):
    """Download a notebook from GitHub and return the joined source of its cells.

    Args:
        github_url: Browser URL of the notebook
            (``https://github.com/<owner>/<repo>/blob/<ref>/<path>.ipynb``).
        cell_type: Which cells to keep, compared against each cell's
            ``cell_type`` (``"code"`` by default).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The sources of all non-empty matching cells joined with newlines, or
        an empty string when the notebook has no such cell.

    Raises:
        requests.HTTPError: if the download returns an error status.
    """
    # Rewrite the browser URL into its raw-content form. Only the first
    # occurrence of each marker is replaced so that a file path which itself
    # contains "/blob/" is left intact.
    raw_url = github_url.replace("github.com", "raw.githubusercontent.com", 1).replace(
        "/blob/", "/", 1
    )

    response = requests.get(raw_url, timeout=timeout)
    response.raise_for_status()  # Check for any request errors

    # Convert to notebook format v4 on read: older formats do not expose a
    # top-level ``cells`` list, which the loop below relies on.
    notebook = nbformat.reads(response.text, as_version=4)

    return "\n".join(
        cell.source
        for cell in notebook.cells
        if cell.cell_type == cell_type and cell.source
    )

def extract_python_code_from_py(github_url, timeout=30):
    """Download a Python source file from GitHub and return its text.

    Args:
        github_url: Browser URL of the file
            (``https://github.com/<owner>/<repo>/blob/<ref>/<path>.py``).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The body of the HTTP response as text.

    Raises:
        requests.HTTPError: if the download returns an error status.
    """
    # Rewrite the browser URL into its raw-content form. Only the first
    # occurrence of each marker is replaced so that a file path which itself
    # contains "/blob/" is left intact.
    raw_url = github_url.replace("github.com", "raw.githubusercontent.com", 1).replace(
        "/blob/", "/", 1
    )

    response = requests.get(raw_url, timeout=timeout)
    response.raise_for_status()  # Check for any request errors

    return response.text

In [None]:
# Reload the cached URL list (one URL per line) written by the crawl step.
with open('code_files_urls.txt') as url_cache:
    code_files_urls = url_cache.read().splitlines()

len(code_files_urls)

34

In [None]:
# Build one Document per notebook, holding the concatenated source of its code cells.
code_strings = []

for i, file_url in enumerate(code_files_urls):
    # Only notebooks are indexed here; other crawled files are skipped.
    if not file_url.endswith(".ipynb"):
        continue

    content = extract_python_code_from_ipynb(file_url, "code")
    if not content:
        # A notebook without code cells yields no text; there is nothing to
        # index and a Document needs actual text content, so skip it.
        continue

    # file_index is the position of the URL in code_files_urls.
    doc = Document(page_content=content, metadata={"url": file_url, "file_index": i})
    code_strings.append(doc)

### Chunk code files, generate embeddings & initialize Index

In [None]:
# Split every notebook's code into overlapping, Python-aware chunks.
text_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON,
    chunk_size=2000,
    chunk_overlap=200,
)
texts = text_splitter.split_documents(code_strings)
print(len(texts))

# Embedding client: request budget per minute and texts per request.
EMBEDDING_QPM = 100
EMBEDDING_NUM_BATCH = 5
embeddings = CustomVertexAIEmbeddings(
    model_name="textembedding-gecko@latest",
    requests_per_minute=EMBEDDING_QPM,
    num_instances_per_batch=EMBEDDING_NUM_BATCH,
)

# Embed the chunks and load them into a FAISS index.
db = FAISS.from_documents(texts, embeddings)

# Retriever returning the 5 most similar chunks ("mmr" is another search_type to try).
retriever = db.as_retriever(search_type="similarity", search_kwargs={"k": 5})

retriever

### Try zero-shot prompts

In [42]:
# Prompt templates used in the comparison below.

# Zero-shot: the model only sees the question.
prompt_zero_shot = """
    You are a proficient python developer. Respond with the syntactically correct & concise code for to the question below.

    Question:
    {question}

    Output Code :
    """

prompt_prompt_zero_shot = PromptTemplate(
    template=prompt_zero_shot,
    input_variables=["question"],
)

# RAG: the model sees the question plus the retrieved code chunks as context.
prompt_RAG = """
    You are a proficient python developer. Respond with the syntactically correct code for to the question below. Make sure you follow these rules:
    1. Use context to understand the APIs and how to use it & apply.
    2. Do not add license information to the output code.
    3. Do not include colab code in the output.
    4. Ensure all the requirements in the question are met.

    Question:
    {question}

    Context:
    {context}

    Helpful Response :
    """

# NOTE: the variable name keeps its original spelling ("tempate").
prompt_RAG_tempate = PromptTemplate(
    input_variables=["context", "question"],
    template=prompt_RAG,
)

# Retrieval QA chain: retrieve chunks, fill the RAG prompt, call the code model.
# return_source_documents=True keeps the retrieved chunks in the chain output.
qa_chain = RetrievalQA.from_llm(
    llm=code_llm,
    retriever=retriever,
    prompt=prompt_RAG_tempate,
    return_source_documents=True,
)

In [43]:
# First example task, sent both without and with retrieval below.
user_question = (
    "Create python function that takes a prompt and predicts "
    "using langchain.llms interface with VertexAI text-bison model"
)

In [44]:
# Baseline: send the question straight to the code model, with no retrieved context.
response = code_llm.predict(
    text=user_question,
    max_output_tokens=2048,
    temperature=0.1,
)
print(response)

```python
def predict_with_langchain_llms(prompt):
    """Predicts the next token using LangChain LLMs interface with VertexAI text-bison model.

    Args:
        prompt: The prompt to predict the next token for.

    Returns:
        The predicted next token.
    """

    # Create the LangChain LLMs interface.
    llms = LangChainLLMs(
        model_name="text-bison",
        project_id="YOUR_PROJECT_ID",
        location="YOUR_LOCATION",
    )

    # Predict the next token.
    prediction = llms.predict(prompt)

    # Return the predicted next token.
    return prediction
```


In [45]:
# Same question through the retrieval chain: relevant code chunks are fetched
# from the index and placed in the prompt before the model is called.
results = qa_chain({"query": user_question})
print(results["result"])

```python
def predict_with_langchain_llms_interface_with_vertexai_text_bison_model(
    prompt: str,
) -> str:
    """Create python function that takes a prompt and predicts using langchain.llms interface with VertexAI text-bison model

    Args:
        prompt (str): The prompt to predict

    Returns:
        str: The prediction
    """

    # Initialize the VertexAI embeddings
    embedding = VertexAIEmbeddings()

    # Initialize the VertexAI LLM
    llm = VertexAI(
        model_name="text-bison-32k",
        max_output_tokens=256,
        temperature=0.1,
        top_p=0.8,
        top_k=40,
        verbose=True,
    )

    # Create the prediction
    prediction = llm.predict(prompt)

    return prediction

```


In [49]:
# Second example task.
user_question = (
    "Create python function that takes text input and returns embeddings "
    "using Langchain with VertexAI textembedding-gecko model"
)

# Baseline: send the question straight to the code model, with no retrieved context.
response = code_llm.predict(
    text=user_question,
    max_output_tokens=2048,
    temperature=0.1,
)
print(response)

```python
import langchain
from langchain.models import GeckoEmbeddings

def get_embeddings(text):
  # Load the Langchain model
  model = GeckoEmbeddings()

  # Generate embeddings for the input text
  embeddings = model.encode(text)

  return embeddings
```


In [50]:
# Same question through the retrieval chain: relevant code chunks are fetched
# from the index and placed in the prompt before the model is called.
results = qa_chain({"query": user_question})
print(results["result"])

```python
def get_embeddings_langchain_vertexai_textembedding_gecko(text):
  """Gets the embeddings for a given text using Langchain with VertexAI textembedding-gecko model.

  Args:
    text: The text to get the embeddings for.

  Returns:
    A list of embeddings.
  """

  # Initialize the VertexAI embeddings model.
  embedding = VertexAIEmbeddings()

  # Get the embeddings for the text.
  embeddings = embedding.embed_documents([text])

  return embeddings[0]
```
