<a href="https://colab.research.google.com/github/ShuoHengLi/text-classification-demo/blob/main/Animeclassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 網路爬蟲

## 載入selenium環境參數準備爬蟲

In [None]:
# Download the Chrome for Testing 135.0.7049.84 linux64 build, unpack it and
# move it to /opt/chrome.
!wget https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/135.0.7049.84/linux64/chrome-linux64.zip -O chrome-linux64.zip
!unzip -q chrome-linux64.zip
!mv chrome-linux64 /opt/chrome

# Append the Chrome directory to PATH for this process.
import os
os.environ['PATH'] += os.pathsep + "/opt/chrome"

# Download the chromedriver of the same version, install it as
# /usr/bin/chromedriver and mark it executable.
!wget https://edgedl.me.gvt1.com/edgedl/chrome/chrome-for-testing/135.0.7049.84/linux64/chromedriver-linux64.zip -O chromedriver.zip
!unzip -q chromedriver.zip
!mv chromedriver-linux64/chromedriver /usr/bin/chromedriver
!chmod +x /usr/bin/chromedriver

# Install selenium; pip's standard output is discarded.
!pip install selenium > /dev/null


In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException
import time

# Run Chrome headless with a fixed 1920x1080 window; the sandbox and
# /dev/shm usage are switched off through command-line flags.
chrome_options = webdriver.ChromeOptions()
for chrome_flag in (
    "--headless",
    "--no-sandbox",
    "--disable-dev-shm-usage",
    "--window-size=1920,1080",
):
    chrome_options.add_argument(chrome_flag)

# Start the browser through the chromedriver binary at /usr/bin/chromedriver.
chromedriver_service = Service("/usr/bin/chromedriver")
driver = webdriver.Chrome(service=chromedriver_service, options=chrome_options)

## 開始爬蟲，目標為bangumi上的動畫評論，記錄評論內容和分數

In [None]:
# Crawl every comment page of one Bangumi subject and record, for each
# comment that carries a star rating, its text and its score.
# StaleElementReferenceException is imported here because the notebook's
# import cell only brings in NoSuchElementException.
from selenium.common.exceptions import StaleElementReferenceException

url = "https://bangumi.tv/subject/454684"+"/comments"
driver.get(url)
time.sleep(3)  # fixed pause before reading the first page

rating = []
nextpage_times = 0
maxpage = float("inf")  # no page limit; set a number to cap the crawl
while nextpage_times < maxpage:
    IDs = driver.find_elements(By.CLASS_NAME, "text")
    for ID in IDs:
        # Only element-lookup failures are treated as "skip this entry".
        # A bare `except:` here would also swallow KeyboardInterrupt and
        # hide real bugs.
        try:
            Comment = ID.find_element(By.CLASS_NAME, "comment")
            Score = ID.find_element(By.CLASS_NAME, "starstop-s")
            comment_text = Comment.text
            star_class = Score.find_element(By.XPATH, ".//*").get_attribute("class")
        except (NoSuchElementException, StaleElementReferenceException):
            continue
        if star_class is None:
            # No class attribute on the star element: nothing to parse.
            continue
        rating.append({
            "comment": comment_text,
            # The score is what remains of the class attribute once the
            # "starlight stars" prefix is removed.
            "score": star_class.replace("starlight stars", ""),
        })
    print(f"第 {nextpage_times+1}頁，評論總數：{len(rating)}")

    try:
        next_link = driver.find_element(By.LINK_TEXT, "››")
    except NoSuchElementException:
        # No "next page" link on this page: stop crawling.
        break
    next_link.click()
    nextpage_times += 1
    time.sleep(2)  # fixed pause before reading the next page

## 畫出分數的分配比例圖

In [None]:
import matplotlib.pyplot as plt
from collections import Counter

# Count how many scraped comments carry each integer score.
labels = [int(entry["score"]) for entry in rating]
label_counts = Counter(labels)

# Bars are ordered by ascending score.
x = sorted(label_counts)
y = [label_counts[score] for score in x]

plt.figure(figsize=(8, 5))
plt.bar(x, y, color='skyblue')
plt.xlabel("Score")
plt.ylabel("Count")
plt.title("Label Distribution")
plt.xticks(x)  # one tick per score that actually occurs
plt.grid(axis='y', linestyle='--', alpha=0.5)
plt.show()


## 分割為train，validation和test三個檔案，並存為jsonl檔為後續fine tuning作準備

In [None]:
import json
import random
from sklearn.model_selection import train_test_split

# Seed Python's `random` module; the two splits below are seeded separately
# through their own random_state argument.
random.seed(42)

# First split: 80% of the scraped items for training, 20% held out.
train_data, temp_data = train_test_split(rating, test_size=0.2, random_state=42)
# Second split: the held-out part is halved into validation and test sets.
valid_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

def save_jsonl(data, filename):
    """Write every item of *data* to *filename* as one JSON document per line.

    The file is opened in write mode with UTF-8 encoding, so existing content
    is replaced. Non-ASCII characters are written as-is instead of being
    escaped.
    """
    serialized_lines = (
        json.dumps(record, ensure_ascii=False) + "\n" for record in data
    )
    with open(filename, "w", encoding="utf-8") as out_file:
        out_file.writelines(serialized_lines)

# Write the full scraped set and each split to its own JSONL file.
for records, out_name in (
    (rating, "rating.jsonl"),
    (train_data, "train_data.jsonl"),
    (valid_data, "valid_data.jsonl"),
    (test_data, "test_data.jsonl"),
):
    save_jsonl(records, out_name)

# Fine-tuning the model

## 下載 Hugging Face 的 library 作準備

In [None]:
# Install the Hugging Face libraries used below (transformers in quiet mode).
!pip install transformers -q
!pip install datasets
import torch

from datasets import Dataset


## 讀取爬蟲時存下的jsonl檔

In [None]:
# Load the train / validation / test JSONL files as datasets.
train_dataset, valid_dataset, test_dataset = (
    Dataset.from_json(split_file)
    for split_file in ("train_data.jsonl", "valid_data.jsonl", "test_data.jsonl")
)

## Tokenize，並將分數label簡化為三類以得到更好的training效果

In [None]:
from transformers import AutoTokenizer, BertTokenizer, AutoModelForSequenceClassification, BertForSequenceClassification, DataCollatorWithPadding
import math

# Tokenizer of the pretrained checkpoint; the same checkpoint name is used
# when the classification model is loaded.
tokenizer = AutoTokenizer.from_pretrained('uer/roberta-medium-word-chinese-cluecorpussmall')

def map_to_3class(score):
    """Collapse a raw score into one of three class ids.

    The score is converted with ``int()``. Values up to 3 map to 0, values
    from 4 to 7 map to 1, and anything above 7 maps to 2.
    """
    value = int(score)
    if value > 7:
        return 2
    if value > 3:
        return 1
    return 0

def tokenize_function(dataset):
    """Tokenize a batch of comments and attach their 3-class labels.

    Written for ``Dataset.map(..., batched=True)``: ``dataset`` is indexed by
    column name, with "comment" holding the texts and "score" the raw scores.

    Returns the tokenizer output with an added "labels" list produced by
    ``map_to_3class``.
    """
    # Cap the length explicitly at 512, the same limit inference_function
    # uses, so truncation does not depend on the tokenizer config declaring
    # a maximum length.
    tokenized = tokenizer(dataset["comment"], truncation=True, max_length=512)
    tokenized["labels"] = [map_to_3class(s) for s in dataset["score"]]
    return tokenized

def preprocess_function(dataset):
    """Return *dataset* tokenized and labelled, without its raw columns.

    ``tokenize_function`` is applied in batched mode, after which the
    original "comment" and "score" columns are removed.
    """
    return dataset.map(tokenize_function, batched=True).remove_columns(
        ["comment", "score"]
    )

# Run the same preprocessing over the train / validation / test splits.
tokenize_train_datasets, tokenize_valid_datasets, tokenize_test_datasets = (
    preprocess_function(split)
    for split in (train_dataset, valid_dataset, test_dataset)
)
# Collator built from the tokenizer; passed as collate_fn to the data loaders.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

## 開始training

### 設定模型參數

In [None]:
import torch
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import trainer, get_scheduler, DataCollatorWithPadding
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from tqdm import tqdm

# Load the pretrained checkpoint as a 3-label sequence classifier and move it
# to the GPU when one is available, otherwise keep it on the CPU.
model = AutoModelForSequenceClassification.from_pretrained("uer/roberta-medium-word-chinese-cluecorpussmall", num_labels=3)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

batch_size = 32
num_epochs = 100  # maximum number of epochs the scheduler is planned for
model_name = "AnimeComment"

# Only the training loader is shuffled.
train_loader = DataLoader(tokenize_train_datasets, batch_size=batch_size, shuffle=True, collate_fn=data_collator)
eval_loader = DataLoader(tokenize_valid_datasets, batch_size=batch_size, collate_fn=data_collator)

# Optimizer & Scheduler
optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=0.05)
num_training_steps = len(train_loader) * num_epochs
# Warm up over the first 10% of the planned steps. get_scheduler documents
# num_warmup_steps as an int, so the product is truncated rather than passed
# on as a float.
warmup_steps = int(num_training_steps * 0.1)
lr_scheduler = get_scheduler(
    "cosine",
    optimizer=optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=num_training_steps
)

# compute metrics
def compute_metrics(preds, labels):
    """Compute accuracy and weighted precision / recall / F1.

    Args:
        preds: Tensor of per-class scores; the predicted class is the argmax
            over the last dimension.
        labels: Tensor of true class ids.

    Returns:
        dict with the keys 'accuracy', 'precision', 'recall' and 'f1', each
        rounded to 4 decimal places.
    """
    predicted = torch.argmax(preds, dim=-1).cpu().numpy()
    expected = labels.cpu().numpy()
    precision, recall, f1, _support = precision_recall_fscore_support(
        expected, predicted, average='weighted'
    )
    raw_scores = {
        'accuracy': accuracy_score(expected, predicted),
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }
    return {name: round(value, 4) for name, value in raw_scores.items()}

### 開始training

In [None]:
# Per-epoch history, used afterwards to draw the learning curve.
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

# Early-stopping state: best validation F1 seen so far, number of consecutive
# epochs without improvement, and how many such epochs are tolerated.
best_f1 = 0
patience_counter = 0
early_stopping_patience = 5

# Train for at most num_epochs epochs; each epoch is one pass over
# train_loader followed by one pass over eval_loader.
for epoch in range(num_epochs):
    # ---- training phase ----
    model.train()
    total_loss = 0
    correct_preds = 0
    total_preds = 0
    for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
        # Move every tensor of the batch to the model's device.
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss

        # Backpropagate, update the weights, advance the learning-rate
        # schedule (once per batch), then clear the gradients for the next step.
        loss.backward()
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

        # Accumulate the running loss and the number of correct predictions.
        total_loss += loss.item()
        preds = torch.argmax(outputs.logits, dim=-1)
        correct_preds += (preds == batch["labels"]).sum().item()
        total_preds += batch["labels"].size(0)

    # Loss is averaged over batches, accuracy over individual samples.
    epoch_train_loss = total_loss / len(train_loader)
    epoch_train_accuracy = correct_preds / total_preds
    train_losses.append(epoch_train_loss)
    train_accuracies.append(epoch_train_accuracy)
    print(f"Epoch {epoch+1}: Train Loss = {epoch_train_loss:.4f}, Train Accuracy = {epoch_train_accuracy:.4f}")

    # ---- validation phase (no gradient tracking) ----
    model.eval()
    all_preds = []
    all_labels = []
    eval_loss = 0
    correct_preds = 0
    total_preds = 0
    with torch.no_grad():
        for batch in tqdm(eval_loader, desc="Evaluating"):
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            eval_loss += outputs.loss.item()
            preds = torch.argmax(outputs.logits, dim=-1)
            correct_preds += (preds == batch["labels"]).sum().item()
            total_preds += batch["labels"].size(0)
            # Keep the raw logits and labels so the metrics can be computed
            # over the whole validation set at once.
            all_preds.append(outputs.logits)
            all_labels.append(batch["labels"])

    epoch_eval_loss = eval_loss / len(eval_loader)
    epoch_eval_accuracy = correct_preds / total_preds
    val_losses.append(epoch_eval_loss)
    val_accuracies.append(epoch_eval_accuracy)

    all_preds = torch.cat(all_preds)
    all_labels = torch.cat(all_labels)
    metrics = compute_metrics(all_preds, all_labels)
    print(f"Validation Loss: {epoch_eval_loss:.4f}, Validation Accuracy: {epoch_eval_accuracy:.4f}, Metrics: {metrics}")

    # Zero-based index of the last completed epoch; read when plotting.
    final_epoch = epoch

    # Early stopping & checkpointing: a strictly better validation F1 resets
    # the patience counter and overwrites best_model.pt; otherwise the counter
    # grows and training stops once it reaches early_stopping_patience.
    # NOTE(review): the best weights are only written to best_model.pt; the
    # in-memory `model` keeps the weights of the last epoch that ran.
    if metrics["f1"] > best_f1:
        best_f1 = metrics["f1"]
        patience_counter = 0
        torch.save(model.state_dict(), "best_model.pt")
        print("Model saved.")
    else:
        patience_counter += 1
        if patience_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

### 畫出learning curve

In [None]:
import matplotlib.pyplot as plt
# One x position per completed epoch (final_epoch is a zero-based index).
epoch_axis = range(1, final_epoch + 2)
# Losses are drawn in blue and accuracies in red; dashed lines are validation.
curves = (
    (train_losses, 'b-', "Train Loss"),
    (val_losses, 'b--', "Val Loss"),
    (train_accuracies, 'r-', "Train Accuracy"),
    (val_accuracies, 'r--', "Val Accuracy"),
)
for series, line_style, legend_label in curves:
    plt.plot(epoch_axis, series, line_style, label=legend_label)
plt.xlabel("Epochs")
plt.ylabel("Loss / Accuracy")
plt.title("Learning Curve")
plt.legend()
plt.show()

### 儲存model參數

In [None]:
# The training loop keeps running for several epochs after the best
# validation F1 before early stopping fires, so the in-memory weights are
# those of the last epoch, not the best one. The best weights were written to
# best_model.pt; reload them so the "_best" directory holds the best weights.
import os

if os.path.exists("best_model.pt"):
    model.load_state_dict(torch.load("best_model.pt", map_location=device))
model.save_pretrained(f"{model_name}_best")
tokenizer.save_pretrained(f"{model_name}_best")

## 開始testing

In [None]:
def test_model(model, test_dataset, tokenizer, batch_size=32, device="cuda"):
    """Evaluate *model* on *test_dataset* and print the test metrics.

    Args:
        model: Sequence-classification model; it is switched to eval mode and
            moved to *device*.
        test_dataset: Tokenized dataset whose batches include a "labels" field.
        tokenizer: Tokenizer used to pad each batch.
        batch_size: Number of samples per evaluation batch.
        device: Device on which the model and the batches are placed.

    Returns:
        Tuple ``(all_preds, all_labels)``: the predicted and the true class
        ids for every test sample, in dataset order.

    Accuracy and weighted precision, recall and F1 are printed as a side
    effect.
    """
    model.eval()
    model.to(device)

    # Build the padding collator from the tokenizer that was passed in instead
    # of reading a module-level `data_collator`: the `tokenizer` argument was
    # otherwise unused and the function silently depended on that global.
    test_dataloader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=DataCollatorWithPadding(tokenizer=tokenizer)
    )

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in test_dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            logits = outputs.logits
            preds = torch.argmax(logits, dim=-1)

            # Collect results on the CPU so they can be handed to sklearn.
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch["labels"].cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')

    print(f"Test Accuracy: {acc:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall: {recall:.4f}")
    print(f"Test F1 Score: {f1:.4f}")

    return all_preds, all_labels

# Evaluate on the tokenized test split, on the GPU when one is available.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
test_preds, test_labels = test_model(
    model=model,
    test_dataset=tokenize_test_datasets,
    tokenizer=tokenizer,
    batch_size=32,
    device=device,
)

## inference

### 設定inference功能

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Reload the saved tokenizer and model from the "AnimeComment_best" directory.
model_path = "AnimeComment_best"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = AutoTokenizer.from_pretrained(model_path)
# .to() and .eval() both return the module itself, so the calls are chained.
model = AutoModelForSequenceClassification.from_pretrained(model_path).to(device).eval()

def inference_function(texts):
    """Predict a class id for each entry of *texts* with the loaded model.

    *texts* is handed directly to the tokenizer, which pads the inputs,
    truncates them to at most 512 tokens and returns PyTorch tensors.

    Returns a NumPy array holding the argmax class index of every input.
    """
    encoded = tokenizer(texts, padding=True, truncation=True, return_tensors="pt", max_length=512)
    # Put every input tensor on the same device as the model.
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}

    with torch.no_grad():
        logits = model(**encoded).logits

    return torch.argmax(logits, dim=-1).cpu().numpy()

### 實測inference

In [None]:
# Classify one sample comment; inference_function returns an array holding
# the predicted class id, which is printed as-is.
test_text = "祥子與他的伙伴們的過家家，最後大團圓也拉不回分數"
prediction = inference_function(test_text)
print(f"Predicted label: {prediction}")