In [1]:
from __future__ import print_function

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio

from PIL import Image
import matplotlib.pyplot as plt

import torchvision.transforms as transforms
import torchvision.models as models

import copy

  '"sox" backend is being deprecated. '


In [2]:
device = torch.device('cuda')

In [3]:
sample_path = '/scratch/cz2064/myjupyter/Time_Series/Data/data_VoxCeleb/wav/id10983/K3VF9KATPqc/00123.wav'
sample_path_2 = '/scratch/cz2064/myjupyter/Time_Series/Data/data_VoxCeleb/wav/id10452/cLMPZ3fQHJw/00004.wav'
sample,sample_rate = torchaudio.load(sample_path_2)

sample,sample_rate = torchaudio.load(sample_path)
torchaudio.save('2_style.wav',sample,sample_rate)

sample,sample_rate = torchaudio.load(sample_path_2)
torchaudio.save('2_content.wav',sample,sample_rate)

In [4]:
statistical_mean = 2e-05
statistical_std = 0.05

In [5]:
def load_and_normalization(sample_path,sampling_length=256*256,mean = statistical_mean, std = statistical_std):
    sample,_ = torchaudio.load(sample_path)
    length = sample.size(1)
    if length<sampling_length:
        pad = int(sampling_length-length)
        sample = torch.cat((sample,torch.zeros((1,pad))),-1)
    sample = sample[:,:sampling_length]
    sample = (sample-mean)/std
    sample = sample.unsqueeze(0)
    return sample

In [6]:
def inverse_normalization(tensor,mean = statistical_mean, std = statistical_std):
    audio = tensor.cpu().clone()
    audio = audio.squeeze(0)
    audio = (audio * std) + mean
    return audio

In [7]:
style = "2_style.wav"
content = "2_content.wav"

In [8]:
sample_1,_ = torchaudio.load(style)
lenth_1 = sample_1.size(1)
sample_2,_ = torchaudio.load(content)
lenth_2 = sample_2.size(1)
lenth = lenth_2#max(lenth_1,lenth_2)


style_img = load_and_normalization(style,lenth).to(device)
content_img = load_and_normalization(content,lenth).to(device)

## Biden and Trump

biden_sample,_ = torchaudio.load('Biden.wav')
downsample_resample = torchaudio.transforms.Resample(_, sample_rate)
biden_sample = downsample_resample(biden_sample)
biden_sample = biden_sample[0].unsqueeze(0)
lenth_1 = biden_sample.size(1)

trump_sample,_ = torchaudio.load('Trump_short.wav')
downsample_resample = torchaudio.transforms.Resample(_, sample_rate)
trump_sample = downsample_resample(trump_sample)
trump_sample = trump_sample[0].unsqueeze(0)
lenth_2 = trump_sample.size(1)

lenth = max(lenth_1,lenth_2)

def load_and_normalization(sample,sampling_length=lenth,mean = statistical_mean, std = statistical_std):
    length = sample.size(1)
    if length<sampling_length:
        pad = int(sampling_length-length)
        sample = torch.cat((sample,torch.zeros((1,pad))),-1)
    sample = sample[:,:sampling_length]
    sample = (sample-mean)/std
    sample = sample.unsqueeze(0)
    return sample

style_img = load_and_normalization(trump_sample).to(device)
content_img = load_and_normalization(biden_sample).to(device)

In [9]:
class ContentLoss(nn.Module):
    def __init__(self, target,):
        super(ContentLoss, self).__init__()
        self.target = target.detach()

    def forward(self, input):
        self.loss = F.mse_loss(input, self.target)
        return input

In [10]:
def gram_matrix(input):
    a, b, c = input.size()
    features = input.view(a * b, c )
    G = torch.mm(features, features.t())
    return G.div(a * b * c )


class StyleLoss(nn.Module):
    def __init__(self, target_feature):
        super(StyleLoss, self).__init__()
        self.target = gram_matrix(target_feature).detach()
    def forward(self, input):
        G = gram_matrix(input)
        self.loss = F.mse_loss(G, self.target)
        return input

In [11]:
cfg = {
    'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}


class VGG_1D(nn.Module):
    def __init__(self, vgg_name):
        super(VGG_1D, self).__init__()
        self.features = self._make_layers(cfg[vgg_name])
        #self.classifier = nn.Linear(512, 10)

        
    def forward(self, x):
        out = self.features(x)
        out = out.view(out.size(0), -1)
        #out = self.classifier(out)
        return out

    def _make_layers(self, cfg):
        layers = []
        in_channels = 1
        for x in cfg:
            if x == 'M':
                layers += [nn.MaxPool1d(kernel_size=5, stride=5)]
            else:
                layers += [nn.Conv1d(in_channels, x, kernel_size=10, padding=5),
                           nn.BatchNorm1d(x),
                           nn.ReLU(inplace=True)]
                in_channels = x
        layers += [nn.AvgPool1d(21)]
        return nn.Sequential(*layers)
    
model_CNN = VGG_1D('VGG16')

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.cnn = model_CNN
        self.fc1 = nn.Linear(512*3,2048)
        self.activation_fc1 = nn.ReLU()
        self.dropout1 = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(2048,1024)
        self.activation_fc2 = nn.ReLU()
        self.fc3 = nn.Linear(1024,2)
        
    def forward(self, x1, x2):
        x1 = self.cnn(x1)
        x2 = self.cnn(x2)
        
        x_add = x1+x2
        x_minus = x1-x2
        x_multiply = x1*x2
        x = torch.cat((x_add, x_minus, x_multiply),-1)
        x = self.fc1(x)
        x = self.activation_fc1(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.activation_fc2(x)
        x = self.fc3(x)
        return x

In [12]:
model = MyModel()
check_path = '/scratch/cz2064/myjupyter/Time_Series/notebook/python_files/Model_CNN_VGG16/Second_Train/checkpoint_CNN.pt'
model.load_state_dict(torch.load(check_path,map_location=torch.device('cpu'))['best_model_wts'])
pretrained_VGG  = model.cnn.features

In [13]:
cnn = nn.Sequential()
i = 0
for layer in pretrained_VGG.children():
    if isinstance(layer, nn.Conv1d):
        cnn.add_module(str(i),layer)
    elif isinstance(layer, nn.ReLU):
        cnn.add_module(str(i),layer)
    elif isinstance(layer, nn.MaxPool1d):
        cnn.add_module(str(i),layer)
    elif isinstance(layer, nn.BatchNorm1d):
        cnn.add_module(str(i),layer)
    i += 1

In [14]:
cnn.to(device);

In [15]:
class Normalization(nn.Module):
    def __init__(self, mean=statistical_mean, std=statistical_std):
        super(Normalization, self).__init__()
        self.mean = torch.tensor(mean).to(device)
        self.std = torch.tensor(std).to(device)
    def forward(self, img):
        return (img - self.mean) / self.std

In [16]:
content_layers_default = ['conv_1']
style_layers_default = ['conv_1','conv_2', 'conv_3', 'conv_4', 'conv_5']

def get_style_model_and_losses(cnn,style_img, content_img,
                               content_layers=content_layers_default,
                               style_layers=style_layers_default):
    cnn = copy.deepcopy(cnn)

    normalization = Normalization().to(device)

    content_losses = []
    style_losses = []

    # assuming that cnn is a nn.Sequential, so we make a new nn.Sequential
    # to put in modules that are supposed to be activated sequentially
    model = nn.Sequential(normalization)

    i = 0  # increment every time we see a conv
    for layer in cnn.children():
        if isinstance(layer, nn.Conv1d):
            i += 1
            name = 'conv_{}'.format(i)
        elif isinstance(layer, nn.ReLU):
            name = 'relu_{}'.format(i)
            # The in-place version doesn't play very nicely with the ContentLoss
            # and StyleLoss we insert below. So we replace with out-of-place
            # ones here.
            layer = nn.ReLU(inplace=False)
        elif isinstance(layer, nn.MaxPool1d):
            name = 'pool_{}'.format(i)
        elif isinstance(layer, nn.BatchNorm1d):
            name = 'bn_{}'.format(i)
        else:
            raise RuntimeError('Unrecognized layer: {}'.format(layer.__class__.__name__))

        model.add_module(name, layer)

        if name in content_layers:
            # add content loss:
            target = model(content_img).detach()
            content_loss = ContentLoss(target)
            model.add_module("content_loss_{}".format(i), content_loss)
            content_losses.append(content_loss)

        if name in style_layers:
            # add style loss:
            target_feature = model(style_img).detach()
            style_loss = StyleLoss(target_feature)
            model.add_module("style_loss_{}".format(i), style_loss)
            style_losses.append(style_loss)

    # now we trim off the layers after the last content and style losses
    for i in range(len(model) - 1, -1, -1):
        if isinstance(model[i], ContentLoss) or isinstance(model[i], StyleLoss):
            break

    model = model[:(i + 1)]

    return model, style_losses, content_losses

In [17]:
input_img = content_img.clone()

In [18]:
def get_input_optimizer(input_img):
    optimizer = optim.LBFGS([input_img.requires_grad_()])
    return optimizer

In [19]:
def run_style_transfer(cnn,content_img, style_img, input_img, num_steps=300,
                       style_weight=1000000, content_weight=1):
    """Run the style transfer."""
    print('Building the style transfer model..')
    model, style_losses, content_losses = get_style_model_and_losses(cnn, style_img, content_img)
    optimizer = get_input_optimizer(input_img)

    print('Optimizing..')
    run = [0]
    while run[0] <= num_steps:

        def closure():
            # correct the values of updated input image
            #input_img.data.clamp_(0, 1)

            optimizer.zero_grad()
            model(input_img)
            style_score = 0
            content_score = 0

            for sl in style_losses:
                style_score += sl.loss
            for cl in content_losses:
                content_score += cl.loss

            style_score *= style_weight
            content_score *= content_weight

            loss = style_score + content_score
            loss.backward()

            run[0] += 1
            if run[0] % 1000 == 0:
                print("run {}:".format(run))
                print('Style Loss : {:4f} Content Loss: {:4f}'.format(
                    style_score.item(), content_score.item()))
                print()

            return style_score + content_score

        optimizer.step(closure)

    # a last correction...
    #input_img.data.clamp_(0, 1)

    return input_img

In [20]:
output = run_style_transfer(cnn,content_img, style_img, input_img,num_steps=20000,style_weight=1e10, content_weight=1)

Building the style transfer model..
Optimizing..
run [1000]:
Style Loss : 3941.174316 Content Loss: 60.644741

run [2000]:
Style Loss : 824.923279 Content Loss: 64.474380

run [3000]:
Style Loss : 348.591949 Content Loss: 66.360519

run [4000]:
Style Loss : 189.707794 Content Loss: 67.834724

run [5000]:
Style Loss : 121.392448 Content Loss: 68.992905

run [6000]:
Style Loss : 86.886482 Content Loss: 69.812057

run [7000]:
Style Loss : 67.194351 Content Loss: 70.365768

run [8000]:
Style Loss : 54.620529 Content Loss: 70.751869

run [9000]:
Style Loss : 46.002888 Content Loss: 71.037300

run [10000]:
Style Loss : 39.794888 Content Loss: 71.256752

run [11000]:
Style Loss : 35.027843 Content Loss: 71.422020

run [12000]:
Style Loss : 31.396433 Content Loss: 71.540009

run [13000]:
Style Loss : 28.500126 Content Loss: 71.623276

run [14000]:
Style Loss : 26.138916 Content Loss: 71.673767

run [15000]:
Style Loss : 24.176781 Content Loss: 71.702164

run [16000]:
Style Loss : 22.534859 Con

In [21]:
output[0,0,:10]

tensor([-0.1540, -0.1254,  0.1124, -0.0159,  0.0037,  0.0395, -0.1904, -0.2021,
        -0.2771, -0.2235], device='cuda:0', grad_fn=<SliceBackward>)

In [22]:
output_audio = inverse_normalization(output)
torchaudio.save('2_output.wav',output_audio,sample_rate)

In [23]:
style_img.size()

torch.Size([1, 1, 75521])

In [24]:
content_img

tensor([[[ 0.0509, -0.0218, -0.2464,  ..., -0.3025, -1.3004, -1.2815]]],
       device='cuda:0')

import numpy as np
import wave
import nextpow2
import math

# 打开WAV文档
f = wave.open("BT_output.wav")
# 读取格式信息
# (nchannels, sampwidth, framerate, nframes, comptype, compname)
params = f.getparams()
nchannels, sampwidth, framerate, nframes = params[:4]
fs = framerate
# 读取波形数据
str_data = f.readframes(nframes)
f.close()
# 将波形数据转换为数组
x = np.fromstring(str_data, dtype=np.short)
# 计算参数
len_ = 20 * fs // 1000 # 样本中帧的大小
PERC = 50 # 窗口重叠占帧的百分比
len1 = len_ * PERC // 100  # 重叠窗口
len2 = len_ - len1   # 非重叠窗口
# 设置默认参数
Thres = 3
Expnt = 2.0
beta = 0.002
G = 0.9
# 初始化汉明窗
win = np.hamming(len_)
# normalization gain for overlap+add with 50% overlap
winGain = len2 / sum(win)

# Noise magnitude calculations - assuming that the first 5 frames is noise/silence
nFFT = 2 * 2 ** (nextpow2.nextpow2(len_))
noise_mean = np.zeros(nFFT)

j = 0
for k in range(1, 6):
    noise_mean = noise_mean + abs(np.fft.fft(win * x[j:j + len_], nFFT))
    j = j + len_
noise_mu = noise_mean / 5

# --- allocate memory and initialize various variables
k = 1
img = 1j
x_old = np.zeros(len1)
Nframes = len(x) // len2 - 1
xfinal = np.zeros(Nframes * len2)

# =========================    Start Processing   ===============================
for n in range(0, Nframes):
    # Windowing
    insign = win * x[k-1:k + len_ - 1]
    # compute fourier transform of a frame
    spec = np.fft.fft(insign, nFFT)
    # compute the magnitude
    sig = abs(spec)

    # save the noisy phase information
    theta = np.angle(spec)
    SNRseg = 10 * np.log10(np.linalg.norm(sig, 2) ** 2 / np.linalg.norm(noise_mu, 2) ** 2)


    def berouti(SNR):
        if -5.0 <= SNR <= 20.0:
            a = 4 - SNR * 3 / 20
        else:
            if SNR < -5.0:
                a = 5
            if SNR > 20:
                a = 1
        return a


    def berouti1(SNR):
        if -5.0 <= SNR <= 20.0:
            a = 3 - SNR * 2 / 20
        else:
            if SNR < -5.0:
                a = 4
            if SNR > 20:
                a = 1
        return a

    if Expnt == 1.0:  # 幅度谱
        alpha = berouti1(SNRseg)
    else:  # 功率谱
        alpha = berouti(SNRseg)
    #############
    sub_speech = sig ** Expnt - alpha * noise_mu ** Expnt;
    # 当纯净信号小于噪声信号的功率时
    diffw = sub_speech - beta * noise_mu ** Expnt
    # beta negative components

    def find_index(x_list):
        index_list = []
        for i in range(len(x_list)):
            if x_list[i] < 0:
                index_list.append(i)
        return index_list

    z = find_index(diffw)
    if len(z) > 0:
        # 用估计出来的噪声信号表示下限值
        sub_speech[z] = beta * noise_mu[z] ** Expnt
        # --- implement a simple VAD detector --------------
    if SNRseg < Thres:  # Update noise spectrum
        noise_temp = G * noise_mu ** Expnt + (1 - G) * sig ** Expnt  # 平滑处理噪声功率谱
        noise_mu = noise_temp ** (1 / Expnt)  # 新的噪声幅度谱
    # flipud函数实现矩阵的上下翻转，是以矩阵的“水平中线”为对称轴
    # 交换上下对称元素
    sub_speech[nFFT // 2 + 1:nFFT] = np.flipud(sub_speech[1:nFFT // 2])
    x_phase = (sub_speech ** (1 / Expnt)) * (np.array([math.cos(x) for x in theta]) + img * (np.array([math.sin(x) for x in theta])))
    # take the IFFT

    xi = np.fft.ifft(x_phase).real
    # --- Overlap and add ---------------
    xfinal[k-1:k + len2 - 1] = x_old + xi[0:len1]
    x_old = xi[0 + len1:len_]
    k = k + len2
# 保存文件
wf = wave.open('en_outfile.wav', 'wb')
# 设置参数
wf.setparams(params)
# 设置波形文件 .tostring()将array转换为data
wave_data = (winGain * xfinal).astype(np.short)
wf.writeframes(wave_data.tostring())
wf.close()

