In [None]:
# IMPORTS
import os
from datasets import DatasetDict
from transformers import ViTImageProcessor, ViTForImageClassification, TrainingArguments, Trainer, EarlyStoppingCallback, set_seed as t_set_seed, AutoConfig
import torch
import torch.nn as nn
import torchaudio 
import torchaudio.transforms as T
import numpy as np
import json
from datasets import Dataset, disable_caching
import random
from PIL import Image, ImageOps
import evaluate
from transformers import EvalPrediction
from peft import LoraConfig, get_peft_model
from sklearn.metrics import confusion_matrix, roc_auc_score, classification_report, precision_recall_curve
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
import seaborn as sns
import matplotlib.pyplot as plt
import shutil, os
import librosa
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch, then all CUDA devices when
    CUDA is available, and finally calls the ``transformers`` seeding helper.
    """
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    t_set_seed(seed)
    

In [None]:
# Clear Hugging Face datasets cache (memory issues)
# NOTE: this deletes the entire directory below, not only entries created by this notebook.
cache_dir = os.path.expanduser("~/.cache/huggingface/datasets")
if not os.path.exists(cache_dir):
    print("No Hugging Face datasets cache found.")
else:
    print(f"Clearing Hugging Face datasets cache at {cache_dir} ...")
    shutil.rmtree(cache_dir)
    print("Cache cleared.")

No Hugging Face datasets cache found.


In [None]:
# PARAMETERS
# NOTE(review): the interactive prompt blocks unattended "Restart & Run All" executions.
training_size = int(input("Enter the amount of spectrograms per class to train on (0 for all): ")) # Number of spectrograms per class to use for training (0 for all)
if training_size < 0:
    # A negative value would later be used as a slice bound and silently drop samples.
    raise ValueError(f"training_size must be 0 (all) or a positive integer, got {training_size}")
print(f"Training size per class: {training_size if training_size > 0 else 'All available'}")
segments_path = "./segments"
model_output_dir = "./vit-base-manuai" # Fine-tuned model output directory
adapters_dir = "./manuai_lora_adapters" # LoRA adapters output directory
checkpoints_dir = "./manuai_checkpoints" # Checkpoints output directory
model_name = "google/vit-base-patch16-224-in21k"
# Check if fine-tuned model already exists; if so, continue fine-tuning from it
if os.path.exists(model_output_dir):
    model_name = model_output_dir
    print("Using existing fine-tuned model as base.")

# The processor is loaded from the same source as the model weights.
processor = ViTImageProcessor.from_pretrained(model_name)
sample_rate = 22050
epochs = 25
batch_size = 16
n_proc = 3 # Number of processes for parallel processing
dataloader_num_workers=2 # Number of workers for data loading (during training)
seed = 42
segment_len = 4.0
lora_rank = 16
image_size = 224  # ViT base model image size
disable_caching() # Disable caching to avoid potential issues with large datasets
# NOTE(review): CUDA is never selected here although set_seed() seeds CUDA devices;
# confirm whether this notebook is expected to run on CUDA hosts.
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using device: {device}")

training_args = TrainingArguments(
    output_dir=checkpoints_dir, 
    learning_rate=1e-4,  # 0.0001, 5e-5 = 0.00005, 3e-5 = 0.00003
    lr_scheduler_type="cosine", # "linear", "cosine"
    warmup_ratio=0.1,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=epochs,
    gradient_accumulation_steps=2, # To simulate larger batch size (batch_size * gradient_accumulation_steps)
    weight_decay=0.01,
    eval_strategy="steps", # "steps" or "epoch"
    eval_steps=500, # Only if eval_strategy="steps"
    save_strategy="steps", # "steps" or "epoch"
    save_steps=500, # Only if save_strategy="steps"
    load_best_model_at_end=True,
    remove_unused_columns=False,
    bf16=False,
    logging_steps=100,
    report_to="tensorboard",
    save_total_limit=3,
    metric_for_best_model="eval_f1",
    greater_is_better=True,
    #label_smoothing_factor=0.05, # Label smoothing factor
    dataloader_pin_memory=False,
    #max_grad_norm=1.0,
    dataloader_num_workers=dataloader_num_workers,
)
# Seed all RNGs before any sampling or augmentation happens in later cells.
set_seed(seed)

accuracy_metric = evaluate.load("accuracy")
precision_metric = evaluate.load("precision")
recall_metric = evaluate.load("recall")
f1_metric = evaluate.load("f1")

Training size per class: 1500
Using device: mps


KeyboardInterrupt: 

# Set up dataset
Define the labels.
Load all files.
Assign an ID to each label.
Create a dataset of `training_size` samples per species, augmenting when there are not enough original samples.

In [None]:
def time_stretch_waveform(waveform, rate=1.1):
    """Time-stretch a waveform tensor with librosa and return it as ``[1, time]``.

    Args:
        waveform: audio tensor; it is squeezed, and if more than one dimension
            remains the first axis is averaged to obtain a mono signal.
        rate: stretch factor passed to ``librosa.effects.time_stretch``. When
            ``None`` a factor is drawn uniformly from ``[0.8, 1.2]``.

    Returns:
        A tensor with the input's dtype and device.
    """
    if rate is None:
        rate = random.uniform(0.8, 1.2)

    mono = waveform.squeeze().detach().cpu().numpy()
    if mono.ndim > 1:
        mono = mono.mean(axis=0)  # Convert to mono

    result = torch.tensor(
        librosa.effects.time_stretch(y=mono, rate=rate),
        dtype=waveform.dtype,
        device=waveform.device,
    )
    # Ensure shape [1, time]
    return result.unsqueeze(0) if result.ndim == 1 else result

def augment_audio(sample, sample_rate, max_attempts=3):
    """Apply 2-3 randomly chosen waveform augmentations and return the result.

    Each candidate augmentation is applied on top of the previous accepted
    result; an augmentation whose output fails ``is_valid_waveform`` or raises
    is skipped, so the running result is only ever replaced by a valid waveform.

    Args:
        sample: waveform tensor (moved to the global ``device``).
        sample_rate: sampling rate passed to ``T.PitchShift``.
        max_attempts: number of times a fresh augmentation chain is tried.

    Returns:
        The augmented waveform tensor. If no attempt yields a valid waveform
        (or ``max_attempts`` is not positive), the last running result is
        returned, which is an unmodified clone in the worst case.
    """
    sample = sample.to(device)
    augmentations = [
        lambda x: T.PitchShift(sample_rate, n_steps=random.choice([-2, -1, 1, 2])).to(device)(x), # Change pitch by -2, -1, +1, or +2 semitones
        lambda x: x + torch.randn_like(x) * min(0.002, x.std().item() * 0.1), # Add Gaussian noise with stddev up to 10% of original signal's stddev, capped at 0.002
        # NOTE(review): the two masking transforms below are applied to the raw waveform here,
        # not to a spectrogram - confirm this is intended; degenerate outputs are filtered
        # by the validity check further down.
        lambda x: T.FrequencyMasking(freq_mask_param=random.randint(8, 16)).to(device)(x), # Apply frequency masking with max width of 16 bins
        lambda x: T.TimeMasking(time_mask_param=random.randint(8, 20)).to(device)(x), # Apply time masking with max width of 20 frames
        # rate=None makes the helper draw a random rate; relying on its default would always stretch by 1.1
        lambda x: time_stretch_waveform(x, rate=None), # Time-stretch by a random rate between 0.8 and 1.2
    ]

    # Defined up front so the fallback return below is valid even when max_attempts <= 0.
    augmented = sample.clone()

    for attempt in range(max_attempts):
        num_aug = random.randint(2, 3)  # Apply 2 to 3 augmentations
        aug_funcs = random.sample(augmentations, num_aug)
        augmented = sample.clone()  # Preserve original sample

        for augment in aug_funcs:
            try:
                temp_augmented = augment(augmented)
            except Exception as e:
                # Best effort: a failing transform is reported and skipped.
                print(f"Augmentation error: {e}")
                continue
            # Keep the step only if it produces valid output
            if is_valid_waveform(temp_augmented, min_variance=1e-8, min_amplitude=1e-4):
                augmented = temp_augmented

        # Final validation before returning
        if is_valid_waveform(augmented, min_variance=1e-8, min_amplitude=1e-4):
            return augmented
        print(f"Attempt {attempt + 1} failed: aug_var={augmented.var().item():.6f}, aug_max={augmented.abs().max().item():.6f}")

    # If all attempts fail, return augmented sample
    print("All augmentation attempts failed, returning augmented sample")
    return augmented

def is_valid_waveform(waveform, min_variance=1e-8, min_amplitude=1e-4):
    """Return a truthy value when the waveform carries usable signal.

    The waveform is accepted only if the sum of its absolute values exceeds
    ``min_amplitude`` and its variance exceeds ``min_variance``.
    """
    has_signal = waveform.abs().sum() > min_amplitude
    if not has_signal:
        # Empty or silent waveform
        return has_signal
    return waveform.var() > min_variance

def load_audio_segments():
    """
    Yield dataset samples class by class.

    When `training_size` is positive, each class yields at most `training_size`
    samples: originals are randomly sampled down if there are too many, and
    augmented copies fill the gap if there are too few. When `training_size`
    is 0, every valid original is yielded and nothing is augmented.

    Each sample is a dict with an "audio" entry (array, path, sampling_rate)
    and an integer "label". Reads the module-level `labels`, `label_to_id`,
    `segments_path`, `sample_rate` and `training_size`.
    """
    # Upper bound on back-to-back augmentation failures, so a class whose
    # augmentations never validate cannot stall the generator forever.
    max_consecutive_failures = 50

    def _load_waveform(path):
        """Load `path` as a single-channel waveform resampled to `sample_rate`."""
        waveform, sr = torchaudio.load(path)
        if waveform.ndim > 1 and waveform.size(0) > 1:
            # Downmix multi-channel audio; later steps expect one channel.
            waveform = waveform.mean(dim=0, keepdim=True)
        if sr != sample_rate:
            waveform = T.Resample(sr, sample_rate)(waveform)
        return waveform

    augmented_count = 0
    files_labels = {label: [] for label in labels}
    for root, _dirs, files in os.walk(segments_path):
        for file in files:
            if not file.endswith(".wav"):
                continue
            # The label is the second underscore-separated token of the file stem.
            parts = os.path.splitext(file)[0].split('_')
            label = parts[1] if len(parts) > 1 else None
            if label not in files_labels:
                print(f"Skipping file with unrecognised label: {os.path.join(root, file)}")
                continue
            files_labels[label].append(os.path.join(root, file))

    for label in labels:
        files = files_labels[label]
        print(f"Found {len(files)} files for label '{label}'")

        # Case 1: More files than training_size -> sample down
        if training_size > 0 and len(files) > training_size:
            selected_files = random.sample(files, training_size)
        else:
            selected_files = list(files)  # copy
        samples = []
        valid_files = []  # originals that passed validation; only these are augmented

        # Load original files
        for file_path in selected_files:
            waveform = _load_waveform(file_path)
            if not is_valid_waveform(waveform):
                print(f"Invalid original waveform for file: {file_path}")
                continue

            valid_files.append(file_path)
            samples.append({
                "audio": {"array": waveform.squeeze().detach().cpu().numpy(), "path": file_path, "sampling_rate": sample_rate},
                "label": label_to_id[label]
            })

        # Case 2: If need to augment more samples to reach training_size
        consecutive_failures = 0
        while len(samples) < training_size and valid_files:
            if consecutive_failures >= max_consecutive_failures:
                print(f"Giving up on augmentation for label '{label}' after {consecutive_failures} consecutive failures")
                break

            waveform = _load_waveform(random.choice(valid_files))
            augmented = augment_audio(waveform, sample_rate)
            if not is_valid_waveform(augmented):
                consecutive_failures += 1
                continue

            consecutive_failures = 0
            augmented_count += 1
            samples.append({
                "audio": {"array": augmented.squeeze().detach().cpu().numpy(), "path": "augmented.wav", "sampling_rate": sample_rate},
                "label": label_to_id[label]
            })

        # Trim only when a cap was requested; training_size == 0 means "keep all",
        # and slicing with [:0] would throw every sample away.
        if training_size > 0:
            samples = samples[:training_size]

        # Yield per-class samples
        yield from samples
    print(f"Total augmented samples created: {augmented_count}")

# Class names are the visible sub-directories of segments_path. Plain files are
# ignored so a stray file cannot become a class with no samples.
labels = sorted(
    d for d in os.listdir(segments_path)
    if not d.startswith('.') and os.path.isdir(os.path.join(segments_path, d))
)
label_to_id = {lbl: i for i, lbl in enumerate(labels)}
id_to_label = {i: lbl for lbl, i in label_to_id.items()}

# The generator reads the label mappings defined just above.
dataset = Dataset.from_generator(load_audio_segments, cache_dir=None)

print("Final label order:", labels)

if len(dataset) > 0:
    print(f"✅ Dataset created successfully with {len(dataset)} samples.")
else:
    print("❌ Dataset creation failed.")

# Generate and plot spectrograms

In [None]:
def convert_to_mel_spectrogram(sample, target_width=224, image_size=224):
    """
    Convert an audio sample to a fixed-size Mel-spectrogram image.

    Args:
        sample: dict with an "audio" entry whose "array" holds the waveform.
        target_width: final image width in pixels (pad or center-crop).
        image_size: final image height in pixels.

    Returns:
        The same dict with two added keys: "image" (RGB PIL image of size
        target_width x image_size) and "log_mel" (the dB-scaled Mel spectrogram).

    Raises:
        ValueError: if the internal spectrogram mode is not recognised.
    """
    def _db_to_uint8(channel):
        """Map dB values from the fixed range [-80, 0] to [0, 255]."""
        clipped = np.clip(channel, -80, 0)
        return ((clipped + 80) / 80 * 255).astype(np.uint8)

    def _signed_to_uint8(channel):
        """Map a zero-centred channel to [0, 255], with zero at mid-gray."""
        peak = np.abs(channel).max()
        if peak == 0:
            return np.full(channel.shape, 127, dtype=np.uint8)
        return ((channel / peak + 1.0) * 127.5).astype(np.uint8)

    spectrogram_mode = "delta3" # Options: "log-mel", "delta3"
    audio = sample["audio"]
    waveform = torch.tensor(audio["array"], dtype=torch.float32)

    if waveform.ndim == 1:
        waveform = waveform.unsqueeze(0)

    # Mel spectrogram
    mel_spec_transform = T.MelSpectrogram(
        sample_rate=sample_rate,
        n_fft=1024,
        hop_length=256,
        win_length=1024,
        n_mels=128,
        f_min=0,
        f_max=11000,
        power=2.0
    )

    mel_spec = mel_spec_transform(waveform).squeeze(0).numpy()
    y_db = librosa.power_to_db(mel_spec, ref=np.max)

    # Process spectrogram channels
    if spectrogram_mode == "log-mel":
        y = np.stack([_db_to_uint8(y_db)] * 3, axis=-1)
    elif spectrogram_mode == "delta3":
        delta1 = librosa.feature.delta(y_db)
        delta2 = librosa.feature.delta(y_db, order=2)
        # The deltas are centred on zero, so they need their own scaling: pushing
        # them through the dB mapping and casting to uint8 overflows or saturates.
        y = np.stack(
            [_db_to_uint8(y_db), _signed_to_uint8(delta1), _signed_to_uint8(delta2)],
            axis=-1,
        )
    else:
        raise ValueError(f"Unknown spectrogram mode: {spectrogram_mode}")

    # Convert to PIL image
    img = Image.fromarray(y).convert("RGB")

    # Resize height to target, scale width proportionally
    h = img.height
    w = img.width
    new_h = image_size
    new_w = int(w * (image_size / h))
    img_resized = img.resize((new_w, new_h), Image.BICUBIC)

    # Pad or crop width to target width
    if new_w < target_width:
        pad_left = (target_width - new_w) // 2
        pad_right = target_width - new_w - pad_left
        img_final = ImageOps.expand(img_resized, border=(pad_left, 0, pad_right, 0), fill=(0, 0, 0))
    else:
        # If width exceeds target, center crop
        # NOTE(review): the crop drops everything outside the central target_width
        # columns - confirm that losing the start and end of long clips is acceptable.
        left = (new_w - target_width) // 2
        img_final = img_resized.crop((left, 0, left + target_width, new_h))

    sample["image"] = img_final
    sample["log_mel"] = y_db
    return sample

def augment_spectrogram(spectrogram, time_mask_max=15, freq_mask_max=15, num_time_masks=2, num_freq_masks=2):
    """
    SpecAugment-style masking: zero out random row bands and column bands.

    Works on a copy of `spectrogram` (2-D, or with trailing channel axes); the
    first axis is masked by the frequency masks and the second by the time masks.
    Mask widths are drawn from [0, freq_mask_max] and [0, time_mask_max].
    """
    masked = spectrogram.copy()
    n_rows, n_cols = masked.shape[:2]

    # Frequency masks: bands along the first axis
    for _ in range(num_freq_masks):
        width = random.randint(0, freq_mask_max)
        start = random.randint(0, max(n_rows - width, 1))
        masked[start:start + width] = 0

    # Time masks: bands along the second axis
    for _ in range(num_time_masks):
        width = random.randint(0, time_mask_max)
        start = random.randint(0, max(n_cols - width, 1))
        masked[:, start:start + width] = 0

    return masked

def transform(sample, augment_prob=0.4):
    """
    Convert the sample's PIL image to a `pixel_values` tensor using the processor.

    Args:
        sample: dict with an "image" entry.
        augment_prob: probability of applying spectrogram masking before the
            processor runs. Pass 0 to disable masking, e.g. for evaluation data.

    Returns:
        The same dict with "pixel_values" added (batch dimension removed); the
        "image" entry is replaced by its masked version when masking is applied.
    """
    # NOTE(review): when this runs through dataset.map the masking is drawn once
    # per sample and stored, rather than re-drawn every epoch - confirm intent.
    if random.random() < augment_prob:
        sample["image"] = Image.fromarray(augment_spectrogram(np.array(sample["image"])))
    inputs = processor(images=sample["image"], return_tensors="pt", do_normalize=True)

    # Remove batch dimension
    sample["pixel_values"] = inputs["pixel_values"].squeeze(0)
    return sample

# Both passes share the same worker count and never read from the cache.
map_options = dict(num_proc=n_proc, load_from_cache_file=False)

# Pass 1: audio -> spectrogram image (drops the raw audio column).
dataset = dataset.map(convert_to_mel_spectrogram, remove_columns=["audio"], **map_options)

# Pass 2: image -> model-ready pixel values (drops the image column).
dataset = dataset.map(transform, remove_columns=["image"], **map_options)

print(f"Generated {len(dataset)} mel spectrograms.")
print(dataset.features)
print("\nData ready for training.")

In [None]:
# Display up to 9 random spectrograms and their labels
if os.path.exists("spectrograms/"):
    print(f"Directory spectrograms/ already exists. Deleting entire folder.")
    shutil.rmtree("spectrograms/")
os.makedirs("spectrograms/")

# random.sample raises if asked for more items than exist, so cap the preview size.
n_preview = min(9, len(dataset))
for i in random.sample(range(len(dataset)), n_preview):
    row = dataset[i]  # fetch the row once instead of once per field
    label_name = id_to_label[row["label"]]
    y_vis = np.array(row["log_mel"])

    # Min-max normalise for display; a constant spectrogram would divide by zero.
    value_range = y_vis.max() - y_vis.min()
    y_norm = (y_vis - y_vis.min()) / value_range if value_range > 0 else np.zeros_like(y_vis)

    fig, ax = plt.subplots()
    im = ax.imshow(y_norm, aspect='auto', origin='lower')
    ax.set_title(f"Sample {i} - Label: {label_name}")
    ax.axis("off")
    fig.colorbar(im, ax=ax)
    fig.savefig(f"spectrograms/sample_{label_name}_{i}.png")
    plt.show()
    plt.close(fig)  # release the figure so repeated previews do not accumulate

# Set up Model architecture

In [None]:
# Split data into train, test, and validation sets (80/10/10, stratified by class)
# Stratify on the per-sample label ids. `labels` is the list of class names, whose
# length does not match the dataset, so it cannot be used here.
sample_labels = np.array(list(dataset["label"]))
train_idx, temp_idx, train_labels, temp_labels = train_test_split(
    np.arange(len(dataset)), sample_labels, test_size=0.2, stratify=sample_labels, random_state=seed
)
val_idx, test_idx, val_labels, test_labels = train_test_split(
    temp_idx, temp_labels, test_size=0.5, stratify=temp_labels, random_state=seed
)
# NOTE(review): augmented copies are generated before this split, so a copy and its
# source recording can end up in different splits - confirm this is acceptable.
dataset = DatasetDict({
    'train': dataset.select(train_idx),
    'validation': dataset.select(val_idx),
    'test': dataset.select(test_idx)
})
for split in ['train', 'validation', 'test']:
    split_labels = np.array(list(dataset[split]['label']))
    # minlength keeps one row per class even if a class is absent from a split.
    class_counts = np.bincount(split_labels, minlength=len(labels))
    print(f"{split} class distribution:")
    for i, count in enumerate(class_counts):
        print(f"  {id_to_label[i]}: {count} ({100 * count / len(split_labels):.2f}%)")

In [None]:
# SET UP MODEL
def trainable_parameters(model):
    """Return a one-line summary of how many of the model's parameters are trainable.

    The string contains the model's class name, the trainable and total element
    counts (thousands-separated) and the trainable share as a percentage.
    """
    sizes = [(p.numel(), p.requires_grad) for _, p in model.named_parameters()]
    params = sum(count for count, _ in sizes)
    trainable = sum(count for count, needs_grad in sizes if needs_grad)
    share = 100 * trainable / params

    return f"{model.__class__.__name__} trainable parameters: {trainable:,}/{params:,} ({share:.2f}%)"

def collate_fn(batch):
    """Stack a list of dataset rows into one model-ready batch.

    Args:
        batch: list of dicts, each with "pixel_values" and an integer "label".

    Returns:
        Dict with "pixel_values" (float32, stacked along a new first axis) and
        "labels" (int64). Tensors stay on the CPU: this function runs inside
        DataLoader worker processes, and the Trainer moves each batch to the
        training device itself.
    """
    pixel_values = torch.stack([
        torch.as_tensor(item["pixel_values"], dtype=torch.float32) for item in batch
    ])
    labels = torch.tensor([item["label"] for item in batch], dtype=torch.long)
    return {"pixel_values": pixel_values, "labels": labels}

def compute_metrics(eval_pred: EvalPrediction):
    """Compute accuracy, weighted precision/recall/F1 and macro one-vs-rest AUC.

    Args:
        eval_pred: pair of (logits, labels) produced during evaluation.

    Returns:
        Dict with "accuracy", "precision", "recall", "f1" and "auc". "auc" is
        NaN when it cannot be computed, so evaluation never aborts a training run.
    """
    logits, labels = eval_pred
    preds = np.argmax(logits, axis=-1)

    # Overall metrics
    acc = accuracy_metric.compute(predictions=preds, references=labels)["accuracy"]
    prec = precision_metric.compute(predictions=preds, references=labels, average="weighted")["precision"]
    rec = recall_metric.compute(predictions=preds, references=labels, average="weighted")["recall"]
    f1 = f1_metric.compute(predictions=preds, references=labels, average="weighted")["f1"]

    probs = torch.nn.functional.softmax(torch.tensor(logits), dim=-1).numpy()
    try:
        if probs.shape[1] == 2:
            # Binary case: roc_auc_score expects the positive-class score only.
            auc = roc_auc_score(labels, probs[:, 1])
        else:
            auc = roc_auc_score(labels, probs, multi_class='ovr', average='macro')
    except ValueError as err:
        # Raised e.g. when a class is absent from the evaluation labels.
        print(f"AUC could not be computed: {err}")
        auc = float("nan")

    return {
        "accuracy": acc,
        "precision": prec,
        "recall": rec,
        "f1": f1,
        "auc": auc
    }

def focal_loss(inputs, targets, alpha=1, gamma=2, reduction='mean', weight=None, label_smoothing=None):
    """Focal loss on top of (optionally class-weighted) cross-entropy.

    Args:
        inputs: logits of shape (N, C).
        targets: class indices of shape (N,).
        alpha: global scaling factor.
        gamma: focusing exponent applied to (1 - p_t).
        reduction: 'mean', 'sum', or anything else for the per-sample losses.
        weight: optional per-class weights forwarded to cross-entropy.
        label_smoothing: smoothing factor for cross-entropy; when None the value
            from the module-level `training_args` is used.

    Returns:
        The reduced loss, or the per-sample loss tensor.
    """
    if label_smoothing is None:
        label_smoothing = training_args.label_smoothing_factor

    ce_loss = nn.functional.cross_entropy(
        inputs, targets, reduction='none', weight=weight, label_smoothing=label_smoothing
    )

    # p_t is the model's probability for the true class. It is taken from the
    # softmax directly: exp(-ce_loss) stops being p_t once ce_loss is scaled by
    # class weights or altered by label smoothing.
    log_probs = nn.functional.log_softmax(inputs, dim=-1)
    safe_targets = targets.clamp(min=0)  # keeps gather valid for ignored (negative) targets
    pt = log_probs.gather(-1, safe_targets.unsqueeze(-1)).squeeze(-1).exp()

    loss = alpha * (1 - pt) ** gamma * ce_loss

    if reduction == 'mean':
        return loss.mean()
    elif reduction == 'sum':
        return loss.sum()
    else:
        return loss
    
class CustomTrainer(Trainer):
    """Trainer that optimises a class-weighted focal loss instead of the model's own loss."""

    def __init__(self, *args, class_weights=None, **kwargs):
        """Accept optional per-class loss weights in addition to the regular Trainer arguments."""
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, class_weights=None, **kwargs):
        """Run the model on `inputs` and return the focal loss on its logits.

        Args:
            model: the model being trained.
            inputs: batch dict containing "labels" plus the model inputs.
            return_outputs: when True, return `(loss, outputs)`.
            class_weights: optional override for the weights given at construction.
            **kwargs: extra arguments passed by the Trainer; ignored here.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)   
        logits = outputs.get("logits")
        # The Trainer never passes class_weights itself, so fall back to the
        # weights stored on the instance.
        if class_weights is None:
            class_weights = self.class_weights
        if class_weights is not None:
            class_weights = class_weights.to(logits.device)
        #loss_fn = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=training_args.label_smoothing_factor)
        #loss = loss_fn(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        loss = focal_loss(logits, labels, gamma=2.0, alpha=0.25, weight=class_weights) # alt. Focal loss
        return (loss, outputs) if return_outputs else loss

# Model Architecture
callbacks = [EarlyStoppingCallback(early_stopping_patience=4)]
# Build the config from the same source as the weights and hand it to the model
# below; otherwise the dropout settings would never take effect.
config = AutoConfig.from_pretrained(
    model_name,
    num_labels=len(labels),
    id2label=id_to_label,
    label2id=label_to_id,
    hidden_dropout_prob=0.1, # Dropout for fully connected layers
    attention_probs_dropout_prob=0.1 # Dropout for attention layers
)

base_model = ViTForImageClassification.from_pretrained(
    model_name, 
    config=config
)

lora = LoraConfig(
    r=lora_rank,
    lora_alpha=lora_rank * 2,
    lora_dropout=0.1, # Dropout for LoRA layer
    bias="none", # No bias adaptation
    target_modules=["query", "key", "value", "dense"], # Attention & MLP layers 
    modules_to_save=["classifier"]
)

# Balanced class weights computed from the training split only.
train_labels = np.array(list(dataset["train"]["label"]))
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(train_labels),
    y=train_labels
)
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

peft_model = get_peft_model(base_model, lora).to(device)

trainer = CustomTrainer(
    model=peft_model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["validation"],
    compute_metrics=compute_metrics,
    tokenizer=processor,
    data_collator=collate_fn,
    callbacks=callbacks,
    class_weights=class_weights,  # stored on the trainer and used by compute_loss
)

print(f"ID to label mapping: {id_to_label}")
print(trainable_parameters(peft_model))
print("Model set-up complete. Ready to begin training...")

# Train the model

In [None]:
# TRAIN MODEL
from transformers.trainer_utils import get_last_checkpoint

# resume_from_checkpoint needs a specific checkpoint folder, not the output directory
# that contains them. Resolve the newest one, or start fresh when none exists.
last_checkpoint = get_last_checkpoint(checkpoints_dir) if os.path.isdir(checkpoints_dir) else None
if last_checkpoint is not None:
    print(f"Resuming training from checkpoint: {last_checkpoint}")
result = trainer.train(resume_from_checkpoint=last_checkpoint)
trainer.log_metrics("train", result.metrics)
trainer.save_metrics("train", result.metrics)

# Evaluate & Save the model

In [None]:
# EVALUATE ON TEST SET 
# Score the test split once, then both log and persist the resulting metrics.
metrics = trainer.evaluate(eval_dataset=dataset["test"])
for emit in (trainer.log_metrics, trainer.save_metrics):
    emit("test", metrics)

In [None]:
# Save final model and processor
import copy

# get_peft_model injects the LoRA layers into base_model in place, so saving
# base_model directly would write adapter-wrapped weights that a plain
# ViTForImageClassification.from_pretrained cannot load. Merge the adapters into a
# copy instead; the copy leaves the trainer's model untouched for later cells.
merged_model = copy.deepcopy(peft_model).merge_and_unload()
merged_model.save_pretrained(model_output_dir) # Saves full fine-tuned model (ViTForImageClassification.from_pretrained("./vit-base-manuai"))
processor.save_pretrained(model_output_dir) # Saves image processor (ViTImageProcessor.from_pretrained("./vit-base-manuai"))
print(f"Model fine-tuned and saved to {model_output_dir}")
# Save LoRA adapters
peft_model.save_pretrained(adapters_dir) # Saves only LoRA adapters (PEFTModel.from_pretrained(base_model, "./manuai_lora_adapters"))
print(f"LoRA adapters saved to {adapters_dir}")

# Show Metrics of fine-tuned model

In [None]:
# MODEL METRIC EVALUATION REPORT
print(metrics)

# Per-class
predictions = trainer.predict(dataset["test"])
y_true = predictions.label_ids
logits = predictions.predictions  # shape (N, n_classes)
y_pred = np.argmax(logits, axis=-1)
probs = torch.nn.functional.softmax(torch.tensor(logits), dim=-1).numpy()

# Explicit class ids keep the reports aligned with `labels` even when a class is
# missing from the test labels or predictions.
class_ids = list(range(len(labels)))

accuracy_per_class = {}
for i, label_name in enumerate(labels):
    idx = (y_true == i)
    # A class with no test samples has no defined accuracy.
    accuracy_per_class[label_name] = float((y_pred[idx] == i).mean()) if idx.any() else float("nan")

print("Accuracy per class:")
for label, acc in accuracy_per_class.items():
    print(f"{label}: {acc:.2f}")

print("\nClassification Report:")
report = classification_report(y_true, y_pred, labels=class_ids, target_names=labels, zero_division=0)
print(report)

# Confusion Matrix (plot)
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=labels, yticklabels=labels, ax=ax)
ax.set_xlabel('Predicted')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix')
fig.savefig("confusion_matrix.png")
plt.show()

In [None]:
def apply_thresholds(probs, thresholds):
    """Pick one class per row using per-class probability thresholds.

    For each row, the candidates are the classes whose probability reaches their
    threshold. The most probable candidate wins; when no class qualifies, the
    plain argmax of the row is used instead.

    Args:
        probs: iterable of per-class probability rows (NumPy arrays).
        thresholds: one threshold per class.

    Returns:
        NumPy array with the chosen class index for every row.
    """
    preds = []
    for row in probs:
        candidates = [cls for cls, cutoff in enumerate(thresholds) if row[cls] >= cutoff]
        if candidates:
            # highest probability among the qualifying classes
            winner = candidates[int(np.argmax(row[candidates]))]
        else:
            winner = int(np.argmax(row))  # fallback
        preds.append(winner)
    return np.array(preds)
# Find optimal thresholds per class based on F1 score
# Using precision-recall curve
# Thresholds are tuned on the validation split and only then applied to the test
# predictions; tuning on the test set itself would make the report below optimistic.
val_predictions = trainer.predict(dataset["validation"])
val_true = val_predictions.label_ids
val_probs = torch.nn.functional.softmax(torch.tensor(val_predictions.predictions), dim=-1).numpy()

n_classes = probs.shape[1]
best_thresholds = np.zeros(n_classes, dtype=float)

for i in range(n_classes):
    y_true_i = (val_true == i).astype(int)
    prob_i = val_probs[:, i]
    # if no positives or all positives, set threshold 0.5
    if y_true_i.sum() == 0 or y_true_i.sum() == len(y_true_i):
        best_thresholds[i] = 0.5
        continue
    precision, recall, thresholds = precision_recall_curve(y_true_i, prob_i)
    # precision/recall have one more entry than thresholds - compute F1 for the threshold points
    f1_scores = 2 * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1] + 1e-12)
    if np.isnan(f1_scores).all():
        best_thresholds[i] = 0.5
    else:
        best_idx = np.nanargmax(f1_scores)
        best_thresholds[i] = thresholds[best_idx]

print("Best thresholds per class:", {id_to_label[i]: float(best_thresholds[i]) for i in range(n_classes)})

# Apply the validation-tuned thresholds to the test-set probabilities.
preds_thresh = apply_thresholds(probs, best_thresholds)
# Present classification report
thresh_report = classification_report(
    y_true, preds_thresh, labels=list(range(n_classes)), target_names=labels, zero_division=0
)
print("Classification Report of Model with Optimal Thresholds:\n", thresh_report)
print()

# ZIP model & training files

In [None]:
# Gather all relevant info
# Results from earlier cells are optional: anything not defined in the notebook
# namespace is recorded as None.
_defined = locals()

training_info = {
    "hyperparameters": dict(
        epochs=epochs,
        batch_size=batch_size,
        learning_rate=training_args.learning_rate,
        model_name=model_name,
        lora_rank=lora_rank,
        seed=seed,
        segment_len=segment_len,
        image_size=image_size,
        labels=labels,
    ),
    "train_metrics": result.metrics if 'result' in _defined else None,
    "eval_metrics": metrics if 'metrics' in _defined else None,
    "per_class_accuracy": accuracy_per_class if 'accuracy_per_class' in _defined else None,
    "classification_report": report if 'report' in _defined else None,
    "best_thresholds": (
        {id_to_label[i]: float(value) for i, value in enumerate(best_thresholds)}
        if 'best_thresholds' in _defined else None
    ),
}

# Save as JSON
with open("training_metrics.json", "w") as metrics_file:
    metrics_file.write(json.dumps(training_info, indent=2))

print("Saved all training info to training_metrics.json")

In [None]:
# ZIP MODEL & ADAPTERS
import zipfile

# Built with the standard library rather than the external `zip` binary, so it works
# where that tool is missing. Paths come from the configuration variables, and the
# archive is rewritten from scratch so no stale entries survive from earlier runs.
archive_path = "manuai_models.zip"
archive_sources = [model_output_dir, adapters_dir, "training_metrics.json", "confusion_matrix.png"]

with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    for source in archive_sources:
        if os.path.isdir(source):
            for root, _dirs, files in os.walk(source):
                for name in files:
                    file_path = os.path.join(root, name)
                    zf.write(file_path, arcname=os.path.relpath(file_path))
        elif os.path.isfile(source):
            zf.write(source, arcname=os.path.relpath(source))
        else:
            print(f"Skipping missing path: {source}")
print("Model, adapters, and training metrics zipped into manuai_models.zip")

# Verify the archive: testzip() returns the first corrupted member, or None.
with zipfile.ZipFile(archive_path, 'r') as zf:
    bad_file = zf.testzip()
    if bad_file is not None:
        print(f"Corrupted file in zip: {bad_file}.")