# Data embedding, vector uploading and PR info mapping
## Data pipeline
This notebook ingests each pull request (PR) in the sub-datasets, stores the relevant PR content in a PostgreSQL database, embeds it with a feature-extraction model, uploads the embeddings to a vector database, and maps each PR record to its vector ID for Retrieval-Augmented Generation (RAG).
Because the full dataset is over 30 GB, this notebook covers only the JavaScript portion as a proof of concept.

`create a queue of the file names -> then spawn N threads -> each thread takes a file from the queue -> generates the proper data structure/dataframe from the JSON file -> sends it to the inference API (or runs the transformer locally) for embeddings -> takes the resulting embedding -> generates the proper data structure to upload to a vector database (Pinecone) and does so in chunks of 100 vectors, as per the Pinecone documentation -> uploads the corresponding PR info to another database, keyed by the vector ID -> when finished with a file, saves to a log -> repeat.`
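
The flow above can be sketched as a minimal queue-and-worker skeleton. This is only an illustration of the threading pattern: `process_file`, `run_pipeline`, and the file names are hypothetical placeholders standing in for the embedding, upsert, and database-mapping stages.

```python
import queue
import threading


def process_file(file_name: str) -> str:
    """Hypothetical stand-in for the embed -> upsert -> map stages."""
    return f"{file_name} processed"


def run_pipeline(file_names: list[str], num_threads: int = 4) -> list[str]:
    # Fill a thread-safe queue with the work items.
    file_queue: queue.Queue = queue.Queue()
    for name in file_names:
        file_queue.put(name)

    log: list[str] = []
    log_lock = threading.Lock()

    def worker() -> None:
        while True:
            try:
                name = file_queue.get_nowait()
            except queue.Empty:
                return  # no work left for this thread
            try:
                result = process_file(name)
                with log_lock:
                    log.append(result)
            finally:
                file_queue.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


print(sorted(run_pipeline(["0-a.json", "1-b.json", "2-c.json"])))
```

Threads suit this workload because most of the time is spent waiting on network calls (the inference API and the two databases) rather than on CPU work.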

### Loading API keys and other secrets

In [2]:
from dotenv import load_dotenv
import os
import logging

logging.basicConfig(filename='embedding_progress.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

load_dotenv()

PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
HUGGINGFACE_API_KEY = os.getenv('HUGGINGFACE_API_KEY')
POSTGRESQL_USER = os.getenv('POSTGRESQL_USER')
POSTGRESQL_PASSWORD = os.getenv('POSTGRESQL_PASSWORD')
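
Since `os.getenv` silently returns `None` for a missing variable, a small check can surface a missing secret early. The sketch below is an optional addition; the helper name `missing_env_vars` is hypothetical and not part of the original notebook.

```python
import os

# The four secrets this notebook reads from the environment.
REQUIRED_ENV_VARS = (
    "PINECONE_API_KEY",
    "HUGGINGFACE_API_KEY",
    "POSTGRESQL_USER",
    "POSTGRESQL_PASSWORD",
)


def missing_env_vars(names=REQUIRED_ENV_VARS, env=None) -> list[str]:
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    print(f"Missing environment variables: {', '.join(missing)}")
```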

### Connect to the PostgreSQL Database

In [3]:
import psycopg2 as pg

conn = pg.connect(
    dbname="postgres",
    user=POSTGRESQL_USER,
    password=POSTGRESQL_PASSWORD,
    host="localhost",
    port="5432"
)
conn.autocommit = True

# Create a cursor object for SQL operations.
cur = conn.cursor()

In [4]:
# test connection
cur.execute("SELECT version();")
cur.fetchone()

('PostgreSQL 16.0, compiled by Visual C++ build 1935, 64-bit',)

In [5]:
cur.execute("ROLLBACK")
conn.commit()

### Create a database for the project

In [6]:
# Check if the database already exists, and create it if not.
cur.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = 'review_owl';")
exists = cur.fetchall()

if not exists:
    logging.info("Creating database")
    cur.execute("CREATE DATABASE review_owl;")
    conn.commit()

conn.close()

### Connect to the project database

In [7]:
# Switch to the new database.
conn = pg.connect(
    dbname="review_owl",
    user=POSTGRESQL_USER,
    password=POSTGRESQL_PASSWORD,
    host="localhost",
    port="5432"
)
conn.autocommit = True

# Create a cursor object for SQL operations.
cur = conn.cursor()

### Create a Table for PR Information

In [8]:
# Check if the table already exists, and create it if not.
cur.execute("""
    CREATE TABLE IF NOT EXISTS pr_info (
        pr_vector_id TEXT PRIMARY KEY,
        pr_id INT UNIQUE,
        pr_repo_name TEXT,
        pr_html_url TEXT,
        pr_file_path TEXT,
        pr_line INT,
        pr_user TEXT,
        pr_diff_hunk TEXT,
        pr_body TEXT,
        pr_commit_id TEXT,
        pr_language TEXT
    );
""")

conn.commit()

### Function for inserting and updating rows

In [9]:
# Function to insert or update rows in the pr_info table.
def insert_or_update_pr_info(vector_id: str, pr_id: int, repo_name: str, html_url: str, file_path: str, line: int, user: str, diff_hunk: str, body: str, commit_id: str, language: str):
    try:
        cur.execute("""
            INSERT INTO pr_info (pr_vector_id, pr_id, pr_repo_name, pr_html_url, pr_file_path, pr_line, pr_user, pr_diff_hunk, pr_body, pr_commit_id, pr_language)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (pr_vector_id)
            DO UPDATE SET
                pr_id = EXCLUDED.pr_id,
                pr_repo_name = EXCLUDED.pr_repo_name,
                pr_html_url = EXCLUDED.pr_html_url,
                pr_file_path = EXCLUDED.pr_file_path,
                pr_line = EXCLUDED.pr_line,
                pr_user = EXCLUDED.pr_user,
                pr_diff_hunk = EXCLUDED.pr_diff_hunk,
                pr_body = EXCLUDED.pr_body,
                pr_commit_id = EXCLUDED.pr_commit_id,
                pr_language = EXCLUDED.pr_language;
        """, (vector_id, pr_id, repo_name, html_url, file_path, line, user, diff_hunk, body, commit_id, language))
        conn.commit()
        return True
    except Exception as e:
        conn.rollback()
        logging.error(e)
        return False
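
Because the function takes eleven positional arguments, it is easy to pass them in the wrong order. One possible safeguard, sketched here with a hypothetical helper `pr_info_params`, is to build the argument tuple from a dictionary in a fixed field order. The key names are assumed to match the parameter names of `insert_or_update_pr_info`.

```python
# Field order mirrors the parameters of insert_or_update_pr_info.
PR_INFO_FIELDS = (
    "vector_id", "pr_id", "repo_name", "html_url", "file_path", "line",
    "user", "diff_hunk", "body", "commit_id", "language",
)


def pr_info_params(row: dict) -> tuple:
    """Return the row's values in the order the insert function expects."""
    missing = [field for field in PR_INFO_FIELDS if field not in row]
    if missing:
        raise KeyError(f"Missing PR info fields: {missing}")
    return tuple(row[field] for field in PR_INFO_FIELDS)


# Dummy row for illustration only.
example_row = {field: None for field in PR_INFO_FIELDS}
example_row.update({"vector_id": "repo-abc", "pr_id": 1, "language": "JavaScript"})
print(pr_info_params(example_row)[:2])
```

With this helper, a call could be written as `insert_or_update_pr_info(*pr_info_params(row))`.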

In [10]:
# test insert
# insert_or_update_pr_info("", 44735997, "plotly@plotly.js", "https://github.com/plotly/plotly.js/pull/1#discussion_r44735997", "devtools/test_dashboard/server.js", 36, "etpinard", "@@ -1,89 +1,53 @@\n-var http = require('http');\n-var ecstatic = require('ecstatic');\n-var browserify = require('browserify');\n-var open = require('open');\n var fs = require('fs');\n-var watchify = require('watchify');\n+var http = require('http');\n var path = require('path');\n-var outpipe = require('outpipe');\n-var outfile = path.join(__dirname, '../shelly/plotlyjs/static/plotlyjs/build/plotlyjs-bundle.js');\n-\n-var testFile = './test';\n-\n-switch(process.argv[2]) {\n-  case 'geo':\n-    testFile = './test-geo';\n-  break;\n-  case '2d':\n-    testFile = './test-2d';\n-  break;\n-}\n-\n-console.log('using ' + testFile);\n-\n-var b = browserify(path.join(__dirname, '../shelly/plotlyjs/static/plotlyjs/src/plotly.js'), {\n-  debug: true,\n-  verbose: true,\n-  standalone: 'Plotly',\n-  transform: path.join(__dirname, '../shelly/plotlyjs/static/plotlyjs/compress_attributes.js'),\n-  cache: {},\n-  packageCache: {}\n-});\n-\n-\n-var w = watchify(b);\n \n-var bytes, time;\n-w.on('bytes', function (b) { bytes = b });\n-w.on('time', function (t) { time = t });\n-\n-w.on('update', bundle);\n-bundle();\n+var browserify = require('browserify');\n+var ecstatic = require('ecstatic');\n+var _open = require('open');\n \n-var firstBundle = true;\n+var makeWatchifiedBundle = require('../../tasks/util/make_watchified_bundle');\n+var shortcutPaths = require('../../tasks/util/shortcut_paths');\n+var constants = require('../../tasks/util/constants');\n \n-function bundle () {\n-    var didError = false;\n-    var outStream = process.platform === 'win32'\n-        ? fs.createWriteStream(outfile)\n-        : outpipe(outfile);\n \n-    var wb = w.bundle();\n-    wb.on('error', function (err) {\n-        console.error(String(err));\n-        didError = true;\n-        outStream.end('console.error('+JSON.stringify(String(err))+');');\n-    });\n-    wb.pipe(outStream);\n+// TODO make this an optional argument\n+var PORT = '8080';\n \n-    outStream.on('error', function (err) {\n-        console.error(err);\n-    });\n-    outStream.on('close', function () {\n-        if (!didError) {\n-            console.error(bytes + ' bytes written to ' + outfile\n-                + ' (' + (time / 1000).toFixed(2) + ' seconds)'\n-            );\n-            if(firstBundle) {\n-              open('http://localhost:8080/test-dashboard');\n-              firstBundle = false;\n-            }\n-        }\n-    });\n+var testFile;\n+switch(process.argv[2]) {\n+    case 'geo':\n+        testFile = './test-geo';\n+    break;\n+    case '2d':\n+        testFile = './test-2d';\n+    break;\n+    default:\n+        testFile = './test-3d';\n }\n \n-////// build the test examples\n+console.log('Using ' + testFile);\n+console.log('Listening on :' + PORT + '\\n');\n \n-fs.unlink('./test-bundle.js', function(error) {\n-    browserify({\n+// watch plotly.js\n+var watchifiedBundle = makeWatchifiedBundle(function onFirstBundleCallback() {\n+    _open('http://localhost:' + PORT + '/devtools/test_dashboard');\n+});\n+watchifiedBundle();", "the test dashboard server script uses the same watch-bundling machinery as `npm run watch` \n:palm_tree: :palm_tree: \n", "2abf31f5cb19b7cca9a4944ee506fe716ec44442", "JavaScript")

In [11]:
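def get_pr_info_by_vector_ids(vector_ids: list[str]):
    # An empty IN () clause is invalid SQL, so return early.
    if not vector_ids:
        return []

    placeholders = ', '.join(['%s' for _ in vector_ids])

    # The key column in pr_info is named pr_vector_id.
    sql = f"SELECT * FROM pr_info WHERE pr_vector_id IN ({placeholders})"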

    cur.execute(sql, vector_ids)

    results = cur.fetchall()

    pr_info_list = []
    for result in results:
        pr_vector_id, pr_id, pr_repo_name, pr_html_url, pr_file_path, pr_line, pr_user, pr_diff_hunk, pr_body, pr_commit_id, pr_language = result
        pr_info_list.append({
            "vector_id": pr_vector_id,
            "id": pr_id,
            "repo_name": pr_repo_name,
            "html_url": pr_html_url,
            "file_path": pr_file_path,
            "line": pr_line,
            "user": pr_user,
            "diff_hunk": pr_diff_hunk,
            "body": pr_body,
            "commit_id": pr_commit_id,
            "language": pr_language
        })

    return pr_info_list
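
The lookup relies on generating one `%s` placeholder per ID, so the values are passed as query parameters rather than formatted into the SQL string. The sketch below isolates that step in a hypothetical helper, `build_pr_info_query`, so the generated statement can be inspected without a database connection. The example IDs are made up.

```python
def build_pr_info_query(vector_ids: list[str]) -> tuple[str, tuple]:
    """Build a parameterized IN query for the pr_info table."""
    if not vector_ids:
        raise ValueError("vector_ids must not be empty")
    # One placeholder per ID; the driver substitutes the values safely.
    placeholders = ", ".join(["%s"] * len(vector_ids))
    sql = f"SELECT * FROM pr_info WHERE pr_vector_id IN ({placeholders})"
    return sql, tuple(vector_ids)


sql, params = build_pr_info_query(["repo-a-123", "repo-b-456"])
print(sql)
print(params)
```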

### Connecting to the Vector Database

In [12]:
import pinecone

pinecone.init(
    api_key=str(PINECONE_API_KEY),
    environment='gcp-starter'
)



#### In case we need to delete and recreate the index to start fresh

In [13]:
# pinecone.delete_index('review-owl')
# pinecone.create_index('review-owl', dimension=384, metric='euclidean', pods=1, pod_type='starter')

In [14]:
PINECONE_POOL_THREADS = 30
index = pinecone.Index('review-owl', pool_threads=PINECONE_POOL_THREADS)
index.describe_index_stats()

{'dimension': 384,
 'index_fullness': 0.04175,
 'namespaces': {'': {'vector_count': 4175}},
 'total_vector_count': 4175}

### Loading the models for running feature extraction locally

In [15]:
# from sentence_transformers import SentenceTransformer
# # model = SentenceTransformer('BAAI/bge-large-en-v1.5')
# model = SentenceTransformer('BAAI/bge-small-en-v1.5')

# # alternative embedding model to consider
# # model = SentenceTransformer('BAAI/llm-embedder')

### Functions for loading the datasets

In [16]:
# importing a specific repo dataset by finding all the files starting with the index number and a dash
import glob

# getting the index number from the file name
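def get_index_number(file):
    """A helper function to get the index number from the file name."""
    # os.path.basename (os is imported in the first cell) avoids hard-coding the Windows path separator.
    return int(os.path.basename(file).split('-')[0])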

def import_filepaths(folder_path: str):
    # getting all the files in the directory
    file_paths = glob.glob(folder_path)

    # sorting the files by the index number
    sorted_filepaths = sorted(file_paths, key=get_index_number)

    return sorted_filepaths

### Functions for embedding

In [17]:
from tqdm.auto import tqdm
import pandas as pd
import json
import requests
import time
import random
import itertools

# API_URL = "https://api-inference.huggingface.co/models/BAAI/bge-base-en-v1.5"
# API_URL = "https://api-inference.huggingface.co/models/BAAI/bge-large-en-v1.5"
API_URL = "https://api-inference.huggingface.co/models/BAAI/bge-small-en-v1.5"
# API_URL = "https://api-inference.huggingface.co/models/BAAI/llm-embedder"
headers = {"Authorization": f"Bearer {HUGGINGFACE_API_KEY}"}

def file_to_embedding_inputs(file_path: str):
    embedding_input = []
    
    data = pd.read_json(file_path, orient='index')
    for row in tqdm(data.iloc[:, 0], 'Splitting file into payloads'):
        embedding_input.append('path: ' + row['path'] + '\n' + 'diff_hunk: ' + row['diff_hunk'])
    
    return embedding_input

def huggingface_inference_api_request(payload: list[str], payload_size: int = 1000):
    try:
        rejoined_list = []
        for i in tqdm(range(0, len(payload), payload_size), 'Making requests to HuggingFace API'):
            payload_slice = payload[i:i + payload_size]
            data = json.dumps(payload_slice)
            response = requests.post(API_URL, headers=headers, data=data)

            if 'Rate limit reached' in response.text:
                raise Exception('Rate limit reached')
            # The hosted model may still be loading; poll until it is ready.
            while response.status_code != 200 and 'is currently loading' in response.text:
                logging.info(f'Waiting for 30 seconds. Response: {response.text}')
                time.sleep(30)
                response = requests.post(API_URL, headers=headers, data=data)

            # Surface any remaining HTTP error instead of parsing an error body as embeddings.
            response.raise_for_status()

            for v in response.json():
                rejoined_list.append(v)

        return rejoined_list
    except Exception as e:
        print(e)
        return e

def make_huggingface_request_with_backoff(payload: list[str]):
    max_retries = 10
    retries = 0

    while retries < max_retries:
        try:
            response = huggingface_inference_api_request(payload, 200)
            # The request helper returns (rather than raises) any exception it caught.
            if isinstance(response, Exception):
                raise response
            return response
        except requests.HTTPError as e:
            print('HTTP error: ' + str(e.response.status_code))
            if e.response.status_code == 503:
                wait_time = (2 ** retries) + (random.uniform(0, 1) * 0.1)  # Exponential backoff with random jitter
                time.sleep(wait_time)
                retries += 1
            else:
                print('Error: ' + str(e))
                raise e
        except Exception as e:
            # Compare the message text; an exception object never equals a string.
            if str(e) == 'Rate limit reached':
                print('Rate limit reached, waiting for 10 minutes')
                time.sleep(60 * 10)  # wait for 10 minutes to try again
                retries = 0
                continue
            logging.error(e)
            retries += 1

    logging.error('Max retries exceeded')
    raise Exception("Max retries exceeded")
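
To get a feel for the wait times the backoff formula produces, the delay computation can be isolated as below. This is a sketch using the same formula as the retry loop; the helper name `backoff_delay` and its parameters are hypothetical.

```python
import random


def backoff_delay(retries: int, base: float = 2.0, max_jitter: float = 0.1) -> float:
    """Exponential backoff: base ** retries seconds plus a small random jitter."""
    return (base ** retries) + random.uniform(0, 1) * max_jitter


for attempt in range(5):
    print(f"retry {attempt}: wait ~{backoff_delay(attempt):.2f}s")
```

With ten retries the base delays sum to 2^0 + ... + 2^9 = 1023 seconds, so a run of consecutive 503 responses would wait roughly 17 minutes in total before giving up.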

### Functions for generating the info that will be saved in the database

In [18]:
import hashlib

def hash_id(id):
    return hashlib.sha256(id.encode('utf-8')).hexdigest()

# Function to check if vector ID matches PR information.
def check_vector_id_match(repo_name: str, pr_id: int, vector_id: str):
    gen_id = repo_name + '-' + hash_id(repo_name + str(pr_id))
    return gen_id == vector_id
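
As an illustration of the ID scheme checked above, the sketch below builds a vector ID from a repository name and a PR ID: the repository name, a dash, and the SHA-256 hex digest of the name concatenated with the ID. The helper `make_vector_id` is a hypothetical name; the example values are taken from the commented test insert earlier in the notebook.

```python
import hashlib


def make_vector_id(repo_name: str, pr_id: int) -> str:
    """Build '<repo_name>-<sha256(repo_name + pr_id)>' as used for vector IDs."""
    digest = hashlib.sha256((repo_name + str(pr_id)).encode("utf-8")).hexdigest()
    return f"{repo_name}-{digest}"


vector_id = make_vector_id("plotly@plotly.js", 44735997)
print(vector_id)
```

The ID is deterministic, so re-processing a file produces the same IDs and the upserts overwrite existing entries instead of creating duplicates.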

In [19]:
def generate_pr_info(file_path: str, generated_vector_data: list[tuple[str, list, dict]], language: str):
    pr_info = []

    data = pd.read_json(file_path, orient='index')

    for i, row in tqdm(enumerate(data.iloc[:, 0]), 'Generating PR info'):
        if (check_vector_id_match(data.columns[0], row['id'], generated_vector_data[i][0])):
            # (vector_id, pr_id, repo_name, html_url, file_path, line, user, diff_hunk, body, commit_id, language)
            pr_info.append({'vector_id': generated_vector_data[i][0], 'pr_id': row['id'], 'repo_name': data.columns[0], 'html_url': row['html_url'], 'file_path': row['path'], 'line': row['line'], 'user': row['user'], 'diff_hunk': row['diff_hunk'], 'body': row['body'], 'commit_id': row['commit_id'], 'language': language})
        else:
            raise Exception("Vector IDs don't match")

    return pr_info

### Functions that go from raw data to processed embeddings

In [20]:
import threading
import queue

def measure_metadata_size(metadata_list: list[dict]):
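    # Measure the UTF-8 encoded length so the result approximates bytes rather than characters.
    return [len(str(metadata).encode('utf-8')) for metadata in metadata_list]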

def generate_metadata(file_path: str):
    metadata_list = []
    data = pd.read_json(file_path, orient='index')

    for row in data.iloc[:, 0]:
        metadata_list.append({'id': row['id'],'repo': data.columns[0],'path': row['path']})
    
    metadata_size_list = measure_metadata_size(metadata_list)
    # enumerate gives the true position; list.index() would return the first entry with the same size.
    for i, metadata_size in enumerate(metadata_size_list):
        if metadata_size > 40960:
            logging.error(f"Metadata size of {metadata_size} bytes exceeds the 40960-byte limit at index {i}.\nfile: {file_path}.")

    return metadata_list

def generate_vector_ids(metadata_list: list[dict]):
    ids = []
    for i in range(len(metadata_list)):
        ids.append(metadata_list[i]['repo'] + '-' + hash_id(metadata_list[i]['repo'] + str(metadata_list[i]['id'])))
    return ids

def embed_file(file_path: str):
    vectors_list = []
    embedding_inputs_list = file_to_embedding_inputs(file_path)

    vectors_list = make_huggingface_request_with_backoff(embedding_inputs_list)

    # code for running the embedding model locally
    # for payload in tqdm(embedding_inputs_list, 'Generating vector embeddings'):
        # vectors_list.append(model.encode(payload).tolist())
    
    return vectors_list

def generate_upsert_data(file_path: str):
    embedding_result = embed_file(file_path)
    metadata_list = generate_metadata(file_path)
    vector_ids = generate_vector_ids(metadata_list)

    upsert_data = list(zip(vector_ids, embedding_result, metadata_list))

    return upsert_data
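
Each upsert entry is an `(id, values, metadata)` tuple. Because `zip` silently truncates to the shortest input, a length check before zipping can catch a partially failed embedding request. The sketch below uses a hypothetical helper, `build_upsert_data`, with dummy three-dimensional vectors (the real index uses 384 dimensions).

```python
def build_upsert_data(vector_ids: list, embeddings: list, metadata_list: list) -> list[tuple]:
    """Zip IDs, vectors, and metadata, refusing mismatched lengths."""
    if not (len(vector_ids) == len(embeddings) == len(metadata_list)):
        raise ValueError(
            f"Length mismatch: {len(vector_ids)} ids, "
            f"{len(embeddings)} embeddings, {len(metadata_list)} metadata entries"
        )
    return list(zip(vector_ids, embeddings, metadata_list))


# Dummy data for illustration only.
ids = ["repo-aaa", "repo-bbb"]
vectors = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
metadata = [
    {"id": 1, "repo": "repo", "path": "a.js"},
    {"id": 2, "repo": "repo", "path": "b.js"},
]
print(build_upsert_data(ids, vectors, metadata)[0])
```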

### Putting everything together with multithreading

In [21]:
def split_into_chunks(iterable: list, batch_size=100):
    """A helper function to break an iterable into chunks of size batch_size."""
    it = iter(iterable)
    chunks = []
    for i in range(0, len(iterable), batch_size):
        chunks.append(list(itertools.islice(it, batch_size)))

    return chunks
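
For reference, here is a generator-based variant of the same chunking idea. It is an alternative sketch, not the notebook's implementation: the hypothetical `iter_chunks` yields batches lazily and also accepts iterables without a length.

```python
import itertools
from typing import Iterable, Iterator


def iter_chunks(items: Iterable, batch_size: int = 100) -> Iterator[list]:
    """Yield successive lists of at most batch_size items."""
    it = iter(items)
    # islice returns an empty list once the iterator is exhausted, ending the loop.
    while chunk := list(itertools.islice(it, batch_size)):
        yield chunk


print([len(chunk) for chunk in iter_chunks(range(250))])
```

For 250 items and the default batch size, this produces two full batches of 100 and a final batch of 50.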

In [22]:
# Class to track the number of files
class CountedQueue(queue.Queue):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.total_files = 0

    def put(self, item, *args, **kwargs):
        super().put(item, *args, **kwargs)
        self.total_files += 1
            
def worker(file_queue: queue.Queue):
    while True:
        file_path = file_queue.get()
        if file_path is None:
            break

        try:
            upsert_data = generate_upsert_data(file_path)

            upsert_data_chunks = split_into_chunks(upsert_data)

            for chunk in tqdm(upsert_data_chunks, 'Uploading data to Pinecone'):
                index.upsert(chunk)

            for row in generate_pr_info(file_path, upsert_data, 'JavaScript'):
                # (vector_id, pr_id, repo_name, html_url, file_path, line, user, diff_hunk, body, commit_id, language)
                insert_or_update_pr_info(row['vector_id'], row['pr_id'], row['repo_name'], row['html_url'], row['file_path'], row['line'], row['user'], row['diff_hunk'], row['body'], row['commit_id'], row['language'])

            logging.info(f"Logging: {os.path.basename(file_path)} processed")
        except Exception as e:
            # Log the failure and move on so one bad file does not kill the thread.
            logging.error(f"Failed to process {file_path}: {e}")
        finally:
            # Always mark the task done; otherwise file_queue.join() would block forever.
            file_queue.task_done()

# Function to create and manage worker threads
def create_worker_threads(num_threads: int, file_queue: queue.Queue):
    threads: list[threading.Thread] = []

    for _ in range(num_threads):
        thread = threading.Thread(target=worker, args=(file_queue,))
        thread.start()
        threads.append(thread)

    return threads

# Main function to process files using multiple threads
def process_files_with_threads(file_names: list[str], num_threads: int, start: int = 0, end=None):
    # Create a thread-safe queue
    file_queue = CountedQueue()

    # Populate the queue with file names
    for file_name in file_names[start:end]:
        file_queue.put(file_name)

    # Create worker threads
    threads = create_worker_threads(num_threads, file_queue)
    logging.info(f"Total files: {file_queue.total_files}, unfinished tasks: {file_queue.unfinished_tasks}")

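    # Create a tqdm progress bar
    pbar = tqdm(total=file_queue.total_files, unit="file")

    # Poll the queue to update the bar, sleeping briefly to avoid a CPU-bound busy-wait.
    while pbar.n < file_queue.total_files:
        pbar.n = file_queue.total_files - file_queue.unfinished_tasks
        pbar.refresh()
        time.sleep(0.1)
    pbar.close()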

    # Wait for all file processing to be completed
    file_queue.join()

    # Stop the worker threads
    for _ in range(num_threads):
        file_queue.put(None)

    for thread in threads:
        thread.join()

    logging.info("All files processed.")

In [23]:
# # For testing
# file_paths = import_filepaths('D:/GitHub/review-owl-datasets/dataset/mined-comments-25stars-25prs-JavaScript.json/repo-split/*.json')
# e0 = file_to_embedding_inputs(file_paths[5000])
# u1 = generate_upsert_data(file_paths[5000])

In [24]:
# e1 = huggingface_inference_api_request(e0, 200)

In [25]:
# m1 = generate_metadata(file_paths[500])
# m1[0]

In [26]:
# pri1 = generate_pr_info(file_paths[500], u1, 'JavaScript')
# pri1

In [27]:
# m1[0]['repo']

In [28]:
# v1 = generate_vector_ids(m1)

In [29]:
# z1 = list(zip(v1, e1, m1))

In [30]:
# print(type(z1))
# print(type(z1[0]))
# print(type(z1[0][0]))
# print(type(z1[0][1]))
# print(type(z1[0][2]))

In [31]:
# hash_id(m1[0]['repo'] + '0')

In [32]:
# z1

In [33]:
# upsert_data_chunks = split_into_chunks(z1)

# for chunk in tqdm(upsert_data_chunks, 'Uploading data to Pinecone'):
#     index.upsert(chunk)

In [34]:
# upsert_data = generate_upsert_data(file_paths[0])
# upsert_data_chunks = split_into_chunks(upsert_data)

In [35]:
index.describe_index_stats()

{'dimension': 384,
 'index_fullness': 0.04175,
 'namespaces': {'': {'vector_count': 4175}},
 'total_vector_count': 4175}

In [36]:
file_paths = import_filepaths('D:/GitHub/review-owl-datasets/dataset/mined-comments-25stars-25prs-JavaScript.json/repo-split/*.json')
process_files_with_threads(file_names=file_paths, num_threads=6, start=5040, end=5050)

  0%|          | 0/10 [00:00<?, ?file/s]

Splitting file into payloads:   0%|          | 0/7 [00:00<?, ?it/s]

Making requests to HuggingFace API:   0%|          | 0/1 [00:00<?, ?it/s]

Uploading data to Pinecone:   0%|          | 0/1 [00:00<?, ?it/s]

Generating PR info: 0it [00:00, ?it/s]