In [None]:
# %% [code] Mount Google Drive
from google.colab import drive

# Mount Google Drive; the other cells in this notebook use paths under this mount point.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
# %% [code] Environment Setup
!pip install transformers accelerate datasets peft --quiet

In [None]:
import os

# Make the project folder on Drive the current working directory.
PROJECT_DIR = '/content/drive/MyDrive/PYOMO'  # Change path as needed
os.chdir(PROJECT_DIR)

In [None]:
import os

# Set PYTORCH_CUDA_ALLOC_CONF here, ahead of the cell that imports torch.
os.environ.update(PYTORCH_CUDA_ALLOC_CONF="expandable_segments:True")

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

model_name = "deepseek-ai/deepseek-coder-1.3b-instruct"  # change this if necessary

# Tokenizer and weights are both loaded from the same checkpoint name.
tokenizer = AutoTokenizer.from_pretrained(model_name)

# device_map="auto" leaves device placement to the library; the model is
# switched to eval mode right after loading.
model_teacher = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
).eval()

print("Teacher model loaded.")

In [None]:
!pip install -U datasets
!pip install fsspec==2023.9.2
!pip install jiwer

In [None]:
# Extract the samples that have both LaTeX and Pyomo text from the JSONL file.
import json

# === Input and output paths ===
input_file = "/content/drive/MyDrive/Combined/data.jsonl"      # Your input JSON-lines file
# Written into the PYOMO folder because the train/test split cell reads
# "/content/drive/MyDrive/PYOMO/filtered.json".
output_file = "/content/drive/MyDrive/PYOMO/filtered.json"     # Output JSON-lines file

# === Load original JSON lines (blank lines are skipped) ===
with open(input_file, "r", encoding="utf-8") as f:
    data = [json.loads(line) for line in f if line.strip()]

# === Filter and transform ===
converted_data = []
for entry in data:
    # `or ""` also covers keys that are present but null, which would
    # otherwise raise AttributeError on `.strip()`.
    latex = (entry.get("latex") or "").strip()
    pyomo = (entry.get("pyomo") or "").strip()
    if latex and pyomo:  # Keep only entries where both fields are non-blank
        converted_data.append({"input": latex, "output": pyomo})

# === Save to new JSON lines file (one object per line) ===
with open(output_file, "w", encoding="utf-8") as f:
    for item in converted_data:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")

print(f"Filtered {len(converted_data)} valid entries saved to '{output_file}'")


In [None]:
# Split the filtered dataset into training and test JSON-lines files.

import json
import random

# === Input and output paths ===
input_file = "/content/drive/MyDrive/PYOMO/filtered.json"        # File created from previous step
train_file = "/content/drive/MyDrive/PYOMO/filtered_train.jsonl"
test_file = "/content/drive/MyDrive/PYOMO/filtered_test.jsonl"

# === Parameters ===
test_ratio = 0.2                    # 20% for test, 80% for train
random_seed = 42                   # For reproducibility


def _write_jsonl(path, records):
    """Write every record in `records` to `path`, one JSON object per line."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in records
        )


# === Load data (blank lines are skipped) ===
data = []
with open(input_file, "r", encoding="utf-8") as f:
    for raw_line in f:
        if raw_line.strip():
            data.append(json.loads(raw_line))

# === Shuffle and split ===
random.seed(random_seed)
random.shuffle(data)

# The first (1 - test_ratio) share of the shuffled list is the training split.
split_index = int(len(data) * (1 - test_ratio))
train_data, test_data = data[:split_index], data[split_index:]

# === Write outputs ===
_write_jsonl(train_file, train_data)
_write_jsonl(test_file, test_data)

print(f"Total: {len(data)} → Train: {len(train_data)}, Test: {len(test_data)}")

In [None]:
# Append the "<|endoftext|>" token to the end of each Pyomo script in the train split.

import json

# Paths
# The split cell writes the training split as "filtered_train.jsonl", so that
# is the file read here.
input_path = "/content/drive/MyDrive/PYOMO/filtered_train.jsonl"
output_path = "/content/drive/MyDrive/PYOMO/filtered_with_endtoken.jsonl"

END_TOKEN = "<|endoftext|>"

# Process and write
with open(input_path, "r", encoding="utf-8") as infile, open(output_path, "w", encoding="utf-8") as outfile:
    for line in infile:
        if not line.strip():
            continue  # tolerate blank lines instead of failing in json.loads
        # Named `record` (not `data`) so the dataset list held in `data` by
        # the earlier cells is not overwritten with a single dict.
        record = json.loads(line)
        # Append the end token unless the output already ends with it
        if not record["output"].strip().endswith(END_TOKEN):
            record["output"] += END_TOKEN
        # Write back to the new file, one JSON object per line
        outfile.write(json.dumps(record, ensure_ascii=False) + "\n")

print(f"✅ Finished! New dataset saved to {output_path}")

In [None]:
# Finetuning on our json file having latex as "input" and pyomo as "output"

import os

from transformers import Trainer, TrainingArguments, DataCollatorForLanguageModeling
from transformers.trainer_utils import get_last_checkpoint
from datasets import load_dataset

# ✅ Load dataset
dataset_path = "/content/drive/MyDrive/PYOMO/filtered_with_endtoken.jsonl"
# Single location for checkpoints and for the final model/tokenizer.
finetuned_dir = "/content/drive/MyDrive/PYOMO/finetuned_student"
raw_dataset = load_dataset("json", data_files=dataset_path)

# ✅ Add special tokens
special_tokens = {'eos_token': '<|endoftext|>'}
tokenizer.add_special_tokens(special_tokens)
model_teacher.resize_token_embeddings(len(tokenizer))

# ✅ Preprocessing function (with consistent instruct prompt)
def preprocess_function(examples):
    """Build one prompt+solution text per example and tokenize the batch.

    `examples` is a batch with parallel "input" (LaTeX) and "output" (Pyomo)
    lists. Every text is truncated/padded to exactly 1024 tokens.
    """
    texts = [f"Convert the following LaTeX problem into clean Pyomo code. Output only variable declarations, objective function, and constraint.\nLaTeX Problem:\n{inp}\n### Solution:\n{outp}" for inp, outp in zip(examples["input"], examples["output"])]
    return tokenizer(texts, truncation=True, padding="max_length", max_length=1024)

tokenized_dataset = raw_dataset.map(preprocess_function, batched=True)

# ✅ Data collator for causal LM
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=False
)

# ✅ Training arguments optimized for Colab
# NOTE: `resume_from_checkpoint` is intentionally not set here. As a
# TrainingArguments field it is not acted on by Trainer; resuming is requested
# through `trainer.train(resume_from_checkpoint=...)` below.
training_args = TrainingArguments(
    output_dir=finetuned_dir,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=4,
    num_train_epochs=20,
    weight_decay=0.01,
    learning_rate=5e-5,
    logging_steps=20,
    save_steps=200,  # less frequent saves
    save_total_limit=1,  # only latest checkpoint to save space
    fp16=True,
    report_to=[],
)

# ✅ Trainer
trainer = Trainer(
    model=model_teacher,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    data_collator=data_collator,
)

# ✅ Start training, resuming from the newest checkpoint when one exists.
# get_last_checkpoint() lists the folder, so only call it when the folder is
# there; it returns None when no checkpoint sub-folder is found, in which case
# training starts from the beginning.
last_checkpoint = get_last_checkpoint(finetuned_dir) if os.path.isdir(finetuned_dir) else None
if last_checkpoint is not None:
    print(f"Resuming from checkpoint: {last_checkpoint}")
trainer.train(resume_from_checkpoint=last_checkpoint)

# ✅ Save final model and tokenizer
trainer.save_model(finetuned_dir)
tokenizer.save_pretrained(finetuned_dir)

print("✅ Finetuning complete. Finetuned student model saved in Drive!")

In [None]:
# Inference

import re
import json
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.metrics import edit_distance
import jiwer

# === CONFIG ===
MODEL_PATH = "/content/drive/MyDrive/PYOMO/finetuned_student"              # path to model weights
TEST_JSON_PATH = "/content/drive/MyDrive/PYOMO/filtered_test.jsonl"      # input=latex, output=pyomo
OUTPUT_TXT_PATH = "/content/drive/MyDrive/PYOMO/TEMP_evaluation_report.txt"
MAX_NEW_TOKENS = 512

# === Setup model + tokenizer ===
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForCausalLM.from_pretrained(MODEL_PATH)
model = model.to(device)
model.eval()

# Register '<|endoftext|>' as the EOS token, then size the embedding table
# to the tokenizer's length.
eos_spec = {'eos_token': '<|endoftext|>'}
tokenizer.add_special_tokens(eos_spec)
model.resize_token_embeddings(len(tokenizer))

# === Tokenization Helpers ===
def tokenize_latex(expr):
    """Split `expr` into a flat list of string tokens.

    A token is one of: a backslash command (``\\name``), a single character
    from ``{}_^=+-*/(),``, a run of ASCII letters, or a run of digits.
    Anything else (whitespace, dots, other punctuation) is dropped.
    """
    token_pattern = r'(\\[a-zA-Z]+|[{}_^=+\-*/(),]|[a-zA-Z]+|\d+)'
    return [match.group(1) for match in re.finditer(token_pattern, expr)]

class TokenizeTransform(jiwer.transforms.AbstractTransform):
    """jiwer transform that turns text into token lists via `tokenize_latex`."""

    def process_string(self, s: str):
        """Return the token list for a single string."""
        token_list = tokenize_latex(s)
        return token_list

    def process_list(self, tokens: list[str]):
        """Tokenize each string in `tokens`; returns one token list per string."""
        return list(map(self.process_string, tokens))

# === Metric Computation ===
def compute_cer(pairs):
    """Compute jiwer's error rate over (ground truth, prediction) pairs.

    Each side is converted to ``str``, stripped, and has newlines replaced by
    spaces. Pairs where both sides are empty after that are skipped. Both
    sides are passed through `TokenizeTransform` before scoring.

    Args:
        pairs: iterable of ``(ground_truth, prediction)`` tuples.

    Returns:
        The value returned by ``jiwer.cer``, or ``1.0`` when no pair is left
        to score.
    """
    references, hypotheses = [], []
    for gt, pred in pairs:
        gt = str(gt).strip().replace("\n", " ")
        pred = str(pred).strip().replace("\n", " ")
        if gt == "" and pred == "":
            continue
        references.append(gt)
        hypotheses.append(pred)
    if not references:
        return 1.0
    # `reference=` is the keyword that pairs with `reference_transform=`;
    # the older `truth=` spelling is deprecated in jiwer.
    return jiwer.cer(
        reference=references,
        hypothesis=hypotheses,
        reference_transform=TokenizeTransform(),
        hypothesis_transform=TokenizeTransform()
    )

def compute_metrics(pairs):
    """Aggregate BLEU and CER over (ground truth, prediction) pairs.

    BLEU is the mean of per-pair ``sentence_bleu`` scores computed on
    `tokenize_latex` tokens with smoothing method 4; CER comes from
    `compute_cer`.

    Args:
        pairs: iterable of ``(ground_truth, prediction)`` tuples.

    Returns:
        ``{"bleu": float, "cer": float}``. For empty input this is
        ``{"bleu": 0.0, "cer": 1.0}`` (the CER value matches `compute_cer`'s
        own fallback) instead of a ZeroDivisionError.
    """
    # Materialize once: `pairs` is walked here and again inside compute_cer,
    # which would silently see nothing if a generator were passed.
    pairs = list(pairs)
    if not pairs:
        return {"bleu": 0.0, "cer": 1.0}

    smoothie = SmoothingFunction().method4
    bleus = []
    for gt, pred in pairs:
        # str() mirrors compute_cer so non-string values do not crash here.
        gt = str(gt).strip().replace("\n", " ")
        pred = str(pred).strip().replace("\n", " ")
        bleu = sentence_bleu([tokenize_latex(gt)], tokenize_latex(pred), smoothing_function=smoothie)
        bleus.append(bleu)
    cer = compute_cer(pairs)
    return {
        "bleu": sum(bleus) / len(bleus),
        "cer": cer
    }

def generate_pyomo_code(latex_input, max_new_tokens=None):
    """Generate Pyomo code for one LaTeX problem with the loaded model.

    The prompt text is the same template the finetuning cell uses, ending at
    the "### Solution:" marker so the model continues with the solution.

    Args:
        latex_input: LaTeX problem statement inserted into the prompt.
        max_new_tokens: cap on generated tokens; defaults to the module-level
            ``MAX_NEW_TOKENS`` when ``None``.

    Returns:
        The generated continuation as a single line (newlines replaced by
        spaces), with special tokens removed and surrounding whitespace
        stripped.
    """
    if max_new_tokens is None:
        max_new_tokens = MAX_NEW_TOKENS

    prompt = f"Convert the following LaTeX problem into clean Pyomo code. Output only variable declarations, objective function, and constraint.\nLaTeX Problem:\n{latex_input}\n### Solution:\n"

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    # Let model continue from the END of the prompt
    input_ids = inputs["input_ids"]
    attention_mask = inputs["attention_mask"]

    with torch.no_grad():
        outputs = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            num_beams=1,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id
        )

    # The returned sequence begins with the prompt tokens. Drop them by count
    # instead of matching the decoded text against the prompt string, which
    # breaks whenever decoding does not reproduce the prompt exactly or the
    # generated code itself contains "### Solution:".
    prompt_length = input_ids.shape[1]
    new_tokens = outputs[0][prompt_length:]
    generated = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    return generated.replace('\n', ' ')

# === Evaluation Loop ===
def run_inference_evaluation():
    """Run the model over the test set and write a text report.

    Reads JSON-lines records from ``TEST_JSON_PATH``, generates a prediction
    for each record's "input" with `generate_pyomo_code`, and writes the
    per-sample BLEU plus a final summary (from `compute_metrics`) to
    ``OUTPUT_TXT_PATH``. A generation error on one sample is printed and
    scored as an empty prediction so the run continues.

    Returns:
        None. Prints a message and returns early when no usable example is
        found.
    """
    # Decide on the parsed record's keys, not on substrings of the raw line:
    # a line can contain the words "input"/"output" inside its values without
    # having those keys, which would raise KeyError further down.
    examples = []
    with open(TEST_JSON_PATH, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip lines that are not valid JSON
            if isinstance(record, dict) and "input" in record and "output" in record:
                examples.append(record)

    if not examples:
        print("❌ No valid examples found in the test file.")
        return

    results = []
    # One smoothing function for the whole run rather than one per sample.
    smoothie = SmoothingFunction().method4

    with open(OUTPUT_TXT_PATH, "w", encoding="utf-8") as fout:
        for i, ex in enumerate(tqdm(examples, desc="Evaluating")):
            latex = ex["input"].strip()
            gt_pyomo = ex["output"].strip()

            try:
                pred_pyomo = generate_pyomo_code(latex)
            except Exception as e:
                # Best effort: report the failure and score an empty prediction.
                print(f"[Error on Sample {i+1}] {e}")
                pred_pyomo = ""

            bleu = sentence_bleu([tokenize_latex(gt_pyomo)], tokenize_latex(pred_pyomo), smoothing_function=smoothie)

            results.append((gt_pyomo, pred_pyomo))

            fout.write(f"Sample {i+1}\n")
            fout.write(f"Input (LaTeX)   : {latex}\n")
            fout.write(f"Ground Truth    : {gt_pyomo}\n")
            fout.write(f"Prediction      : {pred_pyomo}\n")
            fout.write(f"BLEU            : {bleu:.4f}\n")
            fout.write("-" * 60 + "\n")

        # Final summary
        metrics = compute_metrics(results)
        fout.write("\n=== Summary ===\n")
        fout.write(f"Total Samples    : {len(results)}\n")
        fout.write(f"Average BLEU     : {metrics['bleu']:.4f}\n")
        fout.write(f"Average CER      : {metrics['cer']:.4f}\n")

    print("\n✅ Inference and Evaluation complete.")
    print(f"Report saved to: {OUTPUT_TXT_PATH}")

# === Run It ===
# Executes the full evaluation when this cell runs; the report is written to
# OUTPUT_TXT_PATH by run_inference_evaluation().
run_inference_evaluation()
