In [None]:
# Mount Google Drive at /content/drive so later cells can read and write files under it.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
!pip install transformers datasets accelerate
from transformers import T5Tokenizer, T5ForConditionalGeneration, Seq2SeqTrainer, Seq2SeqTrainingArguments, DataCollatorForSeq2Seq
from datasets import load_dataset, Dataset
import json
import torch

In [None]:
# Parse the JSON dataset stored directly under /content.
with open('/content/S3_flan_old.json') as dataset_file:
    raw_data = json.load(dataset_file)

In [None]:
!pip install transformers datasets torch

In [None]:
import json
from datasets import Dataset
from transformers import T5Tokenizer, T5ForConditionalGeneration
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments
from transformers import DataCollatorForSeq2Seq

In [None]:
# Checkpoint name handed to from_pretrained to load the tokenizer.
model_name = "google/flan-t5-base"
tokenizer = T5Tokenizer.from_pretrained(model_name)

In [None]:
# Print how the tokenizer splits each curly brace on its own.
for brace in ("{", "}"):
    print(tokenizer.tokenize(brace))

In [None]:
from transformers import (
    T5Tokenizer,
    T5ForConditionalGeneration,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    DataCollatorForSeq2Seq
)
from datasets import Dataset
import json

# ✅ Load your dataset
DATASET_PATH = "/content/drive/MyDrive/Changai/S3/Datasets/S3_flan_old.json"
with open(DATASET_PATH) as dataset_file:
    raw_data = json.load(dataset_file)

# Instruction sentence that opens every prompt built from the dataset.
INSTRUCTION = (
    "Generate the correct Frappe query for the given question, "
    "using the provided doctype and fields."
)

# ✅ Prepare prompt-based data
processed_data = []
skipped_entries = 0  # entries dropped for any reason; reported after the loop
for entry in raw_data:
    try:
        input_data = entry["input"]
        if not isinstance(input_data, dict):
            # The prompt needs keyed access to doctype/question/fields.
            skipped_entries += 1
            continue

        prompt = (
            f"Instruction: {INSTRUCTION}\n"
            f"Doctype: {input_data['doctype']}\n"
            f"Question: {input_data['question']}\n"
            f"Fields: {input_data['fields']}"
        )

        processed_data.append({
            "input": prompt,
            "output": entry["output"]
        })
    except (KeyError, TypeError) as e:
        # KeyError: a required key is missing.
        # TypeError: the entry itself cannot be indexed by key.
        print("Skipping due to error:", e)
        skipped_entries += 1

# Make dropped data visible instead of letting the dataset shrink without a trace.
print(f"Prepared {len(processed_data)} examples, skipped {skipped_entries}.")

# ✅ Convert to Hugging Face Dataset
dataset = Dataset.from_list(processed_data)
# Fixed seed: the same examples land in the held-out 10% on every run,
# so evaluation numbers stay comparable across restarts.
dataset = dataset.train_test_split(test_size=0.1, seed=42)


# Base checkpoint used for both the tokenizer and the model below.
model_name = "google/flan-t5-base"
tokenizer = T5Tokenizer.from_pretrained(model_name)

# ✅ Register curly braces as additional special tokens.
# NOTE(review): this runs unconditionally; the intent appears to be "only when
# the braces map to <unk>" — TODO confirm with the tokenize() probe.
# NOTE(review): because the braces are special tokens, decoding with
# skip_special_tokens=True would presumably drop them; the inference cells
# decode with skip_special_tokens=False.
special_tokens_dict = {'additional_special_tokens': ['{', '}']}
# num_added is not read again in the visible code.
num_added = tokenizer.add_special_tokens(special_tokens_dict)

# ✅ Load the model, then resize its embeddings to the tokenizer's new length
model = T5ForConditionalGeneration.from_pretrained(model_name)
model.resize_token_embeddings(len(tokenizer))

# ✅ Tokenize one example and mask padding in the labels
def tokenize_and_enhance_loss(example):
    """Tokenize one prompt/target pair into fixed-length model features.

    Args:
        example: Mapping with "input" and "output" entries (the prompt and
            the target text).

    Returns:
        The tokenizer output for the prompt (padded/truncated to 512 tokens)
        with an added "labels" list: the target token ids padded/truncated
        to 128 tokens, where every padding position is replaced by -100.

    Note:
        Despite the name, no token receives extra loss weight. Curly-brace
        tokens are simply left unmasked, like every other non-padding token.
    """
    model_inputs = tokenizer(example["input"], max_length=512, truncation=True, padding="max_length")
    # `text_target` replaces the deprecated `as_target_tokenizer()` context manager.
    targets = tokenizer(text_target=example["output"], max_length=128, truncation=True, padding="max_length")

    # Padding positions become -100 so they are masked out of the loss.
    pad_id = tokenizer.pad_token_id
    model_inputs["labels"] = [
        -100 if token_id == pad_id else token_id
        for token_id in targets["input_ids"]
    ]
    return model_inputs

# ✅ Tokenize dataset
tokenized_datasets = dataset.map(tokenize_and_enhance_loss, batched=False)

# One place for the folder that receives checkpoints and the final export.
OUTPUT_DIR = "/content/drive/MyDrive/Changai/S3_test"

# ✅ Define training arguments
training_config = dict(
    output_dir=OUTPUT_DIR,
    eval_strategy="epoch",
    learning_rate=3e-5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    weight_decay=0.01,
    save_total_limit=2,
    num_train_epochs=10,
    predict_with_generate=True,
    logging_dir="./logs",
    logging_steps=100,
)
training_args = Seq2SeqTrainingArguments(**training_config)

# ✅ Data collator
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

# ✅ Trainer setup
# NOTE(review): newer transformers releases appear to rename the `tokenizer`
# argument to `processing_class` — TODO confirm against the installed version.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
)

# ✅ Train and Save
trainer.train()
trainer.evaluate()
# Write the model first, then the tokenizer, into the same folder.
for artifact in (model, tokenizer):
    artifact.save_pretrained(OUTPUT_DIR)

In [None]:
from transformers import T5Tokenizer, T5ForConditionalGeneration

# Load the tokenizer and model from the training output folder.
model_path = "/content/drive/MyDrive/Changai/S3_test"
tokenizer = T5Tokenizer.from_pretrained(model_path)
model = T5ForConditionalGeneration.from_pretrained(model_path)

# 🔍 Sample test input
test_prompt = """Instruction: Generate the correct Frappe query for the given question, using the provided doctype and fields.
Doctype: Company
Question: Show me company details where the name is 'France'.
Fields: name, country"""

inputs = tokenizer(test_prompt, return_tensors="pt")

# Beam search without sampling.
generation_options = dict(
    max_length=128,
    do_sample=False,
    num_beams=4,
    early_stopping=True,
)
output_ids = model.generate(input_ids=inputs["input_ids"], **generation_options)

# ✅ Decode and print output
# Special tokens are kept while decoding; the <pad> and </s> markers are then removed by hand.
output_text = tokenizer.decode(output_ids[0], skip_special_tokens=False)
for marker in ("<pad>", "</s>"):
    output_text = output_text.replace(marker, "")
print("📄 Generated Query:\n", output_text)


In [None]:
# Reload the model and tokenizer saved under the S3_test folder on Drive.
model_path="/content/drive/MyDrive/Changai/S3_test"
model = T5ForConditionalGeneration.from_pretrained(model_path)
tokenizer = T5Tokenizer.from_pretrained(model_path)

In [None]:
# Log in to the Hugging Face Hub from inside the notebook.
# HfApi and HfFolder are imported here but not used in the visible cells.
from huggingface_hub import HfApi, HfFolder, notebook_login
notebook_login()

In [None]:
from huggingface_hub import create_repo

# exist_ok=True lets this cell be re-run once the repository already exists
# instead of failing on the second execution.
create_repo("text2frappe-s3-flan-query", private=True, exist_ok=True)

In [None]:
from huggingface_hub import upload_folder

# Upload the contents of model_path to the root (".") of the model repository.
upload_folder(
    repo_id="hyrinmansoor/text2frappe-s3-flan-query",
    folder_path=model_path,
    path_in_repo=".",
    repo_type="model"
)


In [None]:
from transformers import T5ForConditionalGeneration, T5Tokenizer

# Load the model and tokenizer by the same repository id used for the upload.
model = T5ForConditionalGeneration.from_pretrained('hyrinmansoor/text2frappe-s3-flan-query')
tokenizer = T5Tokenizer.from_pretrained('hyrinmansoor/text2frappe-s3-flan-query')
# Prompts follow the layout used when the training prompts are built:
# Instruction, Doctype, Question, Fields — in that order. A different line
# order would present the model with a format it was not fine-tuned on.
test_cases = [
    """Instruction: Generate the correct Frappe query for the given question, using the provided doctype and fields.
Doctype: Company
Question: What's the full name of our primary registered company named 'HTS Pvt Ltd'?
Fields: company_name""",

    """Instruction: Generate the correct Frappe query for the given question, using the provided doctype and fields.
Doctype: Company
Question: Who is listed as the parent company of 'FusionCorp International'?
Fields: parent_company""",

    """Instruction: Generate the correct Frappe query for the given question, using the provided doctype and fields.
Doctype: Finance Book
Question: Can I rename or update the name of an existing finance book?
Fields: finance_book_name"""
]

# Define generation function
def generate_query(test_input):
    """Generate a query string for one prompt using beam search.

    Args:
        test_input: Prompt text handed to the tokenizer.

    Returns:
        The decoded generation with the "<pad>" and "</s>" markers removed
        and surrounding whitespace stripped.
    """
    inputs = tokenizer(test_input, return_tensors="pt", max_length=512, truncation=True, padding=True)
    outputs = model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_length=512,
        num_beams=5,
        early_stopping=True,
    )
    # Decode with special tokens kept, then drop only the pad/end markers by hand.
    decoded = tokenizer.decode(outputs[0], skip_special_tokens=False)
    # Clean up `decoded` itself. The previous code read the unrelated global
    # `output_text`, so each call returned stale text or raised NameError.
    for marker in ("<pad>", "</s>"):
        decoded = decoded.replace(marker, "")
    return decoded.strip()


# Run test cases
for case_number, prompt in enumerate(test_cases, start=1):
    # Header, echoed prompt and result label are printed before generation runs.
    print(
        f"\n--- Test Case {case_number} ---",
        f"Input:\n{prompt}",
        "\nGenerated Query:",
        sep="\n",
    )
    print(generate_query(prompt))
    print("\n")
