In [12]:
import pandas as pd
from datasets import Dataset
import os
import librosa
from collections import defaultdict

# 1. 把音频读入并保存成数据集

# 获取当前工作目录
current_dir = os.getcwd()
# 设置当前工作目录,现在可以使用相对路径来操作文件了
os.chdir(current_dir)

# 设置文件夹路径
base_folder = "audio"

# 获取所有子文件夹的名字
subfolders = [f.name for f in os.scandir(base_folder) if f.is_dir()]

# 初始化字典，用于存储每个文件夹中的音频文件
data = defaultdict(list)

# 遍历每个子文件夹，获取音频文件名并存储到字典中
for subfolder in subfolders:
    subfolder_path = os.path.join(base_folder, subfolder)
    audio_files = [
        os.path.join(subfolder_path, f.name)
        for f in os.scandir(subfolder_path)
        if f.is_file() and f.name.endswith(".wav")
    ]

    data[subfolder] = audio_files

# 初始化一个空的列表，用于存储每个音频和其对应的类名
rows = []

# 遍历字典中的每个类别和对应的音频列表
for class_name, audio_list in data.items():
    for audio_name in audio_list:
        rows.append({"file": audio_name, "class": class_name})

# 将列表转换为DataFrame
df = pd.DataFrame(rows)

# 输出DataFrame
df

Unnamed: 0,file,class
0,audio/cat/胶州路口马路施工-53.wav,cat
1,audio/dog/胶州路口马路施工-53.wav,dog
2,audio/pig/胶州路口马路施工-53.wav,pig


In [13]:
from datasets import Dataset
import numpy as np

dataset_structure = {"audio": [], "label": []}
dataset_origin = Dataset.from_dict(dataset_structure)


for i in range(len(df)):
    wav_path = df.loc[i, "file"]
    data, sr = librosa.load(wav_path, sr=16000)
    new_data = {
        "audio": {"array": np.array(data), "path": wav_path, "sampling_rate": 16000},
        "label": df.loc[i, "class"],
    }

    dataset_origin = dataset_origin.add_item(new_data)

dataset_origin

Dataset({
    features: ['audio', 'label'],
    num_rows: 3
})

In [18]:
from datasets import ClassLabel

unique = df["class"].unique()
# 这个下面要用的！！！
num_classes = len(unique)
class_names = unique.tolist()

target_transform = ClassLabel(names=class_names)

# 使用dataset的.map方法来修改列的数据类型
column_name = "label"
dataset = dataset_origin.map(
    lambda example: {column_name: target_transform.str2int(example[column_name])}
)

# 将数据集转换为DataFrame
df = dataset.to_pandas()

# # 将DataFrame保存为Parquet文件
parquet_path = "./dataset/new_dataset.parquet"
df.to_parquet(parquet_path, index=False)

Map:   0%|          | 0/3 [00:00<?, ? examples/s]

In [17]:
from transformers import AutoProcessor, ASTModel
import torch
from datasets import Dataset, load_metric

# 2. 开始训练，上面都是在处理数据集

# model_checkpoint = "MIT/ast-finetuned-audioset-10-10-0.4593"
model_checkpoint = r"../checkpoint-95"
sampling_rate = 16000
metric = load_metric("accuracy")
# metric = evaluate.load("accuracy")

processor = AutoProcessor.from_pretrained(model_checkpoint)
model = ASTModel.from_pretrained(model_checkpoint)

Could not find image processor class in the image processor config or the model config. Loading based on pattern matching with the model's feature extractor configuration.


In [19]:
from datasets import Dataset, Features, Sequence, Value, ClassLabel
from datasets import load_dataset



# 定义特征的数据类型
features = Features(
    {
        "audio": {
            "array": Sequence(feature=Value(dtype="float32")),
            "path": Value(dtype="string"),
            "sampling_rate": Value(dtype="int64"),
        },
        "label": ClassLabel(num_classes=num_classes, names=class_names),
    }
)

dataset = load_dataset(
    "parquet", data_files=r"./dataset/new_dataset.parquet", features=features
)
dataset

Downloading data files:   0%|          | 0/1 [00:00<?, ?it/s]

Extracting data files:   0%|          | 0/1 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['audio', 'label'],
        num_rows: 3
    })
})

In [20]:
def split_train_test_val(csv_dataset):
    split_dataset = csv_dataset.train_test_split(test_size=0.15, seed=42)
    # Rename the default "test" split to "validation"
    split_dataset["validation"] = split_dataset.pop("test")
    split_dataset_test = split_dataset["validation"].train_test_split(
        test_size=0.5, seed=42
    )
    split_dataset["test"] = split_dataset_test["test"]
    split_dataset["validation"] = split_dataset_test["train"]
    return split_dataset


dataset = split_train_test_val(dataset["train"])
dataset

ValueError: With n_samples=1, test_size=0.5 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

In [None]:
labels = dataset["train"].features["label"].names
label2id, id2label = dict(), dict()
for i, label in enumerate(labels):
    label2id[label] = str(i)
    id2label[str(i)] = label

In [None]:
import random
from IPython.display import Audio, display
import numpy as np

for _ in range(5):
    rand_idx = random.randint(0, len(dataset["train"]) - 1)
    example = dataset["train"][rand_idx]
    audio = example["audio"]

    print(f'Label: {(example["label"])}')
    print(
        f'Shape: {(np.array(audio["array"])).shape}, sampling rate: {audio["sampling_rate"]}'
    )
    display(Audio(audio["array"], rate=audio["sampling_rate"]))
    print()

In [21]:
from transformers import AutoFeatureExtractor


feature_extractor = AutoFeatureExtractor.from_pretrained(model_checkpoint)
feature_extractor

ASTFeatureExtractor {
  "do_normalize": true,
  "feature_extractor_type": "ASTFeatureExtractor",
  "feature_size": 1,
  "max_length": 1024,
  "mean": -4.2677393,
  "num_mel_bins": 128,
  "padding_side": "right",
  "padding_value": 0.0,
  "return_attention_mask": false,
  "sampling_rate": 16000,
  "std": 4.5689974
}

In [None]:
max_duration = 5.0


def preprocess_function(examples):
    audio_arrays = [x["array"] for x in examples["audio"]]
    inputs = feature_extractor(
        audio_arrays,
        sampling_rate=feature_extractor.sampling_rate,
        max_length=int(feature_extractor.sampling_rate * max_duration),
        truncation=True,
    )
    return inputs


preprocess_function(dataset["train"][:5])

In [None]:
encoded_dataset = dataset.map(
    preprocess_function, remove_columns=["audio"], batched=True
)
encoded_dataset

In [None]:
from transformers import AutoModelForAudioClassification, TrainingArguments, Trainer

num_labels = len(label2id)
model = AutoModelForAudioClassification.from_pretrained(
    model_checkpoint,
    num_labels=num_labels,
    ignore_mismatched_sizes=True,
    label2id=label2id,
    id2label=id2label,
)

In [None]:
import numpy as np

model_name = model_checkpoint.split("/")[-1]
batch_size = 4

args = TrainingArguments(
    f"{model_name}-finetuned",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-5,
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=4,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=5,
    warmup_ratio=0.1,
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    push_to_hub=False,
)


def compute_metrics(eval_pred):
    """Computes accuracy on a batch of predictions"""
    predictions = np.argmax(eval_pred.predictions, axis=1)
    return metric.compute(predictions=predictions, references=eval_pred.label_ids)


trainer = Trainer(
    model,
    args,
    train_dataset=encoded_dataset["train"],
    eval_dataset=encoded_dataset["validation"],
    tokenizer=feature_extractor,
    compute_metrics=compute_metrics,
)

trainer.train()

In [None]:
trainer.evaluate()