In [None]:
# Mount Google Drive at /content/drive; the CSV paths used below live under it.
from google.colab import drive
drive.mount('/content/drive')

MessageError: Error: credential propagation was unsuccessful

In [None]:
  # Input CSVs on Google Drive
  # /content/drive/MyDrive/csvs/4차년도.csv
  # /content/drive/MyDrive/csvs/5차년도.csv
  # /content/drive/MyDrive/csvs/5차년도_2차.csv

In [None]:
import pandas as pd

# Yearly CSV exports (cp949-encoded) that are merged into one table.
file_paths = [
    '/content/drive/MyDrive/csvs/4차년도.csv',
    '/content/drive/MyDrive/csvs/5차년도.csv',
    '/content/drive/MyDrive/csvs/5차년도_2차.csv'
]

# Read each export and stack them under a fresh 0..n-1 index.
dataframes = [pd.read_csv(csv_path, encoding='cp949') for csv_path in file_paths]
combined_df = pd.concat(dataframes, ignore_index=True)

# Rows that exactly repeat an earlier row (the first occurrence is not flagged).
duplicate_rows = combined_df.loc[combined_df.duplicated()]
cleaned_df = combined_df.drop_duplicates()

# Report what was found; the (possibly empty) frame is printed either way.
if duplicate_rows.empty:
    print("No duplicate rows found.")
else:
    print("Duplicate rows found:")
print(duplicate_rows)

# Persist the de-duplicated table next to the inputs, keeping the source encoding.
cleaned_df.to_csv('/content/drive/MyDrive/csvs/cleaned_combined.csv', index=False, encoding='cp949')


In [None]:
# Re-create the `np.bool` alias, which NumPy removed in 1.24.
# Presumably required by older dependencies (mxnet / gluonnlp) that still reference it — confirm.
import numpy as np
np.bool = np.bool_


In [None]:
!python --version

In [None]:
# boto3 <=1.15.18
# gluonnlp >= 0.6.0, <=0.10.0
# mxnet >= 1.4.0, <=1.7.0.post2
# onnxruntime == 1.8.0, <=1.8.0
# sentencepiece >= 0.1.6, <=0.1.96
# torch >= 1.7.0, <=1.10.1
# transformers >= 4.8.1, <=4.8.1

In [None]:
!pip install gluonnlp pandas tqdm


In [None]:
!pip install torch

In [None]:
!pip install mxnet

In [None]:
!pip install sentencepiece==0.1.96

In [None]:
!pip install transformers

In [None]:
# !pip install transformers==4.8.2

In [None]:
# !pip install onnxruntime

In [None]:
# https://github.com/SKTBrain/KoBERT 의 파일들을 Colab으로 다운로드
!pip install git+https://git@github.com/SKTBrain/KoBERT.git@master --no-deps

In [None]:
!pip install gluonnlp pandas tqdm

In [None]:
# !pip install onnxruntime

In [None]:
# pip install boto3

In [None]:
# !pip3 install mxnet-mkl==1.6.0 numpy==1.23.1

# koBERT
from kobert.utils import get_tokenizer
from kobert.pytorch_kobert import get_pytorch_kobert_model


In [None]:
!pip install boto3


In [None]:
from kobert.utils import get_tokenizer
from kobert.pytorch_kobert import get_pytorch_kobert_model


In [None]:
from transformers import AdamW
from transformers.optimization import get_cosine_schedule_with_warmup


In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import gluonnlp as nlp
import numpy as np
from tqdm import tqdm, tqdm_notebook
import pandas as pd


In [None]:
# Use the first CUDA GPU when one is present; otherwise fall back to the CPU so
# that later `.to(device)` calls do not fail on a runtime without a GPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


In [None]:
# Load the pretrained KoBERT encoder and its vocabulary; files are cached under ./.cache.
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")

In [None]:
# [AI Hub] Conversation speech dataset for sentiment classification.
# Reads the merged, de-duplicated CSV written by the CSV-combining cell (cp949-encoded).
data = pd.read_csv("/content/drive/MyDrive/csvs/cleaned_combined.csv", encoding='cp949')


In [None]:
data.shape

In [None]:
print(data)

In [None]:
data['상황'].unique()

In [None]:
# Map 7 emotion classes to numeric labels
# 0: negative (fear), 1: neutral (surprise), 2: negative (angry/anger),
# 3: negative (sadness/sad), 4: neutral (neutral), 5: positive (happiness), 6: negative (disgust)
# Spelling variants ("angry"/"anger", "sadness"/"sad") share one label id.
emotion_to_label = {
    "fear": 0,
    "surprise": 1,
    "angry": 2,
    "anger": 2,
    "sadness": 3,
    "neutral": 4,
    "happiness": 5,
    "disgust": 6,
    "sad": 3,
}
# Overwrite the '상황' column in place, one emotion name at a time.
for emotion_name, label_id in emotion_to_label.items():
    data.loc[data['상황'] == emotion_name, '상황'] = label_id

In [None]:
data['상황'].unique()

In [None]:
# Build [utterance, label-as-string] pairs for train_test_split / BERTDataset.
# A comprehension is used so that nothing rebinds `data`: the previous loop
# reused the name `data` for each 2-item pair, replacing the DataFrame and
# making this cell (and every earlier `data[...]` cell) fail when re-run.
data_list = [[ques, str(label)] for ques, label in zip(data['발화문'], data['상황'])]

In [None]:
print(data)
print(data_list[:10])

In [None]:
from sklearn.model_selection import train_test_split
# 80/20 shuffled split; the fixed random_state keeps the split reproducible across runs.
dataset_train, dataset_test = train_test_split(data_list, test_size = 0.2, shuffle = True, random_state = 32)


In [None]:
print(len(dataset_train), len(dataset_test))


In [None]:
# KoBERT tokenizer wrapped in gluonnlp's BERTSPTokenizer, bound to the KoBERT vocab.
# lower=False: input text is not lower-cased.
tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower = False)


In [None]:
# BERTDataset: tokenizes, integer-encodes and (optionally) pads each sample so it can be fed to BERT.
# Source: https://github.com/SKTBrain/KoBERT/blob/master/scripts/NSMC/naver_review_classifications_pytorch_kobert.ipynb

class BERTDataset(Dataset):
    """Pre-encoded sentence/label dataset.

    Every record of ``dataset`` is indexable; ``record[sent_idx]`` is the text
    and ``record[label_idx]`` the label (anything ``np.int32`` accepts).
    All records are encoded eagerly in ``__init__``.
    """

    def __init__(self, dataset, sent_idx, label_idx, bert_tokenizer, vocab, max_len, pad, pair):
        to_bert_input = nlp.data.BERTSentenceTransform(
            bert_tokenizer, max_seq_length=max_len, vocab=vocab, pad=pad, pair=pair)

        def _encode(record):
            # The transform expects a sequence of sentences; pass a 1-item list.
            return to_bert_input([record[sent_idx]])

        def _label(record):
            return np.int32(record[label_idx])

        # Two separate passes, sentences first, then labels.
        self.sentences = list(map(_encode, dataset))
        self.labels = list(map(_label, dataset))

    def __getitem__(self, i):
        """Return the encoded sentence tuple with the label appended as the last element."""
        return (*self.sentences[i], self.labels[i])

    def __len__(self):
        """Number of samples."""
        return len(self.labels)


In [None]:
# Hyperparameter values taken from: https://github.com/SKTBrain/KoBERT/blob/master/scripts/NSMC/naver_review_classifications_pytorch_kobert.ipynb
max_len = 64  # maximum token sequence length passed to BERTDataset
batch_size = 64  # samples per DataLoader batch
warmup_ratio = 0.1  # fraction of the total scheduler steps used for warmup
num_epochs = 10  # multiplied by len(train_dataloader) to get the total scheduler steps
max_grad_norm = 1  # not referenced in the visible cells; presumably a gradient-clipping bound — confirm
log_interval = 200  # not referenced in the visible cells; presumably a logging period in batches — confirm
learning_rate =  5e-5  # learning rate handed to AdamW


In [None]:
!python --version


In [None]:
# Encode both splits: text is column 0, label is column 1 of each pair.
# The trailing (False, False) are pad=False, pair=False.
data_train = BERTDataset(dataset_train, 0, 1, tok, vocab, max_len, False, False)
data_test = BERTDataset(dataset_test, 0, 1, tok, vocab, max_len, False, False)



In [None]:
# Initial loaders with default collation and no shuffling.
# NOTE: both names are rebound further down with a custom collate_fn (and a shuffled
# training loader); these versions are what the optimizer cell sees via len(train_dataloader).
train_dataloader = torch.utils.data.DataLoader(data_train, batch_size = batch_size, num_workers = 0)
test_dataloader = torch.utils.data.DataLoader(data_test, batch_size = batch_size, num_workers = 0)


### KoBERT 구현

In [None]:
# Adapted from the KoBERT example notebook: https://github.com/SKTBrain/KoBERT/blob/master/scripts/NSMC/naver_review_classifications_pytorch_kobert.ipynb
class BERTClassifier(nn.Module):
    """BERT encoder followed by optional dropout and one linear classification layer.

    Args:
        bert: encoder module; called with ``return_dict=False`` and expected to
            return ``(sequence_output, pooled_output)``.
        hidden_size: width of the pooled output fed to the classifier.
        num_classes: number of output logits (7 emotion classes by default).
        dr_rate: dropout probability applied to the pooled output; ``None`` or
            ``0`` disables dropout.
        params: unused; kept for signature compatibility.
    """

    def __init__(self,
                 bert,
                 hidden_size = 768,
                 num_classes = 7,   # adjusted to the number of emotion classes
                 dr_rate = None,
                 params = None):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate

        self.classifier = nn.Linear(hidden_size , num_classes)
        if dr_rate:
            self.dropout = nn.Dropout(p = dr_rate)

    def gen_attention_mask(self, token_ids, valid_length):
        """Return a float mask shaped like ``token_ids``: 1 for the first
        ``valid_length[i]`` positions of row ``i``, 0 elsewhere."""
        attention_mask = torch.zeros_like(token_ids)
        for i, v in enumerate(valid_length):
            attention_mask[i][:v] = 1
        return attention_mask.float()

    def forward(self, token_ids, valid_length, segment_ids):
        """Return class logits computed from the encoder's pooled output."""
        attention_mask = self.gen_attention_mask(token_ids, valid_length)

        _, pooler = self.bert(input_ids = token_ids, token_type_ids = segment_ids.long(), attention_mask = attention_mask.float().to(token_ids.device),return_dict = False)
        # Fall back to the raw pooled output when dropout is disabled; previously
        # `out` was never assigned in that case (the constructor default) and
        # forward() raised UnboundLocalError.
        out = self.dropout(pooler) if self.dr_rate else pooler
        return self.classifier(out)


In [None]:
# Wrap the pretrained encoder in the classifier (dropout p=0.5) and move it to the chosen device.
model = BERTClassifier(bertmodel,  dr_rate = 0.5).to(device)

In [None]:
# Parameters whose name contains one of these substrings get no weight decay.
no_decay = ['bias', 'LayerNorm.weight']

# Partition the model parameters in a single pass (relative order is kept).
decay_params, no_decay_params = [], []
for n, p in model.named_parameters():
    if any(nd in n for nd in no_decay):
        no_decay_params.append(p)
    else:
        decay_params.append(p)

optimizer_grouped_parameters = [
    {'params': decay_params, 'weight_decay': 0.01},
    {'params': no_decay_params, 'weight_decay': 0.0}
]

optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()

# One scheduler step per batch: total steps = batches per epoch * epochs.
t_total = num_epochs * len(train_dataloader)
warmup_step = int(warmup_ratio * t_total)

scheduler = get_cosine_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_step,
    num_training_steps=t_total,
)


In [None]:
def calc_accuracy(X,Y):
    """Fraction of rows in ``X`` whose highest-scoring column equals the label in ``Y``.

    ``X`` holds one row of class scores per sample and ``Y`` the matching class
    indices. The result is a NumPy scalar.
    """
    predicted = torch.max(X, 1).indices
    n_correct = (predicted == Y).sum().data.cpu().numpy()
    return n_correct / predicted.shape[0]

train_dataloader


In [None]:
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

def collate_fn(batch):
    """Collate ``(token_ids, valid_length, segment_ids, label)`` samples into batch tensors.

    Token and segment id sequences are right-padded with 0 to the longest
    sequence in the batch; lengths and labels become 1-D integer tensors.
    On failure the error and the offending batch are printed, then the
    exception is re-raised.
    """
    def _padded(column):
        # Stack one variable-length column into a (batch, longest) tensor.
        rows = [torch.tensor(sample[column]) for sample in batch]
        return pad_sequence(rows, batch_first=True, padding_value=0)

    def _ints(column):
        # Gather one scalar column into a 1-D tensor.
        return torch.tensor([int(sample[column]) for sample in batch])

    try:
        token_ids = _padded(0)
        valid_length = _ints(1)
        segment_ids = _padded(2)
        label = _ints(3)
    except Exception as e:
        print("Data Error:", e)
        print("Batch Content:", batch)
        raise e

    return token_ids, valid_length, segment_ids, label

# Rebuild both loaders with the padding collate_fn; only the training loader is shuffled.
train_dataloader = DataLoader(data_train, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_dataloader = DataLoader(data_test, batch_size=batch_size, collate_fn=collate_fn, shuffle=False)


### 각 감성 분석 및 가중치 부여

In [None]:
!pip install soynlp

In [None]:
!git clone https://github.com/SOMJANG/Mecab-ko-for-Google-Colab.git
# Each `!` line runs in its own subshell, so a bare `!cd` does not carry over to
# the next line. Chain the cd and the installer so the script runs inside the repo.
!cd Mecab-ko-for-Google-Colab && bash install_mecab-ko_on_colab_light_220429.sh

In [None]:
!git clone https://github.com/SOMJANG/Mecab-ko-for-Google-Colab.git

In [None]:
# `%cd` changes the notebook's working directory; `!cd` only affected a throwaway
# subshell, leaving the following `!ls` / `!bash install_...` cells in the wrong folder.
%cd Mecab-ko-for-Google-Colab

In [None]:
!ls

In [None]:
!bash install_mecab-ko_on_colab_light_220429.sh

In [None]:
import sys
import os
import pandas as pd
from konlpy.tag import Mecab
from soynlp.normalizer import repeat_normalize





# Morphological analyzer used to split reviews into tokens; points at the mecab-ko dictionary path.
mecab = Mecab(dicpath='/usr/local/lib/mecab/dic/mecab-ko-dic')

def load_sentiword_dict(filepath):
    """Read a sentiment lexicon file into a ``{word: int score}`` dict.

    Each line is split on whitespace; the first field is the word and the
    second its score. Lines with fewer than two fields, or whose second field
    is not an integer, are skipped. The file is read as UTF-8.
    """
    sentiword_dict = {}
    with open(filepath, 'r', encoding='utf-8') as f:
        for fields in map(str.split, f):
            if len(fields) < 2:
                continue
            word, raw_score = fields[0], fields[1]
            try:
                sentiword_dict[word] = int(raw_score)
            except ValueError:
                # Second field is not an integer score: ignore the line.
                continue
    return sentiword_dict

def preprocess_text(text):
    """Return ``text`` passed through ``repeat_normalize(..., num_repeats=2)``.

    Any non-string input (e.g. a missing cell value) is treated as the empty string.
    """
    cleaned = text if isinstance(text, str) else ''
    return repeat_normalize(cleaned, num_repeats=2)

def calculate_sentiment_score(text, sentiword_dict):
    """Score ``text`` against the lexicon.

    The text is split with ``mecab.morphs``; each token found in
    ``sentiword_dict`` contributes its score, every other token contributes 0.

    Returns:
        ``(total_score, [(token, token_score), ...])`` in token order.
    """
    token_scores = [
        (token, sentiword_dict[token] if token in sentiword_dict else 0)
        for token in mecab.morphs(text)
    ]
    score = sum(token_score for _, token_score in token_scores)
    return score, token_scores

# Lexicon-only pass: label every review in each .xlsx file with a sentiment sign
# derived from the lexicon ('senti') and one derived from the star rating ('star_senti').
filepath = '/content/drive/MyDrive/grad_model_kobert/SentiWord_Dict.txt'
sentiword_dict = load_sentiword_dict(filepath)

folder_path = '/content/drive/MyDrive/grad_model_kobert/dataset_3month'
output_folder = '/content/drive/MyDrive/grad_model_kobert/sent-result-3month'

if not os.path.exists(output_folder):
    os.makedirs(output_folder)

for filename in os.listdir(folder_path):
    # Only Excel workbooks are processed.
    if not filename.endswith('.xlsx'):
        continue

    file_path = os.path.join(folder_path, filename)
    df = pd.read_excel(file_path)

    review_scores = []
    for idx, row in df.iterrows():
        review = row['content']
        preprocessed_review = preprocess_text(review)
        score, token_scores = calculate_sentiment_score(preprocessed_review, sentiword_dict)

        # Weights: star rating scaled by 5, thumbs-up count scaled by 100
        # (a count of exactly 0 is given weight 1 instead of 0).
        star_weight = row['score'] / 5.0
        thumbs_weight = 1 if row['thumbsUpCount'] == 0 else row['thumbsUpCount'] / 100.0

        # Only the sign of the weighted lexicon score is kept.
        weighted_score = score * star_weight * thumbs_weight
        if weighted_score > 0:
            senti = 1
        elif weighted_score < 0:
            senti = -1
        else:
            senti = 0

        # Star-based sign: <=2 stars negative, >=4 stars positive, otherwise neutral.
        if row['score'] <= 2:
            star_senti = -1
        elif row['score'] >= 4:
            star_senti = 1
        else:
            star_senti = 0

        review_scores.append((senti, star_senti))

    df['senti'] = [pair[0] for pair in review_scores]
    df['star_senti'] = [pair[1] for pair in review_scores]

    new_filename = f"3month_senti_{filename}"
    output_path = os.path.join(output_folder, new_filename)

    df.to_excel(output_path, index=False)
    print(f"Sentiment analysis completed for {filename}. Results saved to {output_path}")


---

In [None]:
import sys
import os
import pandas as pd
from konlpy.tag import Mecab
from soynlp.normalizer import repeat_normalize

# Morphological analyzer used to split reviews into tokens; points at the mecab-ko dictionary path.
mecab = Mecab(dicpath='/usr/local/lib/mecab/dic/mecab-ko-dic')


def load_sentiword_dict(filepath):
    """Load a whitespace-separated ``word score`` lexicon file (UTF-8) into a dict.

    Only the first two fields of a line are used. Lines with a single field or
    a non-integer score are silently ignored.
    """
    sentiword_dict = {}
    with open(filepath, 'r', encoding='utf-8') as f:
        for raw_line in f:
            fields = raw_line.split()
            if len(fields) < 2:
                continue
            try:
                value = int(fields[1])
            except ValueError:
                # Not an integer score: skip this entry.
                continue
            sentiword_dict[fields[0]] = value
    return sentiword_dict


def preprocess_text(text):
    """Normalize a review with ``repeat_normalize(..., num_repeats=2)``.

    Non-string values are replaced by the empty string before normalization.
    """
    if isinstance(text, str):
        return repeat_normalize(text, num_repeats=2)
    return repeat_normalize('', num_repeats=2)
# Load the sentiment lexicon from Drive into a {word: score} dict.
filepath = '/content/drive/MyDrive/grad_model_kobert/SentiWord_Dict.txt'
sentiword_dict = load_sentiword_dict(filepath)


def calculate_sentiment_score(text, sentiword_dict):
    """Sum lexicon scores over the ``mecab.morphs`` tokens of ``text``.

    Returns:
        ``(total, pairs)`` where ``pairs`` lists ``(token, score)`` for every
        token in order, using 0 for tokens missing from ``sentiword_dict``.
    """
    pairs = []
    total = 0
    for token in mecab.morphs(text):
        token_score = sentiword_dict[token] if token in sentiword_dict else 0
        pairs.append((token, token_score))
        if token in sentiword_dict:
            total += token_score
    return total, pairs


# Load the pretrained KoBERT encoder and vocabulary (cached under ./.cache).
# Relies on get_pytorch_kobert_model having been imported in an earlier cell.
bertmodel, vocab = get_pytorch_kobert_model(cachedir=".cache")
class BERTClassifier(nn.Module):
    """BERT encoder with optional dropout and a 3-way linear head.

    Args:
        bert: encoder module; called with ``return_dict=False`` and expected to
            return ``(sequence_output, pooled_output)``.
        dr_rate: dropout probability on the pooled output; a falsy value
            (``None`` / ``0``) disables dropout.
    """

    def __init__(self, bert, dr_rate=0.5):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(768, 3)
        if dr_rate:
            self.dropout = nn.Dropout(p=dr_rate)

    def forward(self, token_ids, valid_length, segment_ids):
        """Return class logits.

        ``valid_length`` is accepted for interface compatibility but not used:
        no attention mask is built here.
        """
        # return_dict=False guarantees a tuple, so the two-value unpacking does
        # not depend on the encoder's configured default.
        _, pooler = self.bert(input_ids=token_ids, token_type_ids=segment_ids, return_dict=False)
        # Previously `out` was unassigned when dropout was disabled, which made
        # forward() raise UnboundLocalError.
        out = self.dropout(pooler) if self.dr_rate else pooler
        return self.classifier(out)


# Build the 3-output classifier on the GPU and restore fine-tuned weights from Drive.
# NOTE(review): a later cell loads this same model.pth into a 7-output head; the
# checkpoint can only match one of the two head shapes — confirm which is correct.
model = BERTClassifier(bertmodel).to('cuda')
model.load_state_dict(torch.load('/content/drive/MyDrive/grad_model_kobert/model.pth'))
model.eval()  # inference mode (dropout inactive)
# Tokenizer bound to the KoBERT vocab; lower=False keeps the original casing.
tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)


def predict_sentiment_kobert(text):
    """Classify one review with the fine-tuned KoBERT model.

    The text is encoded with the same ``BERTSentenceTransform`` used by
    ``BERTDataset`` for training. That adds the special tokens, truncates to
    ``max_len`` and still yields a valid input for an empty string. The
    previous hand-rolled encoding omitted the special tokens, never truncated
    ``token_ids`` and produced an empty float tensor for empty text.

    Returns:
        -1 for class index 0, 0 for class index 1, and 1 for any other index.
    """
    max_len = 64
    transform = nlp.data.BERTSentenceTransform(
        tok, max_seq_length=max_len, vocab=vocab, pad=False, pair=False)
    ids, length, segments = transform([text])

    # Batch of one; ids must be int64 for the embedding lookup.
    token_ids = torch.tensor(ids, dtype=torch.long).unsqueeze(0).to('cuda')
    segment_ids = torch.tensor(segments, dtype=torch.long).unsqueeze(0).to('cuda')
    valid_length = torch.tensor([int(length)]).to('cuda')

    with torch.no_grad():
        output = model(token_ids, valid_length, segment_ids)
        score = output.argmax().item()

    if score == 0:
        return -1
    elif score == 1:
        return 0
    else:
        return 1


# Combined pass: for every review, average the KoBERT sentiment (-1/0/1) with the
# lexicon score, then scale by star-rating and thumbs-up weights.
#
# Fixed: the input folder was the relative './content/drive/...', which does not
# point at the mounted Drive; the lexicon-only cell reads the same dataset from
# the absolute '/content/drive/...' path.
folder_path = '/content/drive/MyDrive/grad_model_kobert/dataset_3month'
output_folder = '/content/drive/MyDrive/grad_model_kobert/sent-result-3month'

# NOTE(review): output files use the same "3month_senti_<file>" names and folder as
# the lexicon-only cell, so running both overwrites the earlier results — confirm intended.
os.makedirs(output_folder, exist_ok=True)


for filename in os.listdir(folder_path):
    if filename.endswith('.xlsx'):
        file_path = os.path.join(folder_path, filename)
        df = pd.read_excel(file_path)

        review_scores = []
        senti_dict_scores = []
        for idx, row in df.iterrows():
            review = row['content']
            preprocessed_review = preprocess_text(review)

            # Lexicon score: sum of per-token scores (token breakdown is discarded).
            senti_dict_score, _ = calculate_sentiment_score(preprocessed_review, sentiword_dict)
            senti_dict_scores.append(senti_dict_score)

            # Model prediction mapped to -1 / 0 / 1.
            senti_kobert = predict_sentiment_kobert(preprocessed_review)

            # Weights: stars scaled by 5, thumbs-up count scaled by 100
            # (a count of exactly 0 is given weight 1 instead of 0).
            star_weight = row['score'] / 5.0
            thumbs_weight = row['thumbsUpCount'] / 100.0
            if row['thumbsUpCount'] == 0:
                thumbs_weight = 1

            # Plain average of the two signals, then apply both weights.
            combined_score = (senti_kobert + senti_dict_score) / 2
            weighted_score = combined_score * star_weight * thumbs_weight
            review_scores.append(weighted_score)

        df['senti_dict_score'] = senti_dict_scores
        df['weighted_senti'] = review_scores
        new_filename = f"3month_senti_{filename}"
        output_path = os.path.join(output_folder, new_filename)
        df.to_excel(output_path, index=False)
        print(f"Sentiment analysis completed for {filename}. Results saved to {output_path}")


In [None]:
import os
import pandas as pd
import torch
from transformers import BertTokenizer
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Mount Google Drive at /content/drive; the model checkpoint below is read from it.
from google.colab import drive
drive.mount('/content/drive')

class BERTClassifier(nn.Module):
    """BERT encoder with optional functional dropout and a 7-way linear head.

    Args:
        bert: encoder module; called with ``return_dict=False`` and expected to
            return ``(sequence_output, pooled_output)``.
        dr_rate: dropout probability on the pooled output; a falsy value
            (``None`` / ``0``) disables dropout.
    """

    def __init__(self, bert, dr_rate=0.5):
        super(BERTClassifier, self).__init__()
        self.bert = bert
        self.dr_rate = dr_rate
        self.classifier = nn.Linear(768, 7)

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Return class logits computed from the encoder's pooled output."""
        # return_dict=False guarantees a tuple, so the two-value unpacking does
        # not depend on the encoder's configured default.
        _, pooler = self.bert(input_ids=input_ids, token_type_ids=token_type_ids,
                              attention_mask=attention_mask, return_dict=False)
        if self.dr_rate:
            # Dropout is only active while the module is in training mode.
            out = F.dropout(pooler, p=self.dr_rate, training=self.training)
        else:
            # Previously `out` was unassigned here, raising UnboundLocalError.
            out = pooler
        return self.classifier(out)

# Pick the GPU when available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_path = '/content/drive/MyDrive/grad_model_kobert/model.pth'
# `bertmodel` (and `vocab` below) come from an earlier cell, not from this one.
model = BERTClassifier(bertmodel, dr_rate=0.5).to(device)
# map_location lets a checkpoint saved on GPU be loaded on whichever device was chosen.
# NOTE(review): an earlier cell loads this same model.pth into a 3-output head; the
# checkpoint can only match one of the two head shapes — confirm which is correct.
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()  # inference mode (dropout inactive)

# gluonnlp tokenizer bound to the KoBERT vocab; lower=False keeps the original casing.
tokenizer = get_tokenizer()
tok = nlp.data.BERTSPTokenizer(tokenizer, vocab, lower=False)

class ReviewDataset(Dataset):
    """Dataset that encodes raw review strings for the classifier.

    Two kinds of tokenizer are supported:

    * Hugging Face-style callables that accept ``max_length`` / ``padding`` /
      ``truncation`` and return a dict with ``input_ids``, ``attention_mask``
      and ``token_type_ids``.
    * gluonnlp tokenizers (e.g. ``BERTSPTokenizer``, which is what this
      notebook passes in). They take a single string and reject those keyword
      arguments, so they are wrapped in ``BERTSentenceTransform`` — the same
      encoder ``BERTDataset`` uses for training — padded to ``max_len``.

    Each item is ``(input_ids, attention_mask, token_type_ids)`` as int64 tensors.
    """

    def __init__(self, reviews, tokenizer, max_len=64):
        self.reviews = reviews
        self.tokenizer = tokenizer
        self.max_len = max_len
        # Detect gluonnlp tokenizers by their defining package so that `nlp`
        # is only touched when it is actually needed.
        self._transform = None
        if type(tokenizer).__module__.split('.')[0] == 'gluonnlp':
            self._transform = nlp.data.BERTSentenceTransform(
                tokenizer, max_seq_length=max_len, vocab=tokenizer.vocab,
                pad=True, pair=False)

    def __getitem__(self, idx):
        review = str(self.reviews[idx])

        if self._transform is not None:
            token_ids, valid_length, segment_ids = self._transform([review])
            input_ids = torch.tensor(token_ids, dtype=torch.long)
            token_type_ids = torch.tensor(segment_ids, dtype=torch.long)
            # 1 for real tokens, 0 for the padding positions after them.
            attention_mask = (torch.arange(self.max_len) < int(valid_length)).long()
            return input_ids, attention_mask, token_type_ids

        encoding = self.tokenizer(review, max_length=self.max_len, padding='max_length', truncation=True)
        input_ids = torch.tensor(encoding['input_ids'])
        attention_mask = torch.tensor(encoding['attention_mask'])
        token_type_ids = torch.tensor(encoding['token_type_ids'])
        return input_ids, attention_mask, token_type_ids

    def __len__(self):
        return len(self.reviews)

def classify_and_save(folder_path, output_folder):
    """Predict an emotion class for every review in each ``.xlsx`` file of ``folder_path``.

    For each workbook the ``content`` column (missing values become ``''``) is
    run through the module-level ``model`` in batches of 64, the predicted
    class index is stored in a new ``emotion`` column, and the frame is written
    to ``output_folder`` as ``3month_emotion_<original name>``. The output
    folder is created when it does not exist.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for filename in os.listdir(folder_path):
        # Skip anything that is not an Excel workbook.
        if not filename.endswith('.xlsx'):
            continue

        df = pd.read_excel(os.path.join(folder_path, filename))
        reviews = df['content'].fillna('').tolist()

        dataloader = DataLoader(
            ReviewDataset(reviews, tok, max_len=64),
            batch_size=64,
            shuffle=False,  # keep predictions aligned with the row order
        )

        results = []
        with torch.no_grad():
            for batch in tqdm(dataloader):
                input_ids, attention_mask, token_type_ids = (part.to(device) for part in batch)
                outputs = model(input_ids, attention_mask, token_type_ids)
                # Index of the largest logit per row.
                preds = torch.max(outputs, dim=1)[1]
                results.extend(preds.cpu().numpy())

        df['emotion'] = results
        output_path = os.path.join(output_folder, f"3month_emotion_{filename}")
        df.to_excel(output_path, index=False)
        print(f"Emotion classification completed for {filename}. Results saved to {output_path}")

# Run emotion classification over the review workbooks.
# NOTE(review): both paths are under /content, not the mounted Drive used by the
# other cells — confirm these locations are intended.
folder_path = '/content/datasets/review_3m' 
output_folder = '/content/senti-result-감성사전-3m' 
classify_and_save(folder_path, output_folder)
