# YAMNet
+ tensorflow 공식 메뉴얼
+ https://www.tensorflow.org/tutorials/audio/transfer_learning_audio

In [1]:
import os

from IPython import display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_io as tfio

In [2]:
path =  'C:/YAMNet_test'
os.chdir(path)

In [3]:
# Utility functions for loading audio files and making sure the sample rate is correct.

@tf.function
def load_wav_16k_mono(filename):
    """ Load a WAV file, convert it to a float tensor, resample to 16 kHz single-channel audio. """
    file_contents = tf.io.read_file(filename)
    wav, sample_rate = tf.audio.decode_wav(
          file_contents,
          desired_channels=1)
    wav = tf.squeeze(wav, axis=-1)
    sample_rate = tf.cast(sample_rate, dtype=tf.int64)
    wav = tfio.audio.resample(wav, rate_in=sample_rate, rate_out=16000)
    return wav

In [4]:
# combined data
# import pandas as pd
# song = pd.read_csv('C:/music_data/Song.csv')
# station_song = pd.read_csv('C:/music_data/Station_Song.csv')
# station_tag = pd.read_csv('C:/music_data/Station_Tag.csv')
# station_tag_info = pd.read_csv('C:/music_data/Station_Tag_Info.csv')
# song.drop(columns = ['IDX'], inplace = True)
# station_song.drop(columns = ['IDX'], inplace = True)
# station_tag.drop(columns = ['IDX'], inplace = True)
# station_tag_info.columns = ['ST_TAG_ID','TYPE_ID','TAG_NAME','CDATE']
# print(song.columns, station_song.columns ,station_tag.columns, station_tag_info.columns, sep = '\n')
# part1 = pd.merge(song,station_song, on = 'SONG_ID', how = 'inner')
# part2 = pd.merge(part1,station_tag, on = 'ST_ID', how = 'inner')
# final = pd.merge(part2,station_tag_info, on = 'ST_TAG_ID', how = 'inner')
# final.sort_values(by = ['SONG_ID'], inplace = True)
# final_data = final.drop_duplicates(['SONG_ID']).reset_index(drop = True)
# final_data.head()
# final_data.to_csv('total_data.csv', index = False, encoding = 'utf-8')

In [5]:
final_data = pd.read_csv('total_data.csv', encoding = 'utf-8')
final_data

Unnamed: 0,SONG_ID,SONG_TITLE,ST_ID,PRIORITY,ST_TAG_ID,TYPE_ID,TAG_NAME,CDATE
0,2466,오늘도 난,467710,2,5711,2,편안해요,2022-03-17 14:48:33
1,5112,끝 (End),456206,17,5766,12,신남,2022-03-31 14:31:33
2,7712,당신은 모르실거야 (CF - LIG 핑클편),488851,11,5322,2,화나요,2021-06-08 12:07:54
3,9270,매직 카펫 라이드,363115,13,5330,2,행복해요,2021-06-08 12:07:54
4,9302,Deep In The Night,489238,9,5322,2,화나요,2021-06-08 12:07:54
...,...,...,...,...,...,...,...,...
1233,34850634,사랑인가 봐,466008,1,5711,2,편안해요,2022-03-17 14:48:33
1234,34890162,다시 만날까 봐,466884,20,5711,2,편안해요,2022-03-17 14:48:33
1235,34927767,정이라고 하자 (Feat. 10CM),493585,12,5322,2,화나요,2021-06-08 12:07:54
1236,34943312,나의 X에게,493585,19,5322,2,화나요,2021-06-08 12:07:54


In [6]:
# pretrained yamnet load
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
yamnet_model = hub.load(yamnet_model_handle)

In [7]:
my_classes = ['화나요','행복해요','편안해요','슬픔','불안','신남']
map_class_to_id = {'화나요':0,'행복해요':1,'편안해요':2,'슬픔':3,'불안':4,'신남':5}

class_id = final_data['TAG_NAME'].apply(lambda name : map_class_to_id[name])
filtered_pd = final_data.assign(TAG_NAME = class_id)

In [8]:
full_path = filtered_pd['SONG_ID'].apply(lambda row: os.path.join('C:/music_data/data_wav/',str(row) + '.wav'))
filtered_pd = filtered_pd.assign(SONG_ID = full_path)
filtered_pd.head()

Unnamed: 0,SONG_ID,SONG_TITLE,ST_ID,PRIORITY,ST_TAG_ID,TYPE_ID,TAG_NAME,CDATE
0,C:/music_data/data_wav/2466.wav,오늘도 난,467710,2,5711,2,2,2022-03-17 14:48:33
1,C:/music_data/data_wav/5112.wav,끝 (End),456206,17,5766,12,5,2022-03-31 14:31:33
2,C:/music_data/data_wav/7712.wav,당신은 모르실거야 (CF - LIG 핑클편),488851,11,5322,2,0,2021-06-08 12:07:54
3,C:/music_data/data_wav/9270.wav,매직 카펫 라이드,363115,13,5330,2,1,2021-06-08 12:07:54
4,C:/music_data/data_wav/9302.wav,Deep In The Night,489238,9,5322,2,0,2021-06-08 12:07:54


In [9]:
filenames = filtered_pd['SONG_ID']
targets = filtered_pd['TAG_NAME']

In [10]:
from sklearn.model_selection import train_test_split
train_X, test_X , train_y, test_y = train_test_split(filenames, targets, test_size = 0.15)
train_X, valid_X, train_y, valid_y = train_test_split(train_X, train_y, test_size = 0.176)

In [11]:
train_ds = tf.data.Dataset.from_tensor_slices((train_X, train_y))
valid_ds = tf.data.Dataset.from_tensor_slices((valid_X, valid_y))
test_ds = tf.data.Dataset.from_tensor_slices((test_X, test_y))

In [12]:
def load_wav_for_map(filename, label):
    return load_wav_16k_mono(filename), label

train_ds = train_ds.map(load_wav_for_map)
valid_ds = valid_ds.map(load_wav_for_map)
test_ds = test_ds.map(load_wav_for_map)





In [13]:
# applies the embedding extraction model to a wav data
def extract_embedding(wav_data,targets):
    with tf.device('/CPU:0'):
        scores, embeddings, spectrogram = yamnet_model(wav_data)
        num_embeddings = tf.shape(embeddings)[0]
    return (embeddings,
           tf.repeat(targets, num_embeddings))

# extract embedding
train_ds = train_ds.map(extract_embedding).unbatch()
valid_ds = valid_ds.map(extract_embedding).unbatch()
test_ds = test_ds.map(extract_embedding).unbatch()

In [14]:
train_ds = train_ds.cache().shuffle(1000).batch(32).prefetch(tf.data.AUTOTUNE)
valid_ds = valid_ds.cache().batch(32).prefetch(tf.data.AUTOTUNE)
test_ds = test_ds.cache().batch(32).prefetch(tf.data.AUTOTUNE)

In [15]:
my_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(1024), dtype=tf.float32,
                          name='input_embedding'),
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(len(my_classes), activation = 'softmax')
    ], name='my_model')

my_model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 512)               524800    
                                                                 
 dense_1 (Dense)             (None, 6)                 3078      
                                                                 
Total params: 527,878
Trainable params: 527,878
Non-trainable params: 0
_________________________________________________________________


In [16]:
my_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                 optimizer="adam",
                 metrics=['accuracy'])

callback = tf.keras.callbacks.EarlyStopping(monitor='loss',
                                            patience=3,
                                            restore_best_weights=True)

In [17]:
history = my_model.fit(train_ds,epochs=20, validation_data = valid_ds, callbacks = callback)

Epoch 1/20


  output, from_logits, "Softmax", "sparse_categorical_crossentropy"


Epoch 2/20
Epoch 3/20
Epoch 4/20


In [18]:
loss, accuracy = my_model.evaluate(test_ds)
print(loss, accuracy)

2.5558950901031494 0.22518004477024078
