In [1]:
import numpy as np
import pandas as pd
import pyeeg
from scipy import stats
from scipy import signal

default_path = "C:/Users/cjy89/Desktop/EEG/capstone"
labels = ["HA_HV", "HA_LV", "LA_HV", "LA_LV"]
features = ["time_feature", "frequency_feature"]

index_channel = ["FP1", "AF3", "F3", "F7", "FC5", "FC1", "C3", "T7", "CP5", "CP1", "P3", "P7",
                 "PO3", "O1", "Oz", "Pz", "FP2", "AF4", "Fz", "F4", "F8", "FC6", "FC2", "Cz",
                 "C4", "T8", "CP6", "CP2", "P4", "P8", "PO4", "O2"]
basic_channel = ["F3", "F4", "F7", "F8", "FC1", "FC2", "FC5", "FC6", "FP1", "FP2", "AF3", "AF4"]

# Feature extraction

In [2]:
# time_feature 계산
def time_feature_extractor(df):
    mean = np.mean(df, axis=1)
    std = np.std(df, axis=1)
    skewness = stats.skew(df, axis=1)
    kurotosis = stats.kurtosis(df, axis=1)

    hjorth_mob = df.apply(lambda x: pyeeg.hjorth(list(x))[0], axis=1)
    hjorth_com = df.apply(lambda x: pyeeg.hjorth(list(x))[1], axis=1)
    hurst = df.apply(lambda x: pyeeg.hurst(x), axis=1)
    dfa = df.apply(lambda x: pyeeg.dfa(x), axis=1)
    hfd = df.apply(lambda x: pyeeg.hfd(list(x), 8), axis=1)
    pfd = df.apply(lambda x: pyeeg.pfd(x), axis=1)

    time_feature_df = pd.DataFrame({"mean": mean, "std": std, "skewness": skewness, "kurotosis": kurotosis, "hjorth_mob": hjorth_mob,
                                    "hjorth_com": hjorth_com, "hurst": hurst, "dfa": dfa, "hfd": hfd, "pfd": pfd})
    return time_feature_df


# frequency_feature 계산
def frequency_feature_extractor(df):
    # sampling frequency 정의
    sampling_frequency = 128
    sampling_period = 1/128

    frequency_feature_list = []
    for channel in df.index:
        row = df.loc[channel, :]
        pxx_den = signal.welch(row, fs=sampling_frequency, nperseg=128)[1]
        pxx_spec = signal.welch(row, fs=sampling_frequency, nperseg=128, scaling="spectrum")[1]

        max_psd = np.sqrt(np.max(pxx_den))
        max_freq = np.argmax(pxx_den)
        rms = np.sqrt(np.max(pxx_spec))
        frequency_feature_list.append([max_psd, max_freq, rms])

    frequency_feature_df = pd.DataFrame(frequency_feature_list, columns=["MAX_PSD", "MAX_freq", "RMS"])
    return frequency_feature_df


# time-frequency-feature 계산 (13-42Hz 이용)
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='band')
    return b, a


def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = signal.lfilter(b, a, data)
    return y


def time_frequency_feature_extractor(df):
    # sampling frequency 정의
    sampling_frequency = 128
    sampling_period = 1/128

    # band-pass frequency 정의
    low_frequency = 13
    high_frequency = 42

    time_frequency_list = []
    beta_gamma_max_freq_list = []
    for channel in df.index:
        row = df.loc[channel, :].to_numpy()

        # band-pass
        bandpass_signal = butter_bandpass_filter(row, low_frequency, high_frequency, sampling_frequency, order=15)
        f, t, zxx = signal.stft(bandpass_signal, fs=sampling_frequency, nperseg=128, noverlap=0)

        # time_frequency_extract
        time_frequency_list.append(pd.DataFrame(np.abs(zxx)))
        beta_gamma_max_freq_list.append(np.argmax(signal.welch(bandpass_signal, fs=sampling_frequency, nperseg=128)[1]))

    return beta_gamma_max_freq_list, time_frequency_list

In [218]:
# basic_channel 중 첫 3초 제거
eeg_file_path = "C:/Users/cjy89/Desktop/EEG/capstone/HA_HV/deap_eeg2/eeg_441.csv"
eeg_file = pd.read_csv(eeg_file_path, header=None)
eeg_file.index = index_channel
eeg_file = eeg_file.loc[basic_channel, 128 * 3:]

time_feature = time_feature_extractor(eeg_file)
print("Time Feature .... Done ✨")
frequency_feature = frequency_feature_extractor(eeg_file)
frequency_feature.index = basic_channel
print("Frequency Feature .... Done ✨")
beta_gamma_max_freq_list, time_frequency_list = time_frequency_feature_extractor(eeg_file)
print("Time Frequency Feature .... Done ✨")

total_feature = pd.concat([time_feature, frequency_feature], axis=1)
total_feature["label"] = np.full(total_feature.shape[0], -1)
total_feature

Time Feature .... Done ✨
Frequency Feature .... Done ✨
Time Frequency Feature .... Done ✨


Unnamed: 0,mean,std,skewness,kurotosis,hjorth_mob,hjorth_com,hurst,dfa,hfd,pfd,MAX_PSD,MAX_freq,RMS,label
F3,-0.003533,15.128062,-0.034592,0.477892,0.012597,113.708502,0.134208,-0.001102,0.877031,0.548979,3.698153,7,4.529293,-1
F4,0.194975,18.969424,-0.024686,0.787917,0.013056,111.903302,0.107373,0.00275,0.910987,0.548193,4.959353,5,6.073942,-1
F7,-0.099105,9.705415,-0.110583,3.652892,0.00965,140.916906,0.221248,0.004683,0.759454,0.550791,3.651952,5,4.472709,-1
F8,-0.042442,12.048552,-0.032636,1.025251,0.010934,127.459764,0.123713,0.003221,0.795751,0.550361,3.530089,5,4.323458,-1
FC1,-0.299918,20.10286,-0.198816,4.008109,0.011392,127.836682,0.193621,0.00474,0.87347,0.548059,7.911831,5,9.689975,-1
FC2,0.115152,13.355543,0.139622,2.300862,0.011113,128.230782,0.17206,0.003426,0.820586,0.549503,4.443733,5,5.442439,-1
FC5,-0.077718,11.614049,-0.006031,0.505926,0.013283,109.887372,0.130595,0.002929,0.925167,0.547874,2.902991,5,3.555423,-1
FC6,-0.057064,8.281119,-0.18331,2.479467,0.008344,152.548276,0.17257,0.003732,0.654437,0.553211,3.11939,5,3.820457,-1
FP1,0.251704,16.309917,0.195269,2.876589,0.010533,133.958195,0.184389,0.003613,0.795078,0.549521,5.815446,5,7.122438,-1
FP2,0.111231,8.606155,0.013476,0.683302,0.009869,134.092593,0.144786,-0.000176,0.72355,0.552193,2.722974,7,3.334948,-1


# Predict

In [219]:
import pycaret.classification as pc

# 모델 불러오기
def load_model(model):
    return pc.load_model(model_name=f"C:/Users/cjy89/Desktop/EEG/application/model/{model}")


# "High" == 1, "Low" == 0
def predict_emotion(eeg_feature, model):
    predict = pc.predict_model(model, eeg_feature)
    return predict["Label"].value_counts(ascending=False)

In [220]:
arousal_model = load_model("Arousal_0.8")
valence_model = load_model("Valence_0.8")
print("Load Model .... Done ✨")

arousal_predict = predict_emotion(total_feature, arousal_model)
valence_predict = predict_emotion(total_feature, valence_model)

Transformation Pipeline and Model Successfully Loaded
Transformation Pipeline and Model Successfully Loaded
Load Model .... Done ✨


In [221]:
arousal_predict

1    12
Name: Label, dtype: int64

In [222]:
valence_predict

1    12
Name: Label, dtype: int64

# Music Composition

In [241]:
import pretty_midi as pm

# 피아노 오른손자리 옥타브 주파수
piano_frequency = np.array([261.63, 277.18, 293.67, 311.13, 329.63, 349.23, 369.99, 392.00, 415.30, 440.00, 466.16, 493.88, 523.25])

# major 코드별 스케일
C_major_scale = ['C4', 'D4', 'E4', 'F4', 'G4', 'A4', 'B4']
C_sharp_major_scale = ["Db4", 'Eb4', 'F4', 'Gb4', 'Ab4', 'Bb4', "C5"]
D_major_scale = ['D4', 'E4', 'F#4', 'G4', 'A4', 'B4', 'C#5']
D_sharp_major_scale = ['Eb4', 'F4', 'G4', 'Ab4', 'Bb4', 'C5', 'D5']
E_major_scale = ['E4', 'F#4', 'G#4', 'A4', 'B4', 'C#5', "B#5"]
F_major_scale = ['F4', 'G4', 'A4', 'A#4', 'C5', 'D5', "E5"]
F_sharp_major_scale = ['F#4', 'G#4', 'A#4', 'B4', 'C#5', 'D#5', 'F5']
G_major_scale = ['G4', 'A4', 'B4', 'C5', 'D5', 'E5', "F#5"]
G_sharp_major_scale = ['Ab4', 'Bb4', 'C5', 'Db5', 'Eb5', 'F5', 'G5']
A_major_scale = ['A4', 'B4', 'C#5', 'D5', 'E5', 'F#5', "G#5"]
A_sharp_major_scale = ['Bb4', 'C5', 'D5', 'Eb5', 'F5', 'G5', 'A5']
B_major_scale = ['B4', 'C#5', 'D#5', 'E5', 'F#5', 'G#5', "A#5"]
major_scale_list = [C_major_scale, C_sharp_major_scale, D_major_scale, D_sharp_major_scale, E_major_scale, F_major_scale, F_sharp_major_scale,
                    G_major_scale, G_sharp_major_scale, A_major_scale, A_sharp_major_scale, B_major_scale]
major_scale_names = ["C major", "C sharp major", "D major", "D sharp major", "E major", "F major", "F sharp major",
                    "G major", "G sharp major", "A major", "A sharp major", "B major"]

# minor 코드별 스케일
C_minor_scale = ['C4', 'D4', 'Eb4', 'F4', 'G4', 'Ab4', 'Bb4']
C_sharp_minor_scale = ['C#4', 'D#4', 'E4', 'F#4', 'G#4', 'A4', 'B4']
D_minor_scale = ['D4', 'E4', 'F4', 'G4', 'A4', 'Bb4', 'C5']
D_sharp_minor_scale = ['D#4', 'F4', 'F#4', 'G#4', 'A#4', 'B4', 'C#5']
E_minor_scale = ['E4', 'F#4', 'G4', 'A4', 'B4', 'C5', 'D5']
F_minor_scale = ['F4', 'G4', 'Ab4', 'Bb4', 'C5', 'Db5', 'Eb5']
F_sharp_minor_scale = ['F#4', 'G#4', 'A4', 'B4', 'C#5', 'D5', 'E5']
G_minor_scale = ['G4', 'A4', 'Bb4', 'C5', 'D5', 'Eb5', 'F5']
G_sharp_minor_scale = ['G#4', 'A#4', 'B4', 'C#5', 'D#5', 'E5', 'F#5']
A_minor_scale = ['A4', 'B4', 'C5', 'D5', 'E5', 'F5', 'G5']
A_sharp_minor_scale = ['A#4', 'C5', 'C#5', 'D#5', 'F5', 'F#5', 'G#5']
B_minor_scale = ['B4', 'C#5', 'D5', 'E5', 'F#5', 'G5', 'A5', 'B5']
minor_scale_list = [C_minor_scale, C_sharp_minor_scale, D_minor_scale, D_sharp_minor_scale, E_minor_scale, F_minor_scale, F_sharp_minor_scale,
                    G_minor_scale, G_sharp_minor_scale, A_minor_scale, A_sharp_minor_scale, B_minor_scale]
minor_scale_names = ["C minor", "C sharp minor", "D minor", "D sharp minor", "E minor", "F minor", "F sharp minor",
                    "G minor", "G sharp minor", "A minor", "A sharp minor", "B minor"]

major_or_minor = [major_scale_list, minor_scale_list]
major_or_minor_names = [major_scale_names, minor_scale_names]

In [242]:
def scaling_index(frequency):
    scale_frequency = piano_frequency[0] + (piano_frequency[-1] - piano_frequency[0]) * (frequency - 13) / (42 - 13)
    idx = np.abs(piano_frequency - scale_frequency).argmin()
    return idx

def octave_control(note_name, number):
    octave = str(int(note_name[-1]) + number)
    note_name = note_name[:-1] + octave
    return note_name

In [243]:
def chord_and_melody(valence_label, stft_list, max_psf_list):
    # 12개의 channel에서 측정한 stft와 max_psf의 평균 계산
    max_stft_signal_mean = np.array([])
    max_psf_mean = 0

    for stft, mpsf in zip(stft_list, max_psf_list):
        max_stft_signal = [stft.loc[:, column].argmax() for column in stft.columns]
        if max_stft_signal_mean.shape == (0,):
            max_stft_signal_mean = np.array(max_stft_signal)
        else:
            max_stft_signal_mean += np.array(max_stft_signal)
        max_psf_mean += mpsf

    max_stft_signal_mean = max_stft_signal_mean / 12
    max_psf_mean = max_psf_mean / 12
    
    # Chord 결정
    scale_idx = scaling_index(max_psf_mean)
    if valence_label.index[0] == 1:
        key = 0
        base_octave = 1
    elif valence_label.index[0] == 0:
        key = 1
        base_octave = 0
        
    return key, scale_idx, base_octave, max_stft_signal_mean


def tempo(arousal_label):
    # Tempo 조절
    index = arousal_label.index
    value = arousal_label.values
    if index[0] == 0:
        interval = 0.4 + 0.05 * value[0]
        interpol_interval = interval / 2
    elif index[0] == 1:
        if value[0] == 12:
            interval = 0.4
            interpol_interval = 0.2
        else:
            interval = 0.4 + 0.05 * value[1]
            interpol_interval = interval / 2
    
    return interval, interpol_interval


def volume_and_octave(eeg_signal):
    # 볼륨 조절
    high_kurtosis = pd.read_csv("high_kurtosis.csv", header=None)
    high_kurtosis_estimator = stats.gaussian_kde(high_kurtosis.mean(axis=0), bw_method='silverman')
    low_kurtosis = pd.read_csv("low_kurtosis.csv", header=None)
    low_kurtosis_estimator = stats.gaussian_kde(low_kurtosis.mean(axis=0), bw_method='silverman')

    start_index = 0
    sliding_window = 1

    kurtosis_list = pd.DataFrame([])
    while start_index + 10 <=60:
        temp = eeg_file.iloc[:, 128 * start_index : 128 * (start_index + 10)]
        kurtosis_list = pd.concat([kurtosis_list, pd.Series(stats.kurtosis(temp, axis=1))], axis=1)
        start_index += sliding_window
    kurtosis_list = np.mean(kurtosis_list, axis=0)

    volume_list = []
    for i in range(len(kurtosis_list)):
        K1 = high_kurtosis_estimator(kurtosis_list.iloc[i])
        K2 = low_kurtosis_estimator(kurtosis_list.iloc[i])
        volume_list.append(0.5 if K1 > K2 else -0.5)

    # Key 조절
    high_dfa = pd.read_csv("high_dfa.csv", header=None)
    high_dfa_estimator = stats.gaussian_kde(high_dfa.mean(axis=0), bw_method='silverman')
    low_dfa = pd.read_csv("low_dfa.csv", header=None)
    low_dfa_estimator = stats.gaussian_kde(low_dfa.mean(axis=0), bw_method='silverman')

    start_index = 0
    sliding_window = 30

    dfa_list = pd.DataFrame([])
    while start_index + sliding_window<=60:
        temp = eeg_file.iloc[:, 128 * start_index : 128 * (start_index + sliding_window)]
        dfa_list = pd.concat([dfa_list, temp.apply(lambda x: pyeeg.dfa(x), axis=1)], axis=1)
        start_index += sliding_window
    dfa_list = np.mean(dfa_list, axis=0)

    key_list = []
    for i in range(len(dfa_list)):
        K1 = high_dfa_estimator(dfa_list.iloc[i])
        K2 = low_dfa_estimator(dfa_list.iloc[i])
        key_list.append(2 if K1 > K2 else -2)
        
    return volume_list, key_list

In [244]:
def left_note_record(scale, pitch_idx, base_octave):
    first_note_name = octave_control(scale[pitch_idx], base_octave - 2)
    first_note_number = pm.note_name_to_number(first_note_name)

    # 두번째 음부터 한 옥타브가 올라가는 경우
    if pitch_idx + 2 > 6:
        second_note_name = octave_control(scale[pitch_idx - 5], base_octave - 1)
        second_note_number = pm.note_name_to_number(second_note_name)
        third_note_name = octave_control(scale[pitch_idx - 3], base_octave - 1)
        third_note_number = pm.note_name_to_number(third_note_name)

    # 세번째 음부터 한 옥타브가 올라가는 경우
    elif (pitch_idx + 2 <= 6) and (pitch_idx + 4 > 6):
        second_note_name = octave_control(scale[pitch_idx + 2], base_octave - 2)
        second_note_number = pm.note_name_to_number(second_note_name)
        third_note_name = octave_control(scale[pitch_idx - 3], base_octave - 1)
        third_note_number = pm.note_name_to_number(third_note_name)

    # 옥타브가 올라가지 않는 경우
    else:
        second_note_name = octave_control(scale[pitch_idx + 2], base_octave - 2)
        second_note_number = pm.note_name_to_number(second_note_name)
        third_note_name = octave_control(scale[pitch_idx + 4], base_octave - 2)
        third_note_number = pm.note_name_to_number(third_note_name)

    return first_note_number, second_note_number, third_note_number

In [250]:
def convert_signal_to_music(inst, eeg_signal, arousal_label, valence_label, stft_list, max_psf_list):
    # 악기 결정
    pretty_midi_music = pm.PrettyMIDI()
    instrument_program = pm.instrument_name_to_program(inst)
    instrument = pm.Instrument(program=instrument_program)

    
    # Chord and Key melody 결정
    scale_name_list = []
    key, scale_idx, base_octave, max_stft_signal_mean = chord_and_melody(valence_label, stft_list, max_psf_list)
    scale = major_or_minor[key][scale_idx]
    scale_name = major_or_minor_names[key][scale_idx]
    scale_name_list.append(scale_name)
    
    # Tempo 결정
    start_time = 0.0
    interval, interpol_interval = tempo(arousal_label)
            
    # Volume / Key 조절
    right_base_volume, left_base_volume = 80, 60
    volume_list, key_list = volume_and_octave(eeg_signal)
    volume_list = [0] * 10 + volume_list

    
    # note 및 간격 list 초기화
    right_note = []
    right_volume = []
    right_start_time = []
    right_end_time = []

    left_note = []
    left_volume = []
    left_start_time = []
    left_end_time = []

    
    # 음악 만들기 -> 오른손과 왼손에서 입력할 note, 시작 시간, 끝 시간을 List up
    pre_pitch_idx = scaling_index(max_stft_signal_mean[0]) % 7
    for idx in range(0, len(max_stft_signal_mean)):
        
        # volume 및 key 조절 설정
        right_base_volume = right_base_volume + volume_list[idx]
        left_base_volume = left_base_volume + volume_list[idx]
        
        if idx % 30 == 0 and (0 < idx // 30 and idx // 30 < 2):
            scale_idx = (scale_idx + key_list[idx // 30]) % 12
            scale = major_or_minor[key][scale_idx]
            scale_name = major_or_minor_names[key][scale_idx]
            scale_name_list.append(scale_name)

        # stft값을 통해서 해당하는 가장 가까운 음을 찾는다.
        pitch_idx = scaling_index(max_stft_signal_mean[idx]) % 7

        # 오른손 - Interpolate Note를 추가하는 경우
        while np.abs(pre_pitch_idx - pitch_idx) > 1:
            if pre_pitch_idx > pitch_idx:
                pre_pitch_idx -= 1
            elif pre_pitch_idx < pitch_idx:
                pre_pitch_idx += 1
            note_name = octave_control(scale[pre_pitch_idx], base_octave)
            note_number = pm.note_name_to_number(note_name)

            # 오른손 list에 추가
            right_note.append(note_number)
            right_volume.append(right_base_volume)
            right_start_time.append(start_time)
            right_end_time.append(start_time + interpol_interval)

            # 왼손 list에 추가
            if start_time // 1 != (start_time + interpol_interval) // 1:
                left_note.append(left_note_record(scale, pitch_idx, base_octave))
                left_volume.append(left_base_volume)
                left_start_time.append(start_time)
                left_end_time.append(start_time + interpol_interval)
                
            start_time += interpol_interval

        # 오른손 일반적인 음을 추가하는 경우
        note_name = octave_control(scale[pitch_idx], base_octave)
        note_number = pm.note_name_to_number(note_name)

        # 오른손 List 들에 추가
        right_note.append(note_number)
        right_volume.append(right_base_volume)
        right_start_time.append(start_time)
        right_end_time.append(start_time + interval)

        # 왼손 일반적인 음을 추가하는 경우
        if start_time // 1 != (start_time + interval) // 1:
            left_note.append(left_note_record(scale, pitch_idx, base_octave))
            left_volume.append(left_base_volume)
            left_start_time.append(start_time)
            left_end_time.append(start_time + interval)

        start_time += interval
        pre_pitch_idx = pitch_idx

        
        
    # List up이 완료되면, 하나씩 객체에 입력
    # 오른손
    for i in range(len(right_note)):
        note = pm.Note(velocity=int(right_volume[i]), pitch=right_note[i], start=right_start_time[i], end=right_end_time[i])
        instrument.notes.append(note)

        # 왼손 -> 첫음 채워 넣기
        # 3개의 음을 동시에 치기 위해서 각각의 음들을 first, second, third list에 저장하고 동일한 인덱스의 위치에 저장한다.
        note = pm.Note(velocity=int(left_volume[0]), pitch=left_note[0][0], start=0, end=left_start_time[1])
        instrument.notes.append(note)
        note = pm.Note(velocity=int(left_volume[0]), pitch=left_note[0][1], start=0, end=left_start_time[1])
        instrument.notes.append(note)
        note = pm.Note(velocity=int(left_volume[0]), pitch=left_note[0][2], start=0, end=left_start_time[1])
        instrument.notes.append(note)

    for i in range(len(left_note)):

        # 마지막 인덱스인 경우
        if i == len(left_note) - 1:
            # 3개의 음을 동시에 치기 위해서 각각의 음들을 first, second, third list에 저장하고 동일한 인덱스의 위치에 저장한다.
            note = pm.Note(velocity=int(left_volume[i]), pitch=left_note[i][0], start=left_start_time[i], end=right_end_time[-1])
            instrument.notes.append(note)
            note = pm.Note(velocity=int(left_volume[i]), pitch=left_note[i][1], start=left_start_time[i], end=right_end_time[-1])
            instrument.notes.append(note)
            note = pm.Note(velocity=int(left_volume[i]), pitch=left_note[i][2], start=left_start_time[i], end=right_end_time[-1])
            instrument.notes.append(note)

        # 마지막 인덱스가 아닌 경우
        else:
            # 3개의 음을 동시에 치기 위해서 각각의 음들을 first, second, third list에 저장하고 동일한 인덱스의 위치에 저장한다.
            note = pm.Note(velocity=int(left_volume[i]), pitch=left_note[i][0], start=left_start_time[i], end=left_start_time[i + 1])
            instrument.notes.append(note)
            note = pm.Note(velocity=int(left_volume[i]), pitch=left_note[i][1], start=left_start_time[i], end=left_start_time[i + 1])
            instrument.notes.append(note)
            note = pm.Note(velocity=int(left_volume[i]), pitch=left_note[i][2], start=left_start_time[i], end=left_start_time[i + 1])
            instrument.notes.append(note)

    pretty_midi_music.instruments.append(instrument)
    return scale_name_list, pretty_midi_music

In [252]:
music = convert_signal_to_music("Electric Piano 1", eeg_file, arousal_predict, valence_predict, time_frequency_list, beta_gamma_max_freq_list)
music[1].write("C:/Users/cjy89/Desktop/EEG/application/music/music_temp.mid")
print(music[0])

['G major', 'A major']


In [239]:
0 % 12

0