### ***Install & Import***

In [None]:
!pip install pymongo                  # 安裝 pymongo 庫，用於與 MongoDB 資料庫進行互動
!pip install mysql-connector-python   # 安裝 mysql-connector-python 庫，用於與 MySQL 資料庫進行連線和操作
!pip install pandas                   # 安裝 pandas 庫，用於資料分析和資料處理，提供強大的資料結構和資料分析工具

Collecting pymongo
  Downloading pymongo-4.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m16.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.7.0-py3-none-any.whl (313 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.6/313.6 kB[0m [31m15.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.10.1
Collecting mysql-connector-python
  Downloading mysql_connector_python-9.1.0-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (6.0 kB)
Downloading mysql_connector_python-9.1.0-cp310-cp310-manylinux_2_28_x86_64.whl (34.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import os                                           # 匯入作業系統模組，用來進行檔案和目錄操作
import pandas as pd                                 # 匯入Pandas庫，用於資料處理和分析
import pymongo                                      # 匯入pymongo庫，用於與MongoDB進行連線和操作
import mysql.connector                              # 匯入MySQL連線模組，用於連線MySQL資料庫
import re                                           # 匯入正規表示式模組，用於字串匹配和處理
import heapq                                        # 匯入堆積佇列模組，用於優先佇列和選擇
import json                                         # 匯入JSON模組，用於處理JSON格式的資料
import logging                                      # 匯入日誌紀錄模組，用於紀錄程式執行中的資訊
from pymongo import MongoClient                     # 匯入MongoDB客戶端，用於建立與MongoDB的連線
from mysql.connector import Error, pooling          # 匯入MySQL的錯誤處理和連線池管理模組
from collections import Counter, defaultdict        # 匯入計數器和預設字典，用於資料統計與組織
from datetime import datetime                       # 匯入日期時間模組，用於日期和時間的處理
from typing import List, Dict, Any, Optional, Tuple # 匯入型別提示，用於函式的型別定義
from concurrent.futures import ThreadPoolExecutor   # 匯入執行緒池執行器，用於多執行緒處理
from functools import lru_cache                     # 匯入LRU快取，用於提升函式效能
from contextlib import contextmanager               # 匯入上下文管理器，用於資源管理

### ***Data Cleaning & Transformation***

### ***Database Manager***

In [None]:
class DatabaseManager:
    """Owns the MongoDB client and the MySQL connection pool used by the pipeline.

    Connections are created lazily on first use and handed out through the
    ``get_mongo_connection`` / ``get_mysql_connection`` context managers.
    """

    # SECURITY: credentials should not live in source control. Every secret
    # below can be supplied through an environment variable; the literals are
    # kept only as backward-compatible fallbacks and should be rotated/removed.
    MONGO_URI = os.environ.get(
        'MONGO_URI', "mongodb://user4:password4@35.189.181.117:28017/admin"
    )  # MongoDB connection string
    MYSQL_CONFIG = {  # keyword arguments passed to MySQLConnectionPool
        'host': os.environ.get('MYSQL_HOST', '34.81.244.193'),
        'database': os.environ.get('MYSQL_DATABASE', 'PTT'),
        'user': os.environ.get('MYSQL_USER', 'user3'),
        'password': os.environ.get('MYSQL_PASSWORD', 'password3'),
        'pool_size': 5,
        'pool_name': 'mypool',
        'buffered': True
    }
    BATCH_SIZE = 100  # number of documents written per MySQL transaction

    def __init__(self):
        """Start with no open connections and configure logging."""
        self._mongo_client = None  # created on first get_mongo_connection()
        self._mysql_pool = None  # created on first get_mysql_connection()
        self._setup_logging()

    def _setup_logging(self):
        """Send INFO+ records to ``post_analyzer.log`` and to the console.

        ``logging.basicConfig`` does nothing when the root logger already has
        handlers, so calling this more than once is harmless.
        """
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('post_analyzer.log'),  # log file
                logging.StreamHandler()  # console
            ]
        )

    @contextmanager
    def get_mongo_connection(self):
        """Yield the ``kafka.merged_collection`` collection.

        The client is created once and reused. Only client-creation failures
        and MongoDB connection failures raised while the collection is in use
        are logged as connection errors; unrelated exceptions from the
        caller's ``with`` body propagate untouched instead of being
        mislabelled.

        Raises:
            Exception: whatever ``MongoClient`` raises for an invalid URI,
                or any exception raised inside the ``with`` body.
        """
        if self._mongo_client is None:
            try:
                self._mongo_client = MongoClient(DatabaseManager.MONGO_URI)
            except Exception as e:
                logging.error(f"MongoDB連線錯誤: {e}")
                raise

        collection = self._mongo_client['kafka']['merged_collection']
        try:
            yield collection
        except pymongo.errors.ConnectionFailure as e:
            # Covers server-selection timeouts and dropped connections that
            # surface lazily, on the first real operation.
            logging.error(f"MongoDB連線錯誤: {e}")
            raise

    def _create_mysql_pool(self):
        """Create the MySQL connection pool if it does not exist yet.

        Raises:
            mysql.connector.Error: if the pool cannot be created.
        """
        if self._mysql_pool is None:
            try:
                self._mysql_pool = mysql.connector.pooling.MySQLConnectionPool(
                    **DatabaseManager.MYSQL_CONFIG
                )
                logging.info("MySQL連線池建立成功")
            except Error as e:
                logging.error(f"建立MySQL連線池時出錯: {e}")
                raise

    @contextmanager
    def get_mysql_connection(self):
        """Yield a pooled MySQL connection and return it to the pool afterwards.

        Only a failure to obtain the connection is logged here. Errors raised
        by the caller while using the connection propagate unchanged (the
        caller is responsible for logging / rollback), but the connection is
        always closed.

        Raises:
            mysql.connector.Error: if no connection can be obtained.
        """
        if self._mysql_pool is None:
            self._create_mysql_pool()

        try:
            connection = self._mysql_pool.get_connection()
        except Error as e:
            logging.error(f"從池中獲取MySQL連線時出錯: {e}")
            raise

        try:
            yield connection
        finally:
            connection.close()  # hands the connection back to the pool

    def close(self):
        """Close the MongoDB client (if any) and drop the MySQL pool reference."""
        if self._mongo_client is not None:
            self._mongo_client.close()
            self._mongo_client = None
        self._mysql_pool = None

### ***Post Processor***

In [None]:
def extract_topic_from_title(title: str) -> str:
    """Return the text inside the first ``[...]`` pair found in ``title``.

    Falls back to '其他' when the title is empty/None or contains no
    bracketed segment.
    """
    bracketed = re.search(r'\[(.*?)\]', title) if title else None
    if bracketed is None:
        return '其他'
    return bracketed.group(1)

class PostProcessor:
    def __init__(self):
        """Create the database manager, make sure the MySQL schema exists,
        and start with an empty link -> stored-date map."""
        # Maps each article link to the publish date that was last written.
        self.processed_links = {}
        self.db_manager = DatabaseManager()
        self._setup_tables()

    def _setup_tables(self):
        """設定 MySQL 資料庫表格結構"""
        drop_table_query = "DROP TABLE IF EXISTS posts"
        create_table_query = """
        CREATE TABLE IF NOT EXISTS posts (
            id VARCHAR(255) PRIMARY KEY,
            來源 VARCHAR(50),
            發佈日期 DATE,
            連結 VARCHAR(255),
            主題 VARCHAR(100),
            標題 TEXT,
            內容 TEXT,
            記者或作者 VARCHAR(255),
            留言數 INT,
            正向心情總數 INT,
            負向心情總數 INT,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            UNIQUE KEY unique_link (連結)
        ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
        """

        with self.db_manager.get_mysql_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(create_table_query)
                self._create_indexes(cursor)
                conn.commit()
            except Error as e:
                logging.error(f"建立表格時出錯: {e}")
                raise
            finally:
                cursor.close()

    def _create_indexes(self, cursor):
        """建立必要的索引"""
        indexes = [
            ('idx_post_date', '發佈日期'),
            ('idx_post_link', '連結')
        ]

        for index_name, column in indexes:
            try:
                # 先檢查索引是否存在
                cursor.execute("""
                    SELECT COUNT(1)
                    FROM INFORMATION_SCHEMA.STATISTICS
                    WHERE TABLE_SCHEMA = DATABASE()
                    AND TABLE_NAME = 'posts'
                    AND INDEX_NAME = %s
                """, (index_name,))

                index_exists = cursor.fetchone()[0] > 0

                # 如果索引不存在，則創建
                if not index_exists:
                    cursor.execute(f"CREATE INDEX {index_name} ON posts({column})")
                    logging.info(f"Created index {index_name}")
            except Error as e:
                logging.error(f"建立索引時出錯: {e}")
                if e.errno != 1061:  # 忽略"索引已存在"錯誤
                    raise
    def process_documents(self):
        """從 MongoDB 讀取並處理文檔，將處理後的資料批次存入 MySQL"""
        BATCH_SIZE = 100  # 設定批次大小
        processed_count = 0
        error_count = 0
        batch = []

        try:
            with self.db_manager.get_mongo_connection() as collection:
                documents = collection.find({})

                for doc in documents:
                    try:
                        # 處理單個文檔
                        processed_data = self._process_single_document(doc)
                        if not processed_data:
                            continue

                        # 檢查是否需要更新
                        link = processed_data['link']
                        current_date = processed_data['date']

                        # 如果文檔已處理且日期不比已存在的新，則跳過
                        if link in self.processed_links:
                            stored_date = self.processed_links[link]
                            if current_date <= stored_date:
                                continue

                        batch.append(processed_data)

                        # 當達到批次大小時，執行存儲
                        if len(batch) >= BATCH_SIZE:
                            self._save_batch(batch)
                            processed_count += len(batch)
                            batch = []

                    except Exception as e:
                        error_count += 1
                        logging.error(f"處理文檔時出錯: {str(e)}")
                        continue

                # 處理剩餘的文檔
                if batch:
                    self._save_batch(batch)
                    processed_count += len(batch)

        except Exception as e:
            logging.error(f"批次處理文檔時出錯: {str(e)}")
            raise
        finally:
            logging.info(f"處理完成。成功: {processed_count}, 失敗: {error_count}")

    def _save_batch(self, batch):
        """將一批文檔存入 MySQL"""
        if not batch:
            return

        with self.db_manager.get_mysql_connection() as conn:
            cursor = conn.cursor()
            try:
                for post_data in batch:
                    self.save_post(cursor, post_data)
                    self.processed_links[post_data['link']] = post_data['date']
                conn.commit()
                logging.info(f"成功儲存 {len(batch)} 篇文章")
            except Error as e:
                logging.error(f"儲存批次時出錯: {str(e)}")
                conn.rollback()
                raise
            finally:
                cursor.close()
    @lru_cache(maxsize=1000)
    def format_date(self, date_string: str) -> Optional[str]:
        """統一日期格式為 YYYY-MM-DD"""
        if not date_string:
            return None

        try:
            # 處理不同來源的日期格式
            formats = {
                'news': (r'\d{4}年\d{1,2}月\d{1,2}日',
                        lambda x: x.replace('年','-').replace('月','-').replace('日','')),
                'ptt': (r'\d{1,2}/\d{1,2}',
                       lambda x: f"2024-{int(x.split('/')[0]):02d}-{int(x.split('/')[1]):02d}"),
                'dcard': (r'\d{4}-\d{2}-\d{2}T',
                         lambda x: x.split('T')[0])
            }

            for _, (pattern, formatter) in formats.items():
                if re.search(pattern, date_string):
                    return formatter(date_string)
            return None

        except Exception as e:
            logging.error(f"日期解析錯誤: {e}, 日期字串: {date_string}")
            return None

    def _process_single_document(self, doc: Dict) -> Optional[Dict]:
        """處理單一文件，從不同來源提取標準化資料"""
        try:
            value = doc.get('value', {})
            if not isinstance(value, dict) or '_id' not in doc:
                return None

            source_url = doc.get('key', '') or value.get('url', '')
            source = self._determine_source(source_url)

            # 根據來源設定資料對應
            field_mapping = {
                'PTT': {
                    'date': value.get('發佈日期'),
                    'title': value.get('標題'),
                    'content': value.get('內容'),
                    'author': value.get('作者'),
                    'comments': len(value.get('留言', [])),
                    'topic': extract_topic_from_title(value.get('標題', ''))
                },
                'Dcard': {
                    'date': value.get('發布時間'),
                    'title': value.get('標題'),
                    'content': value.get('內容'),
                    'author': value.get('作者'),
                    'comments': len(value.get('留言', [])),
                    'topic': value.get('看版', '其他'),
                    'emotions': value.get('emoji類型', [{}])[0]
                }
            }

            # 新聞網站通用映射
            news_mapping = {
                'date': value.get('date'),
                'title': value.get('title'),
                'content': value.get('content'),
                'author': value.get('reporter', ''),
                'comments': 0,
                'topic': '最新' if value.get('category') == 'homepage' else value.get('category', '最新')
            }

            # 獲取對應的資料映射
            data_map = field_mapping.get(source, news_mapping)

            # 處理日期
            formatted_date = self.format_date(data_map['date'])
            if not formatted_date:
                return None

            # 處理情感數據
            pos_emotions = 0
            neg_emotions = 0

            if source == 'PTT':
                pos_emotions = int(value.get('推', 0))
                neg_emotions = int(value.get('噓', 0))
            elif source == 'Dcard' and 'emotions' in data_map:
                pos_emotions = sum(int(data_map['emotions'].get(k, 0)) for k in ['愛心', '哈哈', '跪'])
                neg_emotions = sum(int(data_map['emotions'].get(k, 0)) for k in ['驚訝', '嗚嗚', '森77'])

            return {
                'id': str(doc['_id']),
                'source': source,
                'date': formatted_date,
                'link': source_url,
                'topic': data_map['topic'],
                'title': data_map['title'],
                'content': data_map['content'],
                'author': data_map['author'],
                'comment_count': data_map['comments'],
                'positive_emotions': pos_emotions,
                'negative_emotions': neg_emotions
            }

        except Exception as e:
            logging.error(f"處理文檔時出錯: {e}, 文檔: {doc}")
            return None

    @staticmethod
    def _determine_source(url: str) -> str:
        """根據 URL 判斷來源"""
            # 检查 url 是否为 None 或空字符串
        if not url:
            return 'ETtoday'

        url_mapping = {
            'ptt': 'PTT',
            'dcard': 'Dcard',
            'ettoday': 'ETtoday',
            'yahoo': 'Yahoo',
            'setn': '三立新聞網',
            'money.udn': '聯合新聞網',
            'ltn': '自由時報'
        }

        url_lower = url.lower()
        for key, value in url_mapping.items():
            if key in url_lower:
                return value
        return '其他新聞網'

    def save_post(self, cursor, post_data: Dict):
        """儲存單篇文章到資料庫"""
        insert_query = """
            INSERT INTO posts (
                id, 來源, 發佈日期, 連結, 主題, 標題, 內容, 記者或作者,
                留言數, 正向心情總數, 負向心情總數)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                來源 = VALUES(來源),
                發佈日期 = VALUES(發佈日期),
                主題 = VALUES(主題),
                標題 = VALUES(標題),
                內容 = VALUES(內容),
                記者或作者 = VALUES(記者或作者),
                留言數 = VALUES(留言數),
                正向心情總數 = VALUES(正向心情總數),
                負向心情總數 = VALUES(負向心情總數)
        """

        cursor.execute(insert_query, (
            post_data['id'],
            post_data['source'],
            post_data['date'],
            post_data['link'],
            post_data['topic'],
            post_data['title'],
            post_data['content'],
            post_data['author'],
            post_data['comment_count'],
            post_data['positive_emotions'],
            post_data['negative_emotions']
        ))

### ***Main***

In [None]:
# ----------------------主執行函式，包含適當的錯誤處理-------------------------------
def main():
    """Run the whole pipeline: build the processor and process every document.

    Any failure is logged and then re-raised to the caller.
    """
    try:
        # Constructing the processor also prepares the MySQL schema.
        PostProcessor().process_documents()
        logging.info("處理完成")
    except Exception as e:
        logging.error(f"主流程出錯: {e}")
        raise

In [None]:
# Run the pipeline when this cell/script is executed directly; in a notebook
# kernel __name__ is "__main__", so running the cell starts the job.
if __name__ == "__main__":
    main()