In [None]:
!pip install beautifulsoup4 whois requests

URL 데이터 파일 업로드 (파일을 다른 방법으로 업로드하는 경우 이 셀은 실행하지 않음)

In [None]:
# Colab-only cell: calls files.upload() and keeps its return value in `uploaded`.
# Skip this cell when the URL file is made available some other way.
from google.colab import files
uploaded = files.upload()

라이브러리 임포트 및 데이터 로드

In [None]:
import pandas as pd
import urllib.parse
import whois
import socket
import re
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timedelta

# Path of the text file that holds one URL per line
url_file_path = 'urls.txt'

# Read the file line by line instead of through the CSV parser: URLs may
# legitimately contain ',' or '"', which a CSV reader treats as field
# separators / quoting and would therefore split or merge.
# 'utf-8-sig' decodes UTF-8 with or without a leading byte-order mark.
with open(url_file_path, encoding='utf-8-sig') as url_file:
    url_lines = [line.strip() for line in url_file]

# Blank lines are dropped; every remaining line becomes one row of the 'url' column.
urls = pd.DataFrame({'url': [line for line in url_lines if line]})

피처 수집 함수 정의

In [None]:
import pandas as pd
import re
import urllib.parse
import whois
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
import requests
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed

# 1. Regular-expression pattern for an IPv4-style dotted quad
#    (four groups of 1-3 digits separated by dots; values are not range-checked)
ip_pattern = r"(?:\d{1,3}\.){3}\d{1,3}"

# 2. Regular-expression alternation of URL-shortening service domains.
#    Some alternatives are listed more than once (e.g. bit\.ly, goo\.gl);
#    the repeats do not change what the pattern matches.
shortening_services = r"bit\.ly|goo\.gl|shorte\.st|go2l\.ink|x\.co|ow\.ly|t\.co|tinyurl|tr\.im|is\.gd|cli\.gs|" \
                      r"yfrog\.com|migre\.me|ff\.im|tiny\.cc|url4\.eu|twit\.ac|su\.pr|twurl\.nl|snipurl\.com|" \
                      r"short\.to|BudURL\.com|ping\.fm|post\.ly|Just\.as|bkite\.com|snipr\.com|fic\.kr|loopt\.us|" \
                      r"doiop\.com|short\.ie|kl\.am|wp\.me|rubyurl\.com|om\.ly|to\.ly|bit\.do|t\.co|lnkd\.in|db\.tt|" \
                      r"qr\.ae|adf\.ly|goo\.gl|bitly\.com|cur\.lv|tinyurl\.com|ow\.ly|bit\.ly|ity\.im|q\.gs|is\.gd|" \
                      r"po\.st|bc\.vc|twitthis\.com|u\.to|j\.mp|buzurl\.com|cutt\.us|u\.bb|yourls\.org|x\.co|" \
                      r"prettylinkpro\.com|scrnch\.me|filoops\.info|vzturl\.com|qr\.net|1url\.com|tweez\.me|v\.gd|" \
                      r"tr\.im|link\.zip\.net|buff\.ly|rb\.gy|rebrand\.ly|short\.cm|clk\.im|cutt\.ly|t2m\.io|bl\.ink|" \
                      r"tiny\.cc"

# Timeout value handed to socket.setdefaulttimeout() and requests.get(timeout=...)
# inside extract_features
TIMEOUT = 3

# 피처 수집 함수 정의
def _whois_lookup(domain_name):
    """Query WHOIS once for ``domain_name``.

    Returns whatever ``whois.whois`` returns, or ``None`` when the lookup
    raises, so both WHOIS-based features can share a single query.
    """
    try:
        # Sets the process-wide default socket timeout before querying.
        socket.setdefaulttimeout(TIMEOUT)
        return whois.whois(domain_name)
    except Exception:
        return None


def _strip_tz(moment):
    """Drop tzinfo so the value can be compared with a naive ``datetime.now()``."""
    return moment.replace(tzinfo=None) if moment.tzinfo is not None else moment


def _domain_age_flag(domain_info):
    """Return 0 when the creation date is at least 365 days old, otherwise 1.

    A failed lookup, a missing/non-datetime creation date, or any error while
    reading it also yields 1.
    """
    if domain_info is None:
        return 1
    try:
        creation_date = domain_info.creation_date
        if isinstance(creation_date, list):
            creation_date = creation_date[0]
        if not isinstance(creation_date, datetime):
            return 1
        # Without this, a timezone-aware date raises TypeError in the
        # comparison below and the domain is always reported as new.
        creation_date = _strip_tz(creation_date)
        one_year_ago = datetime.today() - timedelta(days=365)
        return 0 if creation_date <= one_year_ago else 1
    except Exception:
        return 1


def _domain_end_flag(domain_info):
    """Return 1 when the domain expires in under five 30-day months, otherwise 0.

    A record without an expiration date yields 0; a failed lookup or any
    error while reading/parsing the date yields 1.
    """
    if domain_info is None:
        return 1
    try:
        expiration_date = domain_info.expiration_date
        if isinstance(expiration_date, list):
            expiration_date = expiration_date[0]
        if isinstance(expiration_date, str):
            expiration_date = datetime.strptime(expiration_date, "%Y-%m-%d")
        if expiration_date is None:
            return 0
        expiration_date = _strip_tz(expiration_date)
        days_until_expiry = (expiration_date - datetime.now()).days
        return 1 if (days_until_expiry / 30) < 5 else 0
    except Exception:
        return 1


def _query_category(url):
    """Encode the number of query parameters: -1 for none, 0 for one, 1 for more.

    Uses the parsed query component, so a '#fragment' is not counted and an
    empty query ('?' with nothing after it) counts as no parameters.
    """
    query = urllib.parse.urlsplit(url).query
    count = len([piece for piece in query.split('&') if piece])
    return -1 if count == 0 else (0 if count == 1 else 1)


def _page_features(url):
    """Fetch ``url`` once and derive the four page-based features from that response.

    On ``requests.RequestException`` the failure defaults are returned:
    Mouseover -1, Web_forwards 1, Hyperlinks -1, Domain_Consistency -1.
    """
    try:
        response = requests.get(url, allow_redirects=True, timeout=TIMEOUT)
    except requests.RequestException:
        return {
            'Mouseover': -1,
            'Web_forwards': 1,
            'Hyperlinks': -1,
            'Domain_Consistency': -1,
        }

    # Parse the body a single time and reuse the tree for both HTML features.
    soup = BeautifulSoup(response.content, 'html.parser')
    original_domain = urllib.parse.urlparse(url).netloc
    final_domain = urllib.parse.urlparse(response.url).netloc

    return {
        # 1 when any element carries an onmouseover attribute
        'Mouseover': 1 if soup.find(attrs={"onmouseover": True}) else 0,
        # 0 for up to two redirect hops, 1 for more
        'Web_forwards': 0 if len(response.history) <= 2 else 1,
        # raw count of <a> elements
        'Hyperlinks': len(soup.find_all('a')),
        # 1 when the final URL is on the same netloc as the requested one
        'Domain_Consistency': 1 if original_domain == final_domain else 0,
    }


def extract_features(url):
    """Collect the phishing-detection features for a single URL.

    Performs at most one WHOIS query and one HTTP GET per call; both are
    bounded by ``TIMEOUT``.

    Args:
        url: The URL string to analyse.

    Returns:
        dict with these keys, in this order:
        IP_LIKE, AT, URL_Depth, Redirection, Is_Https, TINY_URL, Check_Hyphen,
        Query, Domain_Age, Domain_end, Mouseover, Web_forwards, Hyperlinks,
        Domain_Consistency.
    """
    parts = urllib.parse.urlsplit(url)
    result = {}

    # 1. IP_LIKE: a dotted-quad pattern appears anywhere in the URL
    result['IP_LIKE'] = 1 if re.search(ip_pattern, url) else 0

    # 2. AT: the URL contains '@'
    result['AT'] = 1 if "@" in url else 0

    # 3. URL_Depth: -1 for no path segment, 0 for one, 1 for more
    path = urllib.parse.urlparse(url).path
    depth = len([segment for segment in path.split('/') if segment])
    result['URL_Depth'] = -1 if depth == 0 else (0 if depth == 1 else 1)

    # 4. Redirection: a '//' occurs past index 6, i.e. later than the one in 'https://'
    result['Redirection'] = 1 if url.rfind('//') > 6 else 0

    # 5. Is_Https
    result['Is_Https'] = 1 if parts.scheme == 'https' else 0

    # 6. TINY_URL
    # NOTE(review): the pattern is searched across the whole URL, so an
    # alternative such as t\.co also matches inside e.g. 'microsoft.com'.
    result['TINY_URL'] = 1 if re.search(shortening_services, url) else 0

    # 7. Check_Hyphen
    result['Check_Hyphen'] = 1 if '-' in url else 0

    # 8. Query
    result['Query'] = _query_category(url)

    # 9-10. Domain_Age / Domain_end share one WHOIS lookup.
    # hostname excludes any port and userinfo that netloc carries; fall back
    # to netloc when no hostname could be parsed.
    domain_info = _whois_lookup(parts.hostname or parts.netloc)
    result['Domain_Age'] = _domain_age_flag(domain_info)
    result['Domain_end'] = _domain_end_flag(domain_info)

    # 11-14. Mouseover / Web_forwards / Hyperlinks / Domain_Consistency share one GET
    result.update(_page_features(url))

    return result

피처 수집 실행 및 데이터프레임 생성

In [None]:
# Run the extractor over every URL; each row yields one dict of features
features = urls['url'].map(extract_features)

# Expand the per-URL dicts into a frame with one column per feature
df_results = pd.DataFrame(list(features))

# Preview the first rows
print(df_results.head())

데이터 전처리

In [None]:
# Replace missing feature values with 0
df_results = df_results.fillna(0)

# Normalise the features (min-max scaling)
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
# The 'label' column is only created by the training cell further down, so on
# a fresh top-to-bottom run it does not exist yet. errors='ignore' lets this
# cell work both before and after that column has been added.
X_scaled = scaler.fit_transform(df_results.drop(columns=['label'], errors='ignore'))

머신러닝 모델 학습

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score

# Placeholder labels: alternating 0/1, one per row.
# Real labels must be prepared separately and substituted here.
# Built per row so the length always matches, including an odd row count.
df_results['label'] = [i % 2 for i in range(len(df_results))]

# Separate features and label
X = df_results.drop(columns=['label'])
y = df_results['label']

# Hold out 20% of the rows for testing
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialise and fit the model; seeded like the split so reruns give the same forest
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

# Predict on the held-out rows
y_pred = model.predict(X_test)

# Report the evaluation metrics
print(classification_report(y_test, y_pred))
print('Accuracy:', accuracy_score(y_test, y_pred))


모델 성능 평가

In [None]:
from sklearn.model_selection import cross_val_score

# Score the classifier with 5-fold cross-validation on the scaled feature matrix
scores = cross_val_score(estimator=model, X=X_scaled, y=y, cv=5)
mean_score = scores.mean()
print("Cross-validation scores:", scores)
print("Mean score:", mean_score)

모델 성능 시각화

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Build the confusion matrix from the held-out predictions
cm = confusion_matrix(y_test, y_pred)

# Draw it as an annotated heatmap on an explicitly created axes
class_names = ['Normal', 'Phishing']
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')
ax.set_title('Confusion Matrix')
plt.show()

하이퍼파라미터 튜닝

In [None]:
from sklearn.model_selection import GridSearchCV

# Candidate values for each random-forest hyperparameter
param_grid = dict(
    n_estimators=[50, 100, 200],
    max_depth=[None, 10, 20, 30],
    min_samples_split=[2, 5, 10],
)

# Exhaustive search over the grid, scored by accuracy with 5-fold CV
grid_search = GridSearchCV(
    estimator=RandomForestClassifier(),
    param_grid=param_grid,
    scoring='accuracy',
    cv=5,
)
grid_search.fit(X_train, y_train)

# Show the winning combination and keep the estimator fitted with it
print("Best parameters:", grid_search.best_params_)
best_model = grid_search.best_estimator_

모델 저장

In [None]:
import joblib

# Save the fitted classifier to disk
# NOTE(review): this dumps `model`, not the tuned `best_model` produced by the
# grid-search cell — confirm which estimator is meant to be persisted.
joblib.dump(model, 'phishing_detection_model.pkl')