# CodeBERT for Swift Code Understanding (Fixed Version)

In this notebook, we fine-tune the [CodeBERT](https://github.com/microsoft/CodeBERT) model on the [Swift Code Intelligence dataset](https://huggingface.co/datasets/mvasiliniuc/iva-swift-codeint). CodeBERT is a pre-trained model specifically designed for programming languages, much like how BERT was pre-trained for natural language text. Created by Microsoft Research, CodeBERT can understand both programming language and natural language, making it ideal for code-related tasks.

We'll use the Swift code dataset to fine-tune the model for code understanding tasks. After training, we'll upload the model to Dropbox for easy access and distribution.

## Overview

The process of fine-tuning CodeBERT involves:

1. **🔧 Setup**: Install necessary libraries and prepare our environment
2. **📥 Data Loading**: Load the Swift code dataset from Hugging Face
3. **🧹 Preprocessing**: Prepare the data for training by tokenizing the code samples
4. **🧠 Model Training**: Fine-tune CodeBERT on our prepared data
5. **📊 Evaluation**: Assess how well our model performs
6. **📤 Export & Upload**: Save the model and upload it to Dropbox

**Note:** This is a fixed version of the notebook with improved error handling, accelerator detection, and dataset safety checks.

Let's start by installing the necessary libraries:

In [None]:
# Uninstall TensorFlow and install TensorFlow-cpu (better for Kaggle environment)

!pip uninstall -y tensorflow
!pip install tensorflow-cpu
# Install required libraries
!pip install transformers datasets evaluate torch scikit-learn tqdm dropbox requests


In [None]:
# Standard library
import gc
import json
import os
import random
import time

# Third-party libraries
import numpy as np
import torch
from datasets import load_dataset
from sklearn.metrics import accuracy_score, f1_score, precision_recall_fscore_support
# Import AdamW from torch.optim instead of transformers.optimization
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler
from tqdm.auto import tqdm
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    RobertaForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
    DataCollatorWithPadding,
    EarlyStoppingCallback,
    get_scheduler
)
from transformers.trainer_utils import get_last_checkpoint

# Set a seed for reproducibility
set_seed(42)

# Add memory management function
def cleanup_memory():
    """Force garbage collection and clear CUDA cache if available."""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    print("Memory cleaned up.")


## Accelerator Detection and Configuration

Let's detect and configure the available accelerator (CPU, GPU, or TPU):

In [None]:
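# Detect the available accelerator and record it in the flags used by later cells
import importlib.util
import torch

use_gpu = torch.cuda.is_available()
# Heuristic: without a GPU, an importable torch_xla package is taken as a sign of a TPU runtime
use_tpu = (not use_gpu) and importlib.util.find_spec("torch_xla") is not None

if use_gpu:
    device = torch.device('cuda')
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    device = torch.device('cpu')
    print("TPU runtime detected" if use_tpu else "Using CPU")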

# Set random seeds for reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)


## Dataset and Model Configuration

Let's define the model and dataset we'll be using:

In [None]:
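# Dataset configuration
DATASET_ID = "mvasiliniuc/iva-swift-codeint"

# Model configuration
MODEL_NAME = "microsoft/codebert-base"
MODEL_ID = MODEL_NAME  # Alias used by the tokenizer-loading cell
MAX_LENGTH = 512
BATCH_SIZE = 16
TRAIN_BATCH_SIZE = 8   # Per-device batch size used by the final training arguments
EVAL_BATCH_SIZE = 16
LEARNING_RATE = 2e-5
WEIGHT_DECAY = 0.01
NUM_EPOCHS = 5
WARMUP_STEPS = 500
GRADIENT_ACCUMULATION_STEPS = 4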

print("Using default configuration values.")
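
Gradient accumulation sums gradients over several forward passes before each optimizer step, so the batch size the optimizer effectively sees is larger than the per-device value. The following is a small sketch of that arithmetic, using the batch size of 16 and 4 accumulation steps from the configuration above. The training-set size of 10,000 and both helper names are hypothetical, and the step count is an approximation of what the `Trainer` will report.

```python
import math

def effective_batch_size(per_device_batch, accumulation_steps, num_devices=1):
    """Number of examples that contribute to each optimizer step."""
    return per_device_batch * accumulation_steps * num_devices

def optimizer_steps_per_epoch(num_examples, per_device_batch, accumulation_steps, num_devices=1):
    """Approximate number of optimizer updates in one pass over the data."""
    batches = math.ceil(num_examples / (per_device_batch * num_devices))
    return math.ceil(batches / accumulation_steps)

# Hypothetical dataset size, used only for illustration
print(effective_batch_size(16, 4))
print(optimizer_steps_per_epoch(10_000, 16, 4))
```

Comparing the steps per epoch with `WARMUP_STEPS` is a quick way to check that warmup does not consume most of a short training run.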


## Data Loading

Now let's load the Swift code dataset and examine its structure with proper error handling:

In [None]:
# Function to load dataset with retry logic
def load_dataset_with_retry(dataset_id, max_retries=3, retry_delay=5):
    """Load a dataset with retry logic."""
    for attempt in range(max_retries):
        try:
            print(f"Loading dataset (attempt {attempt+1}/{max_retries})...")
            data = load_dataset(dataset_id, trust_remote_code=True)
            print(f"Dataset loaded successfully with {len(data['train'])} examples")
            return data
        except Exception as e:
            print(f"Error loading dataset (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
            else:
                print("Maximum retries reached. Could not load dataset.")
                raise

# Fall back to a default for any setting the configuration cell did not define
_fallback_config = {
    "DATASET_ID": "mvasiliniuc/iva-swift-codeint",
    "MAX_LENGTH": 384,
    "MODEL_ID": "microsoft/codebert-base",
    "TRAIN_BATCH_SIZE": 8,
    "EVAL_BATCH_SIZE": 16,
    "GRADIENT_ACCUMULATION_STEPS": 4,
}
for _name, _value in _fallback_config.items():
    if _name not in globals():
        print(f"Warning: {_name} not found. Using default value {_value!r}.")
        globals()[_name] = _value

# Load the dataset with retry logic
try:

    print(f"Loading dataset: {DATASET_ID}")

    data = load_dataset_with_retry(DATASET_ID)

    print("Dataset structure:")

    print(data)

except Exception as e:

    print(f"Fatal error loading dataset: {e}")

    raise
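
The retry helper above waits a fixed delay between attempts. A common variation is exponential backoff, where the wait doubles after each failure. The sketch below shows the idea on a stand-in loader. `retry_with_backoff` and `flaky_loader` are hypothetical names, and the `sleep` parameter exists only so the example can run without actually waiting.

```python
import time

def retry_with_backoff(func, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); after a failure wait base_delay * 2**attempt, then try again."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as exc:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error to the caller
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.1f}s")
            sleep(delay)

# Stand-in loader that fails twice before succeeding
calls = {"count": 0}

def flaky_loader():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network error")
    return "dataset"

# Record the requested delays instead of sleeping
delays = []
result = retry_with_backoff(flaky_loader, sleep=delays.append)
print(result, delays)
```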


In [None]:
# Verify dataset structure and column names
def verify_dataset_structure(dataset):
    """Verify that the dataset has the expected structure and columns."""
    required_columns = ['repo_name', 'path', 'content']

    if 'train' not in dataset:
        print("WARNING: Dataset does not have a 'train' split.")
        return False

    missing_columns = [col for col in required_columns if col not in dataset['train'].column_names]
    if missing_columns:
        print(f"WARNING: Dataset is missing required columns: {missing_columns}")
        return False

    print("Dataset structure verification passed.")
    return True

# Verify dataset structure
dataset_valid = verify_dataset_structure(data)

if not dataset_valid:
    print("Dataset structure is not as expected. Proceeding with caution.")


In [None]:
# Let's take a look at an example from the dataset
try:

    if 'train' in data:

        example = data['train'][0]

    else:

        example = data[list(data.keys())[0]][0]

    print("Example features:")

    for key, value in example.items():

        if isinstance(value, str) and len(value) > 100:

            print(f"{key}: {value[:100]}...")

        else:

            print(f"{key}: {value}")

except Exception as e:

    print(f"Error exploring dataset example: {e}")


## Loading the CodeBERT Tokenizer

Now, let's load the CodeBERT tokenizer, which has been specially trained to handle code tokens:

In [None]:
# Load the CodeBERT tokenizer with error handling
try:
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    print(f"Tokenizer vocabulary size: {len(tokenizer)}")
    print(f"Tokenizer type: {tokenizer.__class__.__name__}")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    raise


## Data Preparation

Since we're dealing with a code understanding task, we need to prepare our data appropriately. The dataset contains Swift code files, so we'll need to create labeled data for our task.

For this demonstration, we'll create a binary classification task that determines whether the code is a Package.swift file (which is used for Swift package management) or not. This is just an example task - in a real application, you might have more complex classification targets.

In [None]:
# Create a classification dataset based on whether the file is a Package.swift file
def add_labels(example):

    # Label 1 if it's a Package.swift file, 0 otherwise
    example['label'] = 1 if 'Package.swift' in example['path'] else 0
    return example

try:

    # Apply the labeling function
    labeled_data = data['train'].map(add_labels)

    # Check the distribution of labels using collections.Counter
    import collections

    all_labels = labeled_data['label']

    label_counter = collections.Counter(all_labels)

    print("Label distribution:")

    for label, count in label_counter.items():

        print(f"Label {label}: {count} examples ({count/len(labeled_data)*100:.2f}%)")

    # Check for label imbalance
    min_label_count = min(label_counter.values())

    max_label_count = max(label_counter.values())

    imbalance_ratio = max_label_count / min_label_count if min_label_count > 0 else float('inf')

    if imbalance_ratio > 10:

        print(f"WARNING: Severe label imbalance detected (ratio: {imbalance_ratio:.2f}). Consider using class weights or resampling.")

    elif imbalance_ratio > 3:

        print(f"WARNING: Moderate label imbalance detected (ratio: {imbalance_ratio:.2f}). Consider using class weights.")

except Exception as e:

    print(f"Error preparing dataset: {e}")

    raise
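
The labeling function above uses a substring test, so a path such as `Tools/MyPackage.swift` would also receive label 1. If only files literally named `Package.swift` should count, comparing the final path component is stricter. A minimal sketch, assuming the dataset's `path` values use forward slashes; the helper name and the example paths are made up for illustration.

```python
import posixpath

def is_package_manifest(path):
    """True only when the file itself is named Package.swift."""
    return posixpath.basename(path) == "Package.swift"

# Hypothetical paths, not taken from the dataset
paths = [
    "Package.swift",
    "Sources/App/main.swift",
    "Tools/MyPackage.swift",        # a substring test would label this 1
    "Examples/Demo/Package.swift",
]
labels = [1 if is_package_manifest(p) else 0 for p in paths]
print(labels)
```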


Now let's split our data into training and validation sets with stratification to maintain label distribution:

In [None]:
try:

    # Convert the label column to a ClassLabel type for stratification
    from datasets import ClassLabel

    # Get unique labels
    unique_labels = sorted(set(labeled_data["label"]))

    num_labels = len(unique_labels)

    # Create a new dataset with ClassLabel feature
    labeled_data = labeled_data.cast_column("label", ClassLabel(num_classes=num_labels, names=[str(i) for i in unique_labels]))

    # Split the dataset with stratification to maintain label distribution
    train_test_split = labeled_data.train_test_split(test_size=0.1, seed=42, stratify_by_column='label')

    train_data = train_test_split['train']

    val_data = train_test_split['test']

    # Verify label distribution after split
    train_label_counter = collections.Counter(train_data['label'])

    val_label_counter = collections.Counter(val_data['label'])

    print(f"Training set size: {len(train_data)}")

    print(f"Training label distribution: {dict(train_label_counter)}")

    print(f"Validation set size: {len(val_data)}")

    print(f"Validation label distribution: {dict(val_label_counter)}")

    # Check if dataset is large (might cause memory issues)

    if len(train_data) > 10000:

        print("\nWARNING: You are training on a large dataset.")

        print("This may require significant memory, especially when using a GPU.")

        print("Consider reducing batch size or using gradient accumulation if you encounter memory issues.")

except Exception as e:

    print(f"Error splitting dataset: {e}")

    raise


## Tokenization

Now we need to tokenize our code samples. We'll use the CodeBERT tokenizer to convert the Swift code into token IDs that the model can understand. The tokenization function omits `return_tensors="pt"` and fixed-length padding: `map` stores plain Python lists, and the data collator defined later pads each batch to its longest sequence.

In [None]:
# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_function(examples):
    # Tokenize the Swift source in the 'content' column, truncating long files.
    # Padding is left to the data collator, and no tensors are returned here.
    return tokenizer(
        examples['content'],
        truncation=True,
        max_length=MAX_LENGTH
    )

# The next cell applies this function to the training and validation splits


In [None]:
try:
    # Determine the number of CPU cores available for parallel processing
    import multiprocessing
    
    # Use 75% of available CPUs for processing to avoid system slowdown
    num_cpus = multiprocessing.cpu_count()
    num_proc = max(1, int(num_cpus * 0.75))
    print(f"Using {num_proc} CPU cores for parallel processing")
    
    # Process the data with progress bars
    tokenized_train_data = train_data.map(
        tokenize_function,
        batched=True,
        batch_size=32,  # Reduced for lower memory usage
        num_proc=num_proc,  # Use multiple CPU cores
        remove_columns=[col for col in train_data.column_names if col != 'label'],
        desc="Tokenizing training data",  # This adds a progress bar
        load_from_cache_file=True,  # Use caching to speed up repeated runs
        writer_batch_size=1000,  # Larger writer batch size for faster disk writes
        new_fingerprint=f"tokenized_train_{int(time.time())}"  # Force cache update
    )
    
    tokenized_val_data = val_data.map(
        tokenize_function,
        batched=True,
        batch_size=32,  # Reduced for lower memory usage
        num_proc=num_proc,  # Use multiple CPU cores
        remove_columns=[col for col in val_data.column_names if col != 'label'],
        desc="Tokenizing validation data",  # This adds a progress bar
        load_from_cache_file=True,  # Use caching to speed up repeated runs
        writer_batch_size=1000,  # Larger writer batch size for faster disk writes
        new_fingerprint=f"tokenized_val_{int(time.time())}"  # Force cache update
    )
    
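    # Clean up memory. val_data is kept because the example-prediction cell
    # later in the notebook still needs the raw file paths and contents.
    del train_data
    cleanup_memory()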
except Exception as e:
    print(f"Error during tokenization: {e}")
    raise
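
The data collator used later in this notebook pads each batch to its longest sequence instead of padding every example to `MAX_LENGTH`. The sketch below illustrates the difference in plain Python. The token IDs and the pad ID of 1 are placeholders, and `pad_batch` is a hypothetical helper rather than the collator's actual implementation.

```python
def pad_batch(sequences, pad_id, target_len=None):
    """Pad token-ID lists to target_len (default: the longest sequence in the batch)."""
    if target_len is None:
        target_len = max(len(seq) for seq in sequences)
    input_ids = [seq + [pad_id] * (target_len - len(seq)) for seq in sequences]
    attention_mask = [[1] * len(seq) + [0] * (target_len - len(seq)) for seq in sequences]
    return input_ids, attention_mask

# Made-up token IDs standing in for three tokenized files
batch = [[0, 11, 12, 2], [0, 21, 2], [0, 31, 32, 33, 34, 2]]

dynamic_ids, dynamic_mask = pad_batch(batch, pad_id=1)            # pad to longest (6)
fixed_ids, _ = pad_batch(batch, pad_id=1, target_len=512)         # pad to a fixed length

dynamic_tokens = sum(len(row) for row in dynamic_ids)
fixed_tokens = sum(len(row) for row in fixed_ids)
print(f"Dynamic padding: {dynamic_tokens} tokens, fixed padding: {fixed_tokens} tokens")
```

For batches of short files the dynamically padded tensors are much smaller, which reduces both memory use and compute per step.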


## Model Preparation

Now that our data is ready, let's load the CodeBERT model and configure it for sequence classification:

In [None]:
# Calculate class weights for imbalanced dataset
from sklearn.utils.class_weight import compute_class_weight

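# Convert to a NumPy array; the tokenized dataset returns plain Python values here
labels = np.array(tokenized_train_data['label'])
class_weights = compute_class_weight('balanced', classes=np.unique(labels), y=labels)
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Note: the default Trainer loss does not apply these weights; they are printed for reference
print("Class weights:", class_weights)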

# Load pre-trained model
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=len(np.unique(labels)),
    problem_type="single_label_classification"
).to(device)

print(f"Model loaded: {MODEL_NAME}")
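
For reference, the `'balanced'` setting of `compute_class_weight` assigns each class the weight `n_samples / (n_classes * count)`, so rare classes get proportionally larger weights. The sketch below reproduces that formula in plain Python on a toy label list; the 95/5 split is invented for illustration and is not the dataset's actual distribution.

```python
from collections import Counter

def balanced_class_weights(labels):
    """weight[c] = n_samples / (n_classes * count[c])"""
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    return {c: n_samples / (n_classes * counts[c]) for c in sorted(counts)}

# Toy labels: 95 negatives and 5 positives
toy_labels = [0] * 95 + [1] * 5
weights = balanced_class_weights(toy_labels)
print(weights)
```

To have these weights affect training, they would need to be passed to the loss function, for example through a `Trainer` subclass that overrides the loss computation.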


## Training Setup

Now let's define our training arguments and evaluation metrics:

In [None]:
# Define training arguments
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=NUM_EPOCHS,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    warmup_steps=WARMUP_STEPS,
    weight_decay=WEIGHT_DECAY,
    logging_dir='./logs',
    logging_steps=10,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
    learning_rate=LEARNING_RATE,
    save_total_limit=2,
    remove_unused_columns=False,
    push_to_hub=False,
    report_to="none",
)

# Define metrics computation function
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='weighted')
    acc = accuracy_score(labels, preds)
    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }
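
With a heavily imbalanced task like this one, accuracy and weighted averages can look strong even when the minority class is never predicted. The sketch below computes precision, recall, and F1 for the positive class only, on invented labels, to show the effect. `binary_prf` is a hypothetical helper written for this illustration.

```python
def binary_prf(y_true, y_pred, positive=1):
    """Precision, recall, and F1 for a single class, with 0.0 when undefined."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Invented labels: 5 positives out of 100, and a model that always predicts 0
y_true = [0] * 95 + [1] * 5
y_pred = [0] * 100

accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)
precision, recall, f1 = binary_prf(y_true, y_pred)
print(f"accuracy={accuracy:.2f} precision={precision:.2f} recall={recall:.2f} f1={f1:.2f}")
```

Reporting the positive-class F1 alongside the weighted metrics makes this failure mode visible.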


In [None]:
# Create a data collator for dynamic padding
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Define training arguments with TPU support
try:
    # Set up training arguments with optimizations for faster and more efficient training
    training_args = TrainingArguments(
        output_dir="./results/codebert-swift",
        save_steps=100,               # Save every 100 steps
        save_total_limit=3,           # Keep only the 3 most recent checkpoints
        learning_rate=5e-5,
        per_device_train_batch_size=TRAIN_BATCH_SIZE,  # Reduced batch size for memory efficiency
        per_device_eval_batch_size=EVAL_BATCH_SIZE,
        gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,  # Critical for memory efficiency
        num_train_epochs=2,  # Reduced to 2 epochs for faster training
        weight_decay=0.01,
        logging_dir="./logs",
        logging_steps=50,
        # TPU-specific configurations
        tpu_num_cores=8 if use_tpu else None,  # 8 cores for TPU v2/v3
        dataloader_drop_last=True if use_tpu else False,  # Important for TPU
        # Memory optimizations
        fp16=use_gpu,                 # Use mixed precision on GPU
        dataloader_num_workers=2,     # Reduced for less memory overhead
        dataloader_pin_memory=True,   # Pin memory for faster data transfer to GPU
        max_grad_norm=1.0,            # Clip gradients to prevent exploding gradients
        # Optimizer settings
        adam_beta1=0.9,
        adam_beta2=0.999,
        adam_epsilon=1e-8
    )
    
    print("Training arguments configured successfully.")
    print(f"Effective batch size: {TRAIN_BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS}")
except Exception as e:
    print(f"Error configuring training arguments: {e}")
    raise


In [None]:
# Create the Trainer
try:

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train_data,
        eval_dataset=tokenized_val_data,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=data_collator,  # Added data collator for dynamic padding
        # No callbacks - removed EarlyStoppingCallback to fix training error
    )

    print("Trainer initialized successfully without early stopping.")

except Exception as e:

    print(f"Error creating trainer: {e}")

    raise


## Checkpoint Recovery

Let's add checkpoint recovery logic to resume training if it was interrupted:

In [None]:
# Check for existing checkpoints
def find_latest_checkpoint(output_dir):

    """Find the latest checkpoint in the output directory."""
    try:

        if not os.path.exists(output_dir):

            return None
            
        checkpoints = [d for d in os.listdir(output_dir) if d.startswith("checkpoint-")]

        if not checkpoints:

            return None
            
        # Extract checkpoint numbers and find the latest
        checkpoint_nums = [int(c.split("-")[1]) for c in checkpoints]

        latest_checkpoint = max(checkpoint_nums)

        return os.path.join(output_dir, f"checkpoint-{latest_checkpoint}")

    except Exception as e:

        print(f"Error finding latest checkpoint: {e}")

        return None

# Check for existing checkpoint
latest_checkpoint = find_latest_checkpoint(training_args.output_dir)

if latest_checkpoint:

    print(f"Found existing checkpoint at {latest_checkpoint}. Training will resume from this point.")

else:

    print("No existing checkpoint found. Training will start from scratch.")
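
The lookup above parses every `checkpoint-*` name with `int(...)`, so a single stray directory such as `checkpoint-tmp` raises an error and the function falls back to `None`. A slightly more defensive variant matches the step number with a regular expression and skips anything else. This is a sketch with a hypothetical function name, exercised on a temporary directory; `get_last_checkpoint` from `transformers.trainer_utils`, already imported at the top of the notebook, serves a similar purpose.

```python
import os
import re
import tempfile

CHECKPOINT_RE = re.compile(r"^checkpoint-(\d+)$")

def latest_checkpoint_dir(output_dir):
    """Return the checkpoint-<step> subdirectory with the highest step, or None."""
    if not os.path.isdir(output_dir):
        return None
    best_step, best_path = -1, None
    for name in os.listdir(output_dir):
        match = CHECKPOINT_RE.match(name)
        path = os.path.join(output_dir, name)
        # Skip names without a numeric step and anything that is not a directory
        if match and os.path.isdir(path) and int(match.group(1)) > best_step:
            best_step, best_path = int(match.group(1)), path
    return best_path

# Exercise the helper on a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    for name in ["checkpoint-100", "checkpoint-1200", "checkpoint-900", "checkpoint-tmp"]:
        os.makedirs(os.path.join(tmp, name))
    found = os.path.basename(latest_checkpoint_dir(tmp))
    missing = latest_checkpoint_dir(os.path.join(tmp, "does-not-exist"))
print(found, missing)
```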


## Training the Model

Now let's train our CodeBERT model for Swift code classification:

In [None]:
# Reuse the Trainer created above. EarlyStoppingCallback is left out because it
# requires periodic evaluation and load_best_model_at_end, which the active
# training arguments do not enable.

# Train the model, resuming from the latest checkpoint if one was found
print("Starting training...")
trainer.train(resume_from_checkpoint=latest_checkpoint)

# Save the model
model_path = "./codebert-finetuned"
trainer.save_model(model_path)
tokenizer.save_pretrained(model_path)
print(f"Model saved to {model_path}")


## Evaluating the Model

Let's evaluate our model on the validation dataset:

In [None]:
# Evaluate on the validation set (the dataset has no separate test split)
print("Evaluating on validation set...")
val_results = trainer.evaluate(tokenized_val_data)
print(f"Validation results: {val_results}")

# Get predictions for the validation examples
test_pred = trainer.predict(tokenized_val_data)
test_preds = np.argmax(test_pred.predictions, axis=1)

# Print a few predictions next to their true labels
print("\nSample predictions:")
for i in range(min(5, len(test_preds))):
    print(f"Example {i+1}:")
    print(f"True label: {test_pred.label_ids[i]}")
    print(f"Predicted label: {test_preds[i]}\n")

# Create confusion matrix
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

cm = confusion_matrix(test_pred.label_ids, test_preds)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()


## Testing the Model with Example Predictions

Let's test our model on some sample Swift code files with improved error handling:

In [None]:
# Get some test examples with better sampling
try:

    # Sample more examples for better evaluation
    num_samples = min(20, len(val_data))  # Increased from 5 to 20 or max available
    
    # Stratified sampling to ensure we get examples from each class
    class_0_indices = [i for i, label in enumerate(val_data['label']) if label == 0]

    class_1_indices = [i for i, label in enumerate(val_data['label']) if label == 1]

    # Sample from each class
    samples_per_class = num_samples // 2
    class_0_samples = random.sample(class_0_indices, min(samples_per_class, len(class_0_indices)))

    class_1_samples = random.sample(class_1_indices, min(samples_per_class, len(class_1_indices)))

    # Combine samples
    sample_indices = class_0_samples + class_1_samples
    test_examples = val_data.select(sample_indices)

    # Tokenize them
    tokenized_test_examples = tokenizer(
        test_examples["content"],
        padding="max_length",
        truncation=True,
        max_length=MAX_LENGTH,  # Use the same max length as in training
        return_tensors="pt"
    )

    # Move to device
    for key, val in tokenized_test_examples.items():

        if isinstance(val, torch.Tensor):

            tokenized_test_examples[key] = val.to(device)

    # Make predictions
    with torch.no_grad():

        outputs = model(**{k: v for k, v in tokenized_test_examples.items() if k != "label"})

        predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)

        predicted_labels = torch.argmax(predictions, dim=-1).cpu().numpy()

    # Print results
    correct_predictions = 0
    for i, (pred, true) in enumerate(zip(predicted_labels, test_examples["label"])):

        is_package_swift = "Yes" if pred == 1 else "No"
        true_is_package_swift = "Yes" if true == 1 else "No"
        is_correct = pred == true
        if is_correct:

            correct_predictions += 1
            
        print(f"Example {i+1}:")

        print(f"File path: {test_examples['path'][i]}")

        print(f"Prediction: Is Package.swift? {is_package_swift} (Confidence: {predictions[i][pred].item():.4f})")

        print(f"True label: Is Package.swift? {true_is_package_swift}")

        print(f"Correct: {is_correct}")

        print(f"First few lines: {test_examples['content'][i][:100]}...")

        print("---\n")

    # Print overall accuracy on these examples
    accuracy = correct_predictions / len(predicted_labels)

    print(f"Accuracy on these {len(predicted_labels)} examples: {accuracy:.4f}")

except Exception as e:

    print(f"Error testing model: {e}")


## Saving the Model

Now let's save the model and tokenizer for later use with improved error handling and verification:

In [None]:
try:

    # Create a directory for the model
    model_save_dir = "./codebert-swift-model"
    os.makedirs(model_save_dir, exist_ok=True)

    # Check if directory already contains model files
    existing_files = os.listdir(model_save_dir) if os.path.exists(model_save_dir) else []

    if existing_files:

        print(f"WARNING: Directory {model_save_dir} already contains files: {existing_files}")

        print("Creating a timestamped directory to avoid overwriting...")

        import datetime

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        model_save_dir = f"./codebert-swift-model_{timestamp}"
        os.makedirs(model_save_dir, exist_ok=True)

    # Save the model
    print(f"Saving model to {model_save_dir}...")

    model.save_pretrained(model_save_dir)

    tokenizer.save_pretrained(model_save_dir)

    # Save training arguments and configuration
    with open(os.path.join(model_save_dir, "training_args.json"), "w") as f:

        json.dump(training_args.to_dict(), f, indent=2)

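    # Verify the saved files. Recent transformers releases write the weights to
    # model.safetensors and older ones to pytorch_model.bin, so accept either.
    saved_files = set(os.listdir(model_save_dir))
    missing_files = [f for f in ["config.json", "tokenizer.json"] if f not in saved_files]
    if not saved_files & {"model.safetensors", "pytorch_model.bin"}:
        missing_files.append("model weights (model.safetensors or pytorch_model.bin)")

    if missing_files:
        print(f"WARNING: Some expected model files are missing: {missing_files}")
    else:
        print(f"Model and tokenizer saved successfully to {model_save_dir}")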

    # Create a zip file for easier distribution
    import shutil

    zip_path = f"{model_save_dir}.zip"
    print(f"Creating zip archive at {zip_path}...")

    shutil.make_archive(model_save_dir, 'zip', os.path.dirname(model_save_dir), os.path.basename(model_save_dir))

    print(f"Zip archive created successfully at {zip_path}")

except Exception as e:

    print(f"Error saving model: {e}")
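
Before distributing the archive, it can be worth confirming that it contains the expected files. The sketch below builds a small archive with the same `shutil.make_archive` argument pattern as the cell above and then inspects it with `zipfile`. The directory name `demo-model` and the placeholder file contents are assumptions made for this example.

```python
import os
import shutil
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as tmp:
    model_dir = os.path.join(tmp, "demo-model")   # hypothetical directory name
    os.makedirs(model_dir)
    for name in ["config.json", "tokenizer.json"]:
        with open(os.path.join(model_dir, name), "w") as f:
            f.write("{}")                          # placeholder content

    # root_dir is the parent and base_dir the folder name, so the archive
    # keeps "demo-model/" as its top-level folder
    zip_path = shutil.make_archive(model_dir, "zip", root_dir=tmp, base_dir="demo-model")

    with zipfile.ZipFile(zip_path) as zf:
        bad_member = zf.testzip()                  # None when every member passes its CRC check
        archived = sorted(n for n in zf.namelist() if not n.endswith("/"))

print(archived, bad_member)
```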


## Uploading to Dropbox

Now let's upload our trained model to Dropbox for easy access and distribution with improved error handling and validation:

In [None]:
# Set your Dropbox credentials
# You should set these as environment variables in a production environment
APP_KEY = "your_app_key"  # Replace with your actual app key
APP_SECRET = "your_app_secret"  # Replace with your actual app secret
REFRESH_TOKEN = "your_refresh_token"  # Replace with your actual refresh token
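
As the comment above suggests, credentials are better read from environment variables than written into the notebook. A minimal sketch follows; the variable names `DROPBOX_APP_KEY`, `DROPBOX_APP_SECRET`, and `DROPBOX_REFRESH_TOKEN` are assumptions chosen for this example, not names required by the Dropbox SDK.

```python
import os

def load_dropbox_credentials(env=os.environ):
    """Read credentials from the environment and report any that are missing."""
    # Hypothetical variable names; use whatever your environment defines
    names = ("DROPBOX_APP_KEY", "DROPBOX_APP_SECRET", "DROPBOX_REFRESH_TOKEN")
    values = {name: env.get(name, "") for name in names}
    missing = [name for name, value in values.items() if not value]
    return values, missing

# Demonstration with an explicit mapping instead of the real environment
demo_env = {"DROPBOX_APP_KEY": "key123", "DROPBOX_APP_SECRET": "secret456"}
creds, missing = load_dropbox_credentials(demo_env)
print("Missing:", missing)
```

On Kaggle, the same values could instead come from the platform's secrets store, which keeps them out of the saved notebook.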


In [None]:
import dropbox

from dropbox.exceptions import ApiError

from dropbox.files import WriteMode

def validate_dropbox_credentials(app_key, app_secret, refresh_token):

    """Test Dropbox credentials before attempting upload."""
    try:

        print("Validating Dropbox credentials...")

        dbx = dropbox.Dropbox(
            app_key=app_key,
            app_secret=app_secret,
            oauth2_refresh_token=refresh_token
        )

        # Check that the access token is valid
        account = dbx.users_get_current_account()

        print(f"✅ Connected to Dropbox account: {account.name.display_name}")

        return True, dbx
    except Exception as e:

        print(f"❌ Error connecting to Dropbox: {e}")

        return False, None

# Validate Dropbox credentials
credentials_valid, dbx = validate_dropbox_credentials(APP_KEY, APP_SECRET, REFRESH_TOKEN)

if not credentials_valid:

    print("Please check your Dropbox credentials and try again.")


In [None]:
def upload_to_dropbox(file_path, dropbox_path, max_retries=3):

    """Upload a file to Dropbox with retry logic."""
    if not credentials_valid:

        print("Dropbox credentials are not valid. Cannot upload.")

        return False
        
    file_size = os.path.getsize(file_path)

    chunk_size = 4 * 1024 * 1024  # 4MB chunks
    
    for attempt in range(max_retries):

        try:

            with open(file_path, 'rb') as f:

                # For small files, upload in one go
                if file_size <= chunk_size:

                    print(f"Uploading {file_path} to Dropbox as {dropbox_path}...")

                    try:

                        dbx.files_upload(f.read(), dropbox_path, mode=WriteMode('overwrite'))

                        print("Upload complete!")

                        return True
                    except ApiError as e:

                        print(f"ERROR: Dropbox API error - {e}")

                        if attempt < max_retries - 1:

                            print(f"Retrying... (Attempt {attempt+1}/{max_retries})")

                            continue
                        return False
                
                # For large files, use chunked upload
                else:

                    print(f"Uploading {file_path} to Dropbox as {dropbox_path} in chunks...")

                    upload_session_start_result = dbx.files_upload_session_start(f.read(chunk_size))

                    cursor = dropbox.files.UploadSessionCursor(
                        session_id=upload_session_start_result.session_id,
                        offset=f.tell()

                    )

                    commit = dropbox.files.CommitInfo(path=dropbox_path, mode=WriteMode('overwrite'))

                    # Upload the file in chunks with progress tracking
                    uploaded = f.tell()

                    with tqdm(total=file_size, desc="Uploading", unit="B", unit_scale=True) as pbar:

                        pbar.update(uploaded)

                        while uploaded < file_size:

                            if (file_size - uploaded) <= chunk_size:

                                dbx.files_upload_session_finish(f.read(chunk_size), cursor, commit)

                                uploaded = file_size
                                pbar.update(file_size - pbar.n)

                            else:

                                dbx.files_upload_session_append_v2(f.read(chunk_size), cursor)

                                uploaded = f.tell()

                                cursor.offset = uploaded
                                pbar.update(chunk_size)

                    print("Chunked upload complete!")

                    return True
        except Exception as e:

            print(f"ERROR: Upload failed - {e}")

            if attempt < max_retries - 1:

                print(f"Retrying... (Attempt {attempt+1}/{max_retries})")

                time.sleep(2)  # Wait before retrying
            else:

                print("Maximum retries reached. Upload failed.")

                return False
    return False

def create_shared_link(dropbox_path):

    """Create a shared link for a file in Dropbox."""
    if not credentials_valid:

        print("Dropbox credentials are not valid. Cannot create shared link.")

        return None
        
    try:

        shared_link = dbx.sharing_create_shared_link_with_settings(dropbox_path)

        return shared_link.url
    except ApiError as e:

        # If the file already has a shared link, the API reports shared_link_already_exists
        if isinstance(e.error, dropbox.sharing.CreateSharedLinkWithSettingsError) and \
           e.error.is_shared_link_already_exists():

            # Get existing shared links
            shared_links = dbx.sharing_list_shared_links(path=dropbox_path).links
            if shared_links:

                return shared_links[0].url
        print(f"ERROR: Could not create shared link - {e}")

        return None
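
The chunked branch of `upload_to_dropbox` walks the file in 4 MB pieces: the first piece starts the session, middle pieces are appended, and the final piece finishes it. The offset bookkeeping can be checked separately from the network calls. The sketch below, with a hypothetical `plan_chunks` helper, lists the offset and length of each piece for a 10 MB file.

```python
def plan_chunks(file_size, chunk_size):
    """Yield (offset, length, is_last) for each chunk of a file_size-byte upload."""
    offset = 0
    while offset < file_size:
        length = min(chunk_size, file_size - offset)
        is_last = offset + length >= file_size
        yield offset, length, is_last
        offset += length

MB = 1024 * 1024
chunks = list(plan_chunks(10 * MB, 4 * MB))
for offset, length, is_last in chunks:
    print(f"offset={offset:>9} length={length:>8} last={is_last}")
```

Each offset equals the number of bytes already sent, which is the value the upload session cursor must carry before the next call.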


In [None]:
# Upload the model zip to Dropbox
if credentials_valid:

    zip_path = f"{model_save_dir}.zip"
    dropbox_path = f"/codebert-swift-model/{os.path.basename(zip_path)}"
    
    if upload_to_dropbox(zip_path, dropbox_path):

        print(f"Successfully uploaded model to Dropbox at {dropbox_path}")

        shared_link = create_shared_link(dropbox_path)

        if shared_link:

            print(f"Shared link: {shared_link}")

    else:

        print("Failed to upload model to Dropbox.")

else:

    print("Skipping Dropbox upload due to invalid credentials.")


## Conclusion

In this notebook, we've successfully:

1. Set up our environment with proper accelerator detection (CPU, GPU, or TPU)
2. Loaded and preprocessed the Swift code dataset with error handling and validation
3. Fine-tuned the CodeBERT model for Swift code classification with optimized training parameters
4. Evaluated the model's performance with comprehensive metrics
5. Saved and uploaded the model to Dropbox for easy access

The model can now be used for Swift code understanding tasks, such as identifying Package.swift files. This is just one example of how CodeBERT can be fine-tuned for code-related tasks. The same approach can be extended to other programming languages and tasks, such as code search, code completion, and bug detection.