In [1]:
!pip uninstall kafka-python
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.2.15-py2.py3-none-any.whl.metadata (10.0 kB)
Downloading kafka_python-2.2.15-py2.py3-none-any.whl (309 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 309.8/309.8 kB 4.9 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.2.15


In [2]:
# --- 1. Mount Google Drive ---
# Exposes MyDrive under /content/drive so later cells can read the model checkpoint,
# label encoders and the Kafka distribution stored there.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [3]:
import os

# --- Verify essential paths on Google Drive ---
# The consumer cells load these files. Fail fast here with the exact list of missing
# paths, instead of printing a warning and letting Run-All continue into cells that
# will fail later with a less obvious error.
STREAMING_DIR = "/content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming"
MODEL_CHECKPOINT_PATH = os.path.join(STREAMING_DIR, "best_model.pt")
ENCODER_DIR_PATH = os.path.join(STREAMING_DIR, "oneie_encoders")

_missing_paths = [p for p in (MODEL_CHECKPOINT_PATH, ENCODER_DIR_PATH) if not os.path.exists(p)]
if _missing_paths:
    raise FileNotFoundError(
        "❌ ERROR: Could not find required model/encoder files. Please check your Drive paths. "
        f"Missing: {', '.join(_missing_paths)}"
    )
print("✅ Google Drive paths verified.")

✅ Google Drive paths verified.


In [4]:
import time

# --- Cấp quyền thực thi ---
!chmod +x /content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming/kafka_2.13-3.6.1/kafka_2.13-3.6.1/bin/*.sh

# --- Start Zookeeper and Kafka Broker ---
print("Starting Kafka infrastructure...")
!/content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming/kafka_2.13-3.6.1/kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon /content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming/kafka_2.13-3.6.1/kafka_2.13-3.6.1/config/zookeeper.properties
!/content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming/kafka_2.13-3.6.1/kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon /content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming/kafka_2.13-3.6.1/kafka_2.13-3.6.1/config/server.properties

print("Waiting for Kafka and Zookeeper to initialize...")
time.sleep(10)

# --- Create a Kafka Topic ---
print("Creating Kafka topic 'articles_for_extraction'...")
!/content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming/kafka_2.13-3.6.1/kafka_2.13-3.6.1/bin/kafka-topics.sh --create --topic articles_for_extraction --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

print("\nVerifying topic creation:")
!/content/drive/MyDrive/DS200_project/Vietnamese_Political_Events_Extraction/streaming/kafka_2.13-3.6.1/kafka_2.13-3.6.1/bin/kafka-topics.sh --list --bootstrap-server localhost:9092


Starting Kafka infrastructure...
Waiting for Kafka and Zookeeper to initialize...
Creating Kafka topic 'articles_for_extraction'...
[2025-07-02 01:35:10,433] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-07-02 01:35:10,566] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-07-02 01:35:10,667] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-07-02 01:35:10,869] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-07-02 01:3

In [5]:
# Import core libraries
import json
import pickle
from pathlib import Path

# Import ML/DL libraries
import torch
import torch.nn as nn
from transformers import XLMRobertaTokenizerFast, XLMRobertaModel

# --- Model Architecture ---
import torch
import torch.nn as nn
from transformers import XLMRobertaModel

class JointModel(nn.Module):
    """Joint trigger / argument / event-type classifier on an XLM-RoBERTa encoder.

    Three heads share the encoder output:
      * ``trigger_classifier``: per-position trigger-tag logits.
      * ``arg_classifier``: per-position argument-tag logits.
      * ``event_classifier``: event-type logits from position 0 of the sequence.

    Attribute names and the layer order inside each head form the checkpoint format.
    ``load_state_dict`` looks parameters up by these names, so keep them stable.

    (The original note read "improved model with weighted loss". No loss is computed in
    this class; any loss weighting presumably lives in the training code.)
    """

    def __init__(self, model_name, num_trigger_labels, num_arg_labels, num_event_labels, dropout_rate=0.1):
        super().__init__()
        self.xlm_roberta = XLMRobertaModel.from_pretrained(model_name)
        width = self.xlm_roberta.config.hidden_size
        self.trigger_classifier = self._make_head(width, num_trigger_labels, dropout_rate)
        self.arg_classifier = self._make_head(width, num_arg_labels, dropout_rate)
        self.event_classifier = self._make_head(width, num_event_labels, dropout_rate)

    @staticmethod
    def _make_head(in_features, num_labels, p):
        """Dropout -> Linear(in, in // 2) -> ReLU -> Dropout -> Linear(in // 2, num_labels)."""
        bottleneck = in_features // 2
        return nn.Sequential(
            nn.Dropout(p),
            nn.Linear(in_features, bottleneck),
            nn.ReLU(),
            nn.Dropout(p),
            nn.Linear(bottleneck, num_labels),
        )

    def forward(self, input_ids, attention_mask):
        """Return ``(trigger_logits, arg_logits, event_logits)``.

        Trigger and argument logits are computed at every position of
        ``last_hidden_state``. Event logits use only position 0.
        """
        hidden = self.xlm_roberta(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        per_position_trigger = self.trigger_classifier(hidden)
        per_position_arg = self.arg_classifier(hidden)
        sequence_event = self.event_classifier(hidden[:, 0, :])
        return per_position_trigger, per_position_arg, sequence_event

# --- Encapsulated Inference Class ---
class EventExtractor:
    """Load the trained JointModel and run event extraction on raw text.

    Args:
        model_path: path to a ``state_dict`` checkpoint for ``JointModel``.
        encoder_dir: directory containing ``trigger_encoder.pkl``, ``arg_encoder.pkl`` and
            ``event_encoder.pkl``. Each unpickled object must provide ``inverse_transform``
            (they look like scikit-learn ``LabelEncoder`` objects; TODO confirm).
        model_name: Hugging Face model id used for BOTH the tokenizer and the model
            backbone. Previously the backbone ignored this and was hard-coded to
            ``'xlm-roberta-base'``.
        max_length: maximum number of sub-word tokens per input; longer text is truncated.
        num_trigger_labels, num_arg_labels, num_event_labels: head sizes. They must match
            the checkpoint; the defaults are the values that were previously hard-coded.
    """

    def __init__(self, model_path, encoder_dir, model_name='xlm-roberta-base', max_length=128,
                 num_trigger_labels=26, num_arg_labels=7, num_event_labels=14):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model_name = model_name
        self.max_length = max_length
        self.num_labels = (num_trigger_labels, num_arg_labels, num_event_labels)
        self.tokenizer = XLMRobertaTokenizerFast.from_pretrained(model_name)
        self.encoders = self._load_encoders(encoder_dir)
        self.model = self._load_model(model_path)
        self.model.to(self.device)
        self.model.eval()
        print(f"✅ EventExtractor is ready and running on {self.device}.")

    def _load_encoders(self, encoder_dir):
        """Unpickle the trigger/arg/event label encoders from ``encoder_dir``."""
        encoders = {}
        for name in ('trigger', 'arg', 'event'):
            # SECURITY: pickle.load can execute arbitrary code; only point this at trusted files.
            with open(Path(encoder_dir) / f"{name}_encoder.pkl", "rb") as f:
                encoders[name] = pickle.load(f)
        return encoders

    def _load_model(self, model_path):
        """Build a JointModel for ``self.model_name`` and load the checkpoint into it."""
        model = JointModel(self.model_name, *self.num_labels)
        model.load_state_dict(torch.load(model_path, map_location=self.device))
        return model

    def predict(self, text):
        """Extract the event type and tagged words from ``text``.

        The text is split on whitespace. Each word takes the prediction at its first
        sub-word token. Words cut off by ``max_length`` truncation get no tags.

        Returns:
            ``{'event_type': ..., 'entities': [{'token', 'trigger_tag', 'argument_tag'}, ...]}``,
            listing only words whose trigger or argument tag is not ``'O'``. For ``None``,
            empty, or whitespace-only text, returns ``{'error': 'Input text is empty.'}``.
        """
        # Whitespace-only text would otherwise reach the tokenizer as an empty word list.
        if not text or not text.strip():
            return {'error': 'Input text is empty.'}
        tokens = text.split()
        encoding = self.tokenizer(tokens, is_split_into_words=True, return_tensors='pt',
                                  padding='max_length', truncation=True, max_length=self.max_length)
        word_ids = encoding.word_ids()
        inputs = {k: v.to(self.device) for k, v in encoding.items()}

        with torch.no_grad():
            trigger_logits, arg_logits, event_logits = self.model(**inputs)

        trigger_preds = torch.argmax(trigger_logits, -1).cpu().numpy()[0]
        arg_preds = torch.argmax(arg_logits, -1).cpu().numpy()[0]
        event_pred = torch.argmax(event_logits, -1).cpu().item()

        # Map each word index to the position of its first sub-word token. Special and
        # padding positions have word id None; continuation sub-words repeat the previous id.
        first_position = {}
        previous_word_idx = None
        for position, word_idx in enumerate(word_ids):
            if word_idx is None or word_idx == previous_word_idx:
                continue
            first_position[word_idx] = position
            previous_word_idx = word_idx

        # Decode all kept positions with one inverse_transform call per encoder,
        # rather than two calls per word.
        word_indices = list(first_position)
        positions = [first_position[w] for w in word_indices]
        predicted_tags = {}
        if positions:
            trigger_tags = self.encoders['trigger'].inverse_transform(trigger_preds[positions])
            arg_tags = self.encoders['arg'].inverse_transform(arg_preds[positions])
            predicted_tags = dict(zip(word_indices, zip(trigger_tags, arg_tags)))

        extracted_entities = []
        for word_idx, token in enumerate(tokens):
            if word_idx not in predicted_tags:
                continue  # word was truncated away
            trigger_tag, argument_tag = predicted_tags[word_idx]
            if trigger_tag != 'O' or argument_tag != 'O':
                extracted_entities.append(
                    {"token": token, "trigger_tag": trigger_tag, "argument_tag": argument_tag}
                )

        return {'event_type': self.encoders['event'].inverse_transform([event_pred])[0],
                'entities': extracted_entities}

print("✅ Custom model and extractor classes defined.")

✅ Custom model and extractor classes defined.


In [6]:
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from tqdm.notebook import tqdm

def run_crawler_and_produce(producer, topic, max_pages=2, delay=0.2):
    """Crawl political-news articles from baochinhphu.vn and publish them to Kafka.

    Collects article links from listing pages ``1..max_pages`` of the ``/chinh-tri``
    section. It then fetches each article and sends ``{'url', 'title', 'summary'}`` to
    ``topic``, but only when the page has an ``h2.detail-sapo`` summary.

    Args:
        producer: a ``KafkaProducer`` whose ``value_serializer`` accepts a dict.
        topic: Kafka topic to publish to.
        max_pages: number of listing pages to crawl (default 2, the previous fixed value).
        delay: seconds to pause after each article request.

    Returns:
        The number of articles published. The original returned None; callers that
        ignore the result are unaffected.
    """
    # Local imports so this function does not depend on names leaked from other cells.
    import time
    from urllib.parse import urljoin

    HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    BASE_URL = 'https://baochinhphu.vn'
    all_seen_urls, initial_articles = set(), []

    # One session reuses the TCP/TLS connection across all requests to the same host.
    with requests.Session() as session:
        session.headers.update(HEADERS)

        print("--- CRAWLER & PRODUCER: Starting to crawl article list ---")
        for page in range(1, max_pages + 1):
            url = f'{BASE_URL}/chinh-tri/trang-{page}.htm'
            try:
                response = session.get(url, timeout=10)
                response.raise_for_status()
            except requests.RequestException as exc:
                print(f"⚠️ Skipping listing page {url}: {exc}")
                continue
            # Parse the raw bytes so BeautifulSoup can use the page's declared charset
            # instead of requests' header-based guess (which can mangle Vietnamese text).
            soup = BeautifulSoup(response.content, 'html.parser')
            for a_tag in soup.select('.box-category-item a, .list__main li a'):
                href, title = a_tag.get('href'), a_tag.get('title')
                if not (href and title):
                    continue
                # urljoin handles both site-relative and absolute hrefs; plain
                # concatenation broke absolute ones ("https://baochinhphu.vnhttps://...").
                full_url = urljoin(BASE_URL, href)
                if full_url not in all_seen_urls:
                    all_seen_urls.add(full_url)
                    initial_articles.append({'title': title.strip(), 'url': full_url})

        print("\n--- CRAWLER & PRODUCER: Fetching details and publishing to Kafka ---")
        total_published = 0
        for article in tqdm(initial_articles, desc="Crawling & Publishing"):
            try:
                res = session.get(article['url'], timeout=15)
                res.raise_for_status()
                soup = BeautifulSoup(res.content, 'html.parser')
                sapo = soup.find('h2', class_='detail-sapo')
                summary = sapo.get_text(strip=True) if sapo else None

                if summary:
                    message_payload = {
                        'url': article['url'],
                        'title': article['title'],
                        'summary': summary
                    }
                    # send() is asynchronous; delivery is forced by flush() below.
                    producer.send(topic, value=message_payload)
                    total_published += 1
            except Exception as exc:
                # Best effort: report the failure (previously swallowed silently) and go on.
                tqdm.write(f"⚠️ Skipping {article['url']}: {exc}")
            time.sleep(delay)

    producer.flush()
    print(f"\n✅ Producer finished. Published {total_published} articles to topic '{topic}'.")
    return total_published


# --- Main Producer Execution ---
# json is imported here so this cell does not depend on the model-definition cell.
import json

KAFKA_TOPIC = 'articles_for_extraction'
kafka_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
try:
    run_crawler_and_produce(kafka_producer, KAFKA_TOPIC)
finally:
    # close() flushes any pending sends and releases the broker connections.
    kafka_producer.close()

--- CRAWLER & PRODUCER: Starting to crawl article list ---

--- CRAWLER & PRODUCER: Fetching details and publishing to Kafka ---


Crawling & Publishing:   0%|          | 0/2 [00:00<?, ?it/s]


✅ Producer finished. Published 2 articles to topic 'articles_for_extraction'.


In [7]:
from kafka import KafkaConsumer
from tqdm.notebook import tqdm
import os

# --- Main Consumer Execution ---
# Imported here so this cell survives a fresh kernel; json was previously only
# imported by the model-definition cell.
import json
import traceback

consumer = None
try:
    # 1. Instantiate the inference engine (tokenizer, label encoders, model checkpoint).
    event_extractor = EventExtractor(
        model_path=MODEL_CHECKPOINT_PATH,
        encoder_dir=ENCODER_DIR_PATH
    )

    # 2. Configure the Kafka Consumer
    consumer = KafkaConsumer(
        'articles_for_extraction',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest', # Start from the beginning of the topic
        consumer_timeout_ms=10000,    # Stop if no new messages for 10s
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    print("\n🚀 CONSUMER: Starting to listen for messages...")
    all_extracted_events = []

    # 3. Main processing loop. A bad message is reported and skipped. Previously it
    #    aborted the whole loop and the results collected so far were never saved.
    for message in tqdm(consumer, desc="CONSUMER Processing messages"):
        try:
            article_data = message.value
            text_to_process = article_data.get("summary")

            # Perform event extraction
            extracted_event_data = event_extractor.predict(text_to_process)
        except Exception as e:
            print(f"\n⚠️ Skipping message at offset {message.offset}: {e}")
            continue

        # Combine results
        final_record = {
            "source_url": article_data.get("url"),
            "source_title": article_data.get("title"),
            "original_summary": text_to_process,
            "extracted_event": extracted_event_data
        }
        all_extracted_events.append(final_record)

        # Print every result as it is processed; a missing title must not crash the slice.
        source_title = final_record['source_title'] or ''
        print(f"\n--- Processed: {source_title[:50]}... ---")
        print(json.dumps(final_record['extracted_event'], ensure_ascii=False, indent=2))
        print("-" * 50)


    print(f"\nPipeline finished. Processed {len(all_extracted_events)} articles.")

    # 4. Save the final aggregated results
    OUTPUT_EVENTS_FILE = 'extracted_events_from_kafka.json'
    print(f"💾 Saving all results to {OUTPUT_EVENTS_FILE}...")
    with open(OUTPUT_EVENTS_FILE, 'w', encoding='utf-8') as f:
        json.dump(all_extracted_events, f, ensure_ascii=False, indent=4)
    print("✅ All done. Results saved.")

except Exception as e:
    print(f"\n❌ An error occurred during the consumer process: {e}")
    traceback.print_exc()
finally:
    # Release the broker connection even when loading or consuming failed.
    if consumer is not None:
        consumer.close()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.10M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/615 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.12G [00:00<?, ?B/s]



✅ EventExtractor is ready and running on cpu.

🚀 CONSUMER: Starting to listen for messages...


CONSUMER Processing messages: 0it [00:00, ?it/s]


--- Processed: Phát biểu của Tổng Bí thư Tô Lâm tại lễ kỷ niệm 11... ---
{
  "event_type": "Publication",
  "entities": [
    {
      "token": "Hôm",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Time"
    },
    {
      "token": "nay,",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Time"
    },
    {
      "token": "Hưng",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Location"
    },
    {
      "token": "Yên,",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Location"
    },
    {
      "token": "thư",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Subject"
    },
    {
      "token": "phát",
      "trigger_tag": "B-Publication",
      "argument_tag": "O"
    },
    {
      "token": "biểu",
      "trigger_tag": "B-Publication",
      "argument_tag": "O"
    },
    {
      "token": "lễ",
      "trigger_tag": "B-Commemoration",
      "argument_tag": "O"
    },
    {
      "token": "kỷ",
      "trigger_tag": "B-Commemoration",
      "argument_

In [8]:
from kafka import KafkaConsumer
from tqdm.notebook import tqdm
import os

# --- Main Consumer Execution ---
# Imported here so this cell survives a fresh kernel; json was previously only
# imported by the model-definition cell.
import json
import traceback

consumer = None
try:
    # 1. Instantiate the inference engine (tokenizer, label encoders, model checkpoint).
    event_extractor = EventExtractor(
        model_path=MODEL_CHECKPOINT_PATH,
        encoder_dir=ENCODER_DIR_PATH
    )

    # 2. Configure the Kafka Consumer
    consumer = KafkaConsumer(
        'articles_for_extraction',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest', # Start from the beginning of the topic
        consumer_timeout_ms=10000,    # Stop if no new messages for 10s
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    print("\n🚀 CONSUMER: Starting to listen for messages...")
    all_extracted_events = []

    # 3. Main processing loop. A bad message is reported and skipped. Previously it
    #    aborted the whole loop and the results collected so far were never saved.
    for message in tqdm(consumer, desc="CONSUMER Processing messages"):
        try:
            article_data = message.value
            text_to_process = article_data.get("summary")

            # Perform event extraction
            extracted_event_data = event_extractor.predict(text_to_process)
        except Exception as e:
            print(f"\n⚠️ Skipping message at offset {message.offset}: {e}")
            continue

        # Combine results
        final_record = {
            "source_url": article_data.get("url"),
            "source_title": article_data.get("title"),
            "original_summary": text_to_process,
            "extracted_event": extracted_event_data
        }
        all_extracted_events.append(final_record)

        # Print every result as it is processed; a missing title must not crash the slice.
        source_title = final_record['source_title'] or ''
        print(f"\n--- Processed: {source_title[:50]}... ---")
        print(json.dumps(final_record['extracted_event'], ensure_ascii=False, indent=2))
        print("-" * 50)


    print(f"\nPipeline finished. Processed {len(all_extracted_events)} articles.")

    # 4. Save the final aggregated results
    OUTPUT_EVENTS_FILE = 'extracted_events_from_kafka.json'
    print(f"💾 Saving all results to {OUTPUT_EVENTS_FILE}...")
    with open(OUTPUT_EVENTS_FILE, 'w', encoding='utf-8') as f:
        json.dump(all_extracted_events, f, ensure_ascii=False, indent=4)
    print("✅ All done. Results saved.")

except Exception as e:
    print(f"\n❌ An error occurred during the consumer process: {e}")
    traceback.print_exc()
finally:
    # Release the broker connection even when loading or consuming failed.
    if consumer is not None:
        consumer.close()



✅ EventExtractor is ready and running on cpu.

🚀 CONSUMER: Starting to listen for messages...


CONSUMER Processing messages: 0it [00:00, ?it/s]


--- Processed: Phát biểu của Tổng Bí thư Tô Lâm tại lễ kỷ niệm 11... ---
{
  "event_type": "Publication",
  "entities": [
    {
      "token": "Hôm",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Time"
    },
    {
      "token": "nay,",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Time"
    },
    {
      "token": "Hưng",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Location"
    },
    {
      "token": "Yên,",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Location"
    },
    {
      "token": "thư",
      "trigger_tag": "O",
      "argument_tag": "B-Arg-Subject"
    },
    {
      "token": "phát",
      "trigger_tag": "B-Publication",
      "argument_tag": "O"
    },
    {
      "token": "biểu",
      "trigger_tag": "B-Publication",
      "argument_tag": "O"
    },
    {
      "token": "lễ",
      "trigger_tag": "B-Commemoration",
      "argument_tag": "O"
    },
    {
      "token": "kỷ",
      "trigger_tag": "B-Commemoration",
      "argument_