In [241]:
import soundcard as sc
import soundfile as sf
import cv2
import numpy as np
import time
import keyboard

import torch
import torchaudio

import torch.optim as optim
import torch.nn as nn
from torchvision import utils, datasets
import torchvision.transforms as T
import torch.nn.functional as F

import matplotlib.pyplot as plt

import os

from datetime import datetime

import math

In [36]:
class RealTimeAudioStream:
    """Rolling loopback-audio capture with per-window RMS and zero-crossing-rate features.

    The recorder is opened on the loopback device of the default speaker. Every
    call to `step` records `overlap` new frames, slides them into a fixed-size
    waveform buffer and recomputes RMS and ZCR over the most recent
    `window_size` frames. A history of both features is kept for plotting.

    Attributes:
        done: falsy while streaming should continue; set to True by ESC.
        current_rms / current_zcr: latest feature values, shape [1, num_channels].
        buffer_rms / buffer_zcr: feature history, one row per `step`.
    """

    def __init__(self, sample_rate = 44100, window_size = 1024, overlap = 512, buffer_seconds = 5, cv2_window_size = (256, 512)):
        """
        Args:
            sample_rate: requested rate in Hz; rounded down to a multiple of `window_size`.
            window_size: number of most recent frames used for each feature computation.
            overlap: number of new frames recorded per `step`.
            buffer_seconds: length of the rolling waveform buffer in seconds.
            cv2_window_size: (H, W) of the image returned by `_vis`.
        """
        self.cv2_window_size = cv2_window_size # (H, W)

        # Stays None (falsy) until `stream` or the ESC hook assigns a boolean.
        self.done = None
        self.current_rms = None
        self.current_zcr = None

        # Handle of the global ESC hook; created lazily, exactly once, by `step`.
        self._esc_hook = None

        self._d =  torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.sample_rate = (sample_rate // window_size) * window_size # samples per seconds, a.k.a [Hz]
        print(f"RealTimeAudioStream initialized with {self.sample_rate} sample rate")
        self.window_size = window_size # samples per processing step
        self.overlap = overlap # new samples recorded per step

        self._mic = sc.get_microphone(id=str(sc.default_speaker().name), include_loopback=True)
        self._num_channels = self._mic.channels

        self._buffer_size = self.sample_rate * buffer_seconds # samples memory size
        self._buffer_wav = torch.zeros((self._buffer_size, self._num_channels), dtype=float, device=self._d)

        history_len = self._buffer_size // overlap
        self.buffer_rms = torch.zeros((history_len, self._num_channels), dtype=float, device=self._d)
        self.buffer_zcr = torch.zeros((history_len, self._num_channels), dtype=float, device=self._d)

    def _rms(self):
        """Update `current_rms` from the newest window and push it onto `buffer_rms`."""
        window = self._buffer_wav[-self.window_size:, :]
        self.current_rms = window.pow(2).mean(0, keepdim=True).sqrt()
        # Append the newest value and drop the oldest so the history length is constant.
        self.buffer_rms = torch.cat((self.buffer_rms, self.current_rms), dim=0)[1:,:]

    def _zcr(self):
        """Update `current_zcr` from the newest window and push it onto `buffer_zcr`."""
        window = self._buffer_wav[-self.window_size:, :]
        positive = window > 0
        # A crossing is any pair of consecutive samples on different sides of zero.
        crossings = positive[1:] ^ positive[:-1]
        # Normalised by window_size (not window_size - 1), as in the original definition.
        self.current_zcr = crossings.sum(dim=0, keepdim=True) / self.window_size
        self.buffer_zcr = torch.cat((self.buffer_zcr, self.current_zcr), dim=0)[1:,:]

    def _vis(self):
        """Render the RMS and ZCR histories of channel 0 as an (H, W, 3) float image.

        RMS is drawn into channel index 2 and ZCR into channel index 1 (red and
        green respectively under OpenCV's BGR ordering).
        """
        H, W = self.cv2_window_size
        W1 = self.buffer_rms.shape[0]

        def to_rows(track):
            # Map a value v to pixel row H - (v + 1) * H // 2, clamped into the image.
            rows = torch.clamp(H - ((track + 1) * H // 2), 0, H-1)
            return rows[:, 0].long().cpu()

        cols = torch.arange(0, W1)
        image = torch.zeros((H, W1, 3), dtype=float) # [H, W1, 3]

        image[to_rows(self.buffer_rms), cols, 2] = 1
        image[to_rows(self.buffer_zcr), cols, 1] = 1

        return cv2.resize(image.numpy(), (W, H))

    def _register_esc_hook(self):
        """Install the global ESC key hook once per instance.

        Registering inside every `step` call (as before) added one more handler
        per processed chunk, so the handler list grew without bound.
        """
        if getattr(self, "_esc_hook", None) is None:
            self._esc_hook = keyboard.on_press_key("ESC", lambda _: self._done())

    def step(self, mic):
        """Record `overlap` frames from `mic`, slide the buffer and refresh the features.

        Args:
            mic: an opened recorder, e.g. the object yielded by `get_recorder()`.

        Returns:
            (current_rms, current_zcr), each of shape [1, num_channels].
        """
        self._register_esc_hook()

        self._current = torch.from_numpy(mic.record(numframes=self.overlap)).to(self._d) # [overlap, num_channels]

        # Append the new chunk and drop the same number of oldest frames.
        self._buffer_wav = torch.cat((self._buffer_wav, self._current), dim=0)[self.overlap:,:]
        self._rms()
        self._zcr()

        return self.current_rms, self.current_zcr
    
    def get_recorder(self):
        """Return a recorder context manager at the (rounded) sample rate."""
        return self._mic.recorder(samplerate=self.sample_rate)
    
    def _done(self):
        """Request termination of any running loop that polls `self.done`."""
        self.done = True

    def stream(self):
        """Show the live feature plot until ESC is pressed, then close all OpenCV windows."""
        self.done = False

        try:
            with self.get_recorder() as mic:
                while not self.done:
                    self.step(mic)
                    cv2.imshow('stream', self._vis())

                    # 27 is the key code of ESC; waitKey also paces the loop (~33 ms).
                    if cv2.waitKey(33) == 27:
                        self.done = True
        finally:
            # Windows are closed even if recording or rendering raised.
            cv2.destroyAllWindows()

In [246]:
# Drive the 2-D latent space of a trained VAE with live audio features.
# NOTE: the VAE class must already be defined in the kernel (it is declared in a
# later cell of this notebook) before torch.load can unpickle the saved module.
audio_stream = RealTimeAudioStream()

# Run on the GPU when there is one, otherwise fall back to the CPU instead of crashing.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# SECURITY NOTE: torch.load unpickles arbitrary Python objects; only load
# checkpoints you created yourself. Recent PyTorch releases may additionally
# require weights_only=False to load a whole pickled module.
vae = torch.load(r"models\vae_mnist_2dim.pth", map_location=device)
vae.to(device)
vae.eval()

angle = 0

try:
    with torch.no_grad():
        with audio_stream.get_recorder() as mic:
            while not audio_stream.done:
                # Slowly rotate the latent point so the picture keeps moving.
                angle += 2
                angle_r = math.radians(angle)

                r_m = torch.tensor([[math.cos(angle_r), -math.sin(angle_r)],
                                    [math.sin(angle_r), math.cos(angle_r),]])

                rms, zcr = audio_stream.step(mic)

                # Centre and scale both features before using them as latent coordinates.
                rms = (rms.mean().item() - 0.15) * 20 # 0.3
                zcr = (zcr.mean().item() - 0.07) * 40# 0.14
                print(angle_r, rms, zcr, end="\r")
                z = (torch.tensor([rms, zcr]) @ r_m).unsqueeze(0).to(device)
                sample = vae.decoder(z)

                # [C, H, W] -> [H, W, C] for OpenCV.
                image = sample[0].permute(1, 2, 0).detach().cpu().numpy()
                image = cv2.resize(image, (256, 256))

                cv2.imshow("generation", image)

                k = cv2.waitKey(33)
                if k==27:    # Esc key to stop
                    break
finally:
    # Close the preview window on normal exit, ESC, or any exception.
    cv2.destroyAllWindows()


#audio_stream.stream()

RealTimeAudioStream initialized with 44032 sample rate
28.97246558310587 1.6812304722883726 3.371874999999999702773

In [None]:
# Mini-batch sizes used by the two loaders below.
batch_size_train = 256
batch_size_test = 256

# cuDNN is disabled globally from this point on.
torch.backends.cudnn.enabled = False

# MNIST Dataset (only the training split triggers a download)
train_dataset = datasets.MNIST(
    root='./mnist_data/',
    train=True,
    transform=T.ToTensor(),
    download=True,
)
test_dataset = datasets.MNIST(
    root='./mnist_data/',
    train=False,
    transform=T.ToTensor(),
    download=False,
)


# Data Loader (Input Pipeline): shuffled for training, fixed order for testing
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=batch_size_train,
    shuffle=True,
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size_test,
    shuffle=False,
)

# Pull the first test batch; example_data is later used to size the model.
examples = enumerate(test_loader)
batch_idx, (example_data, example_targets) = next(examples)
example_data.shape

torch.Size([256, 1, 28, 28])

In [168]:
class VAE(nn.Module):
    """Convolutional variational auto-encoder for square, single-output-channel images.

    The encoder is a stack of stride-2 3x3 convolutions (one per entry of
    `hidden_dims`) followed by two linear heads producing `mu` and `log_var`.
    The decoder mirrors it with transposed convolutions and ends with a 7x7
    convolution (padding 1) and a sigmoid. With the default `hidden_dims` and a
    28x28 input the reconstruction is 28x28 again; other configurations may
    yield a different output size.
    """

    def __init__(self, sample_x, hidden_dims, z_dim):
        """
        Args:
            sample_x: one example input of shape [C, H, W] with H == W; only its
                shape is read.
            hidden_dims: channel count of each encoder stage, or None for [2, 4, 8].
            z_dim: dimensionality of the latent space.

        Raises:
            ValueError: if `hidden_dims` is empty or `sample_x` is not square.
        """
        super(VAE, self).__init__()

        # Always assign self.hidden_dims (previously it was only set for None, so
        # any explicit list raised AttributeError). Copy it because it is
        # reversed in place below and the caller's list must stay untouched.
        self.hidden_dims = [2, 4, 8] if hidden_dims is None else list(hidden_dims)
        if not self.hidden_dims:
            raise ValueError("hidden_dims must contain at least one channel size")

        # Build Encoder
        in_channels, height, width = sample_x.shape
        if height != width:
            raise ValueError(f"VAE expects square inputs, got {height}x{width}")
        out_size = width
        K, S, P = 3, 2, 1

        modules = []
        for h_dim in self.hidden_dims:
            modules.append(
                nn.Sequential(
                    nn.Conv2d(in_channels, out_channels=h_dim,
                              kernel_size = K, stride= S, padding = P),
                    nn.BatchNorm2d(h_dim),
                    nn.LeakyReLU())
            )
            # Track the spatial size after this strided convolution.
            out_size = int(((out_size - K + (2*P)) / S) + 1)

            in_channels = h_dim

        self.encoder_layers = nn.Sequential(*modules)

        flat_features = self.hidden_dims[-1]*out_size*out_size
        self.fc_mu = nn.Linear(flat_features, z_dim) # [B, Z_dim]
        self.fc_var = nn.Linear(flat_features, z_dim) # [B, Z_dim]

        # Build Decoder
        modules = []

        self.decoder_input = nn.Linear(z_dim, flat_features)

        # From here on hidden_dims is ordered deepest-first; `decoder` relies on
        # hidden_dims[0] being the channel count of the bottleneck feature map.
        self.hidden_dims.reverse()

        for i in range(len(self.hidden_dims) - 1):
            modules.append(
                nn.Sequential(
                    nn.ConvTranspose2d(self.hidden_dims[i],
                                       self.hidden_dims[i + 1],
                                       kernel_size=K,
                                       stride = S,
                                       padding = P,
                                       output_padding = 1),
                    nn.BatchNorm2d(self.hidden_dims[i + 1]),
                    nn.LeakyReLU())
            )

        self.decoder_layers = nn.Sequential(*modules)

        # One more x2 upsampling, then a 7x7 conv (padding 1) that trims 4 pixels
        # per side length and maps to a single output channel in [0, 1].
        self.final_layer = nn.Sequential(
                            nn.ConvTranspose2d(self.hidden_dims[-1],
                                               self.hidden_dims[-1],
                                               kernel_size=3,
                                               stride=2,
                                               padding=1,
                                               output_padding=1),
                            nn.BatchNorm2d(self.hidden_dims[-1]),
                            nn.LeakyReLU(),
                            nn.Conv2d(self.hidden_dims[-1], out_channels=1,
                                      kernel_size= 7, padding= 1),
                            nn.Sigmoid())
        
    def encoder(self, x):
        """Encode a batch `x` of shape [B, C, H, W] into (mu, log_var), each [B, z_dim]."""
        h = self.encoder_layers(x)
        h = h.reshape(h.shape[0], -1)
        return self.fc_mu(h), self.fc_var(h) # mu, log_var
    
    def sampling(self, mu, log_var):
        """Draw z = mu + eps * exp(0.5 * log_var) with eps ~ N(0, I) (reparameterisation)."""
        std = torch.exp(0.5*log_var)
        eps = torch.randn_like(std)
        return eps.mul(std).add_(mu) # return z sample = g(x, eps)
        
    def decoder(self, z):
        """Decode latent vectors `z` of shape [B, z_dim] into images of shape [B, 1, H', W']."""
        h = self.decoder_input(z)
        B, CHW = h.shape
        C = self.hidden_dims[0]
        # Recover the (square) spatial side of the bottleneck feature map.
        HW = int((CHW // C)**0.5)
        h = h.reshape(B, C, HW, HW)
        h = self.decoder_layers(h)
        h = self.final_layer(h)
        return h
    
    def forward(self, x):
        """Return (reconstruction, mu, log_var) for the input batch `x`."""
        mu, log_var = self.encoder(x)
        z = self.sampling(mu, log_var)
        return self.decoder(z), mu, log_var


In [169]:
def eval_on_test(model, test_loader):
    """Encode every batch of `test_loader` and return sampled latents with their labels.

    The model is put into eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards, even on error. Inputs are
    moved to the device the model's parameters live on, so this also works on
    CPU-only machines.

    Args:
        model: module exposing `encoder(x) -> (mu, log_var)` and `sampling(mu, log_var)`.
        test_loader: iterable of (x, y) batches.

    Returns:
        (latents, labels): numpy arrays concatenated along axis 0.
    """
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    was_training = model.training
    model.eval()

    latents = []
    labels = []
    try:
        with torch.no_grad():
            for x, y in test_loader:
                mu, log_var = model.encoder(x.to(device))
                z = model.sampling(mu, log_var)

                latents.append(z.cpu().numpy())
                labels.append(torch.as_tensor(y).cpu().numpy())
    finally:
        # Restore whatever mode the caller had instead of forcing train mode.
        model.train(was_training)

    return np.concatenate(latents, 0), np.concatenate(labels, 0)

In [223]:
def visualize_latent_space(model, loss_items, experiment_name, test_loader):
    """Scatter-plot the first two latent dimensions of the test set and save it as a JPEG.

    The picture is written to `latent_space_vis/<experiment_name>/<timestamp>.jpg`
    (the directory is created when missing).

    Args:
        model: model passed on to `eval_on_test`.
        loss_items: (reconstruction_loss, kld) tensors, shown in the title.
        experiment_name: sub-directory name for this run.
        test_loader: loader passed on to `eval_on_test`.
    """
    latents, labels = eval_on_test(model, test_loader)

    # Microsecond-resolution timestamp so consecutive snapshots sort chronologically.
    pic_name = datetime.now().strftime("%Y%m%d%H%M%S%f")

    extent = 5  # half-width of the plotted square region of latent space

    fig, ax = plt.subplots()
    try:
        ax.set_xlim(-extent, extent)
        ax.set_ylim(-extent, extent)
        scat = ax.scatter(latents[:, 0], latents[:, 1], s=2, marker='o',
                          cmap=plt.cm.tab20, c=labels)
        fig.colorbar(scat, ax=ax, spacing='proportional', ticks=np.linspace(0, 10, 11))

        title = f"Recon: {loss_items[0].item():2.3f}, KLD {loss_items[1].item():2.3f}"
        ax.set_title(title)

        # os.path.join keeps the path valid on every OS (the old literal used "\").
        out_dir = os.path.join("latent_space_vis", str(experiment_name))
        os.makedirs(out_dir, exist_ok=True)

        fig.savefig(os.path.join(out_dir, f"{pic_name}.jpg"))
    finally:
        # Close this specific figure even if plotting or saving failed.
        plt.close(fig)

In [224]:
# return reconstruction error + KL divergence losses
def vae_loss(recon_x, x, mu, log_var, recon_weight=2000):
    """Compute the two VAE loss terms.

    Args:
        recon_x: reconstructed batch; must hold as many elements as `x`.
        x: target batch.
        mu: latent means.
        log_var: latent log-variances.
        recon_weight: factor applied to the mean-squared reconstruction error
            (defaults to the previously hard-coded 2000).

    Returns:
        (recons_loss, KLD): weighted MSE over all elements, and the KL term
        averaged over all latent entries.

    Raises:
        ValueError: if `recon_x` and `x` differ in element count.
    """
    if recon_x.numel() != x.numel():
        raise ValueError(
            f"recon_x has {recon_x.numel()} elements but x has {x.numel()}"
        )

    # Alternative kept for reference:
    # recons_loss = F.binary_cross_entropy(recon_x.view(-1,784), x.view(-1, 784), reduction='mean')
    # Flattening completely gives the same mean as the old view(-1, 784) but
    # works for any image size and for non-contiguous tensors.
    recons_loss = F.mse_loss(recon_x.reshape(-1), x.reshape(-1), reduction="mean") * recon_weight
    KLD = -0.5 * torch.mean(1 + log_var - mu.pow(2) - log_var.exp()) # 1 + log(sigma**2) - mu**2 - sigma**2
    return recons_loss, KLD

In [225]:
def norm_image(image):
    """Linearly rescale a numpy image to the range [0, 255], keeping its dtype.

    A constant image has no range to stretch and is returned as all zeros
    (previously it produced a division by zero and NaNs). An empty array is
    returned as an unchanged copy.

    Args:
        image: numpy array of any shape.

    Returns:
        Array of the same shape and dtype as `image`.
    """
    dtype = image.dtype
    if image.size == 0:
        return image.copy()

    scaled = image.astype(float)
    scaled = scaled - np.min(scaled)
    peak = np.max(scaled)
    if peak > 0:
        scaled = scaled / peak * 255
    return scaled.astype(dtype)

def save_recon(x_recon, experiment_name):
    """Save the first reconstruction of a batch as a timestamped JPEG.

    The file is written to `latent_space_vis/<experiment_name>/recons/`
    (the directory is created when missing).

    Args:
        x_recon: batch of reconstructions, indexed as [B, C, H, W].
        experiment_name: sub-directory name for this run.
    """
    # [C, H, W] -> [H, W, C], the layout OpenCV expects.
    image = x_recon[0].permute(1, 2, 0).detach().cpu().numpy()
    pic_name = datetime.now().strftime("%Y%m%d%H%M%S%f")

    # os.path.join keeps the path valid on every OS (the old literal used "\").
    path = os.path.join("latent_space_vis", str(experiment_name), "recons")
    os.makedirs(path, exist_ok=True)

    cv2.imwrite(os.path.join(path, f"{pic_name}.jpg"), norm_image(image))

In [226]:
def train(model, loss_f, train_loader, test_loader, optimizer, epoch, experiment_name):
    """Run one training epoch.

    Args:
        model: module whose forward returns (x_recon, mu, log_var).
        loss_f: callable (x_recon, x, mu, log_var) -> (reconstruction_loss, kld).
        train_loader: iterable of (x, label) batches; labels are ignored.
        test_loader: loader used for the latent-space snapshots.
        optimizer: optimizer stepping the model's parameters.
        epoch: epoch number, used only for logging.
        experiment_name: when not None, a latent-space plot and a reconstruction
            are saved every 25 batches under this name.

    Returns:
        The total loss tensor of the last batch, or None when `train_loader`
        yielded no batches (previously this raised UnboundLocalError).
    """
    # Follow the device the model actually lives on; only guess when the model
    # has no parameters at all.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.train()

    loss = None
    for batch_idx, (x, _) in enumerate(train_loader):
        x = x.to(device)
        optimizer.zero_grad()
        x_recon, mu, log_var = model(x)

        rec, KLD = loss_f(x_recon, x, mu, log_var)
        loss = rec + KLD

        loss.backward()
        optimizer.step()

        # Periodic logging / snapshots.
        if batch_idx % 25 == 0:
            if experiment_name is not None: 
                visualize_latent_space(model, (rec, KLD), experiment_name, test_loader)
                save_recon(x_recon, experiment_name)
                
            print("Epoch {} Iteration {}: Loss = {}".format(epoch, batch_idx, loss.item()))

    return loss


In [227]:
# Dimensionality of the latent space (2 so it can be plotted directly).
embedding_size = 2


# build model; example_data[0] is only used for its [C, H, W] shape
vae = VAE(sample_x=example_data[0], hidden_dims=None, z_dim=embedding_size)

if torch.cuda.is_available():
    vae.cuda()

# NOTE(review): weight_decay is Adam's L2 penalty coefficient, not a learning-rate
# decay factor; 0.99 is an unusually strong penalty — confirm this is intended.
optimizer = optim.Adam(vae.parameters(), lr=1e-2, weight_decay=0.99)



In [228]:
# Number of full passes over the training set.
num_epochs = 200

# Name of the output sub-directory used for this run's snapshots.
experiment_name = "1_rms_zcr"


# Epochs are numbered from 1 for logging.
for epoch in range(1, num_epochs + 1):
    train(
        model=vae,
        loss_f=vae_loss,
        train_loader=train_loader,
        test_loader=test_loader,
        optimizer=optimizer,
        epoch=epoch,
        experiment_name=experiment_name,
    )

Epoch 1 Iteration 0: Loss = 390.8219909667969
Epoch 1 Iteration 25: Loss = 131.3209686279297
Epoch 1 Iteration 50: Loss = 121.09114837646484
Epoch 1 Iteration 75: Loss = 119.03294372558594
Epoch 1 Iteration 100: Loss = 116.9339599609375
Epoch 1 Iteration 125: Loss = 113.74452209472656
Epoch 1 Iteration 150: Loss = 114.39002990722656
Epoch 1 Iteration 175: Loss = 112.44861602783203
Epoch 1 Iteration 200: Loss = 113.15045928955078
Epoch 1 Iteration 225: Loss = 113.28880310058594
Epoch 2 Iteration 0: Loss = 112.61363220214844
Epoch 2 Iteration 25: Loss = 108.91123962402344
Epoch 2 Iteration 50: Loss = 110.76921844482422
Epoch 2 Iteration 75: Loss = 112.32485961914062
Epoch 2 Iteration 100: Loss = 105.51924896240234
Epoch 2 Iteration 125: Loss = 108.69564056396484
Epoch 2 Iteration 150: Loss = 107.24166870117188
Epoch 2 Iteration 175: Loss = 101.22027587890625
Epoch 2 Iteration 200: Loss = 109.8602523803711
Epoch 2 Iteration 225: Loss = 113.53419494628906
Epoch 3 Iteration 0: Loss = 106.36

KeyboardInterrupt: 

In [229]:
# Persist the whole module (not just a state_dict); the loading cell relies on that.
model_path = r"models\vae_mnist_2dim.pth"
# torch.save does not create missing parent directories, so create it first.
model_dir = os.path.dirname(model_path)
if model_dir:
    os.makedirs(model_dir, exist_ok=True)
torch.save(vae, model_path)