In [1]:
!pip install dotenv unsloth trl accelerate bitsandbytes peft transformers datasets

Collecting dotenv
  Using cached dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting unsloth
  Downloading unsloth-2025.5.8-py3-none-any.whl.metadata (47 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m47.1/47.1 kB[0m [31m740.0 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hCollecting trl
  Downloading trl-0.18.0-py3-none-any.whl.metadata (11 kB)
Collecting accelerate
  Downloading accelerate-1.7.0-py3-none-any.whl.metadata (19 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.46.0-py3-none-manylinux_2_24_x86_64.whl.metadata (10 kB)
Collecting peft
  Downloading peft-0.15.2-py3-none-any.whl.metadata (13 kB)
Collecting transformers
  Downloading transformers-4.52.3-py3-none-any.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.2/40.2 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting datasets
  Using cached datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting python-dotenv (from dote

In [2]:
import transformers
import torch
torch.cuda.get_device_name(0)

  from .autonotebook import tqdm as notebook_tqdm


'NVIDIA GeForce RTX 3050 Laptop GPU'

## CHECK URL, MAIL, PHONE

In [None]:
VT_API_KEY=''
ABSTRACT_EMAIL_API=''
ABSTRACT_PHONE_API=''

In [None]:
import os
import base64
import requests

SUSPICIOUS_COUNTRIES = {"Cambodia", "Nigeria", "Pakistan", "Afghanistan", "North Korea"}

def check_url_virustotal(url):
    api_key = VT_API_KEY
    url_id = base64.urlsafe_b64encode(url.encode()).decode().strip("=")
    vt_url = f"https://www.virustotal.com/api/v3/urls/{url_id}"

    headers = {
        "x-apikey": api_key
    }

    response = requests.get(vt_url, headers=headers)
    return response.json()

def parse_vt_result_for_display(vt_json):
    try:
        data = vt_json["data"]["attributes"]
        stats = data["last_analysis_stats"]

        url = data.get("last_final_url", data.get("url", ""))

        harmless = stats.get("harmless", 0)
        malicious = stats.get("malicious", 0)
        suspicious = stats.get("suspicious", 0)
        undetected = stats.get("undetected", 0)

        # Đánh giá tổng quát
        if malicious > 0:
            overall = "Nguy hiểm"
        elif suspicious > 0:
            overall = "Có thể đáng ngờ"
        else:
            overall = "An toàn"

        results = {
            "url": url,
            "harmless": harmless,
            "malicious": malicious,
            "suspicious": suspicious,
            "undetected": undetected,
            "overall": overall
        }

        return results

    except Exception as e:
        return {
            "error": f"Không thể phân tích dữ liệu VirusTotal: {e}"
        }

def check_email_validity(email):
    api_key = ABSTRACT_EMAIL_API
    url = "https://emailvalidation.abstractapi.com/v1/"
    params = {
        "api_key": api_key,
        "email": email
    }
    response = requests.get(url, params=params)
    return response.json()

def parse_email_result(result):
    try:
        email = result.get("email", "N/A")
        deliverability = result.get("deliverability", "UNKNOWN")
        is_format_valid = result["is_valid_format"]["value"]
        is_smtp_valid = result["is_smtp_valid"]["value"]
        is_mx_found = result["is_mx_found"]["value"]
        is_free = result["is_free_email"]["value"]
        is_disposable = result["is_disposable_email"]["value"]
        is_role = result["is_role_email"]["value"]

        # Tổng kết hợp lệ
        is_valid = all([
            is_format_valid,
            is_smtp_valid,
            is_mx_found,
            deliverability == "DELIVERABLE"
        ])

        result_dict = {
            "email": email,
            "valid": is_valid,
            "deliverability": deliverability,
            "is_format_valid": is_format_valid,
            "is_smtp_valid": is_smtp_valid,
            "is_mx_found": is_mx_found,
            "is_free_email": is_free,
            "is_disposable_email": is_disposable,
            "is_role_email": is_role,
            "conclusion": (
                "Hợp lệ (SMTP & MX tồn tại)" if is_valid else
                "Không hợp lệ hoặc không gửi được"
            ),
            "description": {
                "type": "Miễn phí" if is_free else "Domain riêng",
                "spam": "Tạm thời / spam" if is_disposable else "Không phải spam",
                "role": "Đại diện tổ chức" if is_role else "Email cá nhân"
            }
        }

        return result_dict

    except Exception as e:
        return {
            "error": f"Không thể phân tích kết quả email: {e}"
        }

def normalize_phone_vn(phone: str) -> str:
    if phone.startswith("0") and len(phone) == 10:
        return "+84" + phone[1:]
    elif phone.startswith("+84"):
        return phone
    return phone

def check_phone_validity(phone):
    api_key = ABSTRACT_PHONE_API
    if not api_key:
        raise ValueError("❌ ABSTRACT_PHONE_API chưa được thiết lập trong .env")

    url = "https://phonevalidation.abstractapi.com/v1/"
    normalized_phone = normalize_phone_vn(phone)
    params = {
        "api_key": api_key,
        "phone": normalized_phone
    }

    response = requests.get(url, params=params)
    if response.status_code != 200:
        raise Exception(f"Lỗi API: {response.status_code} – {response.text}")

    return response.json()

# Hàm phân tích kết quả trả về
def parse_phone_result(result):
    try:
        phone = result.get("phone")
        valid = result.get("valid", False)
        country = result.get("country", {}).get("name", "")
        country_code = result.get("country", {}).get("code", "")
        intl_format = result.get("format", {}).get("international", "")
        local_format = result.get("format", {}).get("local", "")

        is_foreign = country and country != "Vietnam"
        is_high_risk = country in SUSPICIOUS_COUNTRIES

        return {
            "phone": phone,
            "valid": valid,
            "international_format": intl_format,
            "local_format": local_format,
            "country": country,
            "country_code": country_code,
            "location": result.get("location"),
            "carrier": result.get("carrier"),
            "type": result.get("type"),
            "is_foreign_number": is_foreign,
            "is_high_risk_country": is_high_risk,
            "conclusion": (
                "Không hợp lệ" if not valid else
                "Số từ quốc gia rủi ro (cần cẩn trọng)" if is_high_risk else
                "Số từ nước ngoài" if is_foreign else
                "Số hợp lệ nội địa"
            )
        }

    except Exception as e:
        return {
            "error": f"Lỗi phân tích dữ liệu số điện thoại: {e}"
        }

def build_checks_summary(url=None, email=None, phone=None):
    parts = []

    if url:
        url_result = check_url_virustotal(url)
        check_url = parse_vt_result_for_display(url_result)
        parts.append(f"Kết quả kiểm tra URL: {check_url}")

    if email:
        mail_result = check_email_validity(email)
        check_mail = parse_email_result(mail_result)
        parts.append(f"Kết quả kiểm tra Mail: {check_mail}")

    if phone:
        phone_result = check_phone_validity(phone)
        check_phone = parse_phone_result(phone_result)
        parts.append(f"Kết quả kiểm tra Phone: {check_phone}")

    return parts

## Load model

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "unsloth/gemma-3-4b-it"

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)

tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
prompt = "Bạn đã trúng thưởng 1 chiếc Iphone 16, hãy nhấn vào link để nhận thưởng"
messages = [
    {"role": "system", "content": "Bạn là 1 AI thông minh hỗ trợ phân loại tin tức real và fake. Hãy phân loại tin tức người dùng thuộc loại real hoặc fake. Nhớ chỉ cần trả lời đúng là real hoặc fake không cần giải thích thêm"},
    {"role": "user", "content": prompt}
]

In [None]:
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

generated_ids = model.generate(
    **model_inputs,
    max_new_tokens=512
)
generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]

response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
print(response)

## Finetune model

In [None]:
import pandas as pd

# Load dữ liệu
train_df = pd.read_csv("/content/train.csv")
val_df   = pd.read_csv("/content/val.csv")

train_df = train_df[["text", "label"]]
val_df   = val_df[["text", "label"]]

# Định dạng instruction-style (prompt/response)
def format_supervised(example):
    return {
        "prompt": f"Phân loại tin tức sau là real hay fake:\n\n{example['text']}",
        "response": example["label"],
    }

train_data = train_df.apply(format_supervised, axis=1).to_list()
val_data   = val_df.apply(format_supervised, axis=1).to_list()

In [None]:
from unsloth import FastLanguageModel
from transformers import AutoTokenizer

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/gemma-3-4b-it",
    max_seq_length = 1024,
    load_in_4bit = True,  # giảm RAM cho Colab free
)

model.config.use_cache = False

# Cấu hình LoRA
model = FastLanguageModel.get_peft_model(
    model,
    r = 8,
    lora_alpha = 16,
    lora_dropout = 0.05,
    # target_modules = ["q_proj", "v_proj"],  # dùng mặc định
    target_modules=['o_proj', 'qkv_proj', 'gate_up_proj', 'down_proj'],
    bias = "none",
    use_gradient_checkpointing=False,
    random_state = 42,
    use_rslora = False,
    loftq_config=None,
)

==((====))==  Unsloth 2025.5.7: Fast Gemma3 patching. Transformers: 4.51.3.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.741 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.7.0+cu126. CUDA: 7.5. CUDA Toolkit: 12.6. Triton: 3.3.0
\        /    Bfloat16 = FALSE. FA [Xformers = 0.0.30. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!
Unsloth: Using float16 precision for gemma3 won't work! Using float32.
Unsloth: Making `model.base_model.model.language_model.model` require gradients


In [None]:
from datasets import Dataset

def convert_to_prompt(row):
    return {
        "text": f"### Instruction:\nPhân loại tin tức sau là real hay fake:\n\n{row['text']}\n\n### Response:\n{row['label']}"
    }

# Áp dụng cho từng bộ dữ liệu
train_dataset = Dataset.from_pandas(train_df.apply(convert_to_prompt, axis=1, result_type="expand"))
val_dataset   = Dataset.from_pandas(val_df.apply(convert_to_prompt, axis=1, result_type="expand"))

In [None]:
from transformers import TrainingArguments
training_args = TrainingArguments(
        output_dir = "Gemma3-lora-output",
        per_device_train_batch_size = 2,
        per_device_eval_batch_size = 2,
        gradient_accumulation_steps=4,
        num_train_epochs = 3,
        learning_rate = 2e-4,
        logging_steps = 10,
        eval_steps=50,
        save_strategy="steps",
        save_steps=50,
        optim="adamw_8bit",
        lr_scheduler_type="linear",
        seed=3407,
        fp16 = True,
    )

### Gemma3

In [None]:
from trl import SFTTrainer

trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    formatting_func = lambda example: [example["text"]],
    max_seq_length=2048,
    args=training_args
)

trainer.train()

In [None]:
trainer.model.save_pretrained("Gemma3-lora-news")
tokenizer.save_pretrained("Gemma3-lora-news")

## Inference

In [None]:
from unsloth import FastLanguageModel
from transformers import AutoTokenizer

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "qwen2.5_finetuned",
    max_seq_length = 1024,
    load_in_4bit = True,
)

In [None]:
prompt = "Phân loại tin tức sau là real hay fake:\n\nBạn đã trúng thưởng giải Jackpot trị giá 1 tỷ đồng tại https://www.x311y.com/. Nhấn vào link để nhận ngay\n\n### Response:"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

outputs = model.generate(**inputs, max_new_tokens=20)
response = tokenizer.decode(outputs[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
print("Kết luận:", response)

## Test model after finetune

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import json
import re

# Load model (Qwen fine-tuned)
from unsloth import FastLanguageModel
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "Gemma3-lora-news",
    max_seq_length = 1024,
    load_in_4bit = True,
)

def extract_contact_info(text: str) -> str:
    email_pattern = r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+"
    url_pattern = r"https?://[^\s]+|www\.[^\s]+"
    phone_pattern = r"(\+?84|0)?\s?(\d{9,10})"

    email = ''
    phone = ''
    url = ''

    email_match = re.search(email_pattern, text)
    if email_match:
        email = email_match.group(0)
    
    # Tìm URL đầu tiên
    url_match = re.search(url_pattern, text)
    if url_match:
        url = url_match.group(0)
    
    # Tìm số điện thoại đầu tiên
    phone_match = re.search(phone_pattern, text)
    if phone_match:
        phone = "".join([g if g is not None else "" for g in phone_match.groups()]) if phone_match else ""

    return email, phone, url

def classify_news(input_text: str, check_summary: list) -> str:
    joined_check = "\n".join(check_summary)
    full_prompt = f"""Bạn là trợ lý AI có nhiệm vụ xác thực tin tức là real hay fake.

    Thông tin cần xác thực: {input_text}

    Kết quả kiểm tra bổ sung (nếu có):
    {joined_check}

    Yêu cầu:
    Chỉ trả lời duy nhất 1 trong 2 từ sau: real hoặc fake.
    Không thêm giải thích, không ghi chú, không dòng thừa.


    Kết luận:"""

    inputs = tokenizer(full_prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(**inputs, max_new_tokens=20)
    response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)
    return response.strip()

In [None]:
# ==== TEST ====
input_text = "Chính phủ ra lệnh cấm sử dụng mạng xã hội Facebook tại Việt Nam từ tháng sau"
email, phone, url = extract_contact_info(input_text)

check_summary = build_checks_summary(url, email, phone)
# print("Check Summary:", check_summary)

final_label = classify_news(input_text, check_summary)
print("\n🧠 Kết luận cuối cùng:", final_label)

## Testing with test_dataset

In [None]:
import pandas as pd
import json

df = pd.read_csv('/content/test.csv')
llm_outputs = []

for i, row in df.iterrows():
    input_text = row['text']

    try:
        email, phone, url = extract_contact_info(input_text)
        check_summary = build_checks_summary(url, email, phone)
        # Phân loại
        final_label = classify_news(input_text, check_summary)
        llm_outputs.append(final_label)
        print("Kết luận:", final_label)

    except Exception as e:
        error_msg = f"Error: {str(e)}"
        llm_outputs.append(error_msg)
        print("❌ Lỗi xử lý:", error_msg)

# Ghi kết quả vào cột mới và lưu file
df['Gemma3_4B_finetuned'] = llm_outputs
df.to_csv('test_output_Gemma3_4B_finetuned.csv', index=False)

print("\nĐã xử lý xong toàn bộ test.csv và lưu kết quả.")