## Tutorial for Audio_pseudo-UAV

In [None]:
import torch
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import numpy as np
import torch.nn as nn
import random
from tqdm import tqdm
import torch.optim as optim
from dataloader.dataloader_tutorial import *
from network.audio_net import *
from utils import loss
%matplotlib inline
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
# Path to the dataset root directory
root = 'xxxxx'
# Annotation files for the training and validation splits.
train_anno_path = "xxxxxx"
val_anno_path = "xxxxxx"

# Read the training annotations (one entry per line, newlines kept) and echo
# them so the parsed list can be checked by eye.
with open(train_anno_path, "r") as anno_file:
    train_anno = list(anno_file)
    print(train_anno)

# Read the validation annotations the same way, without printing them.
with open(val_anno_path, "r") as anno_file:
    val_anno = list(anno_file)


# Wrap the annotation lists in datasets; the validation set is additionally
# built with testing=1.
uav_traindataset = UAVLoader(train_anno, root, dark_aug=1)
uav_valdataset = UAVLoader(val_anno, root, dark_aug=1, testing=1)

# Training batches are shuffled and the last partial batch is dropped so that
# every optimisation step sees a full batch of 32.
train_dataloader = DataLoader(uav_traindataset, batch_size=32, shuffle=True, num_workers=16, drop_last=True)
# Validation keeps every sample in a fixed order: shuffling or dropping the
# last batch would make the reported loss cover a different subset each epoch,
# and a set smaller than one batch would yield zero batches.
val_dataloader = DataLoader(uav_valdataset, batch_size=32, shuffle=False, num_workers=16, drop_last=False)

# Sample to inspect; fixed at the first item, change it to view another one.
index = 0
spec, gt, pseudo_label = uav_traindataset[index]
print(gt)
print(pseudo_label)

# Show the first channel of the spectrogram in the left half of the figure.
first_channel = spec.numpy()[0]
plt.figure(figsize=(8, 4))
plt.subplot(1, 2, 1)
plt.imshow(first_channel, aspect='auto')
plt.title("Spectrogram")


In [None]:
# Build the network with dropout_rate=0.2, then move it to the selected device.
model = AudioNet(dropout_rate=0.2)
model = model.to(device)

In [None]:
# Print the model's string representation to inspect its structure.
print(model)

In [None]:
def train_model(model, train_dataloader, optimizer, loss_train, loss_alpha, device):
    """Run one optimisation pass over ``train_dataloader``.

    Args:
        model: Network to train; it is switched to training mode first.
        train_dataloader: Sized iterable of ``(spec, gt, pseudo_label)`` batches.
        optimizer: Optimizer stepped once per batch.
        loss_train: Callable invoked as
            ``loss_train(prediction, gt, pseudo_label, loss_alpha)``.
        loss_alpha: Value forwarded unchanged to ``loss_train``.
        device: Device every tensor of a batch is moved to.

    Returns:
        The sum of the per-batch loss values divided by the number of batches.
    """
    model.train()
    num_batches = len(train_dataloader)
    running_loss = 0
    progress = tqdm(train_dataloader, total=num_batches, unit='batch')
    for batch in progress:
        spec, gt, pseudo_label = (tensor.to(device) for tensor in batch)
        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        prediction = model(spec)
        batch_loss = loss_train(prediction, gt, pseudo_label, loss_alpha)
        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()
    return running_loss / num_batches

def validate_model(model, val_dataloader, loss_val, device):
    """Compute the mean per-batch loss over ``val_dataloader`` without gradients.

    Args:
        model: Network to evaluate; it is switched to eval mode first.
        val_dataloader: Sized iterable of ``(spec, gt, pseudo_label)`` batches.
        loss_val: Callable invoked as ``loss_val(prediction, gt)``.
        device: Device every tensor of a batch is moved to.

    Returns:
        The sum of the per-batch loss values divided by the number of batches.
    """
    model.eval()
    num_batches = len(val_dataloader)
    running_loss = 0
    with torch.no_grad():
        for batch in val_dataloader:
            # The pseudo label is part of every batch but unused for validation.
            spec, gt, _pseudo_label = (tensor.to(device) for tensor in batch)
            prediction = model(spec)
            running_loss += loss_val(prediction, gt).item()
    return running_loss / num_batches

In [None]:
# Imported explicitly: this cell calls os.makedirs / os.path.join, and the only
# explicit `import os` in the notebook otherwise sits in a later cell.
import os

# Number of passes over the training set.
train_epoch = 100
# Forwarded to the training loss as its fourth argument.
loss_alpha = 0
optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999))
# Training uses the pseudo-label-aware loss; validation uses the plain
# regression loss against the ground truth only.
loss_train = loss.regression_loss_w_pseudo
loss_val = loss.regression_loss

# Directory that receives the best checkpoint.
output_path = 'output/'
os.makedirs(output_path, exist_ok=True)

# Training loop: keep only the weights with the lowest validation loss so far.
best_val_loss = float('inf')
for epoch in range(train_epoch):
    train_loss = train_model(model, train_dataloader, optimizer, loss_train, loss_alpha, device)
    val_loss = validate_model(model, val_dataloader, loss_val, device)

    print(f"Epoch {epoch + 1}/{train_epoch}, Train Loss: {train_loss}, Val Loss: {val_loss}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), os.path.join(output_path, 'model_best.pth'))


In [None]:

# Put the model in eval mode before restoring the checkpoint.
model.eval()
# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU also loads on a CPU-only machine.
model.load_state_dict(torch.load('xxxx.pth', map_location=device))

In [None]:
def evaluate(model, val_anno, audio_path, gt_path, device):
    """Run the model on every annotated sample and report position errors.

    For each annotation entry the audio is fetched with ``make_seq_audio``,
    transposed, converted with ``Audio2Spectrogram(audio, sr=46080)`` and fed
    to ``model`` as a batch of one. The matching ground truth is loaded from
    ``<gt_path>/<stem>.npy``. The per-axis mean absolute errors of the first
    three columns and the mean Euclidean error are printed.

    Args:
        model: Network called as ``model(spec.unsqueeze(0))``. It is put into
            eval mode before inference.
        val_anno: Iterable of annotation lines, one file name per line.
            Surrounding whitespace is stripped and blank lines are skipped.
        audio_path: Directory forwarded to ``make_seq_audio``.
        gt_path: Directory containing the ground-truth ``.npy`` files.
        device: Device the spectrogram is moved to before inference.

    Returns:
        ``(gt_array, predict_array)``: NumPy arrays stacked in annotation order.

    Raises:
        ValueError: If ``val_anno`` contains no non-blank entries.
    """
    # Local import keeps the function usable even when `os` has not been
    # imported by an earlier notebook cell.
    import os

    # Make sure dropout and similar layers are disabled for evaluation.
    model.eval()

    stripped = (line.strip() for line in val_anno)
    names = [entry for entry in stripped if entry]
    if not names:
        raise ValueError("val_anno contains no entries to evaluate")

    gt_array = []
    predict_array = []

    for name in tqdm(names, total=len(names)):
        # Swap whatever extension the annotation carries for ".npy"; this
        # works with or without a trailing newline on the original line.
        npy_name = os.path.splitext(name)[0] + '.npy'

        audio = make_seq_audio(audio_path, npy_name)
        # Swap the two axes before the spectrogram conversion
        # (presumably to channel-first order; confirm against the helper).
        audio = np.transpose(audio, [1, 0])
        spec = Audio2Spectrogram(audio, sr=46080)
        spec = spec.float()

        gt_array.append(np.load(os.path.join(gt_path, npy_name)))

        with torch.no_grad():
            spec = spec.to(device)
            p = model(spec.unsqueeze(0))
            # Drop the batch dimension that was added for the forward pass.
            predict_array.append(p.cpu().numpy()[0])

    gt_array = np.array(gt_array)
    predict_array = np.array(predict_array)
    print(gt_array.shape,predict_array.shape)

    # Mean absolute error along each of the first three coordinates.
    Dx = np.mean(np.abs(gt_array[:,0] - predict_array[:,0]))
    Dy = np.mean(np.abs(gt_array[:,1] - predict_array[:,1]))
    Dz = np.mean(np.abs(gt_array[:,2] - predict_array[:,2]))
    # Mean Euclidean distance between prediction and ground truth.
    E = np.mean(np.sqrt(np.sum((gt_array - predict_array) ** 2, axis=1)))
    print(Dx,Dy,Dz,E)
    return gt_array, predict_array

In [None]:

# Score the model on the validation annotations; the two placeholder strings
# are the audio directory and the ground-truth directory.
gt_array, predict_array = evaluate(
    model=model,
    val_anno=val_anno,
    audio_path="xxxxx",
    gt_path="xxxxx",
    device=device,
)

In [None]:
# Smooth the predicted trajectory with a Gaussian process
import os
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
import matplotlib.pyplot as plt


def smooth_with_gp(points, kernel=None):
    """Smooth a sequence of points by Gaussian-process regression on the index.

    The sample index ``0..len(points)-1`` is the regression input and the
    points themselves are the targets; the fitted posterior mean at those
    same indices is returned as the smoothed sequence.

    Args:
        points: Array-like with one entry (row) per step of the sequence.
        kernel: Optional scikit-learn kernel. When ``None``, a constant kernel
            times an RBF kernel with length scale 10 is used.

    Returns:
        The Gaussian-process posterior mean evaluated at every input index.
    """
    if kernel is None:
        kernel = C(1.0, (1e-3, 1e3)) * RBF(10, (1e-2, 1e2))

    gp = GaussianProcessRegressor(kernel=kernel, alpha=0.1, n_restarts_optimizer=10)

    # Regression targets: the points to be smoothed.
    y = np.asarray(points)
    # Input variable: the position of each point in the sequence.
    X = np.arange(len(y)).reshape(-1, 1)

    gp.fit(X, y)

    # Only the posterior mean is needed, so no standard deviation is requested.
    return gp.predict(X)



# Smooth the raw predictions, then recompute the same error metrics on them.
smoothed_points = smooth_with_gp(predict_array)

residual = gt_array - smoothed_points

# Mean absolute error along each of the first three coordinates.
Dx, Dy, Dz = (np.mean(np.abs(residual[:, axis])) for axis in range(3))
# Mean Euclidean distance between smoothed prediction and ground truth.
E = np.mean(np.sqrt(np.sum(residual ** 2, axis=1)))

print(Dx, Dy, Dz, E)


# Overlay the raw predictions (blue dots) and the smoothed path (red line),
# using the first two coordinates as the plot axes.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(predict_array[:, 0], predict_array[:, 1], 'b.', label='Original Data')
ax.plot(smoothed_points[:, 0], smoothed_points[:, 1], 'r-', label='Smoothed Data')
ax.legend()
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_title('Trajectory Smoothing with Gaussian Process')
plt.show()