In [None]:
# Cell 1: Installation & Setup
!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
!pip install --no-deps xformers "trl<0.9.0" peft accelerate bitsandbytes
!pip install fastapi uvicorn pyngrok nest_asyncio python-multipart

import nest_asyncio
from pyngrok import ngrok
import uvicorn
import zipfile
import shutil

# Ngrok authtoken (get one from https://dashboard.ngrok.com/get-started/your-authtoken).
# The token is read from the NGROK_AUTHTOKEN environment variable, or prompted
# for without echo, so it is never stored in the notebook source or its outputs.
import os
from getpass import getpass

NGROK_AUTHTOKEN = os.environ.get("NGROK_AUTHTOKEN", "").strip()
if not NGROK_AUTHTOKEN:
    NGROK_AUTHTOKEN = getpass("Paste your ngrok authtoken: ").strip()
if not NGROK_AUTHTOKEN:
    # Fail here with a clear message instead of later when the tunnel is opened.
    raise ValueError("An ngrok authtoken is required to expose the API publicly.")

ngrok.set_auth_token(NGROK_AUTHTOKEN)
# Allow nested event loops so the server can be started from inside the notebook.
nest_asyncio.apply()

In [None]:
# Cell 2: The FastAPI Application
from fastapi import FastAPI, File, UploadFile, HTTPException, BackgroundTasks, Form
from fastapi.responses import FileResponse
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import os
import sys
import re
import json
import uuid
import asyncio
import subprocess
from datetime import datetime
import logging
from pathlib import Path
import shutil
import zipfile

# --- Setup for Colab Environment ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Unsloth Fine-tuning API (on Colab)",
    description="API running on Google Colab for fine-tuning models and automated download.",
    version="1.1.0"
)

# Job registry keyed by job id. It lives only in this process's memory, so it is
# lost whenever the runtime restarts or this cell is executed again.
training_jobs: Dict[str, Dict[str, Any]] = {}

# Working directories on Colab's temporary filesystem.
UPLOAD_DIR = Path("/content/uploads")
MODELS_DIR = Path("/content/trained_models")
ZIPPED_MODELS_DIR = Path("/content/zipped_models")
# parents=True also creates /content itself when it does not exist yet;
# exist_ok=True keeps re-running the cell harmless.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
MODELS_DIR.mkdir(parents=True, exist_ok=True)
ZIPPED_MODELS_DIR.mkdir(parents=True, exist_ok=True)

# Pydantic response model describing the state of a training job
class TrainingStatus(BaseModel):
    # Identity and lifecycle state of the job.
    job_id: str
    status: str
    # Completion percentage and an optional human-readable note.
    progress: Optional[float] = None
    message: Optional[str] = None
    # Timestamps recorded by the server.
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    # Location of the trained model and the captured process output.
    model_path: Optional[str] = None
    logs: Optional[List[str]] = None

# Base models that clients are allowed to request for fine-tuning.
AVAILABLE_MODELS = [
    "unsloth/Llama-3.2-1B-Instruct",
    "unsloth/tinyllama-bnb-4bit",
    "unsloth/llama-2-7b-bnb-4bit",
    "unsloth/mistral-7b-bnb-4bit",
]

# Core training logic: create_training_script renders the per-job training
# script, and run_training executes it in a subprocess while tracking progress.

def create_training_script(job_data: Dict[str, Any]) -> str:
    """Render the source code of the standalone training script for one job.

    Args:
        job_data: Job record with the keys "model_name", "dataset_file" (a file
            name inside UPLOAD_DIR), "job_id" and "parameters". "parameters"
            must provide max_seq_length, num_train_epochs,
            per_device_train_batch_size, gradient_accumulation_steps,
            warmup_steps, learning_rate, logging_steps and save_steps.

    Returns:
        Python source code as a string, meant to be executed in a separate
        process. While training, the script prints "UNSLOTH_TOTAL_STEPS=<n>"
        once and a JSON object with a "step" key on every logging event; it
        exits with status 1 (after printing a traceback) on any error.

    Raises:
        KeyError: If a required key is missing from job_data or its parameters.
    """
    params = job_data["parameters"]
    job_id = job_data["job_id"]

    # String values are embedded as repr() literals. The dataset name comes from
    # the client, and pasting it between hand-written quotes would let a quote
    # or backslash in the name break (or inject code into) the generated script.
    model_name_literal = repr(job_data["model_name"])
    dataset_path_literal = repr(str(UPLOAD_DIR / job_data["dataset_file"]))
    output_dir_literal = repr(f"/content/trained_models/{job_id}")

    # Note: this is an f-string, so literal braces in the generated code are
    # doubled ({{ and }}).
    script = f"""
import os, sys, torch, json, re
from unsloth import FastLanguageModel
from datasets import Dataset
from trl import SFTTrainer
from transformers import TrainingArguments, TrainerCallback

class ProgressReporter(TrainerCallback):
    # Prints the machine-readable progress lines parsed by the API server.
    # The total step count is only known once training has started, so it is
    # reported from on_train_begin instead of before trainer.train().
    def on_train_begin(self, args, state, control, **kwargs):
        print(f"UNSLOTH_TOTAL_STEPS={{state.max_steps}}", flush=True)
    def on_log(self, args, state, control, logs=None, **kwargs):
        print(json.dumps({{"step": state.global_step}}), flush=True)

def prepare_document_data(file_path, chunk_size=256):
    with open(file_path, 'r', encoding='utf-8') as f: document_text = f.read()
    sentences = re.split(r'(?<=[.!?])\\s+', document_text)
    chunks, current_chunk, current_length = [], [], 0
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence: continue
        sentence_length = len(sentence.split())
        if current_length + sentence_length > chunk_size and current_chunk:
            chunks.append(" ".join(current_chunk)); current_chunk = [sentence]; current_length = sentence_length
        else: current_chunk.append(sentence); current_length += sentence_length
    if current_chunk: chunks.append(" ".join(current_chunk))
    return Dataset.from_dict({{"text": chunks}})

def main():
    try:
        model_name = {model_name_literal}
        max_seq_length = {params['max_seq_length']}
        model, tokenizer = FastLanguageModel.from_pretrained(model_name=model_name, max_seq_length=max_seq_length, dtype=None, load_in_4bit=True)
        model = FastLanguageModel.get_peft_model(model, r=8, target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"], lora_alpha=16, lora_dropout=0, bias="none", use_gradient_checkpointing=True, random_state=3407)
        dataset = prepare_document_data({dataset_path_literal})
        training_args = TrainingArguments(
            output_dir={output_dir_literal},
            num_train_epochs={params['num_train_epochs']},
            per_device_train_batch_size={params['per_device_train_batch_size']},
            gradient_accumulation_steps={params['gradient_accumulation_steps']},
            warmup_steps={params['warmup_steps']},
            learning_rate={params['learning_rate']},
            fp16=not torch.cuda.is_bf16_supported(),
            bf16=torch.cuda.is_bf16_supported(),
            logging_steps={params['logging_steps']},
            optim="adamw_8bit",
            save_strategy="steps",
            save_steps={params['save_steps']},
            save_total_limit=1,
            report_to="none",
            seed=3407,
            disable_tqdm=True,
        )
        trainer = SFTTrainer(model=model, tokenizer=tokenizer, train_dataset=dataset, dataset_text_field="text", max_seq_length=max_seq_length, args=training_args, callbacks=[ProgressReporter()])
        trainer.train()
        model.save_pretrained({output_dir_literal}); tokenizer.save_pretrained({output_dir_literal})
    except Exception:
        import traceback; traceback.print_exc(); sys.exit(1)

if __name__ == "__main__": main()
"""
    return script

async def run_training(job_id: str, job_data: Dict[str, Any]):
    """Run the training script for one job in a subprocess and track its state.

    The job record is updated in place: "status" becomes "running" and then
    "completed" or "failed"; every non-empty output line is appended to "logs";
    "progress" follows the step markers printed by the script; "end_time" is
    set when the run finishes; "model_path" is set on success and "message"
    explains a failure.

    Args:
        job_id: Identifier of the job, used for the script and model paths.
        job_data: Mutable job record (the same dict stored in training_jobs).
    """
    job_data["status"] = "running"
    script_path = Path(f"/content/training_script_{job_id}.py")
    logs = job_data.setdefault("logs", [])
    total_steps = 0
    process = None
    try:
        # Python source is UTF-8 by default, so write the script explicitly as
        # UTF-8 instead of relying on the platform's default encoding.
        script_path.write_text(create_training_script(job_data), encoding="utf-8")
        process = await asyncio.create_subprocess_exec(
            sys.executable, "-u", str(script_path),
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT,
        )
        async for raw_line in process.stdout:
            # Library output is not guaranteed to be valid UTF-8; never let a
            # stray byte abort the whole job.
            line_str = raw_line.decode("utf-8", errors="replace").strip()
            if not line_str:
                continue
            logs.append(line_str)
            logger.info(f"Job {job_id}: {line_str}")
            if line_str.startswith("UNSLOTH_TOTAL_STEPS="):
                try:
                    total_steps = int(line_str.split("=", 1)[1])
                except ValueError:
                    pass  # malformed marker: keep the previous total
            elif line_str.startswith("{") and line_str.endswith("}"):
                try:
                    log_data = json.loads(line_str)
                    if "step" in log_data and total_steps > 0:
                        percent = (log_data["step"] / total_steps) * 100
                        job_data["progress"] = min(100.0, round(percent, 2))
                except (ValueError, TypeError):
                    pass  # brace-delimited line that is not a JSON progress record
        await process.wait()
        job_data["end_time"] = datetime.now()
        if process.returncode == 0:
            job_data["status"] = "completed"
            job_data["progress"] = 100.0
            job_data["model_path"] = f"/content/trained_models/{job_id}"
        else:
            job_data["status"] = "failed"
            job_data["message"] = f"Training process exited with code {process.returncode}"
    except Exception as exc:
        # Without this, an error while writing or launching the script would
        # leave the job reported as "running" forever.
        logger.exception(f"Job {job_id}: training could not be run")
        job_data["status"] = "failed"
        job_data["message"] = f"{type(exc).__name__}: {exc}"
        job_data["end_time"] = datetime.now()
    finally:
        # Do not leave a training process behind if we stopped reading early.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # the process ended on its own in the meantime
            await process.wait()
        if script_path.exists():
            script_path.unlink()

@app.post("/train", status_code=202)
async def start_training(background_tasks: BackgroundTasks, model_name: str = Form(...), dataset_file: str = Form(...)):
    """Register a fine-tuning job and schedule it to run in the background.

    Args:
        background_tasks: FastAPI hook used to run the training after the response is sent.
        model_name: One of AVAILABLE_MODELS.
        dataset_file: Name of a previously uploaded file inside UPLOAD_DIR.

    Returns:
        A dict with the new "job_id" and a "status" message.

    Raises:
        HTTPException: 400 for an unknown model or an invalid file name,
            404 when the dataset file does not exist.
    """
    if model_name not in AVAILABLE_MODELS:
        raise HTTPException(400, "Model not available")
    # Only a bare file name is accepted: a value with directory parts (or an
    # absolute path, which would replace UPLOAD_DIR when joined) could point
    # the training run at any file on the machine.
    if not dataset_file or Path(dataset_file).name != dataset_file:
        raise HTTPException(400, "Invalid dataset file name")
    if not (UPLOAD_DIR / dataset_file).is_file():
        raise HTTPException(404, "Dataset not found")

    job_id = str(uuid.uuid4())
    training_jobs[job_id] = {
        "job_id": job_id,
        "status": "pending",
        "model_name": model_name,
        "dataset_file": dataset_file,
        "parameters": {
            "max_seq_length": 1024,
            "learning_rate": 2e-4,
            "num_train_epochs": 1,
            "per_device_train_batch_size": 2,
            "gradient_accumulation_steps": 4,
            "warmup_steps": 5,
            "save_steps": 50,
            "logging_steps": 1,
        },
        "start_time": datetime.now(),
        "logs": [],
        "progress": 0.0,
    }
    background_tasks.add_task(run_training, job_id, training_jobs[job_id])
    return {"job_id": job_id, "status": "Training queued"}

@app.post("/upload")
async def upload_dataset(file: UploadFile = File(...)):
    """Store an uploaded dataset file in UPLOAD_DIR.

    Args:
        file: The uploaded file.

    Returns:
        A dict with a confirmation "message" and the "filename" under which the
        file was stored (the base name of the submitted file name).

    Raises:
        HTTPException: 400 when the upload has no usable file name.
    """
    # The file name is supplied by the client. Keep only its last path
    # component so that names such as "../../x" cannot write outside UPLOAD_DIR.
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(400, "Invalid file name")
    file_path = UPLOAD_DIR / safe_name
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return {"message": "File uploaded successfully", "filename": safe_name}

@app.get("/status/{job_id}", response_model=TrainingStatus)
async def get_training_status(job_id: str):
    # Look the job up once; unknown ids are answered with 404.
    job_record = training_jobs.get(job_id)
    if job_record is None:
        raise HTTPException(status_code=404, detail="Job not found")
    # The record holds more keys than the response model declares; the model
    # is built from the full record exactly as stored.
    return TrainingStatus(**job_record)

def _zip_directory(source_dir: Path, zip_path: Path) -> None:
    """Archive every file under source_dir into zip_path, with paths relative to source_dir."""
    # Build the archive under a temporary name and rename it when complete, so
    # a concurrent download can never be served a half-written zip file.
    tmp_path = zip_path.with_name(f"{zip_path.name}.{uuid.uuid4().hex}.tmp")
    try:
        with zipfile.ZipFile(tmp_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, _dirs, files in os.walk(source_dir):
                for file_name in files:
                    file_path = os.path.join(root, file_name)
                    zipf.write(file_path, os.path.relpath(file_path, source_dir))
        os.replace(tmp_path, zip_path)
    finally:
        # Only still present when archiving failed before the rename.
        if tmp_path.exists():
            tmp_path.unlink()


@app.get("/download/{job_id}")
async def download_model(job_id: str):
    """Zips the trained model folder and provides it for download.

    The archive is created on the first request for a job and reused afterwards.

    Raises:
        HTTPException: 404 when the job is unknown, not completed, or its model
            directory no longer exists.
    """
    job = training_jobs.get(job_id)
    if job is None or job.get("status") != "completed":
        raise HTTPException(status_code=404, detail="Job not found or not completed.")

    model_path = Path(job["model_path"])
    if not model_path.is_dir():
        # Without this check a missing directory would produce an empty archive.
        raise HTTPException(status_code=404, detail="Trained model files are no longer available.")
    zip_path = ZIPPED_MODELS_DIR / f"{job_id}.zip"

    if not zip_path.exists():
        # Compressing a model is slow, blocking work; run it in a worker thread
        # so the event loop keeps serving other requests (e.g. /status).
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _zip_directory, model_path, zip_path)

    return FileResponse(path=zip_path, media_type='application/zip', filename=f"model_{job_id}.zip")

# --- Start the server ---
# Open the public tunnel, serve until the cell is interrupted, then close the
# tunnel again so that re-running this cell does not accumulate open tunnels.
public_url = ngrok.connect(8000)
try:
    print("✅ FastAPI server is running.")
    print(f"🚀 Public URL: {public_url.public_url}")
    print(f"📄 API Docs available at: {public_url.public_url}/docs")
    # Blocks until the server is stopped.
    uvicorn.run(app, host="0.0.0.0", port=8000)
finally:
    ngrok.disconnect(public_url.public_url)