# Training Phi-3-mini-128k-instruct to Learn Swift Programming Language

This notebook trains Microsoft's Phi-3-mini-128k-instruct model to understand and work with Swift code using a dataset of real Swift files.

In [None]:
# Install required libraries
!pip install transformers datasets evaluate torch scikit-learn tqdm dropbox requests accelerate peft bitsandbytes

In [None]:
# Important: These imports must be properly separated
import os
import json
import torch
import random
import numpy as np
import time
import gc
import re
import collections
import psutil  # Add psutil for memory monitoring
from tqdm.auto import tqdm
from datasets import load_dataset, ClassLabel
from torch.utils.data import DataLoader, Dataset
from transformers import (
    AutoTokenizer, 
    AutoModelForCausalLM,
    Trainer, 
    TrainingArguments,
    set_seed,
    DataCollatorForLanguageModeling,
    EarlyStoppingCallback,
    BitsAndBytesConfig
)
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
# Note: We're using string literals for task_type instead of importing TaskType enum to avoid compatibility issues

# Set a seed for reproducibility
set_seed(42)

# Add memory management function with more detailed reporting
def cleanup_memory():
    """Force garbage collection and clear CUDA cache if available."""
    # Get memory usage before cleanup
    before = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
    
    # Perform cleanup
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    
    # Get memory usage after cleanup
    after = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
    print(f"Memory cleaned up. Before: {before:.2f} MB, After: {after:.2f} MB, Freed: {before - after:.2f} MB")
    
    # Print system memory info
    mem = psutil.virtual_memory()
    print(f"System memory: {mem.percent}% used, {mem.available / 1024 / 1024:.2f} MB available")


In [None]:
# Check if GPU is available
import torch

if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device('cpu')
    print("Using CPU - Note: Training will be much slower on CPU")

# Set random seeds for reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)


## Dataset and Model Configuration

Let's define the model and dataset we'll be using:

In [None]:
# Dataset configuration - using the same dataset as the original notebook
DATASET_ID = "mvasiliniuc/iva-swift-codeint"

# Model configuration - using Phi-3-mini-128k-instruct
MODEL_NAME = "microsoft/Phi-3-mini-128k-instruct"
MAX_LENGTH = 4096  # Phi-3 can handle long sequences natively
BATCH_SIZE = 4  # Reduced batch size due to model size
LEARNING_RATE = 2e-5
WEIGHT_DECAY = 0.01
NUM_EPOCHS = 3
WARMUP_RATIO = 0.03
GRADIENT_ACCUMULATION_STEPS = 8

# LoRA configuration
LORA_R = 16
LORA_ALPHA = 32
LORA_DROPOUT = 0.05

# Debug mode for testing with smaller dataset
DEBUG_MODE = False
DEBUG_SAMPLE_SIZE = 100

print(f"Using model: {MODEL_NAME}")
print(f"Max sequence length: {MAX_LENGTH}")
print(f"Batch size: {BATCH_SIZE}")
print(f"LoRA rank: {LORA_R}")


In [None]:
# Function to load dataset with retry logic
def load_dataset_with_retry(dataset_id, max_retries=3, retry_delay=5):
    """Load a dataset with retry logic."""
    for attempt in range(max_retries):
        try:
            print(f"Loading dataset (attempt {attempt+1}/{max_retries})...")
            data = load_dataset(dataset_id, trust_remote_code=True)
            print(f"Dataset loaded successfully with {len(data['train'])} examples")
            return data
        except Exception as e:
            print(f"Error loading dataset (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                print("Maximum retries reached. Could not load dataset.")
                raise

# Load the dataset with retry logic
try:
    print(f"Loading dataset: {DATASET_ID}")
    data = load_dataset_with_retry(DATASET_ID)
    print("Dataset structure:")
    print(data)
    
    # If in debug mode, take a small sample of the dataset
    if DEBUG_MODE and 'train' in data:
        print(f"DEBUG MODE: Sampling {DEBUG_SAMPLE_SIZE} examples from dataset")
        # Take a stratified sample if possible
        data['train'] = data['train'].shuffle(seed=42).select(range(min(DEBUG_SAMPLE_SIZE, len(data['train']))))
        print(f"Reduced dataset size: {len(data['train'])} examples")
        
except Exception as e:
    print(f"Fatal error loading dataset: {e}")
    raise


In [None]:
# Verify dataset structure and column names
def verify_dataset_structure(dataset):
    """Verify that the dataset has the expected structure and columns."""
    required_columns = ['repo_name', 'path', 'content']
    if 'train' not in dataset:
        print("WARNING: Dataset does not have a 'train' split.")
        return False
    
    missing_columns = [col for col in required_columns if col not in dataset['train'].column_names]
    if missing_columns:
        print(f"WARNING: Dataset is missing required columns: {missing_columns}")
        return False
    
    print("Dataset structure verification passed.")
    return True

# Verify dataset structure
dataset_valid = verify_dataset_structure(data)
if not dataset_valid:
    print("Dataset structure is not as expected. Proceeding with caution.")


In [None]:
# Load the Phi-3 tokenizer
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, model_max_length=MAX_LENGTH)
    # Add padding token if it doesn't exist
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    print(f"Tokenizer vocabulary size: {len(tokenizer)}")
    print(f"Tokenizer type: {tokenizer.__class__.__name__}")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    raise

In [None]:
def extract_file_type(path):
    """
    Extract the file type/category based on the file path and naming conventions in Swift projects.
    
    Args:
        path (str): The file path
        
    Returns:
        int: The category label (0-5)
    """
    path_lower = path.lower()
    filename = path.split('/')[-1].lower()
    
    # Category 0: Models - Data structures and model definitions
    if ('model' in path_lower or 
        'struct' in path_lower or 
        'entity' in path_lower or
        'data' in path_lower and 'class' in path_lower):
        return 0
    
    # Category 1: Views - UI related files
    elif ('view' in path_lower or 
          'ui' in path_lower or 
          'screen' in path_lower or 
          'page' in path_lower or
          'controller' in path_lower and 'view' in path_lower):
        return 1
    
    # Category 2: Controllers - Application logic
    elif ('controller' in path_lower or 
          'manager' in path_lower or 
          'coordinator' in path_lower or
          'service' in path_lower):
        return 2
    
    # Category 3: Utilities - Helper functions and extensions
    elif ('util' in path_lower or 
          'helper' in path_lower or 
          'extension' in path_lower or
          'common' in path_lower):
        return 3
    
    # Category 4: Tests - Test files
    elif ('test' in path_lower or 
          'spec' in path_lower or 
          'mock' in path_lower):
        return 4
    
    # Category 5: Configuration - Package and configuration files
    elif ('package.swift' in path_lower or 
          'config' in path_lower or 
          'settings' in path_lower or
          'info.plist' in path_lower):
        return 5
    
    # Default to category 3 (Utilities) if no clear category is found
    return 3

# Define category names for better readability
category_names = {
    0: "Models",
    1: "Views",
    2: "Controllers",
    3: "Utilities",
    4: "Tests",
    5: "Configuration"
}

# Apply the function to create labels
try:
    # Create a new column with the extracted labels
    labeled_data = data['train'].map(lambda example: {
        **example,
        'label': extract_file_type(example['path'])
    })
    
    # Count the distribution of labels
    label_counts = collections.Counter(labeled_data['label'])
    
    print("Label distribution:")
    for label, count in sorted(label_counts.items()):
        category_name = category_names.get(label, f"Unknown-{label}")
        print(f"Label {label} ({category_name}): {count} examples ({count/len(labeled_data)*100:.2f}%)")
    
    # Get unique labels
    unique_labels = sorted(label_counts.keys())
    num_labels = len(unique_labels)
    
    print(f"\nTotal unique labels: {num_labels}")
except Exception as e:
    print(f"Error in data preparation: {e}")
    raise


In [None]:
# Split the data into train, validation, and test sets
try:
    # Shuffle the data
    shuffled_data = labeled_data.shuffle(seed=42)
    
    # Split into train (80%), validation (10%), and test (10%)
    train_size = int(0.8 * len(shuffled_data))
    val_size = int(0.1 * len(shuffled_data))
    
    train_data = shuffled_data.select(range(train_size))
    val_data = shuffled_data.select(range(train_size, train_size + val_size))
    test_data = shuffled_data.select(range(train_size + val_size, len(shuffled_data)))
    
    print(f"Training set size: {len(train_data)}")
    print(f"Training set label distribution: {collections.Counter(train_data['label'])}")
    print(f"Validation set size: {len(val_data)}")
    print(f"Validation set label distribution: {collections.Counter(val_data['label'])}")
    print(f"Test set size: {len(test_data)}")
    print(f"Test set label distribution: {collections.Counter(test_data['label'])}")
except Exception as e:
    print(f"Error splitting data: {e}")
    raise


In [None]:
# Create instruction-based prompts for the model
def create_instruction_prompt(example):
    """Convert a code example into an instruction-based prompt for language learning."""
    code = example['content']
    label = example['label']
    category = category_names.get(label, f"Unknown-{label}")
    
    # Create different types of prompts to help the model learn the language
    prompt_types = [
        # Explain code functionality
        "Explain what this Swift code does and how it works:\n\n",
        
        # Identify patterns and features
        "Identify and explain the key Swift language features used in this code:\n\n",
        
        # Complete or extend code
        "Complete or extend this Swift code with appropriate functionality:\n\n",
        
        # Fix or improve code
        "Suggest improvements or best practices for this Swift code:\n\n",
        
        # Understand code structure
        f"This is a Swift {category.lower()} file. Explain its structure and purpose:\n\n",
        
        # Code generation tasks
        "Write a Swift function that accomplishes the same task as this code but more efficiently:\n\n",
        
        # Language understanding
        "Explain the Swift syntax and language features demonstrated in this code:\n\n",
        
        # Learning from examples
        "Study this Swift code example and explain what you can learn from it:\n\n"
    ]
    
    # Select a random prompt type
    import random
    instruction = random.choice(prompt_types)
    
    code_section = f"```swift\n{code}\n```\n\n"
    
    # Create the full prompt
    prompt = instruction + code_section
    
    # Create a detailed response based on the prompt type and code category
    if "Explain what this Swift code does" in instruction:
        response = f"This Swift code is a {category.lower()} file that "
        if category == "Models":
            response += "defines data structures and model objects. "
        elif category == "Views":
            response += "implements user interface components. "
        elif category == "Controllers":
            response += "manages application logic and coordinates between models and views. "
        elif category == "Utilities":
            response += "provides helper functions and extensions. "
        elif category == "Tests":
            response += "contains test cases to verify functionality. "
        elif category == "Configuration":
            response += "configures application settings and parameters. "
        
        response += "The code uses Swift syntax with "
        
        # Add some language-specific details based on code content
        if "class" in code:
            response += "class definitions, "
        if "struct" in code:
            response += "struct definitions, "
        if "func" in code:
            response += "function declarations, "
        if "var" in code:
            response += "variable declarations, "
        if "let" in code:
            response += "constant declarations, "
        if "guard" in code or "if let" in code:
            response += "optional unwrapping, "
        if "extension" in code:
            response += "extensions, "
        if "protocol" in code:
            response += "protocol implementations, "
            
        # Remove trailing comma and space if present
        if response.endswith(", "):
            response = response[:-2] + "."
        else:
            response += "various Swift features."
    
    elif "Identify and explain the key Swift language features" in instruction:
        response = "This Swift code demonstrates several key language features:\n\n"
        
        # Add language features based on code content
        features = []
        if "class" in code:
            features.append("1. **Classes**: Swift classes are reference types that support inheritance.")
        if "struct" in code:
            features.append("1. **Structs**: Swift structs are value types that are copied when assigned or passed.")
        if "protocol" in code:
            features.append("1. **Protocols**: Swift protocols define a blueprint of methods, properties, and requirements.")
        if "extension" in code:
            features.append("1. **Extensions**: Swift extensions add new functionality to existing types.")
        if "guard let" in code:
            features.append("1. **Guard statements**: Used for early returns and unwrapping optionals with cleaner code flow.")
        if "if let" in code:
            features.append("1. **Optional binding**: Using if-let to safely unwrap optional values.")
        if "enum" in code:
            features.append("1. **Enumerations**: Swift enums can contain methods and associated values.")
        if "func" in code and "->" in code:
            features.append("1. **Functions with return types**: Swift functions with explicit return type declarations.")
        if "var" in code and "?" in code:
            features.append("1. **Optional variables**: Variables that may contain a value or nil.")
        if "let" in code:
            features.append("1. **Constants**: Immutable values defined with let.")
        
        # If no specific features were identified, add a generic response
        if not features:
            features.append("1. **Swift syntax**: Basic Swift programming constructs and syntax.")
        
        # Join the features with newlines
        response += "\n".join(features)
    
    # Return the prompt and response
    return {
        "prompt": prompt,
        "response": response
    }

# Create instruction-based prompts for training
try:
    # Apply the function to create prompts and responses
    train_prompts = train_data.map(create_instruction_prompt)
    val_prompts = val_data.map(create_instruction_prompt)
    
    # Print some examples
    print("\nExample prompts and responses:")
    for i in range(min(3, len(train_prompts))):
        print(f"\nExample {i+1}:")
        print(f"Prompt:\n{train_prompts[i]['prompt'][:200]}...")
        print(f"Response:\n{train_prompts[i]['response'][:200]}...")
        print("-" * 40)
    
    print(f"\nCreated {len(train_prompts)} training prompts and {len(val_prompts)} validation prompts")
except Exception as e:
    print(f"Error creating prompts: {e}")
    raise


In [None]:
# Tokenize the prompts and responses for training
def tokenize_prompt_response(example):
    """Tokenize a prompt and response pair for training."""
    # Format as a chat-like instruction
    formatted_text = f"<|user|>\n{example['prompt']}\n<|assistant|>\n{example['response']}"
    
    # Tokenize the text
    tokenized = tokenizer(formatted_text, truncation=True, max_length=MAX_LENGTH, padding="max_length")
    
    # Set the labels to be the same as the input_ids for causal language modeling
    tokenized["labels"] = tokenized["input_ids"].copy()
    
    return tokenized

# Apply tokenization to the datasets
try:
    # Tokenize the training and validation datasets
    tokenized_train = train_prompts.map(
        tokenize_prompt_response,
        remove_columns=train_prompts.column_names,
        desc="Tokenizing training data"
    )
    
    tokenized_val = val_prompts.map(
        tokenize_prompt_response,
        remove_columns=val_prompts.column_names,
        desc="Tokenizing validation data"
    )
    
    print(f"Tokenized {len(tokenized_train)} training examples and {len(tokenized_val)} validation examples")
    
    # Print an example of tokenized data
    if len(tokenized_train) > 0:
        example = tokenized_train[0]
        print("\nExample of tokenized data:")
        print(f"Input IDs length: {len(example['input_ids'])}")
        print(f"Attention mask length: {len(example['attention_mask'])}")
        print(f"Labels length: {len(example['labels'])}")
        
        # Decode a portion to verify
        decoded = tokenizer.decode(example['input_ids'][:100])
        print(f"\nDecoded first 100 tokens: {decoded}...")
except Exception as e:
    print(f"Error tokenizing data: {e}")
    raise


In [None]:
# Load the Phi-3 model with quantization for efficiency
try:
    print(f"Loading model: {MODEL_NAME}")
    
    # Configure quantization for efficient training
    quantization_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True
    )
    
    # Load the model with quantization
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        quantization_config=quantization_config,
        device_map="auto",
        trust_remote_code=True
    )
    
    # Prepare the model for training with LoRA
    model = prepare_model_for_kbit_training(model)
    
    # Configure LoRA for efficient fine-tuning
    lora_config = LoraConfig(
        r=LORA_R,
        lora_alpha=LORA_ALPHA,
        lora_dropout=LORA_DROPOUT,
        bias="none",
        task_type="CAUSAL_LM",  # Using string literal instead of enum
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
    )
    
    # Apply LoRA to the model
    model = get_peft_model(model, lora_config)
    
    # Print model information
    print(f"Model loaded: {model.__class__.__name__}")
    print(f"Model parameters: {model.num_parameters():,} total, {model.num_parameters(True):,} trainable")
    print(f"Training {100 * model.num_parameters(True) / model.num_parameters():.2f}% of parameters")
    
    # Clean up memory
    cleanup_memory()
except Exception as e:
    print(f"Error loading model: {e}")
    raise


In [None]:
# Define training arguments
training_args = TrainingArguments(
    output_dir="./phi3_swift_results",
    num_train_epochs=NUM_EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
    warmup_ratio=WARMUP_RATIO,
    weight_decay=WEIGHT_DECAY,
    learning_rate=LEARNING_RATE,
    # Removed evaluation_strategy and eval_steps as they're not supported in this version
    # Removed save_strategy and save_steps as they might not be supported
    # Removed load_best_model_at_end as it depends on evaluation_strategy
    logging_dir="./logs",
    logging_steps=100,
    save_total_limit=3,
    fp16=torch.cuda.is_available(),
    report_to="none",
    remove_unused_columns=False,
    push_to_hub=False,
    disable_tqdm=False,
    dataloader_num_workers=2,
    dataloader_pin_memory=True,
    group_by_length=True  # Group similar length sequences for efficiency
)

# Define early stopping callback
early_stopping_callback = EarlyStoppingCallback(
    early_stopping_patience=3,
    early_stopping_threshold=0.01
)

# Create data collator for language modeling
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=False  # We're doing causal language modeling, not masked language modeling
)

# Create trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    tokenizer=tokenizer,
    data_collator=data_collator,
    callbacks=[early_stopping_callback]
)

print("Training setup complete")

In [None]:
# Function to monitor system resources during training
def monitor_resources():
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    mem = psutil.virtual_memory()
    cpu_percent = psutil.cpu_percent(interval=0.1)
    
    print(f"\nSystem Resources:")
    print(f"CPU Usage: {cpu_percent}%")
    print(f"Process Memory: {memory_info.rss / 1024 / 1024:.2f} MB")
    print(f"System Memory: {mem.percent}% used, {mem.available / 1024 / 1024:.2f} MB available\n")

# Run training with more detailed monitoring
try:
    print("Starting training...")
    
    # Monitor resources before training
    print("Resources before training:")
    monitor_resources()
    
    # Start training with a timeout
    start_time = time.time()
    
    # Run training
    train_result = trainer.train()
    
    # Monitor resources after training
    print("Resources after training:")
    monitor_resources()
    
    # Print training results
    print(f"Training completed in {train_result.metrics['train_runtime']:.2f} seconds")
    print(f"Training loss: {train_result.metrics['train_loss']:.4f}")
    
    # Save the model
    trainer.save_model("./phi3_swift_model")
    print("Model saved to ./phi3_swift_model")
    
    # Clean up memory
    cleanup_memory()
    
except Exception as e:
    print(f"Error during training: {e}")
    
    # Print stack trace for debugging
    import traceback
    traceback.print_exc()
    
    # Monitor resources after error
    print("Resources after error:")
    monitor_resources()
    
    raise


In [None]:
# Test the model with Swift code examples
try:
    print("Testing the model with Swift code examples...")
    
    # Function to generate responses for test examples
    def generate_response(prompt):
        inputs = tokenizer(prompt, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model.generate(
                inputs.input_ids,
                max_new_tokens=200,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id
            )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        # Extract just the assistant's response
        if "<|assistant|>" in response:
            response = response.split("<|assistant|>")[-1].strip()
        return response
    
    # Test prompts for different Swift language tasks
    test_prompts = [
        # Explain Swift syntax
        "<|user|>\nExplain the key features of Swift's optional unwrapping syntax:\n\n```swift\nfunc processName(_ name: String?) {\n    guard let unwrappedName = name else {\n        print("No name provided")\n        return\n    }\n    print("Hello, \(unwrappedName)!")\n}\n```\n<|assistant|>",
        
        # Code completion
        "<|user|>\nComplete this Swift function that calculates the factorial of a number:\n\n```swift\nfunc factorial(_ n: Int) -> Int {\n    // Add implementation here\n}\n```\n<|assistant|>",
        
        # Debugging help
        "<|user|>\nWhat's wrong with this Swift code and how can I fix it?\n\n```swift\nclass Person {\n    var name: String\n    var age: Int\n    \n    func greet() {\n        print("Hello, my name is \(name) and I am \(age) years old.")\n    }\n}\n\nlet person = Person()\nperson.greet()\n```\n<|assistant|>",
        
        # Swift best practices
        "<|user|>\nExplain Swift best practices for error handling:\n<|assistant|>"
    ]
    
    # Generate and print responses
    for i, prompt in enumerate(test_prompts):
        print(f"\nTest {i+1}:\n{'-'*40}")
        print(f"Prompt: {prompt.split('<|assistant|>')[0].replace('<|user|>', '')}")
        response = generate_response(prompt)
        print(f"\nResponse:\n{response}\n")
    
    print("\nTesting complete")
except Exception as e:
    print(f"Error during testing: {e}")
    import traceback
    traceback.print_exc()
