In [1]:
# ! pip install ftfy
import os
import sys
import glob

import librosa
import librosa.display

import simplejpeg
import numpy as np

import torch
import torchvision as tv

import pyaudio
import IPython.display as ipd
import wave

# import matplotlib.pyplot as plt

from PIL import Image
from IPython.display import Audio, display

# Put the parent of the current working directory on the import path.
# NOTE(review): presumably this is what lets the project-local `model` and
# `utils` imports below resolve -- confirm against the repository layout.
sys.path.append(os.path.abspath(f'{os.getcwd()}/..'))

from model import AudioCLIP
from utils.transforms import ToTensor1D

In [2]:
# Inference only: disable autograd globally for the rest of the notebook.
torch.set_grad_enabled(False)

# File name of the checkpoint, looked up under ./assets/
MODEL_FILENAME = 'AudioCLIP-Full-Training.pt'

# Sampling rate passed to the audio loader (derived from ESResNeXt)
SAMPLE_RATE = 44100

# Image preprocessing parameters (derived from CLIP)
IMAGE_SIZE = 224
IMAGE_MEAN = (0.48145466, 0.4578275, 0.40821073)
IMAGE_STD = (0.26862954, 0.26130258, 0.27577711)

# Candidate textual labels that audio clips are scored against
LABELS = [
    'cat',
    'thunderstorm',
    'coughing',
    'alarm clock',
    'car horn',
    'door wood knock',
    'mouse click',
    'keyboard typing',
    'sneezing',
    'laughing',
    'dog',
    'rain',
]

# Model instantiation from the pretrained checkpoint
aclp = AudioCLIP(pretrained=f'./assets/{MODEL_FILENAME}')

In [19]:
# Microphone capture settings
RATE = 44100                # capture sampling rate in Hz
CHANNELS = 1                # single input channel
RECORD_SECONDS = 12         # duration of the recording
CHUNK = 1024                # frames requested from the stream per read
FORMAT = pyaudio.paInt16    # sample format handed to PyAudio
LENGTH = 220500             # fixed clip length (in samples) used by the audio-loading cell
FILENAME = "door_knock"     # base name for saved recordings (not used by the active code here)

p = pyaudio.PyAudio()
stream = None
frames = []

# try/finally guarantees the input device and the PortAudio session are
# released even if opening the stream or a read fails, or the kernel is
# interrupted in the middle of the recording.
try:
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)

    # Number of whole CHUNK-sized reads that fit into RECORD_SECONDS
    for _ in range(int(RATE / CHUNK * RECORD_SECONDS)):
        # Each read returns the raw bytes of CHUNK frames
        frames.append(stream.read(CHUNK))
finally:
    # Close the audio stream (if it was opened) and terminate PyAudio
    if stream is not None:
        stream.stop_stream()
        stream.close()
    p.terminate()

# Reinterpret the concatenated raw bytes as a 1D array of int16 samples
data = np.frombuffer(b''.join(frames), dtype=np.int16)


In [15]:
# Convert the recorded samples to float32.
# Integer samples are divided by the dtype's full-scale value (max + 1), so
# int16 input lands in [-1.0, 1.0). Any other dtype is only cast, which keeps
# `data_x` defined on every path instead of raising NameError (or silently
# reusing a stale value from an earlier run) on the last line.
if np.issubdtype(data.dtype, np.integer):
    data_x = data.astype(np.float32) / (np.iinfo(data.dtype).max + 1)
else:
    data_x = data.astype(np.float32)

data_x.dtype

dtype('float32')

In [20]:
# Save the recorded int16 samples (`data`) as a WAV file.
files = "./test_audio/sound_wave.wav"

# Create the target directory if it is missing; wave.open does not create it.
os.makedirs(os.path.dirname(files), exist_ok=True)

# The context manager closes the file even when a write fails. The sample
# width is derived from FORMAT through the module-level helper, so it always
# matches the capture format and does not depend on the PyAudio instance `p`,
# which has already been terminated by the recording cell.
with wave.open(files, "wb") as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(data.tobytes())

In [6]:
# Audio & Image Transforms

# Transform applied to every audio track before batching
audio_transforms = ToTensor1D()

# Image pipeline, applied in order: tensor conversion, bicubic resize to
# IMAGE_SIZE, center crop to IMAGE_SIZE, normalization with IMAGE_MEAN/IMAGE_STD
image_steps = [
    tv.transforms.ToTensor(),
    tv.transforms.Resize(IMAGE_SIZE, interpolation=Image.BICUBIC),
    tv.transforms.CenterCrop(IMAGE_SIZE),
    tv.transforms.Normalize(IMAGE_MEAN, IMAGE_STD),
]
image_transforms = tv.transforms.Compose(image_steps)

In [22]:
# audio loading

def fit_to_length(track, length):
    """Return `track` with exactly `length` samples.

    A shorter track is zero-padded at the end, a longer one is truncated,
    and a track that already has `length` samples is returned with the same
    contents.

    Parameters:
        track: 1D NumPy array holding the waveform.
        length: target number of samples.

    Returns:
        1D NumPy array of `length` samples with the dtype of `track`.
    """
    n_samples = len(track)
    if n_samples < length:
        # np.pad is used because the waveform is a NumPy array here;
        # torch.nn.functional.pad only accepts tensors and raises TypeError.
        return np.pad(track, (0, length - n_samples))
    return track[:length]


# sorted() makes the file order (and therefore the batch order) deterministic;
# glob.glob alone returns files in arbitrary, OS-dependent order.
paths_to_audio = sorted(glob.glob('./demo/audio/*.wav'))

audio = []
for path_to_audio in paths_to_audio:
    # track = 1D numpy array of the audio waveform, _ = sampling rate
    track, _ = librosa.load(path_to_audio, sr=SAMPLE_RATE, dtype=np.float32)

    # LENGTH is defined in the recording cell; every clip is brought to that
    # size so the tracks can later be stacked into one batch.
    track = fit_to_length(track, LENGTH)

    # compute spectrograms using trained audio-head (fbsp-layer of ESResNeXt)
    # thus, the actual time-frequency representation will be visualized
    spec = aclp.audio.spectrogram(torch.from_numpy(track.reshape(1, 1, -1)))
    spec = np.ascontiguousarray(spec.numpy()).view(np.complex64)
    # power spectrogram in dB; the small constant avoids log10(0)
    pow_spec = 10 * np.log10(np.abs(spec) ** 2 + 1e-18).squeeze()

    audio.append((track, pow_spec))

In [24]:
# image loading
paths_to_images = glob.glob('./demo/images/*.jpg')

# Decode every JPEG file, keeping the order returned by glob
images = []
for path_to_image in paths_to_images:
    with open(path_to_image, 'rb') as jpg:
        encoded = jpg.read()
        image = simplejpeg.decode_jpeg(encoded)
        images.append(image)

In [25]:
# NOTE: `audio` and `images` are rebound from Python lists to batched tensors
# here, so this cell can only be re-run after re-running the loading cells.

# AudioCLIP handles raw audio on input, so the input shape is [batch x channels x duration]
audio_tensors = [audio_transforms(track.reshape(1, -1)) for track, _ in audio]
audio = torch.stack(audio_tensors)

# standard channel-first shape [batch x channels x height x width]
image_tensors = [image_transforms(image) for image in images]
images = torch.stack(image_tensors)

# textual input is processed internally, so no need to transform it beforehand;
# each label is wrapped in its own single-element list
text = [[label] for label in LABELS]



In [26]:
## Obtaining Embeddings

# AudioCLIP's output: Tuple[Tuple[Features, Logits], Loss]
# Features = Tuple[AudioFeatures, ImageFeatures, TextFeatures]
# Logits = Tuple[AudioImageLogits, AudioTextLogits, ImageTextLogits]
#
# Each modality is passed through the model on its own; output[0][0] is the
# Features tuple, from which the entry of the queried modality is taken.

audio_output = aclp(audio=audio)
audio_features = audio_output[0][0][0]

image_output = aclp(image=images)
image_features = image_output[0][0][1]

text_output = aclp(text=text)
text_features = text_output[0][0][2]

In [27]:
# Normalization of Embeddings
# Every embedding is divided by its L2-norm along the last axis, so each
# feature vector ends up with unit length.

audio_norm = torch.linalg.norm(audio_features, dim=-1, keepdim=True)
image_norm = torch.linalg.norm(image_features, dim=-1, keepdim=True)
text_norm = torch.linalg.norm(text_features, dim=-1, keepdim=True)

audio_features = audio_features / audio_norm
image_features = image_features / image_norm
text_features = text_features / text_norm

In [28]:
## Obtaining Logit Scales
# Outputs of the text-, image- and audio-heads are made consistent using a
# dedicated scaling term for each pair of modalities.
# Each scale is the exponential of the model's stored logit-scale value,
# clamped to the range [1.0, 100.0].

scale_audio_image = aclp.logit_scale_ai.exp().clamp(min=1.0, max=100.0)
scale_audio_text = aclp.logit_scale_at.exp().clamp(min=1.0, max=100.0)
scale_image_text = aclp.logit_scale.exp().clamp(min=1.0, max=100.0)

In [29]:
## Computing Similarities
# Similarities between different representations of the same concept are the
# scaled dot products of the normalized embeddings (i.e. scaled cosine similarity).
# The scale is applied to the left operand first, then the matrix product is taken.

logits_audio_image = torch.matmul(scale_audio_image * audio_features, image_features.T)
logits_audio_text = torch.matmul(scale_audio_text * audio_features, text_features.T)
logits_image_text = torch.matmul(scale_image_text * image_features, text_features.T)

In [30]:
### AUDIO Classification

print('\t\tFilename, Audio\t\t\tTextual Label (Confidence)', end='\n\n')

# softmax over the label axis turns the audio-text logits into confidences
confidence = logits_audio_text.softmax(dim=1)

for audio_idx, path_to_audio in enumerate(paths_to_audio):
    # keep only the single most similar label (topk with k=1)
    conf_values, ids = confidence[audio_idx].topk(1)

    # right-aligned file name, followed by "label (confidence)" entries
    query = f'{os.path.basename(path_to_audio):>30s} ->\t\t'
    results = ', '.join(
        f'{LABELS[i]:>15s} ({v:06.2%})' for v, i in zip(conf_values, ids)
    )

    print(query + results)

		Filename, Audio			Textual Label (Confidence)

 alarm_clock_3-120526-B-37.wav ->		    alarm clock (97.82%)
                 audiofile.wav ->		    mouse click (42.57%)
     car_horn_1-24074-A-43.wav ->		    mouse click (36.61%)
           cat_3-95694-A-5.wav ->		            cat (99.73%)
     coughing_1-58792-A-24.wav ->		       coughing (76.33%)
                door_knock.wav ->		keyboard typing (49.13%)
     thunder_3-144891-B-19.wav ->		   thunderstorm (96.92%)


In [109]:
import json
from collections import defaultdict

# Training/evaluation protocol describing model, optimizer, dataset, etc.
config_path = "./protocols/audioclip-esc50.json"

# Read the protocol through a context manager so the file handle is closed
# deterministically instead of being left open until garbage collection.
# JSON text is UTF-8, so the encoding is stated explicitly rather than taken
# from the platform default.
with open(config_path, encoding="utf-8") as config_file:
    config = json.load(config_file)

# default_factory is None, so missing keys still raise KeyError exactly like
# a plain dict; only the container type changes.
config = defaultdict(None, config)


In [111]:
# Pull the 'class' and 'args' entries of each protocol section into
# individual variables.
model_cfg = config['Model']
model_class, model_args = model_cfg['class'], model_cfg['args']

optimizer_cfg = config['Optimizer']
optimizer_class, optimizer_args = optimizer_cfg['class'], optimizer_cfg['args']

# The scheduler section is optional; both values are None when it is absent.
if 'Scheduler' not in config:
    scheduler_class = None
    scheduler_args = None
else:
    scheduler_cfg = config['Scheduler']
    scheduler_class, scheduler_args = scheduler_cfg['class'], scheduler_cfg['args']

dataset_cfg = config['Dataset']
dataset_class, dataset_args = dataset_cfg['class'], dataset_cfg['args']

# These two sections are kept as loaded, without further unpacking.
transforms = config['Transforms']
performance_metrics = config['Metrics']


In [112]:
# Display the model class entry read from the protocol file
model_class

'model.audioclip.AudioCLIP'