In [1]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from scipy.sparse import hstack
import lightgbm as lgb
from urllib.parse import urlparse, parse_qs
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score
import joblib
from urllib.parse import urlparse, parse_qs

In [2]:
# Load the modelling dataset into a DataFrame. Later cells read its 'url' and
# 'Label' columns and treat every other column as a numeric feature.
# NOTE(review): the path is relative to the notebook's working directory.
data = pd.read_csv('model_data(1).csv')

In [4]:
def tokenize_url(url: str):
    """
    Break a URL into a flat list of string tokens.

    The tokens are, in order: the network location split on '.', the
    non-empty path segments split on '/', and then each query parameter
    name followed by its values as returned by ``parse_qs``.

    Parameters:
    url (str): The URL to tokenize.

    Returns:
    list: The URL's tokens. When the URL has no network location (for
        example when it carries no scheme) the first token is an empty
        string, because splitting an empty string yields [''].
    """
    parts = urlparse(url)

    # Host portion, split on dots
    tokens = parts.netloc.split('.')

    # Path portion, keeping only non-empty segments
    tokens += [segment for segment in parts.path.split('/') if segment]

    # Query portion: each parameter name, immediately followed by its values
    for name, vals in parse_qs(parts.query).items():
        tokens += [name, *vals]

    return tokens

# Store each URL's tokens as one space-separated string so the column can be
# fed to a text vectorizer.
data['Tokenized_url'] = data['url'].map(lambda url: ' '.join(tokenize_url(url)))

In [6]:
# Vectorize the text feature: token counts over the space-joined URL tokens.
# NOTE(review): the vocabulary is fit on the full dataset, before the
# train/validation/test split, so held-out URLs contribute to it — consider
# fitting on the training rows only.
vectorizer = CountVectorizer()
X_text = vectorizer.fit_transform(data['Tokenized_url'])

# Remaining features: every column except the raw URL, the label and the
# tokenized text.
other_features = data.drop(columns=['url', 'Label', 'Tokenized_url'])
X_other = other_features.values

# Place the sparse token counts and the dense feature columns side by side.
# hstack is already imported at the top of the notebook; CSR is requested
# explicitly because the default COO result does not support row indexing.
X = hstack([X_text, X_other], format='csr')

# Target variable
y = data['Label']

In [7]:
# Hold out 20% of the rows as the test set.
X_trainval, X_test, y_trainval, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Split the remaining 80% again: 25% of it (0.25 * 0.8 = 0.2 of all rows)
# becomes the validation set and the rest is the training set. Separate names
# are used for the intermediate split so X_train / y_train are bound only once.
X_train, X_val, y_train, y_val = train_test_split(X_trainval, y_trainval, test_size=0.25, random_state=42)

# Print the size of each split; these statements produce the shape summary
# recorded as this cell's output.
print("훈련 데이터 크기:", X_train.shape, y_train.shape)
print("검증 데이터 크기:", X_val.shape, y_val.shape)
print("테스트 데이터 크기:", X_test.shape, y_test.shape)

훈련 데이터 크기: (230916, 243448) (230916,)
검증 데이터 크기: (76972, 243448) (76972,)
테스트 데이터 크기: (76972, 243448) (76972,)


In [8]:
from sklearn.ensemble import RandomForestClassifier

# Initialise the random forest model. random_state fixes the seed so repeated
# runs build the same trees; n_jobs=-1 lets fitting and prediction use all
# available CPU cores, which matters for a training matrix of this size.
rf_model = RandomForestClassifier(random_state=42, n_jobs=-1)

In [None]:
# Train the model on the training split only; the validation and test splits
# are kept for the evaluation cells.
rf_model.fit(X_train, y_train)

In [None]:
# Save the trained model and the fitted vectorizer to disk. The Flask code
# later in this file loads them back using these same file names.
joblib.dump(rf_model, 'rf_model.pkl')
joblib.dump(vectorizer, 'vectorizer.pkl')

In [None]:
# Predict labels for the validation split.
val_predictions = rf_model.predict(X_val)

# Accuracy on the validation split; it is printed together with the test
# metrics in a later cell.
val_accuracy = accuracy_score(y_val, val_predictions)

In [None]:
# Predict on the test split, then score the predictions with each metric in
# turn (accuracy, F1, recall, precision).
test_predictions = rf_model.predict(X_test)
test_accuracy, test_f1, test_recall, test_precision = (
    score_fn(y_test, test_predictions)
    for score_fn in (accuracy_score, f1_score, recall_score, precision_score)
)

# Report the validation accuracy computed earlier alongside the test metrics.
print(f"Validation Accuracy: {val_accuracy}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test F1 Score: {test_f1}")
print(f"Test Recall: {test_recall}")
print(f"Test Precision: {test_precision}")

In [None]:
from flask import Flask, request, jsonify
import joblib
from urllib.parse import urlparse, parse_qs
import re
import urllib

# Flask application object; the /predict route below is registered on it.
app = Flask(__name__)

# Load the trained model and the fitted vectorizer from disk.
# NOTE(review): joblib.load unpickles these files, and unpickling can execute
# arbitrary code — only load artifacts that come from a trusted source.
rf_model = joblib.load('rf_model.pkl')
vectorizer = joblib.load('vectorizer.pkl')

# Four dot-separated groups of 1-3 digits (IPv4-looking text). The groups are
# not range-checked and the pattern is not anchored.
ip_pattern = r"(?:\d{1,3}\.){3}\d{1,3}"
# Alternation of URL-shortener host names. The pattern is unanchored, so an
# entry matches wherever its text occurs, and several entries (e.g. bit\.ly,
# goo\.gl, t\.co) are listed more than once.
shorteningServices = r"bit\.ly|goo\.gl|shorte\.st|go2l\.ink|x\.co|ow\.ly|t\.co|tinyurl|tr\.im|is\.gd|cli\.gs|yfrog\.com|migre\.me|ff\.im|tiny\.cc|url4\.eu|twit\.ac|su\.pr|twurl\.nl|snipurl\.com|short\.to|BudURL\.com|ping\.fm|post\.ly|Just\.as|bkite\.com|snipr\.com|fic\.kr|loopt\.us|doiop\.com|short\.ie|kl\.am|wp\.me|rubyurl\.com|om\.ly|to\.ly|bit\.do|t\.co|lnkd\.in|db\.tt|qr\.ae|adf\.ly|goo\.gl|bitly\.com|cur\.lv|tinyurl\.com|ow\.ly|bit\.ly|ity\.im|q\.gs|is\.gd|po\.st|bc\.vc|twitthis\.com|u\.to|j\.mp|buzurl\.com|cutt\.us|u\.bb|yourls\.org|x\.co|prettylinkpro\.com|scrnch\.me|filoops\.info|vzturl\.com|qr\.net|1url\.com|tweez\.me|v\.gd|tr\.im|link\.zip\.net|buff\.ly|rb\.gy|rebrand\.ly|short\.cm|clk\.im|cutt\.ly|t2m\.io|bl\.ink|tiny\.cc"

def parse_query_string(url):
    """
    Split the text after the last '?' of a URL into a parameter dict.

    The text is cut on '&'; each piece is cut once on its first '='. A piece
    without '=' maps to None. When a key repeats, the last value wins.

    Parameters:
    url (str): The URL to inspect.

    Returns:
    dict: Parameter name -> value (str or None). Empty when the URL
        contains no '?'. A URL that ends in '?' yields {'': None}.
    """
    if '?' not in url:
        return {}

    # Everything after the final '?' is treated as the query text
    query_text = url.rpartition('?')[2]

    params = {}
    for piece in query_text.split('&'):
        name, separator, value = piece.partition('=')
        # No '=' in the piece means there is no value at all, not an empty one
        params[name] = value if separator else None
    return params

def extract_features(url):
    """
    Compute the hand-crafted 0/1 and count features for a single URL.

    Parameters:
    url (str): The URL to describe.

    Returns:
    dict: Feature name -> int, with these keys in this order:
        'IP_LIKE'      1 if ip_pattern matches anywhere in the URL
        'AT'           1 if the URL contains '@'
        'URL_Depth'    number of non-empty path segments
        'Redirection'  1 if the last '//' starts after index 6, i.e. past
                       where it sits in an 'https://' prefix
        'Is_Https'     1 if the scheme is exactly 'https'
        'TINY_URL'     1 if shorteningServices matches anywhere in the URL
        'Query'        number of keys returned by parse_query_string
        '(-)_InDomain' 1 if the network location contains '-'

    NOTE(review): both regex checks search the whole URL rather than just
    the host, so they can match inside longer names or paths — presumably
    this mirrors how the training features were built; confirm before
    tightening.
    """
    parsed = urlparse(url)
    path_segments = [segment for segment in parsed.path.split('/') if segment]

    return {
        'IP_LIKE': int(bool(re.search(ip_pattern, url))),
        'AT': int("@" in url),
        'URL_Depth': len(path_segments),
        'Redirection': int(url.rfind('//') > 6),
        'Is_Https': int(parsed.scheme == 'https'),
        'TINY_URL': int(bool(re.search(shorteningServices, url))),
        'Query': len(parse_query_string(url)),
        '(-)_InDomain': int('-' in parsed.netloc),
    }

def tokenize_url(url: str):
    """
    Break a URL into a flat list of string tokens.

    Produces, in order: the network location split on '.', the non-empty
    path segments, and each query parameter name followed by its values as
    returned by ``parse_qs``. The loaded vectorizer was fit on text built
    this way, so this must stay in step with the training-time tokenizer.

    Parameters:
    url (str): The URL to tokenize.

    Returns:
    list: The URL's tokens; the first token is '' when the URL has no
        network location.
    """
    parsed = urlparse(url)
    host_parts = parsed.netloc.split('.')
    path_parts = [part for part in parsed.path.split('/') if part]
    query_parts = [
        item
        for name, vals in parse_qs(parsed.query).items()
        for item in (name, *vals)
    ]
    return [*host_parts, *path_parts, *query_parts]

def predict_phishing(url: str):
    """
    Classify one URL with the loaded random forest model.

    The URL's tokens are turned into a count vector by the loaded
    vectorizer, the hand-crafted features from extract_features are appended
    as extra columns, and the combined single-row matrix is passed to
    rf_model.predict.

    Parameters:
    url (str): The URL to classify.

    Returns:
    The first (and only) element of the model's prediction array.
    """
    # Token-count part of the feature row
    token_text = ' '.join(tokenize_url(url))
    text_part = vectorizer.transform([token_text])

    # Hand-crafted part, laid out as one row in a fixed column order.
    # NOTE(review): presumably this order must match the feature column order
    # of the training data — confirm against the training CSV.
    feats = extract_features(url)
    column_order = ('IP_LIKE', 'AT', 'URL_Depth', 'Redirection',
                    'Is_Https', 'TINY_URL', 'Query', '(-)_InDomain')
    handcrafted_row = [[feats[name] for name in column_order]]

    from scipy.sparse import hstack
    combined = hstack([text_part, handcrafted_row])

    return rf_model.predict(combined)[0]

@app.route('/predict', methods=['POST'])
def predict():
    """
    POST /predict — classify the URL supplied in the JSON request body.

    Expects a JSON object with a string field 'url'.

    Returns:
    On success, a JSON object {'url': <url>, 'prediction': 'Phishing' or
    'Not Phishing'}. If the body is not a JSON object or 'url' is missing,
    not a string, or blank, a JSON object {'error': <message>} with HTTP
    status 400.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so every bad request is answered by the 400 branch below.
    payload = request.get_json(silent=True)
    url = payload.get('url') if isinstance(payload, dict) else None
    if not isinstance(url, str) or not url.strip():
        return jsonify({'error': "Request body must be a JSON object with a non-empty string 'url' field."}), 400

    prediction = predict_phishing(url)
    result = 'Phishing' if prediction == 1 else 'Not Phishing'
    return jsonify({'url': url, 'prediction': result})

if __name__ == '__main__':
    import os

    # Debug mode turns on Flask's interactive debugger, which lets anyone who
    # can reach the server run arbitrary Python. Keep it off by default and
    # enable it only when FLASK_DEBUG is explicitly set to 1/true.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true')
    app.run(debug=debug_enabled)
