# CodeSearchNet Data Source Notice

In [None]:
!mkdir /content/CodeSearchNet

mkdir: cannot create directory ‘/content/CodeSearchNet’: File exists


In [None]:
%%capture
!pip install docopt

After the CodeSearchNet dataset was archieved, the S3 bucket was taken offline. As a result, following the installation on the github installation guide will not work. A short illustration can be seen down below.

In [None]:
import os
from subprocess import call, check_call, CalledProcessError

destination_dir = "/content/CodeSearchNet"

if not os.path.exists(destination_dir):
    os.makedirs(destination_dir)
os.chdir(destination_dir)

try:
    language = "python"
    check_call(['wget', f'https://s3.amazonaws.com/code-search-net/CodeSearchNet/v2/{language}.zip', '-O', f'{language}.zip'])
    check_call(['unzip', f'{language}.zip'])
    check_call(['rm', f'{language}.zip'])
except CalledProcessError as e:
    print(f"Error: {e}")
    print(f"Error executing command {e.cmd}")
    print(f"Returned code {e.returncode}")

Error: Command '['wget', 'https://s3.amazonaws.com/code-search-net/CodeSearchNet/v2/python.zip', '-O', 'python.zip']' returned non-zero exit status 8.
Error executing command ['wget', 'https://s3.amazonaws.com/code-search-net/CodeSearchNet/v2/python.zip', '-O', 'python.zip']
Returned code 8


Instead, we download the dataset from Hugging Face. Updating `datasets` might not be necessary but might sometimes be helpful to avoid errors concering caching in the local file system

# Data Fetching

In [2]:
%%capture

!pip install -U datasets

In [3]:
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("code_search_net", "python")

train_data = dataset["train"]
test_data = dataset["test"]
validation_data = dataset["validation"]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

code_search_net.py: 0.00B [00:00, ?B/s]

The repository for code_search_net contains custom code which must be executed to correctly load the dataset. You can inspect the repository content at https://hf.co/datasets/code_search_net.
You can avoid this prompt in future by passing the argument `trust_remote_code=True`.

Do you wish to run the custom code? [y/N] y


python.zip:   0%|          | 0.00/941M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/412178 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/22176 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/23107 [00:00<?, ? examples/s]

We can inspect the contents of the dataset object for the training, testing, and validation datasets.

In [5]:
print(train_data.features.keys())

dict_keys(['repository_name', 'func_path_in_repository', 'func_name', 'whole_func_string', 'language', 'func_code_string', 'func_code_tokens', 'func_documentation_string', 'func_documentation_tokens', 'split_name', 'func_code_url'])


# Question Generation Pipeline

In [87]:
import itertools
import torch
from transformers import (
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    PreTrainedModel,
    PreTrainedTokenizer
)
from nltk import sent_tokenize

from typing import (
    Tuple,
    Dict,
    Literal,
    List,
    Generator,
    overload,
    Union
)

class QGPipeline:
    def __init__(
        self,
        model: str,
        qg_format: Literal["highlight"] = "highlight",
        exclude_after: List[str] = [],
        use_cuda: bool = False
    ):

        self.model = AutoModelForSeq2SeqLM.from_pretrained(model)
        self.tokenizer = AutoTokenizer.from_pretrained(model)
        self.qg_format = qg_format

        assert self.model.__class__.__name__ == "T5ForConditionalGeneration"

        self.device = torch.device("cuda" if torch.cuda.is_available() and use_cuda else "cpu")
        self.model.to(self.device)
        self.model.eval()
        self.use_cuda = use_cuda
        self._exclude_after = exclude_after
        print(f"Using {self.device}")

    def __call__(self, input: Union[Tuple[str, str], List[Tuple[str, str]]]):
        if isinstance(input, tuple):
            # Handle single input
            func_name, docstring = input
            questions = self._generate_questions(func_name, docstring)
            output = [{'answer': func_name, 'question': que} for que in questions]
            if output:
                 return output[0]
            else:
                 return {}

        elif isinstance(input, list) and all(isinstance(item, tuple) for item in input):
            # Handle batch input with proper error handling
            return self._process_batch_generator(input)
        else:
            raise TypeError("Invalid input type. Expected a tuple (func_name, docstring) or a list of such tuples.")


    def _process_batch_generator(self, batch_input: List[Tuple[str, str]]) -> Generator[Dict[str, Any], None, None]:
        """
        Process batch input and yield results with error handling per item
        """
        for i, (func_name, docstring) in enumerate(batch_input):
            try:
                questions = self._generate_questions(func_name, docstring)
                output = [{'answer': func_name, 'question': que} for que in questions]

                if output:
                    yield {
                        'success': True,
                        'index': i,
                        'function_name': func_name,
                        'docstring': docstring,
                        'result': output[0],
                        'error': None
                    }
                else:
                    yield {
                        'success': False,
                        'index': i,
                        'function_name': func_name,
                        'docstring': docstring,
                        'result': {},
                        'error': 'No questions generated'
                    }

            except Exception as e:
                yield {
                    'success': False,
                    'index': i,
                    'function_name': func_name,
                    'docstring': docstring,
                    'result': {},
                    'error': str(e)
                }

    def _generate_questions(self, func_name, docstring):
        #TODO: This can be re-written in a more forceful way for the llm
        inputs = self._prepare_inputs_for_question_extraction(func_name, docstring)

        inputs = self._tokenize(inputs, padding=True, truncation=True)

        with torch.no_grad():
            outs = self.model.generate(
                input_ids=inputs['input_ids'].to(self.device),
                attention_mask=inputs['attention_mask'].to(self.device),
                num_beams=4,

                max_length=32
            )

        questions = [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in outs]

        return questions

    def _tokenize(self, inputs, padding=True, truncation=True, add_special_tokens=True, max_length=512):
        tokenized_inputs = self.tokenizer(
            inputs,
            max_length=max_length,
            add_special_tokens=add_special_tokens,
            truncation=truncation,
            padding=padding,
            return_tensors="pt"
        )
        return tokenized_inputs

    def _prepare_inputs_for_question_extraction(self, func_name, docstring):
        #NOTE: experimental, consider removing :params and :return values
        #manual observation suggests the model struggles to understand the pupose of the function in their presense
        for string in self._exclude_after:
            param_idx = docstring.find(string)
            if param_idx != -1:
                docstring = docstring[:param_idx]
            docstring = docstring.strip()
        input = f"answer: <hl>The function is {func_name}<hl>. Context: {docstring} </s>"

        return [input]

    @property
    def exclude_after(self):
        return self._exclude_after

    @exclude_after.setter
    def exclude_after(self, value):
        self._exclude_after = value

In [91]:
finetuned_t5 = QGPipeline(model="valhalla/t5-base-qg-hl")

Using cpu


In [80]:
func_name = train_data[4]['func_name']
docstring = train_data[4]['func_documentation_string']
print(docstring)
finetuned_t5((func_name, docstring))

Parses the XML run statistics file (GenerateFASTQRunStatistics.xml). In some cases, the file is not
        available. Equivalent data can be pulled from Basespace.Generate a text file  name indexingQC.txt containing
        the copied tables from the Indexing QC tab of the run on Basespace


{'answer': 'Metadata.parserunstats',
 'question': 'What is the function that parses the XML run statistics file?'}

The result looks promising. Let's run the model for the first 20 doc strings in our dataset

In [48]:
for i in range(10):
    func_name = train_data[i]['func_name']
    p = docstring = train_data[i]['func_documentation_string']
    print(f"========Sample{i+1}==========")
    idx = docstring.find(":param")
    if idx != -1:
        p = docstring[:idx]
    p = docstring[:].strip()
    print(f"Docstring: {docstring}")
    print(finetuned_t5(func_name, docstring))


Docstring: Display slug with level by language.
[{'answer': 'show_slug_with_level', 'question': 'What is the name of the function that displays a slug with level by language?'}]
Docstring: Render the last 10 revisions of a page content with a list using
        the ``pages/revisions.html`` template
[{'answer': 'show_revisions', 'question': 'What is the name of the function that shows the last 10 revisions of a page?'}]
Docstring: Method that parse the imageplaceholder template tag.
[{'answer': 'do_videoplaceholder', 'question': 'What is the name of the method that parses the imageplaceholder template tag?'}]
Docstring: Return Pages with given tag

    Syntax::

        {% get_pages_with_tag <tag name> as <varname> %}

    Example use:
        {% get_pages_with_tag "footer" as pages %}
[{'answer': 'do_get_pages_with_tag', 'question': 'What is the name of the function that returns pages with given tag?'}]
Docstring: Parses the XML run statistics file (GenerateFASTQRunStatistics.xml). In 

In [93]:
zipped10 = zip(train_data[:5]['func_name'], train_data[:10]['func_documentation_string'])

generator = finetuned_t5(list(zipped10))
print(generator)

for item in generator:
    ans, ques = item['answer'], item['question']
    print(f"========Sample==========")
    print(f"Question: {ques}")
    print(f"Answer: {ans}")
    print()

<generator object QGPipeline.__call__.<locals>.batch_generator at 0x7e667d4d5d40>
Question: What is the function that shows a slug with level?
Answer: show_slug_with_level

Question: What is the function that shows the last 10 revisions of a page?
Answer: show_revisions

Question: What is the name of the function that parses the imageplaceholder template tag?
Answer: do_videoplaceholder

Question: What is the function do_get_pages_with_tag?
Answer: do_get_pages_with_tag

Question: What is the function that parses the XML run statistics file?
Answer: Metadata.parserunstats



# Dataset Processor

In [94]:
import json
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import time
import logging
from typing import List, Dict, Any
from datasets import Dataset, DatasetDict
from huggingface_hub import HfApi, login

class DocstringDatasetProcessor:
    def __init__(self,
                 hf_dataset_name: str,
                 batch_size: int = 1000,
                 save_locally: bool = False,
                 local_cache_dir: str = "./cache",
                 private_repo: bool = False):

        self.hf_dataset_name = hf_dataset_name
        self.batch_size = batch_size
        self.private_repo = private_repo
        self.local_cache_dir = Path(local_cache_dir)
        self.local_cache_dir.mkdir(exist_ok=True)
        self.save_locally = save_locally

        self.processed_count = 0
        self.failed_count = 0
        self.all_geneated_data = []

        self.hf_api = HfApi()

    def authenticate_hf(self):
        """Authenticate with Hugging Face Hub"""
        try:
            login()
            print("Successfully authenticated")
        except Exception as e:
            print(f"Error authenticating with Hugging Face Hub: {e}")
            raise


    def process_batch(self, batch_data: List[Tuple[str, str]], pipeline: QGPipeline, batch_id: int) -> List[Dict[str, Any]]:
        """Process a batch of (func_name, docstring) tuples with individual error handling"""
        batch_results = []
        batch_success_count = 0
        batch_failure_count = 0

        # Process the entire batch through pipeline
        try:
            # Pipeline returns a generator that yields results with error info
            for result in pipeline(batch_data):
                if result['success']:
                    # Successfully processed item
                    batch_results.append({
                        'function_name': result['function_name'],
                        'docstring': result['docstring'],
                        'question': result['result']['question'],
                        'answer': result['result']['answer']
                    })
                    batch_success_count += 1
                else:
                    # Failed to process item
                    self.logger.warning(
                        f"Failed to process {result['function_name']}: {result['error']}"
                    )
                    batch_failure_count += 1

        except Exception as e:
            # Catastrophic failure - entire batch failed
            self.logger.error(f"Catastrophic batch failure {batch_id}: {e}")
            batch_failure_count = len(batch_data)
            batch_success_count = 0

        # Update global counters
        self.processed_count += batch_success_count
        self.failed_count += batch_failure_count

        # Log batch statistics
        self.logger.info(
            f"Batch {batch_id}: {batch_success_count} successful, "
            f"{batch_failure_count} failed out of {len(batch_data)} items"
        )

        # Save locally if enabled
        if self.save_locally and batch_results:
            self._save_batch_locally(batch_results, batch_id)

        return batch_results

    def _save_batch_locally(self, batch_results: List[Dict], batch_id: int):
        batch_file = self.local_cache_dir / f"batch_{batch_id}.jsonl"
        with open(batch_file, 'w') as f:
            for item in batch_results:
                json.dump(item, f)
                f.write('\n')

    def process_full_dataset(self, train_data, pipeline, start_idx: int = 0):
        """Process the entire data set and upload to hugging face"""

        print(f"Starting processing of {len(train_data)} items from index {start_idx}")
        start_time = time.time()

        for batch_start in tqdm(range(start_idx, len(train_data), self.batch_size),
                                desc="Processing batches"):
            batch_end = min(batch_start + self.batch_size, len(train_data))
            batch_data = train_data[batch_start:batch_end]
            batch_id = batch_start // self.batch_size

            batch_results = self.process_batch(batch_data, pipeline, batch_id)
            self.all_geneated_data.extend(batch_results)

            # print progress
            if batch_id % 10 == 0:
                elapsed = time.time() - start_time
                rate = self.processed_count / elapsed if elapsed > 0 else 0
                print(f"Processed {self.processed_count} items in {elapsed:.2f} seconds. Rate: {rate:.2f} items/sec")

            #final statistics
            total_time = time.time() - start_time
            print(f"Processed {self.processed_count} items in {total_time:.2f} seconds. Rate: {self.processed_count / total_time:.2f} items/sec")

            self._upload_to_hf()
            return self.all_geneated_data

    def _upload_to_hf(self):
        if self.private_repo:
            self.authenticate_hf()

    def _upload_to_hf(self):
        try:
            print("Creating Hugging Face dataset")

            dataset = Dataset.from_list(self.all_generated_data)

            #todo consider a test/train split
            dataset = dataset.train_test_split(test_size=0.1, seed=42)

            dataset_dict = DatasetDict({
                'train': dataset['train'],
                'validation': dataset['test']
            })

            #notice this might introduce unneccessary inefficiencies
            dataset_dict = dataset_dict.map(
                lambda x: {
                    **x,
                    'id': f"{x['function_name']}_{hash(x['docstring']) % 10000}"
                }
            )

            print(f"Uploading dataset to {self.hf_dataset_name}...")

            dataset_dict.push_to_hub(
                self.hf_dataset_name,
                private=self.private_repo,
                commit_message=f"Add {len(self.all_generated_data)} docstring-question pairs"
            )

            print(f"Successfully uploaded dataset to https://huggingface.co/datasets/{self.hf_dataset_name}")

        except Exception as e:
            print(f"Error uploading to Hugging Face: {e}")
            if self.save_locally:
                print("Data is available locally in cached directory")
            raise

    def load_from_hf(self):
        """Load the dataset from Hugging Face"""

        from datasets import load_dataset

        try:
            dataset = load_dataset(self.hf_dataset_name)
            print(f"Successfully loaded dataset from {self.hf_dataset_name}")
            return dataset
        except Exception as e:
            print(f"Error loading dataset from Hugging Face: {e}")
            raise

    #TODO: resume processing from local cache file
    #TODO: upload from colab cache to permanent file location (local or drive)

