## add generated transcription (mandarin)

In [29]:
import re
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
from jiwer import wer, cer

MODEL_ID = "jonatasgrosman/wav2vec2-large-xlsr-53-chinese-zh-cn"
CHARS_TO_IGNORE = [",", "?", "¿", ".", "!", "¡", ";", "；", ":", '""', "%", '"', "�", "ʿ", "·", "჻", "~", "՞",
                  "؟", "،", "।", "॥", "«", "»", "„", "“", "”", "「", "」", "‘", "’", "《", "》", "(", ")", "[", "]",
                  "{", "}", "=", "`", "_", "+", "<", ">", "…", "–", "°", "´", "ʾ", "‹", "›", "©", "®", "—", "→", "。",
                  "、", "﹂", "﹁", "‧", "～", "﹏", "，", "｛", "｝", "（", "）", "［", "］", "【", "】", "‥", "〽",
                  "『", "』", "〝", "〟", "⟨", "⟩", "〜", "：", "！", "？", "♪", "；", "/", "\\", "º", "−", "^", "'", "ʻ", "ˆ"]
chars_to_ignore_regex = f"[{re.escape(''.join(CHARS_TO_IGNORE))}]"

# Load ASR model and processor
processor = Wav2Vec2Processor.from_pretrained(MODEL_ID)
asr_model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID)
asr_model.eval()

def clean_text(text):
    # Remove special characters and convert to uppercase for fair comparison
    return re.sub(chars_to_ignore_regex, "", text).upper()

def transcribe(audio_array, sampling_rate):
    # Resample to 16kHz if needed
    if sampling_rate != 16000:
        audio_array = librosa.resample(audio_array, orig_sr=sampling_rate, target_sr=16000)
        sampling_rate = 16000
    # Normalize audio
    if audio_array.dtype != 'float32':
        audio_array = audio_array.astype('float32')
    # Prepare input
    input_values = processor(audio_array, sampling_rate=sampling_rate, return_tensors="pt").input_values
    with torch.no_grad():
        logits = asr_model(input_values).logits
        pred_ids = torch.argmax(logits, dim=-1)
        transcription = processor.batch_decode(pred_ids)[0]
    return transcription.strip()

# Build new dataset with reference, prediction, label, and gender
asr_results = []
for audio, sampling_rate, label, text, gender in tqdm(combined_data_with_gender[:10], desc="ASR Unit Test"):  # Use [:10] for unit test, remove for full dataset
    pred_text = transcribe(audio, sampling_rate)
    asr_results.append({
        "reference": clean_text(text),
        "prediction": clean_text(pred_text),
        "label": label,
        "gender": gender
    })

# Separate Shanghai and Mandarin results
shanghai_refs = [row["reference"] for row in asr_results if row["label"] == 0]
shanghai_preds = [row["prediction"] for row in asr_results if row["label"] == 0]
mandarin_refs = [row["reference"] for row in asr_results if row["label"] == 1]
mandarin_preds = [row["prediction"] for row in asr_results if row["label"] == 1]

# Compute metrics
shanghai_wer = wer(shanghai_refs, shanghai_preds) if shanghai_refs else None
shanghai_cer = cer(shanghai_refs, shanghai_preds) if shanghai_refs else None
mandarin_wer = wer(mandarin_refs, mandarin_preds) if mandarin_refs else None
mandarin_cer = cer(mandarin_refs, mandarin_preds) if mandarin_refs else None

print(f"Shanghai WER: {shanghai_wer:.3f}" if shanghai_wer is not None else "No Shanghai samples")
print(f"Shanghai CER: {shanghai_cer:.3f}" if shanghai_cer is not None else "No Shanghai samples")
print(f"Mandarin WER: {mandarin_wer:.3f}" if mandarin_wer is not None else "No Mandarin samples")
print(f"Mandarin CER: {mandarin_cer:.3f}" if mandarin_cer is not None else "No Mandarin samples")

# Optional: inspect results
for row in asr_results:
    print(f"Label: {row['label']}, Gender: {row['gender']}")
    print(f"Reference: {row['reference']}")
    print(f"Prediction: {row['prediction']}")
    print("-" * 40)

ASR Unit Test: 100%|██████████| 10/10 [00:07<00:00,  1.26it/s]

Shanghai WER: 1.000
Shanghai CER: 0.894
No Mandarin samples
No Mandarin samples
Label: 0, Gender: male
Reference: 北京爱数智慧语音采集
Prediction: 不近也书资会许域切间
----------------------------------------
Label: 0, Gender: female
Reference: 北京爱数智慧语音采集
Prediction: 破听UNK手智慧理且救
----------------------------------------
Label: 0, Gender: male
Reference: 阿拉两个拧来聊聊金融方面呃
Prediction: 安了脸岸历列有着金用方闭的
----------------------------------------
Label: 0, Gender: male
Reference: 金融方面嘛
Prediction: 真入发给外来
----------------------------------------
Label: 0, Gender: male
Reference: 搿呃阿姨喃应该讲侬已经交关年数辣辣了解了
Prediction: 和爱意的应给段奴京这被理素了了表加来
----------------------------------------
Label: 0, Gender: male
Reference: 葛末吾辣辣金融方面已经有的三四年了
Prediction: 跟么无来的近方比经的谢死年了
----------------------------------------
Label: 0, Gender: male
Reference: 最少辰光阿拉是做撒呃喃有钞票就是到银行里保本保息
Prediction: 据说有猛光阿来主杀呢有扯破了子斗你那里薄泵抱士
----------------------------------------
Label: 0, Gender: female
Reference: 吾已经做了已经到八七年了
Prediction: 我经足累经的霸劝一来
-----------------------------




## Run pre-trained classification model

In [101]:
# Function to extract features
def extract_features(audio, sr, n_mfcc=40):
    mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc)
    return np.transpose(mfccs)  # Transpose to match (time, features) format

# Preprocess the combined dataset with tqdm
processed_data = []
for audio, sr, label in tqdm(combined_data, desc="Processing Combined Dataset"):
    features = extract_features(audio, sr)
    processed_data.append((features, label))

Processing Combined Dataset: 100%|██████████| 6792/6792 [00:35<00:00, 190.25it/s]


In [103]:
# Check the labels in processed_data
labels = [label for _, label in processed_data]
print(set(labels))  # Should print {0, 1}

{0, 1}


In [None]:
os.environ["CUDA_VISIBLE_DEVICES"]=""
device = torch.device("cpu")
#force CPU usage; no GPU available


In [None]:
loaded_model = torch.load('model9.model', map_location=device)
#download from https://github.com/Colt1990/chinese-dialect-recognition/blob/master/model9.model

In [29]:
print(loaded_model.keys())

odict_keys(['layer1.GRU.weight_ih_l0', 'layer1.GRU.weight_hh_l0', 'layer1.GRU.bias_ih_l0', 'layer1.GRU.bias_hh_l0', 'layer1.GRU.weight_ih_l1', 'layer1.GRU.weight_hh_l1', 'layer1.GRU.bias_ih_l1', 'layer1.GRU.bias_hh_l1', 'layer2.linear.weight', 'layer2.linear.bias', 'layer3.linear.weight', 'layer3.linear.bias'])


In [104]:
from torch.utils.data import DataLoader, TensorDataset

# Prepare tensors
features = [torch.tensor(f, dtype=torch.float32) for f, _ in processed_data]
labels = [torch.tensor(l, dtype=torch.long) for _, l in processed_data]

# Pad sequences to the same length
from torch.nn.utils.rnn import pad_sequence
padded_features = pad_sequence(features, batch_first=True)
labels = torch.tensor(labels)

# Create a DataLoader
dataset = TensorDataset(padded_features, labels)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

In [106]:
## LanNet Model: 

import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

class LanNet(nn.Module):
    def __init__(self, input_dim=40, hidden_dim=512, bn_dim=192, output_dim=2):
        super(LanNet, self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.bn_dim = bn_dim
        self.output_dim = output_dim

        self.layer1 = nn.Sequential()
        self.layer1.add_module('GRU', nn.GRU(self.input_dim, self.hidden_dim, num_layers=2, batch_first=True, bidirectional=False))
	
	
        self.layer2 = nn.Sequential()
        self.layer2.add_module('linear', nn.Linear(self.hidden_dim, self.bn_dim))

        self.layer3 = nn.Sequential()
        self.layer3.add_module('linear', nn.Linear(self.bn_dim,self.output_dim))

    # def forward(self, src, mask, target):
    #     batch_size, fea_frames, fea_dim = src.size()

    #     out_hidden, hidd = self.layer1(src)
    #     out_hidden = out_hidden.contiguous().view(-1, out_hidden.size(-1))   
    #     out_bn = self.layer2(out_hidden)
    #     out_target = self.layer3(out_bn)


    #     out_target = out_target.contiguous().view(batch_size, fea_frames, -1)
    #     mask = mask.contiguous().view(batch_size, fea_frames, 1).expand(batch_size, fea_frames, out_target.size(2))
    #     out_target_mask = out_target * mask
    #     out_target_mask = out_target_mask.sum(dim=1)/mask.sum(dim=1)
    #     predict_target = F.softmax(out_target_mask, dim=1)

    #     # Reshape target to match the dimensions of predict_target
    #     target = target.view(-1, 1)  # Reshape to [batch_size, 1]
	

    #     # 计算loss
    #     tar_select_new = torch.gather(predict_target, 1, target)
    #     ce_loss = -torch.log(tar_select_new) 
    #     ce_loss = ce_loss.sum() / batch_size

    #     # 计算acc
    #     _, predict = predict_target.max(dim=1)
    #     predict = predict.contiguous().view(-1,1)
    #     correct = predict.eq(target).float()       
    #     num_samples = predict.size(0)
    #     sum_acc = correct.sum().item()
    #     acc = sum_acc/num_samples

    #     return acc, ce_loss
    
    def forward(self, src, mask, target=None):
        batch_size, fea_frames, fea_dim = src.size()

        out_hidden, hidd = self.layer1(src)
        out_hidden = out_hidden.contiguous().view(-1, out_hidden.size(-1))   
        out_bn = self.layer2(out_hidden)
        out_target = self.layer3(out_bn)

        out_target = out_target.contiguous().view(batch_size, fea_frames, -1)
        mask = mask.contiguous().view(batch_size, fea_frames, 1).expand(batch_size, fea_frames, out_target.size(2))
        out_target_mask = out_target * mask
        out_target_mask = out_target_mask.sum(dim=1) / mask.sum(dim=1)
        predict_target = F.softmax(out_target_mask, dim=1)

        if target is None:
            # During evaluation, return only the predicted probabilities
            return predict_target

        # Reshape target to match the dimensions of predict_target
        target = target.view(-1, 1)  # Reshape to [batch_size, 1]

        # Compute loss
        tar_select_new = torch.gather(predict_target, 1, target)
        ce_loss = -torch.log(tar_select_new) 
        ce_loss = ce_loss.sum() / batch_size

        # Compute accuracy
        _, predict = predict_target.max(dim=1)
        predict = predict.contiguous().view(-1, 1)
        correct = predict.eq(target).float()
        num_samples = predict.size(0)
        sum_acc = correct.sum().item()
        acc = sum_acc / num_samples

        return acc, ce_loss

    

In [107]:
# Define the LanNet model
model = LanNet(input_dim=40, hidden_dim=512, bn_dim=192, output_dim=2)  # 2 classes: Shanghai and Mandarin
state_dict = torch.load('model9.model', map_location='cpu')

# Remove the weights for the final layer (layer3.linear) from the state_dict since we have only 2 classes
state_dict.pop('layer3.linear.weight')
state_dict.pop('layer3.linear.bias')

model.load_state_dict(state_dict, strict=False)  # Use strict=False to ignore missing keys
model.eval()  # Set the model to evaluation mode

LanNet(
  (layer1): Sequential(
    (GRU): GRU(40, 512, num_layers=2, batch_first=True)
  )
  (layer2): Sequential(
    (linear): Linear(in_features=512, out_features=192, bias=True)
  )
  (layer3): Sequential(
    (linear): Linear(in_features=192, out_features=2, bias=True)
  )
)

In [108]:

# Perform classification with tqdm
with torch.no_grad():
    for batch_features, batch_labels in tqdm(dataloader, desc="Classifying Batches"):
        mask = torch.ones(batch_features.size(0), batch_features.size(1), dtype=torch.float32)  # Create masks
        predictions, _ = model(batch_features, mask, batch_labels)

Classifying Batches: 100%|██████████| 425/425 [01:18<00:00,  5.44it/s]


In [109]:
predictions

0.375

In [110]:
# from tqdm import tqdm

# # Initialize variables to track accuracy
# correct_predictions = 0
# total_samples = 0

# # Perform classification and calculate accuracy
# with torch.no_grad():
#     for batch_features, batch_labels in tqdm(dataloader, desc="Evaluating Accuracy"):
#         # Create masks (all ones, assuming no padding)
#         mask = torch.ones(batch_features.size(0), batch_features.size(1), dtype=torch.float32)

#         # Get predictions from the model
#         predictions, _ = model(batch_features, mask, batch_labels)

#         # Get the predicted classes
#         predicted_classes = torch.argmax(predictions, dim=1)  # Shape: [batch_size]

#         # Compare with ground truth labels
#         correct_predictions += (predicted_classes == batch_labels).sum().item()
#         total_samples += batch_labels.size(0)

# # Calculate accuracy
# accuracy = correct_predictions / total_samples
# print(f"Accuracy: {accuracy * 100:.2f}%")

from tqdm import tqdm

# Initialize variables to track accuracy
correct_predictions = 0
total_samples = 0

# Perform classification and calculate accuracy
with torch.no_grad():
    for batch_features, batch_labels in tqdm(dataloader, desc="Evaluating Accuracy"):
        # Ensure batch_labels has the correct shape and type
        batch_labels = batch_labels.view(-1)  # Flatten to [batch_size]
        batch_labels = batch_labels.long()  # Ensure type is torch.long

        # Create masks (all ones, assuming no padding)
        mask = torch.ones(batch_features.size(0), batch_features.size(1), dtype=torch.float32)

        # Get predictions from the model
        predictions = model(batch_features, mask)  # Only predicted probabilities are returned

        # Get the predicted classes
        predicted_classes = torch.argmax(predictions, dim=1)  # Shape: [batch_size]

        # Compare with ground truth labels
        correct_predictions += (predicted_classes == batch_labels).sum().item()
        total_samples += batch_labels.size(0)

# Calculate accuracy
accuracy = correct_predictions / total_samples
print(f"Accuracy: {accuracy * 100:.2f}%")

Evaluating Accuracy: 100%|██████████| 425/425 [01:15<00:00,  5.62it/s]

Accuracy: 54.51%





### Fine-tuning

Note that the dialect data sizes are similar. Thus, our accuracy is no better than a coin flip. Unfortunate. 

1. Need to sort out features and break out gender, length of audio, age to see if impacts predictive accuracy/classification
2. Fine-tune and adjust parameters to find improvements
3. Implement denoising model on all to see if improves 
4. Add additional classes (dialects) to see if classification improves 

In [11]:
# # test fine-tuning

# # Fine-tune the model
# model.train()
# for epoch in range(10):  # Number of epochs
#     for batch_features, batch_labels in dataloader:
#         # Ensure batch_labels has the correct shape and type
#         batch_labels = batch_labels.view(-1, 1)  # Reshape to [batch_size, 1]
#         batch_labels = batch_labels.long()  # Ensure type is torch.long

#         # Create a mask (all ones, as no padding is applied here)
#         mask = torch.ones(batch_features.size(0), batch_features.size(1), dtype=torch.float32)

#         # Forward pass
#         _, loss = model(batch_features, mask, batch_labels)

#         # Backward pass and optimization
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#     print(f"Epoch {epoch + 1}, Loss: {loss.item()}")


## Analyze linguistic components 