In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout, Flatten, Attention, LayerNormalization, Reshape
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np
import re
from urllib.parse import urlparse
from keras.utils import plot_model
from IPython.display import Image

# 데이터셋 로드
df = pd.read_csv('/content/malicious_phish.csv')

# 1. 라벨 인코딩
label_encoder = LabelEncoder()
df['label_encoded'] = label_encoder.fit_transform(df['type'])
y = to_categorical(df['label_encoded'])

# 2. 텍스트 데이터 전처리
urls = df['url'].values
max_sequence_length = 200
max_num_words = 10000
embedding_dim = 128

tokenizer = Tokenizer(num_words=max_num_words, oov_token="<OOV>")
tokenizer.fit_on_texts(urls)
seq_data = tokenizer.texts_to_sequences(urls)
seq_data = pad_sequences(seq_data, maxlen=max_sequence_length)

# URL의 type을 매핑
df_features = pd.DataFrame()

type_mapping = {
    'benign': 0,
    'phishing': 1,
    'defacement': 2,
    'malware': 3
}
df_features['url'] = df['url']
# df_features['type'] = df['type'].map(type_mapping)
df_features

# URL의 길이
df_features['url_length'] = df['url'].apply(len)

# IP 주소 포함 여부
# df_features['has_ip'] = df['url'].str.contains(r'\d+\.\d+\.\d+\.\d+').astype(int)

# URL이 'https://'로 시작하는지 여부
# df_features['is_https'] = df['url'].str.startswith('https://').astype(int)

# 숫자의 개수
# df_features['digit_count'] = df['url'].str.count(r'\d')

# 영문자 개수
# df_features['letter_count'] = df['url'].str.count(r'[a-zA-Z]')

df_features

# IP 주소 포함 여부
def has_ip(url):
  return int(bool(re.search(r'\d+\.\d+\.\d+\.\d+', url)))

df_features['has_ip'] = df['url'].apply(has_ip)
df_features

# URL이 'https://'로 시작하는지 여부
def is_https(url):
  return int(url.startswith('https://'))

df_features['is_https'] = df['url'].apply(is_https)
df_features

# 숫자의 개수
def digit_count(url):
  return sum(c.isdigit() for c in url)

df_features['digit_count'] = df['url'].apply(digit_count)
df_features

# 영문자의 개수
def letter_count(url):
  return sum(c.isalpha() for c in url)

df_features['letter_count'] = df['url'].apply(letter_count)
df_features

# 찾고자 하는 TLD 목록
tld_list = ['.tk', '.buzz', '.xyz', '.top', '.ga', '.ml', '.info', '.cf', '.gq',
            '.icu', '.wang', '.live', '.host', '.shop', '.vip', '.id', '.cc',
            '.br', '.ci', '.zw', '.sx', '.mw']

# TLD 개수
df_features['tld_count'] = df['url'].apply(lambda x: sum(x.endswith(tld) for tld in tld_list))

df_features

def count_special_chars(url):
    non_alpha_num = re.findall(r'\W',url)
    return len(non_alpha_num)

df_features['special_chars_count'] = df['url'].apply(count_special_chars)   # 특수 문자의 갯수
df_features

def count_keyword_flags(url):
    # 찾고자 하는 키워드 목록
    keywords = [
        "php", "index", "option","article", "content", "html", "view", "component"
    ]

    # URL을 소문자로 변환
    url_lower = url.lower()

    # 각 키워드가 포함되어 있는지 여부를 1 또는 0으로 반환
    return {keyword: (1 if keyword in url_lower else 0) for keyword in keywords}

# 각 URL에 대해 키워드 포함 여부를 데이터프레임으로 변환
keyword_flags_df = df['url'].apply(count_keyword_flags).apply(pd.Series)

# 기존 데이터프레임에 키워드 포함 여부 데이터프레임을 병합
df_features = pd.concat([df_features, keyword_flags_df], axis=1)

df_features

def count_slash(url):
    # '/'의 개수 계산
    return url.count('/')

def count_hyphen(url):
    # '-'의 개수 계산
    return url.count('-')

def count_dot(url):
    # '.'의 개수 계산
    return url.count('.')

def count_underscore(url):
    # '_'의 개수 계산
    return url.count('_')

def count_equals(url):
    # '='의 개수 계산
    return url.count('=')

def count_question(url):
    # '_'의 개수 계산
    return url.count('?')

def count_percent(url):
    # '%'의 개수 계산
    return url.count('%')

df_features['slash_count'] = df['url'].apply(lambda x: count_slash(x))
df_features['hyphen_count'] = df['url'].apply(lambda x: count_hyphen(x))
df_features['underscore_count'] = df['url'].apply(lambda x: count_underscore(x))
df_features['dot_count'] = df['url'].apply(lambda x: count_dot(x))
df_features['equals_count'] = df['url'].apply(lambda x: count_equals(x))
df_features['question_count'] = df['url'].apply(lambda x: count_question(x))
df_features['percent_count'] = df['url'].apply(lambda x: count_percent(x))

# 결과 확인
df_features

from urllib.parse import urlparse, parse_qs

def count_url_parameters(url):
    # URL 파싱
    parsed_url = urlparse(url)

    # URL로부터 query 추출
    query = parsed_url.query

    # parse_qs() 이용하여 query 분석
    parameters = parse_qs(query)

    # parameter 개수 리턴
    return len(parameters)

df_features['param_count'] = df['url'].apply(count_url_parameters)    # URL의 parameter 개수
df_features

def no_of_dir(url):
    urldir = urlparse(url).path
    return urldir.count('/')

df_features['count_dir'] = df['url'].apply(lambda i: no_of_dir(i))    # directory 개수
df_features

def abnormal_url(url):
    hostname = urlparse(url).hostname
    hostname = str(hostname)
    match = re.search(hostname, url)
    if match:
        # print match.group()
        return 1
    else:
        # print 'No matching pattern found', "component"
        return 0


df_features['abnormal_url'] = df['url'].apply(lambda i: abnormal_url(i))
df_features

# 'www' 갯수
df_features['count_www'] = df['url'].apply(lambda i: i.count('www'))
df_features

def suspicious_words(url):
    match = re.search('PayPal|login|signin|bank|account|update|free|lucky|service|bonus|ebayisapi|webscr',
                      url)
    if match:
        return 1
    else:
        return 0

df_features['sus_url'] = df['url'].apply(lambda i: suspicious_words(i))
df_features

# 열 이름 출력
print("x_val 배열의 열 이름 (순서대로):")
for idx, name in enumerate(df_features.columns[1:]): # url 열 제외
    print(f"Feature_{idx}: {name}")

# 4. 수치형 특성과 시퀀스 데이터 결합
# Select only numerical features from df_features
numerical_features = df_features.select_dtypes(include=np.number).columns
X_num = df_features[numerical_features].values

X_seq = seq_data

# Convert X_num to float32
X_num = X_num.astype(np.float32) # Convert X_num to float32

# 데이터 나누기
X_train_num, X_temp_num, X_train_seq, X_temp_seq, y_train, y_temp = train_test_split(
    X_num, X_seq, y, test_size=0.4, random_state=42, stratify=y.argmax(axis=1)
)
X_val_num, X_test_num, X_val_seq, X_test_seq, y_val, y_test = train_test_split(
    X_temp_num, X_temp_seq, y_temp, test_size=0.5, random_state=42, stratify=y_temp.argmax(axis=1)
)

# 5. 모델 구성
# 수치형 입력
input_num = Input(shape=(X_train_num.shape[1],), name='Numerical_Input')
x_num = Dense(64, activation='relu')(input_num)
x_num = Dropout(0.3)(x_num)
x_num = Dense(32, activation='relu')(x_num)

# LSTM 입력
input_seq = Input(shape=(max_sequence_length,), name='Sequence_Input')
x_seq = Embedding(input_dim=max_num_words, output_dim=embedding_dim)(input_seq)
x_seq = LSTM(64, return_sequences=True)(x_seq)
x_seq = LSTM(32, return_sequences=True)(x_seq)  # return_sequences=True로 수정

# x_num을 3차원으로 변환
x_num = Reshape((1, 32))(x_num)  # 32는 x_num의 마지막 차원 크기

# 교차 주의 메커니즘
attention_output = Attention()([x_seq, x_num])
attention_output = LayerNormalization()(attention_output)

# Flatten 및 최종 출력
merged = Flatten()(attention_output)
merged = Dense(64, activation='relu')(merged)
merged = Dropout(0.3)(merged)
output = Dense(y.shape[1], activation='softmax', name='Output')(merged)

# 모델 정의
model = Model(inputs=[input_num, input_seq], outputs=output)

from tensorflow.keras.callbacks import EarlyStopping

# 조기 종료 콜백
early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

# 6. 모델 컴파일 및 학습
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
history = model.fit(
    [X_train_num, X_train_seq], y_train,
    validation_data=([X_val_num, X_val_seq], y_val),
    epochs=20, batch_size=128,
    callbacks=[early_stopping]
)

# 7. 평가
test_loss, test_accuracy = model.evaluate([X_test_num, X_test_seq], y_test)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

# 모델 시각화 파일 저장 및 출력
def visualize_model_in_colab(model, file_name="model_structure.png"):
    plot_model(
        model,
        to_file=file_name,
        show_shapes=True,
        show_layer_names=True
    )
    return Image(file_name)

# 실행
visualize_model_in_colab(model)