In [1]:
##############################################################################
# Handle imports
##############################################################################
from docling.document_converter import DocumentConverter
import traceback
from collections.abc import Iterable
import os
import pypdfium2 as pdfium
import re
import json
import jsonlines
import uuid
from langchain_openai import ChatOpenAI
from langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitter
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from deepeval.models.base_model import DeepEvalBaseLLM
from deepeval import assert_test
from deepeval.test_case import LLMTestCase, LLMTestCaseParams
from deepeval.metrics import GEval, AnswerRelevancyMetric
from deepeval import evaluate
from huggingface_hub import snapshot_download, login, HfApi
from sklearn.model_selection import train_test_split
from transformers import AutoModelForCausalLM, AutoTokenizer
import xml.etree.ElementTree as etree
from longdocfactscore.ldfacts import LongDocFACTScore
from datetime import datetime
import nltk
import pandas as pd
from datasets import load_dataset, interleave_datasets
nltk.download('punkt_tab')
from dotenv import load_dotenv
load_dotenv()
from transformers import DataCollatorForLanguageModeling, TrainingArguments
from trl import SFTConfig, SFTTrainer
from peft import LoraConfig
from transformers import EarlyStoppingCallback
import torch
import optuna
import traceback
torch.cuda.empty_cache()

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [2]:
##############################################################################
# Set up variables
##############################################################################
# --- Working directories ------------------------------------------------------
SOURCE_DIR = "source_docs"
SOURCE_DIR_CHUNKED = "source_docs_chunked"
MARKDOWN_DIR = "markdown"
MARKDOWN_URI_PREFIX = (
    "https://raw.githubusercontent.com/agapebondservant/code-generation-capstone"
    "/refs/heads/main/eda/resources"
)
REPORT_DIR = "reports"
OUTPUT_DIR = "output"
INVALID_DIR = "invalid"
ERROR_DIR = "error"
MODEL_DIR = "models"

# --- Candidate base models and target device ----------------------------------
MODEL_IDS = [
    "ibm-granite/granite-8b-code-instruct-4k",
    "ibm-granite/granite-8b-code-base-128k",
]
DEVICE = "cuda"

# --- Hugging Face dataset repository, namespaced by the HF_USERNAME env var ---
DATASET_REPO = "{}/codegen".format(os.getenv("HF_USERNAME"))

In [3]:
##############################################################################
# Set up object instances
##############################################################################

# Chat client used for data generation. The model id, API key and base URL are
# all read from environment variables, so no credentials live in the notebook.
data_generator_llm = ChatOpenAI(
    model=os.getenv("DATA_GENERATOR_MODEL_ID"), # commented-out alternative: os.getenv('QWEN25CODER_MODEL_ID')
    api_key=os.getenv('OPENROUTER_TOKEN'),
    base_url=os.getenv('OPENROUTER_API_BASE'),
    temperature=0.1,
)

class DataGeneratorLLM(DeepEvalBaseLLM):
    """Adapter exposing an existing chat model through the DeepEvalBaseLLM interface."""

    def __init__(self, model):
        # Keep a reference to the already-constructed chat model.
        self.model = model

    def load_model(self):
        """Return the wrapped chat model."""
        return self.model

    def generate(self, prompt: str) -> str:
        """Invoke the wrapped model synchronously and return the reply content."""
        response = self.load_model().invoke(prompt)
        return response.content

    async def a_generate(self, prompt: str) -> str:
        """Invoke the wrapped model asynchronously and return the reply content."""
        response = await self.load_model().ainvoke(prompt)
        return response.content

    def get_model_name(self):
        """Return the display name reported for this model."""
        return "Custom Data Generator LLM (GPT-OSS)"

# Adapter instance exposing the chat client through the DeepEvalBaseLLM interface.
evaluator_llm = DataGeneratorLLM(data_generator_llm)

In [4]:
##############################################################################
# PROMPTS AND PROMPT TYPES
##############################################################################

# Instruction text for the "summary" task: a concise explanation of a code snippet.
summary_prompt = """
Your task is to analyze this code snippet and provide an explanation of the code.
    
Instructions:
1. Provide a concise explanation that summarizes the purpose of the code without getting into too many specific technical details.
2. If the provided snippet does not appear to be a code snippet, indicate that this is not valid code.
3. Also exclude any details that link the requirements to a specific programming language or framework.
"""

# Instruction text for the "topics" task: a short list of related programming
# topics, generated with the help of a previously produced summary.
topics_prompt = """
Use the provided summary to analyze this code snippet and generate a list of programming topics that are related to the code.
    
Instructions:
1. Provide a short list of topics that you can identify.
2. If the provided snippet does not appear to be a code snippet, indicate that this is not valid code.
"""

# Instruction text for the "components" task: a specification of the relevant
# JSP components found in a code snippet.
# The instruction list is numbered consecutively (it previously jumped from 1 to 3).
components_prompt = """
Your task is to analyze this code snippet and generate a specification of all the JSP relevant components you can find.

Instructions:
1. Include only relevant components.
2. If the provided snippet does not appear to be a code snippet, indicate that this is not valid code.
"""

# Instruction text for a domain-model outline of a code snippet.
# NOTE(review): this prompt is not registered in the `prompts` dict defined in
# this cell — confirm whether it is used elsewhere or should be registered.
domain_prompt = """
Your task is to analyze this code snippet and generate an outline of the domain model associated with this code.
    
Instructions:
1. Avoid getting into too many specific technical details. Simply provide a domain model of the code.
2. If the provided snippet does not appear to be a code snippet, indicate that this is not valid code.
3. Include the current state of the domain objects based on information extracted from the code.
"""

# Instruction text for the "keywords" task: a short list of keywords for a code snippet.
keywords_prompt = """
Your task is to analyze this code snippet and generate a list of keywords that are associated with the code.
    
Instructions:
1. Provide a short list of keywords.
2. If the provided snippet does not appear to be a code snippet, indicate that this is not valid code.
"""

# Instruction text for the "functional_requirements" task, generated from a
# previously produced summary.
# This used to be a verbatim copy of the topics prompt (it asked for
# "programming topics"), which made the two tasks indistinguishable.
functional_requirements_prompt = """
Use the provided summary to generate an outline of the functional requirements that are implemented by the code.

Instructions:
1. Provide a short list of relevant functional requirements. Do not include requirements that are not related to the code.
2. If the provided snippet does not appear to be a code snippet, indicate that this is not valid code.
"""

# Instruction text for the "business_requirements" task: sample business
# requirements outlined from a previously produced summary.
business_requirements_prompt = """
Use the provided summary to generate an outline of sample business requirements that might be connected to the code.

Instructions:
1. Provide a short list of relevant requirements. Do not include requirements that are not related to the code.
2. If the provided snippet does not appear to be a code snippet, indicate that this is not valid code.
"""

# Registry of generation tasks. Each key is a task / dataset column name and maps to:
#   "prompt": the instruction text for the task
#   "title":  the heading written in front of the expected answer
# The key order is kept as-is because it drives the iteration order of the tasks.
# The "topics" and "components" titles were previously swapped.
prompts = {
    "functional_requirements": {
        "prompt": functional_requirements_prompt,
        "title": "Functional Requirements",
    },
    "business_requirements": {
        "prompt": business_requirements_prompt,
        "title": "Business Requirements",
    },
    "topics": {
        "prompt": topics_prompt,
        "title": "Topics",
    },
    "components": {
        "prompt": components_prompt,
        "title": "Components",
    },
    "keywords": {
        "prompt": keywords_prompt,
        "title": "Keywords",
    },
    "summary": {
        "prompt": summary_prompt,
        "title": "Summary",
    },
}


# Tasks whose input is another task's output rather than the raw code.
# Maps task name -> name of the task it depends on (here always "summary").
prompts_with_dependencies = {
    "topics": "summary",
    
    "business_requirements": "summary",
    
    "functional_requirements": "summary",
}

### Download candidate models
The following candidate models will be downloaded:
- ibm-granite/granite-8b-code-instruct-4k
- ibm-granite/granite-8b-code-base-128k

In [5]:
##############################################################################
# UTILITY METHODS
##############################################################################

def download_models(repo_id):
    """Download a model snapshot and store its tokenizer alongside it.

    The snapshot is downloaded with ``MODEL_DIR`` as the cache directory. The
    tokenizer is then loaded from that snapshot, given a pad token (the EOS
    token) when it has none, and saved back into the snapshot directory.

    Args:
        repo_id: Hugging Face Hub repository id of the model.

    Returns:
        The local snapshot directory, or ``None`` when the download failed.
        Failures are reported on stdout (with traceback) instead of raised.
    """
    try:
        ##############################################################################
        # Save the model
        ##############################################################################
        local_dir = snapshot_download(repo_id=repo_id, cache_dir=MODEL_DIR)

        print(f"Model {repo_id} downloaded to: {local_dir}")

        ##############################################################################
        # Save the tokenizer
        ##############################################################################
        # Load from the snapshot that was just downloaded rather than resolving
        # the repo a second time through the default Hub cache.
        tokenizer = AutoTokenizer.from_pretrained(local_dir)

        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        tokenizer.save_pretrained(local_dir)

        return local_dir

    except Exception as e:

        print(f"Error downloading model {repo_id}: {e}")

        # Keep the full traceback, consistent with the pipeline's error handling.
        traceback.print_exc()

        return None

def upload_models(repo_id, model_dir):
    """Upload the contents of a local model directory to a Hub model repo.

    Args:
        repo_id: Target Hugging Face Hub repository id.
        model_dir: Local directory whose files are uploaded.

    Returns:
        None. Failures are reported on stdout (with traceback) instead of raised.
    """
    try:
        # Uploading only needs the files on disk. The previous version also
        # loaded the tokenizer and the full model onto the GPU (executing any
        # remote code) without ever using them.
        if not os.path.isdir(model_dir):
            raise FileNotFoundError(f"Model directory does not exist: {model_dir}")

        api = HfApi()

        # exist_ok=True makes re-uploads to an already-created repo succeed.
        api.create_repo(repo_id=repo_id, repo_type="model", exist_ok=True)

        api.upload_folder(
            folder_path=model_dir,
            repo_id=repo_id,
            repo_type="model",
        )

    except Exception as e:

        print(f"Error uploading model {repo_id} from directory {model_dir}: {e}")

        traceback.print_exc()

def build_datasets(dataset_name):
    """Build the interleaved train and test datasets for fine-tuning.

    For every task registered in ``prompts`` the split is mapped (batched) so
    that each row gains three columns:

    * ``text``: the ``summary`` column for tasks listed in
      ``prompts_with_dependencies``, otherwise the ``code`` column
    * ``completion``: the column named after the task
    * ``code_type``: the task name

    The per-task datasets of a split are then interleaved into one dataset.

    Args:
        dataset_name: Dataset identifier passed to ``load_dataset``.

    Returns:
        A two-element list: ``[train_dataset, test_dataset]``.
    """

    def _add_training_columns(batch, code_type="", source_column="code"):
        # Read every source value before writing, then assign the new columns.
        source_values = batch[source_column]
        target_values = batch[code_type]
        type_labels = [code_type] * len(batch["code"])

        batch["text"] = source_values
        batch["completion"] = target_values
        batch["code_type"] = type_labels
        return batch

    splits = [load_dataset(dataset_name, split=split_name) for split_name in ("train", "test")]

    task_names = [name for name in prompts if name not in ['code']]

    interleaved = []

    for split in splits:
        per_task = []

        for task in task_names:
            source = "summary" if task in prompts_with_dependencies else "code"

            per_task.append(
                split.map(
                    _add_training_columns,
                    batched=True,
                    fn_kwargs={"code_type": task, "source_column": source},
                )
            )

        interleaved.append(interleave_datasets(per_task))

    return interleaved
        
    

In [6]:
##############################################################################
# Code Formatting Helper Function
##############################################################################
def code_text_formatter(example):
    """Render one dataset row as a chat-style training string.

    Args:
        example: Mapping with the keys ``code``, ``summary``, ``code_type`` and
            ``completion`` (``text`` is used as a fallback target when
            ``completion`` is absent).

    Returns:
        The formatted training text. Tasks listed in ``prompts_with_dependencies``
        are rendered from the summary; all other tasks are rendered from the code.

    Raises:
        KeyError: if ``code_type`` is not a key of ``prompts`` or a required
            column is missing.
    """
    _code = example['code']

    _summary = example['summary']

    _code_type = example["code_type"]

    # The expected answer lives in "completion". "text" holds the model *input*
    # (code or summary), so using it here repeated the input as the target.
    # Rows without a "completion" column keep the previous behaviour.
    _completion = example['completion'] if 'completion' in example else example['text']

    _prompt = prompts[_code_type]["prompt"]

    _title = prompts[_code_type]["title"]

    # NOTE(review): the leading indentation inside the templates below ends up
    # in the training text; it is kept unchanged here — confirm it is intended.

    ######################################
    # Summary-Text pair
    ######################################
    if _code_type in prompts_with_dependencies:
        # Same system / user / assistant layout as the code branch (the first
        # turn used to be tagged <|assistant|>, giving two assistant turns).
        text = f"""
        <|system|>
        You are a helpful assistant.
        {_prompt}
        Summary:
        <|user|>
        {_summary}
        <|assistant|>
        {_title}:
        {_completion}<|endoftext|>
        """

        return text

    #######################
    # Code-Text pair
    #######################
    else:
        text = f"""
        <|system|>
        You are a helpful assistant.
        {_prompt}
        Code to analyze:
        <|user|>
        {_code}
        <|assistant|>
        {_title}:
        {_completion}<|endoftext|>
        """

        return text

In [7]:
##############################################################################
# PIPELINES
##############################################################################
def _load_base_model(model_id):
    """Load a fresh, adapter-free copy of ``model_id`` for a single training run."""
    return AutoModelForCausalLM.from_pretrained(
        model_id,
        device_map="auto",
        trust_remote_code=True,
    )


def _build_peft_trainer(model, tokenizer, train_dataset, test_dataset, output_dir, params, use_dora):
    """Assemble an SFTTrainer for one run from a hyperparameter mapping.

    ``params`` must provide ``learning_rate``, ``per_device_train_batch_size``,
    ``r``, ``lora_alpha`` and ``lora_dropout`` (the keys used by the Optuna study).
    """
    lora_config = LoraConfig(
        r=params["r"],
        lora_alpha=params["lora_alpha"],
        target_modules=None,
        lora_dropout=params["lora_dropout"],
        bias="none",
        use_dora=use_dora,
    )

    training_args = SFTConfig(
        output_dir=output_dir,
        learning_rate=params["learning_rate"],
        per_device_train_batch_size=params["per_device_train_batch_size"],
        per_device_eval_batch_size=params["per_device_train_batch_size"],
        num_train_epochs=10,
        logging_steps=100,
        fp16=True,
        report_to="none",
        eval_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        greater_is_better=False,
        max_length=4096,
        packing=False,
        seed=42,
    )

    return SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        peft_config=lora_config,
        formatting_func=code_text_formatter,
        data_collator=DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False),
        # A new callback per trainer, so every run starts with a clean patience counter.
        callbacks=[
            EarlyStoppingCallback(
                early_stopping_patience=3,
                early_stopping_threshold=0.001,
            )
        ],
    )


def peft_finetuning_pipeline(dataset_name, use_dora=False):
    """
    Executes the PEFT (LoRA / DoRA) finetuning pipeline.

    For every model in ``MODEL_IDS``: run a 10-trial Optuna search that
    minimizes the trainer's best metric (eval loss), retrain with the best
    hyperparameters, save the trained model and tokenizer under
    ``MODEL_DIR/<model>/model`` and push the model to the Hugging Face Hub.

    Args:
        dataset_name: Dataset identifier forwarded to ``build_datasets``.
        use_dora: When True the adapters are configured with ``use_dora=True``.

    Returns:
        None. Errors are printed (with traceback) instead of raised.
    """
    import gc  # local import: only needed to release model memory between runs

    try:
        os.makedirs(MODEL_DIR, exist_ok=True)

        # The datasets do not depend on the model, so build them once.
        train_dataset, test_dataset = build_datasets(dataset_name)

        ##############################################################################
        # Load models to finetune
        ##############################################################################
        for model_id in MODEL_IDS:

            print(f"Start finetuning {model_id}...")

            base_model_dir = model_id.replace("/", "_")

            experiment_dir = f"{MODEL_DIR}/{base_model_dir}/experiment"
            final_dir = f"{MODEL_DIR}/{base_model_dir}/final"
            model_output_dir = f"{MODEL_DIR}/{base_model_dir}/model"

            for dirname in (experiment_dir, final_dir, model_output_dir):
                os.makedirs(dirname, exist_ok=True)

            tokenizer = AutoTokenizer.from_pretrained(model_id)

            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            ##############################################################################
            # Objective Function for Hyperparameter Tuning
            ##############################################################################
            def objective(trial):
                params = {
                    "learning_rate": trial.suggest_float(
                        "learning_rate", 1e-5, 1e-4, log=True
                    ),
                    "per_device_train_batch_size": trial.suggest_categorical(
                        "per_device_train_batch_size", [16, 32]
                    ),
                    "r": trial.suggest_categorical("r", [8, 16, 32]),
                    "lora_alpha": trial.suggest_categorical("lora_alpha", [16, 32, 64]),
                    "lora_dropout": trial.suggest_categorical("lora_dropout", [0.05, 0.1]),
                }

                # PEFT injects adapter layers into the model it is given, so a
                # model object shared between trials would accumulate adapters
                # from earlier trials. Every trial gets its own base model.
                trial_model = _load_base_model(model_id)

                trainer = _build_peft_trainer(
                    trial_model,
                    tokenizer,
                    train_dataset,
                    test_dataset,
                    experiment_dir,
                    params,
                    use_dora,
                )

                try:
                    trainer.train()

                    return trainer.state.best_metric

                finally:
                    # Free the trial's model before the next one is loaded.
                    del trainer, trial_model
                    gc.collect()
                    torch.cuda.empty_cache()

            ##############################################################################
            # Perform Hyperparameter Search
            ##############################################################################
            study = optuna.create_study(direction="minimize")  # Minimize loss

            study.optimize(objective, n_trials=10)

            ##############################################################################
            # Start finetuning with the best hyperparameters!
            ##############################################################################
            final_model = _load_base_model(model_id)

            final_trainer = _build_peft_trainer(
                final_model,
                tokenizer,
                train_dataset,
                test_dataset,
                final_dir,
                study.best_params,
                use_dora,
            )

            final_trainer.train()

            ##############################################################################
            # Save snapshot and push to HuggingFace Hub
            ##############################################################################
            try:
                # Save / publish the trained PEFT model held by the trainer, not
                # the bare base-model object that was handed to it.
                final_trainer.model.save_pretrained(model_output_dir)

                tokenizer.save_pretrained(model_output_dir)

                published_model_id = model_id.partition("/")[2] or model_id

                final_trainer.model.push_to_hub(published_model_id)

            except Exception as e:

                print(f"Error saving and pushing to HuggingFace: {e}")

                traceback.print_exc()

            # Release this model before moving on to the next candidate.
            del final_trainer, final_model
            gc.collect()
            torch.cuda.empty_cache()

    except Exception as e:

        print(f"Error running PEFT pipeline: {e}")

        traceback.print_exc()

def lora_finetuning_pipeline(dataset_name):
    """
    Runs the PEFT finetuning pipeline with DoRA disabled (plain LoRA).
    """
    outcome = peft_finetuning_pipeline(dataset_name, use_dora=False)
    return outcome
        
def dora_finetuning_pipeline(dataset_name):
    """
    Runs the PEFT finetuning pipeline with DoRA enabled.
    """
    outcome = peft_finetuning_pipeline(dataset_name, use_dora=True)
    return outcome

            

### Run the pipeline
Execute the pipelines!

In [8]:
# Run LoRA fine-tuning on the "<HF_USERNAME>/jsp-code-to-text" dataset.
# NOTE(review): the recorded output of this cell shows a
# `MODEL_DIR/base_model_dir/"experiment"` expression that is not present in the
# pipeline cell as written — the output looks stale; re-run to refresh it.
lora_finetuning_pipeline(f"{os.getenv('HF_USERNAME')}/jsp-code-to-text")

Start finetuning ibm-granite/granite-8b-code-instruct-4k...
Error running PEFT pipeline: unsupported operand type(s) for /: 'str' and 'str'


Traceback (most recent call last):
  File "/tmp/ipykernel_23611/1653455611.py", line 32, in peft_finetuning_pipeline
    MODEL_DIR/base_model_dir/"experiment",
    ~~~~~~~~~^~~~~~~~~~~~~~~
TypeError: unsupported operand type(s) for /: 'str' and 'str'


### Evaluate the candidate models
Evaluate the candidate models using the following metrics / benchmarks:

In [9]:
###################################
# Perform evaluation (TODO: not yet implemented)
###################################
