# Code Analysis using Amazon Q for Business

In this notebook we will use Amazon Q for Business to analyze the code of a repository. We will use the `langchain-agents` repository in `aws-samples` as an example.

## Prerequisites

First make sure that your AWS credentials are properly configured. You can do this by installing the AWS CLI and running `aws configure`:

```bash
pip install awscli
aws configure
```
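
Before continuing, it can help to confirm which identity the configured credentials resolve to. The following is a minimal sketch, assuming you pass in an STS client such as `boto3.client("sts")`. The helper name `describe_caller` is made up for this example and is not part of the notebook.

```python
def describe_caller(sts_client):
    """Return the account ID and ARN that the current credentials resolve to."""
    identity = sts_client.get_caller_identity()
    return {"account": identity["Account"], "arn": identity["Arn"]}


# Example (requires boto3 and valid credentials):
#   import boto3
#   print(describe_caller(boto3.client("sts")))
```

If the call raises an error, the credentials are most likely missing or expired, and the later cells would fail for the same reason.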

Then install the necessary libraries:

In [None]:
%pip install boto3 --upgrade
%pip install GitPython

## Creating the Amazon Q Application

In the steps below we will create an Amazon Q application that will be used to process and then answer questions about the code of a repository.

In [None]:
amazon_q_user_id = "<your email>"
role_arn = None
amazon_q_app_id = None
reuse_existing_q_app = False

If you want to reuse an existing application, uncomment the cell below, fill in the values, and run it.

In [1]:
# amazon_q_app_id = "<your app id>"
# role_arn = "<your role arn>"
# reuse_existing_q_app = True

Next we create the IAM role that will be used by the Amazon Q application to access the repository.

In [None]:
# Create Q IAM Service role from iam-policy.json and trust-policy.json
import boto3
import json
import datetime

# Defined outside the if block because later cells use it even when role_arn is supplied
project_name = f"Code-Analysis-Demo-App-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"

if not role_arn:

    # Create IAM role
    iam = boto3.client('iam')
    # Note: to work with Q Business the role name MUST start with "QBusiness-Application-"
    role_name = f"QBusiness-Application-{project_name}"
    role_policy_file = "./security/iam-policy.json"
    trust_policy_file = "./security/trust-policy.json"

    # Load the permissions policy and the trust policy
    with open(role_policy_file) as f:
        role_policy = json.load(f)
    with open(trust_policy_file) as f:
        trust_policy = json.load(f)

    # Create the role with the trust policy
    role = iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy)
    )
    # Attach the permissions policy to the role as an inline policy
    iam.put_role_policy(
        RoleName=role_name,
        PolicyName=f"{role_name}-policy",
        PolicyDocument=json.dumps(role_policy)
    )
    role_arn = role['Role']['Arn']
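
The contents of `./security/trust-policy.json` are not shown in this notebook. As a rough illustration only, a trust policy that lets the Amazon Q Business service assume the role might look like the sketch below. The helper name `build_trust_policy` and the account-based condition are assumptions, not a copy of the actual file.

```python
import json


def build_trust_policy(account_id):
    """Build a trust policy that lets Amazon Q Business assume the role.

    Assumption: this approximates what ./security/trust-policy.json could
    contain; the real file is not shown in this notebook.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "qbusiness.amazonaws.com"},
                "Action": "sts:AssumeRole",
                # Restrict the role to requests made on behalf of your own account.
                "Condition": {"StringEquals": {"aws:SourceAccount": account_id}},
            }
        ],
    }


# Placeholder account ID, used only to show the resulting JSON document.
print(json.dumps(build_trust_policy("123456789012"), indent=2))
```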

Now we create the Amazon Q application, passing in the role created above. Note that we enable attachments so that we can later send every code file in the repository to the application.

In [None]:
amazon_q = boto3.client('qbusiness')

if not reuse_existing_q_app:
    response = amazon_q.create_application(
        attachmentsConfiguration={
            'attachmentsControlMode': 'ENABLED'
        },
        description=f"{project_name}-{datetime.datetime.now().strftime('%Y-%m-%d')}",
        displayName=project_name,
        roleArn=role_arn,
    )
    amazon_q_app_id = response["applicationId"]

After creating the application, we will create an index that will be used to store the information about the code of the repository.

In [None]:
if not reuse_existing_q_app:
    response = amazon_q.create_index(
        applicationId=amazon_q_app_id,
        capacityConfiguration={
            'units': 1
        },
        description=f"{project_name}-{datetime.datetime.now().strftime('%Y-%m-%d')}",
        displayName=project_name,
    )
    index_id = response["indexId"]
else:
    response = amazon_q.list_indices(
        applicationId=amazon_q_app_id
    )
    index_id = response["indices"][0]['indexId']

Lastly we will create a retriever to fetch the relevant information when we ask questions about the repository.

In [None]:
if not reuse_existing_q_app:
    response = amazon_q.create_retriever(
        applicationId=amazon_q_app_id,
        configuration={
            'nativeIndexConfiguration': {
                'indexId': index_id
            }
        },
        displayName=project_name,
        roleArn=role_arn,
        type='NATIVE_INDEX'
    )
    retriever_id = response["retrieverId"]
else:
    retriever_id = amazon_q.list_retrievers(
        maxResults=1,
        applicationId=amazon_q_app_id,
    )["retrievers"][0]["retrieverId"]

In [None]:
import time

# Wait until the index is ACTIVE before ingesting documents
while True:
    response = amazon_q.get_index(
        applicationId=amazon_q_app_id,
        indexId=index_id,
    )
    status = response.get('status')
    print(f"Create index status {status}")
    if status == 'ACTIVE':
        break
    time.sleep(10)
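
The polling loop above runs until the index reports `ACTIVE`, so it would never finish if index creation failed. A possible variant with a timeout is sketched below. It assumes the index can also report a `FAILED` status, and the helper name `wait_for_index` and its parameters are hypothetical.

```python
import time


def wait_for_index(client, application_id, index_id, poll_seconds=10, timeout_seconds=1800):
    """Poll get_index until the index is ACTIVE; raise on failure or timeout."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = client.get_index(applicationId=application_id, indexId=index_id)
        status = response.get("status")
        if status == "ACTIVE":
            return status
        # Assumption: a failed index creation is reported as status FAILED.
        if status == "FAILED":
            raise RuntimeError(f"Index {index_id} entered FAILED state")
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Index {index_id} not ACTIVE after {timeout_seconds}s (last status: {status})"
            )
        time.sleep(poll_seconds)
```

In this notebook it could be called as `wait_for_index(amazon_q, amazon_q_app_id, index_id)`.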

## Generating and Ingesting Documentation

If we ingested only the raw code, retrieval would return arbitrary code chunks that may not be relevant to the questions we want to ask. To avoid this, we will generate concise documentation for the repository and ingest it into the index.

First we will define a few helper functions. The first one will be used to take a file and a prompt and return the answer generated from Q.

In [None]:
def ask_question_with_attachment(prompt, filename):
    # Read the file as bytes; the context manager closes the handle afterwards
    with open(filename, 'rb') as f:
        data = f.read()
    answer = amazon_q.chat_sync(
        applicationId=amazon_q_app_id,
        userId=amazon_q_user_id,
        userMessage=prompt,
        attachments=[
            {
                'data': data,
                'name': filename
            },
        ],
    )
    return answer['systemMessage']

Next, we define a function that will upload the generated answer, the filename, and the prompt to the index along with a source attribute that can be used later to find the files that the answer came from.

In [None]:
import uuid

def upload_prompt_answer_and_file_name(filename, prompt, answer, repo_url):
    amazon_q.batch_put_document(
        applicationId=amazon_q_app_id,
        indexId=index_id,
        roleArn=role_arn,
        documents=[
            {
                'id': str(uuid.uuid4()),
                'contentType': 'PLAIN_TEXT',
                'title': filename,
                'content':{
                    'blob': f"{filename} | {prompt} | {answer}".encode('utf-8')
                },
                'attributes': [
                    {
                        'name': 'url',
                        'value': {
                            'stringValue': f"{repo_url}{filename}"
                        }
                    }
                ]
            },
        ]
    )
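
The upload function above discards the response of `batch_put_document`, so a rejected document would go unnoticed. The sketch below shows one way to surface such failures. It assumes the response carries a `failedDocuments` list whose entries have an `id` and an `error` with an `errorMessage`; the helper name `raise_on_failed_documents` is hypothetical.

```python
def raise_on_failed_documents(response):
    """Raise if a batch_put_document response reports documents that were not accepted."""
    # Assumption: rejected documents are listed under the "failedDocuments" key.
    failed = response.get("failedDocuments", [])
    if failed:
        details = "; ".join(
            f"{doc.get('id')}: {doc.get('error', {}).get('errorMessage', 'unknown error')}"
            for doc in failed
        )
        raise RuntimeError(f"{len(failed)} document(s) failed to ingest: {details}")
    return response
```

To use it, wrap the call: `raise_on_failed_documents(amazon_q.batch_put_document(...))`.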

We also create a helper function to save the generated answers to a local folder in case we want to inspect them later.

In [None]:
# Function to save generated answers to folder documentation/
import os

def save_answers(answer, filepath, folder):
    # Replace the file extension (if any) with .txt
    base, _ = os.path.splitext(filepath)
    target = os.path.join(folder, base + ".txt")
    # Create the parent directories of the target file if they do not exist yet
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "w", encoding="utf-8") as f:
        f.write(answer)
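
One edge case worth checking when mapping source files to `.txt` files is a file without an extension, such as a `Makefile`. The sketch below isolates the path mapping with `os.path.splitext`, which leaves such names intact. The helper name `documentation_path` is hypothetical.

```python
import os


def documentation_path(folder, filepath):
    """Map a source file path to the .txt path where its answer is stored."""
    # splitext returns an empty extension for names like "Makefile".
    base, _extension = os.path.splitext(filepath)
    return os.path.join(folder, base + ".txt")


print(documentation_path("documentation", os.path.join("repositories", "src", "app.py")))
print(documentation_path("documentation", os.path.join("repositories", "Makefile")))
```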

Since there are some files we want to ignore, we will define a function to filter the files we want to process. You can modify this function to fit your needs.

In [2]:
import os
def should_ignore_path(path):
    path_components = path.split(os.sep)
    for component in path_components:
        if component.startswith('.'):
            return True
    return False
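
To see what the filter does, the short example below runs it on a few sample paths. The function is repeated in compact form so the example runs on its own; the sample paths are made up.

```python
import os


def should_ignore_path(path):
    # Same logic as the cell above: ignore any path with a component starting with "."
    return any(component.startswith('.') for component in path.split(os.sep))


for candidate in [
    os.path.join("repositories", "src", "agents"),  # regular source folder
    os.path.join("repositories", ".git", "hooks"),  # hidden folder inside the repo
    os.path.join(".", "repositories"),              # relative path starting with "."
]:
    print(candidate, should_ignore_path(candidate))
```

Only the first path is kept. The last one is ignored as well, because the leading `.` counts as a component that starts with a dot, so the function should be called with paths that do not begin with `./`.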

## Processing and Ingesting a Git Repository

Now that we have the application and the index, along with the helper functions, we can process every file in the repository and ingest the generated answers into the index.

First we will clone the repository and then process every file in the repository. We will use the helper functions to generate the answers and ingest them into the index. We will also save the generated answers to a local folder.

If you want to create documentation with a focus on a particular aspect, e.g. security, you can change the prompt so that Q generates answers with that focus.
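
One way to switch the focus is to append an extra instruction to a base prompt. The sketch below is only an illustration: the focus areas, their wording, and the names `BASE_PROMPT`, `FOCUS_INSTRUCTIONS`, and `build_prompt` are hypothetical.

```python
BASE_PROMPT = (
    "Come up with a list of questions and answers about the attached file. "
    "Keep answers dense with information."
)

# Hypothetical focus areas; adjust the wording to your needs.
FOCUS_INSTRUCTIONS = {
    "security": "Focus on authentication, authorization, secrets handling, and input validation.",
    "performance": "Focus on expensive operations, caching, and concurrency.",
}


def build_prompt(focus=None):
    """Return the base prompt, optionally extended with a focus instruction."""
    if focus is None:
        return BASE_PROMPT
    return f"{BASE_PROMPT} {FOCUS_INSTRUCTIONS[focus]}"


print(build_prompt("security"))
```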

In [None]:
import git
import shutil

def process_repository(repo_url, ssh_url=None):

    # Temporary clone location
    tmp_dir = f"/tmp/{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" 

    destination_folder = 'repositories/'

    # Clone the repository
    # If you authenticate with some other repo provider just change the line below
    if ssh_url:
        repo = git.Repo.clone_from(ssh_url, tmp_dir)
    else:
        repo = git.Repo.clone_from(repo_url, tmp_dir)

    # Copy all files to destination folder
    for src_dir, dirs, files in os.walk(tmp_dir):
        dst_dir = src_dir.replace(tmp_dir, destination_folder)
        if not os.path.exists(dst_dir):
            os.mkdir(dst_dir)
        for file_ in files:
            src_file = os.path.join(src_dir, file_)
            dst_file = os.path.join(dst_dir, file_)
            if os.path.exists(dst_file):
                os.remove(dst_file)
            shutil.copy(src_file, dst_dir)
    
    # Delete temp clone       
    shutil.rmtree(tmp_dir)

    import time

    processed_files = []
    failed_files = []

    for root, dirs, files in os.walk(destination_folder):
        if should_ignore_path(root):
            continue
        for file in files:
            if file.endswith(('.png', '.jpg', '.jpeg', '.gif', '.zip')):
                continue
            # Ignore files that start with a dot (.)
            if file.startswith('.'):
                continue
                
            file_path = os.path.join(root, file)
            
            for attempt in range(3):
                try:
                    print(f"\033[92mProcessing file: {file_path}\033[0m")
                    #prompt = "Generate comprehensive documentation about the attached file. Make sure you include what dependencies and other files are being referenced as well as function names, class names, and what they do."
                    prompt = "Come up with a list of questions and answers about the attached file. Keep answers dense with information. A good question for a database related file would be 'What is the database technology and architecture?' or for a file that executes SQL commands 'What are the SQL commands and what do they do?' or for a file that contains a list of API endpoints 'What are the API endpoints and what do they do?'"
                    answer = ask_question_with_attachment(prompt, file_path)
                    upload_prompt_answer_and_file_name(file_path, prompt, answer, repo_url)
                    save_answers(answer, file_path, "documentation/")
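                    processed_files.append(file_path)
                    break
                except Exception as e:
                    # Report the error and wait before the next attempt
                    print(f"\033[93mAttempt {attempt + 1} failed for {file_path}: {e}\033[0m")
                    time.sleep(15)
            else:
                # Reached only when all attempts failed (the loop ended without break)
                failed_files.append(file_path)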
                
    print(f"Processed files: {processed_files}")
    print(f"Failed files: {failed_files}")
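
The retry logic in `process_repository` relies on Python's `for ... else`: the `else` branch runs only when the loop ends without hitting `break`, that is, when every attempt failed. The same idea can be factored into a small helper, sketched below. The name `call_with_retries` and its parameters are hypothetical.

```python
import time


def call_with_retries(func, attempts=3, delay_seconds=15):
    """Call func() up to `attempts` times, sleeping between failed attempts."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as error:  # narrow this to the errors you expect
            last_error = error
            print(f"Attempt {attempt}/{attempts} failed: {error}")
            if attempt < attempts:
                time.sleep(delay_seconds)
    # Every attempt failed: re-raise the last error so the caller can record it.
    raise last_error
```

A caller could wrap the per-file work, for example `call_with_retries(lambda: ask_question_with_attachment(prompt, file_path))`, and add the file to `failed_files` when the exception propagates.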

Important: If you need to sign your SSH key before making a request (e.g. with mwinit), perform that action in your terminal first and then run the cell below.

In [None]:
# Specify the URL of the Git repository
# If you are using ssh to clone, uncomment the line below and pass it
# as the second argument: process_repository(repo_url, ssh_url)
# ssh_url = '<your ssh url>'
repo_url = "https://github.com/aws-samples/langchain-agents.git"

# Call the function to process the repository
process_repository(repo_url)

## Using the Amazon Q Application to Answer Questions about the Repository

Now that we've created the application, the index, and ingested the documentation, we can use the Amazon Q application to answer questions about the repository.

In [None]:
def ask_question_about_repo(prompt):
    answer = amazon_q.chat_sync(
        applicationId=amazon_q_app_id,
        userId=amazon_q_user_id,
        userMessage=prompt
    )
    return answer['systemMessage']

In [None]:
ask_question_about_repo("What is the architecture of the Bedrock Langchain solution?")
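
The function above returns only the answer text. To see which ingested documents an answer was based on, the response can be rendered together with its sources, as in the sketch below. It assumes the `chat_sync` response contains a `sourceAttributions` list whose entries have `title` and `url` fields; the helper name `format_answer_with_sources` is hypothetical.

```python
def format_answer_with_sources(response):
    """Render a chat_sync response as the answer followed by its cited sources."""
    lines = [response.get("systemMessage", "")]
    # Assumption: cited documents are listed under "sourceAttributions".
    for number, source in enumerate(response.get("sourceAttributions", []), start=1):
        title = source.get("title", "untitled")
        url = source.get("url", "no url")
        lines.append(f"[{number}] {title} ({url})")
    return "\n".join(lines)
```

To use it, pass the full response of `amazon_q.chat_sync(...)` instead of only `answer['systemMessage']`.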

## Delete Q Application, Index, and Retriever

To avoid incurring costs, you can delete the application, the index, and the retriever.

In [None]:
amazon_q.delete_retriever(applicationId=amazon_q_app_id, retrieverId=retriever_id)
amazon_q.delete_index(applicationId=amazon_q_app_id, indexId=index_id)
amazon_q.delete_application(applicationId=amazon_q_app_id)
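
The cell above leaves the IAM role in place. If the role was created only for this notebook, it can be removed as well. The sketch below assumes the role has only inline policies; managed policies would need to be detached separately before the role can be deleted. The helper name `delete_role_with_inline_policies` is hypothetical.

```python
def delete_role_with_inline_policies(iam_client, role_name):
    """Delete an IAM role after removing its inline policies."""
    # A role cannot be deleted while it still has inline policies.
    policy_names = iam_client.list_role_policies(RoleName=role_name)["PolicyNames"]
    for policy_name in policy_names:
        iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name)
    iam_client.delete_role(RoleName=role_name)
    return policy_names
```

With the variables from this notebook, the call could look like `delete_role_with_inline_policies(boto3.client('iam'), role_arn.split('/')[-1])`. Do not run it against a role that other applications still use.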