## Imports & Constants

In [1]:
import os
import sys
import glob
import h5py

import numpy as np
from moviepy.editor import *

import torch
import torchvision as tv

sys.path.append(os.path.abspath(f'{os.getcwd()}/..'))

from model import AudioCLIP
from utils.transforms import ToTensor1D

from PIL import Image
from moviepy.editor import *
import librosa

from tqdm import tqdm

torch.set_grad_enabled(False)

MODEL_FILENAME = 'AudioCLIP-CFT.pt'
# derived from ESResNeXt
SAMPLE_RATE = 44100
# derived from CLIP
IMAGE_SIZE = 224
IMAGE_MEAN = 0.48145466, 0.4578275, 0.40821073
IMAGE_STD = 0.26862954, 0.26130258, 0.27577711

AVE_DATASET = ['bell', 'Male', 'Bark', 'aircraft', 'car', 'Female', 'Helicopter',
    'Violin', 'Flute', 'Ukulele', 'Fry food', 'Truck', 'Shofar', 'Motorcycle',
    'guitar', 'Train', 'Clock', 'Banjo', 'Goat', 'Baby', 'Bus',
    'Chainsaw', 'Cat', 'Horse', 'Toilet', 'Rodents', 'Accordion', 'Mandolin', 'background']
# AVE_DATASET = ['Church bell', 'Male speech, man speaking', 'Bark', 'Fixed-wing aircraft, airplane', 'Race car, auto racing', \
#                     'Female speech, woman speaking', 'Helicopter', 'Violin, fiddle', 'Flute', 'Ukulele', 'Frying (food)', 'Truck', 'Shofar', \
#                     'Motorcycle', 'Acoustic guitar', 'Train horn', 'Clock', 'Banjo', 'Goat', 'Baby cry, infant cry', 'Bus', 'Chainsaw',\
#                     'Cat', 'Horse', 'Toilet flush', 'Rodents, rats, mice', 'Accordion', 'Mandolin']
STANDARD_AVE_DATASET = ['Church bell', 'Male speech, man speaking', 'Bark', 'Fixed-wing aircraft, airplane', 'Race car, auto racing', \
                    'Female speech, woman speaking', 'Helicopter', 'Violin, fiddle', 'Flute', 'Ukulele', 'Frying (food)', 'Truck', 'Shofar', \
                    'Motorcycle', 'Acoustic guitar', 'Train horn', 'Clock', 'Banjo', 'Goat', 'Baby cry, infant cry', 'Bus', 'Chainsaw',\
                    'Cat', 'Horse', 'Toilet flush', 'Rodents, rats, mice', 'Accordion', 'Mandolin']
data_root = "../data/"

## Model Instantiation

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

In [3]:
aclp = AudioCLIP(pretrained=f'../assets/{MODEL_FILENAME}')
aclp.to(device)
aclp.eval()

AudioCLIP(
  (visual): ModifiedResNet(
    (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (avgpool): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (relu): ReLU(inplace=True)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      

## Audio & Image Transforms

In [4]:
audio_transforms = ToTensor1D()

image_transforms = tv.transforms.Compose([
    tv.transforms.ToTensor(),
    tv.transforms.Resize(IMAGE_SIZE, interpolation=Image.BICUBIC),
    tv.transforms.CenterCrop(IMAGE_SIZE),
    tv.transforms.Normalize(IMAGE_MEAN, IMAGE_STD)
])

In [5]:
def video_frame_sample(frame_interval, video_length, sample_num = 16):
    num = []
    for l in range(video_length):
        for i in range(sample_num):
            num.append(int(l * frame_interval + (i * 1.0 / sample_num) * frame_interval))

    return num

In [6]:
# Now for the supervised task
visual_feature_path = os.path.join(data_root, 'raw_images_data.h5')
audio_feature_path = os.path.join(data_root, 'raw_audio_data.h5')
labels_path = os.path.join(data_root, 'labels.h5')
sample_order_path = os.path.join(data_root, f'val_order.h5')

with h5py.File(labels_path, 'r') as f:
    labels = f['avadataset'][:]
with h5py.File(sample_order_path, 'r') as f:
    sample_order = f['order'][:]

In [28]:
visual_feature = h5py.File(visual_feature_path, 'r')['data']
audio_feature = h5py.File(audio_feature_path, 'r')['data']

output_audio = np.zeros((4143, 10, 1024), dtype=np.float16)
output_images = np.zeros((4143, 10, 1024), dtype=np.float16)

do_image = True
do_audio = False
for num in tqdm(range(len(labels))):
    # audio: np.ndarray = audio_feature[num]
    # # image: np.ndarray = visual_feature[num]
    
    # audio *= 32768.0
    # audio = np.expand_dims(audio, axis=[0,1])
    # # image = np.squeeze(image)
    
    # audio = torch.stack([audio_transforms(x.reshape(1, -1)) for x in audio])
    # # image = torch.stack([image_transforms(x) for x in image])
    
    # sample_idx = 0
    # audio_features = np.zeros((10, 1024))
    # for start_idx in range(0, audio.size(-1), SAMPLE_RATE):
    #     x = audio[:, :, start_idx:start_idx+SAMPLE_RATE]
    #     ((audio_sample, _, _), _), _ = aclp(audio=x)
    #     audio_features[sample_idx, :] = audio_sample.squeeze()
    #     sample_idx += 1
    # audio_features = torch.tensor(audio_features)

    # # ((_, image_features, _), _), _ = aclp(image=image)
    
    # output_audio[num, :, :] = audio_features
    # # output_images[num, :, :] = image_features
    
    if do_image:
        image: np.ndarray = visual_feature[num]
        
        image = np.squeeze(image)
        
        image = torch.stack([image_transforms(x) for x in image])
        image = image.to(device)
        ((_, image_features, _), _), _ = aclp(image=image)
        
        image_features = image_features.cpu()
        
        t = 0
        for start_frame in range(0,160,16):
            end_frame = start_frame + 16
            mean_feature = image_features[start_frame:end_frame].mean(dim=0)
            output_images[num, t, :] = mean_feature
            
            t += 1

            
    if do_audio:
        audio: np.ndarray = audio_feature[num]
        
        audio *= 32768.0
        audio = np.expand_dims(audio, axis=[0,1])
        
        audio = torch.stack([audio_transforms(x.reshape(1, -1)) for x in audio])
        
        sample_idx = 0
        audio_features = np.zeros((10, 1024))
        for start_idx in range(0, audio.size(-1), SAMPLE_RATE):
            x = audio[:, :, start_idx:start_idx+SAMPLE_RATE]
            ((audio_sample, _, _), _), _ = aclp(audio=x)
            audio_features[sample_idx, :] = audio_sample.cpu().squeeze()
            sample_idx += 1
        audio_features = torch.tensor(audio_features)
        
        output_audio[num, :, :] = audio_features

if do_image:    
    with h5py.File(data_root + 'CFT_visual_features.h5', 'w') as f:
        f.create_dataset(f'data', data=output_images)
if do_audio:
    with h5py.File(data_root + 'CFT_audio_features.h5', 'w') as f:
        f.create_dataset(f'data', data=output_audio)



  3%|▎         | 112/4143 [12:22<7:28:35,  6.68s/it]

## TEST

In [27]:
with h5py.File(data_root+ 'CFT_audio_features.h5', 'r') as f:
    CFT_audio_feature = f['data'][0]
with h5py.File(data_root+ 'CFT_visual_features.h5', 'r') as f:
    CFT_visual_feature = f['data'][0]
print(np.shape(CFT_audio_feature))
print(np.shape(CFT_visual_feature))
CFT_visual_feature

(10, 1024)
(160, 1024)


array([[-0.00557 , -0.00542 ,  0.02725 , ...,  0.007965, -0.02217 ,
         0.03763 ],
       [-0.00582 , -0.004826,  0.02736 , ...,  0.007187, -0.02246 ,
         0.03864 ],
       [-0.00923 , -0.005943,  0.03503 , ...,  0.00212 , -0.0105  ,
         0.05563 ],
       ...,
       [-0.005096, -0.00144 ,  0.02403 , ...,  0.001043, -0.0335  ,
         0.04544 ],
       [-0.002739, -0.01027 ,  0.01645 , ...,  0.01534 , -0.02931 ,
         0.03647 ],
       [-0.00884 , -0.01538 ,  0.0193  , ...,  0.01382 , -0.03455 ,
         0.0531  ]], dtype=float16)

In [116]:
with h5py.File(data_root+ 'CFT_audio_features.h5', 'r') as f:
    CFT_audio_feature = f['data'][0]
with h5py.File(data_root+ 'CFT_visual_features.h5', 'r') as f:
    CFT_visual_feature = f['data'][0]
((_, _, text_features), _), _ = aclp(text=STANDARD_AVE_DATASET)

# CFT_audio_feature = CFT_audio_feature.mean(axis=0)
# CFT_visual_feature = CFT_visual_feature.mean(axis=0)
CFT_audio_feature = CFT_audio_feature[1]
CFT_visual_feature = CFT_visual_feature[16]

CFT_audio_feature = np.expand_dims(CFT_audio_feature, axis=0)
CFT_visual_feature = np.expand_dims(CFT_visual_feature, axis=0)

audio_features = CFT_audio_feature / torch.linalg.norm(torch.from_numpy(CFT_audio_feature), dim=-1, keepdim=True)
image_features = CFT_visual_feature / torch.linalg.norm(torch.from_numpy(CFT_visual_feature), dim=-1, keepdim=True)
text_features = text_features / torch.linalg.norm(text_features, dim=-1, keepdim=True)

In [62]:
scale_audio_image = torch.clamp(aclp.logit_scale_ai.exp(), min=1.0, max=100.0)
scale_audio_text = torch.clamp(aclp.logit_scale_at.exp(), min=1.0, max=100.0)
scale_image_text = torch.clamp(aclp.logit_scale.exp(), min=1.0, max=100.0)


In [63]:
audio_features = audio_features.float()
image_features = image_features.float()
text_features = text_features.float()

In [64]:
audio_features = audio_features.to(device)
image_features = image_features.to(device)
text_features = text_features.to(device)

In [65]:
logits_audio_image = scale_audio_image * audio_features @ image_features.T
logits_audio_text = scale_audio_text * audio_features @ text_features.T
logits_image_text = scale_image_text * image_features @ text_features.T

In [66]:
print('\t\tFilename, Audio\t\t\tTextual Label (Confidence)', end='\n\n')

# calculate model confidence
confidence = logits_audio_text.softmax(dim=1)
for audio_idx in range(1):
    # acquire Top-3 most similar results
    conf_values, ids = confidence[audio_idx].topk(3)

    # format output strings
    # query = f'{os.path.basename(paths_to_audio[audio_idx]):>30s} ->\t\t'
    results = ', '.join([f'{STANDARD_AVE_DATASET[i]:>15s} ({v:06.2%})' for v, i in zip(conf_values, ids)])

    print(results)

		Filename, Audio			Textual Label (Confidence)

            Cat (14.35%),         Ukulele (10.20%),           Clock (09.05%)


In [100]:
with h5py.File(data_root+ 'raw_labels_data.h5', 'r') as f:
    labels = f['data'][0]

In [101]:
labels

array([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0