In [None]:
!pip install pydub
!pip install torchinfo

In [None]:
import shutil
import glob
import re
import os
from natsort import natsorted
from pydub import AudioSegment
from pydub.silence import split_on_silence
import librosa
import librosa.display
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from torchinfo import summary
from torchvision.models import resnet34

In [None]:
# Create one destination folder per label (0 and 1) unless it is already there.
for label_id in (0, 1):
  label_dir = f"./{label_id}_asmr/"
  if os.path.exists(label_dir):
    continue
  os.mkdir(label_dir)

# Convert every mp3 under ./asmr_trial/ to wav.
# The wav is written next to the source file, and the mp3 is deleted afterwards.
mp3_files = natsorted(glob.glob("./asmr_trial/**/*.mp3", recursive=True))
for mp3_path in mp3_files:
  stem = os.path.splitext(mp3_path)[0]
  AudioSegment.from_file(mp3_path, format="mp3").export(f"{stem}.wav", format="wav")
  os.remove(mp3_path)
  print(stem)

# Labelling: ask for a label for every wav file and copy the file into the
# folder of that label ("1" -> ./1_asmr/, "skip" -> not copied, anything
# else -> ./0_asmr/).
for path in natsorted(glob.glob("./asmr_trial/**/*.wav", recursive=True)):
  # Build a flat file name from the path *relative to* ./asmr_trial so that it
  # starts with the work folder name. (os.path.abspath would drop the "./"
  # prefix, so stripping "./asmr_trial/" afterwards would never match and the
  # whole absolute path would end up in the file name.)
  title = os.path.relpath(path, "./asmr_trial")
  title = title.replace("DLsite 同人 - R18", "")
  title = title.replace(os.sep, "").replace("/", "")

  # For resuming after the Colab session was disconnected part-way:
  #if int(title[:3]) < 6:
    #continue

  # Ask for the label; surrounding whitespace in the answer is ignored.
  label = input(f"{title}のラベルは？").strip()

  # Work around "File name too long": keep the first 4 characters
  # (presumably the work number — TODO confirm) plus the last 76 characters,
  # which include the ".wav" extension.
  if len(title) > 76:
    title = title[:4] + title[-76:]

  if label == "1":
    shutil.copy(path, f"./1_asmr/{title}")
  elif label == "skip":
    continue
  else:
    shutil.copy(path, f"./0_asmr/{title}")

In [None]:
# Remove the silent sections from every labelled recording and write the
# result to ./{label}_asmr_revised/.

for i in range(2):

  revised_path = f"./{i}_asmr_revised/"
  if not os.path.exists(revised_path):
    os.mkdir(revised_path)

  # All wav files of this label, in natural sort order
  wav_paths = natsorted(glob.glob(f"./{i}_asmr/**/*.wav", recursive=True))
  for wav_path in wav_paths:

      # Load the recording; len() of an AudioSegment is its duration in ms
      audio = AudioSegment.from_file(wav_path)
      org_ms = len(audio)
      title = os.path.splitext(os.path.basename(wav_path))[0]
      print("{}: {:.2f} [min]".format(title, org_ms/60/1000))

      # Recordings shorter than 27 seconds are not processed
      if org_ms < 27000:
        print("skip")
        continue

      # Split the recording at its silent sections
      chunks = split_on_silence(
          audio,
          min_silence_len=200,
          silence_thresh=-50,
          keep_silence=100,
      )

      # split_on_silence returned no chunks at all: nothing to write
      if len(chunks) == 0:
        print("skip")
        continue

      # Join the chunks back together into one recording without the silence
      revised_audio = sum(chunks)

      # Report the new duration and save the result
      revised_ms = len(revised_audio)
      print("{}_revised = {:.2f} [min]".format(title, revised_ms/60/1000))
      revised_audio.export(f"{revised_path}/{title}.wav", format="wav")

In [None]:
# Cut every silence-trimmed recording down to its first 27 seconds, then split
# each 27-second excerpt into 3-second clips.

EXCERPT_MS = 27000  # length of the excerpt taken from each recording
CLIP_MS = 3000      # length of each training clip

for i in range(2):
  path_27 = f"./{i}_asmr27sec/"
  os.makedirs(path_27, exist_ok=True)

  # Read from the folder the silence-removal step writes to
  # (./{i}_asmr_revised/); no step of this notebook creates "./{i}_asmr_cutted/".
  for path in natsorted(glob.glob(f'./{i}_asmr_revised/**/*.wav', recursive=True)):

      # Load the recording; len() of an AudioSegment is its duration in ms
      audio = AudioSegment.from_file(path)
      title = os.path.splitext(os.path.basename(path))[0]

      # Recordings shorter than the excerpt length are not used
      if len(audio) < EXCERPT_MS:
        print("skip")
        continue

      # Export the first 27 seconds. The ".wav" extension is required: the
      # next loop finds the excerpts with a "*.wav" glob.
      audio[:EXCERPT_MS].export(f"{path_27}{title}.wav", format="wav")

  path_3 = f"./{i}_asmr3sec/"
  os.makedirs(path_3, exist_ok=True)

  # Split every 27-second excerpt into consecutive 3-second clips
  for path in natsorted(glob.glob(f'./{i}_asmr27sec/**/*.wav', recursive=True)):

      audio = AudioSegment.from_file(path)
      title = os.path.splitext(os.path.basename(path))[0]

      for j, move_point in enumerate(range(0, len(audio), CLIP_MS), start=1):
            sound_splitted = audio[move_point:move_point + CLIP_MS]
            # A shorter tail clip would yield a spectrogram of a different
            # size than the others, so it is dropped.
            if len(sound_splitted) < CLIP_MS:
              break
            sound_splitted.export(f"{path_3}{title}_{j}.wav", format="wav")

In [None]:
# Mel-spectrogram extraction
# librosa.load returns y (the audio time series) and sr (the sampling rate).

def calc_melsp(file, n_fft=2048, hop_length=512, n_mels=128, sr=44100):
    """Return the mel-spectrogram of an audio file in decibels.

    Args:
        file: Path of the audio file, passed to ``librosa.load``.
        n_fft: FFT window size passed to ``librosa.feature.melspectrogram``.
        hop_length: Hop length passed to ``librosa.feature.melspectrogram``.
        n_mels: Number of mel bands passed to ``librosa.feature.melspectrogram``.
        sr: Sampling rate requested from ``librosa.load``.

    Returns:
        The output of ``librosa.power_to_db`` for the mel-spectrogram, scaled
        relative to its maximum (``ref=np.max``).
    """
    # Load the audio at the requested sampling rate
    y, sr = librosa.load(file, sr=sr)

    # Mel-spectrogram (a spectrogram on a perceptually motivated frequency
    # scale). The arguments are forwarded instead of being hard-coded, so
    # non-default values actually take effect.
    S = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels
    )
    melsp = librosa.power_to_db(S, ref=np.max)
    return melsp

# Save the mel-spectrogram of every 3-second clip as a numbered .npy file
for i in range(2):

    path_dataset = f"./{i}_dataset"
    if not os.path.exists(path_dataset):
      os.mkdir(path_dataset)

    # Files are numbered 1, 2, 3, ... in natural sort order of the clips
    clip_paths = natsorted(glob.glob(f'./{i}_asmr3sec/**/*.wav', recursive=True))
    for number, clip_path in enumerate(clip_paths, start=1):
        np.save(f"./{i}_dataset/{number}", calc_melsp(clip_path))

In [2]:
# Dataset class definition
class ASMRDataset(Dataset):
    """Labelled spectrogram dataset read from ``<root>/<label>_dataset/*.npy``.

    Every ``.npy`` file in the folder of label ``i`` becomes one sample with
    label ``i``. Samples are indexed 0..N-1, label by label.
    """

    def __init__(self, root=".", class_num=2):
        """Index the ``.npy`` files of every class.

        Args:
            root: Directory containing the ``<label>_dataset`` folders.
            class_num: Number of classes; labels are ``0 .. class_num - 1``.
        """
        self.id_class = {}  # sample id -> label
        self.id_path = {}   # sample id -> path of the .npy file

        sample_id = 0
        for label in range(class_num):
            # The folder is "<root>/<label>_dataset". Sorting makes the sample
            # order independent of the file-system listing order.
            pattern = os.path.join(root, f"{label}_dataset", "*.npy")
            for path in sorted(glob.glob(pattern)):
                self.id_class[sample_id] = label
                self.id_path[sample_id] = path
                sample_id += 1

    def __getitem__(self, idx):
        """Return ``(tensor, label)``: the transposed array as float32, and its label."""
        return torch.tensor(np.load(self.id_path[idx]).T).float(), self.id_class[idx]

    def __len__(self):
        """Return the total number of samples."""
        return len(self.id_class)

In [3]:
# Build the training and validation data loaders

dataset = ASMRDataset()

length = len(dataset)
if length == 0:
    # Fail early with a clear message instead of an obscure sampler error.
    raise ValueError("ASMRDataset is empty: no .npy files were found")

# 90 % of the samples for training, the rest for validation
train_length = int(length*0.9)
val_length = length - train_length

# A seeded generator makes the split identical on every run.
# NOTE(review): the split is per sample, so clips cut from the same recording
# can end up in both subsets — confirm this is acceptable for the evaluation.
split_generator = torch.Generator().manual_seed(42)
train,val = torch.utils.data.random_split(dataset,[train_length,val_length], generator=split_generator)
train_loader = DataLoader(train,batch_size=256,shuffle=True)
val_loader = DataLoader(val,batch_size=256,shuffle=False)

In [None]:
## Hand-written CNN model

# CNN definition (loosely modelled on VGG-style networks)
class MyCNN(nn.Module):
    """Five-convolution CNN with two fully connected layers and 2 output logits.

    The input has 1 channel. The flattened feature map fed to ``fc1`` must
    contain 2560 values, which constrains the spatial size of the input.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: two 3x3 convolutions (1 -> 64 -> 64 channels), 3x3 max pooling
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding='same')
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding='same')
        self.relu2 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=3)

        # Stage 2: two 3x3 convolutions (64 -> 128 -> 128 channels), 3x3 max pooling
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding='same')
        self.relu3 = nn.ReLU()
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding='same')
        self.relu4 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=3)

        # Stage 3: one 5x5 convolution (128 -> 256 channels), 5x5 max pooling
        self.conv5 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=5, padding='same')
        self.relu5 = nn.ReLU()
        self.pool3 = nn.MaxPool2d(kernel_size=5)

        # Classifier head
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(in_features=2560, out_features=256)
        self.relu6 = nn.ReLU()
        self.fc2 = nn.Linear(in_features=256, out_features=2)

    def forward(self, x):
        """Apply all layers in order and return the 2 class logits per sample."""
        layers = (
            self.conv1, self.relu1, self.conv2, self.relu2, self.pool1,
            self.conv3, self.relu3, self.conv4, self.relu4, self.pool2,
            self.conv5, self.relu5, self.pool3,
            self.flatten, self.fc1, self.relu6, self.fc2,
        )
        for layer in layers:
            x = layer(x)

        return x

# Use the first GPU when one is available, otherwise the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Instantiate the model and move it to the selected device
model = MyCNN().to(device)

# Loss function and optimisation algorithm
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

summary(model)

In [None]:
losses = []  # not filled by this cell; left defined in case other cells reference it
for epoch in range(1, 21):

    # Training. train() enables training-mode behaviour of layers that have
    # one (dropout, batch norm); it is set every epoch because the validation
    # pass below switches the model to eval mode.
    model.train()
    train_losses = 0

    for x, y in train_loader:
        x = x.to(device, dtype=torch.float32).unsqueeze(1)  # insert the channel dimension (1 channel)
        y = y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        train_losses += loss.item()

    # Validation: eval mode, no gradient tracking
    model.eval()
    val_losses = 0
    actual_list, predict_list = [], []

    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(device, dtype=torch.float32).unsqueeze(1)
            y = y.to(device)
            out = model(x)
            loss = criterion(out, y)
            _, y_pred = torch.max(out, 1)
            val_losses += loss.item()

            actual_list.append(y.cpu().numpy())
            predict_list.append(y_pred.cpu().numpy())

    actual_list = np.concatenate(actual_list)
    predict_list = np.concatenate(predict_list)
    accuracy = np.mean(actual_list == predict_list)

    # Per-epoch report. Both losses are sums over batches, so their scale
    # depends on the number of batches in each loader.
    print("epoch", epoch, "\t train_loss", train_losses, "\t val_loss", val_losses, "\t accuracy", accuracy)

    # Save a checkpoint (overwritten every epoch)
    save_path = "./cnn_model.pth"
    torch.save({'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': train_losses,},
              save_path)

In [None]:
## Pretrained model (ResNet)

# Use the first GPU when one is available, otherwise the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Download the pretrained ResNet-34
resnet_model = resnet34(pretrained=True)

# Replace the first convolution so that it takes 1 input channel instead of 3.
# NOTE(review): the replacement layer is freshly initialised, so the pretrained
# weights of this layer are not reused.
resnet_model.conv1 = nn.Conv2d(
    in_channels=1,
    out_channels=64,
    kernel_size=(7, 7),
    stride=(2, 2),
    padding=(3, 3),
    bias=False,
)

# Replace the final layer so that it outputs one logit per class (2 classes)
resnet_model.fc = nn.Linear(in_features=512, out_features=2)

resnet_model = resnet_model.to(device)

# Loss function and optimiser
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(resnet_model.parameters(), lr=1e-4)

#summary(resnet_model)

In [None]:
losses = []  # not filled by this cell; left defined in case other cells reference it
for epoch in range(1, 21):

    # Training. train() puts mode-dependent layers such as batch norm into
    # training mode; it is set every epoch because the validation pass below
    # switches the model to eval mode.
    resnet_model.train()
    train_losses = 0

    for x, y in train_loader:
        x = x.to(device, dtype=torch.float32).unsqueeze(1)  # insert the channel dimension (1 channel)
        y = y.to(device)
        optimizer.zero_grad()
        out = resnet_model(x)
        loss = loss_function(out, y)
        loss.backward()
        optimizer.step()
        train_losses += loss.item()

    # Validation. eval() makes batch norm use its running statistics and stops
    # it from updating them with validation data.
    resnet_model.eval()
    val_losses = 0
    actual_list, predict_list = [], []

    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(device, dtype=torch.float32).unsqueeze(1)
            y = y.to(device)
            out = resnet_model(x)
            loss = loss_function(out, y)
            _, y_pred = torch.max(out, 1)
            val_losses += loss.item()

            actual_list.append(y.cpu().numpy())
            predict_list.append(y_pred.cpu().numpy())

    actual_list = np.concatenate(actual_list)
    predict_list = np.concatenate(predict_list)
    accuracy = np.mean(actual_list == predict_list)

    # Per-epoch report. Both losses are sums over batches, so their scale
    # depends on the number of batches in each loader.
    print("epoch", epoch, "\t train_loss", train_losses, "\t val_loss", val_losses, "\t accuracy", accuracy)

    # Save a checkpoint (overwritten every epoch)
    save_path = "./resnet_model.pth"
    torch.save({'epoch': epoch,
                'model_state_dict': resnet_model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': train_losses,},
              save_path)

In [34]:
# Dataset used for inference

class ASMRDataset2(Dataset):
    """Unlabelled spectrogram dataset read from ``./pred_dataset/*.npy``.

    Files are indexed in natural sort order. Every sample carries the
    placeholder label 0.
    """

    def __init__(self):
        files = natsorted(glob.glob("./pred_dataset/*.npy"))
        # sample id -> file path
        self.id_path = dict(enumerate(files))
        # sample id -> placeholder label (always 0)
        self.id_class = dict.fromkeys(self.id_path, 0)

    def __getitem__(self, idx):
        """Return ``(tensor, label)``: the transposed array as float32, and label 0."""
        spec = np.load(self.id_path[idx])
        return torch.tensor(spec.T).float(), self.id_class[idx]

    def __len__(self):
        """Return the number of files found."""
        return len(self.id_class)

# One sample per batch and no shuffling, so predictions come out in dataset order
test = ASMRDataset2()
test_loader = DataLoader(dataset=test, batch_size=1, shuffle=False)

In [None]:
pred_y = []

# Inference (the hand-written model is used because it was more accurate)
# eval() puts mode-dependent layers (dropout, batch norm) into inference mode.
model.eval()
with torch.no_grad():
    # The label from the loader is a placeholder and is not needed here
    for x, _label in test_loader:
        x = x.to(device, dtype=torch.float32).unsqueeze(1)  # insert the channel dimension
        out = model(x)
        _, y_pred = torch.max(out, 1)
        pred_y.append(y_pred.cpu().numpy())

In [76]:
# Collect the inference results into a DataFrame

cut_title = []
asmr_number = []
track_number = []

# Same file order as ASMRDataset2 (natural sort of ./pred_dataset/*.npy), so
# row k corresponds to prediction k.
for path in natsorted(glob.glob('./pred_dataset/*.npy')):
  title = os.path.splitext(os.path.basename(path))[0]
  cut_title.append(title)
  # Assumes names start with a 3-digit work number, one separator character
  # and a 3-digit track number — TODO confirm against the files in pred_dataset.
  asmr_number.append(int(title[:3])) # work number
  track_number.append(int(title[4:7])) # track number within the work

# Flatten the per-batch predictions into one list of labels. This works for
# any batch size and also when the cell is re-run on an already flattened list.
if len(pred_y) > 0:
  pred_y = np.concatenate([np.atleast_1d(p) for p in pred_y]).tolist()
else:
  pred_y = []

if len(pred_y) != len(cut_title):
  raise ValueError(
      f"{len(pred_y)} predictions for {len(cut_title)} files in ./pred_dataset/"
  )

pred_df = pd.DataFrame({'asmr_num':asmr_number, 'track_num':track_number, 'title':cut_title, 'pred':pred_y})

In [None]:
# Combine the per-clip predictions into one verdict per work
#
# asmr_preds: verdict per work / sum_track_pred: number of positive tracks in
# a work / sum_3s_pred: vote balance of the 3-second clips of one track

asmr_num = asmr_number[-1] + 1
asmr_preds = []
all_title = []
sum_track_pred = sum_3s_pred = 0

for j in range(asmr_num):

    # Rows belonging to work number j
    work_df = pred_df.loc[pred_df['asmr_num'] == j]

    # Works without an audio trial were excluded beforehand, so their numbers
    # have no rows and are skipped
    if work_df.empty:
        continue

    # Title of the work: the first clip title without its first 12 characters
    all_title.append(work_df['title'].iloc[0][12:])

    # Tracks 1 .. track number of the last row of this work
    last_track = work_df.iloc[-1]['track_num']
    sum_track_pred = 0

    for k in range(1, last_track + 1):
        clip_preds = work_df.loc[work_df['track_num'] == k, 'pred']

        # Each clip predicted 1 counts +1, each clip predicted 0 counts -1
        sum_3s_pred = int((clip_preds == 1).sum()) - int((clip_preds == 0).sum())

        # A track is positive when the positive clips are in the majority
        if sum_3s_pred > 0:
            sum_track_pred += 1

    # A work is positive when at least one of its tracks is positive
    asmr_preds.append(1 if sum_track_pred > 0 else 0)

# Final recommendation result
pred_df_sum = pd.DataFrame({'title':all_title, 'asmr_preds':asmr_preds})
pred_df_sum