In [None]:
import os
import pandas as pd
from nltk.translate.meteor_score import meteor_score
from nltk.translate.chrf_score import sentence_chrf
from sentence_transformers import SentenceTransformer, util
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
from scipy.stats import mannwhitneyu
from cliffs_delta import cliffs_delta

In [None]:
# Resolve the data directory once and reuse it for every CSV read below.
data_dir = os.path.relpath(os.path.join(os.getcwd(), '..', 'data'))


def _parse_called_paragraphs(raw):
    """Convert one serialized 'called paragraphs' cell into a list of names.

    The cell is stripped of its surrounding brackets and double quotes, all
    single quotes are removed, and the remainder is split on ', '.
    A cell that is not a string (e.g. NaN from an empty CSV field) or that
    contains no names yields an empty list.
    """
    if not isinstance(raw, str):
        return []
    names = raw.strip(']').strip('[').strip('"').strip('"').replace("'", "").split(', ')
    # Splitting an empty remainder gives [''], which means "calls nothing".
    return [] if names == [''] else names


all_paragraphs = pd.read_csv(os.path.join(data_dir, 'function_refined_summary.csv'), index_col=0)
all_paragraphs['called paragraphs'] = all_paragraphs['called paragraphs'].apply(_parse_called_paragraphs)
all_files = pd.read_csv(os.path.join(data_dir, 'file_generated_summary.csv'), index_col=0)
function_reference = pd.read_csv(os.path.join(data_dir, 'function_level_reference_dataset.csv'), index_col=0)
file_reference = pd.read_csv(os.path.join(data_dir, 'file_level_reference_dataset.csv'), index_col=0)

# setup text similarity evaluation

In [None]:
def meteor(candidate_summary, reference_summary, rslt_str=''):
    """Compute one METEOR score per candidate/reference pair.

    Both inputs are iterables of strings; each string is tokenised by
    whitespace. The i-th candidate is scored against the i-th reference.
    The mean and median (rounded to 3 decimals) are printed, labelled with
    `rslt_str`, and the list of per-pair scores is returned.
    """
    hypotheses = [text.split() for text in candidate_summary]
    references = [text.split() for text in reference_summary]

    # meteor_score takes a list of references and a single hypothesis.
    meteor_scores = [
        meteor_score([references[pos]], hypotheses[pos])
        for pos in range(len(hypotheses))
    ]

    mean_score = np.mean(meteor_scores)
    median_score = np.median(meteor_scores)
    print(f"Average METEOR Score {rslt_str}:", round(mean_score, 3))
    print(f"Median METEOR Score {rslt_str}:", round(median_score, 3))
    return meteor_scores

def chrf(candidate_summary, reference_summary, rslt_str=''):
    """Compute one sentence-level chrF score per candidate/reference pair.

    References and candidates are paired positionally with `zip`, and each
    pair is passed to `sentence_chrf(reference, candidate)`. The mean and
    median of the sentence scores (rounded to 3 decimals) are printed,
    labelled with `rslt_str`, and the list of per-pair scores is returned.
    """
    chrf_scores = []
    for reference_text, candidate_text in zip(reference_summary, candidate_summary):
        chrf_scores.append(sentence_chrf(reference_text, candidate_text))

    # Aggregate over the sentence-level scores.
    mean_score = np.mean(chrf_scores)
    mid_score = np.median(chrf_scores)

    print(f"Average chrF Score {rslt_str}: {round(mean_score, 3)}")
    print(f"Median chrF Score {rslt_str}: {round(mid_score, 3)}")

    return chrf_scores

def sentencebert(candidate_summary, reference_summary, rslt_str=''):
    """Compute one embedding cosine similarity per candidate/reference pair.

    A 'paraphrase-MiniLM-L6-v2' SentenceTransformer is instantiated on every
    call. The inputs are indexed positionally, so pass lists. Each text is
    encoded separately and the pair is compared with `util.pytorch_cos_sim`.
    The mean and median (rounded to 4 decimals) are printed, labelled with
    `rslt_str`, and the list of per-pair similarities is returned.
    """
    sentenceBert_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

    cosine_scores = []
    for position in range(len(candidate_summary)):
        candidate_vec = sentenceBert_model.encode(candidate_summary[position], convert_to_tensor=True).cpu()
        reference_vec = sentenceBert_model.encode(reference_summary[position], convert_to_tensor=True).cpu()
        # pytorch_cos_sim returns a matrix; the single entry is the pair's score.
        similarity = util.pytorch_cos_sim(candidate_vec, reference_vec).numpy()
        cosine_scores.append(similarity[0][0])

    print(f"Average sentenceBERT Score {rslt_str}:", round(np.mean(cosine_scores),4))
    print(f"Median sentenceBERT Score {rslt_str}:", round(np.median(cosine_scores),4))
    return cosine_scores

# function level

## load granite 34b

In [None]:
# Load the Granite 34B code-instruct checkpoint and its tokenizer.
device = "cuda"
model_id = "ibm-granite/granite-34b-code-instruct"
device_map = "auto"  # passed to from_pretrained so the weights are placed automatically
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map=device_map
)
tokenizer = AutoTokenizer.from_pretrained(model_id)
model.eval()
# Device the tokenized prompts are moved to in the generation loops below.
# Derived from the loaded model instead of a fixed GPU index so the notebook
# also runs on hosts that do not have a second GPU.
runtimeFlag = str(model.device)
cache_dir = None
scaling_factor = 1.0  # multiplier applied to model.config.max_position_embeddings

## baseline

In [None]:
def build_baseline_prompt(code_snippet):
    """Return the baseline instruction prompt followed by `code_snippet`."""
    template = "### Instruction:\nSummarize the following COBOL code:\n{}"
    return template.format(code_snippet)

In [None]:
# Baseline: summarize every reference function with the plain instruction
# prompt and store the answer in function_reference['baseline'].
for idx in function_reference.index:
    row = function_reference.loc[idx]
    prompt = build_baseline_prompt(row['code'])
    chat = [{ "role": "user", "content":  prompt}]
    chat = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
    input_tokens = tokenizer(
        chat,
        return_tensors="pt",
        add_special_tokens=True
    ).input_ids.to(runtimeFlag)

    torch.cuda.empty_cache()

    output = model.generate(input_ids=input_tokens, max_new_tokens=150)
    # generate() returns prompt + continuation; keep only the continuation.
    new_tokens = output[0][input_tokens.shape[-1]:]
    response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    function_reference.at[idx, 'baseline'] = response
    print(idx, 'done')

## code processing agent

In [None]:
# System message sent with every function-level explanation request.
FUNCTION_CODE_SYSTEM_PROMPT = (
    "Now, you are assistant to help me explain the COBOL code and answer questions about the code. "
    "I will give you the COBOL code within <Code> and variable definitions within <Variable> tags. "
    "Your answers should be concise and coherent."
)

In [None]:
def build_paragraph_code_prompt(code_snippet, variables):
    """Build the user prompt asking for an explanation of one COBOL paragraph.

    `code_snippet` is wrapped in <Code>...</Code> and `variables` in
    <Variable>...</Variable>, followed by the instruction to explain the code
    in at most 75 words.
    """
    # Closing tags use a forward slash; a backslash here would be an invalid
    # string escape and would produce malformed tags in the prompt.
    return (
        f"<Code>\n{code_snippet}\n</Code>\n"
        f"<Variable>\n{variables}\n</Variable>\n"
        "Generate an explanation of the above COBOL code. The explanation should be no more than 75 words."
    )

In [None]:
# Code processing agent: explain every paragraph from its code and variables
# and store the answer in all_paragraphs['generated summary'].
for idx in all_paragraphs.index:
    row = all_paragraphs.loc[idx]
    prompt = build_paragraph_code_prompt(row['code'],row['variables'])
    chat = [{ "role": "system", "content":  FUNCTION_CODE_SYSTEM_PROMPT},
            { "role": "user", "content":  prompt}]
    chat = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
    input_tokens = tokenizer(
        chat,
        return_tensors="pt",
        add_special_tokens=True
    ).input_ids.to(runtimeFlag)

    torch.cuda.empty_cache()

    output = model.generate(input_ids=input_tokens, max_new_tokens=150)
    # generate() returns prompt + continuation; keep only the continuation.
    new_tokens = output[0][input_tokens.shape[-1]:]
    response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    all_paragraphs.at[idx, 'generated summary'] = response
    print(idx, 'done')

### get the explanations of the reference dataset and run text similarity evaluation

In [None]:
# Run the code processing agent prompt on the reference functions and store
# the answer in function_reference['refined summary2'] for the evaluation.
for idx in function_reference.index:
    row = function_reference.loc[idx]
    prompt = build_paragraph_code_prompt(row['code'],row['variables'])
    chat = [{ "role": "system", "content":  FUNCTION_CODE_SYSTEM_PROMPT},
            { "role": "user", "content":  prompt}]
    chat = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
    input_tokens = tokenizer(
        chat,
        return_tensors="pt",
        add_special_tokens=True
    ).input_ids.to(runtimeFlag)

    torch.cuda.empty_cache()

    output = model.generate(input_ids=input_tokens, max_new_tokens=150)
    # generate() returns prompt + continuation; keep only the continuation.
    new_tokens = output[0][input_tokens.shape[-1]:]
    response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    function_reference.at[idx, 'refined summary2'] = response
    print(idx, 'done')

In [None]:
# Compare baseline vs. generated summaries against the reference text with
# each similarity metric, then report the Mann-Whitney U p-value and the
# Cliff's delta magnitude for the two score lists.
# NOTE(review): mannwhitneyu treats the two lists as independent samples,
# although both are computed on the same function_reference rows; confirm
# that an unpaired test is intended.
metric_runs = [
    # (label used in the printed lines, metric function, input adapter)
    ('METEOR', meteor, lambda column: column),
    ('chrF', chrf, lambda column: column),
    # sentencebert indexes its inputs by position, so it gets plain lists.
    ('sentenceBert', sentencebert, lambda column: column.to_list()),
]
for metric_label, metric_fn, adapt in metric_runs:
    rslt1 = metric_fn(
        adapt(function_reference['baseline']),
        adapt(function_reference['reference data']),
        rslt_str='for baseline',
    )
    rslt2 = metric_fn(
        adapt(function_reference['refined summary2']),
        adapt(function_reference['reference data']),
        rslt_str='for generated',
    )
    print(f'{metric_label} pvalue {mannwhitneyu(rslt1, rslt2).pvalue}')
    print(f'{metric_label} effect size {cliffs_delta(rslt1, rslt2)[1]}')

## text processing agent

In [None]:
from openai import OpenAI
# The API key is read from the OPENAI_API_KEY environment variable so that no
# credential is stored in the notebook; export it before running this cell.
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY")
)
gpt_model_id = "gpt-4o-mini"

In [None]:
def check_paragraphs_ready(row, all_paragraphs):
    """Tell whether every paragraph called by `row` already has a summary.

    Looks up the rows of `all_paragraphs` that share `row`'s 'file path' and
    whose 'function name' is listed in `row['called paragraphs']`. Returns
    True when none of those rows has a missing 'refined summary2' (including
    the case where no such row exists), False otherwise.
    """
    same_file = all_paragraphs['file path'] == row['file path']
    is_callee = all_paragraphs['function name'].isin(row['called paragraphs'])
    callee_summaries = all_paragraphs.loc[same_file & is_callee, 'refined summary2']
    return not callee_summaries.isna().any()

In [None]:
# System message for the text processing agent that rewrites a paragraph
# summary using the summaries of the paragraphs it calls.
PARAGRAPH_SYSTEM_PROMPT = (
    "You are now an writing assistant to help me simplify a paragraph containing complex terms. "
    "I will provide the main paragraph within <Main> tags and the definitions of the terms within <Term> tags. "
    "Your task is to replace the complex terms with concise descriptions to improve readability, "
    "while making as few changes as possible to the main paragraph."
)

In [None]:
def build_function_writing_prompt(row, paragraph_df):
    """Build the user prompt for refining one paragraph summary.

    The paragraph's own 'generated summary' goes inside <Main> tags. Inside
    <Term> tags goes one '<name>: <refined summary2>' line per entry of
    `row['called paragraphs']`, looked up in `paragraph_df` among the rows
    with the same 'file path'. A called name that has no row in that file is
    skipped instead of raising IndexError.
    """
    code_snippet, called_paragraph = row['generated summary'], row['called paragraphs']
    called_paragraph_text = ''
    if len(called_paragraph) > 0:
        same_file = paragraph_df.loc[paragraph_df['file path'] == row['file path']]
        for p in called_paragraph:
            summaries = same_file.loc[same_file['function name'] == p, 'refined summary2'].values
            if len(summaries) == 0:
                # No definition available for this term; leave it out.
                continue
            # When several rows share the name, the first one is used.
            called_paragraph_text += f'{p}: {summaries[0]}\n'
    return f"\n<Main>\n{code_snippet}\n</Main>\n\n<Term>\n{called_paragraph_text}\n</Term>\nGenerate the improved main function:"

In [None]:
#all_paragraphs['refined summary2'] = None

In [None]:
# Paragraphs that call no other paragraph need no refinement: the summary
# written by the code processing agent is already their final explanation.
calls_nothing = all_paragraphs['called paragraphs'].apply(len) == 0
temp_df = all_paragraphs.loc[calls_nothing]
all_paragraphs.loc[temp_df.index, 'refined summary2'] = all_paragraphs.loc[temp_df.index, 'generated summary']

In [None]:
# Text processing agent: refine summaries in dependency order. A paragraph is
# refined once all the paragraphs it calls have a 'refined summary2'. When
# paragraphs are still unexplained but none is ready (circular dependency),
# one of them falls back to its 'generated summary' so the others can proceed.
while True:
    unexplained = all_paragraphs['refined summary2'].isna()
    if not unexplained.any():  # all explained
        break

    callees_ready = all_paragraphs.apply(lambda x: check_paragraphs_ready(x, all_paragraphs), axis=1)
    idx_paragraph_ready = all_paragraphs.loc[unexplained & callees_ready].index
    print(len(idx_paragraph_ready), 'paragraphs ready to explain')

    if len(idx_paragraph_ready) == 0:  # circular dependency
        fallback_candidates = all_paragraphs.loc[unexplained & all_paragraphs['generated summary'].notna()].index
        if len(fallback_candidates) == 0:
            # Nothing left to fall back on; stop instead of looping forever.
            print(int(unexplained.sum()), 'paragraphs left unexplained: no generated summary to fall back on')
            break
        fallback_idx = fallback_candidates[0]
        all_paragraphs.at[fallback_idx, 'refined summary2'] = all_paragraphs.at[fallback_idx, 'generated summary']
        continue

    for idx in idx_paragraph_ready:
        row = all_paragraphs.loc[idx]
        messages = [
            {"role": "system", "content": PARAGRAPH_SYSTEM_PROMPT},
            {"role": "user", "content": build_function_writing_prompt(row, all_paragraphs)}
        ]

        chat_completion = client.chat.completions.create(
            model=gpt_model_id,
            messages=messages
            )
        # Drop the <Main> wrapper in case the model echoes it back.
        response = chat_completion.choices[0].message.content.replace('<Main>','').replace('</Main>','').strip()
        all_paragraphs.at[idx, 'refined summary2'] = response
        print(idx)

# File level

## baseline

In [None]:
def build_baseline_prompt_file(code):
    """Build the baseline prompt for summarizing a whole COBOL file.

    The prompt contains the file from the first line that mentions
    'procedure division' (case-insensitive) to the end. When no line mentions
    it, the whole file is included.
    """
    code_lst = code.split('\n')
    start = next(
        (i for i, line in enumerate(code_lst) if 'procedure division' in line.lower()),
        0,  # no procedure division found: keep the entire file
    )
    return 'Summarize the following COBOL file:\n'+'\n'.join(code_lst[start:])

In [None]:
# Baseline: summarize every reference file with the plain instruction prompt
# and store the answer in file_reference['baseline'].
for idx in file_reference.index:
    try:
        row = file_reference.loc[idx]
        prompt = build_baseline_prompt_file(row['code'])
        chat = [{ "role": "user", "content":  prompt}]
        chat = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
        input_tokens = tokenizer(
            chat,
            return_tensors="pt",
            add_special_tokens=True
        ).input_ids.to(runtimeFlag)

        torch.cuda.empty_cache()

        output = model.generate(input_ids=input_tokens, max_new_tokens=150)
        # generate() returns prompt + continuation; keep only the continuation.
        new_tokens = output[0][input_tokens.shape[-1]:]
        response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
        file_reference.at[idx, 'baseline'] = response
        print(idx, 'done')
    except RuntimeError as e1:
        # The row is marked with the 'Error Long' sentinel; the actual error
        # is printed because a RuntimeError is not necessarily a length issue.
        file_reference.at[idx, 'baseline'] = 'Error Long'
        print(idx, 'Error long', e1)
## Short file

In [None]:
# System message for summarizing a file directly from its procedure and data
# divisions.
SHORT_FILE_SYSTEM_PROMPT = (
    'Now, you are assistant to help me explain a COBOL file. '
    'I will give you the procedure division of the COBOL file embraced by <Code> tags. '
    'The data division of the file is given with <variable> tags. '
    'Generate a summary of the file base on the provided procedure and data divisoin. '
    'The summary should be a single cohesive paragraph with no more than 75 words '
    'that explains the basic business purpose and logic of the COBOL file. '
    'Ensure the summary is concise and cohesive.'
)

In [None]:
def build_file_granite_prompt(row):
    """Build the user prompt for summarizing one file from its divisions.

    Reads 'program id', 'filename', 'data division' and 'procedure division'
    from `row`. The procedure division is wrapped in <Code> tags and the data
    division in <Variable> tags, followed by the 75-word instruction.
    """
    prog_id = row['program id']
    name = row['filename']
    data_division = row['data division']
    procedure_division = row['procedure division']

    intro = (
        f"This COBOL file's name is {name} and the program ID is {prog_id}. "
        "Following is the procedure and data division of the file. "
    )
    code_section = f"<Code>\n{procedure_division}\n</Code>\n"
    variable_section = f"<Variable>\n{data_division}\n</Variable>"
    request = (
        "Generate a very short and cohesive explanation of the COBOL file. "
        "The explanation should be no more than 75 words."
    )
    return "\n".join([intro, code_section, variable_section, request])

In [None]:
# Generation budget: 10% of the (scaled) context window, capped at 1024 new
# tokens. It does not depend on the file, so it is computed once.
max_context = int(model.config.max_position_embeddings * scaling_factor)
max_gen_len = min(1024, int(0.10 * max_context))

# Summarize every file directly from its procedure and data divisions and
# store the answer in all_files['refined summary2'].
for idx, row in all_files.iterrows():
    try:
        prompt = build_file_granite_prompt(row)
        chat = [
            {"role": "system", "content": SHORT_FILE_SYSTEM_PROMPT},
            {"role": "user", "content": prompt}
        ]
        chat = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)

        # Tokenize input
        input_tokens = tokenizer(
            chat,
            return_tensors="pt",
            add_special_tokens=True
        ).input_ids.to(runtimeFlag)

        # Free CUDA memory before running inference
        torch.cuda.empty_cache()

        # Generate response; keep only the tokens produced after the prompt.
        output = model.generate(input_ids=input_tokens, max_new_tokens=max_gen_len)
        new_tokens = output[0][input_tokens.shape[-1]:]
        response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

        all_files.at[idx, 'refined summary2'] = response
        print(f"{idx} done")
    except RuntimeError as e1:
        # The row is marked with the 'Error Long' sentinel; the actual error
        # is printed because a RuntimeError is not necessarily a length issue.
        all_files.at[idx, 'refined summary2'] = 'Error Long'
        print(idx, 'Error long', e1)

## Long file

In [None]:
# System message for the file-level merge step: the model receives one
# '<function_name>: <explanation>' chunk per function, separated by '---',
# plus the call relationships, and must produce a summary of at most 75 words.
# The text is sent verbatim, so whitespace inside the literal is significant.
FILE_WRITING_SYSTEM_PROMPT = """You are an assistant helping to explain COBOL code at the file level. Our goal is to create a comprehensive explanation for the COBOL file by recursively merging explanations of its function chunks. When generating the comprehensive explanation, follow the relationships between functions.

Each chunk should be formatted as follows:
1. '<function_name>: <explanation>'
2. Separate the explanation of each function chunk with ---.

Use the <Relationship> tag to indicate relationships between functions.

The comprehensive explanation should not exceed 75 words. 
Clearly define the purpose and sequence of each function, ensuring the explanation follows the flow and dependencies between functions.
"""

In [None]:
def get_paragraph_name_summary(file_path, all_paragraphs):
    """Return (function name, refined summary2) pairs for one file.

    Only rows of `all_paragraphs` whose 'file path' equals `file_path` are
    considered. A row is kept when it is a section, or when it is a paragraph
    whose 'section name' is missing. Pairs are returned in row order.
    """
    in_file = all_paragraphs.loc[all_paragraphs['file path'] == file_path]
    is_section = in_file['is section'] == True
    is_unsectioned_paragraph = (in_file['is paragraph'] == True) & in_file['section name'].isna()
    selected = in_file.loc[is_section | is_unsectioned_paragraph]

    names = selected['function name'].to_list()
    summaries = selected['refined summary2'].to_list()
    return list(zip(names, summaries))

def build_file_writing_prompt(row, all_paragraphs):
    """Build the user prompt that merges paragraph summaries into a file summary.

    The prompt lists one '<name>: <summary>' chunk per pair returned by
    `get_paragraph_name_summary` for `row['file path']`, each followed by a
    '---' separator line, then `row['call relations']` inside <Relationship>
    tags, then the merge instruction.
    """
    name_summary_pairs = get_paragraph_name_summary(row['file path'], all_paragraphs)
    summary_text = ''.join(
        f"{name}: {summary}\n---\n" for name, summary in name_summary_pairs
    )
    relation_text = row['call relations']

    opening = "Below are explanations of each paragraph in a COBOL file:\n"
    relationship_block = f"<Relationship>\n{relation_text}\n</Relationship>\n"
    instruction = (
        "We are creating one comprehensive explanation for the COBOL file by recursively merging explanations of its paragraph chunks. "
        "When generating the comprehensive explanation, make sure to follow the paragraph relationship. "
        "You must briefly introduce the business purpose and functionality of the file. "
        "Generate the explanation in one short paragraph and do not use bullet points. "
        "Summary (75 words):"
    )
    return opening + summary_text + relationship_block + instruction

In [None]:
# Long-file path: merge the refined paragraph summaries of each file into one
# file-level summary with the GPT model.
# NOTE(review): this writes all_files['refined summary2'] for every row, which
# replaces the values stored by the short-file loop; confirm that is intended.
for idx in all_files.index:
    row = all_files.loc[idx]
    # Build the prompt once; it is both sent to the model and printed below.
    file_prompt = build_file_writing_prompt(row, all_paragraphs)
    messages = [
        {"role": "system", "content": FILE_WRITING_SYSTEM_PROMPT},
        {"role": "user", "content": file_prompt}
    ]

    chat_completion = client.chat.completions.create(
        model=gpt_model_id,
        messages=messages
        )
    response = chat_completion.choices[0].message.content.replace('<Main>','').replace('</Main>','').strip()
    all_files.at[idx, 'refined summary2'] = response
    print(file_prompt)
    print(response)
    print(idx)