In [11]:
import re

def extract_between(text, start, end, multiple=False):
    """Collect the comma-separated fields found between two literal markers.

    The span between ``start`` and the nearest following ``end`` is matched
    non-greedily. Because ``.`` does not match a newline here, a span never
    crosses a line break. Each span is stripped, split on commas, and every
    piece is stripped again.

    Args:
        text: String to search.
        start: Literal opening marker (escaped before use in the regex).
        end: Literal closing marker (escaped before use in the regex).
        multiple: When true, return the fields of every match; otherwise only
            those of the first match.

    Returns:
        With ``multiple=True``: a list with one list of fields per match
        (empty when nothing matches). Otherwise the list of fields for the
        first match, or ``None`` when there is no match.
    """
    regex = re.compile(re.escape(start) + r'(.*?)' + re.escape(end))

    def _fields(chunk):
        # Split one captured span into its trimmed comma-separated pieces.
        return [piece.strip() for piece in chunk.strip().split(',')]

    if multiple:
        return [_fields(chunk) for chunk in regex.findall(text)]

    first = regex.search(text)
    return _fields(first.group(1)) if first else None

def get_targets_from_path(path):
    """Read a source file and return the leading arguments of its converters.

    Every ``@converter(... , channel_ordering_strategy`` occurrence in the
    file is located and the text before ``, channel_ordering_strategy`` is
    split into its comma-separated fields.

    Args:
        path: Path of the file to scan.

    Returns:
        A list with one list of stripped fields per matching decorator; an
        empty list when the file contains none.
    """
    with open(path, 'r') as file:
        content = file.read()
    start = '@converter('
    end = ', channel_ordering_strategy'
    # The original computed this value and dropped it, so callers always got None.
    return extract_between(content, start, end, multiple=True)

# Location of the converter source file that the following cells read.
# NOTE(review): this is an absolute path into one developer's checkout, so the
# cells that use `path` only run on that machine — consider deriving it from a
# configurable project root.
path = "/home/crimson/Projects/AI-cookbook/nobuco/nobuco/node_converters/math.py"
# Disabled call left over from an earlier draft:
# targets = get_targets_from_path(path)

In [20]:
# Read the whole file named by `path` into `content` for the cells below.
with open(path, 'r') as file:
    content = file.read()

In [22]:
def load_content(path):
    """Return the full text of the file at ``path``.

    The file is opened in text mode with the platform default encoding and
    closed before returning.
    """
    with open(path, 'r') as source:
        return source.read()

In [40]:
from torch import functional as F

In [19]:
import logging

# Named logger used by the cells that record failed docstring lookups.
logger = logging.getLogger("extract_doc")

# Configure root logging: DEBUG and above are appended to "<logger name>.log"
# in the current working directory.
logging.basicConfig(
    level=logging.DEBUG,
    filename=f"{logger.name}.log",
    filemode='a',
    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
    datefmt='%H:%M:%S',
)

In [12]:
import torch

def extract_doc(target):
    """Look up the docstring of the torch object named by ``target``.

    ``target`` is converted to a string and split on ``"."``. The last
    component is the attribute name; the other components select where to
    look, tested in this order:

    * contains ``"Tensor"`` -> ``torch.Tensor``
    * contains ``"F"``      -> ``torch.nn.functional``, then ``torch.functional``
    * contains ``"nn"``     -> ``torch.nn``
    * otherwise             -> ``torch``

    Args:
        target: Dotted name such as ``"torch.add"`` or ``"F.relu"``.

    Returns:
        The ``__doc__`` of the resolved attribute (which may itself be
        ``None``). For ``F.*`` targets found in neither functional namespace,
        ``None``.

    Raises:
        AttributeError: If a non-``F`` target does not exist in its namespace.
    """
    parts = str(target).split(".")
    attr_name = parts[-1]

    if "Tensor" in parts:
        return getattr(torch.Tensor, attr_name).__doc__

    if "F" in parts:
        # Only a missing attribute counts as "not found". The original bare
        # `except:` also swallowed KeyboardInterrupt and unrelated errors.
        for namespace in (torch.nn.functional, torch.functional):
            try:
                return getattr(namespace, attr_name).__doc__
            except AttributeError:
                continue
        return None

    if "nn" in parts:
        return getattr(torch.nn, attr_name).__doc__

    return getattr(torch, attr_name).__doc__

In [17]:
from pydantic import BaseModel
from typing import List, Any, Dict

class Session(BaseModel):
    """A titled chunk of a docstring: a header plus the text belonging to it.

    The fields mirror the ``'header'`` / ``'content'`` keys of the dicts
    built by ``parse_sections``.
    """
    # Section title (presumably e.g. "Args" or "Examples" — confirm with callers).
    header: str
    # Text belonging to the section.
    content: str
    
class Item(BaseModel):
    """One documented target: its name, raw docstring and parsed sections."""
    # Dotted name of the target as written in the converter decorator.
    name: str
    # Raw docstring text; required, so a target whose docstring is None fails validation.
    docs: str
    # Sections extracted from `docs`; empty until a parsing step fills it in.
    parsed_docs: List[Session] = []
    # Free-form per-item data. Code in this notebook stores the generated
    # prompt under meta_data["prompt"], which needs this field to be a dict.
    meta_data: Dict[str, Any] = {}

class ItemHolder(BaseModel):
    """Results of processing one converter source file."""
    # Name of the scanned file (e.g. "math.py").
    file_name: str
    # Targets whose docstring was retrieved and wrapped in an Item.
    items: List[Item] = []
    # Names of targets for which lookup or Item construction raised.
    failed_items: List[str] = []
    # NOTE(review): annotated as a dict but defaults to None, so it must be
    # assigned a dict before any key is written.
    meta_data: Dict[str, Any] = None



In [13]:
def filter_docs(parsed_docs):
    """Keep only the sections headed "Args", "Shape", "Examples" or "Example".

    Args:
        parsed_docs: Iterable of mappings that each have a ``"header"`` key.

    Returns:
        A new list with the matching sections in their original order.
    """
    wanted_headers = ("Args", "Shape", "Examples", "Example")
    return [section for section in parsed_docs if section["header"] in wanted_headers]

In [14]:
def convert_parsed_to_str(parsed_docs):
    """Render parsed sections as indented plain text.

    Each section becomes its header on one line, then its content indented by
    four spaces (every newline inside the content is followed by the same
    indent), then a blank line.

    Args:
        parsed_docs: Iterable of mappings with ``'header'`` and ``'content'``
            string values.

    Returns:
        The concatenated text; an empty string for no sections.
    """
    indent = '    '
    rendered = []
    for section in parsed_docs:
        body = section['content'].replace('\n', '\n' + indent)
        rendered.append(section['header'] + '\n' + indent + body + '\n\n')
    return ''.join(rendered)

In [15]:
def parse_sections(text, header_indent, unit_indent):
    """Split a docstring into ``{'header', 'content'}`` sections.

    A line is a section header when, after ``header_indent`` leading
    whitespace characters, it starts with a letter and (ignoring surrounding
    whitespace) ends with ``':'``. Trailing colons are dropped from the
    stored header, so ``"Examples::"`` becomes ``"Examples"``. Every other
    line seen after a header is added to that header's content with a
    further ``unit_indent`` characters removed. Lines before the first
    header are ignored.

    Args:
        text: The docstring to parse.
        header_indent: Number of leading characters in front of header lines
            (0 for function-style docstrings, 4 for class-style ones).
        unit_indent: Number of characters removed from each content line,
            counted after ``header_indent``.

    Returns:
        A list of dicts, one per header in order of appearance, each with
        ``'header'`` and ``'content'`` (content lines joined by ``'\\n'``).
    """
    sections = []
    header = None
    body = []

    for raw_line in text.split('\n'):
        prefix = raw_line[:header_indent]
        line = raw_line[header_indent:]
        stripped = line.strip()

        # A header must really sit at the header indentation. Slicing off
        # non-blank characters used to turn e.g. "Examples::" into "ples".
        at_header_level = not prefix.strip()

        if at_header_level and line and line[0].isalpha() and stripped.endswith(':'):
            # Close the section in progress before opening the next one.
            if header is not None:
                sections.append({'header': header, 'content': '\n'.join(body)})
                body = []
            # Strip whitespace first so "Args: " does not keep its colon.
            header = stripped.rstrip(':')
        elif header is not None:
            body.append(line[unit_indent:])

    # Emit the section that was still open when the text ended.
    if header is not None:
        sections.append({'header': header, 'content': '\n'.join(body)})

    return sections



In [None]:
def load_content(path):
    """Read the file at ``path`` in text mode and return everything in it.

    NOTE(review): this notebook defines `load_content` more than once with
    the same body; whichever cell ran last wins.
    """
    with open(path, 'r') as reader:
        text = reader.read()
    return text

In [157]:
# Container for everything extracted from math.py.
itemholder = ItemHolder(file_name="math.py")

content = load_content("/home/crimson/Projects/AI-cookbook/nobuco/nobuco/node_converters/math.py")

# Markers around the leading arguments of each @converter(...) decorator.
start = '@converter('
end = ', channel_ordering_strategy'
targets = extract_between(content, start, end, multiple=True)

# Fetch the docstring of the first argument of every decorator. Anything that
# raises (lookup or Item validation) is recorded as failed and logged.
for target in targets:
    try:
        docs = extract_doc(target[0])
        item = Item(name=str(target[0]), docs=docs)
        itemholder.items.append(item)
    except Exception as error:
        itemholder.failed_items.append(str(target[0]))
        logger.error(error)

# Parse each docstring with headers at column 0 and again with headers
# indented by 4. The column-0 result is kept when it has one of the expected
# headers; otherwise the indented result is used.
for item in itemholder.items:
    func_type = parse_sections(item.docs, 0, 4)
    cls_type = parse_sections(item.docs, 4, 4)
    func_type_headers = [session["header"] for session in func_type]
    has_expected_header = any(
        keyword in func_type_headers for keyword in ["Example", "Examples", "Args"]
    )
    item.parsed_docs = func_type if has_expected_header else cls_type

    

In [6]:
import os

def find_target_directory(root_dir, target_hint):
    """Return the first directory under ``root_dir`` whose path contains the hint.

    Directories are visited in ``os.walk`` order, starting with ``root_dir``
    itself. The hint is matched as a plain substring of the directory path.

    Args:
        root_dir: Directory at which the walk starts.
        target_hint: Substring to look for in each visited directory path.

    Returns:
        The first matching directory path, or ``None`` when none matches.
    """
    matching_dirs = (
        current
        for current, _subdirs, _files in os.walk(root_dir)
        if target_hint in current
    )
    return next(matching_dirs, None)

def get_preprocessed_itemholder(file_name, target_hint="nobuco/node_converters"):
    """Build an ``ItemHolder`` for one converter source file.

    The directory is found by walking the current working directory for a
    path containing ``target_hint``. The file is scanned for
    ``@converter(... , channel_ordering_strategy`` decorators; the docstring
    of each decorator's first argument is fetched and parsed into sections.

    Args:
        file_name: File to process, relative to the located directory.
        target_hint: Substring identifying the directory holding the file.

    Returns:
        An ``ItemHolder`` whose ``items`` carry raw and parsed docs, and whose
        ``failed_items`` lists the targets that raised during lookup or
        ``Item`` construction.

    Raises:
        FileNotFoundError: If no directory under the working directory
            contains ``target_hint``.
    """
    cwd = os.getcwd()
    converters_dir = find_target_directory(cwd, target_hint)
    if not converters_dir:
        raise FileNotFoundError(f"Directory containing hint '{target_hint}' not found from {cwd}")

    source_text = load_content(os.path.join(converters_dir, file_name))
    decorator_args = extract_between(
        source_text, '@converter(', ', channel_ordering_strategy', multiple=True
    )

    holder = ItemHolder(file_name=file_name)

    # Collect docstrings; any failure is recorded and logged, not raised.
    for args in decorator_args:
        try:
            doc_text = extract_doc(args[0])
            holder.items.append(Item(name=str(args[0]), docs=doc_text))
        except Exception as error:
            holder.failed_items.append(str(args[0]))
            logger.error(error)

    # Prefer the column-0 parse when it found an expected header; otherwise
    # use the parse with headers indented by 4.
    for entry in holder.items:
        as_function = parse_sections(entry.docs, 0, 4)
        as_class = parse_sections(entry.docs, 4, 4)
        seen_headers = [section["header"] for section in as_function]
        found_expected = any(
            keyword in seen_headers for keyword in ["Example", "Examples", "Args"]
        )
        entry.parsed_docs = as_function if found_expected else as_class

    return holder

In [20]:
# Build the holder for math.py; the converters directory is searched for
# under the current working directory.
itemholder = get_preprocessed_itemholder("math.py")

In [21]:
# Show the targets whose docstring lookup or Item construction raised.
itemholder.failed_items

['torch.Tensor.__rsub__',
 'torch.Tensor.__truediv__',
 'torch.Tensor.__rdiv__',
 'torch.Tensor.__mod__',
 'torch.Tensor.__rpow__',
 'torch.clamp_min',
 'torch.clamp_max']

In [159]:
from pprint import pprint

# NOTE(review): `item` is a module-level name bound by earlier cells (loop
# variable or explicit assignment), so the entry shown depends on which cell
# ran last.
pprint(item.parsed_docs)

[{'content': 'x1 (Tensor): input tensor of shape :math:`B \\times P \\times '
             'M`.\n'
             'x2 (Tensor): input tensor of shape :math:`B \\times R \\times '
             'M`.\n'
             'p: p value for the p-norm distance to calculate between each '
             'vector pair\n'
             '    :math:`\\in [0, \\infty]`.\n'
             'compute_mode:\n'
             "    'use_mm_for_euclid_dist_if_necessary' - will use matrix "
             'multiplication approach to calculate\n'
             '    euclidean distance (p = 2) if P > 25 or R > 25\n'
             "    'use_mm_for_euclid_dist' - will always use matrix "
             'multiplication approach to calculate\n'
             '    euclidean distance (p = 2)\n'
             "    'donot_use_mm_for_euclid_dist' - will never use matrix "
             'multiplication approach to calculate\n'
             '    euclidean distance (p = 2)\n'
             '    Default: use_mm_for_euclid_dist_if_necessary.\n'
   

In [160]:
from func_template import phase1_example, phase1_prompt

In [161]:
def generate_prompt(template, target, doc, example):
    """Fill a prompt template with a target name, its filtered docs and an example.

    Args:
        template: Format string with ``{target}``, ``{doc}`` and
            ``{phase1_example}`` placeholders.
        target: Value substituted for ``{target}``.
        doc: Parsed sections; they are passed through ``filter_docs`` and
            rendered with ``convert_parsed_to_str`` before substitution.
        example: Value substituted for ``{phase1_example}``.

    Returns:
        The formatted prompt string.
    """
    doc_text = convert_parsed_to_str(filter_docs(doc))
    return template.format(target=target, doc=doc_text, phase1_example=example)

In [162]:
# Pick the third collected item as the sample for the cells below.
item = itemholder.items[2]

In [163]:
# Show the sample item's parsed sections together with its name.
item.parsed_docs, item.name

([{'header': 'Args', 'content': 'input (Tensor): the input tensor.\nother (Tensor or Number): the tensor or number to add to :attr:`input`.\n'}, {'header': 'Keyword arguments', 'content': 'alpha (Number): the multiplier for :attr:`other`.\nout (Tensor, optional): the output tensor.\n'}, {'header': 'Examples', 'content': '\n>>> a = torch.randn(4)\n>>> a\ntensor([ 0.0202,  1.0985,  1.3506, -0.6056])\n>>> torch.add(a, 20)\ntensor([ 20.0202,  21.0985,  21.3506,  19.3944])\n\n>>> b = torch.randn(4)\n>>> b\ntensor([-0.9732, -0.3497,  0.6245,  0.4022])\n>>> c = torch.randn(4, 1)\n>>> c\ntensor([[ 0.3743],\n        [-1.7724],\n        [-0.5811],\n        [-0.8017]])\n>>> torch.add(b, c, alpha=10)\ntensor([[  2.7695,   3.3930,   4.3672,   4.1450],\n        [-18.6971, -18.0736, -17.0994, -17.3216],\n        [ -6.7845,  -6.1610,  -5.1868,  -5.4090],\n        [ -8.9902,  -8.3667,  -7.3925,  -7.6147]])\n'}], 'torch.add')

In [164]:
# Render and print the phase-1 prompt for the sample item.
print(generate_prompt(phase1_prompt, item.name, item.parsed_docs, phase1_example))



<<Instruction>>

Given the context, complete the <<Task>>.

<<Documentation for torch.add>>

Args
    input (Tensor): the input tensor.
    other (Tensor or Number): the tensor or number to add to :attr:`input`.
    

Examples
    
    >>> a = torch.randn(4)
    >>> a
    tensor([ 0.0202,  1.0985,  1.3506, -0.6056])
    >>> torch.add(a, 20)
    tensor([ 20.0202,  21.0985,  21.3506,  19.3944])
    
    >>> b = torch.randn(4)
    >>> b
    tensor([-0.9732, -0.3497,  0.6245,  0.4022])
    >>> c = torch.randn(4, 1)
    >>> c
    tensor([[ 0.3743],
            [-1.7724],
            [-0.5811],
            [-0.8017]])
    >>> torch.add(b, c, alpha=10)
    tensor([[  2.7695,   3.3930,   4.3672,   4.1450],
            [-18.6971, -18.0736, -17.0994, -17.3216],
            [ -6.7845,  -6.1610,  -5.1868,  -5.4090],
            [ -8.9902,  -8.3667,  -7.3925,  -7.6147]])
    



<<Example>>

Generate simple inputs to run the target code block.

target:
```
torch.Tensor.add(*inputs)
```

output:
`

In [151]:
# Print the raw torch.cdist docstring (its "Args:" header is indented by four spaces).
print(torch.cdist.__doc__)

Computes batched the p-norm distance between each pair of the two collections of row vectors.

    Args:
        x1 (Tensor): input tensor of shape :math:`B \times P \times M`.
        x2 (Tensor): input tensor of shape :math:`B \times R \times M`.
        p: p value for the p-norm distance to calculate between each vector pair
            :math:`\in [0, \infty]`.
        compute_mode:
            'use_mm_for_euclid_dist_if_necessary' - will use matrix multiplication approach to calculate
            euclidean distance (p = 2) if P > 25 or R > 25
            'use_mm_for_euclid_dist' - will always use matrix multiplication approach to calculate
            euclidean distance (p = 2)
            'donot_use_mm_for_euclid_dist' - will never use matrix multiplication approach to calculate
            euclidean distance (p = 2)
            Default: use_mm_for_euclid_dist_if_necessary.

    If x1 has shape :math:`B \times P \times M` and x2 has shape :math:`B \times R \times M` then the
    o

In [None]:
# Render the phase-1 prompt for every collected item. The original loop
# formatted each prompt and threw the result away, so it had no effect; the
# prompts are now kept, in item order, in `prompts`.
prompts = []
for item in itemholder.items:
    processed_docs = convert_parsed_to_str(filter_docs(item.parsed_docs))

    prompts.append(
        phase1_prompt.format(
            target=item.name,
            doc=processed_docs,
            phase1_example=phase1_example
        )
    )

In [96]:
# Number of targets whose docstring was retrieved and stored as an Item.
len(itemholder.items)

31

In [97]:
# Targets skipped because docstring lookup or Item construction raised.
itemholder.failed_items

['torch.Tensor.__rsub__',
 'torch.Tensor.__truediv__',
 'torch.Tensor.__rdiv__',
 'torch.Tensor.__mod__',
 'torch.Tensor.__rpow__',
 'torch.clamp_min',
 'torch.clamp_max']

In [70]:
# Attach the rendered phase-1 prompt to each item under meta_data["prompt"].
# NOTE(review): assumes every item exposes a dict-valued `meta_data` — confirm
# against the Item model before running.
for item in itemholder.items:
    item.meta_data["prompt"] = generate_prompt(phase1_prompt, item.name, item.parsed_docs, phase1_example)

In [107]:
def generate_text(prompt, model="llama3:70b-instruct-q2_K"):
    """Send ``prompt`` as a single user message to an Ollama chat model.

    Args:
        prompt: Text of the user message.
        model: Model name passed to ``ollama.chat``. Defaults to the model
            that was previously hard-coded here.

    Returns:
        The ``['message']['content']`` entry of the chat response.

    NOTE(review): `ollama` must already be imported in the kernel; no import
    for it appears near this cell.
    """
    messages = [
        {
            'role': 'user',
            'content': prompt,
        },
    ]
    response = ollama.chat(model=model, messages=messages)
    return response['message']['content']

In [None]:
import os

import pymongo

# The MongoDB URI carries a username and password, so it must not live in the
# notebook. It is read from the MONGODB_URI environment variable instead.
# NOTE(review): the credential previously committed in this cell should be
# treated as leaked and rotated.
connection_string = os.environ.get("MONGODB_URI")
if not connection_string:
    raise RuntimeError(
        "Set the MONGODB_URI environment variable to the MongoDB connection string."
    )

myclient = pymongo.MongoClient(connection_string)

def insert_data(db_name, col_name, data):
    """Insert one document into ``myclient[db_name][col_name]``.

    Args:
        db_name: Database name looked up on the module-level ``myclient``.
        col_name: Collection name looked up on that database.
        data: Document passed to ``insert_one``.

    Returns:
        Whatever ``insert_one`` returns. The result used to be assigned to an
        unused local and discarded.
    """
    collection = myclient[db_name][col_name]
    return collection.insert_one(data)

In [None]:
def push_to_db(item, phase1_example, generated_text, collection=None):
    """Store one input-generation record for ``item``.

    Args:
        item: Object with ``name`` and ``parsed_docs`` attributes.
        phase1_example: Example text stored under ``"example"``.
        generated_text: Model output stored under ``"generated_text"``.
        collection: Object with an ``insert_one`` method that receives the
            record. When omitted, the module-level name ``mycol`` is used.
            NOTE(review): no cell shown here defines a global ``mycol`` (it
            is only a local inside ``insert_data``), so pass ``collection``
            explicitly unless ``mycol`` is defined elsewhere.

    Returns:
        Whatever ``insert_one`` returns.
    """
    inputgeneration = {
        "project_name": "unittest_automation_test_initiation",
        "project_id": "0",
        "task": "function_input_generation",
        "name": item.name,
        "example": phase1_example,
        "docs": item.parsed_docs,
        "generated_text": generated_text,
    }

    if collection is None:
        # Original behaviour: rely on a global collection handle.
        collection = mycol

    return collection.insert_one(inputgeneration)
