In [None]:
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score, confusion_matrix
import pandas as pd
import matplotlib.pyplot as plt
import string
import re
from nltk.stem.porter import PorterStemmer
import random
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import urllib.request

In [None]:
#1. 데이터 로드 및 전처리
#spam.csv 파일을 불러와서 불필요한 칼럼을 삭제
#레이블을 수치형으로 변환 (ham -> 0, spam -> 1)
#중복된 텍스트를 삭제

#2. 훈련 및 테스트 데이터 분할
#train_test_split을 사용하여 데이터를 훈련 및 테스트 세트로 분할

#3. 토큰화
#Tokenizer를 사용하여 훈련 데이터를 토큰화
#이 토큰화된 데이터는 RNN 모델에 사용

#4. 데이터 탐색 및 시각화
#스팸 메일과 정상 메일의 분포를 확인
#메일의 길이 분포에 대한 시각화도 진행

#5. 텍스트 전처리
#NLTK의 stopwords와 구두점(punctuation)을 사용하여 메일 내의 불필요한 단어나 문자를 제거
#전처리된 텍스트에서 스팸 메일과 정상 메일의 가장 빈번한 단어를 확인

#6. RNN을 사용한 스팸 메일 분류
#임베딩층, RNN, 및 Dense 층을 포함하는 심층 신경망을 구성
#모델을 훈련 데이터에 학습시키고 테스트 데이터에 대한 성능을 평가

#7. 모델의 손실 시각화
#훈련 및 검증 손실을 시각화하여 모델의 학습 과정을 확인


In [None]:
#Spam mail data url
url = "https://raw.githubusercontent.com/ukairia777/tensorflow-nlp-tutorial/main/10.%20RNN%20Text%20Classification/dataset/spam.csv"

urllib.request.urlretrieve(url, filename="spam.csv")
data = pd.read_csv('spam.csv', encoding='latin1')
print('총 메일 수 :',len(data))

In [None]:
#Raw data head
data.head()

In [None]:
# Drop unwanted columns and rename remaining columns
if len(data.columns) > 3:
    data = data.drop(['Unnamed: 2', 'Unnamed: 3', 'Unnamed: 4'], axis = 1)
if data.columns[0] != 'label':
    data = data.rename(columns={'v1': 'label', 'v2': 'text'}) # ham :일반, spam: 스팸
data.head()

In [None]:
data.describe()

In [None]:
data.groupby('label').describe()

In [None]:
# Convert label to numerical variable (정수형태로 구분)
data['label'] = data.label.map({'ham': 0, 'spam': 1}) # spam to 1, ham to 0
data['message_len'] = data['text'].apply(len)
data.head()

In [None]:
#Delete duplicate E-mail
data.drop_duplicates(subset=['text'], inplace=True)
print('중복삭제한 이메일 수 :',len(data))

In [None]:
X_data = data['text']
y_data = data['label'] #data1,2 to x, y data

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=0.2, random_state=0, stratify=y_data) # 0.2로 했는데, 0.1로 하면?

In [None]:
tokenizer = Tokenizer()
tokenizer.fit_on_texts(X_train)
X_train_encoded = tokenizer.texts_to_sequences(X_train)
print(X_train_encoded[:5])

In [None]:
#Words key-value dictionary
word_to_index = tokenizer.word_index
print(word_to_index)

In [None]:
vocab_size = len(word_to_index) + 1
print('단어 집합 사이즈: {}'.format((vocab_size)))

In [None]:
max_len = 189
X_train_padded = pad_sequences(X_train_encoded, maxlen = max_len)
print("훈련 데이터의 크기:", X_train_padded.shape)

In [None]:
import string
import nltk
from nltk.corpus import stopwords

# nltk data download(stopwords)
nltk.download('stopwords')

def text_process(mess):
    """
    Takes in a string of text, then performs the following:
    1. Remove all punctuation
    2. Remove all stopwords
    3. Returns a list of the cleaned text
    """
    STOPWORDS = stopwords.words('english') + ['u', 'ü', 'ur', '4', '2', 'im', 'dont', 'doin', 'ure', 'ltgt']
    # Check characters to see if they are in punctuation
    nopunc = [char for char in mess if char not in string.punctuation]

    # Join the characters again to form the string.
    nopunc = ''.join(nopunc)

    # Now just remove any stopwords
    return ' '.join([word for word in nopunc.split() if word.lower() not in STOPWORDS])

# 'text' 컬럼에 text_process 함수 적용
data['clean_msg'] = data['text'].apply(text_process)

data.head()


In [None]:
# 두 개의 빈 딕셔너리(스팸, 일반) 초기화하여 스팸 메시지와 일반 메시지 단어 빈도 저장
ham_words = {}
spam_words = {}

for idx, email in data.iterrows():
    for word in email['clean_msg'].split():
        if email['label'] == 0:
            # 해당 단어의 빈도를 ham_words 딕셔너리에서 1 증가
            # 만약 해당 단어가 딕셔너리에 없다면 기본값 0을 사용
            ham_words[word] = ham_words.get(word, 0) + 1
        else:
            # 만약 해당 이메일이 스팸 메시지라면
            # 해당 단어의 빈도를 spam_words 딕셔너리에서 1 증가
            # 만약 해당 단어가 딕셔너리에 없다면 기본값 0을 사용
            spam_words[word] = spam_words.get(word, 0) + 1

print(ham_words)
print(spam_words)

In [None]:
total_ham_words = sum(ham_words.values())  # 햄 메시지에서의 총 단어 수
total_spam_words = sum(spam_words.values())  # 스팸 메시지에서의 총 단어 수

print(total_ham_words, total_spam_words)

# ham_words와 spam_words 딕셔너리의 키(단어)들을 합쳐서 중복을 제거
total_words = len(set(list(ham_words.keys()) + list(spam_words.keys())))  # 전체 고유 단어 수

print(total_words)

N_ham = len(data[data['label'] == 0])  # 햄 메시지의 수
N_spam = len(data[data['label'] == 1])  # 스팸 메시지의 수
N_total = len(data)  # 전체 메시지의 수

print(N_ham, N_spam, N_total)

# 𝑃 ̂(𝑐_𝑗 )=𝑁_(𝑐_𝑗 )/𝑁_𝑡𝑜𝑡𝑎𝑙 공식을 사용하여 햄 및 스팸 메시지의 비율을 계산
P_ham = N_ham / N_total  # 햄 메시지의 비율
P_spam = N_spam / N_total  # 스팸 메시지의 비율

print(P_ham, P_spam) # 일반, 스팸 비율

In [None]:
# score 계산
def calculate_score(sentence, label_words, total_words_class):
    score = 1
    for word in sentence.split():
        if word in label_words:
            score *= (label_words[word] + 1) / (total_words_class + total_words)
        else:
            score *= 1 / (total_words_class + total_words)
    return score

In [None]:
# 𝑝(𝑤_𝑖│𝑐)= (𝑐𝑜𝑢𝑛𝑡(𝑤_𝑖,𝑐)+1)/((∑_(𝑤∈𝑉) 𝑐𝑜𝑢𝑛𝑡(𝑤,𝑐) )+|𝑉|)
def calculate_ham_spam_scores(email):
    ham_score = P_ham * calculate_score(email, ham_words, total_ham_words)
    spam_score = P_spam * calculate_score(email, spam_words, total_spam_words)
    return ham_score, spam_score

# 각 메일에 대한 스팸 및 햄 점수를 계산
scores = X_test.apply(calculate_ham_spam_scores)

#print(scores)

# 계산된 점수를 DataFrame에 새로운 열로 추가
X_test_with_scores = X_test.to_frame()  # Series to DataFrame
X_test_with_scores['ham_score'] = scores.apply(lambda x: x[0])
X_test_with_scores['spam_score'] = scores.apply(lambda x: x[1])

print(X_test_with_scores.head())


In [None]:
from collections import Counter
# 1. 학습 데이터만 사용하여 spam/ham 단어 빈도를 계산
train_data = pd.DataFrame({'text': X_train, 'label': y_train})

words = train_data[train_data.label==0].text.apply(lambda x: [word.lower() for word in x.split()])
ham_words = Counter()
for msg in words:
    ham_words.update(msg)

words = train_data[train_data.label==1].text.apply(lambda x: [word.lower() for word in x.split()])
spam_words = Counter()
for msg in words:
    spam_words.update(msg)

# 2. 이 단어 빈도를 기반으로 테스트 데이터의 스코어를 계산
scores = X_test.apply(calculate_ham_spam_scores)
X_test_with_scores = X_test.to_frame()
X_test_with_scores['ham_score'] = scores.apply(lambda x: x[0])
X_test_with_scores['spam_score'] = scores.apply(lambda x: x[1])

# 원본 데이터에 스코어 추가
data = pd.merge(data, X_test_with_scores[['ham_score', 'spam_score']], left_index=True, right_index=True, how='left')

# ... 중략 ...
#이렇게 위치시키면 학습 데이터를 기반으로 스팸/햄 단어 빈도를 계산하고, 그 후 테스트 데이터의 스코어를 계산하는 것이 더 자연스러워집니다.

# 원본 데이터에 스코어 추가
#data.drop(columns=['ham_score_x', 'spam_score_x', 'ham_score_y', 'spam_score_y'], inplace=True)

#data = pd.merge(data, X_test_with_scores[['ham_score', 'spam_score']], left_index=True, right_index=True, how='left')
data = pd.merge(data, X_test_with_scores[['ham_score', 'spam_score']], left_index=True, right_index=True, how='left', suffixes=('', '_test'))

print(data.head())

In [None]:
def calculate_word_probabilities(sentence, label_words, total_words_class):
    word_probabilities = {}
    for word in sentence.split():
        if word in label_words:
            word_probabilities[word] = (label_words[word] + 1) / (total_words_class + total_words)
        else:
            word_probabilities[word] = 1 / (total_words_class + total_words)
    return word_probabilities

# 테스트 데이터의 head 부분에 대해서 각 단어의 확률을 계산
word_probs_ham = X_test.head().apply(lambda x: calculate_word_probabilities(x, ham_words, total_ham_words))
word_probs_spam = X_test.head().apply(lambda x: calculate_word_probabilities(x, spam_words, total_spam_words))

print("Word Probabilities for Ham(일반):")
print(word_probs_ham)
print("\nWord Probabilities for Spam(스팸):")
print(word_probs_spam)


In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score

# 1. 테스트 데이터에 대한 예측값 계산
y_pred_test = X_test_with_scores.apply(lambda row: 1 if row['spam_score'] > row['ham_score'] else 0, axis=1)
#위의 코드는 X_test_with_scores DataFrame의 각 행에 대하여 spam_score와 ham_score를 비교
#만약 spam_score가 ham_score보다 크다면, 해당 메일은 스팸(1)으로 예측되고 그렇지 않다면 햄(0)으로 예측됨

# 2. 테스트 데이터의 실제 라벨과 예측값을 사용하여 성능 평가
# 매트릭스 출력
cm_test = confusion_matrix(y_test, y_pred_test)
print("Confusion Matrix for Test Data:")
print(cm_test)

# 정확도 출력
accuracy_test = accuracy_score(y_test, y_pred_test)
print(f"Accuracy for Test Data: {accuracy_test:.4f}")



True Negative (TN): 실제로 햄 메일인데, 햄으로 예측한 것.

False Positive (FP): 실제로 햄 메일인데, 스팸으로 잘못 예측한 것.

False Negative (FN): 실제로 스팸인데, 햄으로 잘못 예측한 것.

True Positive (TP): 실제로 스팸이며, 스팸으로 올바르게 예측한 것.

TN = 850: 850개의 햄 메일이 올바르게 햄으로 예측

FP = 53: 53개의 햄 메일이 스팸으로 잘못 예측

FN = 12: 12개의 스팸 메일이 햄으로 잘못 예측

TP = 119: 119개의 스팸 메일이 올바르게 스팸으로 예측

Accuracy=  TP+TN​ / TP+TN+FP+FN

테스트 데이터에 대해 모델의 예측 정확도는 약 93.71%

---- 아래는 이전과제 코드 ----

In [None]:
from collections import Counter

#ham(not spam) word count
words = data[data.label==0].clean_msg.apply(lambda x: [word.lower() for word in x.split()])
ham_words = Counter()

for msg in words:
    ham_words.update(msg)

print(ham_words.most_common(50))

In [None]:
#spam word count
words = data[data.label==1].clean_msg.apply(lambda x: [word.lower() for word in x.split()])
spam_words = Counter()

for msg in words:
    spam_words.update(msg)

print(spam_words.most_common(50))

In [None]:
data_ham  = data[data['label'] == 0].copy()
data_spam = data[data['label'] == 1].copy()

In [None]:
def show_wordcloud(data_spam_or_ham, title):
    text = ' '.join(data_spam_or_ham['text'].astype(str).tolist())
    stopwords = set(wordcloud.STOPWORDS)

    fig_wordcloud = wordcloud.WordCloud(stopwords=stopwords,background_color='lightgrey',
                    colormap='viridis', width=800, height=600).generate(text)

    plt.figure(figsize=(10,7), frameon=True)
    plt.imshow(fig_wordcloud)
    plt.axis('off')
    plt.title(title, fontsize=20 )
    plt.show()


In [None]:
print(len(X_train))
print(len(y_train))

print(len(X_train_encoded))
print(len(X_train_padded))

In [None]:
from tensorflow import keras
from keras.layers import Dense
from keras.models import Sequential, load_model
from keras.layers import Embedding, SimpleRNN
from tensorflow.python.keras.engine.sequential import Sequential
from tensorflow.python.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D

embedding_dim = 32
hidden_units = 32

y_train = np.array(y_train)

model = Sequential()
model.add(Embedding(vocab_size, embedding_dim))
model.add(SimpleRNN(hidden_units))
model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['acc'])
history = model.fit(X_train_padded, y_train.astype(int), epochs=4, batch_size=64, validation_split=0.2)

In [None]:
#Preprocess for test data
X_test_encoded = tokenizer.texts_to_sequences(X_test)
X_test_padded = pad_sequences(X_test_encoded, maxlen=max_len)

#Model evaluation (딥러닝 기반 모델성능)
loss, accuracy = model.evaluate(X_test_padded, y_test.astype(int), batch_size=64)
print("테스트 정확도:", accuracy)

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import sklearn.metrics as metrics



pipe = Pipeline([('bow', CountVectorizer()),
                 ('tfid', TfidfTransformer()),
                 ('model', MultinomialNB())]) # 나이브 베이즈

In [None]:
pipe.fit(X_train, y_train)

In [None]:
y_pred = pipe.predict(X_test)

In [None]:
from sklearn.metrics import recall_score, f1_score

# For Naive Bayes model
print("=======Accuracy Score===========")
print(metrics.accuracy_score(y_test, y_pred))

# Print Recall
print("=======Recall Score===========")
print(recall_score(y_test, y_pred))

# Print F1-Score
print("=======F1 Score===========")
print(f1_score(y_test, y_pred))

# print the confusion matrix
print("=======Confusion Matrix===========")
print(metrics.confusion_matrix(y_test, y_pred))

Accuracy (정확도) - 0.9526 (95.26%):

95.26%가 올바르게 예측됨.
(딥러닝 기반 모델은 0.9806576371192932로 나옴)

Recall (재현율) - 0.6259 (62.59%):

실제 SPAM 메일 중 얼마나 많은 메일이 SPAM으로 올바르게 예측되었는지를 나타냄. 여기서의 결과는 62.59%로, 실제 SPAM 메일 중 약 62.59%만이 SPAM으로 올바르게 분류되었다는 것을 확인할 수 있음.
즉, SPAM 메일의 약 37.41%는 HAM으로 잘못 분류되었음.

F1 Score - 0.77 (77%):

F1 스코어는 정밀도와 재현율의 조화 평균으로 정밀도와 재현율의 균형을 나타내며, 특히 불균형한 데이터셋에서 유용함.
여기서는 77%로, 분류기의 전반적인 성능을 나타내며 중요한 지표 중 하나

혼동 행렬(Confusion Matrix):
array([[903, 0], [49, 82]])는 혼동 행렬을 나타냄.

True Negative (TN): 903개 - 실제로 'ham'(정상 메시지)이며, 모델도 'ham'으로 분류한 경우.
False Positive (FP): 0개 - 실제로 'ham'인데, 모델이 'spam'으로 잘못 분류한 경우.
False Negative (FN): 49개 - 실제로 'spam'인데, 모델이 'ham'으로 잘못 분류한 경우.
True Positive (TP): 82개 - 실제로 'spam'이며, 모델도 'spam'으로 올바르게 분류한 경우.

해석:

1. 모델은 'ham' 메시지를 매우 잘 분류하며, 모든 'ham' 메시지를 올바르게 'ham'으로 분류함 (FP = 0).
2. 그러나 'spam' 메시지 중 일부를 잘못 분류함. 49개의 'spam' 메시지를 'ham'으로 잘못 분류하였음.
3. SPAM 메일을 잘 감지하는 능력 (재현율)은 상대적으로 낮습니다 (62.59%).
이는 SPAM 메일 중 약 37%를 놓친다는 것을 의미하며, 이것은 SPAM 필터링에서 중요한 문제가 될 수 있음.
4. F1 스코어는 77%로 나쁘지 않은 성능을 나타냄.
5. 전반적으로 Confusion Matrix를 통해 모델이 실제 HAM 메일에 대해 매우 안정적인 성능을 보여주고 있음을 확인할 수 있음.



In [None]:
import string
# 추가 코드, 스팸판단 유무 (위 코드와 독립적)
def predict_spam(model, tokenizer, text, max_len):

    # 토큰화
    encoded_text = tokenizer.texts_to_sequences([text])

    # 패딩 처리
    padded_text = pad_sequences(encoded_text, maxlen=max_len, padding='post')

    # 모델 예측
    prediction = model.predict(padded_text)

    # 예측 결과와 예측값 출력
    print(f"Prediction Value: {prediction[0][0]:.4f}")
    if prediction > 0.5:
        print("This is SPAM!")
    else:
        print("This is NOT spam!")

# 사용 예제
text_samples = [
    "Your free ringtone is waiting to be collected.",
    "Click here for gain money!",
    "Congratulations! You've won a $1000 gift card. Click here to claim.",
    "Hi, are you available for a meeting tomorrow?",
    "Your bank account has been compromised. Please send us your credentials.",
    "Submission of the teaching journal in September"
]

for text in text_samples:
    print(f"Testing for text: {text}")
    predict_spam(model, tokenizer, text, max_len)
    print("-------------------------------")
