In [1]:
from pydantic import BaseModel, Field
from typing import Any, Literal, Optional
from datetime import datetime

class TwitterQuery(BaseModel):
    """Parameters for one Twitter/X search, consumed by TwikitClient.get_tweets.

    Attributes:
        query: Search expression, e.g. a hashtag such as '#nvda'.
        since: Optional lower bound; only its date part is put into the query.
        until: Optional upper bound; only its date part is put into the query.
        max_results: Number of tweets to request, validated to 1..20 (default 20).
        mode: Search tab to query, 'Latest' (default) or 'Top'.
    """
    query: str
    since: Optional[datetime] = None
    until: Optional[datetime] = None
    max_results: Optional[int] = Field(default=20, ge=1, le=20)
    mode: Literal['Latest', 'Top'] = 'Latest'

class SentimentAnswer(BaseModel):
    """A sentiment label attached to a piece of text.

    Attributes:
        str: Presumably the text that was classified (TODO confirm with the
            producer of these objects). NOTE(review): the field is named like
            the built-in ``str`` type; renaming it would change the model's
            schema, so it is kept as is.
        sentiment: One of 'POSITIVE', 'NEGATIVE' or 'NEUTRAL'.
    """
    str: str
    sentiment: Literal['POSITIVE', 'NEGATIVE', 'NEUTRAL']
    

## Pipeline: crawl the last hour of tweets → keep them in a temporary cache → at the end of that hour, export the cache to a file → push the file to Blob Storage → clean the data and push it to PostgreSQL

In [2]:
from datetime import datetime as dt, timedelta
import numpy as np
import time
import pandas as pd
import asyncio
import json

In [3]:
from twikit import Client
import os
import dotenv 
from typing import Literal

# Twitter/X credentials must never be hard-coded in a notebook: they end up in
# version control and in shared copies. Load them from the environment (or a
# local .env file) instead. TwikitClient.login(mode='username') reads the same
# TW_USERNAME / TW_EMAIL / TW_PASSWORD environment variables.
# NOTE(review): credentials that were previously committed in this cell must be
# treated as leaked; rotate the account password.
dotenv.load_dotenv()
TW_USERNAME = os.getenv('TW_USERNAME')
TW_EMAIL = os.getenv('TW_EMAIL')
TW_PASSWORD = os.getenv('TW_PASSWORD')



In [4]:
class TwikitClient:
    """Thin async wrapper around ``twikit.Client`` that handles login and search.

    Args:
        language: Language code passed to ``twikit.Client`` (default 'en-US').
        cookies_path: File used to restore ('cookies' login) and persist
            ('username' login) the session cookies.
    """

    def __init__(self, language='en-US', cookies_path='cookies.json'):
        self.client = Client(language)
        self.cookies_path = cookies_path
        self.is_logged_in = False

    async def login(self, mode: Literal['cookies', 'username'] = 'username'):
        """Log in to Twitter/X.

        Args:
            mode: 'cookies' restores a session from ``self.cookies_path``.
                'username' (the default) logs in with the TW_USERNAME, TW_EMAIL
                and TW_PASSWORD environment variables, then saves the session
                cookies to ``self.cookies_path``.

        Raises:
            RuntimeError: if a credential environment variable is missing.
            ValueError: if ``mode`` is not 'cookies' or 'username'.
        """
        if mode == 'cookies':
            self.client.load_cookies(self.cookies_path)
            print("Logged in with cookies")
        elif mode == 'username':
            credentials = {
                name: os.getenv(name)
                for name in ('TW_USERNAME', 'TW_EMAIL', 'TW_PASSWORD')
            }
            missing = [name for name, value in credentials.items() if not value]
            if missing:
                raise RuntimeError(
                    f"Missing Twitter credentials in environment: {', '.join(missing)}"
                )

            await self.client.login(
                auth_info_1=credentials['TW_USERNAME'],
                auth_info_2=credentials['TW_EMAIL'],
                password=credentials['TW_PASSWORD'],
            )
            self.client.save_cookies(self.cookies_path)
            print("Logged in with username and password")
        else:
            raise ValueError(
                f"Unsupported login mode: {mode!r}; expected 'cookies' or 'username'"
            )
        self.is_logged_in = True

    async def get_tweets(self, tq: TwitterQuery):
        """Run the search described by ``tq`` and return twikit's search result.

        Logs in with the default mode first if no login has happened yet.
        ``since``/``until`` are applied at day granularity through the
        ``since:YYYY-MM-DD`` / ``until:YYYY-MM-DD`` search operators.
        """
        if not self.is_logged_in:
            await self.login()

        query_parts = [tq.query]
        if tq.since:
            query_parts.append(f"since:{tq.since.date()}")
        if tq.until:
            query_parts.append(f"until:{tq.until.date()}")
        query_str = " ".join(query_parts)

        return await self.client.search_tweet(query=query_str, product=tq.mode, count=tq.max_results)


In [5]:
class TweetCache:
    """In-memory buffer of tweet records that can be flushed to an hourly JSON file."""

    def __init__(self):
        self.cache = []

    def add(self, tweet_list):
        """Append the records in ``tweet_list`` to the cache."""
        self.cache.extend(tweet_list)

    def get_all(self):
        """Return the cached records without clearing the cache.

        Note: this is the live internal list, not a copy.
        """
        return self.cache

    def save_to_file(self, base_dir, current_hour):
        """Flush the cache to ``<base_dir>/raw_<current_hour>_v1.json`` and clear it.

        If the file already exists, it must hold a JSON list (an empty file is
        treated as an empty list), and the cached records are appended to that
        list. Otherwise a new file is created. The file is rewritten through a
        temporary file plus ``os.replace``. This means a shorter rewrite cannot
        leave trailing bytes of the old content behind, and an interrupted write
        does not clobber the existing file.

        Returns:
            The path of the written file.
        """
        os.makedirs(base_dir, exist_ok=True)
        path = os.path.join(base_dir, f"raw_{current_hour}_v1.json")

        file_exists = os.path.exists(path)
        records = []
        if file_exists and os.path.getsize(path) > 0:
            with open(path, "r", encoding="utf-8") as f:
                records = json.load(f)
        records.extend(self.cache)

        tmp_path = path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(records, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)

        if file_exists:
            print(f"Đã thêm dữ liệu vào: {path}")
        else:
            print(f"Đã lưu dữ liệu mới tại: {path}")

        # The cache has been persisted; start the next batch empty.
        self.cache = []
        return path

In [6]:
# Hashtags to crawl, grouped by topic.
# Each key is a topic label; crawl_task stores it in the 'topic' field of every
# crawled record. Each value is the list of hashtags searched for that topic.
TWITTER_HASHTAGS = {
    'Pope Francis Passing': [
        '#RIPPopeFrancis',
        '#PopeFrancis',
        '#VaticanNews',
        '#CatholicChurch'
    ],
    'US Stocks': [
        '#nvda',
        '#tsla',
        '#aapl',
        '#stockmarket',
        '#stocks',
        '#investing'
    ]
}

In [8]:
async def crawl_tweet(client, hashtags_dict: dict, login_mode: str = 'cookies', delay_seconds: float = 15):
    """
    Crawl tweets for multiple topics and their associated hashtags.

    Hashtags within a topic are searched in random order. The crawler pauses
    ``delay_seconds`` after every search attempt, successful or not, so a
    failing request (e.g. a rate-limit error) does not trigger an immediate
    burst of further requests. A hashtag whose search fails is logged and skipped.

    :param client: TwikitClient instance.
    :param hashtags_dict: Dictionary where keys are topics and values are lists of hashtags.
    :param login_mode: Mode passed to ``client.login`` ('cookies' by default).
    :param delay_seconds: Pause after each hashtag search.
    :return: Dictionary with topics as keys and lists of crawled tweets as values.
    """
    await client.login(mode=login_mode)

    tweets_by_topic = {}
    for topic, hashtags in hashtags_dict.items():
        print(f"Crawling tweets for topic: {topic}")
        topic_tweets = []

        # Randomize the order so the same hashtag is not always queried first.
        for tag in np.random.permutation(hashtags):
            tq = TwitterQuery(query=str(tag), mode='Latest')
            try:
                tag_tweets = await client.get_tweets(tq)
                topic_tweets.extend(tag_tweets)
            except Exception as e:
                print(f"Error crawling {tag} for topic {topic}: {e}")
            # Throttle after every attempt, including failed ones.
            await asyncio.sleep(delay_seconds)

        tweets_by_topic[topic] = topic_tweets

    return tweets_by_topic

def _unique_tweet_records(tweets_by_topic, now):
    """Flatten crawled tweets into record dicts, keeping only the first tweet for each distinct text.

    Assumes the tweets are twikit Tweet objects exposing ``.text`` and
    ``.created_at_datetime``. Prints one progress line per topic.
    """
    seen_texts = set()
    records = []
    for topic, tag_tweets in tweets_by_topic.items():
        print(f"[{now.strftime('%Y-%m-%d %H:%M:%S')}] Crawled {len(tag_tweets)} tweets for topic: {topic}")

        for tweet in tag_tweets:
            # Deduplicate on the tweet text, not the Tweet object.
            text = tweet.text.strip()
            if text in seen_texts:
                continue
            seen_texts.add(text)
            records.append({
                'text': text,
                'date_created': tweet.created_at_datetime.strftime('%Y-%m-%d %H:%M:%S'),
                'date_scrape': dt.now().strftime('%Y-%m-%d %H:%M:%S'),
                'topic': topic  # the topic label, not the individual hashtag
            })
    return records


def crawl_task(base_dir, hashtags_dict, **context):
    """
    Crawl tweets for multiple topics and save the unique ones to an hourly cache file.

    :param base_dir: Directory to save the cache file.
    :param hashtags_dict: Dictionary where keys are topics and values are lists of hashtags.
    :param context: Extra keyword arguments (e.g. an Airflow task context); ignored.
    :return: Path of the JSON file the tweets were written to.
    """
    client = TwikitClient()
    tweet_cache = TweetCache()

    # Run the async crawler on a dedicated event loop and close it afterwards,
    # so repeated calls do not leak event loops.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        tweets_by_topic = loop.run_until_complete(crawl_tweet(client, hashtags_dict))
    finally:
        loop.close()

    now = dt.now()
    current_hour = now.strftime('%Y%m%d_%H')  # date + hour, e.g. 20250502_03

    unique_tweet_list = _unique_tweet_records(tweets_by_topic, now)
    tweet_cache.add(unique_tweet_list)
    tweet_cache.save_to_file(base_dir, current_hour)

    # Same file-naming scheme that TweetCache.save_to_file uses.
    output_path = os.path.join(base_dir, f"raw_{current_hour}_v1.json")
    print(f"Data saved for hour {current_hour} with {len(unique_tweet_list)} unique tweets")
    return output_path


In [13]:
# Run one crawl from the notebook. crawl_task creates its own TwikitClient and
# logs in with the saved cookies, so no client is created here.
# nest_asyncio allows crawl_task to drive an event loop while Jupyter's own
# loop is already running.
import nest_asyncio
nest_asyncio.apply()

# NOTE(review): the upload cell further down reads from "../data_cache/raw";
# confirm which of the two directories is intended.
BASE_DIR = "./data_cache/raw"
output_path = crawl_task(base_dir=BASE_DIR, hashtags_dict=TWITTER_HASHTAGS)

print(f"Đã crawl xong. File lưu tại: {output_path}")

Logged in with cookies
Crawling tweets for topic: Pope Francis Passing
Crawling tweets for topic: US Stocks
[2025-05-02 03:34:48] Crawled 61 tweets for topic: Pope Francis Passing


AttributeError: 'str' object has no attribute 'text'

## Save to the raw zone of the Blob Storage data lake (raw upload tested: OK)

In [None]:
from azure.storage.blob import BlobServiceClient
class BlobStorage:
    """
    Simple wrapper for Azure Blob Storage operations.
    Usage:
        blob = BlobStorage()
        blob.create_container('raw')
        blob.upload_file('raw', 'tweets.json', '/path/to/tweets.json')
        data = blob.download_blob('raw', 'tweets.json')
    """
    def __init__(self, conn_str: str):
        self.service_client = BlobServiceClient.from_connection_string(conn_str=conn_str)

    def create_container(self, container_name: str):
        """
        Create a container if it does not exist.
        """
        container = self.service_client.get_container_client(container_name)
        try:
            container.create_container()
        except Exception:
            # Container may already exist
            pass
        return container

    def upload_file(self, container_name: str, blob_name: str, file_path: str, overwrite: bool = True):
        """
        Upload a local file to a blob with input validation and enhanced error handling.
        """
        try:
            # Kiểm tra nếu file cục bộ tồn tại
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"Local file '{file_path}' does not exist.")

            # Lấy container client
            container = self.service_client.get_container_client(container_name)

            # Kiểm tra nếu container không tồn tại, tạo mới
            if not container.exists():
                container.create_container()
                print(f"Container '{container_name}' created.")

            # Lấy blob client
            blob = container.get_blob_client(blob_name)

            # Mở file và tải lên blob
            with open(file_path, 'rb') as data:
                blob.upload_blob(data, overwrite=overwrite)
            print(f"File '{file_path}' uploaded to blob '{blob_name}' in container '{container_name}'.")

            # Trả về URL của blob
            return blob.url

        except FileNotFoundError as fnf_error:
            print(fnf_error)
            raise
        except Exception as e:
            print(f"Error uploading file '{file_path}' to blob '{blob_name}': {e}")
            raise
    
    def upload_blob(self,container_name, blob_name, data, overwrite=False):
        """
        Upload data to Azure Blob Storage. It ensures the blob name is valid.

        Parameters:
        - blob_name: The name of the blob (including folder structure).
        - data: The data to be uploaded (bytes).
        - overwrite: Boolean flag to overwrite if the blob already exists.
        """
        # Remove any trailing slashes from blob name to avoid invalid resource name
        blob_name = blob_name.rstrip('/')
        
        # Check if the blob name is valid
        if any(c in blob_name for c in ['\\', ':', '*', '?', '"', '<', '>', '|']):
            raise ValueError(f"Blob name contains invalid characters: {blob_name}")
        
        container = self.service_client.get_container_client(container_name)
        blob_client = container.get_blob_client(blob_name)

        try:
            # Upload the data
            blob_client.upload_blob(data, overwrite=overwrite)
            print(f"Blob '{blob_name}' uploaded successfully.")
        except Exception as e:
            print(f"Error uploading blob '{blob_name}': {e}")

    def upload_bytes(self, container: str, blob: str, data: bytes | str, *, overwrite: bool = True, blob_type: str = "BlockBlob") -> str:
        """Upload bytes or string to blob."""
        blob_c = self.client.get_blob_client(container, blob)
        
        # Chuyển đổi dữ liệu nếu là chuỗi
        if isinstance(data, str):
            data = data.encode('utf-8')  # Mã hóa chuỗi thành byte
        
        # Tải lên dữ liệu (bytes)
        blob_c.upload_blob(data, overwrite=overwrite, blob_type=blob_type)
        
        # Trả về URL của blob vừa tải lên
        return blob_c.url

    def create_folder_structure(self, container_name, folder_prefix):
        """
        Ensure folder structure is created in the container by uploading dummy files.

        Parameters:
        - folder_prefix: The folder structure to create.
        """
        # Ensure the folder structure exists by uploading a dummy file
        dummy_data = b""  # Empty data for a dummy file
        
        # Upload an init file to represent the folder structure
        self.upload_blob(container_name, f"{folder_prefix}/.init", dummy_data, overwrite=True)


    def download_blob(self, container_name: str, blob_name: str) -> bytes:
        """
        Download blob content and return as bytes.
        """
        blob = self.service_client.get_blob_client(container=container_name, blob=blob_name)
        downloader = blob.download_blob()
        return downloader.readall()

    def download_file(self, container: str, blob: str, path: str) -> str:
        """Download blob and write directly to file (streaming)."""
        blob_client = self.service_client.get_blob_client(container, blob)
        with open(path, "wb") as f:
            blob_client.download_blob().readinto(f)
        return path
    
    def read_json_from_container(self, container: str, file_path: str):
        """Read a JSON file directly from container into RAM without saving to local."""
        # 1. Lấy blob client từ container và đường dẫn file
        blob_client = self.service_client.get_blob_client(container=container, blob=file_path)
        # 2. Download toàn bộ nội dung file vào RAM
        blob_data = blob_client.download_blob().readall()        
        # 3. Parse JSON từ bytes
        data = json.loads(blob_data.decode('utf-8'))  # phải decode từ bytes -> str trước

        return data

    def list_blobs_by_path(self, container: str, path: str = "") -> list[str]:
        """
        List blobs or pseudo-folders in a given path within a container.
        If path is empty, list top-level blobs/folders.
        """
        container_client = self.service_client.get_container_client(container)
        path = path.strip('/')
        if path:
            path += "/"

        # Dùng delimiter để liệt kê như thư mục
        blob_list = container_client.walk_blobs(name_starts_with=path, delimiter='/')
        return [blob.name for blob in blob_list]

    def delete_blob(self, container_name: str, blob_name: str):
        """
        Delete a blob from a container.
        """
        blob = self.service_client.get_blob_client(container=container_name, blob=blob_name)
        blob.delete_blob()

    def delete_container(self, container_name: str):
        """
        Delete a container and all its blobs.
        """
        container = self.service_client.get_container_client(container_name)
        container.delete_container()

    def close(self):
        self.service_client.close()

In [25]:
import dotenv

# Load variables from a local .env file into the process environment, then
# require the Azure Storage connection string that the blob cells below use.
dotenv.load_dotenv()

conn_str = os.environ.get('LAKE_STORAGE_CONN_STR')
if conn_str is None or conn_str == '':
    raise EnvironmentError("LAKE_STORAGE_CONN_STR not found in environment variables.")

In [39]:
# Upload the locally cached raw file to the 'raw/' folder of the 'data' container.
container_name = "data"
blob_name = "raw/raw.json"
file_path = "../data_cache/raw/raw.json"

blob_storage = BlobStorage(conn_str)
try:
    # Check the local file up front so the error message is explicit.
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Local file '{file_path}' does not exist.")

    # Refuse to upload an empty file.
    if os.stat(file_path).st_size == 0:
        raise ValueError(f"File '{file_path}' is empty and will not be uploaded.")

    blob_url = blob_storage.upload_file(
        container_name=container_name,
        blob_name=blob_name,
        file_path=file_path,
        overwrite=True
    )
    print(f"File uploaded successfully. Blob URL: {blob_url}")

except Exception as e:
    # Report the error, then re-raise so "Run All" stops here instead of
    # continuing with stale data in the lake.
    print(f"Error: {e}")
    raise
finally:
    blob_storage.close()

File '../data_cache/raw/raw.json' uploaded to blob 'raw/raw.json' in container 'data'.
File uploaded successfully. Blob URL: https://testlakehouse.blob.core.windows.net/data/raw/raw.json


## Pull code (DVC versioning will be added later)

In [149]:
import psycopg2
from psycopg2 import sql
from typing import List, Dict, Optional
from psycopg2 import extras


class Database:
    """Small helper around one psycopg2 connection and cursor for CRUD operations.

    Can be used as a context manager; the cursor and connection are closed on exit.
    """

    def __init__(self, **kwargs):
        """
        Open the database connection.

        Recognized keyword arguments: host, port, dbname, user, password and
        sslmode ("prefer" when not given). Connection errors are printed and re-raised.
        """
        try:
            self.connection = psycopg2.connect(
                host=kwargs.get("host"),
                port=kwargs.get("port"),
                dbname=kwargs.get("dbname"),
                user=kwargs.get("user"),
                password=kwargs.get("password"),
                sslmode=kwargs.get("sslmode", "prefer")
            )
            self.cursor = self.connection.cursor()
        except Exception as e:
            print(f"Error connecting to the database: {e}")
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    @staticmethod
    def _column_definition(col: str, dtype: str) -> str:
        """Render one column definition; "<type>, FOREIGN KEY <rest>" becomes "<type> <rest>"."""
        head, sep, tail = dtype.partition(', FOREIGN KEY ')
        return f"{col} {head} {tail}" if sep else f"{col} {dtype}"

    def create_table(self, table_name: str, columns: Dict[str, str]) -> bool:
        """
        Create a table (if it does not exist) with the specified columns.

        Only the table name is quoted; column names and types are inserted
        verbatim. They must therefore come from trusted code, never from user input.

        :param table_name: Name of the table (quoted, so its case is preserved).
        :param columns: Mapping of column name -> SQL type/constraints.
        :return: True on success, False if the statement failed (the error is printed).
        """
        column_defs = [self._column_definition(col, dtype) for col, dtype in columns.items()]
        query = sql.SQL("CREATE TABLE IF NOT EXISTS {} ({})").format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(map(sql.SQL, column_defs))
        )
        return self._execute_query(query, f"Error creating table {table_name}")

    def insert(self, table_name: str, data: Dict[str, Any]) -> bool:
        """
        Insert one record into the specified table.

        :param table_name: Name of the table.
        :param data: Dictionary of column names and their values.
        :return: True on success, False on failure (the error is printed).
        """
        columns = data.keys()
        values = tuple(data.values())
        query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(map(sql.Identifier, columns)),
            sql.SQL(", ").join(sql.Placeholder() * len(values))
        )
        return self._execute_query(query, f"Error inserting data into {table_name}", values)

    def get_columns(self, table_name: str) -> list[tuple[str, str]]:
        """
        Get the columns of the specified table.

        :param table_name: Name of the table, matched exactly against
            information_schema (no case folding is applied).
        :return: List of (column_name, data_type) tuples in table order, or an
            empty list if the query fails.
        """
        query = sql.SQL(
            "SELECT column_name, data_type FROM information_schema.columns "
            "WHERE table_name = {} ORDER BY ordinal_position"
        ).format(sql.Literal(table_name))
        return self._fetch_query(query, f"Error fetching columns for table '{table_name}'")

    def read(self, table_name: str, columns: Optional[List[str]] = None, conditions: Optional[Dict[str, Any]] = None, limit: Optional[int] = None):
        """
        Read data from the specified table.

        :param table_name: Name of the table.
        :param columns: List of columns to fetch (default: all columns).
        :param conditions: Equality conditions for the WHERE clause, joined with AND.
        :param limit: Maximum number of rows to fetch; a falsy value (None or 0) means no LIMIT.
        :return: List of rows, or an empty list if the query fails.
        """
        columns_sql = sql.SQL(", ").join(map(sql.Identifier, columns)) if columns else sql.SQL("*")
        query = sql.SQL("SELECT {} FROM {}").format(columns_sql, sql.Identifier(table_name))
        if conditions:
            query += sql.SQL(" WHERE ") + sql.SQL(" AND ").join(
                sql.Composed([sql.Identifier(k), sql.SQL(" = "), sql.Placeholder(k)]) for k in conditions.keys()
            )
        if limit:
            query += sql.SQL(" LIMIT {}").format(sql.Literal(limit))
        return self._fetch_query(query, f"Error reading data from {table_name}", conditions)

    def update(self, table_name: str, updates: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
        """
        Update records in the specified table.

        :param table_name: Name of the table.
        :param updates: Dictionary of columns to update and their new values.
        :param conditions: Equality conditions for the WHERE clause, joined with AND.
        :return: True on success, False on failure (the error is printed).
        """
        # Prefix placeholder names so a column can appear in both SET and WHERE.
        set_clause = sql.SQL(", ").join(
            sql.Composed([sql.Identifier(k), sql.SQL(" = "), sql.Placeholder(f"set_{k}")]) for k in updates.keys()
        )
        where_clause = sql.SQL(" AND ").join(
            sql.Composed([sql.Identifier(k), sql.SQL(" = "), sql.Placeholder(f"where_{k}")]) for k in conditions.keys()
        )
        query = sql.SQL("UPDATE {} SET {} WHERE {}").format(
            sql.Identifier(table_name), set_clause, where_clause
        )
        params = {f"set_{k}": v for k, v in updates.items()}
        params.update({f"where_{k}": v for k, v in conditions.items()})
        return self._execute_query(query, f"Error updating data in {table_name}", params)

    def delete(self, table_name: str, conditions: Dict[str, Any]) -> bool:
        """
        Delete records from the specified table.

        :param table_name: Name of the table.
        :param conditions: Equality conditions for the WHERE clause, joined with AND.
        :return: True on success, False on failure (the error is printed).
        """
        where_clause = sql.SQL(" AND ").join(
            sql.Composed([sql.Identifier(k), sql.SQL(" = "), sql.Placeholder(k)]) for k in conditions.keys()
        )
        query = sql.SQL("DELETE FROM {} WHERE {}").format(sql.Identifier(table_name), where_clause)
        return self._execute_query(query, f"Error deleting data from {table_name}", conditions)

    def _execute_query(self, query, error_message, params=None) -> bool:
        """
        Execute and commit a statement; on failure roll back and print the error.

        :param query: SQL query to execute.
        :param error_message: Prefix of the message printed if the query fails.
        :param params: Parameters for the query.
        :return: True if the statement was committed, False otherwise.
        """
        try:
            self.cursor.execute(query, params)
            self.connection.commit()
            return True
        except Exception as e:
            self.connection.rollback()
            print(f"{error_message}: {e}")
            return False

    def _fetch_query(self, query, error_message, params=None):
        """
        Execute a query and fetch all rows; on failure roll back, print the error and return [].

        :param query: SQL query to execute.
        :param error_message: Prefix of the message printed if the query fails.
        :param params: Parameters for the query.
        :return: Query results.
        """
        try:
            self.cursor.execute(query, params)
            return self.cursor.fetchall()
        except Exception as e:
            self.connection.rollback()
            print(f"{error_message}: {e}")
            return []

    def close(self):
        """
        Close the cursor and the database connection.
        """
        self.cursor.close()
        self.connection.close()

    def batch_insert(self, table_name: str, tweets_data: list, column_mapping: dict):
        """
        Insert many records in one statement using ``psycopg2.extras.execute_values``.

        :param table_name: Name of the table.
        :param tweets_data: List of dicts; each must contain every key of ``column_mapping``.
        :param column_mapping: Mapping of input-record key -> database column name.
        :raises Exception: any error is printed and re-raised after rolling back.
        """
        try:
            fields = list(column_mapping.keys())
            columns = [column_mapping[field] for field in fields]

            values = [tuple(record[field] for field in fields) for record in tweets_data]

            insert_query = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
                sql.Identifier(table_name),
                sql.SQL(", ").join(map(sql.Identifier, columns))
            )
            extras.execute_values(self.cursor, insert_query, values)
            self.connection.commit()
            print(f"Successfully inserted {len(tweets_data)} records into {table_name}.")
        except Exception as e:
            self.connection.rollback()
            print(f"Error inserting batch data into {table_name}: {e}")
            raise

## Pull the raw data, clean it, and push it to PostgreSQL

In [None]:
from typing import List
import re
from twikit import Tweet
import time
import json 
import pandas as pd
import string
from typing import List, Optional

# API key for the LLM service (presumably for sentiment labelling; TODO confirm).
# It is read from the environment rather than hard-coded; empty string if unset.
LLM_API_KEY = os.getenv('LLM_API_KEY', '')

def clean_data(raw: List[str] = None, hashtags: Optional[List[str]] = None):
    """
    Lightly clean tweet texts; case and punctuation are preserved.

    Steps, per text:
    1. Remove emojis.
    2. Remove the given hashtags (case-sensitive).
    3. Replace newlines with spaces.
    4. Remove t.co links.
    5. Strip surrounding whitespace.
    Texts that end up empty are dropped.

    :param raw: Tweet texts to clean.
    :param hashtags: Hashtags to remove. Defaults to every hashtag listed in
        TWITTER_HASHTAGS, across all topics.
    :return: List of cleaned, non-empty texts, or None when ``raw`` is empty/None.
    """
    if not raw:
        return None

    if hashtags is None:
        # TWITTER_HASHTAGS maps topic -> [hashtags]; iterating the dict itself
        # would yield topic names instead of hashtags.
        hashtags = [tag for tags in TWITTER_HASHTAGS.values() for tag in tags]
    # Remove longer tags first so a shorter tag cannot strip the prefix of a longer one.
    tags_longest_first = sorted(hashtags, key=len, reverse=True)

    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags
        "\U00002700-\U000027BF"  # Dingbats
        "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
        "\U00002600-\U000026FF"  # Misc symbols
        "\U000025A0-\U000025FF"  # Geometric Shapes
        "]+",
        flags=re.UNICODE
    )
    link_pattern = re.compile(r'https://t\.co/\w+')

    cleaned_tweets = []
    for text in raw:
        text = emoji_pattern.sub('', text)
        for tag in tags_longest_first:
            text = text.replace(tag, '')
        text = text.replace('\n', ' ').replace('\r', ' ')
        text = link_pattern.sub('', text).strip()
        if text:
            cleaned_tweets.append(text)

    return cleaned_tweets

def clean_data_v2(raw: Optional[List[str]] = None, hashtags: Optional[List[str]] = None) -> List[str]:
    """
    Normalize tweet texts for analysis.

    Steps, per text:
    1. Lower-case it.
    2. Remove emojis.
    3. Remove the given hashtags.
    4. Remove http(s) links.
    5. Strip ASCII punctuation.
    6. Collapse every run of whitespace (including newlines) into one space, then trim.

    Texts that are empty before or after cleaning are dropped. The output may
    therefore be shorter than ``raw`` and is not index-aligned with it.

    :param raw: Tweet texts to clean.
    :param hashtags: Hashtags to remove (matched case-insensitively). Defaults
        to every hashtag listed in TWITTER_HASHTAGS, across all topics.
    :return: List of cleaned, non-empty texts ([] when ``raw`` is empty/None).
    """
    if not raw:
        return []

    if hashtags is None:
        # TWITTER_HASHTAGS maps topic -> [hashtags]; iterating the dict itself
        # would yield topic names instead of hashtags.
        hashtags = [tag for tags in TWITTER_HASHTAGS.values() for tag in tags]
    # Lower-case the tags once (texts are lower-cased below). Remove longer tags
    # first so a shorter tag cannot eat the prefix of a longer one.
    tags_lower = sorted(dict.fromkeys(tag.lower() for tag in hashtags), key=len, reverse=True)

    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"
        "\U0001F300-\U0001F5FF"
        "\U0001F680-\U0001F6FF"
        "\U0001F1E0-\U0001F1FF"
        "\U00002700-\U000027BF"
        "\U0001F900-\U0001F9FF"
        "\U00002600-\U000026FF"
        "\U000025A0-\U000025FF"
        "]+",
        flags=re.UNICODE
    )
    url_pattern = re.compile(r'https?://\S+')
    whitespace_pattern = re.compile(r'\s+')
    punctuation_table = str.maketrans('', '', string.punctuation)

    cleaned = []
    for text in raw:
        if not text:
            continue

        text = emoji_pattern.sub('', text.lower())
        for tag in tags_lower:
            text = text.replace(tag, '')
        text = url_pattern.sub('', text)
        text = text.translate(punctuation_table)
        # Collapse whitespace last, so gaps left by removed punctuation are merged too.
        text = whitespace_pattern.sub(' ', text).strip()

        if text:
            cleaned.append(text)

    return cleaned

In [78]:
# Download the raw tweets from the lake.
blob_storage = BlobStorage(conn_str)
try:
    raw_records = blob_storage.read_json_from_container(container="data", file_path="raw/raw.json")
finally:
    blob_storage.close()

# Clean each record's text on its own. clean_data_v2 drops texts that become
# empty, so cleaning the whole list at once and assigning the results back by
# index would misalign texts and records (or raise IndexError). Records whose
# text cleans to nothing are dropped.
data = []
for item in raw_records:
    cleaned = clean_data_v2([item['text']])
    if cleaned:
        data.append({**item, 'text': cleaned[0]})

print(f"{len(data)} of {len(raw_records)} records kept after cleaning")
print(data[:5])



In [154]:
import os

# Read the PostgreSQL connection settings from the PG_* environment variables;
# unset variables come back as None.
host = os.environ.get('PG_HOST')
port = os.environ.get('PG_PORT')
dbname = os.environ.get('PG_DB1')
user = os.environ.get('PG_USER')
password = os.environ.get('PG_PASSWORD')
sslmode = os.environ.get('PG_SSLMODE')

# Shared connection used by the staging-load cell that follows.
db = Database(host=host, port=port, dbname=dbname,
              user=user, password=password, sslmode=sslmode)

In [155]:
# Map each input-record key to its TWEET_STAGING column.
# NOTE(review): crawl_task writes records with the keys 'text', 'date_created',
# 'date_scrape' and 'topic', while this mapping expects 'date_scraped' and 'tag'
# (the schema of the raw.json loaded above). Align the two before feeding
# freshly crawled files through this cell.
column_mapping = dict(text="content", date_scraped="crawl_at", tag="topic")

db.batch_insert("TWEET_STAGING", data, column_mapping)

Successfully inserted 35 records into TWEET_STAGING.


# Hide

In [None]:
import logging
import os
import json
import psycopg2
from azure.storage.blob import BlobServiceClient
from azure.functions import HttpRequest, HttpResponse

def parse_request(req: HttpRequest) -> str:
    """Extract ``blob_name`` from the JSON body of an HTTP request.

    Raises:
        ValueError: if the body is not valid JSON, is not a JSON object, or has
            a missing, empty or non-string ``blob_name``.
    """
    try:
        req_body = req.get_json()
    except Exception as e:
        raise ValueError(f"Error parsing request: {e}") from e

    if not isinstance(req_body, dict):
        raise ValueError("Error parsing request: JSON body must be an object.")

    blob_name = req_body.get('blob_name')
    if not blob_name:
        raise ValueError("Missing 'blob_name' in request payload.")
    if not isinstance(blob_name, str):
        raise ValueError("'blob_name' must be a string.")
    return blob_name

def get_blob_data(conn_str: str, container_name: str, blob_name: str) -> list:
    """Download a JSON blob from Azure Blob Storage and return the parsed content.

    The annotated return type is ``list``; this assumes the blob holds a JSON
    array (TODO confirm against the uploader).

    Raises:
        RuntimeError: wrapping any download or JSON-decoding failure.
    """
    blob_storage = None
    try:
        blob_storage = BlobStorage(conn_str)
        blob_data = blob_storage.download_blob(container_name, blob_name)
        return json.loads(blob_data)
    except Exception as e:
        raise RuntimeError(f"Error downloading blob: {e}") from e
    finally:
        # Release the HTTP transport held by the service client.
        if blob_storage is not None:
            blob_storage.close()

def insert_data_to_postgres(data: list, db_config: dict):
    """Insert records into PostgreSQL by calling the ``insert_into_staging`` procedure.

    All calls run in one transaction, which is committed only if every call
    succeeds. The connection is always closed; closing without a commit
    discards the pending transaction.

    Each record must provide the keys 'text', 'tag', 'date_scraped' and 'timestamp'.
    NOTE(review): crawl_task produces 'text', 'date_created', 'date_scrape' and
    'topic' instead; confirm which schema the uploaded blobs use.

    Raises:
        RuntimeError: wrapping any connection, missing-key or database error.
    """
    pg_conn = None
    try:
        # Build all argument tuples first so a malformed record fails before any SQL runs.
        rows = [
            (record['text'], record['tag'], record['date_scraped'], record['timestamp'])
            for record in data
        ]
        pg_conn = psycopg2.connect(**db_config)
        with pg_conn.cursor() as cursor:
            cursor.executemany("CALL insert_into_staging(%s, %s, %s, %s)", rows)
        pg_conn.commit()
    except Exception as e:
        raise RuntimeError(f"Error inserting data into PostgreSQL: {e}") from e
    finally:
        if pg_conn is not None:
            pg_conn.close()

def main(req: HttpRequest) -> HttpResponse:
    """Azure Function entry point: load a raw JSON blob into the PostgreSQL staging table.

    Expects a JSON body ``{"blob_name": "<name>"}``. The blob is read from
    ``raw/<name>`` in the ``data`` container.

    Returns:
        200 on success, 400 for a bad request, 500 for configuration or processing errors.
    """
    logging.info('Azure Function triggered to process Blob and insert into PostgreSQL.')

    try:
        blob_name = parse_request(req)

        # Read the connection string before anything uses it.
        conn_str = os.getenv('LAKE_STORAGE_CONN_STR')
        if not conn_str:
            raise RuntimeError("LAKE_STORAGE_CONN_STR is not configured.")

        # Container names cannot contain '/', so the 'raw' folder is expressed
        # as a blob-name prefix inside the 'data' container.
        container_name = 'data'
        data = get_blob_data(conn_str, container_name, f"raw/{blob_name}")

        db_config = {
            'dbname': os.getenv('PG_DB1'),
            'user': os.getenv('PG_USER'),
            'password': os.getenv('PG_PASSWORD'),
            'host': os.getenv('PG_HOST'),
            'port': os.getenv('PG_PORT'),
            'sslmode': os.getenv('PG_SSLMODE')
        }

        insert_data_to_postgres(data, db_config)

        logging.info(f"Successfully processed blob: {blob_name} and inserted data into PostgreSQL.")
        return HttpResponse(f"Successfully processed blob: {blob_name}.", status_code=200)

    except ValueError as ve:
        logging.error(f"Request error: {ve}")
        return HttpResponse(str(ve), status_code=400)
    except RuntimeError as proc_err:
        logging.error(f"Processing error: {proc_err}")
        return HttpResponse(str(proc_err), status_code=500)
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        return HttpResponse(f"Error processing blob: {e}", status_code=500)

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
import requests
from your_module import crawl_task, BlobStorage

# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Hashtags to crawl, grouped by topic, because crawl_task expects a
# {topic: [hashtags]} dict.
# TODO(review): the topic label is a placeholder; rename it as appropriate.
DAG_HASHTAGS_BY_TOPIC = {
    'AI': ["#AI", "#MachineLearning", "#Python"],
}
# Local directory crawl_task writes to (relative to the worker's working directory).
CRAWL_BASE_DIR = "./data_cache/raw"
# Raw files are uploaded to '<BLOB_RAW_PREFIX>/<file name>' in BLOB_CONTAINER.
# This is intended to match the 'data/raw' location the Azure Function reads from.
BLOB_CONTAINER = 'data'
BLOB_RAW_PREFIX = 'raw'
AZURE_FUNCTION_URL = "https://your-azure-function-url/trigger-staging"
HTTP_TIMEOUT_SECONDS = 30

# Define the DAG
with DAG(
    'tweet_pipeline',
    default_args=default_args,
    description='Crawl tweets, upload to blob, and trigger staging DB via Azure Function',
    schedule_interval='@hourly',  # Run every hour
    start_date=datetime(2025, 4, 28),
    catchup=False,
) as dag:

    def crawl_tweets(**kwargs):
        """Task 1: crawl tweets into a local hourly JSON file.

        Returns whatever crawl_task returns, which the next task expects to be the file path.
        """
        return crawl_task(base_dir=CRAWL_BASE_DIR, hashtags_dict=DAG_HASHTAGS_BY_TOPIC)

    def upload_to_blob(**kwargs):
        """Task 2: upload the crawled file to Blob Storage and return its file name."""
        ti = kwargs['ti']
        file_path = ti.xcom_pull(task_ids='crawl_tweets')
        if not file_path or not os.path.isfile(file_path):
            raise FileNotFoundError(f"crawl_tweets did not return an existing file path: {file_path!r}")

        blob_name = os.path.basename(file_path)
        conn_str = os.getenv('LAKE_STORAGE_CONN_STR')
        blob_storage = BlobStorage(conn_str)
        try:
            blob_storage.upload_file(BLOB_CONTAINER, f"{BLOB_RAW_PREFIX}/{blob_name}", file_path)
        finally:
            blob_storage.close()
        print(f"Uploaded {file_path} to Blob Storage")
        return blob_name

    def call_azure_function(**kwargs):
        """Task 3: ask the Azure Function to load the uploaded blob into the staging DB.

        Raises:
            RuntimeError: on a non-200 response, so Airflow marks the task failed and retries it.
        """
        ti = kwargs['ti']
        blob_name = ti.xcom_pull(task_ids='upload_to_blob')

        response = requests.post(
            AZURE_FUNCTION_URL,
            json={"blob_name": blob_name},
            headers={"Content-Type": "application/json"},
            timeout=HTTP_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            raise RuntimeError(
                f"Failed to trigger Azure Function. Status code: {response.status_code}, Response: {response.text}"
            )
        print(f"Successfully triggered Azure Function for blob: {blob_name}")

    # Airflow 2 always passes the context to python_callable;
    # provide_context is deprecated there and therefore omitted.
    crawl_tweets_task = PythonOperator(
        task_id='crawl_tweets',
        python_callable=crawl_tweets,
    )

    upload_to_blob_task = PythonOperator(
        task_id='upload_to_blob',
        python_callable=upload_to_blob,
    )

    call_azure_function_task = PythonOperator(
        task_id='call_azure_function',
        python_callable=call_azure_function,
    )

    # Set task dependencies
    crawl_tweets_task >> upload_to_blob_task >> call_azure_function_task

In [None]:
# !pip install azure-functions