In [None]:
!pip install -q datasets
!pip install -q unsloth

In [None]:
from datasets import load_dataset
import re

print("Loading MG-Verilog Dataset...")
# Load the train split exactly once, restricting the loader to the repo's
# Arrow shards. A second, unparameterised load_dataset() call whose result is
# overwritten straight away would only repeat the download/preparation work.
dataset = load_dataset(
    "GaTech-EIC/MG-Verilog",
    split="train",
    data_files="**/*.arrow",
)
print(dataset)

In [None]:
import re

def smart_preprocessing(example):
    """Turn one raw MG-Verilog row into an instruction/output pair.

    The text in ``example['description']['detailed_global_summary']`` is split
    into a natural-language description and a Verilog module header. The
    header is then placed in front of ``example['code']`` to form the output.

    Args:
        example: Mapping with a nested ``description`` mapping that holds
            ``detailed_global_summary``, plus a top-level ``code`` entry.

    Returns:
        A dict with ``instruction`` (the cleaned description) and ``output``
        (header and body joined by a newline), or ``None`` when a field is
        missing, a field is not a string, or no module header can be found.
    """
    # 1. Access the nested fields; rows without them are reported as None.
    try:
        raw_summary = example['description']['detailed_global_summary']
        raw_body = example['code']
    except (KeyError, TypeError):
        return None

    # The lookup above succeeds for None/non-string values too. Without this
    # guard a None summary raises TypeError in the substring test below, and a
    # None body would be formatted into the output as the literal text "None".
    if not isinstance(raw_summary, str) or not isinstance(raw_body, str):
        return None

    # 2. Extract the header.
    if "Module header:" in raw_summary:
        # Explicit separator: text before it is the description, text after it
        # is the header (up to any further separator occurrence).
        parts = raw_summary.split("Module header:")
        raw_desc, raw_header = parts[0], parts[1]

        # Drop the trailing [/INST] tag and surrounding whitespace.
        module_header = raw_header.replace("[/INST]", "").strip()
    else:
        # Fallback: search for "module <identifier> ... );" directly.
        header_match = re.search(r'(module\s+[a-zA-Z0-9_]+\s*#?.*?\);)', raw_summary, re.DOTALL)
        if header_match is None:
            return None  # No recognisable header in this row.
        module_header = header_match.group(1).strip()
        # Everything in front of the matched header is the description.
        raw_desc = raw_summary.split(module_header)[0]

    # 3. Clean the description: strip the "<s>[INST] ... <</SYS>>" prompt prefix.
    clean_desc = re.sub(r'<s>\[INST\].*?<</SYS>>', '', raw_desc, flags=re.DOTALL)
    clean_desc = clean_desc.strip()

    # 4. Stitch header + body into the training target.
    full_verilog_code = f"{module_header}\n{raw_body}"

    return {
        "instruction": clean_desc,
        "output": full_verilog_code
    }

# Apply the preprocessing.
# Dataset.map() only adds/updates columns; it never yields None rows, so a
# filter(lambda x: x is not None) placed *after* map() cannot drop anything.
# Rows that smart_preprocessing rejects (returns None for) are therefore
# removed first, and only the usable rows are mapped.
usable_rows = dataset.filter(lambda row: smart_preprocessing(row) is not None)
processed_dataset = usable_rows.map(smart_preprocessing)

# Verify the result
print(f"Successfully processed {len(processed_dataset)} examples.")
if len(processed_dataset) > 0:
    print("\n--- CHECKING FIRST EXAMPLE ---")
    print("INSTRUCTION START:", processed_dataset[0]['instruction'][:100])
    print("CODE START:", processed_dataset[0]['output'][:100])

In [None]:
# import json
# import random

# input_filename = "verilog_smolify_final.json"
# output_filename = "verilog_smolify_nano.json"

# # 1. Load Data
# with open(input_filename, "r") as f:
#     data = json.load(f)

# # 2. Extreme Reduction (Just 50 examples)
# # This is just to pass the server check.
# random.seed(42)
# nano_subset = random.sample(data, 50)

# # 3. Save
# with open(output_filename, "w") as f:
#     json.dump(nano_subset, f, indent=2)

# print(f"Created Nano file with 50 examples.")

In [None]:
# import json
# import csv

# input_filename = "verilog_smolify_nano.json" # Using the nano file (50 examples)
# output_filename = "verilog_smolify_nano.csv"

# print(f"Reading {input_filename}...")
# with open(input_filename, "r") as f:
#     data = json.load(f)

# print("Converting to Flattened CSV...")

# # Define the CSV headers explicitly requested by the error message
# headers = ["system", "user", "assistant"]

# with open(output_filename, "w", newline="", encoding="utf-8") as f:
#     writer = csv.DictWriter(f, fieldnames=headers)
#     writer.writeheader()

#     for entry in data:
#         # Extract content from the nested "messages" list
#         # We assume the order is always System -> User -> Assistant
#         # (which our previous script guaranteed)

#         try:
#             row = {
#                 "system": entry["messages"][0]["content"],
#                 "user": entry["messages"][1]["content"],
#                 "assistant": entry["messages"][2]["content"]
#             }
#             writer.writerow(row)
#         except (IndexError, KeyError):
#             continue # Skip any malformed rows

# print(f"Success! Download '{output_filename}' and upload it.")

In [None]:
# import csv

# # We use the dataset variable from memory (the one with 11k examples)
# # If you lost it, re-run the 'smart_preprocessing' cell.

# output_filename = "verilog_smolify_FULL.csv"

# print(f"Converting {len(processed_dataset)} examples to CSV...")

# # Define the 3 mandatory columns
# headers = ["system", "user", "assistant"]

# with open(output_filename, "w", newline="", encoding="utf-8") as f:
#     writer = csv.DictWriter(f, fieldnames=headers)
#     writer.writeheader()

#     for entry in processed_dataset:
#         # Our processed_dataset has 'instruction' and 'output'
#         # We map them to the 3 columns Smolify wants

#         row = {
#             "system": "You are an expert Hardware Engineer. Write synthesizable Verilog code.",
#             "user": entry['instruction'],
#             "assistant": entry['output']
#         }

#         # Clean up newlines to prevent CSV breaking (optional but safe)
#         # Python's csv module handles newlines inside quotes automatically,
#         # so we generally don't need to strip them, but we ensure utf-8.
#         writer.writerow(row)

# import os
# file_size_mb = os.path.getsize(output_filename) / (1024 * 1024)
# print(f"Success! File size is: {file_size_mb:.2f} MB")

# if file_size_mb > 50:
#     print("WARNING: This might be too big for their server.")
#     print("If upload fails, ask me for the '50MB Splitter Script'.")
# else:
#     print("PERFECT! This is safe to upload.")

In [None]:
# import csv
# import os

# input_filename = "verilog_smolify_FULL.csv"
# output_filename = "verilog_smolify_SAFE.csv"
# TARGET_SIZE_MB = 4.8  # Staying under 5MB to be 100% safe

# print(f"Creating a file under {TARGET_SIZE_MB} MB...")

# # 1. Read the full dataset
# rows = []
# with open(input_filename, "r", encoding="utf-8") as f:
#     reader = csv.DictReader(f)
#     headers = reader.fieldnames
#     for row in reader:
#         rows.append(row)

# # 2. Write rows until we hit the limit
# with open(output_filename, "w", newline="", encoding="utf-8") as f:
#     writer = csv.DictWriter(f, fieldnames=headers)
#     writer.writeheader()

#     count = 0
#     for row in rows:
#         writer.writerow(row)
#         count += 1

#         # Check size every 100 rows to be fast
#         if count % 100 == 0:
#             f.flush() # Force write to disk to check real size
#             current_size = os.path.getsize(output_filename) / (1024 * 1024)

#             if current_size >= TARGET_SIZE_MB:
#                 print(f"Reached {current_size:.2f} MB with {count} examples.")
#                 break

# print(f"Success! Saved '{output_filename}'.")
# print("Upload this specific file. It contains the maximum data allowed.")

In [None]:
# import json
# import csv

# # Load your existing data
# with open("verilog_smolify_nano.json", "r") as f:
#     data = json.load(f)

# # Take ONLY 3 examples (To stay under the 5 RPM limit)
# micro_subset = data[:3]

# # Save as CSV
# with open("verilog_smolify_micro.csv", "w", newline="", encoding="utf-8") as f:
#     writer = csv.DictWriter(f, fieldnames=["system", "user", "assistant"])
#     writer.writeheader()
#     for entry in micro_subset:
#         writer.writerow({
#             "system": entry["messages"][0]["content"],
#             "user": entry["messages"][1]["content"],
#             "assistant": entry["messages"][2]["content"]
#         })

# print("Created 'verilog_smolify_micro.csv'. Upload this!")

In [None]:
# Environment setup: (re)install Unsloth and the training dependencies.
import torch
from packaging import version as V

# Upgrade pip first, then install Unsloth (with its "colab-new" extra)
# directly from the GitHub repository.
!pip install --upgrade pip
!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"

# Pin xformers to 0.0.27 when the installed torch is older than 2.4.0;
# otherwise leave it unpinned so pip picks a release itself.
xformers_version = "xformers==0.0.27" if V.Version(torch.__version__) < V.Version("2.4.0") else "xformers"
# --no-deps installs only the packages listed here, without their dependencies.
!pip install --no-deps {xformers_version} trl peft accelerate bitsandbytes triton

In [None]:
from unsloth import FastVisionModel
import torch

# 1. Load the base model and tokenizer with 4-bit weights
base_model_options = dict(
    load_in_4bit = True,
    use_gradient_checkpointing = "unsloth",
)
model, tokenizer = FastVisionModel.from_pretrained(
    "Qwen/Qwen2.5-VL-7B-Instruct",
    **base_model_options,
)

# 2. Wrap the model with LoRA adapters on the listed projection modules
lora_targets = [
    "q_proj", "k_proj", "v_proj", "o_proj",
    "gate_proj", "up_proj", "down_proj",
]
lora_options = dict(
    r = 16,
    target_modules = lora_targets,
    lora_alpha = 16,
    lora_dropout = 0,
    bias = "none",
    use_gradient_checkpointing = "unsloth",
    random_state = 3407,
)
model = FastVisionModel.get_peft_model(model, **lora_options)

print("Qwen2.5-VL Loaded for Text-to-Code Phase.")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Create the checkpoint folder on Drive.
# exist_ok=True makes this a single call that is a no-op when the directory
# already exists, instead of a separate exists() check followed by makedirs().
import os
checkpoint_path = "/content/drive/MyDrive/qwen_verilog_checkpoints"
os.makedirs(checkpoint_path, exist_ok=True)

In [None]:
from trl import SFTTrainer
from transformers import TrainingArguments
from unsloth import is_bfloat16_supported

# 1. Updated Formatting for Qwen VL (Fixes the KeyError)
def format_data(examples):
    """Convert a batch of dataset rows into chat-style message lists.

    Args:
        examples: Batched columns. ``examples["description"]`` is a sequence
            of mappings that each hold ``detailed_global_summary``, and
            ``examples["code"]`` is the parallel sequence of code strings.

    Returns:
        ``{"messages": [...]}`` with one system/user/assistant conversation
        per row. Each message's content is a list with a single text part.
    """
    def _message(role, body):
        # One chat turn whose content is a single {"type": "text"} part.
        return {"role": role, "content": [{"type": "text", "text": body}]}

    # The summary is nested inside 'description'; 'code' sits at the top level.
    prompts = [entry["detailed_global_summary"] for entry in examples["description"]]
    answers = examples["code"]

    conversations = [
        [
            _message("system", "You are a Verilog expert."),
            _message("user", prompt),
            _message("assistant", answer),
        ]
        for prompt, answer in zip(prompts, answers)
    ]
    return {"messages": conversations}

# SFTConfig and UnslothVisionDataCollator are used below but are not among
# this cell's imports; import them here so the cell runs on a fresh kernel
# instead of raising NameError.
from trl import SFTConfig
from unsloth.trainer import UnslothVisionDataCollator

# Build the chat-formatted training set from the loaded dataset.
# NOTE(review): this maps the raw `dataset`, not `processed_dataset`, so the
# prompts presumably still carry the [INST]/<<SYS>> wrapper and the targets
# lack the module header that smart_preprocessing adds - confirm this is intended.
qwen_train_data = dataset.map(format_data, batched = True)

# 2. Setup the Trainer
trainer = SFTTrainer(
    model = model,
    tokenizer = tokenizer,
    data_collator = UnslothVisionDataCollator(model, tokenizer), # collator for the vision model class
    train_dataset = qwen_train_data,
    args = SFTConfig(
        per_device_train_batch_size = 2,
        gradient_accumulation_steps = 4,   # 2 x 4 = 8 examples per optimizer step
        warmup_steps = 100,
        num_train_epochs = 1,
        learning_rate = 2e-4,
        # Use bf16 when the GPU supports it, otherwise fall back to fp16.
        fp16 = not is_bfloat16_supported(),
        bf16 = is_bfloat16_supported(),
        logging_steps = 1,
        optim = "adamw_8bit",
        output_dir = "/content/drive/MyDrive/qwen_verilog_checkpoints", # checkpoints go to Drive
        save_steps = 200,                # write a checkpoint every 200 steps
        save_total_limit = 2,            # keep only the two most recent checkpoints
        # The next three settings hand the "messages" column to the collator
        # untouched instead of letting the trainer tokenize a text field.
        remove_unused_columns = False,
        dataset_text_field = "",
        dataset_kwargs = {"skip_prepare_dataset": True},
        max_seq_length = 2048,
    ),
)

trainer.train()

Map:   0%|          | 0/11144 [00:00<?, ? examples/s]

Unsloth: Model does not have a default image size - using 512


==((====))==  Unsloth - 2x faster free finetuning | Num GPUs used = 1
   \\   /|    Num examples = 11,144 | Num Epochs = 1 | Total steps = 1,393
O^O/ \_/ \    Batch size per device = 2 | Gradient accumulation steps = 4
\        /    Data Parallel GPUs = 1 | Total batch size (2 x 4 x 1) = 8
 "-____-"     Trainable parameters = 47,589,376 of 8,339,756,032 (0.57% trained)


Unsloth: Will smartly offload gradients to save VRAM!


Step,Training Loss
1,2.7857
2,2.742
3,3.4869
4,3.4525
5,2.9516
6,2.8866
7,3.5871
8,2.3078
9,2.6565
10,2.2904


Step,Training Loss
1,2.7857
2,2.742
3,3.4869
4,3.4525
5,2.9516
6,2.8866
7,3.5871
8,2.3078
9,2.6565
10,2.2904


In [None]:
from trl import SFTTrainer, SFTConfig
from unsloth.trainer import UnslothVisionDataCollator
from unsloth import is_bfloat16_supported

# 1. Re-apply Data Mapping
# (Dataset and Model must be loaded in previous cells for this to work)
def format_data(examples):
    """Build one system/user/assistant conversation per row of a batch.

    Args:
        examples: Batched columns; ``examples["description"]`` holds mappings
            with a ``detailed_global_summary`` entry and ``examples["code"]``
            holds the matching code strings.

    Returns:
        ``{"messages": [...]}`` where each element is a three-message list and
        every message's content is a list with a single text part.
    """
    def _turn(role, text):
        # Wrap plain text as a single-part chat message.
        return {"role": role, "content": [{"type": "text", "text": text}]}

    summaries = [item["detailed_global_summary"] for item in examples["description"]]
    code_bodies = examples["code"]

    return {
        "messages": [
            [
                _turn("system", "You are a Verilog expert."),
                _turn("user", summary),
                _turn("assistant", code),
            ]
            for summary, code in zip(summaries, code_bodies)
        ]
    }

qwen_train_data = dataset.map(format_data, batched = True)

# 2. Rebuild the trainer with the same settings and the Drive checkpoint path
vision_collator = UnslothVisionDataCollator(model, tokenizer)
bf16_available = is_bfloat16_supported()

resume_config = SFTConfig(
    per_device_train_batch_size = 2,
    gradient_accumulation_steps = 4,
    warmup_steps = 100,
    num_train_epochs = 1,
    learning_rate = 2e-4,
    fp16 = not bf16_available,
    bf16 = bf16_available,
    logging_steps = 1,
    optim = "adamw_8bit",
    output_dir = "/content/drive/MyDrive/qwen_verilog_checkpoints",
    save_steps = 200,
    save_total_limit = 2,
    remove_unused_columns = False,
    dataset_text_field = "",
    dataset_kwargs = {"skip_prepare_dataset": True},
    max_seq_length = 2048,
)

trainer = SFTTrainer(
    model = model,
    tokenizer = tokenizer,
    data_collator = vision_collator,
    train_dataset = qwen_train_data,
    args = resume_config,
)

# 3. Resume training
# resume_from_checkpoint=True continues from the latest checkpoint in output_dir.
trainer.train(resume_from_checkpoint = True)