# Code Search Using Embeddings and Cosine Similarity


Source: https://cookbook.openai.com/examples/code_search_using_embeddings

### Create Code Parsing Code

The following code performs these steps:
1. Opens and loads (as one big string) each file with a `.py` extension, i.e. Python files only.
2. Extracts the functions in each file: the function name, its code, and the path to the file, each stored separately.
3. Iterates through an entire directory and stores the parsed data in a list of dictionaries.

In [7]:
import pandas as pd
from pathlib import Path

# Line prefixes that mark the start of a function definition. They are matched
# against the very beginning of a line, so only unindented definitions qualify.
DEF_PREFIXES = ['def ', 'async def ']
# Separator used to split file contents into lines and to re-join extracted code.
NEWLINE = '\n'

def get_function_name(code):
    """
    Extract the function name from code beginning with 'def' or 'async def'.

    Only the first line of `code` (the definition header) is inspected, so a
    parenthesis appearing later in the body is never mistaken for the start of
    the parameter list.

    Args:
        code: Source text whose first line is expected to start with one of
            DEF_PREFIXES.

    Returns:
        The name between the prefix and the first '(' on the header line, with
        surrounding whitespace removed, or None when the header does not start
        with a known prefix or contains no '('.
    """
    header = code.split(NEWLINE, 1)[0]
    for prefix in DEF_PREFIXES:
        if header.startswith(prefix):
            name, paren, _ = header[len(prefix):].partition('(')
            # No '(' means this is not a real definition header; report
            # "no name" instead of raising ValueError.
            return name.strip() if paren else None
    return None


def get_until_no_space(all_lines, i):
    """
    Collect line `i` plus every following line that still belongs to it.

    A following line is kept while it is empty or begins with a space, a tab,
    or a closing parenthesis; the first line that does neither ends the scan.
    The collected lines are returned joined with NEWLINE.
    """
    collected = [all_lines[i]]
    cursor = i + 1
    total = len(all_lines)
    while cursor < total:
        candidate = all_lines[cursor]
        # A non-empty line starting at column 0 with anything other than ')'
        # marks the end of the definition.
        if candidate and candidate[0] not in (' ', '\t', ')'):
            break
        collected.append(candidate)
        cursor += 1
    return NEWLINE.join(collected)


def get_functions(filepath):
    """
    Get all functions in a Python file.

    Only definitions that start at the very beginning of a line (one of
    DEF_PREFIXES at column 0) are detected.

    Args:
        filepath: Path of the file to read; it is passed to `open` and echoed
            back unchanged in every yielded record.

    Yields:
        One dict per detected function with the keys 'code' (the extracted
        source text), 'function_name' and 'filepath'.
    """
    # Read as UTF-8 (the default encoding of Python source) instead of the
    # platform locale encoding; undecodable bytes are replaced so that one odd
    # file does not abort the whole extraction.
    with open(filepath, 'r', encoding='utf-8', errors='replace') as file:
        all_lines = file.read().replace('\r', NEWLINE).split(NEWLINE)

    # The file is fully read and closed before the first record is yielded, so
    # no handle stays open while the generator is suspended.
    prefixes = tuple(DEF_PREFIXES)
    for i, line in enumerate(all_lines):
        if line.startswith(prefixes):
            code = get_until_no_space(all_lines, i)
            yield {
                'code': code,
                'function_name': get_function_name(code),
                'filepath': filepath,
            }


def extract_functions_from_repo(code_root):
    """
    Extract all .py functions from the repository.

    Args:
        code_root: Directory to search recursively for '*.py' files. May be a
            `pathlib.Path` or a plain string path.

    Returns:
        A list with one record per function found by `get_functions`, or None
        when no .py file exists under `code_root`. Progress counts are printed.
    """
    # Coerce so that callers may pass either a str or a Path.
    code_files = list(Path(code_root).glob('**/*.py'))

    num_files = len(code_files)
    print(f'Total number of .py files: {num_files}')

    if num_files == 0:
        print('Verify openai-python repo exists and code_root is set correctly.')
        return None

    all_funcs = [
        func
        for code_file in code_files
        for func in get_functions(str(code_file))
    ]

    num_funcs = len(all_funcs)
    print(f'Total number of functions extracted: {num_funcs}')

    return all_funcs

### Clone a Sample Repo

In [10]:
!git clone https://github.com/openai/openai-python.git

Cloning into 'openai-python'...


### Extract Functions In The Given Repo

In [25]:
# Use the current working directory as the root that contains the cloned repository.
# Alternative: use the user's home directory instead.
# root_dir = Path.home()
root_dir = Path().absolute()


# Assumes the 'openai-python' repository was cloned into root_dir
code_root = root_dir / 'openai-python'

# Extract all functions from the repository
all_funcs = extract_functions_from_repo(code_root)

Total number of .py files: 383
Total number of functions extracted: 410


In [30]:
# Inspect the first extracted record: its code, name and file path, one per line.
fun1 = all_funcs[0]
print(fun1["code"], fun1["function_name"], fun1["filepath"], sep="\n")


def test_pydantic_v1(session: nox.Session) -> None:
    session.install("-r", "requirements-dev.lock")
    session.install("pydantic<2")

    session.run("pytest", "--showlocals", "--ignore=tests/functional", *session.posargs)

test_pydantic_v1
c:\Users\Sahil\Desktop\Projects\open_ai_api_tutorials\8 Embeddings API\openai-python\noxfile.py


In [32]:
# Number of extracted function records
len(all_funcs)

410

### Create Embeddings For All The Extracted Functions

In [35]:
from utils.embedding_utils import get_embedding

df = pd.DataFrame(all_funcs)

# Embedding every function may take a while: it makes around 400 API requests.

# The following line embeds only the first two records.
# Run it if you want to save cost and just see what the results look like.
# NOTE: with this line, rows other than the first two are left without an embedding.
df['code_embedding'] = df[:2]['code'].apply(lambda x: get_embedding(x, model='text-embedding-3-small'))

# The following line embeds the entire dataframe.
# Running it will incur API cost.
# df['code_embedding'] = df['code'].apply(lambda x: get_embedding(x, model='text-embedding-3-small'))

# Store each file path relative to the repository root
df['filepath'] = df['filepath'].map(lambda x: Path(x).relative_to(code_root))
df.head()

Unnamed: 0,code,function_name,filepath,code_embedding
0,def test_pydantic_v1(session: nox.Session) -> ...,test_pydantic_v1,noxfile.py,"[0.007481125183403492, 0.00924705620855093, 0...."
1,def main() -> None:\n client = openai.OpenA...,main,examples\assistant_stream_helpers.py,"[0.01728219725191593, 0.01183176226913929, 0.0..."
2,async def main() -> None:\n stream = await ...,main,examples\async_demo.py,"[0.036569662392139435, 0.024040671065449715, 0..."
3,def main() -> None:\n stream_to_speakers()\...,main,examples\audio.py,"[0.020340846851468086, 0.0199880450963974, 0.0..."
4,def stream_to_speakers() -> None:\n import ...,stream_to_speakers,examples\audio.py,"[0.010680544190108776, -0.019652200862765312, ..."


In [36]:
## Save the embeddings only if you have generated embeddings on entire dataframe. 
# df.to_csv("data/code_search_openai-python.csv", index=False)


Run the following lines of code only if you have not generated embeddings for the entire dataframe (to save cost). They load previously saved embeddings from the CSV file instead.

In [None]:
# from ast import literal_eval
# import numpy as np

# df = pd.read_csv("data/code_search_openai-python.csv")
# df["code_embedding"] = df.code_embedding.apply(literal_eval).apply(np.array)

# df.head()

### Create Semantic Search Function

In [37]:
from utils.embedding_utils import cosine_similarity

def search_functions(df, code_query, n=3, pprint=True, n_lines=7):
    """
    Rank the rows of `df` by similarity to `code_query` and return the best `n`.

    The query is embedded with `get_embedding`, every value in the
    `code_embedding` column is scored against it with `cosine_similarity`, and
    the scores are written to a `similarities` column on `df` itself (the
    caller's dataframe is modified in place).

    Args:
        df: Dataframe with `code_embedding`, `code`, `function_name` and
            `filepath` columns.
        code_query: Text to search for.
        n: Number of top-scoring rows to return.
        pprint: When truthy, print a short preview of every returned row.
        n_lines: Maximum number of code lines shown per preview.

    Returns:
        The `n` rows with the highest `similarities`, in descending order.
    """
    query_vector = get_embedding(code_query, model='text-embedding-3-small')

    def score(candidate):
        return cosine_similarity(candidate, query_vector)

    df['similarities'] = df.code_embedding.apply(score)
    top_matches = df.sort_values('similarities', ascending=False).head(n)

    if not pprint:
        return top_matches

    for _, row in top_matches.iterrows():
        print(f"{row.filepath}:{row.function_name}  score={round(row.similarities, 3)}")
        preview = row.code.split("\n")[:n_lines]
        print("\n".join(preview))
        print('-' * 70)

    return top_matches

### Perform Inference

In [38]:
# Search with a natural-language query and print the top 3 matches
res = search_functions(df, 'fine-tuning input data validation logic', n=3)

src\openai\lib\_validators.py:format_inferrer_validator  score=0.44
def format_inferrer_validator(df: pd.DataFrame) -> Remediation:
    """
    This validator will infer the likely fine-tuning format of the data, and display it to the user if it is classification.
    It will also suggest to use ada and explain train/validation split benefits.
    """
    ft_type = infer_task_type(df)
    immediate_msg = None
----------------------------------------------------------------------
src\openai\lib\_parsing\_completions.py:validate_input_tools  score=0.367
def validate_input_tools(
    tools: Iterable[ChatCompletionToolParam] | NotGiven = NOT_GIVEN,
) -> None:
    if not is_given(tools):
        return

    for tool in tools:
----------------------------------------------------------------------
src\openai\lib\_validators.py:common_prompt_suffix_validator  score=0.357
def common_prompt_suffix_validator(df: pd.DataFrame) -> Remediation:
    """
    This validator will suggest to add a common

In [39]:
# Display the rows returned by the previous search
res

Unnamed: 0,code,function_name,filepath,code_embedding,similarities
119,def format_inferrer_validator(df: pd.DataFrame...,format_inferrer_validator,src\openai\lib\_validators.py,"[-0.008124016225337982, 0.040148280560970306, ...",0.439511
136,def validate_input_tools(\n tools: Iterable...,validate_input_tools,src\openai\lib\_parsing\_completions.py,"[0.028309985995292664, 0.06668996810913086, -0...",0.366878
112,def common_prompt_suffix_validator(df: pd.Data...,common_prompt_suffix_validator,src\openai\lib\_validators.py,"[0.04706217721104622, 0.014239056967198849, 0....",0.357211


In [40]:
# Top 2 matches, previewing up to 10 lines of each function
res = search_functions(df, 'find common suffix', n=2, n_lines=10)

src\openai\lib\_validators.py:get_common_xfix  score=0.51
def get_common_xfix(series: Any, xfix: str = "suffix") -> str:
    """
    Finds the longest common suffix or prefix of all the values in a series
    """
    common_xfix = ""
    while True:
        common_xfixes = (
            series.str[-(len(common_xfix) + 1) :] if xfix == "suffix" else series.str[: len(common_xfix) + 1]
        )  # first few or last few characters
        if common_xfixes.nunique() != 1:  # we found the character at which we don't have a unique xfix anymore
----------------------------------------------------------------------
src\openai\lib\_validators.py:common_completion_suffix_validator  score=0.451
def common_completion_suffix_validator(df: pd.DataFrame) -> Remediation:
    """
    This validator will suggest to add a common suffix to the completion if one doesn't already exist in case of classification or conditional generation.
    """
    error_msg = None
    immediate_msg = None
    optional_msg 

In [41]:
# Single best match, previewing up to 20 lines of the function
res = search_functions(df, 'Command line interface for fine-tuning', n=1, n_lines=20)

src\openai\lib\_validators.py:write_out_file  score=0.353
def write_out_file(df: pd.DataFrame, fname: str, any_remediations: bool, auto_accept: bool) -> None:
    """
    This function will write out a dataframe to a file, if the user would like to proceed, and also offer a fine-tuning command with the newly created file.
    For classification it will optionally ask the user if they would like to split the data into train/valid files, and modify the suggested command to include the valid set.
    """
    ft_format = infer_task_type(df)
    common_prompt_suffix = get_common_xfix(df.prompt, xfix="suffix")
    common_completion_suffix = get_common_xfix(df.completion, xfix="suffix")

    split = False
    input_text = "- [Recommended] Would you like to split into training and validation set? [Y/n]: "
    if ft_format == "classification":
        if accept_suggestion(input_text, auto_accept):
            split = True

    additional_params = ""
    common_prompt_suffix_new_line_handled = c